IBR-DTNSuite  0.12
StreamConnection.h
Go to the documentation of this file.
1 /*
2  * StreamConnection.h
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #ifndef STREAMCONNECTION_H_
23 #define STREAMCONNECTION_H_
24 
25 
26 #include "ibrdtn/data/Bundle.h"
27 #include "ibrdtn/data/Exceptions.h"
30 #include <ibrcommon/thread/Mutex.h>
32 #include <ibrcommon/thread/Timer.h>
33 #include <ibrcommon/Exceptions.h>
34 #include <ibrcommon/thread/Queue.h>
35 #include <iostream>
36 #include <streambuf>
37 #include <vector>
38 
39 namespace dtn
40 {
41  namespace streams
42  {
// StreamConnection: an iostream subclass that keeps a reference to a Callback
// object (_callback) and embeds a private StreamBuffer (_buf).
//
// NOTE(review): this text is a documentation-browser listing. Each line carries
// its original line number, and the numbering skips wherever a line was dropped
// (for example 45 -> 47, 55 -> 57, 163 -> 170 -> 193 -> 197). Several
// declarations below therefore appear without their header line; restore them
// from the original header before compiling.
43  class StreamConnection : public iostream
44  {
45  public:
    // NOTE(review): the header (listing line 46) and the body (48-53) of this
    // definition are missing. Presumably this is the ConnectionShutdownCases
    // enum that eventShutdown and _shutdown_reason refer to - confirm.
47  {
54  };
55 
    // NOTE(review): class header (listing line 56) missing. From the visible
    // initializer list this is an exception type that passes a fixed message
    // to ibrcommon::IOException and stores the constructor arguments bundle
    // and position in the members _bundle and _position.
57  {
58  public:
    // Constructor initializer list; the signature line (59) is missing.
60  : ibrcommon::IOException("Transmission was interrupted."), _bundle(bundle), _position(position)
61  {
62  };
63 
    // NOTE(review): signature line (64) missing; empty body, presumably the
    // destructor - confirm.
65  {
66  };
67 
    // NOTE(review): listing lines 68-69 are missing; presumably the
    // declarations of _bundle and _position - confirm.
70  };
71 
    // NOTE(review): class header (listing line 72) missing. The constructor
    // below shows the type name StreamClosedException and forwards the
    // message to IOException.
73  {
74  public:
    // what: message text, defaulted; forwarded unchanged to IOException.
75  StreamClosedException(string what = "The stream has been closed.") throw() : IOException(what)
76  {
77  };
78  };
79 
    // NOTE(review): class header (listing line 80) missing. The constructor
    // below shows the type name StreamErrorException.
81  {
82  public:
    // what: message text, defaulted; forwarded unchanged to IOException.
83  StreamErrorException(string what = "StreamError") throw() : IOException(what)
84  {
85  };
86  };
87 
    // NOTE(review): class header (listing line 88) missing. The constructor
    // below shows the type name StreamShutdownException.
89  {
90  public:
    // what: message text, defaulted; forwarded unchanged to IOException.
91  StreamShutdownException(string what = "Shutdown message received.") throw() : IOException(what)
92  {
93  };
94  };
95 
    // Callback: interface handed to the StreamConnection constructor and
    // stored as _callback. Eight methods are pure virtual and must be
    // implemented; addTrafficIn and addTrafficOut have empty default bodies.
    // Every method is declared throw(), i.e. implementations must not let an
    // exception escape.
    // NOTE(review): the conditions under which each event is raised are not
    // visible in this header; the hints below are inferred from the names
    // only - confirm against the implementation file.
96  class Callback
97  {
98  public:
    // Receives the shutdown case as a ConnectionShutdownCases value.
103  virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc) throw () = 0;
104 
    // No arguments; presumably signals a connection timeout.
109  virtual void eventTimeout() throw () = 0;
110 
    // No arguments; presumably signals a stream error.
114  virtual void eventError() throw () = 0;
115 
    // No arguments; presumably the peer refused the current bundle.
119  virtual void eventBundleRefused() throw () = 0;
    // No arguments; presumably the current bundle was fully forwarded.
123  virtual void eventBundleForwarded() throw () = 0;
    // Receives a dtn::data::Length by const reference (parameter ack);
    // presumably the acknowledged amount of data - confirm the unit.
127  virtual void eventBundleAck(const dtn::data::Length &ack) throw () = 0;
128 
    // Receives a StreamContactHeader by const reference; presumably the
    // header obtained from the peer during the handshake - confirm.
133  virtual void eventConnectionUp(const StreamContactHeader &header) throw () = 0;
134 
    // No arguments; presumably signals that the connection went down.
138  virtual void eventConnectionDown() throw () = 0;
139 
    // Optional hook with an unnamed size_t argument; the default does nothing.
143  virtual void addTrafficIn(size_t) throw () { };
144 
    // Optional hook with an unnamed size_t argument; the default does nothing.
148  virtual void addTrafficOut(size_t) throw () { };
149  };
150 
    // Constructor.
    //   cb          - callback object, taken by reference (see _callback)
    //   stream      - underlying iostream, taken by reference
    //   buffer_size - defaults to 4096; presumably passed on to the internal
    //                 StreamBuffer - confirm
156  StreamConnection(StreamConnection::Callback &cb, iostream &stream, const dtn::data::Length buffer_size = 4096);
157 
    // Virtual destructor.
162  virtual ~StreamConnection();
163 
    // NOTE(review): listing lines 164-196 are almost entirely absent; any
    // public declarations that belong here are not visible.
170 
193 
    // Public counterpart of StreamBuffer::reject(); presumably delegates to
    // _buf - confirm.
197  void reject();
198 
    // Public counterpart of StreamBuffer::keepalive(); presumably delegates
    // to _buf - confirm.
202  void keepalive();
203 
    // seconds: a dtn::data::Timeout passed by const reference. Same
    // signature as StreamBuffer::enableIdleTimeout.
208  void enableIdleTimeout(const dtn::data::Timeout &seconds);
209 
210  private:
    // StreamBuffer: a char-based std::basic_streambuf that also implements
    // ibrcommon::TimerCallback. It keeps a reference to an underlying
    // std::iostream (_stream), separate input and output buffers, and an
    // idle timer. StreamConnection is a friend and can reach its private
    // members.
214  class StreamBuffer : public std::basic_streambuf<char, std::char_traits<char> >, public ibrcommon::TimerCallback
215  {
216  friend class StreamConnection;
217  public:
    // State values; used as the type of _underflow_state below.
218  enum State
219  {
220  INITIAL = 0,
221  IDLE = 1,
222  DATA_AVAILABLE = 2,
223  DATA_TRANSFER = 3,
224  SHUTDOWN = 4
225  };
226 
    // conn: owning connection; stream: underlying iostream; buffer_size
    // defaults to 1024 here (the outer constructor defaults to 4096).
230  StreamBuffer(StreamConnection &conn, iostream &stream, const dtn::data::Length buffer_size = 1024);
231  virtual ~StreamBuffer();
232 
    // Takes a StreamContactHeader and returns one by value; presumably
    // sends the local header and returns the one received from the peer -
    // confirm.
238  const StreamContactHeader handshake(const StreamContactHeader &header);
239 
243  void close();
244 
    // NOTE(review): listing lines 245-248 are missing; a declaration may
    // have been dropped here.
249 
253  void reject();
254 
258  void wait();
259 
260  void abort();
261 
265  void keepalive();
266 
    // Takes the timer as a pointer and returns size_t; presumably the
    // ibrcommon::TimerCallback hook - confirm the meaning of the return
    // value against the Timer header.
272  size_t timeout(ibrcommon::Timer *timer);
273 
278  void enableIdleTimeout(const dtn::data::Timeout &seconds);
279 
280  protected:
    // Overrides of the std::basic_streambuf virtual functions of the same
    // names and signatures.
281  virtual int sync();
282  virtual std::char_traits<char>::int_type overflow(std::char_traits<char>::int_type = std::char_traits<char>::eof());
283  virtual std::char_traits<char>::int_type underflow();
284 
285  private:
    // Const query returning bool; presumably reports whether the stream
    // is still usable - confirm.
289  bool __good() const;
290 
    // Const, returns nothing; presumably raises the exception matching
    // the current state bits - confirm.
294  void __error() const;
295 
    // Identifiers for two timers; their use is not visible in this header.
296  enum timerNames
297  {
298  TIMER_IN = 1,
299  TIMER_OUT = 2
300  };
301 
    // Flag values, each a distinct single bit (1 << 0 through 1 << 11), so
    // several can be held in one int. Presumably stored in _statebits and
    // accessed through get/set/unset - confirm.
302  enum StateBits
303  {
304  STREAM_FAILED = 1 << 0,
305  STREAM_BAD = 1 << 1,
306  STREAM_EOF = 1 << 2,
307  STREAM_HANDSHAKE = 1 << 3,
308  STREAM_SHUTDOWN = 1 << 4,
309  STREAM_CLOSED = 1 << 5,
310  STREAM_REJECT = 1 << 6,
311  STREAM_SKIP = 1 << 7,
312  STREAM_ACK_SUPPORT = 1 << 8,
313  STREAM_NACK_SUPPORT = 1 << 9,
314  STREAM_SOB = 1 << 10, // start of bundle
315  STREAM_TIMER_SUPPORT = 1 << 11
316  };
317 
    // size is a non-const reference, so the callee may modify it.
318  void skipData(dtn::data::Length &size);
319 
    // Query, set and clear a single StateBits flag.
320  bool get(const StateBits bit) const;
321  void set(const StateBits bit);
322  void unset(const StateBits bit);
323 
    // Immutable after construction (const member).
324  const dtn::data::Length _buffer_size;
325 
    // NOTE(review): presumably _statelock guards _statebits - confirm.
326  ibrcommon::Mutex _statelock;
327  int _statebits;
328 
    // NOTE(review): listing line 329 is missing; a member declaration may
    // have been dropped here.
330 
331  // Input buffer
332  std::vector<char> in_buf_;
333 
334  // Output buffer
335  std::vector<char> out_buf_;
336 
337  ibrcommon::Mutex _sendlock;
338 
    // Reference to the underlying stream; it must outlive this object.
339  std::iostream &_stream;
340 
341  dtn::data::Number _recv_size;
342 
    // NOTE(review): listing line 345 is missing. The two comment lines
    // below most likely describe the declaration that stood there, not
    // _rejected_segments - confirm.
343  // this queue contains all sent data segments
344  // they are removed if an ack or nack is received
346  std::queue<StreamDataSegment> _rejected_segments;
347 
348  Length _underflow_data_remain;
349  State _underflow_state;
350 
351  ibrcommon::Timer _idle_timer;
352  };
353 
    // Private helpers. The event* names mirror methods of Callback;
    // presumably they forward to _callback - confirm.
354  void connectionTimeout();
355 
356  void eventShutdown(StreamConnection::ConnectionShutdownCases csc);
357 
358  void eventBundleAck(const dtn::data::Length &ack);
359  void eventBundleRefused();
360  void eventBundleForwarded();
361 
    // Reference to the callback passed to the constructor; it must outlive
    // this object.
362  StreamConnection::Callback &_callback;
363 
    // NOTE(review): listing line 364 is missing; a member declaration may
    // have been dropped here.
365 
    // The embedded stream buffer.
366  StreamConnection::StreamBuffer _buf;
367 
    // NOTE(review): presumably _shutdown_reason_lock guards
    // _shutdown_reason - confirm.
368  ibrcommon::Mutex _shutdown_reason_lock;
369  ConnectionShutdownCases _shutdown_reason;
370  };
371  }
372 }
373 
374 #endif /* STREAMCONNECTION_H_ */
375