IBR-DTNSuite  0.12
FragmentManager.cpp
Go to the documentation of this file.
1 /*
2  * FragmentManager.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 #include "core/EventDispatcher.h"
24 #include "core/FragmentManager.h"
25 #include "core/BundleCore.h"
26 #include "core/BundlePurgeEvent.h"
30 #include <ibrdtn/utils/Clock.h>
31 #include <ibrcommon/Logger.h>
33 
34 namespace dtn
35 {
36  namespace core
37  {
38  const std::string FragmentManager::TAG = "FragmentManager";
39 
40  ibrcommon::Mutex FragmentManager::_offsets_mutex;
41  std::set<FragmentManager::Transmission> FragmentManager::_offsets;
42 
44  : _running(false)
45  {
46  }
47 
49  {
50  }
51 
52  const std::string FragmentManager::getName() const
53  {
54  return "FragmentManager";
55  }
56 
58  {
59  _running = false;
60  _incoming.abort();
61  }
62 
64  {
65  // routine checked for throw() on 15.02.2013
67  _running = true;
68  }
69 
71  {
72  // get reference to the storage
74 
75  // TODO: scan storage for fragments to reassemble on startup
76 
78 
79  // create a task loop to reassemble fragments asynchronously
80  try {
81  while (_running)
82  {
83  dtn::data::MetaBundle meta = _incoming.getnpop(true);
84 
85  // skip merge if complete bundle is already in the storage
86  dtn::data::BundleID origin(meta);
87  origin.setFragment(false);
88  if (storage.contains(origin)) continue;
89 
90  // search for matching bundles
91  list.clear();
92  search(meta, list);
93 
94  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 20) << "found " << list.size() << " fragments similar to bundle " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
95 
96  // TODO: drop fragments if other fragments available containing the same payload or larger payload
97 
98  // check first if all fragment are available
99  std::set<BundleMerger::Chunk> chunks;
100  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
101  {
102  const dtn::data::MetaBundle &m = (*iter);
103  if (meta.getPayloadLength() > 0)
104  {
106  chunks.insert(chunk);
107  }
108  }
109 
110  // wait for the next bundle if the fragment is not complete
111  if (!BundleMerger::Chunk::isComplete(meta.appdatalength.get<dtn::data::Length>(), chunks)) continue;
112 
113  // create a new bundle merger container
115 
116  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
117  {
118  const dtn::data::MetaBundle &meta = (*iter);
119 
120  if (meta.getPayloadLength() > 0)
121  {
122  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 20) << "fragment: " << (*iter).toString() << IBRCOMMON_LOGGER_ENDL;
123 
124  try {
125  // load bundle from storage
127 
128  // merge the bundle
129  c << bundle;
130  } catch (const dtn::storage::NoBundleFoundException&) {
131  IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, error) << "could not load fragment to merge bundle" << IBRCOMMON_LOGGER_ENDL;
132  };
133  }
134  }
135 
136  if (c.isComplete())
137  {
138  dtn::data::Bundle &merged = c.getBundle();
139 
140  IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, notice) << "Bundle " << merged.toString() << " merged" << IBRCOMMON_LOGGER_ENDL;
141 
142  // raise default bundle received event
144 
145  // delete all fragments of the merged bundle
147  {
148  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
149  {
151  }
152  }
153  }
154  }
155  } catch (const ibrcommon::QueueUnblockedException&) { }
156  }
157 
159  {
161 
162  stop();
163  join();
164  }
165 
167  {
168  try {
169  const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
170 
171  // process fragments
172  if (queued.bundle.isFragment()) signal(queued.bundle);
173  } catch (const std::bad_cast&) {}
174  }
175 
177  {
178  // do not merge a bundle if it is non-local and singleton
179  // we only touch local and group bundles which might be delivered locally
181  {
183  {
184  return;
185  }
186  }
187 
188  // push the meta bundle into the incoming queue
189  _incoming.push(meta);
190  }
191 
192  void FragmentManager::search(const dtn::data::MetaBundle &meta, dtn::storage::BundleResult &list)
193  {
194  class BundleFilter : public dtn::storage::BundleSelector
195  {
196  public:
197  BundleFilter(const dtn::data::MetaBundle &meta)
198  : _similar(meta)
199  {};
200 
201  virtual ~BundleFilter() {};
202 
203  virtual dtn::data::Size limit() const throw () { return 0; };
204 
205  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
206  {
207  // fragments only
208  if (!meta.isFragment()) return false;
209 
210  // with the same unique bundle id
211  if (meta.source != _similar.source) return false;
212  if (meta.timestamp != _similar.timestamp) return false;
213  if (meta.sequencenumber != _similar.sequencenumber) return false;
214 
215  return true;
216  };
217 
218  private:
219  const dtn::data::MetaBundle &_similar;
220  };
221 
222  // create a bundle filter
223  BundleFilter filter(meta);
225 
226  try {
227  storage.get(filter, list);
228  } catch (const dtn::storage::NoBundleFoundException&) { }
229  }
230 
231  void FragmentManager::setOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id, const dtn::data::Length &abs_offset, const dtn::data::Length &frag_offset) throw ()
232  {
233  try {
234  Transmission t;
236  t.offset = get_payload_offset(b, abs_offset, frag_offset);
237 
238  if (t.offset <= 0) return;
239 
240  t.id = id;
241  t.peer = peer;
242  t.expires = dtn::utils::Clock::getExpireTime( b );
243 
244  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 4) << "Store offset of partial transmitted bundle " << id.toString() << " to " << peer.getString() <<
245  ", offset: " << t.offset << " (" << abs_offset << ")" << IBRCOMMON_LOGGER_ENDL;
246 
247  ibrcommon::MutexLock l(_offsets_mutex);
248  _offsets.erase(t);
249  _offsets.insert(t);
250  } catch (const dtn::storage::NoBundleFoundException&) { };
251  }
252 
254  {
255  ibrcommon::MutexLock l(_offsets_mutex);
256  for (std::set<Transmission>::const_iterator iter = _offsets.begin(); iter != _offsets.end(); ++iter)
257  {
258  const Transmission &t = (*iter);
259  if (t.peer != peer) continue;
260  if (t.id != id) continue;
261  return t.offset;
262  }
263 
264  return 0;
265  }
266 
267  dtn::data::Length FragmentManager::get_payload_offset(const dtn::data::Bundle &bundle, const dtn::data::Length &abs_offset, const dtn::data::Length &frag_offset) throw ()
268  {
269  try {
270  // build the dictionary for EID lookup
271  const dtn::data::Dictionary dict(bundle);
272 
273  PrimaryBlock prim = bundle;
274 
275  if (frag_offset > 0) {
276  prim.fragmentoffset += frag_offset;
278  prim.appdatalength = bundle.find<dtn::data::PayloadBlock>().getLength();
280  }
281  }
282 
283  // create a default serializer
284  dtn::data::DefaultSerializer serializer(std::cout, dict);
285 
286  // get the encoded length of the primary block
287  dtn::data::Length header = serializer.getLength(prim);
288 
289  for (dtn::data::Bundle::const_iterator iter = bundle.begin(); iter != bundle.end(); ++iter)
290  {
291  const dtn::data::Block &b = (**iter);
292  header += serializer.getLength(b);
293 
294  try {
295  const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(b);
296  header -= payload.getLength();
297  if (abs_offset < header) return 0;
298  return frag_offset + (abs_offset - header);
299  } catch (std::bad_cast&) { };
300  }
301  } catch (const dtn::InvalidDataException&) {
302  // failure while calculating the bundle length
303  }
304 
305  return 0;
306  }
307 
308  void FragmentManager::expire_offsets(const dtn::data::Timestamp &timestamp)
309  {
310  ibrcommon::MutexLock l(_offsets_mutex);
311  for (std::set<Transmission>::iterator iter = _offsets.begin(); iter != _offsets.end();)
312  {
313  const Transmission &t = (*iter);
314  if (t.expires >= timestamp) return;
315  _offsets.erase(iter++);
316  }
317  }
318 
319  void FragmentManager::split(const dtn::data::Bundle &bundle, const dtn::data::Length &maxPayloadLength, std::list<dtn::data::Bundle> &fragments) throw (FragmentationAbortedException)
320  {
321  // get bundle DONT_FRAGMENT Flag
323  throw FragmentationProhibitedException("Bundle fragmentation is restricted by do-not-fragment bit.");
324 
325  try {
326  const dtn::data::PayloadBlock &payloadBlock = bundle.find<dtn::data::PayloadBlock>();
327 
328  // get bundle payload length
329  dtn::data::Length payloadLength = payloadBlock.getLength();
330 
331  // check if fragmentation needed
332  if (payloadLength <= maxPayloadLength)
333  throw FragmentationNotNecessaryException("Fragmentation not necessary. The payload block is smaller than the max. payload length.");
334 
335  // copy the origin bundle as template for the new fragment
336  Bundle fragment = bundle;
337 
338  // clear all the blocks
339  fragment.clear();
340 
341  // set bundle is fragment flag
342  fragment.set(dtn::data::Bundle::FRAGMENT, true);
343 
344  // set application data length
345  fragment.appdatalength = payloadLength;
346 
347  ibrcommon::BLOB::Reference ref = payloadBlock.getBLOB();
348  ibrcommon::BLOB::iostream stream = ref.iostream();
349 
350  bool isFirstFragment = true;
351  dtn::data::Length offset = 0;
352 
353  while (!(*stream).eof() && (payloadLength > offset))
354  {
355  // clear all the blocks
356  fragment.clear();
357 
358  // set fragment offset
359  fragment.fragmentoffset = offset;
360 
361  // copy partial payload to the payload of the fragment
362  try {
363  // create new blob for fragment payload
365 
366  // get the iostream object
367  ibrcommon::BLOB::iostream fragment_stream = fragment_ref.iostream();
368 
369  if ((offset + maxPayloadLength) > payloadLength) {
370  // copy payload to the fragment
371  ibrcommon::BLOB::copy(*fragment_stream, *stream, payloadLength - offset, 65535);
372  } else {
373  // copy payload to the fragment
374  ibrcommon::BLOB::copy(*fragment_stream, *stream, maxPayloadLength, 65535);
375  }
376 
377  // set new offset position
378  offset += fragment_stream.size();
379 
380  // create fragment payload block
381  dtn::data::PayloadBlock &fragment_payloadBlock = fragment.push_back(fragment_ref);
382 
383  // add all necessary blocks from the bundle to the fragment
384  addBlocksFromBundleToFragment(bundle, fragment, fragment_payloadBlock, isFirstFragment, payloadLength == offset);
385  } catch (const ibrcommon::IOException &ex) {
386  IBRCOMMON_LOGGER_TAG(FragmentManager::TAG, error) << "error while splitting bundle into fragments: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
387  }
388 
389  // add current fragment to fragments list
390  fragments.push_back(fragment);
391 
392  if (isFirstFragment) isFirstFragment = false;
393 
394  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Fragment created: " << fragment.toString() << IBRCOMMON_LOGGER_ENDL;
395  }
397  // bundle has no payload
398  throw FragmentationAbortedException("Fragmentation aborted. No payload block found.");
399  }
400  }
401 
402  void FragmentManager::addBlocksFromBundleToFragment(const dtn::data::Bundle &bundle, dtn::data::Bundle &fragment, dtn::data::PayloadBlock &fragment_payloadBlock, bool isFirstFragment, bool isLastFragment)
403  {
404  bool isAfterPayload = false;
405  bool isReplicateInEveryBundle = false;
406 
407  char block_type = 0;
408 
409  IBRCOMMON_LOGGER_DEBUG_TAG("FragmentManager", 5) << "Fragment original bundle block count: " << fragment.toString() << " " << bundle.size() << IBRCOMMON_LOGGER_ENDL;
410 
411  //check for each block if it has to be added to the fragment
412  for (dtn::data::Bundle::const_iterator it = bundle.begin(); it != bundle.end(); ++it)
413  {
414  //get the current block
415  const Block &current_block = dynamic_cast<const Block&>(**it);
416 
417  block_type = current_block.getType();
418 
419  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Fragment block found: " << fragment.toString() << " " << (unsigned int)block_type << IBRCOMMON_LOGGER_ENDL;
420 
421 
422  if (block_type == dtn::data::PayloadBlock::BLOCK_TYPE)
423  {
424  isAfterPayload = true;
425  }
426  else
427  {
428  isReplicateInEveryBundle = current_block.get(dtn::data::Block::REPLICATE_IN_EVERY_FRAGMENT);
429 
430  //if block is before payload
431  //add if fragment is the first one
432  //or if ReplicateInEveryBundle Flag is set
433  if (!isAfterPayload && (isFirstFragment || isReplicateInEveryBundle))
434  {
435  try
436  { //get factory
438 
439  //insert new Block before payload block
440  dtn::data::Block &fragment_block = fragment.insert(fragment.find(fragment_payloadBlock), f);
441 
442  //copy block
443  fragment_block = current_block;
444 
445  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added block before payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
446  }
447  catch(const ibrcommon::Exception &ex)
448  {
449  //insert new Block before payload block
450  dtn::data::Block &fragment_block = fragment.insert<dtn::data::ExtensionBlock>(fragment.find(fragment_payloadBlock));
451 
452  //copy block
453  fragment_block = current_block;
454 
455  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added block before payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
456  }
457 
458  }
459  //if block is after payload
460  //add if fragment is the last one
461  //or if ReplicateInEveryBundle Flag is set
462  else if (isAfterPayload && (isLastFragment || isReplicateInEveryBundle))
463  {
464  try
465  { //get factory
467 
468  //push back new Block after payload block
469  dtn::data::Block &fragment_block = fragment.push_back(f);
470 
471  //copy block
472  fragment_block = current_block;
473 
474  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added block after payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
475  }
476  catch (const ibrcommon::Exception &ex)
477  {
478  //push back new Block after payload block
479  dtn::data::Block &fragment_block = fragment.push_back<dtn::data::ExtensionBlock>();
480 
481  //copy block
482  fragment_block = current_block;
483 
484  IBRCOMMON_LOGGER_DEBUG_TAG(FragmentManager::TAG, 5) << "Added block after payload: " << fragment.toString()<< " " << block_type << IBRCOMMON_LOGGER_ENDL;
485  }
486  }
487  }
488  }
489 
490  }
491 
492  FragmentManager::Transmission::Transmission()
493  : offset(0), expires(0)
494  {
495  }
496 
497  FragmentManager::Transmission::~Transmission()
498  {
499  }
500 
501  bool FragmentManager::Transmission::operator<(const Transmission &other) const
502  {
503  if (expires < other.expires) return true;
504  if (expires != other.expires) return false;
505 
506  if (peer < other.peer) return true;
507  if (peer != other.peer) return false;
508 
509  return (id < other.id);
510  }
511 
512  bool FragmentManager::Transmission::operator==(const Transmission &other) const
513  {
514  if (expires != other.expires) return false;
515  if (peer != other.peer) return false;
516  if (id != other.id) return false;
517 
518  return true;
519  }
520  } /* namespace core */
521 } /* namespace dtn */