IBR-DTNSuite
0.8
|
00001 /* 00002 * StreamConnection.h 00003 * 00004 * Created on: 01.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #ifndef STREAMCONNECTION_H_ 00009 #define STREAMCONNECTION_H_ 00010 00011 00012 #include "ibrdtn/data/Bundle.h" 00013 #include "ibrdtn/data/Exceptions.h" 00014 #include "ibrdtn/streams/StreamContactHeader.h" 00015 #include "ibrdtn/streams/StreamDataSegment.h" 00016 #include <ibrcommon/thread/Mutex.h> 00017 #include <ibrcommon/thread/MutexLock.h> 00018 #include <ibrcommon/thread/Timer.h> 00019 #include <ibrcommon/Exceptions.h> 00020 #include <ibrcommon/thread/Queue.h> 00021 #include <iostream> 00022 #include <streambuf> 00023 00024 namespace dtn 00025 { 00026 namespace streams 00027 { 00028 class StreamConnection : public iostream 00029 { 00030 public: 00031 enum ConnectionShutdownCases 00032 { 00033 CONNECTION_SHUTDOWN_NOTSET = 0, 00034 CONNECTION_SHUTDOWN_IDLE = 1, 00035 CONNECTION_SHUTDOWN_ERROR = 2, 00036 CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN = 3, 00037 CONNECTION_SHUTDOWN_NODE_TIMEOUT = 4, 00038 CONNECTION_SHUTDOWN_PEER_SHUTDOWN = 5 00039 }; 00040 00041 class TransmissionInterruptedException : public ibrcommon::IOException 00042 { 00043 public: 00044 TransmissionInterruptedException(const dtn::data::Bundle &bundle, const size_t position) throw() 00045 : ibrcommon::IOException("Transmission was interrupted."), _bundle(bundle), _position(position) 00046 { 00047 }; 00048 00049 virtual ~TransmissionInterruptedException() throw () 00050 { 00051 }; 00052 00053 const dtn::data::Bundle _bundle; 00054 const size_t _position; 00055 }; 00056 00057 class StreamClosedException : public ibrcommon::IOException 00058 { 00059 public: 00060 StreamClosedException(string what = "The stream has been closed.") throw() : IOException(what) 00061 { 00062 }; 00063 }; 00064 00065 class StreamErrorException : public ibrcommon::IOException 00066 { 00067 public: 00068 StreamErrorException(string what = "StreamError") throw() : IOException(what) 00069 { 00070 }; 00071 }; 00072 00073 class StreamShutdownException : public ibrcommon::IOException 00074 { 00075 public: 00076 StreamShutdownException(string what = "Shutdown message received.") throw() : IOException(what) 00077 { 00078 }; 00079 }; 00080 00081 class Callback 00082 { 00083 public: 00088 virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc) = 0; 00089 00094 virtual void eventTimeout() = 0; 00095 00099 virtual void eventError() = 0; 00100 00104 virtual void eventBundleRefused() = 0; 00108 virtual void eventBundleForwarded() = 0; 00112 virtual void eventBundleAck(size_t ack) = 0; 00113 00118 virtual void eventConnectionUp(const StreamContactHeader &header) = 0; 00119 00123 virtual void eventConnectionDown() = 0; 00124 }; 00125 00131 StreamConnection(StreamConnection::Callback &cb, iostream &stream, const size_t buffer_size = 4096); 00132 00137 virtual ~StreamConnection(); 00138 00144 void handshake(const dtn::data::EID &eid, const size_t timeout = 10, const char flags = 0); 00145 00167 void shutdown(ConnectionShutdownCases csc = CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN); 00168 00172 void reject(); 00173 00177 void keepalive(); 00178 00183 void enableIdleTimeout(size_t seconds); 00184 00185 private: 00189 class StreamBuffer : public std::basic_streambuf<char, std::char_traits<char> >, public ibrcommon::TimerCallback 00190 { 00191 public: 00192 enum State 00193 { 00194 INITIAL = 0, 00195 IDLE = 1, 00196 DATA_AVAILABLE = 2, 00197 DATA_TRANSFER = 3, 00198 SHUTDOWN = 4 00199 }; 00200 00204 StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size = 1024); 00205 virtual ~StreamBuffer(); 00206 00212 const StreamContactHeader handshake(const StreamContactHeader &header); 00213 00217 void close(); 00218 00222 void shutdown(const StreamDataSegment::ShutdownReason reason = StreamDataSegment::MSG_SHUTDOWN_NONE); 00223 00227 void reject(); 00228 00232 void wait(); 00233 00234 void abort(); 00235 00239 void keepalive(); 00240 00246 size_t timeout(ibrcommon::Timer *timer); 00247 00252 void enableIdleTimeout(size_t seconds); 00253 00254 protected: 00255 virtual int sync(); 00256 virtual std::char_traits<char>::int_type overflow(std::char_traits<char>::int_type = std::char_traits<char>::eof()); 00257 virtual std::char_traits<char>::int_type underflow(); 00258 00259 private: 00263 bool __good() const; 00264 00268 void __error() const; 00269 00270 enum timerNames 00271 { 00272 TIMER_IN = 1, 00273 TIMER_OUT = 2 00274 }; 00275 00276 enum StateBits 00277 { 00278 STREAM_FAILED = 1 << 0, 00279 STREAM_BAD = 1 << 1, 00280 STREAM_EOF = 1 << 2, 00281 STREAM_HANDSHAKE = 1 << 3, 00282 STREAM_SHUTDOWN = 1 << 4, 00283 STREAM_CLOSED = 1 << 5, 00284 STREAM_REJECT = 1 << 6, 00285 STREAM_SKIP = 1 << 7, 00286 STREAM_ACK_SUPPORT = 1 << 8, 00287 STREAM_NACK_SUPPORT = 1 << 9, 00288 STREAM_SOB = 1 << 10, // start of bundle 00289 STREAM_TIMER_SUPPORT = 1 << 11 00290 }; 00291 00292 void skipData(size_t &size); 00293 00294 bool get(const StateBits bit) const; 00295 void set(const StateBits bit); 00296 void unset(const StateBits bit); 00297 00298 const size_t _buffer_size; 00299 00300 ibrcommon::Mutex _statelock; 00301 int _statebits; 00302 00303 StreamConnection &_conn; 00304 00305 // Input buffer 00306 char *in_buf_; 00307 00308 // Output buffer 00309 char *out_buf_; 00310 ibrcommon::Mutex _sendlock; 00311 00312 std::iostream &_stream; 00313 00314 size_t _recv_size; 00315 00316 // this queue contains all sent data segments 00317 // they are removed if an ack or nack is received 00318 ibrcommon::Queue<StreamDataSegment> _segments; 00319 std::queue<StreamDataSegment> _rejected_segments; 00320 00321 size_t _underflow_data_remain; 00322 State _underflow_state; 00323 00324 ibrcommon::Timer _idle_timer; 00325 }; 00326 00327 void connectionTimeout(); 00328 00329 void eventShutdown(StreamConnection::ConnectionShutdownCases csc); 00330 00331 void eventBundleAck(size_t ack); 00332 void eventBundleRefused(); 00333 void eventBundleForwarded(); 00334 00335 StreamConnection::Callback &_callback; 00336 00337 dtn::streams::StreamContactHeader _peer; 00338 00339 StreamConnection::StreamBuffer _buf; 00340 00341 ibrcommon::Mutex _shutdown_reason_lock; 00342 ConnectionShutdownCases _shutdown_reason; 00343 }; 00344 } 00345 } 00346 00347 #endif /* STREAMCONNECTION_H_ */ 00348