IBR-DTNSuite  0.12
StreamBuffer.cpp
Go to the documentation of this file.
1 /*
2  * bpstreambuf.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include <ibrcommon/Logger.h>
25 #include <vector>
26 
27 namespace dtn
28 {
29  namespace streams
30  {
31  StreamConnection::StreamBuffer::StreamBuffer(StreamConnection &conn, iostream &stream, const dtn::data::Length buffer_size)
32  : _buffer_size(buffer_size), _statebits(STREAM_SOB), _conn(conn), in_buf_(buffer_size), out_buf_(buffer_size), _stream(stream),
33  _recv_size(0), _underflow_data_remain(0), _underflow_state(IDLE), _idle_timer(*this, 0)
34  {
35  // Initialize get pointer. This should be zero so that underflow is called upon first read.
36  setg(0, 0, 0);
37  setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
38  }
39 
40  StreamConnection::StreamBuffer::~StreamBuffer()
41  {
42  // stop the idle timer
43  _idle_timer.stop();
44  }
45 
46  bool StreamConnection::StreamBuffer::get(const StateBits bit) const
47  {
48  return (_statebits & bit);
49  }
50 
51  void StreamConnection::StreamBuffer::set(const StateBits bit)
52  {
53  ibrcommon::MutexLock l(_statelock);
54  _statebits |= bit;
55  }
56 
57  void StreamConnection::StreamBuffer::unset(const StateBits bit)
58  {
59  ibrcommon::MutexLock l(_statelock);
60  _statebits &= ~(bit);
61  }
62 
63  void StreamConnection::StreamBuffer::__error() const
64  {
65  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "StreamBuffer Debugging" << IBRCOMMON_LOGGER_ENDL;
66  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
67  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Buffer size: " << _buffer_size << IBRCOMMON_LOGGER_ENDL;
68  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "State bits: " << _statebits << IBRCOMMON_LOGGER_ENDL;
69  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Recv size: " << _recv_size.toString() << IBRCOMMON_LOGGER_ENDL;
70  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Segments: " << _segments.size() << IBRCOMMON_LOGGER_ENDL;
71  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Reject segments: " << _rejected_segments.size() << IBRCOMMON_LOGGER_ENDL;
72  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Underflow remaining: " << _underflow_data_remain << IBRCOMMON_LOGGER_ENDL;
73  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Underflow state: " << _underflow_state << IBRCOMMON_LOGGER_ENDL;
74  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
75 
76  if (_statebits & STREAM_FAILED)
77  {
78  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_FAILED is set" << IBRCOMMON_LOGGER_ENDL;
79  }
80 
81  if (_statebits & STREAM_BAD)
82  {
83  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_BAD is set" << IBRCOMMON_LOGGER_ENDL;
84  }
85 
86  if (_statebits & STREAM_EOF)
87  {
88  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_EOF is set" << IBRCOMMON_LOGGER_ENDL;
89  }
90 
91  if (_statebits & STREAM_SHUTDOWN)
92  {
93  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_SHUTDOWN is set" << IBRCOMMON_LOGGER_ENDL;
94  }
95 
96  if (_statebits & STREAM_CLOSED)
97  {
98  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_CLOSED is set" << IBRCOMMON_LOGGER_ENDL;
99  }
100 
101  if (!_stream.good())
102  {
103  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: good() returned false" << IBRCOMMON_LOGGER_ENDL;
104  }
105  }
106 
107  bool StreamConnection::StreamBuffer::__good() const
108  {
109  int badbits = STREAM_FAILED | STREAM_BAD | STREAM_EOF | STREAM_SHUTDOWN | STREAM_CLOSED;
110  return !(badbits & _statebits);
111  }
112 
121  const StreamContactHeader StreamConnection::StreamBuffer::handshake(const StreamContactHeader &header)
122  {
123  StreamContactHeader peer;
124 
125  try {
126  // make the send-call atomic
127  {
128  ibrcommon::MutexLock l(_sendlock);
129 
130  // transfer the local header
131  _stream << header << std::flush;
132  }
133 
134  // receive the remote header
135  _stream >> peer;
136 
137  // enable/disable ACK/NACK support
138  if (peer._flags.getBit(StreamContactHeader::REQUEST_ACKNOWLEDGMENTS)) set(STREAM_ACK_SUPPORT);
139  if (peer._flags.getBit(StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS)) set(STREAM_NACK_SUPPORT);
140 
141  // set the incoming timer if set (> 0)
142  if (peer._keepalive > 0)
143  {
144  // mark timer support
145  set(STREAM_TIMER_SUPPORT);
146  }
147 
148  // set handshake completed bit
149  set(STREAM_HANDSHAKE);
150 
151  } catch (const std::exception&) {
152  // set failed bit
153  set(STREAM_FAILED);
154 
155  // shutdown the stream
157 
158  // call the shutdown event
159  _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
160 
161  // forward the catched exception
162  throw StreamErrorException("handshake not completed");
163  }
164 
165  // return the received header
166  return peer;
167  }
168 
174  void StreamConnection::StreamBuffer::shutdown(const StreamDataSegment::ShutdownReason reason)
175  {
176  try {
177  ibrcommon::MutexLock l(_sendlock);
178  // send a SHUTDOWN message
179  _stream << StreamDataSegment(reason) << std::flush;
180  } catch (const std::exception&) {
181  // set failed bit
182  set(STREAM_FAILED);
183 
184  throw StreamErrorException("can not send shutdown message");
185  }
186  }
187 
188  void StreamConnection::StreamBuffer::keepalive()
189  {
190  try {
191  try {
192  ibrcommon::MutexTryLock l(_sendlock);
193  _stream << StreamDataSegment() << std::flush;
194  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 15) << "KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL;
195  } catch (const ibrcommon::MutexException&) {
196  // could not grab the lock - another process is sending something
197  // then we do nothing since a data frame do the same as a keepalive frame.
198  };
199  } catch (const std::exception&) {
200  // set failed bit
201  set(STREAM_FAILED);
202  }
203  }
204 
205  void StreamConnection::StreamBuffer::close()
206  {
207  // set shutdown bit
208  set(STREAM_SHUTDOWN);
209  }
210 
211  void StreamConnection::StreamBuffer::reject()
212  {
213  // we have to reject the current transmission
214  // so we have to discard all all data until the next segment with a start bit
215  set(STREAM_REJECT);
216 
217  // set the current in buffer to zero
218  // this should result in a underflow call on the next read
219  setg(0, 0, 0);
220  }
221 
222  void StreamConnection::StreamBuffer::abort()
223  {
224  _segments.abort();
225  }
226 
227  void StreamConnection::StreamBuffer::wait()
228  {
229  // TODO: get max time to wait out of the timeout values
230  dtn::data::Timeout timeout = 0;
231 
232  try {
233  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 15) << "waitCompleted(): wait for completion of transmission, " << _segments.size() << " ACKs left" << IBRCOMMON_LOGGER_ENDL;
234  _segments.wait(ibrcommon::Queue<StreamDataSegment>::QUEUE_EMPTY, timeout);
235  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 15) << "waitCompleted(): transfer completed" << IBRCOMMON_LOGGER_ENDL;
236  } catch (const ibrcommon::QueueUnblockedException&) {
237  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 15) << "waitCompleted(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL;
238  }
239  }
240 
241  // This function is called when the output buffer is filled.
242  // In this function, the buffer should be written to wherever it should
243  // be written to (in this case, the streambuf object that this is controlling).
244  std::char_traits<char>::int_type StreamConnection::StreamBuffer::overflow(std::char_traits<char>::int_type c)
245  {
246  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 90) << "overflow() called" << IBRCOMMON_LOGGER_ENDL;
247 
248  try {
249  char *ibegin = &out_buf_[0];
250  char *iend = pptr();
251 
252  // mark the buffer as free
253  setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
254 
255  // append the last character
256  if(!traits_type::eq_int_type(c, traits_type::eof())) {
257  *iend++ = traits_type::to_char_type(c);
258  }
259 
260  // if there is nothing to send, just return
261  if ((iend - ibegin) == 0)
262  {
263  return traits_type::not_eof(c);
264  }
265 
266  // wrap a segment around the data
268 
269  // set the start flag
270  if (get(STREAM_SOB))
271  {
273  unset(STREAM_SKIP);
274  unset(STREAM_SOB);
275  }
276 
277  if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
278  {
279  // set the end flag
281  set(STREAM_SOB);
282  }
283 
284  if (!get(STREAM_SKIP))
285  {
286  // put the segment into the queue
287  if (get(STREAM_ACK_SUPPORT))
288  {
289  _segments.push(seg);
290  }
292  {
293  // without ACK support we have to assume that a bundle is forwarded
294  // when the last segment is sent.
295  _conn.eventBundleForwarded();
296  }
297 
298  ibrcommon::MutexLock l(_sendlock);
299  if (!_stream.good()) throw StreamErrorException("stream went bad");
300 
301  // write the segment to the stream
302  _stream << seg;
303  _stream.write(&out_buf_[0], seg._value.get<size_t>());
304 
305  // record statistics
306  _conn._callback.addTrafficOut(seg._value.get<size_t>());
307  }
308 
309  return traits_type::not_eof(c);
310  } catch (const StreamClosedException&) {
311  // set failed bit
312  set(STREAM_FAILED);
313 
314  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL;
315 
316  throw;
317  } catch (const StreamErrorException&) {
318  // set failed bit
319  set(STREAM_FAILED);
320 
321  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL;
322 
323  throw;
324  } catch (const ios_base::failure&) {
325  // set failed bit
326  set(STREAM_FAILED);
327 
328  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL;
329 
330  throw;
331  }
332 
333  return traits_type::eof();
334  }
335 
336  // This is called to flush the buffer.
337  // This is called when we're done with the file stream (or when .flush() is called).
338  int StreamConnection::StreamBuffer::sync()
339  {
340  int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
341  traits_type::eof()) ? -1 : 0;
342 
343  try {
344  ibrcommon::MutexLock l(_sendlock);
345 
346  // ... and flush.
347  _stream.flush();
348  } catch (const ios_base::failure&) {
349  // set failed bit
350  set(STREAM_BAD);
351 
352  _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
353  }
354 
355  return ret;
356  }
357 
358  void StreamConnection::StreamBuffer::skipData(dtn::data::Length &size)
359  {
360  // a temporary buffer
361  std::vector<char> tmpbuf(_buffer_size);
362 
363  try {
364  // and read until the next segment
365  while (size > 0 && _stream.good())
366  {
367  dtn::data::Length readsize = _buffer_size;
368  if (size < _buffer_size) readsize = size;
369 
370  // to reject a bundle read all remaining data of this segment
371  _stream.read(&tmpbuf[0], (std::streamsize)readsize);
372 
373  // record statistics
374  _conn._callback.addTrafficIn(readsize);
375 
376  // reset idle timeout
377  _idle_timer.reset();
378 
379  // adjust the remain counter
380  size -= readsize;
381  }
382  } catch (const ios_base::failure &ex) {
383  _underflow_state = IDLE;
384  throw StreamErrorException("read error during data skip: " + std::string(ex.what()));
385  }
386  }
387 
388  // Fill the input buffer. This reads out of the streambuf.
389  std::char_traits<char>::int_type StreamConnection::StreamBuffer::underflow()
390  {
391  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 90) << "StreamBuffer::underflow() called" << IBRCOMMON_LOGGER_ENDL;
392 
393  try {
394  if (_underflow_state == DATA_TRANSFER)
395  {
396  // on bundle reject
397  if (get(STREAM_REJECT))
398  {
399  // send NACK on bundle reject
400  if (get(STREAM_NACK_SUPPORT))
401  {
402  ibrcommon::MutexLock l(_sendlock);
403  if (!_stream.good()) throw StreamErrorException("stream went bad");
404 
405  // send a REFUSE message
407  }
408 
409  // skip data in this segment
410  skipData(_underflow_data_remain);
411 
412  // return to idle state
413  _underflow_state = IDLE;
414  }
415  // send ACK if the data segment is received completely
416  else if (_underflow_data_remain == 0)
417  {
418  // New data segment received. Send an ACK.
419  if (get(STREAM_ACK_SUPPORT))
420  {
421  ibrcommon::MutexLock l(_sendlock);
422  if (!_stream.good()) throw StreamErrorException("stream went bad");
423  _stream << StreamDataSegment(StreamDataSegment::MSG_ACK_SEGMENT, _recv_size) << std::flush;
424  }
425 
426  // return to idle state
427  _underflow_state = IDLE;
428  }
429  }
430 
431  // read segments until DATA is AVAILABLE
432  while (_underflow_state == IDLE)
433  {
434  // container for segment data
436 
437  try {
438  // read the segment
439  if (!_stream.good()) throw StreamErrorException("stream went bad");
440 
441  _stream >> seg;
442  } catch (const ios_base::failure &ex) {
443  throw StreamErrorException("read error: " + std::string(ex.what()));
444  }
445 
447  {
448  // reset idle timeout
449  _idle_timer.reset();
450  }
451 
452  switch (seg._type)
453  {
455  {
456  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_DATA_SEGMENT received, size: " << seg._value.toString() << IBRCOMMON_LOGGER_ENDL;
457 
459  {
460  _recv_size = seg._value;
461  unset(STREAM_REJECT);
462  }
463  else
464  {
465  _recv_size += seg._value;
466  }
467 
468  // set the new data length
469  _underflow_data_remain = seg._value.get<Length>();
470 
471  if (get(STREAM_REJECT))
472  {
473  // send NACK on bundle reject
474  if (get(STREAM_NACK_SUPPORT))
475  {
476  // lock for sending
477  ibrcommon::MutexLock l(_sendlock);
478  if (!_stream.good()) throw StreamErrorException("stream went bad");
479 
480  // send a NACK message
481  _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE, 0) << std::flush;
482  }
483 
484  // skip data in this segment
485  skipData(_underflow_data_remain);
486  }
487  else
488  {
489  // announce the new data block
490  _underflow_state = DATA_TRANSFER;
491  }
492  break;
493  }
494 
496  {
497  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_ACK_SEGMENT received, size: " << seg._value.toString() << IBRCOMMON_LOGGER_ENDL;
498 
499  // remove the segment in the queue
500  if (get(STREAM_ACK_SUPPORT))
501  {
502  ibrcommon::Queue<StreamDataSegment>::Locked q = _segments.exclusive();
503  if (q.empty())
504  {
505  IBRCOMMON_LOGGER_TAG("StreamBuffer", error) << "got an unexpected ACK with size of " << seg._value.toString() << IBRCOMMON_LOGGER_ENDL;
506  }
507  else
508  {
509  StreamDataSegment &qs = q.front();
510 
511  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 60) << q.size() << " elements to ACK" << IBRCOMMON_LOGGER_ENDL;
512 
513  _conn.eventBundleAck(seg._value.get<Length>());
514 
516  {
517  _conn.eventBundleForwarded();
518  }
519 
520  q.pop();
521  }
522  }
523  break;
524  }
525 
527  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_KEEPALIVE received, size: " << seg._value.toString() << IBRCOMMON_LOGGER_ENDL;
528  break;
529 
531  {
532  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_REFUSE_BUNDLE received, flags: " << (int)seg._flags << IBRCOMMON_LOGGER_ENDL;
533 
534  // TODO: Test bundle rejection!
535 
536  // remove the segment in the queue
537  if (get(STREAM_ACK_SUPPORT) && get(STREAM_NACK_SUPPORT))
538  {
539  // skip segments
540  if (!_rejected_segments.empty())
541  {
542  _rejected_segments.pop();
543 
544  // we received a NACK
545  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 30) << "NACK received, still " << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
546  }
547  else try
548  {
549  StreamDataSegment qs = _segments.getnpop();
550 
551  // we received a NACK
552  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 20) << "NACK received!" << IBRCOMMON_LOGGER_ENDL;
553 
554  // get all segment ACKs in the queue for this transmission
555  while (!_segments.empty())
556  {
557  StreamDataSegment &seg = _segments.front();
559  {
560  break;
561  }
562 
563  // move the segments to another queue
564  _rejected_segments.push(seg);
565  _segments.pop();
566  }
567 
568  // call event reject
569  _conn.eventBundleRefused();
570 
571  // we received a NACK
572  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 30) << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
573 
574  // the queue is empty, then skip the current transfer
575  if (_segments.empty())
576  {
577  set(STREAM_SKIP);
578 
579  // we received a NACK
580  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 25) << "skip the current transfer" << IBRCOMMON_LOGGER_ENDL;
581  }
582 
583  } catch (const ibrcommon::QueueUnblockedException&) {
584  IBRCOMMON_LOGGER_TAG("StreamBuffer", error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
585  }
586 
587  }
588  else
589  {
590  IBRCOMMON_LOGGER_TAG("StreamBuffer", error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
591  }
592 
593  break;
594  }
595 
597  {
598  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_SHUTDOWN received" << IBRCOMMON_LOGGER_ENDL;
599  throw StreamShutdownException();
600  }
601  }
602  }
603 
604  // currently transferring data
605  dtn::data::Length readsize = _buffer_size;
606  if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
607 
608  try {
609  if (!_stream.good()) throw StreamErrorException("stream went bad");
610 
611  // here receive the data
612  _stream.read(&in_buf_[0], (std::streamsize)readsize);
613 
614  // record statistics
615  _conn._callback.addTrafficIn(readsize);
616 
617  // reset idle timeout
618  _idle_timer.reset();
619  } catch (const ios_base::failure &ex) {
620  _underflow_state = IDLE;
621  throw StreamErrorException("read error: " + std::string(ex.what()));
622  }
623 
624  // adjust the remain counter
625  _underflow_data_remain -= readsize;
626 
627  // Since the input buffer content is now valid (or is new)
628  // the get pointer should be initialized (or reset).
629  setg(&in_buf_[0], &in_buf_[0], &in_buf_[0] + readsize);
630 
631  return traits_type::not_eof(in_buf_[0]);
632 
633  } catch (const StreamClosedException&) {
634  // set failed bit
635  set(STREAM_FAILED);
636 
637  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL;
638 
639  } catch (const StreamErrorException &ex) {
640  // set failed bit
641  set(STREAM_FAILED);
642 
643  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
644 
645  throw;
646  } catch (const StreamShutdownException&) {
647  // set failed bit
648  set(STREAM_FAILED);
649 
650  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL;
651  }
652 
653  return traits_type::eof();
654  }
655 
656  size_t StreamConnection::StreamBuffer::timeout(ibrcommon::Timer*)
657  {
658  if (__good())
659  {
661  }
663  }
664 
665  void StreamConnection::StreamBuffer::enableIdleTimeout(const dtn::data::Timeout &seconds)
666  {
667  _idle_timer.set(seconds);
668  _idle_timer.start();
669  }
670  }
671 }