IBR-DTNSuite  0.10
StreamConnection.h
Go to the documentation of this file.
1 /*
2  * StreamConnection.h
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #ifndef STREAMCONNECTION_H_
23 #define STREAMCONNECTION_H_
24 
25 
26 #include "ibrdtn/data/Bundle.h"
27 #include "ibrdtn/data/Exceptions.h"
30 #include <ibrcommon/thread/Mutex.h>
32 #include <ibrcommon/thread/Timer.h>
33 #include <ibrcommon/Exceptions.h>
34 #include <ibrcommon/thread/Queue.h>
35 #include <iostream>
36 #include <streambuf>
37 #include <vector>
38 
39 namespace dtn
40 {
41  namespace streams
42  {
// StreamConnection: an iostream-derived class that owns a private StreamBuffer
// (a std::basic_streambuf subclass, member _buf) and keeps a reference to a
// user-supplied Callback object (member _callback).
//
// NOTE(review): this text is a numbered source listing with gaps -- the embedded
// line numbers jump in several places, so some declarations (class/enum headers,
// constructors, members) are not visible here. Comments below describe only what
// is visible and mark everything else as an assumption to confirm against the
// original header.
// NOTE(review): `iostream` is used unqualified as the base class; it relies on a
// using-declaration/directive that is not visible in this listing.
43  class StreamConnection : public iostream
44  {
45  public:
// Body of a declaration whose header and contents are missing from this listing.
// Presumably the ConnectionShutdownCases enumeration that is referenced further
// down -- TODO confirm against the original header.
47  {
54  };
55 
// Exception type (class header and constructor signature are missing from this
// listing). The visible initializer list passes the fixed message
// "Transmission was interrupted." to ibrcommon::IOException and initialises the
// members _bundle and _position from the arguments `bundle` and `position`.
57  {
58  public:
60  : ibrcommon::IOException("Transmission was interrupted."), _bundle(bundle), _position(position)
61  {
62  };
63 
// Empty-bodied member whose declaration line is missing from this listing
// (presumably the destructor -- TODO confirm).
65  {
66  };
67 
// The declarations of _bundle and _position are not visible in this listing.
70  };
71 
// Exception type (class header missing from this listing). The constructor
// forwards `what` to IOException; the default message is
// "The stream has been closed.".
// NOTE(review): `string` and `IOException` are unqualified here, and throw() is a
// dynamic exception specification (deprecated since C++11).
73  {
74  public:
75  StreamClosedException(string what = "The stream has been closed.") throw() : IOException(what)
76  {
77  };
78  };
79 
// Exception type (class header missing from this listing). The constructor
// forwards `what` to IOException; the default message is "StreamError".
81  {
82  public:
83  StreamErrorException(string what = "StreamError") throw() : IOException(what)
84  {
85  };
86  };
87 
// Exception type (class header missing from this listing). The constructor
// forwards `what` to IOException; the default message is
// "Shutdown message received.".
89  {
90  public:
91  StreamShutdownException(string what = "Shutdown message received.") throw() : IOException(what)
92  {
93  };
94  };
95 
// Abstract notification interface: every member is pure virtual and declared
// throw(), so implementations must not let exceptions escape. A reference to a
// Callback is taken by the StreamConnection constructor.
// When each notification is raised is decided in the implementation file, which
// is not visible here; the per-method notes are inferred from the names only.
// NOTE(review): no virtual destructor is visible in this listing -- confirm
// whether one exists in the original header.
96  class Callback
97  {
98  public:
// Presumably signals that the connection is shutting down; `csc` carries the
// shutdown case -- TODO confirm.
103  virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc) throw () = 0;
104 
// Presumably signals a timeout on the connection -- TODO confirm.
109  virtual void eventTimeout() throw () = 0;
110 
// Presumably signals a stream error -- TODO confirm.
114  virtual void eventError() throw () = 0;
115 
// Presumably signal that the peer refused / accepted-and-forwarded a bundle, and
// that `ack` units of the current bundle were acknowledged -- TODO confirm the
// exact meaning and unit of `ack`.
119  virtual void eventBundleRefused() throw () = 0;
123  virtual void eventBundleForwarded() throw () = 0;
127  virtual void eventBundleAck(const dtn::data::Length &ack) throw () = 0;
128 
// Presumably signals that the connection is established; `header` is the
// StreamContactHeader associated with it -- TODO confirm whose header it is.
133  virtual void eventConnectionUp(const StreamContactHeader &header) throw () = 0;
134 
// Presumably signals that the connection went down -- TODO confirm.
138  virtual void eventConnectionDown() throw () = 0;
139  };
140 
// Constructs a connection from a callback object, an underlying iostream and a
// buffer size that defaults to 4096. Both `cb` and `stream` are taken by
// reference, so the caller must keep them alive for as long as they are used --
// presumably the lifetime of this object; confirm in the implementation.
146  StreamConnection(StreamConnection::Callback &cb, iostream &stream, const dtn::data::Length buffer_size = 4096);
147 
// Virtual destructor.
152  virtual ~StreamConnection();
153 
// NOTE(review): the two numbered lines below are empty in this listing; the
// surrounding declarations of the original header are not visible here.
160 
183 
// Same name as StreamBuffer::reject(); presumably forwards to it -- TODO confirm.
187  void reject();
188 
// Same name as StreamBuffer::keepalive(); presumably forwards to it -- TODO confirm.
192  void keepalive();
193 
// Same name and signature as StreamBuffer::enableIdleTimeout(); presumably
// forwards `seconds` to it -- TODO confirm.
198  void enableIdleTimeout(const dtn::data::Timeout &seconds);
199 
// Traffic-monitoring accessors. Presumably they operate on the StreamBuffer
// members _monitor and _monitor_stats; the meaning of `index` is not visible in
// this header -- TODO confirm.
203  void setMonitor(bool val);
204  void resetMonitorStats();
205  size_t getMonitorStat(int index);
206 
207  private:
// Stream buffer used by StreamConnection (member _buf). It derives from
// std::basic_streambuf<char> and from ibrcommon::TimerCallback, and grants
// StreamConnection friend access to its non-public members.
211  class StreamBuffer : public std::basic_streambuf<char, std::char_traits<char> >, public ibrcommon::TimerCallback
212  {
213  friend class StreamConnection;
214  public:
// States numbered 0..4; the member _underflow_state has this type.
215  enum State
216  {
217  INITIAL = 0,
218  IDLE = 1,
219  DATA_AVAILABLE = 2,
220  DATA_TRANSFER = 3,
221  SHUTDOWN = 4
222  };
223 
// Takes the owning connection, the underlying iostream and a buffer size that
// defaults to 1024 (the StreamConnection constructor's own default is 4096).
227  StreamBuffer(StreamConnection &conn, iostream &stream, const dtn::data::Length buffer_size = 1024);
228  virtual ~StreamBuffer();
229 
236 
// Presumably closes the stream -- TODO confirm in the implementation.
240  void close();
241 
246 
// Presumably rejects the data currently being received -- TODO confirm.
250  void reject();
251 
// Presumably blocks until outstanding work has completed; what exactly is waited
// for is not visible in this header -- TODO confirm.
255  void wait();
256 
// Presumably interrupts a pending wait() -- TODO confirm.
257  void abort();
258 
// Presumably sends a keep-alive message to the peer -- TODO confirm.
262  void keepalive();
263 
// Takes the timer that fired and returns a size_t. Presumably the
// ibrcommon::TimerCallback hook; the meaning of the return value is defined by
// ibrcommon and is not visible here -- TODO confirm.
269  size_t timeout(ibrcommon::Timer *timer);
270 
// Presumably arms _idle_timer with the given number of seconds -- TODO confirm.
275  void enableIdleTimeout(const dtn::data::Timeout &seconds);
276 
277  protected:
// Overrides of the std::basic_streambuf virtual functions sync(), overflow()
// and underflow(); overflow()'s argument defaults to eof().
278  virtual int sync();
279  virtual std::char_traits<char>::int_type overflow(std::char_traits<char>::int_type = std::char_traits<char>::eof());
280  virtual std::char_traits<char>::int_type underflow();
281 
282  private:
// NOTE(review): identifiers containing a double underscore are reserved for the
// implementation in C++; consider renaming __good/__error.
// Presumably __good() reports whether the stream is usable and __error() reacts
// to a failed state -- TODO confirm in the implementation.
286  bool __good() const;
287 
291  void __error() const;
292 
// Identifiers for two timers (1 and 2); how they are used is not visible in this
// header.
293  enum timerNames
294  {
295  TIMER_IN = 1,
296  TIMER_OUT = 2
297  };
298 
// Single-bit flags (1 << n), so several of them can be combined in one integer.
// Presumably stored in _statebits and accessed via get()/set()/unset() below --
// TODO confirm in the implementation.
299  enum StateBits
300  {
301  STREAM_FAILED = 1 << 0,
302  STREAM_BAD = 1 << 1,
303  STREAM_EOF = 1 << 2,
304  STREAM_HANDSHAKE = 1 << 3,
305  STREAM_SHUTDOWN = 1 << 4,
306  STREAM_CLOSED = 1 << 5,
307  STREAM_REJECT = 1 << 6,
308  STREAM_SKIP = 1 << 7,
309  STREAM_ACK_SUPPORT = 1 << 8,
310  STREAM_NACK_SUPPORT = 1 << 9,
311  STREAM_SOB = 1 << 10, // start of bundle
312  STREAM_TIMER_SUPPORT = 1 << 11
313  };
314 
// `size` is a non-const reference, so the callee may modify it; presumably it is
// decremented as data is skipped -- TODO confirm.
315  void skipData(dtn::data::Length &size);
316 
// Query, raise and clear a single StateBits flag.
317  bool get(const StateBits bit) const;
318  void set(const StateBits bit);
319  void unset(const StateBits bit);
320 
// Const member, so it is fixed at construction; presumably initialised from the
// constructor's buffer_size argument -- TODO confirm.
321  const dtn::data::Length _buffer_size;
322 
// Presumably _statelock guards _statebits -- TODO confirm.
323  ibrcommon::Mutex _statelock;
324  int _statebits;
325 
327 
328  // Input buffer
329  std::vector<char> in_buf_;
330 
331  // Output buffer
332  std::vector<char> out_buf_;
333 
// Presumably serialises writes to the underlying stream -- TODO confirm.
334  ibrcommon::Mutex _sendlock;
335 
// Reference to the underlying stream; it is not owned by this object.
336  std::iostream &_stream;
337 
338  dtn::data::Number _recv_size;
339 
340  // this queue contains all sent data segments
341  // they are removed if an ack or nack is received
// NOTE(review): the declaration that the two comment lines above originally
// preceded is missing from this listing; they do not necessarily describe
// _rejected_segments.
343  std::queue<StreamDataSegment> _rejected_segments;
344 
// NOTE(review): `Length` is unqualified here, while the rest of the header writes
// dtn::data::Length.
345  Length _underflow_data_remain;
346  State _underflow_state;
347 
348  ibrcommon::Timer _idle_timer;
349 
350  // traffic monitoring
351  bool _monitor;
352  std::vector<size_t> _monitor_stats;
353  };
354 
// Private helpers. The event* functions have the same names as the Callback
// notifications; presumably they are invoked by StreamBuffer (a friend) and
// forward to _callback -- TODO confirm in the implementation.
355  void connectionTimeout();
356 
357  void eventShutdown(StreamConnection::ConnectionShutdownCases csc);
358 
359  void eventBundleAck(const dtn::data::Length &ack);
360  void eventBundleRefused();
361  void eventBundleForwarded();
362 
// Reference to the callback object; it is not owned by this object.
363  StreamConnection::Callback &_callback;
364 
366 
// The stream buffer instance, held by value.
367  StreamConnection::StreamBuffer _buf;
368 
// Presumably _shutdown_reason_lock guards _shutdown_reason -- TODO confirm.
369  ibrcommon::Mutex _shutdown_reason_lock;
370  ConnectionShutdownCases _shutdown_reason;
371  };
372  }
373 }
374 
375 #endif /* STREAMCONNECTION_H_ */
376