IBR-DTNSuite  0.10
BundleStreamBuf.cpp
Go to the documentation of this file.
1 /*
2  * BundleStreamBuf.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "BundleStreamBuf.h"
23 #include "core/BundleCore.h"
25 #include <ibrcommon/Logger.h>
27 
28 namespace dtn
29 {
30  namespace api
31  {
// Sets up a stream buffer that exchanges its data through the given callback.
//
// callback      - stored in _callback; the rest of this file calls put(),
//                 get() and delivered() on it.
// chunk_size    - initial value of _chunk_size, the payload size that
//                 overflow() compares against before sending a chunk.
// wait_seq_zero - initial value of _streaming. While _streaming is false,
//                 underflow() does not wait for sequence number _in_seq (0)
//                 but continues with the first chunk held in _chunks.
//
// Both internal buffers are created with BUFF_SIZE elements, the outgoing
// payload starts as a freshly created BLOB and all counters start at zero.
32  BundleStreamBuf::BundleStreamBuf(BundleStreamBufCallback &callback, const dtn::data::Length chunk_size, bool wait_seq_zero)
33  : _callback(callback), _in_buf(BUFF_SIZE), _out_buf(BUFF_SIZE),
34  _chunk_size(chunk_size), _chunk_payload(ibrcommon::BLOB::create()), _chunk_offset(0), _in_seq(0),
35  _out_seq(0), _streaming(wait_seq_zero), _first_chunk(true), _last_chunk_received(false), _timeout_receive(0)
36  {
37  // Initialize get pointer. This should be zero so that underflow is called upon first read.
38  setg(0, 0, 0);
// The put area ends one element before the end of _in_buf, which leaves room
// for the extra character that overflow() stores behind the pending bytes.
39  setp(&_in_buf[0], &_in_buf[BUFF_SIZE - 1]);
40  }
41 
// NOTE(review): the line carrying this definition's signature is missing from
// this listing; given its position directly after the constructor it is
// presumably the BundleStreamBuf destructor - confirm against the original file.
// The body is empty.
43  {
44  }
45 
// NOTE(review): the line carrying this function's signature is missing from
// this listing; the body looks like the streambuf sync() override - confirm
// against the original file.
//
// Hands all pending put-area bytes to overflow() by passing eof, then sends
// the current chunk with flushPayload(true), which flags it as the end of the
// stream. Returns -1 if overflow() reported eof and 0 otherwise.
47  {
// overflow() as defined in this file always returns not_eof(c), so ret is
// expected to be 0 here.
48  int ret = std::char_traits<char>::eq_int_type(this->overflow(
49  std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
50  : 0;
51 
52  // send the current chunk and clear it
53  flushPayload(true);
54 
55  return ret;
56  }
57 
// Moves the bytes collected in the put area into the outgoing chunk payload.
//
// c - the character that did not fit into the put area, or eof if the caller
//     only wants the pending bytes to be drained.
//
// The pending bytes (plus c, unless it is eof) are appended to _chunk_payload.
// If the payload is then larger than _chunk_size, the chunk is sent with
// flushPayload(). Always returns not_eof(c).
58  std::char_traits<char>::int_type BundleStreamBuf::overflow(std::char_traits<char>::int_type c)
59  {
// remember the range of pending bytes before the put pointers are reset
60  char *ibegin = &_in_buf[0];
61  char *iend = pptr();
62 
63  // mark the buffer as free
64  setp(&_in_buf[0], &_in_buf[0] + BUFF_SIZE - 1);
65 
66  if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
67  {
// the put area stops one element short of the buffer end, so this slot
// lies inside _in_buf even when the put area was completely filled
68  *iend++ = std::char_traits<char>::to_char_type(c);
69  }
70 
71  // if there is nothing to send, just return
72  if ((iend - ibegin) == 0)
73  {
74  return std::char_traits<char>::not_eof(c);
75  }
76 
77  // copy data into the bundles payload
78  BundleStreamBuf::append(_chunk_payload, &_in_buf[0], iend - ibegin);
79 
80  // if size exceeds chunk limit, send it
81  if (_chunk_payload.iostream().size() > static_cast<std::streamsize>(_chunk_size))
82  {
83  flushPayload();
84  }
85 
86  return std::char_traits<char>::not_eof(c);
87  }
88 
// Sends the buffered payload as one bundle through _callback.put() and starts
// a new, empty payload.
//
// final - when true, the stream block of the bundle is flagged STREAM_END.
//
// The stream block carries the sequence number _out_seq, which is incremented
// after sending. STREAM_BEGIN is set to the value of _first_chunk, so only the
// first bundle sent by this buffer carries it.
89  void BundleStreamBuf::flushPayload(bool final)
90  {
91  // do not send a bundle if there are no bytes buffered
92  // and no bundle has been sent before
// (once a first bundle has been sent, an empty payload is still sent)
93  if ((_first_chunk) && (_chunk_payload.iostream().size() == 0))
94  {
95  return;
96  }
97 
98  // create an empty bundle
// NOTE(review): the lines that declare `b` (the bundle) and `block` (its
// stream block) are missing from this listing - restore them from the
// original file.
100 
102  block.setSequenceNumber(_out_seq);
103  if (final) block.set(dtn::data::StreamBlock::STREAM_END, true);
104  block.set(dtn::data::StreamBlock::STREAM_BEGIN, _first_chunk);
105  if (_first_chunk) _first_chunk = false;
106 
107  // add tmp payload to the bundle
108  b.push_back(_chunk_payload);
109 
110  // send the current chunk
111  _callback.put(b);
112 
113  // and clear the payload
114  _chunk_payload = ibrcommon::BLOB::create();
115 
116  // increment the sequence number
117  _out_seq++;
118  }
119 
// NOTE(review): the line carrying this function's signature is missing from
// this listing; presumably a setter taking `size` - confirm name and
// parameter type against the original file.
// Replaces _chunk_size, the payload size that overflow() compares against
// before sending a chunk.
121  {
122  _chunk_size = size;
123  }
124 
// NOTE(review): the line carrying this function's signature is missing from
// this listing; presumably a setter taking `timeout` - confirm name and
// parameter type against the original file.
// Replaces _timeout_receive, which underflow() passes to _callback.get() and
// compares against the elapsed seconds while waiting for a missing chunk.
126  {
127  _timeout_receive = timeout;
128  }
129 
// Appends `length` bytes starting at `data` to the end of the BLOB behind `ref`.
//
// ref    - reference to the BLOB that receives the data
// data   - first byte to copy
// length - number of bytes to copy
130  void BundleStreamBuf::append(ibrcommon::BLOB::Reference &ref, const char* data, const dtn::data::Length &length)
131  {
// NOTE(review): the iostream object presumably keeps the BLOB locked while it
// is alive (underflow() calls the same accessor "get stream lock") - confirm.
132  ibrcommon::BLOB::iostream stream = ref.iostream();
// move the write position to the end so existing content is kept
133  (*stream).seekp(0, ios::end);
134  (*stream).write(data, length);
135  }
136 
// Refills the get area with the next bytes of the incoming stream.
//
// Chunks are fetched with _callback.get() until the chunk at the front of
// _chunks carries the sequence number _in_seq. The matching bundle is loaded
// and up to BUFF_SIZE bytes of its payload, starting at _chunk_offset, are
// read into _out_buf. Once the payload is exhausted the bundle is reported
// via _callback.delivered(), removed from _chunks and _in_seq is incremented.
//
// Returns eof() after a chunk flagged _last has been consumed completely,
// otherwise the first buffered character passed through not_eof().
137  std::char_traits<char>::int_type BundleStreamBuf::underflow()
138  {
139  // return with EOF if the last chunk was received
140  if (_last_chunk_received)
141  {
142  return std::char_traits<char>::eof();
143  }
144 
145  // receive chunks until the next sequence number is received
146  while (_chunks.empty())
147  {
148  // request the next bundle
// (called without a timeout argument here)
149  dtn::data::MetaBundle b = _callback.get();
150 
151  IBRCOMMON_LOGGER_DEBUG_TAG("BundleStreamBuf", 40) << "bundle received" << IBRCOMMON_LOGGER_ENDL;
152 
153  // create a chunk object
154  Chunk c(b);
155 
// chunks with a sequence number below _in_seq are dropped
156  if (c._seq >= _in_seq)
157  {
158  IBRCOMMON_LOGGER_DEBUG_TAG("BundleStreamBuf", 40) << "bundle accepted, seq. no. " << c._seq << IBRCOMMON_LOGGER_ENDL;
159  _chunks.insert(c);
160  }
161  }
162 
// NOTE(review): the line that declares `tm` is missing from this listing -
// restore it from the original file.
164  tm.start();
165 
166  // while not the right sequence number received -> wait
167  while (_in_seq != (*_chunks.begin())._seq)
168  {
169  try {
170  // request the next bundle
171  dtn::data::MetaBundle b = _callback.get(_timeout_receive);
172  IBRCOMMON_LOGGER_DEBUG_TAG("BundleStreamBuf", 40) << "bundle received" << IBRCOMMON_LOGGER_ENDL;
173 
174  // create a chunk object
175  Chunk c(b);
176 
177  if (c._seq >= _in_seq)
178  {
179  IBRCOMMON_LOGGER_DEBUG_TAG("BundleStreamBuf", 40) << "bundle accepted, seq. no. " << c._seq << IBRCOMMON_LOGGER_ENDL;
180  _chunks.insert(c);
181  }
182  } catch (std::exception&) {
183  // timed out
// (every std::exception thrown inside the try block is ignored here)
184  }
185 
186  tm.stop();
// give up waiting when a timeout is configured and exceeded, or when
// _streaming is still false
187  if (((_timeout_receive > 0) && (tm.getSeconds() > _timeout_receive)) || !_streaming)
188  {
189  // skip the missing bundles and proceed with the last received one
190  _in_seq = (*_chunks.begin())._seq;
191 
192  // set streaming to active
193  _streaming = true;
194  }
195  }
196 
197  IBRCOMMON_LOGGER_DEBUG_TAG("BundleStreamBuf", 40) << "read the payload" << IBRCOMMON_LOGGER_ENDL;
198 
199  // get the first chunk in the buffer
200  const Chunk &c = (*_chunks.begin());
201 
// the loaded bundle is kept in _current_bundle so that consecutive reads of
// the same chunk do not load it again
202  if (c._meta != _current_bundle)
203  {
204  // load the bundle from storage
// NOTE(review): the line that declares `storage` is missing from this
// listing - restore it from the original file.
206  _current_bundle = storage.get(c._meta);
207 
208  // process the bundle block (security, compression, ...)
209  dtn::core::BundleCore::processBlocks(_current_bundle);
210  }
211 
212  const dtn::data::PayloadBlock &payload = _current_bundle.find<dtn::data::PayloadBlock>();
213  ibrcommon::BLOB::Reference r = payload.getBLOB();
214 
215  bool end_of_stream = false;
216  std::streamsize bytes = 0;
217 
218  // lock the stream while reading from it
219  {
220  // get stream lock
221  ibrcommon::BLOB::iostream stream = r.iostream();
222 
223  // jump to the offset position
224  (*stream).seekg(_chunk_offset, ios::beg);
225 
226  // copy the data of the last received bundle into the buffer
227  (*stream).read(&_out_buf[0], BUFF_SIZE);
228 
229  // get the read bytes
230  bytes = (*stream).gcount();
231 
232  // check for end of stream
233  end_of_stream = (*stream).eof();
234  }
235 
236  if (end_of_stream)
237  {
238  // bundle consumed
239  // std::cerr << std::endl << "# " << c._seq << std::endl << std::flush;
240 
241  // check if this was the last chunk
242  if (c._last)
243  {
244  _last_chunk_received = true;
245  }
246 
247  // set bundle as delivered
248  _callback.delivered(c._meta);
249 
250  // delete the last chunk
// (c refers to an element of _chunks and must not be used after this line)
251  _chunks.erase(c);
252 
253  // reset the chunk offset
254  _chunk_offset = 0;
255 
256  // increment sequence number
257  _in_seq++;
258 
259  // if no more bytes are read, get the next bundle -> call underflow() recursive
260  if (bytes == 0)
261  {
262  return underflow();
263  }
264  }
265  else
266  {
267  // increment the chunk offset
268  _chunk_offset += bytes;
269  }
270 
271  // Since the input buffer content is now valid (or is new)
272  // the get pointer should be initialized (or reset).
273  setg(&_out_buf[0], &_out_buf[0], &_out_buf[0] + bytes);
274 
// NOTE(review): _out_buf[0] is passed as a plain char. Where char is signed,
// bytes with the high bit set become negative int_type values instead of the
// result of to_int_type(); returning
// std::char_traits<char>::to_int_type(*gptr()) looks like the intended
// behaviour - TODO confirm.
275  return std::char_traits<char>::not_eof(_out_buf[0]);
276  }
277 
// Builds the chunk description for a received bundle.
//
// m - meta data of the bundle; kept in _meta.
//
// _seq starts at 0 and _first/_last start as false. The bundle is loaded from
// storage and, in the lines visible here, _seq is taken from its StreamBlock.
278  BundleStreamBuf::Chunk::Chunk(const dtn::data::MetaBundle &m)
279  : _meta(m), _seq(0), _first(false), _last(false)
280  {
// NOTE(review): the line that declares `storage` is missing from this
// listing - restore it from the original file.
282  dtn::data::Bundle bundle = storage.get(_meta);
283 
284  try {
285  const dtn::data::StreamBlock &block = bundle.find<dtn::data::StreamBlock>();
286  _seq = block.getSequenceNumber();
// NOTE(review): the remaining lines of the try block and its catch clause are
// missing from this listing; presumably they set _first and _last from the
// stream block flags - restore them from the original file.
290  }
291 
// Destructor; the body is empty.
292  BundleStreamBuf::Chunk::~Chunk()
293  {
294  }
295 
// Two chunks are equal when their sequence numbers match; _meta, _first and
// _last are not compared.
296  bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const
297  {
298  return (_seq == other._seq);
299  }
300 
// Orders chunks by ascending sequence number only.
// NOTE(review): _chunks is presumably an ordered container using this
// operator, which would make *_chunks.begin() the chunk with the lowest
// sequence number - confirm against the header.
301  bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const
302  {
303  return (_seq < other._seq);
304  }
305  } /* namespace api */
306 } /* namespace dtn */