IBR-DTNSuite
0.8
|
00001 /* 00002 * CompressedPayloadBlock.cpp 00003 * 00004 * Created on: 20.04.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrdtn/config.h" 00009 #include "ibrdtn/data/CompressedPayloadBlock.h" 00010 #include "ibrdtn/data/PayloadBlock.h" 00011 #include <ibrcommon/data/BLOB.h> 00012 #include <cassert> 00013 00014 #ifdef HAVE_ZLIB 00015 #include "zlib.h" 00016 #endif 00017 00018 namespace dtn 00019 { 00020 namespace data 00021 { 00022 dtn::data::Block* CompressedPayloadBlock::Factory::create() 00023 { 00024 return new CompressedPayloadBlock(); 00025 } 00026 00027 CompressedPayloadBlock::CompressedPayloadBlock() 00028 : dtn::data::Block(CompressedPayloadBlock::BLOCK_TYPE), _algorithm(COMPRESSION_UNKNOWN), _origin_size(0) 00029 { 00030 } 00031 00032 CompressedPayloadBlock::~CompressedPayloadBlock() 00033 { 00034 } 00035 00036 size_t CompressedPayloadBlock::getLength() const 00037 { 00038 return _algorithm.getLength() + _origin_size.getLength(); 00039 } 00040 00041 std::ostream& CompressedPayloadBlock::serialize(std::ostream &stream, size_t &length) const 00042 { 00043 stream << _algorithm << _origin_size; 00044 return stream; 00045 } 00046 00047 std::istream& CompressedPayloadBlock::deserialize(std::istream &stream, const size_t length) 00048 { 00049 stream >> _algorithm; 00050 stream >> _origin_size; 00051 return stream; 00052 } 00053 00054 void CompressedPayloadBlock::setAlgorithm(CompressedPayloadBlock::COMPRESS_ALGS alg) 00055 { 00056 _algorithm = alg; 00057 } 00058 00059 CompressedPayloadBlock::COMPRESS_ALGS CompressedPayloadBlock::getAlgorithm() const 00060 { 00061 return COMPRESS_ALGS( _algorithm.getValue() ); 00062 } 00063 00064 void CompressedPayloadBlock::setOriginSize(size_t s) 00065 { 00066 _origin_size = s; 00067 } 00068 00069 size_t CompressedPayloadBlock::getOriginSize() const 00070 { 00071 return _origin_size.getValue(); 00072 } 00073 00074 void CompressedPayloadBlock::compress(dtn::data::Bundle &b, CompressedPayloadBlock::COMPRESS_ALGS alg) 00075 { 00076 dtn::data::PayloadBlock &p = b.getBlock<dtn::data::PayloadBlock>(); 00077 00078 // get a data container for the compressed payload 00079 ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create(); 00080 00081 // lock the BLOBs while we are compress the payload 00082 { 00083 // get streams of both containers 00084 ibrcommon::BLOB::iostream is = p.getBLOB().iostream(); 00085 ibrcommon::BLOB::iostream os = ref.iostream(); 00086 00087 // compress the payload 00088 CompressedPayloadBlock::compress(alg, *is, *os); 00089 } 00090 00091 // add a compressed payload block in front of the old payload block 00092 dtn::data::CompressedPayloadBlock &cpb = b.push_front<CompressedPayloadBlock>(); 00093 00094 // set cpb values 00095 cpb.setAlgorithm(alg); 00096 cpb.setOriginSize(p.getLength()); 00097 00098 // add the new payload block to the bundle 00099 b.insert(p, ref); 00100 00101 // delete the old payload block 00102 b.remove(p); 00103 } 00104 00105 void CompressedPayloadBlock::extract(dtn::data::Bundle &b) 00106 { 00107 // get the CPB first 00108 dtn::data::CompressedPayloadBlock &cpb = b.getBlock<CompressedPayloadBlock>(); 00109 00110 // get the payload block 00111 dtn::data::PayloadBlock &p = b.getBlock<dtn::data::PayloadBlock>(); 00112 00113 // get a data container for the uncompressed payload 00114 ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create(); 00115 00116 // lock the BLOBs while we are uncompress the payload 00117 { 00118 // get streams of both containers 00119 ibrcommon::BLOB::iostream is = p.getBLOB().iostream(); 00120 ibrcommon::BLOB::iostream os = ref.iostream(); 00121 00122 // compress the payload 00123 CompressedPayloadBlock::extract(cpb.getAlgorithm(), *is, *os); 00124 } 00125 00126 // add the new payload block to the bundle 00127 b.insert(p, ref); 00128 00129 // delete the old payload block 00130 b.remove(p); 00131 00132 // delete the CPB 00133 b.remove(cpb); 00134 } 00135 00136 void CompressedPayloadBlock::compress(CompressedPayloadBlock::COMPRESS_ALGS alg, std::istream &is, std::ostream &os) 00137 { 00138 switch (alg) 00139 { 00140 case COMPRESSION_ZLIB: 00141 { 00142 #ifdef HAVE_ZLIB 00143 const size_t CHUNK_SIZE = 16384; 00144 00145 int ret, flush; 00146 unsigned have; 00147 unsigned char in[CHUNK_SIZE]; 00148 unsigned char out[CHUNK_SIZE]; 00149 z_stream strm; 00150 00151 /* allocate deflate state */ 00152 strm.zalloc = Z_NULL; 00153 strm.zfree = Z_NULL; 00154 strm.opaque = Z_NULL; 00155 ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION); 00156 00157 // exit if something is wrong 00158 if (ret != Z_OK) throw ibrcommon::Exception("initialization of zlib failed"); 00159 00160 do { 00161 is.read((char*)&in, CHUNK_SIZE); 00162 strm.avail_in = is.gcount(); 00163 00164 flush = is.eof() ? Z_FINISH : Z_NO_FLUSH; 00165 strm.next_in = in; 00166 00167 do { 00168 strm.avail_out = CHUNK_SIZE; 00169 strm.next_out = out; 00170 00171 ret = deflate(&strm, flush); /* no bad return value */ 00172 assert(ret != Z_STREAM_ERROR); /* state not clobbered */ 00173 00174 // determine how many bytes are available 00175 have = CHUNK_SIZE - strm.avail_out; 00176 00177 // write the buffer to the output stream 00178 os.write((char*)&out, have); 00179 00180 if (!os.good()) 00181 { 00182 (void)deflateEnd(&strm); 00183 throw ibrcommon::Exception("decompression failed. output stream went wrong."); 00184 break; 00185 } 00186 00187 } while (strm.avail_out == 0); 00188 assert(strm.avail_in == 0); /* all input will be used */ 00189 } while (flush != Z_FINISH); 00190 assert(ret == Z_STREAM_END); /* stream will be complete */ 00191 00192 (void)deflateEnd(&strm); 00193 #else 00194 throw ibrcommon::Exception("zlib is not supported"); 00195 #endif 00196 break; 00197 } 00198 00199 default: 00200 throw ibrcommon::Exception("compression mode is not supported"); 00201 } 00202 } 00203 00204 void CompressedPayloadBlock::extract(CompressedPayloadBlock::COMPRESS_ALGS alg, std::istream &is, std::ostream &os) 00205 { 00206 switch (alg) 00207 { 00208 case COMPRESSION_ZLIB: 00209 { 00210 #ifdef HAVE_ZLIB 00211 const size_t CHUNK_SIZE = 16384; 00212 00213 int ret; 00214 unsigned have; 00215 unsigned char in[CHUNK_SIZE]; 00216 unsigned char out[CHUNK_SIZE]; 00217 z_stream strm; 00218 00219 strm.zalloc = Z_NULL; 00220 strm.zfree = Z_NULL; 00221 strm.opaque = Z_NULL; 00222 strm.avail_in = 0; 00223 strm.next_in = Z_NULL; 00224 ret = inflateInit(&strm); 00225 00226 // exit if something is wrong 00227 if (ret != Z_OK) throw ibrcommon::Exception("initialization of zlib failed"); 00228 00229 do { 00230 is.read((char*)&in, CHUNK_SIZE); 00231 strm.avail_in = is.gcount(); 00232 00233 // we're done if there is no more input 00234 if ((strm.avail_in == 0) && (ret != Z_STREAM_END)) 00235 { 00236 (void)inflateEnd(&strm); 00237 throw ibrcommon::Exception("decompression failed. no enough data available."); 00238 } 00239 strm.next_in = in; 00240 00241 do { 00242 strm.avail_out = CHUNK_SIZE; 00243 strm.next_out = out; 00244 00245 ret = inflate(&strm, Z_NO_FLUSH); 00246 assert(ret != Z_STREAM_ERROR); /* state not clobbered */ 00247 00248 switch (ret) 00249 { 00250 case Z_NEED_DICT: 00251 ret = Z_DATA_ERROR; /* and fall through */ 00252 case Z_DATA_ERROR: 00253 case Z_MEM_ERROR: 00254 (void)inflateEnd(&strm); 00255 throw ibrcommon::Exception("decompression failed. memory error."); 00256 } 00257 00258 // determine how many bytes are available 00259 have = CHUNK_SIZE - strm.avail_out; 00260 00261 // write the buffer to the output stream 00262 os.write((char*)&out, have); 00263 00264 00265 if (!os.good()) 00266 { 00267 (void)inflateEnd(&strm); 00268 throw ibrcommon::Exception("decompression failed. output stream went wrong."); 00269 } 00270 00271 } while (strm.avail_out == 0); 00272 assert(strm.avail_in == 0); /* all input will be used */ 00273 } while (ret != Z_STREAM_END); 00274 00275 (void)inflateEnd(&strm); 00276 #else 00277 throw ibrcommon::Exception("zlib is not supported"); 00278 #endif 00279 break; 00280 } 00281 00282 default: 00283 throw ibrcommon::Exception("compression mode is not supported"); 00284 } 00285 } 00286 } 00287 }