IBR-DTNSuite  0.8
daemon/src/net/TCPConnection.cpp
Go to the documentation of this file.
00001 /*
00002  * TCPConnection.cpp
00003  *
00004  *  Created on: 26.04.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "Configuration.h"
00010 #include "core/BundleCore.h"
00011 #include "core/BundleEvent.h"
00012 #include "storage/BundleStorage.h"
00013 #include "core/FragmentManager.h"
00014 
00015 #include "net/TCPConvergenceLayer.h"
00016 #include "net/BundleReceivedEvent.h"
00017 #include "net/ConnectionEvent.h"
00018 #include "net/TransferCompletedEvent.h"
00019 #include "net/TransferAbortedEvent.h"
00020 #include "routing/RequeueBundleEvent.h"
00021 
00022 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00023 #include <ibrcommon/net/tcpclient.h>
00024 #include <ibrcommon/TimeMeasurement.h>
00025 #include <ibrcommon/net/vinterface.h>
00026 #include <ibrcommon/thread/Conditional.h>
00027 #include <ibrcommon/Logger.h>
00028 
00029 #include <iostream>
00030 #include <iomanip>
00031 
00032 #ifdef WITH_TLS
00033 #include <openssl/x509.h>
00034 #include "security/SecurityCertificateManager.h"
00035 #include <ibrcommon/TLSExceptions.h>
00036 #endif
00037 
00038 #ifdef WITH_BUNDLE_SECURITY
00039 #include "security/SecurityManager.h"
00040 #endif
00041 
00042 namespace dtn
00043 {
00044         namespace net
00045         {
00046                 /*
00047                  * class TCPConnection
00048                  */
00049                 TCPConnection::TCPConnection(TCPConvergenceLayer & tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID & name, const size_t timeout)
00050 #ifdef WITH_TLS
00051                  : _peer(), _node(name), _tcpstream(stream), _tlsstream(stream), _stream(*this, _tlsstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()),
00052                    _sender(*this), _keepalive_sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0), _aborted(false)
00053 #else
00054                  : _peer(), _node(name), _tcpstream(stream), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()),
00055                    _sender(*this), _keepalive_sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0), _aborted(false)
00056 #endif
00057                 {
00058                         _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00059 
00060                         if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00061                         {
00062                                 stream->enableNoDelay();
00063                         }
00064 
00065                         // add default TCP connection
00066                         _node.clear();
00067                         _node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, "0.0.0.0", 0, 30) );
00068 
00069                         _flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00070                         _flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00071 
00072                         if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation())
00073                         {
00074                                 _flags |= dtn::streams::StreamContactHeader::REQUEST_FRAGMENTATION;
00075                         }
00076 
00077 #ifdef WITH_TLS
00078                         // set tls mode to server
00079                         _tlsstream.setServer(true);
00080 #endif
00081                 }
00082 
00083                 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout)
00084 #ifdef WITH_TLS
00085                  : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _tlsstream(_tcpstream.get()), _stream(*this, _tlsstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()),
00086                    _sender(*this), _keepalive_sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0), _aborted(false)
00087 #else
00088                  : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()),
00089                    _sender(*this), _keepalive_sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0), _aborted(false)
00090 #endif
00091                 {
00092                         _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00093 
00094                         _flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00095                         _flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00096 
00097                         if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation())
00098                         {
00099                                 _flags |= dtn::streams::StreamContactHeader::REQUEST_FRAGMENTATION;
00100                         }
00101                 }
00102 
00103                 TCPConnection::~TCPConnection()
00104                 {
00105                         // join the keepalive sender thread
00106                         _keepalive_sender.join();
00107 
00108                         // wait until the sender thread is finished
00109                         _sender.join();
00110                 }
00111 
00112                 void TCPConnection::queue(const dtn::data::BundleID &bundle)
00113                 {
00114                         _sender.push(bundle);
00115                 }
00116 
00117                 const StreamContactHeader& TCPConnection::getHeader() const
00118                 {
00119                         return _peer;
00120                 }
00121 
00122                 const dtn::core::Node& TCPConnection::getNode() const
00123                 {
00124                         return _node;
00125                 }
00126 
00127                 void TCPConnection::rejectTransmission()
00128                 {
00129                         _stream.reject();
00130                 }
00131 
00132                 void TCPConnection::eventShutdown(StreamConnection::ConnectionShutdownCases)
00133                 {
00134                 }
00135 
00136                 void TCPConnection::eventTimeout()
00137                 {
00138                         // event
00139                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node);
00140 
00141                         // stop the receiver thread
00142                         this->stop();
00143                 }
00144 
00145                 void TCPConnection::eventError()
00146                 {
00147                 }
00148 
00149                 void TCPConnection::eventConnectionUp(const StreamContactHeader &header)
00150                 {
00151                         _peer = header;
00152 
00153                         // copy old attributes and urls to the new node object
00154                         Node n_old = _node;
00155                         _node = Node(header._localeid);
00156                         _node += n_old;
00157 
00158                         _keepalive_timeout = header._keepalive * 1000;
00159 
00160 #ifdef WITH_TLS
00161                         /* if both nodes support TLS, activate it */
00162                         if((_peer._flags & dtn::streams::StreamContactHeader::REQUEST_TLS)
00163                                         && (_flags & dtn::streams::StreamContactHeader::REQUEST_TLS)){
00164                                 try{
00165                                         X509 *peer_cert = _tlsstream.activate();
00166                                         if(!dtn::security::SecurityCertificateManager::validateSubject(peer_cert, _peer.getEID())){
00167                                                 IBRCOMMON_LOGGER(warning) << "TCPConnection: certificate does not fit the EID." << IBRCOMMON_LOGGER_ENDL;
00168                                                 throw ibrcommon::TLSCertificateVerificationException("certificate does not fit the EID");
00169                                         }
00170                                 } catch(...){
00171                                         if(dtn::daemon::Configuration::getInstance().getSecurity().TLSRequired()){
00172                                                 /* close the connection */
00173                                                 IBRCOMMON_LOGGER(notice) << "TCPConnection: TLS failed, closing the connection." << IBRCOMMON_LOGGER_ENDL;
00174                                                 throw;
00175                                         } else {
00176                                                 IBRCOMMON_LOGGER(notice) << "TCPConnection: TLS failed, continuing unauthenticated." << IBRCOMMON_LOGGER_ENDL;
00177                                         }
00178                                 }
00179                         } else {
00180                                 /* TLS not supported by both Nodes, check if its required */
00181                                 if(dtn::daemon::Configuration::getInstance().getSecurity().TLSRequired()){
00182                                         /* close the connection */
00183                                         IBRCOMMON_LOGGER(notice) << "TCPConnection: TLS not supported by both Peers. Closing the connection." << IBRCOMMON_LOGGER_ENDL;
00184                                         throw ibrcommon::TLSException("TLS not supported by peer.");
00185                                 } else if(_flags & dtn::streams::StreamContactHeader::REQUEST_TLS){
00186                                         IBRCOMMON_LOGGER(notice) << "TCPConnection: TLS not supported by peer. Continuing without TLS." << IBRCOMMON_LOGGER_ENDL;
00187                                 }
00188                                 /* else: this node does not support TLS, should have already printed a warning */
00189                         }
00190 #endif
00191 
00192                         // set the incoming timer if set (> 0)
00193                         if (_peer._keepalive > 0)
00194                         {
00195                                 // set the timer
00196                                 _tcpstream->setTimeout(header._keepalive * 2);
00197                         }
00198 
00199                         // enable idle timeout
00200                         size_t _idle_timeout = dtn::daemon::Configuration::getInstance().getNetwork().getTCPIdleTimeout();
00201                         if (_idle_timeout > 0)
00202                         {
00203                                 _stream.enableIdleTimeout(_idle_timeout);
00204                         }
00205 
00206                         // raise up event
00207                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node);
00208                 }
00209 
00210                 void TCPConnection::eventConnectionDown()
00211                 {
00212                         IBRCOMMON_LOGGER_DEBUG(40) << "TCPConnection::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00213 
00214                         try {
00215                                 // shutdown the keepalive sender thread
00216                                 _keepalive_sender.stop();
00217 
00218                                 // stop the sender
00219                                 _sender.stop();
00220                         } catch (const ibrcommon::ThreadException &ex) {
00221                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00222                         }
00223 
00224                         if (_peer._localeid != dtn::data::EID())
00225                         {
00226                                 // event
00227                                 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node);
00228                         }
00229                 }
00230 
00231                 void TCPConnection::eventBundleRefused()
00232                 {
00233                         try {
00234                                 const dtn::data::BundleID bundle = _sentqueue.getnpop();
00235 
00236                                 // requeue the bundle
00237                                 TransferAbortedEvent::raise(EID(_node.getEID()), bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED);
00238 
00239                                 // set ACK to zero
00240                                 _lastack = 0;
00241 
00242                         } catch (const ibrcommon::QueueUnblockedException&) {
00243                                 // pop on empty queue!
00244                                 IBRCOMMON_LOGGER(error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00245                         }
00246                 }
00247 
00248                 void TCPConnection::eventBundleForwarded()
00249                 {
00250                         try {
00251                                 const dtn::data::MetaBundle bundle = _sentqueue.getnpop();
00252 
00253                                 // signal completion of the transfer
00254                                 TransferCompletedEvent::raise(_node.getEID(), bundle);
00255 
00256                                 // raise bundle event
00257                                 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED);
00258 
00259                                 // set ACK to zero
00260                                 _lastack = 0;
00261                         } catch (const ibrcommon::QueueUnblockedException&) {
00262                                 // pop on empty queue!
00263                                 IBRCOMMON_LOGGER(error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00264                         }
00265                 }
00266 
00267                 void TCPConnection::eventBundleAck(size_t ack)
00268                 {
00269                         _lastack = ack;
00270                 }
00271 
00272                 void TCPConnection::initialize()
00273                 {
00274                         // start the receiver for incoming bundles + handshake
00275                         try {
00276                                 start();
00277                         } catch (const ibrcommon::ThreadException &ex) {
00278                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00279                         }
00280                 }
00281 
00282                 void TCPConnection::shutdown()
00283                 {
00284                         // shutdown
00285                         _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00286 
00287                         try {
00288                                 // abort the connection thread
00289                                 ibrcommon::DetachedThread::stop();
00290                         } catch (const ibrcommon::ThreadException &ex) {
00291                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00292                         }
00293                 }
00294 
00295                 void TCPConnection::__cancellation()
00296                 {
00297                         // mark the connection as aborted
00298                         _aborted = true;
00299 
00300                         // close the stream
00301                         try {
00302                                 (*_tcpstream).close();
00303                         } catch (const ibrcommon::ConnectionClosedException&) { };
00304                 }
00305 
00306                 void TCPConnection::finally()
00307                 {
00308                         IBRCOMMON_LOGGER_DEBUG(60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
00309 
00310                         try {
00311                                 // shutdown the keepalive sender thread
00312                                 _keepalive_sender.stop();
00313 
00314                                 // shutdown the sender thread
00315                                 _sender.stop();
00316                         } catch (const std::exception&) { };
00317 
00318                         // close the tcpstream
00319                         try {
00320                                 _tcpstream->close();
00321                         } catch (const ibrcommon::ConnectionClosedException&) { };
00322 
00323                         try {
00324                                 _callback.connectionDown(this);
00325                         } catch (const ibrcommon::MutexException&) { };
00326 
00327                         // clear the queue
00328                         clearQueue();
00329                 }
00330 
00331                 void TCPConnection::setup()
00332                 {
00333                         // nothing to do here
00334                 }
00335 
00336                 void TCPConnection::connect()
00337                 {
00338                         // variables for address and port
00339                         std::string address = "0.0.0.0";
00340                         unsigned int port = 0;
00341 
00342                         // try to connect to the other side
00343                         try {
00344                                 const std::list<dtn::core::Node::URI> uri_list = _node.get(dtn::core::Node::CONN_TCPIP);
00345 
00346                                 for (std::list<dtn::core::Node::URI>::const_iterator iter = uri_list.begin(); iter != uri_list.end(); iter++)
00347                                 {
00348                                         // break-out if the connection has been aborted
00349                                         if (_aborted) throw ibrcommon::tcpclient::SocketException("connection has been aborted");
00350 
00351                                         try {
00352                                                 // decode address and port
00353                                                 const dtn::core::Node::URI &uri = (*iter);
00354                                                 uri.decode(address, port);
00355 
00356                                                 ibrcommon::tcpclient &client = dynamic_cast<ibrcommon::tcpclient&>(*_tcpstream);
00357 
00358                                                 IBRCOMMON_LOGGER_DEBUG(15) << "Initiate TCP connection to " << address << ":" << port << IBRCOMMON_LOGGER_ENDL;
00359                                                 client.open(address, port, _timeout);
00360 
00361                                                 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00362                                                 {
00363                                                         _tcpstream->enableNoDelay();
00364                                                 }
00365 
00366                                                 // add TCP connection descriptor to the node object
00367                                                 _node.clear();
00368                                                 _node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, uri.value, 0, 30) );
00369 
00370                                                 // connection successful
00371                                                 return;
00372                                         } catch (const ibrcommon::tcpclient::SocketException&) { };
00373                                 }
00374 
00375                                 // no connection has been established
00376                                 throw ibrcommon::tcpclient::SocketException("no address available to connect");
00377 
00378                         } catch (const ibrcommon::tcpclient::SocketException&) {
00379                                 // error on open, requeue all bundles in the queue
00380                                 IBRCOMMON_LOGGER(warning) << "connection to " << _node.toString() << " failed" << IBRCOMMON_LOGGER_ENDL;
00381                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00382                                 throw;
00383                         } catch (const bad_cast&) { };
00384                 }
00385 
00386                 void TCPConnection::run()
00387                 {
00388                         try {
00389                                 // connect to the peer
00390                                 connect();
00391 
00392                                 // do the handshake
00393                                 _stream.handshake(_name, _timeout, _flags);
00394 
00395                                 // start the sender
00396                                 _sender.start();
00397 
00398                                 // start keepalive sender
00399                                 _keepalive_sender.start();
00400 
00401                                 while (!_stream.eof())
00402                                 {
00403                                         try {
00404                                                 // create a new empty bundle
00405                                                 dtn::data::Bundle bundle;
00406 
00407                                                 // deserialize the bundle
00408                                                 (*this) >> bundle;
00409 
00410                                                 // check the bundle
00411                                                 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) )
00412                                                 {
00413                                                         // invalid bundle!
00414                                                         throw dtn::data::Validator::RejectedException("destination or source EID is null");
00415                                                 }
00416 
00417                                                 // increment value in the scope control hop limit block
00418                                                 try {
00419                                                         dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00420                                                         schl.increment();
00421                                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }
00422 
00423                                                 // raise default bundle received event
00424                                                 dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle, false, true);
00425                                         }
00426                                         catch (const dtn::data::Validator::RejectedException &ex)
00427                                         {
00428                                                 // bundle rejected
00429                                                 rejectTransmission();
00430 
00431                                                 // display the rejection
00432                                                 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00433                                         }
00434                                         catch (const dtn::InvalidDataException &ex) {
00435                                                 // bundle rejected
00436                                                 rejectTransmission();
00437 
00438                                                 // display the rejection
00439                                                 IBRCOMMON_LOGGER(warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00440                                         }
00441 
00442                                         yield();
00443                                 }
00444                         } catch (const ibrcommon::ThreadException &ex) {
00445                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00446                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00447                         } catch (const std::exception &ex) {
00448                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00449                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00450                         }
00451                 }
00452 
00453 
00454                 TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle)
00455                 {
00456                         std::iostream &stream = conn._stream;
00457 
00458                         // check if the stream is still good
00459                         if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00460 
00461                         // create a deserializer for next bundle
00462                         dtn::data::DefaultDeserializer deserializer(stream, dtn::core::BundleCore::getInstance());
00463 
00464                         // enable/disable fragmentation support according to the contact header.
00465                         deserializer.setFragmentationSupport(conn._peer._flags & StreamContactHeader::REQUEST_FRAGMENTATION);
00466 
00467                         // read the bundle (or the fragment if fragmentation is enabled)
00468                         deserializer >> bundle;
00469 
00470                         return conn;
00471                 }
00472 
00473                 TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle)
00474                 {
00475                         // prepare a measurement
00476                         ibrcommon::TimeMeasurement m;
00477 
00478                         std::iostream &stream = conn._stream;
00479 
00480                         // get the offset, if this bundle has been reactively fragmented before
00481                         size_t offset = 0;
00482                         if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()
00483                                         && !bundle.get(dtn::data::PrimaryBlock::DONT_FRAGMENT))
00484                         {
00485                                 offset = dtn::core::FragmentManager::getOffset(conn.getNode().getEID(), bundle);
00486                         }
00487 
00488                         // create a serializer
00489                         dtn::data::DefaultSerializer serializer(stream);
00490 
00491                         // put the bundle into the sentqueue
00492                         conn._sentqueue.push(bundle);
00493 
00494                         // start the measurement
00495                         m.start();
00496 
00497                         try {
00498                                 // activate exceptions for this method
00499                                 if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00500 
00501                                 if (offset > 0)
00502                                 {
00503                                         // transmit the fragment
00504                                         serializer << dtn::data::BundleFragment(bundle, offset, -1);
00505                                 }
00506                                 else
00507                                 {
00508                                         // transmit the bundle
00509                                         serializer << bundle;
00510                                 }
00511 
00512                                 // flush the stream
00513                                 stream << std::flush;
00514 
00515                                 // stop the time measurement
00516                                 m.stop();
00517 
00518                                 // get throughput
00519                                 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024;
00520 
00521                                 // print out throughput
00522                                 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with "
00523                                                 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL;
00524 
00525                         } catch (const ibrcommon::Exception &ex) {
00526                                 // the connection not available
00527                                 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00528 
00529                                 // forward exception
00530                                 throw;
00531                         }
00532 
00533                         return conn;
00534                 }
00535 
00536                 TCPConnection::KeepaliveSender::KeepaliveSender(TCPConnection &connection, size_t &keepalive_timeout)
00537                  : _connection(connection), _keepalive_timeout(keepalive_timeout)
00538                 {
00539 
00540                 }
00541 
00542                 TCPConnection::KeepaliveSender::~KeepaliveSender()
00543                 {
00544                 }
00545 
00546                 void TCPConnection::KeepaliveSender::run()
00547                 {
00548                         try {
00549                                 ibrcommon::MutexLock l(_wait);
00550                                 while (true)
00551                                 {
00552                                         try {
00553                                                 _wait.wait(_keepalive_timeout);
00554                                         } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00555                                                 if (ex.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT)
00556                                                 {
00557                                                         // send a keepalive
00558                                                         _connection.keepalive();
00559                                                 }
00560                                                 else
00561                                                 {
00562                                                         throw;
00563                                                 }
00564                                         }
00565                                 }
00566                         } catch (const std::exception&) { };
00567                 }
00568 
00569                 void TCPConnection::KeepaliveSender::__cancellation()
00570                 {
00571                         ibrcommon::MutexLock l(_wait);
00572                         _wait.abort();
00573                 }
00574 
00575                 TCPConnection::Sender::Sender(TCPConnection &connection)
00576                  : _connection(connection)
00577                 {
00578                 }
00579 
00580                 TCPConnection::Sender::~Sender()
00581                 {
00582                 }
00583 
00584                 void TCPConnection::Sender::__cancellation()
00585                 {
                              // Cancellation hook of the sender thread: calls abort() of the
                              // ibrcommon::Queue<BundleID> part of this object.
                              // NOTE(review): presumably this releases the getnpop(true) call in
                              // run() with a QueueUnblockedException - confirm against ibrcommon::Queue.
00586                         // cancel the main thread in here
00587                         ibrcommon::Queue<dtn::data::BundleID>::abort();
00588                 }
00589 
00590                 void TCPConnection::Sender::run()
00591                 {
                              // Thread body of the sender. While _connection.good() holds, it takes
                              // the next bundle ID from this object's queue, loads the bundle from
                              // the bundle storage and streams it into the connection.
                              //
                              // Exit paths:
                              //  - QueueUnblockedException: logged at debug level 50, returns at
                              //    once WITHOUT calling _connection.stop().
                              //  - any other std::exception: logged at debug level 10, then
                              //    _connection.stop() is called.
                              //  - good() turned false: _connection.stop() is called.
                              //
                              // NOTE(review): an ID that was already popped when a std::exception
                              // other than NoBundleFoundException is thrown is not re-queued in this
                              // function - verify that operator<< / clearQueue() cover that case.
00592                         try {
00593                                 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00594 
00595                                 while (_connection.good())
00596                                 {
                                              // NOTE(review): getnpop(true) presumably blocks until an ID
                                              // is available - confirm against ibrcommon::Queue
00597                                         dtn::data::BundleID transfer = ibrcommon::Queue<dtn::data::BundleID>::getnpop(true);
00598 
00599                                         try {
00600                                                 // read the bundle out of the storage
00601                                                 dtn::data::Bundle bundle = storage.get(transfer);
00602 
00603 #ifdef WITH_BUNDLE_SECURITY
00604                                                 const dtn::daemon::Configuration::Security::Level seclevel =
00605                                                                 dtn::daemon::Configuration::getInstance().getSecurity().getLevel();
00606 
                                                      // run auth() on the bundle only if the configured level has the
                                                      // SECURITY_LEVEL_AUTHENTICATED bit set
00607                                                 if (seclevel & dtn::daemon::Configuration::Security::SECURITY_LEVEL_AUTHENTICATED)
00608                                                 {
00609                                                         try {
00610                                                                 dtn::security::SecurityManager::getInstance().auth(bundle);
00611                                                         } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00612                                                                 // sign requested, but no key is available
                                                                      // only a warning is logged; the bundle is still sent below
00613                                                                 IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
00614                                                         }
00615                                                 }
00616 #endif
00617                                                 // send bundle
00618                                                 _connection << bundle;
00619                                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) {
00620                                                 // send transfer aborted event
                                                      // reported with REASON_BUNDLE_DELETED; the loop then goes on
                                                      // with the next queued ID
00621                                                 TransferAbortedEvent::raise(_connection._node.getEID(), transfer, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00622                                         }
00623 
00624                                         // idle a little bit
00625                                         yield();
00626                                 }
00627                         } catch (const ibrcommon::QueueUnblockedException &ex) {
                                      // leaves run() right away: _connection.stop() below is not reached
00628                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00629                                 return;
00630                         } catch (const std::exception &ex) {
                                      // logged only; control falls through to _connection.stop()
00631                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00632                         }
00633 
                              // reached when good() became false or after a logged std::exception
00634                         _connection.stop();
00635                 }
00636 
00637                 void TCPConnection::clearQueue()
00638                 {
                              // Drains both queues of this connection and raises a
                              // RequeueBundleEvent (with the EID of _node) for every bundle ID found:
                              // first the IDs still waiting in _sender, then the IDs in _sentqueue.
                              // Both loops have no break; they end only when getnpop() throws
                              // QueueUnblockedException.
00639                         // requeue all bundles still queued
00640                         try {
00641                                 while (true)
00642                                 {
00643                                         const dtn::data::BundleID id = _sender.getnpop();
00644 
00645                                         // raise a requeue event for every bundle that was never sent
00646                                         dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id);
00647                                 }
00648                         } catch (const ibrcommon::QueueUnblockedException&) {
00649                                 // queue empty
00650                         }
00651 
00652                         // requeue all bundles still in transit
00653                         try {
00654                                 while (true)
00655                                 {
00656                                         const dtn::data::BundleID id = _sentqueue.getnpop();
00657 
                                              // Because _lastack is reset at the end of each round, only the
                                              // first ID popped here can see a non-zero _lastack.
00658                                         if ((_lastack > 0) && (_peer._flags & dtn::streams::StreamContactHeader::REQUEST_FRAGMENTATION))
00659                                         {
00660                                                 // some data are already acknowledged
00661                                                 // store this information in the fragment manager
00662                                                 dtn::core::FragmentManager::setOffset(_peer.getEID(), id, _lastack);
00663 
00664                                                 // raise a requeue event for this bundle (same call as in the else branch)
00665                                                 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id);
00666                                         }
00667                                         else
00668                                         {
00669                                                 // raise a requeue event for this bundle without storing an offset
00670                                                 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id);
00671                                         }
00672 
00673                                         // set last ack to zero
00674                                         _lastack = 0;
00675                                 }
00676                         } catch (const ibrcommon::QueueUnblockedException&) {
00677                                 // queue empty
00678                         }
00679                 }
00680 
00681 #ifdef WITH_TLS
00682                 void dtn::net::TCPConnection::enableTLS()
00683                 {
                              // Sets the REQUEST_TLS bit in _flags; no other state is touched here.
                              // This definition is only compiled when WITH_TLS is defined.
00684                         _flags |= dtn::streams::StreamContactHeader::REQUEST_TLS;
00685                 }
00686 #endif
00687 
00688                 void TCPConnection::keepalive()
00689                 {
                              // Forwards the request to _stream.keepalive(); called from
                              // KeepaliveSender::run() after a wait timeout.
00690                         _stream.keepalive();
00691                 }
00692 
00693                 bool TCPConnection::good() const
00694                 {
                              // Returns the good() state of the stream behind _tcpstream.
                              // The pointer is dereferenced without a null check.
00695                         return _tcpstream->good();
00696                 }
00697 
00698                 void TCPConnection::Sender::finally()
00699                 {
                              // Intentionally empty: the sender does no work in this hook.
00700                 }
00701 
00702                 bool TCPConnection::match(const dtn::core::Node &n) const
00703                 {
                              // True when the node stored in _node compares equal (operator==) to n.
00704                         return (_node == n);
00705                 }
00706 
00707                 bool TCPConnection::match(const dtn::data::EID &destination) const
00708                 {
                              // Compares the EID of _node with destination.getNode(), i.e. with the
                              // result of getNode() rather than with the destination EID itself.
00709                         return (_node.getEID() == destination.getNode());
00710                 }
00711 
00712                 bool TCPConnection::match(const NodeEvent &evt) const
00713                 {
                              // Takes the node carried by the event and delegates to
                              // match(const dtn::core::Node&).
00714                         const dtn::core::Node &n = evt.getNode();
00715                         return match(n);
00716                 }
00717         }
00718 }