IBR-DTNSuite  0.8
daemon/src/net/DatagramConnection.h
Go to the documentation of this file.
00001 /*
00002  * DatagramConnection.h
00003  *
00004  *  Created on: 21.11.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef DATAGRAMCONNECTION_H_
00009 #define DATAGRAMCONNECTION_H_
00010 
00011 #include "net/ConvergenceLayer.h"
00012 #include "net/DatagramConnectionParameter.h"
00013 #include <ibrcommon/thread/Thread.h>
00014 #include <ibrcommon/thread/Queue.h>
00015 #include <ibrcommon/thread/Conditional.h>
00016 #include <streambuf>
00017 #include <iostream>
00018 #include <stdint.h>
00019 
00020 namespace dtn
00021 {
00022         namespace net
00023         {
00024                 class DatagramException : public ibrcommon::Exception
00025                 {
                      // Exception type of the datagram convergence layer. In this header it
                      // is the only type named in the exception specifications of
                      // DatagramConnectionCallback::callback_send() / callback_ack() and of
                      // DatagramConnection::stream_send().
00026                 public:
                              // Constructs the exception; `what` is forwarded unchanged to the
                              // ibrcommon::Exception base constructor (presumably the
                              // human-readable error description - base class not visible here).
00027                         DatagramException(const std::string &what) : ibrcommon::Exception(what)
00028                         {};
00029 
                              // Virtual destructor with an empty throw() specification: it
                              // promises not to throw.
00030                         virtual ~DatagramException() throw() {};
00031                 };
00032 
00033                 class DatagramConnection;
                      // ^ forward declaration: the callback interface below refers to
                      //   DatagramConnection before that class is defined.
00034 
                      // Abstract interface (all operations are pure virtual) through which a
                      // DatagramConnection talks to its owner. Every DatagramConnection is
                      // constructed with a reference to an object of this type.
                      //
                      // NOTE(review): callback_send/callback_ack use dynamic exception
                      // specifications (throw (DatagramException)); these were removed from
                      // the language in C++17, so this header requires an older standard
                      // mode. Overriders have to declare a compatible specification.
00035                 class DatagramConnectionCallback
00036                 {
00037                 public:
00038                         virtual ~DatagramConnectionCallback() {};

                              // Send request issued on behalf of `connection`.
                              //   flags       - segment flags (presumably a combination of
                              //                 DatagramConnection::Stream::HEADER_FLAGS - confirm)
                              //   seqno       - sequence number of the segment
                              //   destination - identifier of the receiving peer
                              //   buf, len    - payload bytes and their count
                              // May throw DatagramException (see exception specification).
00039                         virtual void callback_send(DatagramConnection &connection, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, int len) throw (DatagramException) = 0;

                              // Acknowledgement request issued on behalf of `connection` for
                              // sequence number `seqno`, addressed to `destination`.
                              // May throw DatagramException (see exception specification).
00040                         virtual void callback_ack(DatagramConnection &connection, const unsigned int &seqno, const std::string &destination) throw (DatagramException) = 0;
00041 
                              // Connection state notifications. Presumably raised when `conn`
                              // becomes usable / has terminated; the call sites are not
                              // visible in this header - confirm in DatagramConnection.cpp.
00042                         virtual void connectionUp(const DatagramConnection *conn) = 0;
00043                         virtual void connectionDown(const DatagramConnection *conn) = 0;
00044                 };
00045 
00046                 class DatagramConnection : public ibrcommon::DetachedThread
00047                 {
                      // One datagram-based connection, identified by a string, that runs as
                      // an ibrcommon::DetachedThread. It owns a Stream (streambuf/iostream
                      // view on the segmented datagram exchange) and a Sender (joinable
                      // worker thread with a queue of ConvergenceLayer::Job), and reports to
                      // a DatagramConnectionCallback supplied at construction.
00048                 public:
                              // identifier - name of this connection, readable via getIdentifier()
                              // params     - connection parameters; a const copy is kept in _params
                              // callback   - owner interface; only a reference is stored, so the
                              //              referenced object must outlive this connection
00049                         DatagramConnection(const std::string &identifier, const DatagramConnectionParameter &params, DatagramConnectionCallback &callback);
00050                         virtual ~DatagramConnection();
00051 
                              // Thread body and lifecycle hooks. Presumably these implement the
                              // ibrcommon::DetachedThread interface (base class not visible here).
00052                         void run();
00053                         void setup();
00054                         void finally();
00055 
                              // Cancellation hook of the thread framework (presumably invoked
                              // when the thread is asked to stop - confirm in ibrcommon).
00056                         virtual void __cancellation();
00057 
                              // Requests termination of this connection. Exact semantics are
                              // defined in the implementation file.
00058                         void shutdown();
00059 
                              // Returns the connection identifier by const reference.
00060                         const std::string& getIdentifier() const;
00061 
                              // Queues a job for this connection (presumably handed to the
                              // Sender's job queue - confirm in the implementation).
00066                         void queue(const ConvergenceLayer::Job &job);
00067 
                              // Queues one datagram segment (flags, sequence number, payload of
                              // `len` bytes at `buf`). Same parameter list as Stream::queue();
                              // presumably forwards to it.
00073                         void queue(const char &flags, const unsigned int &seqno, const char *buf, int len);
00074 
                              // Reports an acknowledgement for sequence number `seqno`
                              // (presumably updates _last_ack and signals _ack_cond - confirm).
00079                         void ack(const unsigned int &seqno);
00080 
00081                 private:
                              // Stream buffer and iostream in one object: it derives from both
                              // std::basic_streambuf<char> and std::iostream and overrides the
                              // streambuf virtuals sync(), overflow() and underflow().
00082                         class Stream : public std::basic_streambuf<char, std::char_traits<char> >, public std::iostream
00083                         {
00084                         public:
                                      // Segment position flags. FIRST and LAST use distinct bits
                                      // (0x02 / 0x01); MIDDLE is the value with neither bit set.
                                      // NOTE(review): presumably FIRST|LAST marks a payload that
                                      // fits into a single segment - confirm.
00085                                 enum HEADER_FLAGS
00086                                 {
00087                                         SEGMENT_FIRST = 0x02,
00088                                         SEGMENT_LAST = 0x01,
00089                                         SEGMENT_MIDDLE = 0x00
00090                                 };
00091 
                                      // conn      - owning connection (stored as _callback)
                                      // maxmsglen - presumably initializes _buf_size - confirm
                                      // maxseqno  - presumably initializes _maxseqno - confirm
00092                                 Stream(DatagramConnection &conn, const size_t maxmsglen, const unsigned int maxseqno);
00093                                 virtual ~Stream();
00094 
                                      // Hands one incoming segment to the stream. Per the comment
                                      // on _queue_buf below, underflow() blocks until data queued
                                      // here is available.
00100                                 void queue(const char &flags, const unsigned int &seqno, const char *buf, int len);
00101 
                                      // Closes the stream (presumably sets _abort and wakes up a
                                      // blocked underflow() - confirm).
00106                                 void close();
00107 
00108                         protected:
                                      // std::basic_streambuf overrides: sync() flushes pending
                                      // output, overflow() is called when the put area is full,
                                      // underflow() when the get area is exhausted.
00109                                 virtual int sync();
00110                                 virtual std::char_traits<char>::int_type overflow(std::char_traits<char>::int_type = std::char_traits<char>::eof());
00111                                 virtual std::char_traits<char>::int_type underflow();
00112 
00113                         private:
00114                                 // buffer size and maximum message size
00115                                 const size_t _buf_size;
00116 
00117                                 // maximum count of sequence numbers
00118                                 const unsigned int _maxseqno;
00119 
00120                                 // state for incoming segments
00121                                 char _in_state;
00122 
00123                                 // out flags
00124                                 char _out_state;
00125 
00126                                 // buffer for incoming data to queue
00127                                 // the underflow method will block until
00128                                 // this buffer contains any data
00129                                 char *_queue_buf;
00130 
00131                                 // the number of bytes available in the queue buffer
00132                                 int _queue_buf_len;
00133 
00134                                 // conditional to lock the queue buffer and the
00135                                 // corresponding length variable
00136                                 ibrcommon::Conditional _queue_buf_cond;
00137 
00138                                 // outgoing data from the upper layer is stored
00139                                 // here first and processed by the overflow() method
00140                                 char *_out_buf;
00141 
00142                                 // incoming data to deliver data to the upper layer
00143                                 // is stored in this buffer by the underflow() method
00144                                 char *_in_buf;
00145 
                                      // NOTE(review): the two sequence counters use a trailing
                                      // underscore while every other member uses a leading one.
00146                                 // next expected incoming sequence number
00147                                 unsigned int in_seq_num_;
00148 
00149                                 // next outgoing sequence number
00150                                 unsigned int out_seq_num_;
00151 
00152                                 // this variable is set to true to shutdown
00153                                 // this stream
00154                                 bool _abort;
00155 
00156                                 // callback to the corresponding connection object
00157                                 DatagramConnection &_callback;
00158                         };
00159 
                              // Joinable worker thread bound to one connection and its Stream.
                              // Jobs are placed in the public `queue` member; presumably run()
                              // takes them from there and writes them to the stream - confirm.
00160                         class Sender : public ibrcommon::JoinableThread
00161                         {
00162                         public:
00163                                 Sender(DatagramConnection &conn, Stream &stream);
00164                                 ~Sender();
00165 
                                      // Thread body and hooks (presumably JoinableThread overrides).
00166                                 void run();
00167                                 void finally();
00168                                 void __cancellation();
00169 
                                      // Empties the job queue (handling of the dropped jobs is
                                      // defined in the implementation file).
00170                                 void clearQueue();
00171 
                                      // pending jobs of this sender
00172                                 ibrcommon::Queue<ConvergenceLayer::Job> queue;
00173 
00174                         private:
                                      // job currently being processed (presumably by run())
00175                                 ConvergenceLayer::Job _current_job;
                                      // stream the sender operates on; not owned
00176                                 DatagramConnection::Stream &_stream;
00177 
00178                                 // callback to the corresponding connection object
00179                                 DatagramConnection &_connection;
00180                         };
00181 
                              // Send hook with the same parameter list as
                              // DatagramConnectionCallback::callback_send() minus connection and
                              // destination (presumably called by Stream and forwarded to
                              // _callback - confirm). May throw DatagramException.
00182                         void stream_send(const char &flags, const unsigned int &seqno, const char *buf, int len) throw (DatagramException);
00183 
                              // owner interface handed in at construction; not owned
00184                         DatagramConnectionCallback &_callback;
00185                         bool _running;
00186                         const std::string _identifier;
                              // Data members are initialized in declaration order, and Sender's
                              // constructor takes a Stream&: keep _stream declared before
                              // _sender (presumably _sender is constructed from _stream).
00187                         DatagramConnection::Stream _stream;
00188                         DatagramConnection::Sender _sender;
00189 
                              // Acknowledgement bookkeeping, guarded by _ack_cond.
                              // NOTE(review): sequence numbers are `unsigned int` everywhere
                              // else in this header but stored as size_t here - confirm that
                              // comparisons and wrap-around at _maxseqno behave as intended.
00190                         ibrcommon::Conditional _ack_cond;
00191                         size_t _last_ack;
00192                         size_t _wait_ack;
00193 
                              // const copy of the parameters passed to the constructor
00194                         const DatagramConnectionParameter _params;
00195                 };
00196         } /* namespace net */
00197 } /* namespace dtn */
00198 #endif /* DATAGRAMCONNECTION_H_ */