IBR-DTNSuite  0.10
FragmentManager.cpp
Go to the documentation of this file.
1 /*
2  * FragmentManager.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 #include "core/EventDispatcher.h"
24 #include "core/FragmentManager.h"
25 #include "core/BundleCore.h"
26 #include "core/TimeEvent.h"
27 #include "core/BundlePurgeEvent.h"
31 #include <ibrdtn/utils/Clock.h>
32 #include <ibrcommon/Logger.h>
34 
35 namespace dtn
36 {
37  namespace core
38  {
// Tag handed to the IBRCOMMON_LOGGER_*_TAG macros used in this file.
39  const std::string FragmentManager::TAG = "FragmentManager";
40 
// Shared table of partial-transmission offsets and the mutex guarding it.
// Every access to _offsets visible in this file first takes an
// ibrcommon::MutexLock on _offsets_mutex.
41  ibrcommon::Mutex FragmentManager::_offsets_mutex;
42  std::set<FragmentManager::Transmission> FragmentManager::_offsets;
43 
45  : _running(false)
46  {
47  }
48 
50  {
51  }
52 
53  const std::string FragmentManager::getName() const
54  {
55  return "FragmentManager";
56  }
57 
59  {
60  _running = false;
61  _incoming.abort();
62  }
63 
65  {
66  // routine checked for throw() on 15.02.2013
68  _running = true;
69  }
70 
72  {
73  // TODO: scan storage for fragments to reassemble on startup
74 
76 
77  // create a task loop to reassemble fragments asynchronously
78  try {
79  while (_running)
80  {
81  dtn::data::MetaBundle meta = _incoming.getnpop(true);
82 
83  // search for matching bundles
84  list.clear();
85  search(meta, list);
86 
87  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 20) << "found " << list.size() << " fragments similar to bundle " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
88 
89  // TODO: drop fragments if other fragments available containing the same payload
90 
91  // check first if all fragment are available
92  std::set<BundleMerger::Chunk> chunks;
93  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
94  {
95  const dtn::data::MetaBundle &m = (*iter);
96  if (meta.payloadlength > 0)
97  {
99  chunks.insert(chunk);
100  }
101  }
102 
103  // wait for the next bundle if the fragment is not complete
104  if (!BundleMerger::Chunk::isComplete(meta.appdatalength.get<dtn::data::Length>(), chunks)) continue;
105 
106  // create a new bundle merger container
108 
109  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
110  {
111  const dtn::data::MetaBundle &meta = (*iter);
112 
113  if (meta.payloadlength > 0)
114  {
115  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 20) << "fragment: " << (*iter).toString() << IBRCOMMON_LOGGER_ENDL;
116 
117  try {
118  // load bundle from storage
120 
121  // merge the bundle
122  c << bundle;
123  } catch (const dtn::storage::NoBundleFoundException&) {
124  IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, error) << "could not load fragment to merge bundle" << IBRCOMMON_LOGGER_ENDL;
125  };
126  }
127  }
128 
129  if (c.isComplete())
130  {
131  dtn::data::Bundle &merged = c.getBundle();
132 
133  IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, notice) << "Bundle " << merged.toString() << " merged" << IBRCOMMON_LOGGER_ENDL;
134 
135  // raise default bundle received event
137 
138  // delete all fragments of the merged bundle
139  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
140  {
142  {
144  }
145  }
146  }
147  }
148  } catch (const ibrcommon::QueueUnblockedException&) { }
149  }
150 
152  {
154 
155  stop();
156  join();
157  }
158 
159  void FragmentManager::raiseEvent(const Event *evt) throw ()
160  {
161  try {
162  const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
163 
164  // process fragments
165  if (queued.bundle.fragment) signal(queued.bundle);
166  } catch (const std::bad_cast&) {}
167  }
168 
170  {
171  // do not merge a bundle if it is non-local and singleton
172  // we only touch local and group bundles which might be delivered locally
174  {
176  {
177  return;
178  }
179  }
180 
181  // push the meta bundle into the incoming queue
182  _incoming.push(meta);
183  }
184 
// Collects into `list` all stored bundles that are fragments sharing the
// source, timestamp and sequence number of `meta`.
//
// @param meta  bundle whose sibling fragments are wanted
// @param list  result container handed to storage.get(); a
//              dtn::storage::NoBundleFoundException from the lookup is swallowed
//
// NOTE(review): the declaration of `storage` sits on a line that is not part
// of this listing.
185  void FragmentManager::search(const dtn::data::MetaBundle &meta, dtn::storage::BundleResult &list)
186  {
// Local selector that accepts only fragments with the same bundle identity
// as the MetaBundle it was constructed with.
187  class BundleFilter : public dtn::storage::BundleSelector
188  {
189  public:
190  BundleFilter(const dtn::data::MetaBundle &meta)
191  : _similar(meta)
192  {};
193 
194  virtual ~BundleFilter() {};
195 
// presumably 0 means "no limit on the number of results" — TODO confirm against BundleSelector
196  virtual dtn::data::Size limit() const throw () { return 0; };
197 
198  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
199  {
200  // fragments only
201  if (!meta.fragment) return false;
202 
203  // with the same unique bundle id
204  if (meta.source != _similar.source) return false;
205  if (meta.timestamp != _similar.timestamp) return false;
206  if (meta.sequencenumber != _similar.sequencenumber) return false;
207 
208  return true;
209  };
210 
211  private:
// held by reference: the filter must not outlive the MetaBundle passed to its constructor
212  const dtn::data::MetaBundle &_similar;
213  };
214 
215  // create a bundle filter
216  BundleFilter filter(meta);
218 
219  try {
220  storage.get(filter, list);
221  } catch (const dtn::storage::NoBundleFoundException&) { }
222  }
223 
// Records how far into the payload of bundle `id` a transmission to `peer`
// got, so that the value can be looked up later.
//
// @param peer        EID stored in the Transmission record
// @param id          ID of the bundle the offset belongs to
// @param abs_offset  offset that is converted with get_payload_offset()
//
// Nothing is stored if the converted offset is zero or if a
// dtn::storage::NoBundleFoundException is raised inside the try block.
//
// NOTE(review): the declaration of `b` sits on a line that is not part of
// this listing.
224  void FragmentManager::setOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id, const dtn::data::Length &abs_offset) throw ()
225  {
226  try {
227  Transmission t;
229  t.offset = get_payload_offset(b, abs_offset);
230 
// presumably dtn::data::Length is unsigned, which would make this an "== 0" test — TODO confirm
231  if (t.offset <= 0) return;
232 
233  t.id = id;
234  t.peer = peer;
235  t.expires = dtn::utils::Clock::getExpireTime( b );
236 
237  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 20) << "[FragmentManager] store offset of partial transmitted bundle " <<
238  id.toString() << "; offset: " << t.offset << " (" << abs_offset << ")" << IBRCOMMON_LOGGER_ENDL;
239 
240  ibrcommon::MutexLock l(_offsets_mutex);
// erase first: std::set::insert() keeps an existing equivalent element
// (Transmission::operator< orders by expires, peer, id and ignores offset),
// so the old record has to go before the new offset can be stored
241  _offsets.erase(t);
242  _offsets.insert(t);
243  } catch (const dtn::storage::NoBundleFoundException&) { };
244  }
245 
247  {
248  ibrcommon::MutexLock l(_offsets_mutex);
249  for (std::set<Transmission>::const_iterator iter = _offsets.begin(); iter != _offsets.end(); ++iter)
250  {
251  const Transmission &t = (*iter);
252  if (t.peer != peer) continue;
253  if (t.id != id) continue;
254  return t.offset;
255  }
256 
257  return 0;
258  }
259 
260  dtn::data::Length FragmentManager::get_payload_offset(const dtn::data::Bundle &bundle, const dtn::data::Length &abs_offset) throw ()
261  {
262  try {
263  // build the dictionary for EID lookup
264  const dtn::data::Dictionary dict(bundle);
265 
266  // create a default serializer
267  dtn::data::DefaultSerializer serializer(std::cout, dict);
268 
269  // get the encoded length of the primary block
270  dtn::data::Length header = serializer.getLength((dtn::data::PrimaryBlock&)bundle);
271 
272  for (dtn::data::Bundle::const_iterator iter = bundle.begin(); iter != bundle.end(); ++iter)
273  {
274  const dtn::data::Block &b = (**iter);
275  header += serializer.getLength(b);
276 
277  try {
278  const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(b);
279  header -= payload.getLength();
280  if (abs_offset < header) return 0;
281  return abs_offset - header;
282  } catch (std::bad_cast&) { };
283  }
284  } catch (const dtn::InvalidDataException&) {
285  // failure while calculating the bundle length
286  }
287 
288  return 0;
289  }
290 
291  void FragmentManager::expire_offsets(const dtn::data::Timestamp &timestamp)
292  {
293  ibrcommon::MutexLock l(_offsets_mutex);
294  for (std::set<Transmission>::iterator iter = _offsets.begin(); iter != _offsets.end();)
295  {
296  const Transmission &t = (*iter);
297  if (t.expires >= timestamp) return;
298  _offsets.erase(iter++);
299  }
300  }
301 
// Splits the payload of `bundle` into fragments and appends them to
// `fragments`.
//
// Each fragment starts as a copy of `bundle` with all blocks removed, gets
// the FRAGMENT flag, the full payload length as application data length and
// the current offset as fragment offset, and receives up to
// `maxPayloadLength` bytes of payload (the last fragment gets the remainder).
//
// @param bundle            bundle to split
// @param maxPayloadLength  upper bound for the payload size of one fragment
// @param fragments         list the created fragments are appended to
// @throws FragmentationProhibitedException    see the guard below
// @throws FragmentationNotNecessaryException  if the payload already fits
// @throws FragmentationAbortedException       if no payload block was found
//
// NOTE(review): presumably the first two exception types derive from
// FragmentationAbortedException, otherwise they would violate the exception
// specification — confirm in the header.
// NOTE(review): three lines of this function (the condition guarding the
// first throw, the declaration of `fragment_ref` and the outer catch header)
// are not part of this listing.
302  void FragmentManager::split(const dtn::data::Bundle &bundle, const dtn::data::Length &maxPayloadLength, std::list<dtn::data::Bundle> &fragments) throw (FragmentationAbortedException)
303  {
304  // get bundle DONT_FRAGMENT Flag
306  throw FragmentationProhibitedException("Bundle fragmentation is restricted by do-not-fragment bit.");
307 
308  try {
309  const dtn::data::PayloadBlock &payloadBlock = bundle.find<dtn::data::PayloadBlock>();
310 
311  // get bundle payload length
312  dtn::data::Length payloadLength = payloadBlock.getLength();
313 
314  // check if fragmentation needed
315  if (payloadLength <= maxPayloadLength)
316  throw FragmentationNotNecessaryException("Fragmentation not necessary. The payload block is smaller than the max. payload length.");
317 
318  ibrcommon::BLOB::Reference ref = payloadBlock.getBLOB();
319  ibrcommon::BLOB::iostream stream = ref.iostream();
320 
321  bool isFirstFragment = true;
322  dtn::data::Length offset = 0;
323 
// one fragment per iteration; `offset` counts the payload bytes copied so far
324  while (!(*stream).eof() && (payloadLength > offset))
325  {
326  // copy the origin bundle as template for the new fragment
327  Bundle fragment = bundle;
328 
329  // clear all the blocks
330  fragment.clear();
331 
332  // set bundle is fragment flag
333  fragment.set(dtn::data::Bundle::FRAGMENT, true);
334 
335  // set application data length
336  fragment.appdatalength = payloadLength;
337 
338  // set fragment offset
339  fragment.fragmentoffset = offset;
340 
341  // copy partial payload to the payload of the fragment
342  try {
343  // create new blob for fragment payload
345 
346  // get the iostream object
347  ibrcommon::BLOB::iostream fragment_stream = fragment_ref.iostream();
348 
// presumably the last argument (65535) is the copy buffer size — TODO confirm against BLOB::copy
349  if ((offset + maxPayloadLength) > payloadLength) {
350  // copy payload to the fragment
351  ibrcommon::BLOB::copy(*fragment_stream, *stream, payloadLength - offset, 65535);
352  } else {
353  // copy payload to the fragment
354  ibrcommon::BLOB::copy(*fragment_stream, *stream, maxPayloadLength, 65535);
355  }
356 
357  // set new offset position
358  offset += fragment_stream.size();
359 
360  // create fragment payload block
361  dtn::data::PayloadBlock &fragment_payloadBlock = fragment.push_back(fragment_ref);
362 
363  // add all necessary blocks from the bundle to the fragment
// the last argument marks the final fragment: all payload bytes have been consumed
364  addBlocksFromBundleToFragment(bundle, fragment, fragment_payloadBlock, isFirstFragment, payloadLength == offset);
365  } catch (const ibrcommon::IOException &ex) {
// NOTE(review): on this path `offset` has not been advanced and the fragment
// has no payload block yet, but it is still appended to `fragments` below and
// the loop continues — confirm that this is intended.
366  IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, error) << "error while splitting bundle into fragments: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
367  }
368 
369  // add current fragment to fragments list
370  fragments.push_back(fragment);
371 
372  if (isFirstFragment) isFirstFragment = false;
373 
374  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Fragment created: " << fragment.toString() << IBRCOMMON_LOGGER_ENDL;
375  }
377  // bundle has no payload
378  throw FragmentationAbortedException("Fragmentation aborted. No payload block found.");
379  }
380  }
381 
// Copies the non-payload blocks of `bundle` that belong into `fragment`.
//
// Blocks located before the payload block of `bundle` are inserted in front
// of `fragment_payloadBlock` if this is the first fragment or the block has
// REPLICATE_IN_EVERY_FRAGMENT set. Blocks located after the payload block are
// appended if this is the last fragment or the block has that flag set.
//
// @param bundle                 original bundle providing the blocks
// @param fragment               fragment that receives the copies
// @param fragment_payloadBlock  payload block of `fragment`, used as insert position
// @param isFirstFragment        true for the fragment with offset 0
// @param isLastFragment         true for the fragment holding the payload tail
//
// NOTE(review): the two lines declaring the factory `f` are not part of this
// listing.
382  void FragmentManager::addBlocksFromBundleToFragment(const dtn::data::Bundle &bundle, dtn::data::Bundle &fragment, dtn::data::PayloadBlock &fragment_payloadBlock, bool isFirstFragment, bool isLastFragment)
383  {
// becomes true once the payload block of `bundle` has been passed in the loop below
384  bool isAfterPayload = false;
385  bool isReplicateInEveryBundle = false;
386 
387  char block_type = 0;
388 
// NOTE(review): this statement passes the literal "FragmentManager" where the rest of the file passes FragmentManager::TAG
389  IBRCOMMON_LOGGER_DEBUG_TAG("FragmentManager", 5) << "Fragment original bundle block count: " << fragment.toString() << " " << bundle.size() << IBRCOMMON_LOGGER_ENDL;
390 
391  //check for each block if it has to be added to the fragment
392  for (dtn::data::Bundle::const_iterator it = bundle.begin(); it != bundle.end(); ++it)
393  {
394  //get the current block
395  const Block &current_block = dynamic_cast<const Block&>(**it);
396 
397  block_type = current_block.getType();
398 
399  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Fragment Block found: " << fragment.toString() << " " << (unsigned int)block_type << IBRCOMMON_LOGGER_ENDL;
400 
401 
// the payload block itself is never copied here; it only flips the before/after state
402  if (block_type == dtn::data::PayloadBlock::BLOCK_TYPE)
403  {
404  isAfterPayload = true;
405  }
406  else
407  {
408  isReplicateInEveryBundle = current_block.get(dtn::data::Block::REPLICATE_IN_EVERY_FRAGMENT);
409 
410  //if block is before payload
411  //add if fragment is the first one
412  //or if ReplicateInEveryBundle Flag is set
413  if (!isAfterPayload && (isFirstFragment || isReplicateInEveryBundle))
414  {
415  try
416  { //get factory
418 
419  //insert new Block before payload block
420  dtn::data::Block &fragment_block = fragment.insert(fragment.find(fragment_payloadBlock), f);
421 
422  //copy block
423  fragment_block = current_block;
424 
// NOTE(review): block_type is streamed as a raw char here, unlike the (unsigned int) cast used in the "Fragment Block found" message
425  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added Block before Payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
426  }
427  catch(const ibrcommon::Exception &ex)
428  {
// fallback: the block is inserted as a generic ExtensionBlock instead
429  //insert new Block before payload block
430  dtn::data::Block &fragment_block = fragment.insert<dtn::data::ExtensionBlock>(fragment.find(fragment_payloadBlock));
431 
432  //copy block
433  fragment_block = current_block;
434 
435  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added Block before Payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
436  }
437 
438  }
439  //if block is after payload
440  //add if fragment is the last one
441  //or if ReplicateInEveryBundle Flag is set
442  else if (isAfterPayload && (isLastFragment || isReplicateInEveryBundle))
443  {
444  try
445  { //get factory
447 
448  //push back new Block after payload block
449  dtn::data::Block &fragment_block = fragment.push_back(f);
450 
451  //copy block
452  fragment_block = current_block;
453 
454  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added Block after Payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
455  }
456  catch (const ibrcommon::Exception &ex)
457  {
// fallback: the block is appended as a generic ExtensionBlock instead
458  //push back new Block after payload block
459  dtn::data::Block &fragment_block = fragment.push_back<dtn::data::ExtensionBlock>();
460 
461  //copy block
462  fragment_block = current_block;
463 
464  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added Block after Payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
465  }
466  }
467  }
468  }
469 
470  }
471 
472  FragmentManager::Transmission::Transmission()
473  : offset(0), expires(0)
474  {
475  }
476 
477  FragmentManager::Transmission::~Transmission()
478  {
479  }
480 
481  bool FragmentManager::Transmission::operator<(const Transmission &other) const
482  {
483  if (expires < other.expires) return true;
484  if (expires != other.expires) return false;
485 
486  if (peer < other.peer) return true;
487  if (peer != other.peer) return false;
488 
489  return (id < other.id);
490  }
491 
492  bool FragmentManager::Transmission::operator==(const Transmission &other) const
493  {
494  if (expires != other.expires) return false;
495  if (peer != other.peer) return false;
496  if (id != other.id) return false;
497 
498  return true;
499  }
500  } /* namespace core */
501 } /* namespace dtn */