IBR-DTNSuite  0.12
Serializer.cpp
Go to the documentation of this file.
1 /*
2  * Serializer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "ibrdtn/data/Serializer.h"
24 #include "ibrdtn/data/Bundle.h"
25 #include "ibrdtn/data/Block.h"
29 #include "ibrdtn/data/MetaBundle.h"
30 #include "ibrdtn/data/DTNTime.h"
31 #include "ibrdtn/utils/Clock.h"
32 #include <ibrcommon/refcnt_ptr.h>
33 #include <ibrcommon/Logger.h>
34 #include <list>
35 #include <limits>
36 
37 #ifdef __DEVELOPMENT_ASSERTIONS__
38 #include <cassert>
39 #endif
40 
41 namespace dtn
42 {
43  namespace data
44  {
// Constructor initializer list and empty body: stores the stream reference in
// _stream and initializes _compressable to false.
// NOTE(review): the signature line (listing line 45) is missing from this
// extract; presumably DefaultSerializer(std::ostream&) -- confirm against the header.
46  : _stream(stream), _compressable(false)
47  {
48  }
49 
50  DefaultSerializer::DefaultSerializer(std::ostream& stream, const Dictionary &d)
51  : _stream(stream), _dictionary(d), _compressable(false)
52  {
53  }
54 
// Refreshes the serializer state for a bundle: the visible statement adds the
// bundle to _dictionary.
// NOTE(review): the signature line (listing line 55) and the statements on
// listing lines 58 and 64 are missing from this extract. Presumably this is
// rebuildDictionary(), which other definitions in this file call -- confirm.
56  {
57  // clear the dictionary
59 
60  // rebuild the dictionary
61  _dictionary.add(obj);
62 
63  // check if the bundle header could be compressed
65  }
66 
// Serializes a whole bundle: rebuilds the dictionary, writes the primary
// block and then every block reachable through the bundle's iterators, in
// iteration order. Returns this serializer.
// NOTE(review): the signature line (listing line 67) is missing from this
// extract; presumably operator<<(const dtn::data::Bundle&) -- confirm.
68  {
69  // rebuild the dictionary
70  rebuildDictionary(obj);
71 
72  // serialize the primary block
73  (*this) << (PrimaryBlock&)obj;
74 
75  // serialize all secondary blocks
76  for (Bundle::const_iterator iter = obj.begin(); iter != obj.end(); ++iter)
77  {
78  const Block &b = (**iter);
79  (*this) << b;
80  }
81 
82  return (*this);
83  }
84 
// Serializes a fragment of a bundle. Works on a copy of the primary block of
// obj._bundle, adds obj._offset to its fragment offset, writes it, and then
// writes the blocks: the payload block is clipped to obj._offset/obj._length,
// other blocks are written only if they follow the payload block or carry
// REPLICATE_IN_EVERY_FRAGMENT. Returns this serializer.
// NOTE(review): the signature line (listing line 85) and the statements on
// listing lines 88, 91, 94 and 96 are missing from this extract, including
// the declaration of `it` and the first branch of the if/else chain below.
86  {
87  // rebuild the dictionary
89 
90  PrimaryBlock prim = obj._bundle;
92 
93  // set the application length according to the payload block size
95 
97  // keep appdatalength of the fragment
98  } else if (it != obj._bundle.end()) {
99  const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(**it);
100  prim.appdatalength = payload.getLength();
101  } else {
102  prim.appdatalength = 0;
103  }
104 
105  // set the fragmentation offset
106  prim.fragmentoffset += obj._offset;
107 
108  // serialize the primary block
109  (*this) << prim;
110 
111  // serialize all secondary blocks
112  bool post_payload = false;
113 
114  for (Bundle::const_iterator iter = obj._bundle.begin(); iter != obj._bundle.end(); ++iter)
115  {
116  const Block &b = (**iter);
117 
118  try {
119  // test if this is the payload block
// the reference dynamic_cast throws std::bad_cast for every non-payload block,
// which is handled by the catch clause below
120  const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(b);
121 
122  // serialize the clipped block
123  serialize(payload, obj._offset, obj._length);
124 
125  // we had serialized the payload block
126  post_payload = true;
127  } catch (const std::bad_cast&) {
128  // serialize this block if
129  // ... this block is before the payload block and marked as replicated in every fragment
130  // ... this block is after the payload block and all remaining bytes of the payload block are included
// NOTE(review): the condition only tests post_payload; whether all remaining
// payload bytes are included is not checked in the visible code.
131  if (post_payload || b.get(dtn::data::Block::REPLICATE_IN_EVERY_FRAGMENT))
132  {
133  (*this) << b;
134  }
135  }
136  }
137 
138  return (*this);
139  }
140 
// Returns true only if the source, destination, report-to and custodian EIDs
// of the bundle are compressable and, in that case, every EID listed by every
// block of the bundle is compressable as well. The block EIDs are not
// inspected when one of the four primary EIDs is not compressable.
// NOTE(review): the signature line (listing line 141) is missing from this
// extract; presumably isCompressable(const dtn::data::Bundle&) -- confirm.
142  {
143  // check if all EID are compressable
144  bool compressable = ( obj.source.isCompressable() &&
145  obj.destination.isCompressable() &&
146  obj.reportto.isCompressable() &&
147  obj.custodian.isCompressable() );
148 
149  if (compressable)
150  {
151  // add EID of all secondary blocks
152  for (Bundle::const_iterator iter = obj.begin(); iter != obj.end(); ++iter)
153  {
154  const Block &b = (**iter);
155  const std::list<dtn::data::EID> eids = b.getEIDList();
156 
157  for (std::list<dtn::data::EID>::const_iterator eit = eids.begin(); eit != eids.end(); ++eit)
158  {
159  const dtn::data::EID &eid = (*eit);
160  if (!eid.isCompressable())
161  {
// the first non-compressable EID decides the result
162  return false;
163  }
164  }
165  }
166  }
167 
168  return compressable;
169  }
170 
// Serializes a primary block. Writes the bundle version and the processing
// flags, then a block length, then the header fields. The fields are staged
// in primaryheader[]: [0..7] EID references (destination, source, report-to,
// custodian; two numbers each), [8] timestamp, [9] sequence number,
// [10] lifetime, [11] dictionary size, [12] fragment offset and
// [13] application data length. The written block length covers the staged
// fields (plus the dictionary bytes when not compressed); the version and
// the processing flags are not part of it. Returns this serializer.
// NOTE(review): the signature line (listing line 171), the declaration of
// `ref` (listing line 184) and the conditions guarding the two blocks that
// handle primaryheader[12]/[13] (listing lines 243 and 275) are missing from
// this extract.
172  {
173  _stream << dtn::data::BUNDLE_VERSION; // bundle version
174  _stream << obj.procflags; // processing flags
175 
176  // predict the block length
177  Number len = 0;
178  dtn::data::Number primaryheader[14];
179 
180  primaryheader[8] = obj.timestamp; // timestamp
181  primaryheader[9] = obj.sequencenumber; // sequence number
182  primaryheader[10] = obj.lifetime; // lifetime
183 
185 
186  if (_compressable)
187  {
188  // destination reference
189  ref = obj.destination.getCompressed();
190  primaryheader[0] = ref.first;
191  primaryheader[1] = ref.second;
192 
193  // source reference
194  ref = obj.source.getCompressed();
195  primaryheader[2] = ref.first;
196  primaryheader[3] = ref.second;
197 
198  // reportto reference
199  ref = obj.reportto.getCompressed();
200  primaryheader[4] = ref.first;
201  primaryheader[5] = ref.second;
202 
203  // custodian reference
204  ref = obj.custodian.getCompressed();
205  primaryheader[6] = ref.first;
206  primaryheader[7] = ref.second;
207 
208  // dictionary size is zero in a compressed bundle header
209  primaryheader[11] = 0;
210  }
211  else
212  {
213  // destination reference
214  ref = _dictionary.getRef(obj.destination);
215  primaryheader[0] = ref.first;
216  primaryheader[1] = ref.second;
217 
218  // source reference
219  ref = _dictionary.getRef(obj.source);
220  primaryheader[2] = ref.first;
221  primaryheader[3] = ref.second;
222 
223  // reportto reference
224  ref = _dictionary.getRef(obj.reportto);
225  primaryheader[4] = ref.first;
226  primaryheader[5] = ref.second;
227 
228  // custodian reference
229  ref = _dictionary.getRef(obj.custodian);
230  primaryheader[6] = ref.first;
231  primaryheader[7] = ref.second;
232 
233  // dictionary size
234  primaryheader[11] = Number(_dictionary.getSize());
235  len += _dictionary.getSize();
236  }
237 
// sum up the encoded length of the fields [0..11]
238  for (int i = 0; i < 12; ++i)
239  {
240  len += primaryheader[i].getLength();
241  }
242 
244  {
245  primaryheader[12] = obj.fragmentoffset;
246  primaryheader[13] = obj.appdatalength;
247 
248  len += primaryheader[12].getLength();
249  len += primaryheader[13].getLength();
250  }
251 
252  // write the block length
253  _stream << len;
254 
255  /*
256  * write the ref block of the dictionary
257  * this includes scheme and ssp for destination, source, reportto and custodian.
258  */
// the loop also writes timestamp, sequence number and lifetime ([8..10])
259  for (int i = 0; i < 11; ++i)
260  {
261  _stream << primaryheader[i];
262  }
263 
264  if (_compressable)
265  {
266  // write the size of the dictionary (always zero here)
267  _stream << primaryheader[11];
268  }
269  else
270  {
271  // write size of dictionary + bytearray
272  _stream << _dictionary;
273  }
274 
276  {
277  _stream << primaryheader[12]; // FRAGMENTATION_OFFSET
278  _stream << primaryheader[13]; // APPLICATION_DATA_LENGTH
279  }
280 
281  return (*this);
282  }
283 
// Serializes a single block: block type (one byte), processing flags, an EID
// section (EID count followed by two numbers per EID, taken from the
// compressed form or from the dictionary depending on _compressable), the
// payload length and finally the payload itself. Returns this serializer.
// NOTE(review): the signature line (listing line 284), the condition that
// guards the EID section (listing line 296) and the declaration of `offsets`
// (listing line 301) are missing from this extract.
285  {
286  _stream.put((char&)obj.getType());
287  _stream << obj.getProcessingFlags();
288 
289  const Block::eid_list &eids = obj.getEIDList();
290 
291 #ifdef __DEVELOPMENT_ASSERTIONS__
292  // test: BLOCK_CONTAINS_EIDS => (eids.size() > 0)
293  assert(!obj.get(Block::BLOCK_CONTAINS_EIDS) || (eids.size() > 0));
294 #endif
295 
297  {
298  _stream << Number(eids.size());
299  for (Block::eid_list::const_iterator it = eids.begin(); it != eids.end(); ++it)
300  {
302 
303  if (_compressable)
304  {
305  offsets = (*it).getCompressed();
306  }
307  else
308  {
309  offsets = _dictionary.getRef(*it);
310  }
311 
312  _stream << offsets.first;
313  _stream << offsets.second;
314  }
315  }
316 
317  // write size of the payload in the block
318  _stream << Number(obj.getLength());
319 
320  // write the payload of the block
// slength is passed to the block's serialize() by reference; its value is not
// used afterwards in this function
321  Length slength = 0;
322  obj.serialize(_stream, slength);
323 
324  return (*this);
325  }
326 
// Serializes a payload block clipped to a byte range. The block header (type,
// processing flags, EID section) is written like for a regular block, the
// payload length field carries the clipped length.
//
// obj          payload block to serialize
// clip_offset  offset of the first payload byte to write
// clip_length  maximum number of payload bytes to write
// returns      this serializer
//
// The written length is min(clip_length, payload size - clip_offset), or zero
// if clip_offset is not below the payload size. No payload bytes are written
// when this length is zero.
// NOTE(review): the condition that guards the EID section (listing line 339)
// and the declaration of `offsets` (listing line 344) are missing from this
// extract.
327  Serializer& DefaultSerializer::serialize(const dtn::data::PayloadBlock& obj, const Length &clip_offset, const Length &clip_length)
328  {
329  _stream.put((char&)obj.getType());
330  _stream << obj.getProcessingFlags();
331 
332  const Block::eid_list &eids = obj.getEIDList();
333 
334 #ifdef __DEVELOPMENT_ASSERTIONS__
335  // test: BLOCK_CONTAINS_EIDS => (eids.size() > 0)
336  assert(!obj.get(Block::BLOCK_CONTAINS_EIDS) || (eids.size() > 0));
337 #endif
338 
340  {
341  _stream << Number(eids.size());
342  for (Block::eid_list::const_iterator it = eids.begin(); it != eids.end(); ++it)
343  {
345 
346  if (_compressable)
347  {
348  offsets = (*it).getCompressed();
349  }
350  else
351  {
352  offsets = _dictionary.getRef(*it);
353  }
354 
355  _stream << offsets.first;
356  _stream << offsets.second;
357  }
358  }
359 
360  // get the remaining payload size
361  Length payload_size = obj.getLength();
362 
363  // check if the remaining data length is >= clip_length
// the comparison avoids an unsigned underflow when the offset is beyond the payload
364  Length frag_len = (clip_offset < payload_size) ? payload_size - clip_offset : 0;
365 
366  // limit the fragment length to the clip length
367  if (frag_len > clip_length) frag_len = clip_length;
368 
369  // set the real predicted payload length
370  // write size of the payload in the block
371  _stream << Number(frag_len);
372 
373  if (frag_len > 0)
374  {
375  // now skip the <offset>-bytes and all bytes after <offset + length>
376  obj.serialize( _stream, clip_offset, frag_len );
377  }
378 
379  return (*this);
380  }
381 
// Computes the serialized length of a whole bundle as the length of its
// primary block plus the length of every block. The dictionary is rebuilt
// first, so calling this changes the serializer state.
// NOTE(review): the signature line (listing line 382) is missing from this
// extract; presumably getLength(const dtn::data::Bundle&) -- confirm.
383  {
384  // rebuild the dictionary
385  rebuildDictionary(obj);
386 
387  Length len = 0;
388  len += getLength( (PrimaryBlock&)obj );
389 
390  // add size of all blocks
391  for (Bundle::const_iterator iter = obj.begin(); iter != obj.end(); ++iter)
392  {
393  const Block &b = (**iter);
394  len += getLength( b );
395  }
396 
397  return len;
398  }
399 
// Computes the serialized length of a primary block: version, processing
// flags, the header fields staged in primaryheader[] (same layout as in the
// primary block serialization earlier in this file), the dictionary bytes
// when not compressed, and the block length field itself.
// NOTE(review): the signature line (listing line 400), the declaration of
// `ref` (listing line 409) and the condition guarding the block that handles
// primaryheader[12]/[13] (listing line 488) are missing from this extract.
401  {
402  Length len = 0;
403 
404  len += sizeof(dtn::data::BUNDLE_VERSION); // bundle version
405  len += obj.procflags.getLength(); // processing flags
406 
407  // primary header
408  dtn::data::Number primaryheader[14];
410 
411  if (_compressable)
412  {
413  // destination reference
414  ref = obj.destination.getCompressed();
415  primaryheader[0] = ref.first;
416  primaryheader[1] = ref.second;
417 
418  // source reference
419  ref = obj.source.getCompressed();
420  primaryheader[2] = ref.first;
421  primaryheader[3] = ref.second;;
422 
423  // reportto reference
424  ref = obj.reportto.getCompressed();
425  primaryheader[4] = ref.first;
426  primaryheader[5] = ref.second;;
427 
428  // custodian reference
429  ref = obj.custodian.getCompressed();
430  primaryheader[6] = ref.first;
431  primaryheader[7] = ref.second;;
432 
433  // dictionary size
434  primaryheader[11] = Number(0);
435  }
436  else
437  {
438  // destination reference
439  ref = _dictionary.getRef(obj.destination);
440  primaryheader[0] = ref.first;
441  primaryheader[1] = ref.second;;
442 
443  // source reference
444  ref = _dictionary.getRef(obj.source);
445  primaryheader[2] = ref.first;
446  primaryheader[3] = ref.second;;
447 
448  // reportto reference
449  ref = _dictionary.getRef(obj.reportto);
450  primaryheader[4] = ref.first;
451  primaryheader[5] = ref.second;;
452 
453  // custodian reference
454  ref = _dictionary.getRef(obj.custodian);
455  primaryheader[6] = ref.first;
456  primaryheader[7] = ref.second;;
457 
458  // dictionary size
459  primaryheader[11] = Number(_dictionary.getSize());
460  }
461 
462  // timestamp
463  primaryheader[8] = obj.timestamp;
464 
465  // sequence number
466  primaryheader[9] = obj.sequencenumber;
467 
468  // lifetime
469  primaryheader[10] = obj.lifetime;
470 
471  for (int i = 0; i < 11; ++i)
472  {
473  len += primaryheader[i].getLength();
474  }
475 
476  if (_compressable)
477  {
478  // write the size of the dictionary (always zero here)
479  len += primaryheader[11].getLength();
480  }
481  else
482  {
483  // write size of dictionary + bytearray
484  len += primaryheader[11].getLength();
485  len += _dictionary.getSize();
486  }
487 
489  {
490  primaryheader[12] = obj.fragmentoffset;
491  primaryheader[13] = obj.appdatalength;
492 
493  len += primaryheader[12].getLength();
494  len += primaryheader[13].getLength();
495  }
496 
// NOTE(review): at this point len already contains the version byte and the
// processing flags, while the block length written by the primary block
// serialization in this file does not include them. If Number uses a
// variable-length encoding, the size added here can be one byte larger than
// the size of the length field that is actually written -- confirm.
497  len += Number(len).getLength();
498 
499  return len;
500  }
501 
// Computes the serialized length of a single block: block type, processing
// flags, the EID section (EID count plus two numbers per EID), the payload
// and the encoded payload size.
// NOTE(review): the signature line (listing line 502), the condition that
// guards the EID section (listing line 516) and the statement that defines
// `offsets` (listing line 521) are missing from this extract.
503  {
504  Length len = 0;
505 
506  len += sizeof(obj.getType());
507  len += obj.getProcessingFlags().getLength();
508 
509  const Block::eid_list &eids = obj.getEIDList();
510 
511 #ifdef __DEVELOPMENT_ASSERTIONS__
512  // test: BLOCK_CONTAINS_EIDS => (eids.size() > 0)
513  assert(!obj.get(Block::BLOCK_CONTAINS_EIDS) || (eids.size() > 0));
514 #endif
515 
517  {
518  len += dtn::data::Number(eids.size()).getLength();
519  for (Block::eid_list::const_iterator it = eids.begin(); it != eids.end(); ++it)
520  {
522  len += offsets.first.getLength();
523  len += offsets.second.getLength();
524  }
525  }
526 
527  // size of the payload in the block
528  Length payload_size = obj.getLength();
529  len += payload_size;
530 
531  // size of the payload size
532  len += Number(payload_size).getLength();
533 
534  return len;
535  }
536 
// Constructor initializer list and empty body: stores the stream reference,
// uses _default_validator as validator and initializes _compressed and
// _fragmentation to false.
// NOTE(review): the signature line (listing line 537) is missing from this
// extract; presumably a DefaultDeserializer constructor taking a stream -- confirm.
538  : _stream(stream), _validator(_default_validator), _compressed(false), _fragmentation(false)
539  {
540  }
541 
// Constructor initializer list and empty body: stores the stream reference,
// uses the given validator `v` and initializes _compressed and _fragmentation
// to false.
// NOTE(review): the signature line (listing line 542) is missing from this
// extract; presumably a DefaultDeserializer constructor taking a stream and a
// validator -- confirm.
543  : _stream(stream), _validator(v), _compressed(false), _fragmentation(false)
544  {
545  }
546 
// Stores `val` in _fragmentation. The flag is evaluated by the bundle
// deserialization in this file when the reception of a payload is interrupted.
// NOTE(review): the signature line (listing line 547) is missing from this
// extract.
548  {
549  _fragmentation = val;
550  }
551 
// Deserializes a whole bundle from _stream. The bundle is cleared, the
// primary block is read and then blocks are read until a block carries the
// LAST_BLOCK flag or the stream reports eof. Each block is created through a
// BundleBuilder and filled by read(). If the reception of a payload is
// interrupted, the exception is rethrown unless fragmentation is allowed by
// the bundle, the block already holds data and _fragmentation is set. Finally
// the bundle is passed to the validator. Returns this deserializer.
// NOTE(review): the signature line (listing line 552), the declaration of
// `procflags` (listing line 564), the lines 594 and 596 inside the fragment
// handling and the line 606 that closes the outer try block are missing from
// this extract. The code from "skip EIDs" on presumably belongs to a catch
// clause for blocks rejected by the builder -- confirm.
553  {
554  // clear all blocks
555  obj.clear();
556 
557  // read the primary block
558  (*this) >> (PrimaryBlock&)obj;
559 
560  // read until the last block
561  bool lastblock = false;
562 
563  block_t block_type;
565 
566  // create a bundle builder
567  dtn::data::BundleBuilder builder(obj);
568 
569  // read all BLOCKs
// NOTE(review): eof() is tested before the read and the result of get() is
// not checked; after a failed read block_type keeps its previous
// (initially uninitialized) value -- confirm that later stream operations
// throw in this case.
570  while (!_stream.eof() && !lastblock)
571  {
572  // BLOCK_TYPE
573  _stream.get((char&)block_type);
574 
575  // read processing flags
576  _stream >> procflags;
577 
578  try {
579  // create a block object
580  dtn::data::Block &block = builder.insert(block_type, procflags);
581 
582  try {
583  // read block content
584  (*this).read(obj, block);
585  } catch (dtn::PayloadReceptionInterrupted &ex) {
586  // some debugging
587  IBRCOMMON_LOGGER_DEBUG_TAG("DefaultDeserializer", 15) << "Reception of bundle payload failed." << IBRCOMMON_LOGGER_ENDL;
588 
589  // interrupted transmission
590  if (!obj.get(dtn::data::PrimaryBlock::DONT_FRAGMENT) && (block.getLength() > 0) && _fragmentation)
591  {
592  IBRCOMMON_LOGGER_DEBUG_TAG("DefaultDeserializer", 25) << "Create a fragment." << IBRCOMMON_LOGGER_ENDL;
593 
595  {
597  obj.appdatalength = ex.length;
598  obj.fragmentoffset = 0;
599  }
600  }
601  else
602  {
// not allowed to keep a partial payload: propagate the interruption
603  throw;
604  }
605  }
607  // skip EIDs
608  if ( procflags.getBit(dtn::data::Block::BLOCK_CONTAINS_EIDS) )
609  {
610  Number eidcount;
611  _stream >> eidcount;
612 
613  for (unsigned int i = 0; eidcount > i; ++i)
614  {
615  Number scheme, ssp;
616  _stream >> scheme;
617  _stream >> ssp;
618  }
619  }
620 
621  // read the size of the payload in the block
622  Number block_size;
623  _stream >> block_size;
624 
625  // skip payload
626  _stream.ignore(block_size.get<std::streamsize>());
627  }
628 
629  lastblock = procflags.getBit(Block::LAST_BLOCK);
630  }
631 
632  // validate this bundle
633  _validator.validate(obj);
634 
635  return (*this);
636  }
637 
// Reads a primary block from the stream into `pb` and copies its fields into
// obj. The expire time is derived from timestamp and lifetime via
// Clock::getExpireTime() and the hop count is set to zero. No blocks are read
// here. Returns this deserializer.
// NOTE(review): the signature line (listing line 638) and the declaration of
// `pb` (listing line 640) are missing from this extract; presumably the
// target type is dtn::data::MetaBundle -- confirm.
639  {
641  (*this) >> pb;
642 
643  obj.appdatalength = pb.appdatalength;
644  obj.custodian = pb.custodian;
645  obj.destination = pb.destination;
646  obj.expiretime = dtn::utils::Clock::getExpireTime(pb.timestamp, pb.lifetime);
647  obj.hopcount = 0;
648  obj.lifetime = pb.lifetime;
649  obj.fragmentoffset = pb.fragmentoffset;
650  obj.procflags = pb.procflags;
651  obj.reportto = pb.reportto;
652  obj.sequencenumber = pb.sequencenumber;
653  obj.source = pb.source;
654  obj.timestamp = pb.timestamp;
655 
656  return (*this);
657  }
658 
// Deserializes a primary block: checks the version byte, reads the processing
// flags, the block length, four EID reference pairs (destination, source,
// report-to, custodian), timestamp, sequence number and lifetime, then the
// dictionary. If reading the dictionary or resolving the references throws
// InvalidDataException, the references are interpreted directly as
// compressed EIDs and _compressed is set. The block is validated at the end.
// Throws dtn::InvalidProtocolException on a version mismatch.
// Returns this deserializer.
// NOTE(review): the signature line (listing line 659), the declaration of
// `ref` (listing line 675) and the condition that guards the reading of the
// fragment fields (listing line 711) are missing from this extract.
660  {
661  char version = 0;
662  Number blocklength;
663 
664  // check for the right version
665  _stream.get(version);
666  if (version != dtn::data::BUNDLE_VERSION) throw dtn::InvalidProtocolException("Bundle version differ from ours.");
667 
668  // PROCFLAGS
669  _stream >> obj.procflags; // processing flags
670 
671  // BLOCK LENGTH
// the value is consumed from the stream but not used further in this function
672  _stream >> blocklength;
673 
674  // EID References
676  for (int i = 0; i < 4; ++i)
677  {
678  _stream >> ref[i].first;
679  _stream >> ref[i].second;
680  }
681 
682  // timestamp
683  _stream >> obj.timestamp;
684 
685  // sequence number
686  _stream >> obj.sequencenumber;
687 
688  // lifetime
689  _stream >> obj.lifetime;
690 
691  try {
692  // dictionary
693  _stream >> _dictionary;
694 
695  // decode EIDs
696  obj.destination = _dictionary.get(ref[0].first, ref[0].second);
697  obj.source = _dictionary.get(ref[1].first, ref[1].second);
698  obj.reportto = _dictionary.get(ref[2].first, ref[2].second);
699  obj.custodian = _dictionary.get(ref[3].first, ref[3].second);
700  _compressed = false;
701  } catch (const dtn::InvalidDataException&) {
702  // error while reading the dictionary. We assume that this is a compressed bundle header.
703  obj.destination = dtn::data::EID(ref[0].first, ref[0].second);
704  obj.source = dtn::data::EID(ref[1].first, ref[1].second);
705  obj.reportto = dtn::data::EID(ref[2].first, ref[2].second);
706  obj.custodian = dtn::data::EID(ref[3].first, ref[3].second);
707  _compressed = true;
708  }
709 
710  // fragmentation?
712  {
713  _stream >> obj.fragmentoffset;
714  _stream >> obj.appdatalength;
715  }
716 
717  // validate this primary block
718  _validator.validate(obj);
719 
720  return (*this);
721  }
722 
// Deserializes the content of a block whose type and processing flags have
// already been consumed: reads the EID section (resolved directly when
// _compressed is set, otherwise through the dictionary), the payload size,
// validates the block and lets the block read its payload from the stream.
// Returns this deserializer.
// NOTE(review): the signature line (listing line 723) and the condition that
// guards the EID section (listing line 726) are missing from this extract.
724  {
725  // read EIDs
727  {
728  Number eidcount;
729  _stream >> eidcount;
730 
731  for (unsigned int i = 0; eidcount > i; ++i)
732  {
733  Number scheme, ssp;
734  _stream >> scheme;
735  _stream >> ssp;
736 
737  if (_compressed)
738  {
739  obj.addEID( dtn::data::EID(scheme, ssp) );
740  }
741  else
742  {
743  obj.addEID( _dictionary.get(scheme, ssp) );
744  }
745  }
746  }
747 
748  // read the size of the payload in the block
749  Number block_size;
750  _stream >> block_size;
751 
752  // validate this block
753  _validator.validate(obj, block_size);
754 
755  // read the payload of the block
756  obj.deserialize(_stream, block_size.get<dtn::data::Length>());
757 
758  return (*this);
759  }
760 
// Deserializes the content of a block like the block deserialization above,
// but validates the block together with the bundle it belongs to
// (validate(bundle, obj, block_size)). Returns this deserializer.
// NOTE(review): the signature line (listing line 761) and the condition that
// guards the EID section (listing line 764) are missing from this extract;
// presumably this is read(), which the bundle deserialization calls -- confirm.
762  {
763  // read EIDs
765  {
766  Number eidcount;
767  _stream >> eidcount;
768 
769  for (unsigned int i = 0; eidcount > i; ++i)
770  {
771  Number scheme, ssp;
772  _stream >> scheme;
773  _stream >> ssp;
774 
775  if (_compressed)
776  {
777  obj.addEID( dtn::data::EID(scheme, ssp) );
778  }
779  else
780  {
781  obj.addEID( _dictionary.get(scheme, ssp) );
782  }
783  }
784  }
785 
786  // read the size of the payload in the block
787  Number block_size;
788  _stream >> block_size;
789 
790  // validate this block
791  _validator.validate(bundle, obj, block_size);
792 
793  // read the payload of the block
794  obj.deserialize(_stream, block_size.get<Length>());
795 
796  return (*this);
797  }
798 
// Six definitions with empty bodies.
// NOTE(review): all six signature lines (listing lines 799, 803, 807, 811,
// 816 and 821) are missing from this extract, so the owning class and the
// member names cannot be determined from here. Presumably these are the
// constructor, destructor and validate() overloads of a validator that
// accepts everything -- confirm against the header.
800  {
801  }
802 
804  {
805  }
806 
808  {
809  }
810 
812  {
813 
814  }
815 
817  {
818 
819  }
820 
822  {
823 
824  }
825 
// Constructor initializer list and empty body: forwards the stream to the
// DefaultSerializer base class.
// NOTE(review): the signature line (listing line 826) is missing from this
// extract, so the name of the derived class is not visible here.
827  : DefaultSerializer(stream)
828  {
829  }
830 
// Empty body.
// NOTE(review): the signature line (listing line 831) is missing from this
// extract; presumably the destructor matching the constructor above -- confirm.
832  {
833  }
834 
// Serializes a single block without using a dictionary: block type (one
// byte), processing flags, an EID section in which every EID is written as a
// BundleString of its string form, the payload length and the payload.
// Returns this serializer.
// NOTE(review): the signature line (listing line 835) and the condition that
// guards the EID section (listing line 847) are missing from this extract.
836  {
837  _stream.put((char&)obj.getType());
838  _stream << obj.getProcessingFlags();
839 
840  const Block::eid_list &eids = obj.getEIDList();
841 
842 #ifdef __DEVELOPMENT_ASSERTIONS__
843  // test: BLOCK_CONTAINS_EIDS => (_ids.size() > 0)
844  assert(!obj.get(Block::BLOCK_CONTAINS_EIDS) || (eids.size() > 0));
845 #endif
846 
848  {
849  _stream << Number(eids.size());
850  for (Block::eid_list::const_iterator it = eids.begin(); it != eids.end(); ++it)
851  {
852  dtn::data::BundleString str((*it).getString());
853  _stream << str;
854  }
855  }
856 
857  // write size of the payload in the block
858  _stream << Number(obj.getLength());
859 
860  // write the payload of the block
// slength is passed to the block's serialize() by reference; its value is not
// used afterwards in this function
861  Length slength = 0;
862  obj.serialize(_stream, slength);
863 
864  return (*this);
865  }
866 
// Computes the length of a block serialized with string EIDs: block type,
// processing flags, the EID section (EID count plus the BundleString length
// of every EID) and the payload.
// NOTE(review): the signature line (listing line 867) and the condition that
// guards the EID section (listing line 881) are missing from this extract.
868  {
869  Length len = 0;
870 
871  len += sizeof(obj.getType());
872  len += obj.getProcessingFlags().getLength();
873 
874  const Block::eid_list &eids = obj.getEIDList();
875 
876 #ifdef __DEVELOPMENT_ASSERTIONS__
877  // test: BLOCK_CONTAINS_EIDS => (eids.size() > 0)
878  assert(!obj.get(Block::BLOCK_CONTAINS_EIDS) || (eids.size() > 0));
879 #endif
880 
882  {
883  len += dtn::data::Number(eids.size()).getLength();
884  for (Block::eid_list::const_iterator it = eids.begin(); it != eids.end(); ++it)
885  {
886  dtn::data::BundleString str((*it).getString());
887  len += str.getLength();
888  }
889  }
890 
891  // size of the payload in the block
// NOTE(review): unlike the dictionary based block length computation earlier
// in this file, no Number(obj.getLength()).getLength() term is added for the
// payload size field, although the serialization just above writes that
// field -- the result looks too small by the size of that field; confirm.
892  len += obj.getLength();
893 
894  return len;
895  }
896 
// Constructor initializer list and empty body: forwards the stream to the
// DefaultDeserializer base class and stores `b` in _bundle, the bundle that
// receives the blocks read later.
// NOTE(review): the signature line (listing line 897) is missing from this
// extract, so the name of the derived class is not visible here.
898  : DefaultDeserializer(stream), _bundle(b)
899  {
900  }
901 
// Empty body.
// NOTE(review): the signature line (listing line 902) is missing from this
// extract; presumably the destructor matching the constructor above -- confirm.
903  {
904  }
905 
// Reads exactly one block from the stream and inserts it into _bundle through
// a BundleBuilder: block type, processing flags, an EID section with EIDs
// stored as strings, the payload size and the payload. The block is validated
// before its payload is read. Returns a reference to the inserted block.
// NOTE(review): the signature line (listing line 906), the condition that
// guards the EID section (listing line 923) and the declaration of `str`
// (listing line 930) are missing from this extract.
// NOTE(review): the result of _stream.get() is not checked before block_type
// is used -- confirm that a failed read is detected elsewhere.
907  {
908  BundleBuilder builder(_bundle);
909 
910  block_t block_type;
911  Bitset<Block::ProcFlags> procflags;
912 
913  // BLOCK_TYPE
914  _stream.get((char&)block_type);
915 
916  // read processing flags
917  _stream >> procflags;
918 
919  // create a block object
920  dtn::data::Block &obj = builder.insert(block_type, procflags);
921 
922  // read EIDs
924  {
925  Number eidcount;
926  _stream >> eidcount;
927 
928  for (unsigned int i = 0; eidcount > i; ++i)
929  {
931  _stream >> str;
932  obj.addEID(dtn::data::EID(str));
933  }
934  }
935 
936  // read the size of the payload in the block
937  Number block_size;
938  _stream >> block_size;
939 
940  // validate this block
941  _validator.validate(obj, block_size);
942 
943  // read the payload of the block
944  obj.deserialize(_stream, block_size.get<Length>());
945 
946  return obj;
947  }
948  }
949 }