IBR-DTNSuite
0.8
|
00001 /* 00002 * TCPConvergenceLayer.h 00003 * 00004 * Created on: 05.08.2009 00005 * Author: morgenro 00006 */ 00007 00008 #ifndef TCPCONVERGENCELAYER_H_ 00009 #define TCPCONVERGENCELAYER_H_ 00010 00011 #include "config.h" 00012 #include "Component.h" 00013 00014 #include "core/NodeEvent.h" 00015 #include "core/Node.h" 00016 #include "net/ConvergenceLayer.h" 00017 #include "net/DiscoveryService.h" 00018 #include "net/DiscoveryServiceProvider.h" 00019 00020 #include <ibrdtn/data/Bundle.h> 00021 #include <ibrdtn/data/EID.h> 00022 #include <ibrdtn/data/MetaBundle.h> 00023 #include <ibrdtn/data/Serializer.h> 00024 #include <ibrdtn/streams/StreamConnection.h> 00025 #include <ibrdtn/streams/StreamContactHeader.h> 00026 00027 #include <ibrcommon/net/tcpserver.h> 00028 #include <ibrcommon/net/tcpstream.h> 00029 #include <ibrcommon/net/vinterface.h> 00030 #include <ibrcommon/thread/Queue.h> 00031 #include <memory> 00032 00033 #ifdef WITH_TLS 00034 #include <ibrcommon/net/TLSStream.h> 00035 #endif 00036 00037 using namespace dtn::streams; 00038 00039 namespace dtn 00040 { 00041 namespace net 00042 { 00043 class TCPConvergenceLayer; 00044 00045 class TCPConnection : public StreamConnection::Callback, public ibrcommon::DetachedThread 00046 { 00047 public: 00056 TCPConnection(TCPConvergenceLayer &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout = 10); 00057 00066 TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout = 10); 00067 00072 virtual ~TCPConnection(); 00073 00077 virtual void initialize(); 00078 00082 void shutdown(); 00083 00088 const StreamContactHeader& getHeader() const; 00089 00094 const dtn::core::Node& getNode() const; 00095 00099 virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc); 00100 virtual void eventTimeout(); 00101 virtual void eventError(); 00102 virtual void eventConnectionUp(const StreamContactHeader &header); 00103 virtual void eventConnectionDown(); 00104 00105 virtual void eventBundleRefused(); 00106 virtual void eventBundleForwarded(); 00107 virtual void eventBundleAck(size_t ack); 00108 00109 dtn::core::Node::Protocol getDiscoveryProtocol() const; 00110 00115 void queue(const dtn::data::BundleID &bundle); 00116 00117 bool match(const dtn::core::Node &n) const; 00118 bool match(const dtn::data::EID &destination) const; 00119 bool match(const dtn::core::NodeEvent &evt) const; 00120 00121 friend TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle); 00122 friend TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle); 00123 00124 #ifdef WITH_TLS 00125 00128 void enableTLS(); 00129 #endif 00130 00131 protected: 00132 void rejectTransmission(); 00133 00134 void setup(); 00135 void connect(); 00136 void run(); 00137 void finally(); 00138 void __cancellation(); 00139 00140 void clearQueue(); 00141 00142 void keepalive(); 00143 bool good() const; 00144 00145 private: 00146 class KeepaliveSender : public ibrcommon::JoinableThread 00147 { 00148 public: 00149 KeepaliveSender(TCPConnection &connection, size_t &keepalive_timeout); 00150 ~KeepaliveSender(); 00151 00155 void run(); 00156 00160 void __cancellation(); 00161 00162 private: 00163 ibrcommon::Conditional _wait; 00164 TCPConnection &_connection; 00165 size_t &_keepalive_timeout; 00166 }; 00167 00168 class Sender : public ibrcommon::JoinableThread, public ibrcommon::Queue<dtn::data::BundleID> 00169 { 00170 public: 00171 Sender(TCPConnection &connection); 00172 virtual ~Sender(); 00173 00174 protected: 00175 void run(); 00176 void finally(); 00177 void __cancellation(); 00178 00179 private: 00180 TCPConnection &_connection; 00181 //dtn::data::BundleID _current_transfer; 00182 }; 00183 00184 StreamContactHeader _peer; 00185 dtn::core::Node _node; 00186 00187 std::auto_ptr<ibrcommon::tcpstream> _tcpstream; 00188 00189 #ifdef WITH_TLS 00190 ibrcommon::TLSStream _tlsstream; 00191 #endif 00192 00193 StreamConnection _stream; 00194 00195 // This thread gets awaiting bundles of the queue 00196 // and transmit them to the peer. 00197 Sender _sender; 00198 00199 // Keepalive sender 00200 KeepaliveSender _keepalive_sender; 00201 00202 // handshake variables 00203 dtn::data::EID _name; 00204 size_t _timeout; 00205 00206 ibrcommon::Queue<dtn::data::MetaBundle> _sentqueue; 00207 size_t _lastack; 00208 size_t _keepalive_timeout; 00209 00210 TCPConvergenceLayer &_callback; 00211 00212 /* flags to be used in this nodes StreamContactHeader */ 00213 char _flags; 00214 00215 /* with this boolean the connection is marked as aborted */ 00216 bool _aborted; 00217 }; 00218 00223 class TCPConvergenceLayer : public dtn::daemon::IndependentComponent, public ConvergenceLayer, public DiscoveryServiceProvider 00224 { 00225 friend class TCPConnection; 00226 public: 00232 TCPConvergenceLayer(); 00233 00237 virtual ~TCPConvergenceLayer(); 00238 00244 void bind(const ibrcommon::vinterface &net, int port); 00245 00250 void queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job); 00251 00257 void open(const dtn::core::Node &n); 00258 00262 virtual const std::string getName() const; 00263 00268 dtn::core::Node::Protocol getDiscoveryProtocol() const; 00269 00273 void update(const ibrcommon::vinterface &iface, std::string &name, std::string &data) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException); 00274 00275 protected: 00276 void __cancellation(); 00277 00278 void componentUp(); 00279 void componentRun(); 00280 void componentDown(); 00281 00282 private: 00286 void closeAll(); 00287 00292 void connectionUp(TCPConnection *conn); 00293 00299 void connectionDown(TCPConnection *conn); 00300 00301 static const int DEFAULT_PORT; 00302 bool _running; 00303 00304 ibrcommon::tcpserver _tcpsrv; 00305 00306 ibrcommon::Conditional _connections_cond; 00307 std::list<TCPConnection*> _connections; 00308 std::list<ibrcommon::vinterface> _interfaces; 00309 std::map<ibrcommon::vinterface, unsigned int> _portmap; 00310 }; 00311 } 00312 } 00313 00314 #endif /* TCPCONVERGENCELAYER_H_ */