IBR-DTNSuite  0.10
CompressedPayloadBlock.cpp
Go to the documentation of this file.
1 /*
2  * CompressedPayloadBlock.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "ibrdtn/config.h"
25 #include <ibrcommon/data/BLOB.h>
26 #include <cassert>
27 
28 #ifdef HAVE_ZLIB
29 #include "zlib.h"
30 #endif
31 
32 namespace dtn
33 {
34  namespace data
35  {
37 
39  {
40  return new CompressedPayloadBlock();
41  }
42 
44  : dtn::data::Block(CompressedPayloadBlock::BLOCK_TYPE), _algorithm(COMPRESSION_UNKNOWN), _origin_size(0)
45  {
46  }
47 
49  {
50  }
51 
53  {
54  return _algorithm.getLength() + _origin_size.getLength();
55  }
56 
57  std::ostream& CompressedPayloadBlock::serialize(std::ostream &stream, Length &len) const
58  {
59  stream << _algorithm << _origin_size;
60  len -= _algorithm.getLength() + _origin_size.getLength();
61  return stream;
62  }
63 
64  std::istream& CompressedPayloadBlock::deserialize(std::istream &stream, const Length&)
65  {
66  stream >> _algorithm;
67  stream >> _origin_size;
68  return stream;
69  }
70 
72  {
73  _algorithm = alg;
74  }
75 
77  {
78  return CompressedPayloadBlock::COMPRESS_ALGS(_algorithm.get<size_t>());
79  }
80 
82  {
83  _origin_size = s;
84  }
85 
87  {
88  return _origin_size;
89  }
90 
92  {
94  if (p_it == b.end()) throw ibrcommon::Exception("Payload block missing.");
95  dtn::data::PayloadBlock &p = dynamic_cast<dtn::data::PayloadBlock&>(**p_it);
96 
97  // get a data container for the compressed payload
99 
100  // lock the BLOBs while we are compressing the payload
101  {
102  // get streams of both containers
105 
106  // compress the payload
107  CompressedPayloadBlock::compress(alg, *is, *os);
108  }
109 
110  // add a compressed payload block in front of the old payload block
112 
113  // set cpb values
114  cpb.setAlgorithm(alg);
115  cpb.setOriginSize(p.getLength());
116 
117  // add the new payload block to the bundle
118  b.insert(p_it, ref);
119 
120  // delete the old payload block
121  b.erase(p_it);
122  }
123 
125  {
126  // get the CPB first
128 
129  // get the payload block
131  if (p_it == b.end()) throw ibrcommon::Exception("Payload block missing.");
132  dtn::data::PayloadBlock &p = dynamic_cast<dtn::data::PayloadBlock&>(**p_it);
133 
134  // get a data container for the uncompressed payload
136 
137  // lock the BLOBs while we are uncompressing the payload
138  {
139  // get streams of both containers
142 
143  // extract the payload
145  }
146 
147  // add the new payload block to the bundle
148  b.insert(p_it, ref);
149 
150  // delete the old payload block
151  b.erase(p_it);
152 
153  // delete the CPB
154  b.remove(cpb);
155  }
156 
157  void CompressedPayloadBlock::compress(CompressedPayloadBlock::COMPRESS_ALGS alg, std::istream &is, std::ostream &os)
158  {
159  switch (alg)
160  {
161  case COMPRESSION_ZLIB:
162  {
163 #ifdef HAVE_ZLIB
164  const uInt CHUNK_SIZE = 16384;
165 
166  int ret, flush;
167  uInt have;
168  unsigned char in[CHUNK_SIZE];
169  unsigned char out[CHUNK_SIZE];
170  z_stream strm;
171 
172  /* allocate deflate state */
173  strm.zalloc = Z_NULL;
174  strm.zfree = Z_NULL;
175  strm.opaque = Z_NULL;
176  ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION);
177 
178  // exit if something is wrong
179  if (ret != Z_OK) throw ibrcommon::Exception("initialization of zlib failed");
180 
181  do {
182  is.read((char*)&in, CHUNK_SIZE);
183  strm.avail_in = static_cast<uInt>(is.gcount());
184 
185  flush = is.eof() ? Z_FINISH : Z_NO_FLUSH;
186  strm.next_in = in;
187 
188  do {
189  strm.avail_out = CHUNK_SIZE;
190  strm.next_out = out;
191 
192  ret = deflate(&strm, flush); /* no bad return value */
193  assert(ret != Z_STREAM_ERROR); /* state not clobbered */
194 
195  // determine how many bytes are available
196  have = CHUNK_SIZE - strm.avail_out;
197 
198  // write the buffer to the output stream
199  os.write((char*)&out, have);
200 
201  if (!os.good())
202  {
203  (void)deflateEnd(&strm);
204  throw ibrcommon::Exception("decompression failed. output stream went wrong.");
205  }
206 
207  } while (strm.avail_out == 0);
208  assert(strm.avail_in == 0); /* all input will be used */
209  } while (flush != Z_FINISH);
210  assert(ret == Z_STREAM_END); /* stream will be complete */
211 
212  (void)deflateEnd(&strm);
213 #else
214  throw ibrcommon::Exception("zlib is not supported");
215 #endif
216  break;
217  }
218 
219  default:
220  throw ibrcommon::Exception("compression mode is not supported");
221  }
222  }
223 
224  void CompressedPayloadBlock::extract(CompressedPayloadBlock::COMPRESS_ALGS alg, std::istream &is, std::ostream &os)
225  {
226  switch (alg)
227  {
228  case COMPRESSION_ZLIB:
229  {
230 #ifdef HAVE_ZLIB
231  const uInt CHUNK_SIZE = 16384;
232 
233  int ret;
234  uInt have;
235  unsigned char in[CHUNK_SIZE];
236  unsigned char out[CHUNK_SIZE];
237  z_stream strm;
238 
239  strm.zalloc = Z_NULL;
240  strm.zfree = Z_NULL;
241  strm.opaque = Z_NULL;
242  strm.avail_in = 0;
243  strm.next_in = Z_NULL;
244  ret = inflateInit(&strm);
245 
246  // exit if something is wrong
247  if (ret != Z_OK) throw ibrcommon::Exception("initialization of zlib failed");
248 
249  do {
250  is.read((char*)&in, CHUNK_SIZE);
251  strm.avail_in = static_cast<uInt>(is.gcount());
252 
253  // we're done if there is no more input
254  if ((strm.avail_in == 0) && (ret != Z_STREAM_END))
255  {
256  (void)inflateEnd(&strm);
257  throw ibrcommon::Exception("decompression failed. no enough data available.");
258  }
259  strm.next_in = in;
260 
261  do {
262  strm.avail_out = CHUNK_SIZE;
263  strm.next_out = out;
264 
265  ret = inflate(&strm, Z_NO_FLUSH);
266  assert(ret != Z_STREAM_ERROR); /* state not clobbered */
267 
268  switch (ret)
269  {
270  case Z_NEED_DICT:
271  ret = Z_DATA_ERROR; /* and fall through */
272  case Z_DATA_ERROR:
273  case Z_MEM_ERROR:
274  (void)inflateEnd(&strm);
275  throw ibrcommon::Exception("decompression failed. memory error.");
276  }
277 
278  // determine how many bytes are available
279  have = CHUNK_SIZE - strm.avail_out;
280 
281  // write the buffer to the output stream
282  os.write((char*)&out, have);
283 
284 
285  if (!os.good())
286  {
287  (void)inflateEnd(&strm);
288  throw ibrcommon::Exception("decompression failed. output stream went wrong.");
289  }
290 
291  } while (strm.avail_out == 0);
292  assert(strm.avail_in == 0); /* all input will be used */
293  } while (ret != Z_STREAM_END);
294 
295  (void)inflateEnd(&strm);
296 #else
297  throw ibrcommon::Exception("zlib is not supported");
298 #endif
299  break;
300  }
301 
302  default:
303  throw ibrcommon::Exception("compression mode is not supported");
304  }
305  }
306  }
307 }