IBR-DTNSuite  0.10
StreamBuffer.cpp
Go to the documentation of this file.
1 /*
2  * StreamBuffer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include <ibrcommon/Logger.h>
25 #include <vector>
26 
27 namespace dtn
28 {
29  namespace streams
30  {
31  StreamConnection::StreamBuffer::StreamBuffer(StreamConnection &conn, iostream &stream, const dtn::data::Length buffer_size)
32  : _buffer_size(buffer_size), _statebits(STREAM_SOB), _conn(conn), in_buf_(buffer_size), out_buf_(buffer_size), _stream(stream),
33  _recv_size(0), _underflow_data_remain(0), _underflow_state(IDLE), _idle_timer(*this, 0), _monitor(false), _monitor_stats(2, 0)
34  {
35  // Initialize get pointer. This should be zero so that underflow is called upon first read.
36  setg(0, 0, 0);
37  setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
38  }
39 
40  StreamConnection::StreamBuffer::~StreamBuffer()
41  {
42  // stop the idle timer
43  _idle_timer.stop();
44  }
45 
46  bool StreamConnection::StreamBuffer::get(const StateBits bit) const
47  {
48  return (_statebits & bit);
49  }
50 
51  void StreamConnection::StreamBuffer::set(const StateBits bit)
52  {
53  ibrcommon::MutexLock l(_statelock);
54  _statebits |= bit;
55  }
56 
57  void StreamConnection::StreamBuffer::unset(const StateBits bit)
58  {
59  ibrcommon::MutexLock l(_statelock);
60  _statebits &= ~(bit);
61  }
62 
63  void StreamConnection::StreamBuffer::__error() const
64  {
65  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "StreamBuffer Debugging" << IBRCOMMON_LOGGER_ENDL;
66  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
67  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Buffer size: " << _buffer_size << IBRCOMMON_LOGGER_ENDL;
68  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "State bits: " << _statebits << IBRCOMMON_LOGGER_ENDL;
69  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Recv size: " << _recv_size.toString() << IBRCOMMON_LOGGER_ENDL;
70  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Segments: " << _segments.size() << IBRCOMMON_LOGGER_ENDL;
71  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Reject segments: " << _rejected_segments.size() << IBRCOMMON_LOGGER_ENDL;
72  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Underflow remaining: " << _underflow_data_remain << IBRCOMMON_LOGGER_ENDL;
73  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "Underflow state: " << _underflow_state << IBRCOMMON_LOGGER_ENDL;
74  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
75 
76  if (_statebits & STREAM_FAILED)
77  {
78  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_FAILED is set" << IBRCOMMON_LOGGER_ENDL;
79  }
80 
81  if (_statebits & STREAM_BAD)
82  {
83  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_BAD is set" << IBRCOMMON_LOGGER_ENDL;
84  }
85 
86  if (_statebits & STREAM_EOF)
87  {
88  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_EOF is set" << IBRCOMMON_LOGGER_ENDL;
89  }
90 
91  if (_statebits & STREAM_SHUTDOWN)
92  {
93  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_SHUTDOWN is set" << IBRCOMMON_LOGGER_ENDL;
94  }
95 
96  if (_statebits & STREAM_CLOSED)
97  {
98  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: STREAM_CLOSED is set" << IBRCOMMON_LOGGER_ENDL;
99  }
100 
101  if (!_stream.good())
102  {
103  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 80) << "stream went bad: good() returned false" << IBRCOMMON_LOGGER_ENDL;
104  }
105  }
106 
107  bool StreamConnection::StreamBuffer::__good() const
108  {
109  int badbits = STREAM_FAILED | STREAM_BAD | STREAM_EOF | STREAM_SHUTDOWN | STREAM_CLOSED;
110  return !(badbits & _statebits);
111  }
112 
/**
 * Exchange contact headers with the peer.
 *
 * The local header is written and flushed while holding _sendlock, then the
 * peer header is read from the stream. Depending on the peer header the bits
 * STREAM_ACK_SUPPORT, STREAM_NACK_SUPPORT and STREAM_TIMER_SUPPORT are set,
 * followed by STREAM_HANDSHAKE.
 *
 * @param header the local contact header to transmit
 * @return the contact header received from the peer
 * @throws StreamErrorException if any std::exception occurs during the
 *         exchange; STREAM_FAILED is set and the connection is shut down
 *         with CONNECTION_SHUTDOWN_ERROR before the exception is thrown
 */
121  const StreamContactHeader StreamConnection::StreamBuffer::handshake(const StreamContactHeader &header)
122  {
123  StreamContactHeader peer;
124 
125  try {
126  // make the send-call atomic: the lock is released at the end of this scope
127  {
128  ibrcommon::MutexLock l(_sendlock);
129 
130  // transfer the local header
131  _stream << header << std::flush;
132  }
133 
134  // receive the remote header
135  _stream >> peer;
136 
137  // enable ACK/NACK support if the peer requested it
138  if (peer._flags.getBit(StreamContactHeader::REQUEST_ACKNOWLEDGMENTS)) set(STREAM_ACK_SUPPORT);
139  if (peer._flags.getBit(StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS)) set(STREAM_NACK_SUPPORT);
140 
141  // a keepalive value greater than zero announces timer support
142  if (peer._keepalive > 0)
143  {
144  // mark timer support
145  set(STREAM_TIMER_SUPPORT);
146  }
147 
148  // set handshake completed bit
149  set(STREAM_HANDSHAKE);
150 
151  } catch (const std::exception&) {
152  // set failed bit
153  set(STREAM_FAILED);
154 
155  // shutdown the stream
// NOTE(review): listing line 156 is absent from this extract, so the statement
// that belongs to the comment above is not visible here - confirm against the repository.
157 
158  // call the shutdown event
159  _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
160 
161  // replace the caught exception with a StreamErrorException
162  throw StreamErrorException("handshake not completed");
163  }
164 
165  // return the received header
166  return peer;
167  }
168 
174  void StreamConnection::StreamBuffer::shutdown(const StreamDataSegment::ShutdownReason reason)
175  {
176  try {
177  ibrcommon::MutexLock l(_sendlock);
178  // send a SHUTDOWN message
179  _stream << StreamDataSegment(reason) << std::flush;
180  } catch (const std::exception&) {
181  // set failed bit
182  set(STREAM_FAILED);
183 
184  throw StreamErrorException("can not send shutdown message");
185  }
186  }
187 
188  void StreamConnection::StreamBuffer::keepalive()
189  {
190  try {
191  try {
192  ibrcommon::MutexTryLock l(_sendlock);
193  _stream << StreamDataSegment() << std::flush;
194  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 15) << "KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL;
195  } catch (const ibrcommon::MutexException&) {
196  // could not grab the lock - another process is sending something
197  // then we do nothing since a data frame do the same as a keepalive frame.
198  };
199  } catch (const std::exception&) {
200  // set failed bit
201  set(STREAM_FAILED);
202  }
203  }
204 
205  void StreamConnection::StreamBuffer::close()
206  {
207  // set shutdown bit
208  set(STREAM_SHUTDOWN);
209  }
210 
211  void StreamConnection::StreamBuffer::reject()
212  {
213  // we have to reject the current transmission
214  // so we have to discard all all data until the next segment with a start bit
215  set(STREAM_REJECT);
216 
217  // set the current in buffer to zero
218  // this should result in a underflow call on the next read
219  setg(0, 0, 0);
220  }
221 
222  void StreamConnection::StreamBuffer::abort()
223  {
224  _segments.abort();
225  }
226 
227  void StreamConnection::StreamBuffer::wait()
228  {
229  // TODO: get max time to wait out of the timeout values
230  dtn::data::Timeout timeout = 0;
231 
232  try {
233  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 15) << "waitCompleted(): wait for completion of transmission, " << _segments.size() << " ACKs left" << IBRCOMMON_LOGGER_ENDL;
234  _segments.wait(ibrcommon::Queue<StreamDataSegment>::QUEUE_EMPTY, timeout);
235  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 15) << "waitCompleted(): transfer completed" << IBRCOMMON_LOGGER_ENDL;
236  } catch (const ibrcommon::QueueUnblockedException&) {
237  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 15) << "waitCompleted(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL;
238  }
239  }
240 
241  // This function is called when the output buffer is filled.
242  // In this function, the buffer should be written to wherever it should
243  // be written to (in this case, the streambuf object that this is controlling).
//
// The pending bytes of out_buf_ (plus the character c, unless c is eof) are
// announced by a segment and written to _stream while holding _sendlock.
// Nothing is written while STREAM_SKIP is set.
//
// Parameter c: character that did not fit into the put area, or eof when
//              called from sync().
// Returns traits_type::not_eof(c) on success.
// Throws StreamClosedException, StreamErrorException or ios_base::failure;
// each handler sets STREAM_FAILED and rethrows, so the final
// "return traits_type::eof()" is not reached through any handler.
//
// NOTE(review): listing lines 267, 272, 280 and 291 are absent from this
// extract. They presumably declare `seg`, set its begin/end flags and hold the
// `else` branch of the ACK check - confirm against the repository.
244  std::char_traits<char>::int_type StreamConnection::StreamBuffer::overflow(std::char_traits<char>::int_type c)
245  {
246  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 90) << "overflow() called" << IBRCOMMON_LOGGER_ENDL;
247 
248  try {
// remember the range of pending data before the put area is reset
249  char *ibegin = &out_buf_[0];
250  char *iend = pptr();
251 
252  // mark the buffer as free
253  setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
254 
255  // append the last character; the put area set up by setp() ends one byte
// before _buffer_size, which leaves room for it
256  if(!traits_type::eq_int_type(c, traits_type::eof())) {
257  *iend++ = traits_type::to_char_type(c);
258  }
259 
260  // if there is nothing to send, just return
261  if ((iend - ibegin) == 0)
262  {
263  return traits_type::not_eof(c);
264  }
265 
266  // wrap a segment around the data
268 
269  // set the start flag
270  if (get(STREAM_SOB))
271  {
273  unset(STREAM_SKIP);
274  unset(STREAM_SOB);
275  }
276 
// an eof character means this call came from a flush
277  if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
278  {
279  // set the end flag
281  set(STREAM_SOB);
282  }
283 
284  if (!get(STREAM_SKIP))
285  {
286  // put the segment into the queue
287  if (get(STREAM_ACK_SUPPORT))
288  {
289  _segments.push(seg);
290  }
292  {
293  // without ACK support we have to assume that a bundle is forwarded
294  // when the last segment is sent.
295  _conn.eventBundleForwarded();
296  }
297 
// the lock is held until the end of this scope
298  ibrcommon::MutexLock l(_sendlock);
299  if (!_stream.good()) throw StreamErrorException("stream went bad");
300 
301  // write the segment to the stream
302  _stream << seg;
303  _stream.write(&out_buf_[0], seg._value.get<size_t>());
304 
305  // record statistics
306  if (_monitor) {
307  _monitor_stats[1] += seg._value.get<size_t>();
308  }
309  }
310 
311  return traits_type::not_eof(c);
312  } catch (const StreamClosedException&) {
313  // set failed bit
314  set(STREAM_FAILED);
315 
316  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL;
317 
318  throw;
319  } catch (const StreamErrorException&) {
320  // set failed bit
321  set(STREAM_FAILED);
322 
323  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL;
324 
325  throw;
326  } catch (const ios_base::failure&) {
327  // set failed bit
328  set(STREAM_FAILED);
329 
330  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL;
331 
332  throw;
333  }
334 
335  return traits_type::eof();
336  }
337 
338  // This is called to flush the buffer.
339  // This is called when we're done with the file stream (or when .flush() is called).
340  int StreamConnection::StreamBuffer::sync()
341  {
342  int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
343  traits_type::eof()) ? -1 : 0;
344 
345  try {
346  ibrcommon::MutexLock l(_sendlock);
347 
348  // ... and flush.
349  _stream.flush();
350  } catch (const ios_base::failure&) {
351  // set failed bit
352  set(STREAM_BAD);
353 
354  _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
355  }
356 
357  return ret;
358  }
359 
360  void StreamConnection::StreamBuffer::skipData(dtn::data::Length &size)
361  {
362  // a temporary buffer
363  std::vector<char> tmpbuf(_buffer_size);
364 
365  try {
366  // and read until the next segment
367  while (size > 0 && _stream.good())
368  {
369  dtn::data::Length readsize = _buffer_size;
370  if (size < _buffer_size) readsize = size;
371 
372  // to reject a bundle read all remaining data of this segment
373  _stream.read(&tmpbuf[0], (std::streamsize)readsize);
374 
375  // record statistics
376  if (_monitor) {
377  _monitor_stats[0] += readsize;
378  }
379 
380  // reset idle timeout
381  _idle_timer.reset();
382 
383  // adjust the remain counter
384  size -= readsize;
385  }
386  } catch (const ios_base::failure &ex) {
387  _underflow_state = IDLE;
388  throw StreamErrorException("read error during data skip: " + std::string(ex.what()));
389  }
390  }
391 
392  // Fill the input buffer. This reads out of the streambuf.
//
// Flow, as far as it is visible in this extract:
//  1. If a data transfer is in progress, either finish a rejected segment
//     (skip the remaining bytes) or, once the segment has been consumed
//     completely, acknowledge it with MSG_ACK_SEGMENT when STREAM_ACK_SUPPORT
//     is set. Both paths return to the IDLE state.
//  2. While IDLE, read segments from _stream and dispatch on seg._type until
//     a data segment that is not rejected switches to DATA_TRANSFER.
//  3. Read up to _buffer_size bytes of the current segment into in_buf_ and
//     publish them through setg().
//
// Returns traits_type::not_eof(in_buf_[0]) after a successful read, and
// traits_type::eof() after a StreamClosedException or StreamShutdownException.
// Throws StreamErrorException (rethrown by its handler); every handler sets
// STREAM_FAILED.
//
// NOTE(review): several listing lines are absent from this extract (410, 439,
// 450, 458, 462, 499, 515, 530, 534, 562, 600). They presumably contain the
// declaration of `seg`, the case labels of the switch and a few conditions -
// confirm against the repository before changing this function.
393  std::char_traits<char>::int_type StreamConnection::StreamBuffer::underflow()
394  {
395  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 90) << "StreamBuffer::underflow() called" << IBRCOMMON_LOGGER_ENDL;
396 
397  try {
398  if (_underflow_state == DATA_TRANSFER)
399  {
400  // on bundle reject
401  if (get(STREAM_REJECT))
402  {
403  // send NACK on bundle reject
404  if (get(STREAM_NACK_SUPPORT))
405  {
406  ibrcommon::MutexLock l(_sendlock);
407  if (!_stream.good()) throw StreamErrorException("stream went bad");
408 
409  // send a REFUSE message
// NOTE(review): the send statement (listing line 410) is not visible here.
411  }
412 
413  // skip data in this segment; skipData() decrements _underflow_data_remain
414  skipData(_underflow_data_remain);
415 
416  // return to idle state
417  _underflow_state = IDLE;
418  }
419  // send ACK if the data segment is received completely
420  else if (_underflow_data_remain == 0)
421  {
422  // New data segment received. Send an ACK carrying the value of _recv_size.
423  if (get(STREAM_ACK_SUPPORT))
424  {
425  ibrcommon::MutexLock l(_sendlock);
426  if (!_stream.good()) throw StreamErrorException("stream went bad");
427  _stream << StreamDataSegment(StreamDataSegment::MSG_ACK_SEGMENT, _recv_size) << std::flush;
428  }
429 
430  // return to idle state
431  _underflow_state = IDLE;
432  }
433  }
434 
435  // read segments until DATA is AVAILABLE
436  while (_underflow_state == IDLE)
437  {
438  // container for segment data
440 
441  try {
442  // read the segment
443  if (!_stream.good()) throw StreamErrorException("stream went bad");
444 
445  _stream >> seg;
446  } catch (const ios_base::failure &ex) {
// stream failures are reported as StreamErrorException
447  throw StreamErrorException("read error: " + std::string(ex.what()));
448  }
449 
451  {
452  // reset idle timeout
453  _idle_timer.reset();
454  }
455 
456  switch (seg._type)
457  {
459  {
460  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_DATA_SEGMENT received, size: " << seg._value.toString() << IBRCOMMON_LOGGER_ENDL;
461 
// NOTE(review): the condition of this if/else (listing line 462) is not
// visible; the first branch restarts the byte count and clears STREAM_REJECT,
// the second one accumulates the count.
463  {
464  _recv_size = seg._value;
465  unset(STREAM_REJECT);
466  }
467  else
468  {
469  _recv_size += seg._value;
470  }
471 
472  // set the new data length
473  _underflow_data_remain = seg._value.get<Length>();
474 
475  if (get(STREAM_REJECT))
476  {
477  // send NACK on bundle reject
478  if (get(STREAM_NACK_SUPPORT))
479  {
480  // lock for sending
481  ibrcommon::MutexLock l(_sendlock);
482  if (!_stream.good()) throw StreamErrorException("stream went bad");
483 
484  // send a NACK message
485  _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE, 0) << std::flush;
486  }
487 
488  // skip data in this segment; the state stays IDLE, so the loop continues
489  skipData(_underflow_data_remain);
490  }
491  else
492  {
493  // announce the new data block
494  _underflow_state = DATA_TRANSFER;
495  }
496  break;
497  }
498 
500  {
501  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_ACK_SEGMENT received, size: " << seg._value.toString() << IBRCOMMON_LOGGER_ENDL;
502 
503  // remove the segment in the queue
504  if (get(STREAM_ACK_SUPPORT))
505  {
// exclusive access to the queue of unacknowledged segments
506  ibrcommon::Queue<StreamDataSegment>::Locked q = _segments.exclusive();
507  if (q.empty())
508  {
509  IBRCOMMON_LOGGER_TAG("StreamBuffer", error) << "got an unexpected ACK with size of " << seg._value.toString() << IBRCOMMON_LOGGER_ENDL;
510  }
511  else
512  {
513  StreamDataSegment &qs = q.front();
514 
// NOTE(review): the condition guarding this block (listing line 515) is not
// visible here.
516  {
517  _conn.eventBundleForwarded();
518  }
519 
520  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 60) << q.size() << " elements to ACK" << IBRCOMMON_LOGGER_ENDL;
521 
522  _conn.eventBundleAck(seg._value.get<Length>());
523 
524  q.pop();
525  }
526  }
527  break;
528  }
529 
531  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_KEEPALIVE received, size: " << seg._value.toString() << IBRCOMMON_LOGGER_ENDL;
532  break;
533 
535  {
536  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_REFUSE_BUNDLE received, flags: " << (int)seg._flags << IBRCOMMON_LOGGER_ENDL;
537 
538  // TODO: Test bundle rejection!
539 
540  // remove the segment in the queue
541  if (get(STREAM_ACK_SUPPORT) && get(STREAM_NACK_SUPPORT))
542  {
543  // skip segments
544  if (!_rejected_segments.empty())
545  {
546  _rejected_segments.pop();
547 
548  // we received a NACK
549  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 30) << "NACK received, still " << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
550  }
551  else try
552  {
553  StreamDataSegment qs = _segments.getnpop();
554 
555  // we received a NACK
556  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 20) << "NACK received!" << IBRCOMMON_LOGGER_ENDL;
557 
558  // get all segment ACKs in the queue for this transmission
559  while (!_segments.empty())
560  {
// note: this local `seg` shadows the received segment of the enclosing loop
561  StreamDataSegment &seg = _segments.front();
// NOTE(review): the condition that ends this loop early (listing line 562)
// is not visible here.
563  {
564  break;
565  }
566 
567  // move the segments to another queue
568  _rejected_segments.push(seg);
569  _segments.pop();
570  }
571 
572  // call event reject
573  _conn.eventBundleRefused();
574 
575  // we received a NACK
576  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 30) << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
577 
578  // the queue is empty, then skip the current transfer
579  if (_segments.empty())
580  {
581  set(STREAM_SKIP);
582 
583  // we received a NACK
584  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 25) << "skip the current transfer" << IBRCOMMON_LOGGER_ENDL;
585  }
586 
587  } catch (const ibrcommon::QueueUnblockedException&) {
588  IBRCOMMON_LOGGER_TAG("StreamBuffer", error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
589  }
590 
591  }
592  else
593  {
594  IBRCOMMON_LOGGER_TAG("StreamBuffer", error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
595  }
596 
597  break;
598  }
599 
601  {
602  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 70) << "MSG_SHUTDOWN received" << IBRCOMMON_LOGGER_ENDL;
603  throw StreamShutdownException();
604  }
605  }
606  }
607 
608  // currently transferring data: read at most one buffer of the current segment
609  dtn::data::Length readsize = _buffer_size;
610  if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
611 
612  try {
613  if (!_stream.good()) throw StreamErrorException("stream went bad");
614 
615  // here receive the data
616  _stream.read(&in_buf_[0], (std::streamsize)readsize);
617 
618  // record statistics
619  if (_monitor) {
620  _monitor_stats[0] += readsize;
621  }
622 
623  // reset idle timeout
624  _idle_timer.reset();
625  } catch (const ios_base::failure &ex) {
626  _underflow_state = IDLE;
627  throw StreamErrorException("read error: " + std::string(ex.what()));
628  }
629 
630  // adjust the remain counter
631  _underflow_data_remain -= readsize;
632 
633  // Since the input buffer content is now valid (or is new)
634  // the get pointer should be initialized (or reset).
635  setg(&in_buf_[0], &in_buf_[0], &in_buf_[0] + readsize);
636 
// NOTE(review): in_buf_[0] is passed to not_eof() as a plain char. Where char
// is signed, bytes >= 0x80 become negative values here;
// traits_type::to_int_type(in_buf_[0]) would yield the unsigned byte value.
// Also check the readsize == 0 case - confirm before changing.
637  return traits_type::not_eof(in_buf_[0]);
638 
639  } catch (const StreamClosedException&) {
640  // set failed bit
641  set(STREAM_FAILED);
642 
643  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL;
644 
// no rethrow: falls through to the eof return below
645  } catch (const StreamErrorException &ex) {
646  // set failed bit
647  set(STREAM_FAILED);
648 
649  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
650 
651  throw;
652  } catch (const StreamShutdownException&) {
653  // set failed bit
654  set(STREAM_FAILED);
655 
656  IBRCOMMON_LOGGER_DEBUG_TAG("StreamBuffer", 10) << "StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL;
// no rethrow: falls through to the eof return below
657  }
658 
659  return traits_type::eof();
660  }
661 
// Timer callback of the buffer; the constructor registers this object with
// _idle_timer. The timer argument is unused.
//
// NOTE(review): listing lines 666 and 668 are absent from this extract. The
// action taken while the stream is still good and the statement that ends the
// function (a return or a throw) are therefore not visible here - confirm
// against the repository.
662  size_t StreamConnection::StreamBuffer::timeout(ibrcommon::Timer*)
663  {
// only act while no error or termination bit is set
664  if (__good())
665  {
667  }
669  }
670 
671  void StreamConnection::StreamBuffer::enableIdleTimeout(const dtn::data::Timeout &seconds)
672  {
673  _idle_timer.set(seconds);
674  _idle_timer.start();
675  }
676  }
677 }