IBR-DTNSuite
0.8
|
00001 /* 00002 * DatagramConnection.h 00003 * 00004 * Created on: 21.11.2011 00005 * Author: morgenro 00006 */ 00007 00008 #ifndef DATAGRAMCONNECTION_H_ 00009 #define DATAGRAMCONNECTION_H_ 00010 00011 #include "net/ConvergenceLayer.h" 00012 #include "net/DatagramConnectionParameter.h" 00013 #include <ibrcommon/thread/Thread.h> 00014 #include <ibrcommon/thread/Queue.h> 00015 #include <ibrcommon/thread/Conditional.h> 00016 #include <streambuf> 00017 #include <iostream> 00018 #include <stdint.h> 00019 00020 namespace dtn 00021 { 00022 namespace net 00023 { 00024 class DatagramException : public ibrcommon::Exception 00025 { 00026 public: 00027 DatagramException(const std::string &what) : ibrcommon::Exception(what) 00028 {}; 00029 00030 virtual ~DatagramException() throw() {}; 00031 }; 00032 00033 class DatagramConnection; 00034 00035 class DatagramConnectionCallback 00036 { 00037 public: 00038 virtual ~DatagramConnectionCallback() {}; 00039 virtual void callback_send(DatagramConnection &connection, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, int len) throw (DatagramException) = 0; 00040 virtual void callback_ack(DatagramConnection &connection, const unsigned int &seqno, const std::string &destination) throw (DatagramException) = 0; 00041 00042 virtual void connectionUp(const DatagramConnection *conn) = 0; 00043 virtual void connectionDown(const DatagramConnection *conn) = 0; 00044 }; 00045 00046 class DatagramConnection : public ibrcommon::DetachedThread 00047 { 00048 public: 00049 DatagramConnection(const std::string &identifier, const DatagramConnectionParameter ¶ms, DatagramConnectionCallback &callback); 00050 virtual ~DatagramConnection(); 00051 00052 void run(); 00053 void setup(); 00054 void finally(); 00055 00056 virtual void __cancellation(); 00057 00058 void shutdown(); 00059 00060 const std::string& getIdentifier() const; 00061 00066 void queue(const ConvergenceLayer::Job &job); 00067 00073 void queue(const char &flags, const unsigned int &seqno, const char *buf, int len); 00074 00079 void ack(const unsigned int &seqno); 00080 00081 private: 00082 class Stream : public std::basic_streambuf<char, std::char_traits<char> >, public std::iostream 00083 { 00084 public: 00085 enum HEADER_FLAGS 00086 { 00087 SEGMENT_FIRST = 0x02, 00088 SEGMENT_LAST = 0x01, 00089 SEGMENT_MIDDLE = 0x00 00090 }; 00091 00092 Stream(DatagramConnection &conn, const size_t maxmsglen, const unsigned int maxseqno); 00093 virtual ~Stream(); 00094 00100 void queue(const char &flags, const unsigned int &seqno, const char *buf, int len); 00101 00106 void close(); 00107 00108 protected: 00109 virtual int sync(); 00110 virtual std::char_traits<char>::int_type overflow(std::char_traits<char>::int_type = std::char_traits<char>::eof()); 00111 virtual std::char_traits<char>::int_type underflow(); 00112 00113 private: 00114 // buffer size and maximum message size 00115 const size_t _buf_size; 00116 00117 // maximum count of sequence numbers 00118 const unsigned int _maxseqno; 00119 00120 // state for incoming segments 00121 char _in_state; 00122 00123 // out flags 00124 char _out_state; 00125 00126 // buffer for incoming data to queue 00127 // the underflow method will block until 00128 // this buffer contains any data 00129 char *_queue_buf; 00130 00131 // the number of bytes available in the queue buffer 00132 int _queue_buf_len; 00133 00134 // conditional to lock the queue buffer and the 00135 // corresponding length variable 00136 ibrcommon::Conditional _queue_buf_cond; 00137 00138 // outgoing data from the upper layer is stored 00139 // here first and processed by the overflow() method 00140 char *_out_buf; 00141 00142 // incoming data to deliver data to the upper layer 00143 // is stored in this buffer by the underflow() method 00144 char *_in_buf; 00145 00146 // next expected incoming sequence number 00147 unsigned int in_seq_num_; 00148 00149 // next outgoing sequence number 00150 unsigned int out_seq_num_; 00151 00152 // this variable is set to true to shutdown 00153 // this stream 00154 bool _abort; 00155 00156 // callback to the corresponding connection object 00157 DatagramConnection &_callback; 00158 }; 00159 00160 class Sender : public ibrcommon::JoinableThread 00161 { 00162 public: 00163 Sender(DatagramConnection &conn, Stream &stream); 00164 ~Sender(); 00165 00166 void run(); 00167 void finally(); 00168 void __cancellation(); 00169 00170 void clearQueue(); 00171 00172 ibrcommon::Queue<ConvergenceLayer::Job> queue; 00173 00174 private: 00175 ConvergenceLayer::Job _current_job; 00176 DatagramConnection::Stream &_stream; 00177 00178 // callback to the corresponding connection object 00179 DatagramConnection &_connection; 00180 }; 00181 00182 void stream_send(const char &flags, const unsigned int &seqno, const char *buf, int len) throw (DatagramException); 00183 00184 DatagramConnectionCallback &_callback; 00185 bool _running; 00186 const std::string _identifier; 00187 DatagramConnection::Stream _stream; 00188 DatagramConnection::Sender _sender; 00189 00190 ibrcommon::Conditional _ack_cond; 00191 size_t _last_ack; 00192 size_t _wait_ack; 00193 00194 const DatagramConnectionParameter _params; 00195 }; 00196 } /* namespace data */ 00197 } /* namespace dtn */ 00198 #endif /* DATAGRAMCONNECTION_H_ */