IBR-DTNSuite
0.8
|
00001 /* 00002 * TCPConvergenceLayer.cpp 00003 * 00004 * Created on: 05.08.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "net/TCPConvergenceLayer.h" 00009 #include "net/ConnectionEvent.h" 00010 #include "routing/RequeueBundleEvent.h" 00011 #include "core/BundleCore.h" 00012 00013 #include <ibrcommon/net/vinterface.h> 00014 #include <ibrcommon/thread/MutexLock.h> 00015 #include <ibrcommon/Logger.h> 00016 #include <ibrcommon/net/tcpclient.h> 00017 #include <ibrcommon/Logger.h> 00018 00019 #include <streambuf> 00020 #include <functional> 00021 #include <list> 00022 #include <algorithm> 00023 00024 using namespace ibrcommon; 00025 00026 namespace dtn 00027 { 00028 namespace net 00029 { 00030 /* 00031 * class TCPConvergenceLayer 00032 */ 00033 const int TCPConvergenceLayer::DEFAULT_PORT = 4556; 00034 00035 TCPConvergenceLayer::TCPConvergenceLayer() 00036 { 00037 } 00038 00039 TCPConvergenceLayer::~TCPConvergenceLayer() 00040 { 00041 join(); 00042 } 00043 00044 void TCPConvergenceLayer::bind(const ibrcommon::vinterface &net, int port) 00045 { 00046 _interfaces.push_back(net); 00047 _tcpsrv.bind(net, port); 00048 _portmap[net] = port; 00049 } 00050 00051 dtn::core::Node::Protocol TCPConvergenceLayer::getDiscoveryProtocol() const 00052 { 00053 return dtn::core::Node::CONN_TCPIP; 00054 } 00055 00056 void TCPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string ¶ms) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException) 00057 { 00058 name = "tcpcl"; 00059 stringstream service; 00060 00061 // TODO: get the main address of this host, if no interface is specified 00062 00063 // search for the matching interface 00064 for (std::list<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); it++) 00065 { 00066 const ibrcommon::vinterface &interface = *it; 00067 if (interface == iface) 00068 { 00069 try { 00070 // get all addresses of this interface 00071 std::list<vaddress> list = interface.getAddresses(ibrcommon::vaddress::VADDRESS_INET); 00072 00073 // if no address is returned... (goto catch block) 00074 if (list.empty()) throw ibrcommon::Exception("no address found"); 00075 00076 // fill in the ip address 00077 service << "ip=" << list.front().get(false) << ";port=" << _portmap[iface] << ";"; 00078 } catch (const ibrcommon::Exception&) { 00079 // ... set the port only 00080 service << "port=" << _portmap[iface] << ";"; 00081 }; 00082 00083 params = service.str(); 00084 return; 00085 } 00086 } 00087 00088 throw dtn::net::DiscoveryServiceProvider::NoServiceHereException(); 00089 } 00090 00091 const std::string TCPConvergenceLayer::getName() const 00092 { 00093 return "TCPConvergenceLayer"; 00094 } 00095 00096 void TCPConvergenceLayer::open(const dtn::core::Node &n) 00097 { 00098 // search for an existing connection 00099 ibrcommon::MutexLock l(_connections_cond); 00100 00101 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00102 { 00103 TCPConnection &conn = *(*iter); 00104 00105 if (conn.match(n)) 00106 { 00107 return; 00108 } 00109 } 00110 00111 try { 00112 // create a connection 00113 TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10); 00114 00115 #ifdef WITH_TLS 00116 // enable TLS Support 00117 if ( ibrcommon::TLSStream::isInitialized() ) 00118 { 00119 conn->enableTLS(); 00120 } 00121 #endif 00122 00123 // raise setup event 00124 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n); 00125 00126 // add connection as pending 00127 _connections.push_back( conn ); 00128 00129 // start the ClientHandler (service) 00130 conn->initialize(); 00131 00132 // signal that there is a new connection 00133 _connections_cond.signal(true); 00134 } catch (const ibrcommon::Exception&) { }; 00135 00136 return; 00137 } 00138 00139 void TCPConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job) 00140 { 00141 // search for an existing connection 00142 ibrcommon::MutexLock l(_connections_cond); 00143 00144 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00145 { 00146 TCPConnection &conn = *(*iter); 00147 00148 if (conn.match(n)) 00149 { 00150 conn.queue(job._bundle); 00151 IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an existing tcp connection (" << conn.getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL; 00152 00153 return; 00154 } 00155 } 00156 00157 try { 00158 // create a connection 00159 TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10); 00160 00161 #ifdef WITH_TLS 00162 // enable TLS Support 00163 if ( ibrcommon::TLSStream::isInitialized() ) 00164 { 00165 conn->enableTLS(); 00166 } 00167 #endif 00168 00169 // raise setup event 00170 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n); 00171 00172 // add connection as pending 00173 _connections.push_back( conn ); 00174 00175 // start the ClientHandler (service) 00176 conn->initialize(); 00177 00178 // queue the bundle 00179 conn->queue(job._bundle); 00180 00181 // signal that there is a new connection 00182 _connections_cond.signal(true); 00183 00184 IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an new tcp connection (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL; 00185 } catch (const ibrcommon::Exception&) { 00186 // raise transfer abort event for all bundles without an ACK 00187 dtn::routing::RequeueBundleEvent::raise(n.getEID(), job._bundle); 00188 } 00189 } 00190 00191 void TCPConvergenceLayer::connectionUp(TCPConnection *conn) 00192 { 00193 ibrcommon::MutexLock l(_connections_cond); 00194 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00195 { 00196 if (conn == (*iter)) 00197 { 00198 // put pending connection to the active connections 00199 return; 00200 } 00201 } 00202 00203 _connections.push_back( conn ); 00204 00205 // signal that there is a new connection 00206 _connections_cond.signal(true); 00207 00208 IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection added (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL; 00209 } 00210 00211 void TCPConvergenceLayer::connectionDown(TCPConnection *conn) 00212 { 00213 ibrcommon::MutexLock l(_connections_cond); 00214 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00215 { 00216 if (conn == (*iter)) 00217 { 00218 _connections.erase(iter); 00219 IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection removed (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL; 00220 00221 // signal that there is a connection less 00222 _connections_cond.signal(true); 00223 return; 00224 } 00225 } 00226 } 00227 00228 void TCPConvergenceLayer::componentRun() 00229 { 00230 try { 00231 while (true) 00232 { 00233 // wait for incoming connections 00234 tcpstream *stream = _tcpsrv.accept(); 00235 00236 // create a new TCPConnection and return the pointer 00237 TCPConnection *obj = new TCPConnection(*this, stream, dtn::core::BundleCore::local, 10); 00238 00239 #ifdef WITH_TLS 00240 // enable TLS Support 00241 if ( ibrcommon::TLSStream::isInitialized() ) 00242 { 00243 obj->enableTLS(); 00244 } 00245 #endif 00246 00247 // add the connection to the connection list 00248 connectionUp(obj); 00249 00250 // initialize the object 00251 obj->initialize(); 00252 00253 // breakpoint 00254 ibrcommon::Thread::yield(); 00255 } 00256 } catch (const std::exception&) { 00257 // ignore all errors 00258 return; 00259 } 00260 } 00261 00262 void TCPConvergenceLayer::__cancellation() 00263 { 00264 _tcpsrv.shutdown(); 00265 _tcpsrv.close(); 00266 } 00267 00268 void TCPConvergenceLayer::closeAll() 00269 { 00270 // search for an existing connection 00271 ibrcommon::MutexLock l(_connections_cond); 00272 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00273 { 00274 TCPConnection &conn = *(*iter); 00275 00276 // close the connection immediately 00277 conn.shutdown(); 00278 } 00279 } 00280 00281 void TCPConvergenceLayer::componentUp() 00282 { 00283 // listen on the socket, max. 5 concurrent awaiting connections 00284 _tcpsrv.listen(5); 00285 } 00286 00287 void TCPConvergenceLayer::componentDown() 00288 { 00289 // shutdown the TCP server 00290 _tcpsrv.shutdown(); 00291 _tcpsrv.close(); 00292 00293 // close all active connections 00294 closeAll(); 00295 00296 // wait until all tcp connections are down 00297 { 00298 ibrcommon::MutexLock l(_connections_cond); 00299 while (_connections.size() > 0) _connections_cond.wait(); 00300 } 00301 } 00302 } 00303 }