IBR-DTNSuite  0.8
ibrdtn/ibrdtn/streams/StreamConnection.h
Go to the documentation of this file.
00001 /*
00002  * StreamConnection.h
00003  *
00004  *  Created on: 01.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef STREAMCONNECTION_H_
00009 #define STREAMCONNECTION_H_
00010 
00011 
00012 #include "ibrdtn/data/Bundle.h"
00013 #include "ibrdtn/data/Exceptions.h"
00014 #include "ibrdtn/streams/StreamContactHeader.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include <ibrcommon/thread/Mutex.h>
00017 #include <ibrcommon/thread/MutexLock.h>
00018 #include <ibrcommon/thread/Timer.h>
00019 #include <ibrcommon/Exceptions.h>
00020 #include <ibrcommon/thread/Queue.h>
00021 #include <iostream>
00022 #include <streambuf>
00023 
00024 namespace dtn
00025 {
00026         namespace streams
00027         {
00028                 class StreamConnection : public std::iostream
00029                 {
                              /*
                               * StreamConnection is a std::iostream that is constructed on top of
                               * another std::iostream and holds a reference to a Callback object.
                               * Only declarations live in this header; every member function except
                               * the trivial exception constructors is defined elsewhere.
                               *
                               * The base class, the stream parameters and the exception message
                               * parameters are spelled std::iostream / std::string on purpose, so that
                               * this header does not depend on a using-directive leaking in from some
                               * other header. That also matches the qualified std::iostream,
                               * std::queue and std::char_traits already used further down.
                               *
                               * NOTE(review): the contact header / data segment / ack / keepalive /
                               * shutdown vocabulary suggests the DTN TCP convergence-layer stream
                               * protocol - confirm against the implementation file.
                               */
00030                 public:
                              /**
                               * Reason codes accepted by shutdown() and handed to
                               * Callback::eventShutdown(). When each value is used is decided by
                               * the implementation, which is not part of this header.
                               */
00031                         enum ConnectionShutdownCases
00032                         {
00033                                 CONNECTION_SHUTDOWN_NOTSET = 0,
00034                                 CONNECTION_SHUTDOWN_IDLE = 1,
00035                                 CONNECTION_SHUTDOWN_ERROR = 2,
00036                                 CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN = 3,
00037                                 CONNECTION_SHUTDOWN_NODE_TIMEOUT = 4,
00038                                 CONNECTION_SHUTDOWN_PEER_SHUTDOWN = 5
00039                         };
00040 
                              /**
                               * IOException with the fixed message "Transmission was interrupted.".
                               * It stores a copy of the given bundle and a position value in two
                               * public const members.
                               * NOTE(review): _position is presumably the offset reached inside the
                               * bundle when the transfer stopped - confirm at the throw sites.
                               */
00041                         class TransmissionInterruptedException : public ibrcommon::IOException
00042                         {
00043                                 public:
00044                                         TransmissionInterruptedException(const dtn::data::Bundle &bundle, const size_t position) throw()
00045                                          : ibrcommon::IOException("Transmission was interrupted."), _bundle(bundle), _position(position)
00046                                         {
00047                                         };
00048 
00049                                         virtual ~TransmissionInterruptedException() throw ()
00050                                         {
00051                                         };
00052 
00053                                         const dtn::data::Bundle _bundle;
00054                                         const size_t _position;
00055                         };
00056 
                              /** IOException whose message defaults to "The stream has been closed.". */
00057                         class StreamClosedException : public ibrcommon::IOException
00058                         {
00059                         public:
00060                                 StreamClosedException(std::string what = "The stream has been closed.") throw() : IOException(what)
00061                                 {
00062                                 };
00063                         };
00064 
                              /** IOException whose message defaults to "StreamError". */
00065                         class StreamErrorException : public ibrcommon::IOException
00066                         {
00067                         public:
00068                                 StreamErrorException(std::string what = "StreamError") throw() : IOException(what)
00069                                 {
00070                                 };
00071                         };
00072 
                              /** IOException whose message defaults to "Shutdown message received.". */
00073                         class StreamShutdownException : public ibrcommon::IOException
00074                         {
00075                         public:
00076                                 StreamShutdownException(std::string what = "Shutdown message received.") throw() : IOException(what)
00077                                 {
00078                                 };
00079                         };
00080 
                              /**
                               * Pure virtual event interface. A reference to an implementation is
                               * required by the StreamConnection constructor. The exact conditions
                               * under which each hook is invoked are defined in the implementation
                               * file; the notes below are derived from names and signatures only.
                               */
00081                         class Callback
00082                         {
00083                         public:
                                      // Virtual destructor so that an implementation can be destroyed
                                      // safely through a Callback pointer or reference.
                                      virtual ~Callback() {}

                                      /** Receives one of the ConnectionShutdownCases values. */
00088                                 virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc) = 0;
00089 
                                      /** Timeout notification (presumably a connection timeout - confirm). */
00094                                 virtual void eventTimeout() = 0;
00095 
                                      /** Error notification. */
00099                                 virtual void eventError() = 0;
00100 
                                      /** Presumably: the peer refused the bundle being sent - confirm. */
00104                                 virtual void eventBundleRefused() = 0;
                                      /** Presumably: a bundle was transferred completely - confirm. */
00108                                 virtual void eventBundleForwarded() = 0;
                                      /** Receives an acknowledgement value (presumably the acknowledged size - confirm). */
00112                                 virtual void eventBundleAck(size_t ack) = 0;
00113 
                                      /** Receives a StreamContactHeader (presumably the peer's header after the handshake - confirm). */
00118                                 virtual void eventConnectionUp(const StreamContactHeader &header) = 0;
00119 
                                      /** Counterpart of eventConnectionUp(). */
00123                                 virtual void eventConnectionDown() = 0;
00124                         };
00125 
                              /**
                               * @param cb          callback object, kept by reference (see _callback)
                               * @param stream      stream this connection is built on
                               * @param buffer_size buffer size, 4096 by default (presumably bytes, and
                               *                    presumably passed on to the internal StreamBuffer - confirm)
                               */
00131                         StreamConnection(StreamConnection::Callback &cb, std::iostream &stream, const size_t buffer_size = 4096);
00132 
00137                         virtual ~StreamConnection();
00138 
                              /**
                               * @param eid     an endpoint id (presumably the local one announced to the peer - confirm)
                               * @param timeout defaults to 10; the unit is not visible here (presumably seconds)
                               * @param flags   defaults to 0; the meaning of the bits is not visible in this header
                               */
00144                         void handshake(const dtn::data::EID &eid, const size_t timeout = 10, const char flags = 0);
00145 
                              /**
                               * @param csc shutdown reason, CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN if omitted
                               */
00167                         void shutdown(ConnectionShutdownCases csc = CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00168 
                              /** Presumably rejects the bundle currently being received - confirm. */
00172                         void reject();
00173 
                              /** Presumably sends a keepalive to the peer - confirm. */
00177                         void keepalive();
00178 
                              /**
                               * @param seconds idle timeout length, in seconds according to the parameter name
                               */
00183                         void enableIdleTimeout(size_t seconds);
00184 
00185                 private:
                              /**
                               * Stream buffer used by the connection (see member _buf): a
                               * std::basic_streambuf<char> that is also an ibrcommon::TimerCallback.
                               * It keeps a reference to a StreamConnection (_conn) and to a
                               * std::iostream (_stream).
                               */
00189                         class StreamBuffer : public std::basic_streambuf<char, std::char_traits<char> >, public ibrcommon::TimerCallback
00190                         {
00191                         public:
                                      /** Values of the _underflow_state member. */
00192                                 enum State
00193                                 {
00194                                         INITIAL = 0,
00195                                         IDLE = 1,
00196                                         DATA_AVAILABLE = 2,
00197                                         DATA_TRANSFER = 3,
00198                                         SHUTDOWN = 4
00199                                 };
00200 
                                      /**
                                       * @param conn        connection kept by reference in _conn
                                       * @param stream      stream kept by reference in _stream
                                       * @param buffer_size defaults to 1024 here, whereas the StreamConnection
                                       *                    constructor defaults to 4096
                                       */
00204                                 StreamBuffer(StreamConnection &conn, std::iostream &stream, const size_t buffer_size = 1024);
00205                                 virtual ~StreamBuffer();
00206 
                                      /**
                                       * Takes a contact header and returns one by value (presumably sends
                                       * the local header and returns the peer's - confirm).
                                       */
00212                                 const StreamContactHeader handshake(const StreamContactHeader &header);
00213 
00217                                 void close();
00218 
                                      /**
                                       * @param reason shutdown reason, StreamDataSegment::MSG_SHUTDOWN_NONE if omitted
                                       */
00222                                 void shutdown(const StreamDataSegment::ShutdownReason reason = StreamDataSegment::MSG_SHUTDOWN_NONE);
00223 
00227                                 void reject();
00228 
00232                                 void wait();
00233 
00234                                 void abort();
00235 
00239                                 void keepalive();
00240 
                                      /**
                                       * Takes a timer pointer and returns a size_t.
                                       * NOTE(review): presumably the ibrcommon::TimerCallback hook - confirm
                                       * against the base class declaration.
                                       */
00246                                 size_t timeout(ibrcommon::Timer *timer);
00247 
                                      /**
                                       * @param seconds idle timeout length, in seconds according to the parameter name
                                       */
00252                                 void enableIdleTimeout(size_t seconds);
00253 
00254                         protected:
                                      // std::basic_streambuf virtual functions re-declared by this buffer.
00255                                 virtual int sync();
00256                                 virtual std::char_traits<char>::int_type overflow(std::char_traits<char>::int_type = std::char_traits<char>::eof());
00257                                 virtual std::char_traits<char>::int_type underflow();
00258 
00259                         private:
00263                                 bool __good() const;
00264 
00268                                 void __error() const;
00269 
00270                                 enum timerNames
00271                                 {
00272                                         TIMER_IN = 1,
00273                                         TIMER_OUT = 2
00274                                 };
00275 
                                      /**
                                       * Single-bit flags (each value is 1 << n), so several of them can be
                                       * combined in one int such as _statebits.
                                       */
00276                                 enum StateBits
00277                                 {
00278                                         STREAM_FAILED = 1 << 0,
00279                                         STREAM_BAD = 1 << 1,
00280                                         STREAM_EOF = 1 << 2,
00281                                         STREAM_HANDSHAKE = 1 << 3,
00282                                         STREAM_SHUTDOWN = 1 << 4,
00283                                         STREAM_CLOSED = 1 << 5,
00284                                         STREAM_REJECT = 1 << 6,
00285                                         STREAM_SKIP = 1 << 7,
00286                                         STREAM_ACK_SUPPORT = 1 << 8,
00287                                         STREAM_NACK_SUPPORT = 1 << 9,
00288                                         STREAM_SOB = 1 << 10,                   // start of bundle
00289                                         STREAM_TIMER_SUPPORT = 1 << 11
00290                                 };
00291 
                                      // Takes the size by non-const reference, so the callee can modify it.
00292                                 void skipData(size_t &size);
00293 
                                      // Query / raise / clear one StateBits flag (presumably stored in
                                      // _statebits and guarded by _statelock - confirm).
00294                                 bool get(const StateBits bit) const;
00295                                 void set(const StateBits bit);
00296                                 void unset(const StateBits bit);
00297 
00298                                 const size_t _buffer_size;
00299 
00300                                 ibrcommon::Mutex _statelock;
00301                                 int _statebits;
00302 
00303                                 StreamConnection &_conn;
00304 
00305                                 // Input buffer (raw pointer; allocation and release are not visible in this header)
00306                                 char *in_buf_;
00307 
00308                                 // Output buffer (raw pointer; allocation and release are not visible in this header)
00309                                 char *out_buf_;
00310                                 ibrcommon::Mutex _sendlock;
00311 
00312                                 std::iostream &_stream;
00313 
00314                                 size_t _recv_size;
00315 
00316                                 // this queue contains all sent data segments
00317                                 // they are removed if an ack or nack is received
00318                                 ibrcommon::Queue<StreamDataSegment> _segments;
00319                                 std::queue<StreamDataSegment> _rejected_segments;
00320 
00321                                 size_t _underflow_data_remain;
00322                                 State _underflow_state;
00323 
00324                                 ibrcommon::Timer _idle_timer;
00325                         };
00326 
00327                         void connectionTimeout();
00328 
                              // Private functions carrying the same names as Callback hooks
                              // (presumably they forward to _callback - confirm).
00329                         void eventShutdown(StreamConnection::ConnectionShutdownCases csc);
00330 
00331                         void eventBundleAck(size_t ack);
00332                         void eventBundleRefused();
00333                         void eventBundleForwarded();
00334 
                              // Callback object handed to the constructor; held by reference, not owned.
00335                         StreamConnection::Callback &_callback;
00336 
00337                         dtn::streams::StreamContactHeader _peer;
00338 
00339                         StreamConnection::StreamBuffer _buf;
00340 
                              // Shutdown reason plus a mutex that, judging by its name, guards it.
00341                         ibrcommon::Mutex _shutdown_reason_lock;
00342                         ConnectionShutdownCases _shutdown_reason;
00343                 };
00344         }
00345 }
00346 
00347 #endif /* STREAMCONNECTION_H_ */
00348