IBR-DTNSuite  0.8
daemon/src/net/TCPConvergenceLayer.cpp
Go to the documentation of this file.
00001 /*
00002  * TCPConvergenceLayer.cpp
00003  *
00004  *  Created on: 05.08.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "net/TCPConvergenceLayer.h"
00009 #include "net/ConnectionEvent.h"
00010 #include "routing/RequeueBundleEvent.h"
00011 #include "core/BundleCore.h"
00012 
00013 #include <ibrcommon/net/vinterface.h>
00014 #include <ibrcommon/thread/MutexLock.h>
00015 #include <ibrcommon/Logger.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 #include <ibrcommon/Logger.h>
00018 
00019 #include <streambuf>
00020 #include <functional>
00021 #include <list>
00022 #include <algorithm>
00023 
00024 using namespace ibrcommon;
00025 
00026 namespace dtn
00027 {
00028         namespace net
00029         {
00030                 /*
00031                  * class TCPConvergenceLayer
00032                  */
00033                 const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
00034 
00035                 TCPConvergenceLayer::TCPConvergenceLayer()
00036                 {
00037                 }
00038 
00039                 TCPConvergenceLayer::~TCPConvergenceLayer()
00040                 {
00041                         join();
00042                 }
00043 
00044                 void TCPConvergenceLayer::bind(const ibrcommon::vinterface &net, int port)
00045                 {
00046                         _interfaces.push_back(net);
00047                         _tcpsrv.bind(net, port);
00048                         _portmap[net] = port;
00049                 }
00050 
00051                 dtn::core::Node::Protocol TCPConvergenceLayer::getDiscoveryProtocol() const
00052                 {
00053                         return dtn::core::Node::CONN_TCPIP;
00054                 }
00055 
00056                 void TCPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string &params) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException)
00057                 {
00058                         name = "tcpcl";
00059                         stringstream service;
00060 
00061                         // TODO: get the main address of this host, if no interface is specified
00062 
00063                         // search for the matching interface
00064                         for (std::list<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); it++)
00065                         {
00066                                 const ibrcommon::vinterface &interface = *it;
00067                                 if (interface == iface)
00068                                 {
00069                                         try {
00070                                                 // get all addresses of this interface
00071                                                 std::list<vaddress> list = interface.getAddresses(ibrcommon::vaddress::VADDRESS_INET);
00072 
00073                                                 // if no address is returned... (goto catch block)
00074                                                 if (list.empty()) throw ibrcommon::Exception("no address found");
00075 
00076                                                 // fill in the ip address
00077                                                 service << "ip=" << list.front().get(false) << ";port=" << _portmap[iface] << ";";
00078                                         } catch (const ibrcommon::Exception&) {
00079                                                 // ... set the port only
00080                                                 service << "port=" << _portmap[iface] << ";";
00081                                         };
00082 
00083                                         params = service.str();
00084                                         return;
00085                                 }
00086                         }
00087 
00088                         throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00089                 }
00090 
00091                 const std::string TCPConvergenceLayer::getName() const
00092                 {
00093                         return "TCPConvergenceLayer";
00094                 }
00095 
00096                 void TCPConvergenceLayer::open(const dtn::core::Node &n)
00097                 {
00098                         // search for an existing connection
00099                         ibrcommon::MutexLock l(_connections_cond);
00100 
00101                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00102                         {
00103                                 TCPConnection &conn = *(*iter);
00104 
00105                                 if (conn.match(n))
00106                                 {
00107                                         return;
00108                                 }
00109                         }
00110 
00111                         try {
00112                                 // create a connection
00113                                 TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10);
00114 
00115 #ifdef WITH_TLS
00116                                 // enable TLS Support
00117                                 if ( ibrcommon::TLSStream::isInitialized() )
00118                                 {
00119                                         conn->enableTLS();
00120                                 }
00121 #endif
00122 
00123                                 // raise setup event
00124                                 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00125 
00126                                 // add connection as pending
00127                                 _connections.push_back( conn );
00128 
00129                                 // start the ClientHandler (service)
00130                                 conn->initialize();
00131 
00132                                 // signal that there is a new connection
00133                                 _connections_cond.signal(true);
00134                         } catch (const ibrcommon::Exception&) { };
00135 
00136                         return;
00137                 }
00138 
00139                 void TCPConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00140                 {
00141                         // search for an existing connection
00142                         ibrcommon::MutexLock l(_connections_cond);
00143 
00144                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00145                         {
00146                                 TCPConnection &conn = *(*iter);
00147 
00148                                 if (conn.match(n))
00149                                 {
00150                                         conn.queue(job._bundle);
00151                                         IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an existing tcp connection (" << conn.getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
00152 
00153                                         return;
00154                                 }
00155                         }
00156 
00157                         try {
00158                                 // create a connection
00159                                 TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10);
00160 
00161 #ifdef WITH_TLS
00162                                 // enable TLS Support
00163                                 if ( ibrcommon::TLSStream::isInitialized() )
00164                                 {
00165                                         conn->enableTLS();
00166                                 }
00167 #endif
00168 
00169                                 // raise setup event
00170                                 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00171 
00172                                 // add connection as pending
00173                                 _connections.push_back( conn );
00174 
00175                                 // start the ClientHandler (service)
00176                                 conn->initialize();
00177 
00178                                 // queue the bundle
00179                                 conn->queue(job._bundle);
00180 
00181                                 // signal that there is a new connection
00182                                 _connections_cond.signal(true);
00183 
00184                                 IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an new tcp connection (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
00185                         } catch (const ibrcommon::Exception&) {
00186                                 // raise transfer abort event for all bundles without an ACK
00187                                 dtn::routing::RequeueBundleEvent::raise(n.getEID(), job._bundle);
00188                         }
00189                 }
00190 
00191                 void TCPConvergenceLayer::connectionUp(TCPConnection *conn)
00192                 {
00193                         ibrcommon::MutexLock l(_connections_cond);
00194                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00195                         {
00196                                 if (conn == (*iter))
00197                                 {
00198                                         // put pending connection to the active connections
00199                                         return;
00200                                 }
00201                         }
00202 
00203                         _connections.push_back( conn );
00204 
00205                         // signal that there is a new connection
00206                         _connections_cond.signal(true);
00207 
00208                         IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection added (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
00209                 }
00210 
00211                 void TCPConvergenceLayer::connectionDown(TCPConnection *conn)
00212                 {
00213                         ibrcommon::MutexLock l(_connections_cond);
00214                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00215                         {
00216                                 if (conn == (*iter))
00217                                 {
00218                                         _connections.erase(iter);
00219                                         IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection removed (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
00220 
00221                                         // signal that there is a connection less
00222                                         _connections_cond.signal(true);
00223                                         return;
00224                                 }
00225                         }
00226                 }
00227 
00228                 void TCPConvergenceLayer::componentRun()
00229                 {
00230                         try {
00231                                 while (true)
00232                                 {
00233                                         // wait for incoming connections
00234                                         tcpstream *stream = _tcpsrv.accept();
00235 
00236                                         // create a new TCPConnection and return the pointer
00237                                         TCPConnection *obj = new TCPConnection(*this, stream, dtn::core::BundleCore::local, 10);
00238 
00239 #ifdef WITH_TLS
00240                                         // enable TLS Support
00241                                         if ( ibrcommon::TLSStream::isInitialized() )
00242                                         {
00243                                                 obj->enableTLS();
00244                                         }
00245 #endif
00246 
00247                                         // add the connection to the connection list
00248                                         connectionUp(obj);
00249 
00250                                         // initialize the object
00251                                         obj->initialize();
00252 
00253                                         // breakpoint
00254                                         ibrcommon::Thread::yield();
00255                                 }
00256                         } catch (const std::exception&) {
00257                                 // ignore all errors
00258                                 return;
00259                         }
00260                 }
00261 
00262                 void TCPConvergenceLayer::__cancellation()
00263                 {
00264                         _tcpsrv.shutdown();
00265                         _tcpsrv.close();
00266                 }
00267 
00268                 void TCPConvergenceLayer::closeAll()
00269                 {
00270                         // search for an existing connection
00271                         ibrcommon::MutexLock l(_connections_cond);
00272                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00273                         {
00274                                 TCPConnection &conn = *(*iter);
00275 
00276                                 // close the connection immediately
00277                                 conn.shutdown();
00278                         }
00279                 }
00280 
00281                 void TCPConvergenceLayer::componentUp()
00282                 {
00283                         // listen on the socket, max. 5 concurrent awaiting connections
00284                         _tcpsrv.listen(5);
00285                 }
00286 
00287                 void TCPConvergenceLayer::componentDown()
00288                 {
00289                         // shutdown the TCP server
00290                         _tcpsrv.shutdown();
00291                         _tcpsrv.close();
00292 
00293                         // close all active connections
00294                         closeAll();
00295 
00296                         // wait until all tcp connections are down
00297                         {
00298                                 ibrcommon::MutexLock l(_connections_cond);
00299                                 while (_connections.size() > 0) _connections_cond.wait();
00300                         }
00301                 }
00302         }
00303 }