IBR-DTNSuite  0.8
daemon/src/net/TCPConvergenceLayer.h
Go to the documentation of this file.
00001 /*
00002  * TCPConvergenceLayer.h
00003  *
00004  *  Created on: 05.08.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef TCPCONVERGENCELAYER_H_
00009 #define TCPCONVERGENCELAYER_H_
00010 
00011 #include "config.h"
00012 #include "Component.h"
00013 
00014 #include "core/NodeEvent.h"
00015 #include "core/Node.h"
00016 #include "net/ConvergenceLayer.h"
00017 #include "net/DiscoveryService.h"
00018 #include "net/DiscoveryServiceProvider.h"
00019 
00020 #include <ibrdtn/data/Bundle.h>
00021 #include <ibrdtn/data/EID.h>
00022 #include <ibrdtn/data/MetaBundle.h>
00023 #include <ibrdtn/data/Serializer.h>
00024 #include <ibrdtn/streams/StreamConnection.h>
00025 #include <ibrdtn/streams/StreamContactHeader.h>
00026 
00027 #include <ibrcommon/net/tcpserver.h>
00028 #include <ibrcommon/net/tcpstream.h>
00029 #include <ibrcommon/net/vinterface.h>
00030 #include <ibrcommon/thread/Queue.h>
00031 #include <memory>
00032 
00033 #ifdef WITH_TLS
00034 #include <ibrcommon/net/TLSStream.h>
00035 #endif
00036 
00037 using namespace dtn::streams;
      // NOTE(review): a using-directive at header scope also takes effect in every
      // file that includes this header. The declarations below use the unqualified
      // names StreamConnection and StreamContactHeader, which presumably resolve
      // through this directive, so those uses have to be qualified before it can
      // be removed.
00038 
00039 namespace dtn
00040 {
00041         namespace net
00042         {
00043                 class TCPConvergenceLayer;
                      // Forward declaration: TCPConnection takes and stores references to
                      // TCPConvergenceLayer, whose full definition only follows after
                      // TCPConnection in this header.
00044 
00045                 class TCPConnection : public StreamConnection::Callback, public ibrcommon::DetachedThread
00046                 {
                              /*
                               * A single TCP connection of the TCP convergence layer.
                               *
                               * The class implements the StreamConnection::Callback interface and is
                               * itself an ibrcommon::DetachedThread. It aggregates two helper threads
                               * (Sender and KeepaliveSender, declared in the private section) and keeps
                               * a reference to the TCPConvergenceLayer it was created with.
                               *
                               * Only declarations are visible in this header; behavioural notes marked
                               * NOTE(review) are assumptions that have to be checked against the
                               * implementation file.
                               */
00047                 public:
                              /**
                               * Creates a connection object for an already existing TCP stream.
                               * NOTE(review): presumably used for accepted (incoming) connections, with
                               * the stream ending up in _tcpstream - confirm in the implementation.
                               *
                               * @param tcpsrv  the convergence layer (a reference of this type is kept in _callback)
                               * @param stream  pointer to the existing TCP stream
                               * @param name    an EID; presumably the one kept in _name for the handshake - confirm
                               * @param timeout timeout value, 10 when omitted; its unit is not stated in this header
                               */
00056                         TCPConnection(TCPConvergenceLayer &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout = 10);
00057 
                              /**
                               * Creates a connection object from a node description instead of an
                               * existing stream.
                               * NOTE(review): presumably the outgoing case, where the TCP stream is
                               * opened later by connect() - confirm in the implementation.
                               *
                               * @param tcpsrv  the convergence layer (a reference of this type is kept in _callback)
                               * @param node    the node this connection is created for
                               * @param name    an EID; presumably the one kept in _name for the handshake - confirm
                               * @param timeout timeout value, 10 when omitted; its unit is not stated in this header
                               */
00066                         TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout = 10);
00067 
                              /** Virtual destructor; the class is used polymorphically through its bases. */
00072                         virtual ~TCPConnection();
00073 
                              /** Virtual initialization hook; what it sets up is defined in the implementation file. */
00077                         virtual void initialize();
00078 
                              /** Requests the shutdown of this connection (details are in the implementation file). */
00082                         void shutdown();
00083 
                              /**
                               * @return a const reference to a StreamContactHeader
                               *         (NOTE(review): presumably the peer's header stored in _peer - confirm)
                               */
00088                         const StreamContactHeader& getHeader() const;
00089 
                              /**
                               * @return a const reference to a dtn::core::Node
                               *         (NOTE(review): presumably _node - confirm)
                               */
00094                         const dtn::core::Node& getNode() const;
00095 
                              /*
                               * Event handlers. They are virtual and presumably override the hooks of
                               * StreamConnection::Callback, which is declared outside of this file.
                               */
00099                         virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc);
00100                         virtual void eventTimeout();
00101                         virtual void eventError();
00102                         virtual void eventConnectionUp(const StreamContactHeader &header);
00103                         virtual void eventConnectionDown();
00104 
00105                         virtual void eventBundleRefused();
00106                         virtual void eventBundleForwarded();
00107                         virtual void eventBundleAck(size_t ack);
00108 
                              /** @return a dtn::core::Node::Protocol value for this connection. */
00109                         dtn::core::Node::Protocol getDiscoveryProtocol() const;
00110 
                              /**
                               * Queues a bundle, identified by its BundleID, on this connection.
                               * NOTE(review): presumably handed to the Sender queue (_sender) - confirm.
                               *
                               * @param bundle ID of the bundle to queue
                               */
00115                         void queue(const dtn::data::BundleID &bundle);
00116 
                              /*
                               * Three overloads that test this connection against a node, an EID or a
                               * node event. The matching criteria are defined in the implementation file.
                               */
00117                         bool match(const dtn::core::Node &n) const;
00118                         bool match(const dtn::data::EID &destination) const;
00119                         bool match(const dtn::core::NodeEvent &evt) const;
00120 
                              /*
                               * Stream-style operators: operator>> fills the given (non-const) bundle,
                               * operator<< takes a const bundle. They are free functions declared as
                               * friends, which gives them access to the non-public members.
                               */
00121                         friend TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle);
00122                         friend TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle);
00123 
00124 #ifdef WITH_TLS
00125 
                              /** Only available when built with WITH_TLS; see the _tlsstream member below. */
00128                         void enableTLS();
00129 #endif
00130 
00131                 protected:
00132                         void rejectTransmission();
00133 
                              /*
                               * NOTE(review): setup(), run(), finally() and __cancellation() look like
                               * the thread life-cycle hooks of ibrcommon::DetachedThread - confirm
                               * against the ibrcommon thread API.
                               */
00134                         void setup();
00135                         void connect();
00136                         void run();
00137                         void finally();
00138                         void __cancellation();
00139 
00140                         void clearQueue();
00141 
00142                         void keepalive();
00143                         bool good() const;
00144 
00145                 private:
                              /*
                               * Helper thread (ibrcommon::JoinableThread) for keepalives. It stores
                               * references only: one to the owning connection and one to a timeout
                               * value that lives outside of this object.
                               */
00146                         class KeepaliveSender : public ibrcommon::JoinableThread
00147                         {
00148                         public:
                                      /**
                                       * @param connection        the connection this helper belongs to (kept by reference)
                                       * @param keepalive_timeout timeout value, kept by reference and not copied
                                       */
00149                                 KeepaliveSender(TCPConnection &connection, size_t &keepalive_timeout);
00150                                 ~KeepaliveSender();
00151 
                                      /** Thread body. */
00155                                 void run();
00156 
                                      /** Cancellation hook of the thread. */
00160                                 void __cancellation();
00161 
00162                         private:
00163                                 ibrcommon::Conditional _wait;
00164                                 TCPConnection &_connection;
00165                                 size_t &_keepalive_timeout;
00166                         };
00167 
                              /*
                               * Helper thread that is, at the same time, a queue of BundleIDs
                               * (it derives from both ibrcommon::JoinableThread and
                               * ibrcommon::Queue<dtn::data::BundleID>).
                               */
00168                         class Sender : public ibrcommon::JoinableThread, public ibrcommon::Queue<dtn::data::BundleID>
00169                         {
00170                         public:
                                      /** @param connection the connection this sender belongs to (kept by reference) */
00171                                 Sender(TCPConnection &connection);
00172                                 virtual ~Sender();
00173 
00174                         protected:
00175                                 void run();
00176                                 void finally();
00177                                 void __cancellation();
00178 
00179                         private:
00180                                 TCPConnection &_connection;
00181                                 //dtn::data::BundleID _current_transfer;
00182                         };
00183 
                              /*
                               * Data members. Non-static members are constructed in the order in
                               * which they are declared here, so do not reorder them without checking
                               * the constructors' initializer lists.
                               */
00184                         StreamContactHeader _peer;
00185                         dtn::core::Node _node;
00186 
                              // NOTE(review): std::auto_ptr was deprecated in C++11 and removed in
                              // C++17. Switching to std::unique_ptr needs matching changes in the
                              // implementation file, which is not part of this header.
00187                         std::auto_ptr<ibrcommon::tcpstream> _tcpstream;
00188 
00189 #ifdef WITH_TLS
00190                         ibrcommon::TLSStream _tlsstream;
00191 #endif
00192 
00193                         StreamConnection _stream;
00194 
00195                         // This thread takes the bundles waiting in its queue
00196                         // and transmits them to the peer.
00197                         Sender _sender;
00198 
00199                         // Keepalive sender
                              // NOTE(review): its constructor takes a size_t& and _keepalive_timeout is
                              // declared further down, i.e. it is initialized after this member. If
                              // that is the variable passed in, the constructor may bind the
                              // reference but must not read through it yet.
00200                         KeepaliveSender _keepalive_sender;
00201 
00202                         // handshake variables
00203                         dtn::data::EID _name;
00204                         size_t _timeout;
00205 
00206                         ibrcommon::Queue<dtn::data::MetaBundle> _sentqueue;
00207                         size_t _lastack;
00208                         size_t _keepalive_timeout;
00209 
                              // the convergence layer handed to the constructor (parameter tcpsrv), presumably
00210                         TCPConvergenceLayer &_callback;
00211 
00212                         /* flags to be used in this node's StreamContactHeader */
00213                         char _flags;
00214 
00215                         /* set to mark the connection as aborted */
00216                         bool _aborted;
00217                 };
00218 
00223                 class TCPConvergenceLayer : public dtn::daemon::IndependentComponent, public ConvergenceLayer, public DiscoveryServiceProvider
00224                 {
                              /*
                               * The TCP convergence layer component. It combines three roles through
                               * its base classes: an independent daemon component, a ConvergenceLayer
                               * and a DiscoveryServiceProvider. It owns a TCP server object and keeps
                               * a list of pointers to TCPConnection objects.
                               *
                               * NOTE(review): this class uses std::string, std::list and std::map, but
                               * the header does not include <string>, <list> or <map> itself; it relies
                               * on other headers pulling them in.
                               */

                              // TCPConnection may call the private members below, e.g.
                              // connectionUp() and connectionDown().
00225                         friend class TCPConnection;
00226                 public:
                              /** Default constructor. */
00232                         TCPConvergenceLayer();
00233 
                              /** Virtual destructor. */
00237                         virtual ~TCPConvergenceLayer();
00238 
                              /**
                               * Binds the layer to an interface and a port.
                               * NOTE(review): presumably recorded in _interfaces and _portmap - confirm.
                               *
                               * @param net  the interface
                               * @param port the port number
                               */
00244                         void bind(const ibrcommon::vinterface &net, int port);
00245 
                              /**
                               * Queues a convergence layer job for the given node.
                               *
                               * @param n   the node the job is meant for
                               * @param job the job to queue
                               */
00250                         void queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job);
00251 
                              /**
                               * Opens a connection to the given node (details are in the implementation file).
                               *
                               * @param n the node to connect to
                               */
00257                         void open(const dtn::core::Node &n);
00258 
                              /** @return the name of this component as a string. */
00262                         virtual const std::string getName() const;
00263 
                              /** @return a dtn::core::Node::Protocol value for this layer. */
00268                         dtn::core::Node::Protocol getDiscoveryProtocol() const;
00269 
                              /**
                               * Discovery callback for one interface; name and data are non-const
                               * references, i.e. output parameters.
                               *
                               * NOTE(review): dynamic exception specifications (throw(...)) were
                               * deprecated in C++11 and removed in C++17. This declaration presumably
                               * overrides DiscoveryServiceProvider::update, so it can only be changed
                               * together with that interface.
                               *
                               * @param iface the interface the query refers to
                               * @param name  receives a name
                               * @param data  receives the data
                               * @throws dtn::net::DiscoveryServiceProvider::NoServiceHereException
                               */
00273                         void update(const ibrcommon::vinterface &iface, std::string &name, std::string &data) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException);
00274 
00275                 protected:
00276                         void __cancellation();
00277 
                              // NOTE(review): presumably the life-cycle hooks of
                              // dtn::daemon::IndependentComponent - confirm against Component.h.
00278                         void componentUp();
00279                         void componentRun();
00280                         void componentDown();
00281 
00282                 private:
                              /** Closes all connections (details are in the implementation file). */
00286                         void closeAll();
00287 
                              /**
                               * Notification about a connection that came up; reachable from
                               * TCPConnection through the friend declaration.
                               *
                               * @param conn the connection
                               */
00292                         void connectionUp(TCPConnection *conn);
00293 
                              /**
                               * Notification about a connection that went down; reachable from
                               * TCPConnection through the friend declaration.
                               *
                               * @param conn the connection
                               */
00299                         void connectionDown(TCPConnection *conn);
00300 
                              // Declared without an initializer; the value is defined in a source file.
00301                         static const int DEFAULT_PORT;
00302                         bool _running;
00303 
00304                         ibrcommon::tcpserver _tcpsrv;
00305 
                              // NOTE(review): presumably guards _connections - confirm.
00306                         ibrcommon::Conditional _connections_cond;
                              // Raw pointers; who deletes the connection objects is not visible in
                              // this header.
00307                         std::list<TCPConnection*> _connections;
00308                         std::list<ibrcommon::vinterface> _interfaces;
                              // NOTE(review): presumably the port per bound interface (see bind()) - confirm.
00309                         std::map<ibrcommon::vinterface, unsigned int> _portmap;
00310                 };
00311         }
00312 }
00313 
00314 #endif /* TCPCONVERGENCELAYER_H_ */