IBR-DTNSuite  0.8
ibrdtn/ibrdtn/data/CompressedPayloadBlock.cpp
Go to the documentation of this file.
00001 /*
00002  * CompressedPayloadBlock.cpp
00003  *
00004  *  Created on: 20.04.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrdtn/config.h"
00009 #include "ibrdtn/data/CompressedPayloadBlock.h"
00010 #include "ibrdtn/data/PayloadBlock.h"
00011 #include <ibrcommon/data/BLOB.h>
00012 #include <cassert>
00013 
00014 #ifdef HAVE_ZLIB
00015 #include "zlib.h"
00016 #endif
00017 
00018 namespace dtn
00019 {
00020         namespace data
00021         {
00022                 dtn::data::Block* CompressedPayloadBlock::Factory::create()
00023                 {
00024                         return new CompressedPayloadBlock();
00025                 }
00026 
00027                 CompressedPayloadBlock::CompressedPayloadBlock()
00028                  : dtn::data::Block(CompressedPayloadBlock::BLOCK_TYPE), _algorithm(COMPRESSION_UNKNOWN), _origin_size(0)
00029                 {
00030                 }
00031 
00032                 CompressedPayloadBlock::~CompressedPayloadBlock()
00033                 {
00034                 }
00035 
00036                 size_t CompressedPayloadBlock::getLength() const
00037                 {
00038                         return _algorithm.getLength() + _origin_size.getLength();
00039                 }
00040 
00041                 std::ostream& CompressedPayloadBlock::serialize(std::ostream &stream, size_t &length) const
00042                 {
00043                         stream << _algorithm << _origin_size;
00044                         return stream;
00045                 }
00046 
00047                 std::istream& CompressedPayloadBlock::deserialize(std::istream &stream, const size_t length)
00048                 {
00049                         stream >> _algorithm;
00050                         stream >> _origin_size;
00051                         return stream;
00052                 }
00053 
00054                 void CompressedPayloadBlock::setAlgorithm(CompressedPayloadBlock::COMPRESS_ALGS alg)
00055                 {
00056                         _algorithm = alg;
00057                 }
00058 
00059                 CompressedPayloadBlock::COMPRESS_ALGS CompressedPayloadBlock::getAlgorithm() const
00060                 {
00061                         return COMPRESS_ALGS( _algorithm.getValue() );
00062                 }
00063 
00064                 void CompressedPayloadBlock::setOriginSize(size_t s)
00065                 {
00066                         _origin_size = s;
00067                 }
00068 
00069                 size_t CompressedPayloadBlock::getOriginSize() const
00070                 {
00071                         return _origin_size.getValue();
00072                 }
00073 
00074                 void CompressedPayloadBlock::compress(dtn::data::Bundle &b, CompressedPayloadBlock::COMPRESS_ALGS alg)
00075                 {
00076                         dtn::data::PayloadBlock &p = b.getBlock<dtn::data::PayloadBlock>();
00077 
00078                         // get a data container for the compressed payload
00079                         ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create();
00080 
00081                         // lock the BLOBs while we are compress the payload
00082                         {
00083                                 // get streams of both containers
00084                                 ibrcommon::BLOB::iostream is = p.getBLOB().iostream();
00085                                 ibrcommon::BLOB::iostream os = ref.iostream();
00086 
00087                                 // compress the payload
00088                                 CompressedPayloadBlock::compress(alg, *is, *os);
00089                         }
00090 
00091                         // add a compressed payload block in front of the old payload block
00092                         dtn::data::CompressedPayloadBlock &cpb = b.push_front<CompressedPayloadBlock>();
00093 
00094                         // set cpb values
00095                         cpb.setAlgorithm(alg);
00096                         cpb.setOriginSize(p.getLength());
00097 
00098                         // add the new payload block to the bundle
00099                         b.insert(p, ref);
00100 
00101                         // delete the old payload block
00102                         b.remove(p);
00103                 }
00104 
00105                 void CompressedPayloadBlock::extract(dtn::data::Bundle &b)
00106                 {
00107                         // get the CPB first
00108                         dtn::data::CompressedPayloadBlock &cpb = b.getBlock<CompressedPayloadBlock>();
00109 
00110                         // get the payload block
00111                         dtn::data::PayloadBlock &p = b.getBlock<dtn::data::PayloadBlock>();
00112 
00113                         // get a data container for the uncompressed payload
00114                         ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create();
00115 
00116                         // lock the BLOBs while we are uncompress the payload
00117                         {
00118                                 // get streams of both containers
00119                                 ibrcommon::BLOB::iostream is = p.getBLOB().iostream();
00120                                 ibrcommon::BLOB::iostream os = ref.iostream();
00121 
00122                                 // compress the payload
00123                                 CompressedPayloadBlock::extract(cpb.getAlgorithm(), *is, *os);
00124                         }
00125 
00126                         // add the new payload block to the bundle
00127                         b.insert(p, ref);
00128 
00129                         // delete the old payload block
00130                         b.remove(p);
00131 
00132                         // delete the CPB
00133                         b.remove(cpb);
00134                 }
00135 
00136                 void CompressedPayloadBlock::compress(CompressedPayloadBlock::COMPRESS_ALGS alg, std::istream &is, std::ostream &os)
00137                 {
00138                         switch (alg)
00139                         {
00140                                 case COMPRESSION_ZLIB:
00141                                 {
00142 #ifdef HAVE_ZLIB
00143                                         const size_t CHUNK_SIZE = 16384;
00144 
00145                                         int ret, flush;
00146                                         unsigned have;
00147                                         unsigned char in[CHUNK_SIZE];
00148                                         unsigned char out[CHUNK_SIZE];
00149                                         z_stream strm;
00150 
00151                                         /* allocate deflate state */
00152                                         strm.zalloc = Z_NULL;
00153                                         strm.zfree = Z_NULL;
00154                                         strm.opaque = Z_NULL;
00155                                         ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION);
00156 
00157                                         // exit if something is wrong
00158                                         if (ret != Z_OK) throw ibrcommon::Exception("initialization of zlib failed");
00159 
00160                                         do {
00161                                                 is.read((char*)&in, CHUNK_SIZE);
00162                                                 strm.avail_in = is.gcount();
00163 
00164                                                 flush = is.eof() ? Z_FINISH : Z_NO_FLUSH;
00165                                                 strm.next_in = in;
00166 
00167                                                 do {
00168                                                         strm.avail_out = CHUNK_SIZE;
00169                                                         strm.next_out = out;
00170 
00171                                                         ret = deflate(&strm, flush);    /* no bad return value */
00172                                                         assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
00173 
00174                                                         // determine how many bytes are available
00175                                                         have = CHUNK_SIZE - strm.avail_out;
00176 
00177                                                         // write the buffer to the output stream
00178                                                         os.write((char*)&out, have);
00179 
00180                                                         if (!os.good())
00181                                                         {
00182                                                                 (void)deflateEnd(&strm);
00183                                                                 throw ibrcommon::Exception("decompression failed. output stream went wrong.");
00184                                                                 break;
00185                                                         }
00186 
00187                                                 } while (strm.avail_out == 0);
00188                                                 assert(strm.avail_in == 0);     /* all input will be used */
00189                                         } while (flush != Z_FINISH);
00190                                         assert(ret == Z_STREAM_END);        /* stream will be complete */
00191 
00192                                         (void)deflateEnd(&strm);
00193 #else
00194                                         throw ibrcommon::Exception("zlib is not supported");
00195 #endif
00196                                         break;
00197                                 }
00198 
00199                                 default:
00200                                         throw ibrcommon::Exception("compression mode is not supported");
00201                         }
00202                 }
00203 
00204                 void CompressedPayloadBlock::extract(CompressedPayloadBlock::COMPRESS_ALGS alg, std::istream &is, std::ostream &os)
00205                 {
00206                         switch (alg)
00207                         {
00208                                 case COMPRESSION_ZLIB:
00209                                 {
00210 #ifdef HAVE_ZLIB
00211                                         const size_t CHUNK_SIZE = 16384;
00212 
00213                                         int ret;
00214                                         unsigned have;
00215                                         unsigned char in[CHUNK_SIZE];
00216                                         unsigned char out[CHUNK_SIZE];
00217                                         z_stream strm;
00218 
00219                                         strm.zalloc = Z_NULL;
00220                                         strm.zfree = Z_NULL;
00221                                         strm.opaque = Z_NULL;
00222                                         strm.avail_in = 0;
00223                                         strm.next_in = Z_NULL;
00224                                         ret = inflateInit(&strm);
00225 
00226                                         // exit if something is wrong
00227                                         if (ret != Z_OK) throw ibrcommon::Exception("initialization of zlib failed");
00228 
00229                                         do {
00230                                                 is.read((char*)&in, CHUNK_SIZE);
00231                                                 strm.avail_in = is.gcount();
00232 
00233                                                 // we're done if there is no more input
00234                                                 if ((strm.avail_in == 0) && (ret != Z_STREAM_END))
00235                                                 {
00236                                                         (void)inflateEnd(&strm);
00237                                                         throw ibrcommon::Exception("decompression failed. no enough data available.");
00238                                                 }
00239                                                 strm.next_in = in;
00240 
00241                                                 do {
00242                                                         strm.avail_out = CHUNK_SIZE;
00243                                                         strm.next_out = out;
00244 
00245                                                         ret = inflate(&strm, Z_NO_FLUSH);
00246                                                         assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
00247 
00248                                                         switch (ret)
00249                                                         {
00250                                                                 case Z_NEED_DICT:
00251                                                                         ret = Z_DATA_ERROR;     /* and fall through */
00252                                                                 case Z_DATA_ERROR:
00253                                                                 case Z_MEM_ERROR:
00254                                                                         (void)inflateEnd(&strm);
00255                                                                         throw ibrcommon::Exception("decompression failed. memory error.");
00256                                                         }
00257 
00258                                                         // determine how many bytes are available
00259                                                         have = CHUNK_SIZE - strm.avail_out;
00260 
00261                                                         // write the buffer to the output stream
00262                                                         os.write((char*)&out, have);
00263 
00264 
00265                                                         if (!os.good())
00266                                                         {
00267                                                                 (void)inflateEnd(&strm);
00268                                                                 throw ibrcommon::Exception("decompression failed. output stream went wrong.");
00269                                                         }
00270 
00271                                                 } while (strm.avail_out == 0);
00272                                                 assert(strm.avail_in == 0);     /* all input will be used */
00273                                         } while (ret != Z_STREAM_END);
00274 
00275                                         (void)inflateEnd(&strm);
00276 #else
00277                                         throw ibrcommon::Exception("zlib is not supported");
00278 #endif
00279                                         break;
00280                                 }
00281 
00282                                 default:
00283                                         throw ibrcommon::Exception("compression mode is not supported");
00284                         }
00285                 }
00286         }
00287 }