IBR-DTNSuite
0.8
|
00001 /* 00002 * TCPConnection.cpp 00003 * 00004 * Created on: 26.04.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "Configuration.h" 00010 #include "core/BundleCore.h" 00011 #include "core/BundleEvent.h" 00012 #include "storage/BundleStorage.h" 00013 #include "core/FragmentManager.h" 00014 00015 #include "net/TCPConvergenceLayer.h" 00016 #include "net/BundleReceivedEvent.h" 00017 #include "net/ConnectionEvent.h" 00018 #include "net/TransferCompletedEvent.h" 00019 #include "net/TransferAbortedEvent.h" 00020 #include "routing/RequeueBundleEvent.h" 00021 00022 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00023 #include <ibrcommon/net/tcpclient.h> 00024 #include <ibrcommon/TimeMeasurement.h> 00025 #include <ibrcommon/net/vinterface.h> 00026 #include <ibrcommon/thread/Conditional.h> 00027 #include <ibrcommon/Logger.h> 00028 00029 #include <iostream> 00030 #include <iomanip> 00031 00032 #ifdef WITH_TLS 00033 #include <openssl/x509.h> 00034 #include "security/SecurityCertificateManager.h" 00035 #include <ibrcommon/TLSExceptions.h> 00036 #endif 00037 00038 #ifdef WITH_BUNDLE_SECURITY 00039 #include "security/SecurityManager.h" 00040 #endif 00041 00042 namespace dtn 00043 { 00044 namespace net 00045 { 00046 /* 00047 * class TCPConnection 00048 */ 00049 TCPConnection::TCPConnection(TCPConvergenceLayer & tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID & name, const size_t timeout) 00050 #ifdef WITH_TLS 00051 : _peer(), _node(name), _tcpstream(stream), _tlsstream(stream), _stream(*this, _tlsstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), 00052 _sender(*this), _keepalive_sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0), _aborted(false) 00053 #else 00054 : _peer(), _node(name), _tcpstream(stream), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), 00055 _sender(*this), _keepalive_sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0), _aborted(false) 00056 #endif 00057 { 00058 _stream.exceptions(std::ios::badbit | std::ios::eofbit); 00059 00060 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() ) 00061 { 00062 stream->enableNoDelay(); 00063 } 00064 00065 // add default TCP connection 00066 _node.clear(); 00067 _node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, "0.0.0.0", 0, 30) ); 00068 00069 _flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS; 00070 _flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS; 00071 00072 if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()) 00073 { 00074 _flags |= dtn::streams::StreamContactHeader::REQUEST_FRAGMENTATION; 00075 } 00076 00077 #ifdef WITH_TLS 00078 // set tls mode to server 00079 _tlsstream.setServer(true); 00080 #endif 00081 } 00082 00083 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout) 00084 #ifdef WITH_TLS 00085 : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _tlsstream(_tcpstream.get()), _stream(*this, _tlsstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), 00086 _sender(*this), _keepalive_sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0), _aborted(false) 00087 #else 00088 : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), 00089 _sender(*this), _keepalive_sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0), _aborted(false) 00090 #endif 00091 { 00092 _stream.exceptions(std::ios::badbit | std::ios::eofbit); 00093 00094 _flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS; 00095 _flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS; 00096 00097 if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()) 00098 { 00099 _flags |= dtn::streams::StreamContactHeader::REQUEST_FRAGMENTATION; 00100 } 00101 } 00102 00103 TCPConnection::~TCPConnection() 00104 { 00105 // join the keepalive sender thread 00106 _keepalive_sender.join(); 00107 00108 // wait until the sender thread is finished 00109 _sender.join(); 00110 } 00111 00112 void TCPConnection::queue(const dtn::data::BundleID &bundle) 00113 { 00114 _sender.push(bundle); 00115 } 00116 00117 const StreamContactHeader& TCPConnection::getHeader() const 00118 { 00119 return _peer; 00120 } 00121 00122 const dtn::core::Node& TCPConnection::getNode() const 00123 { 00124 return _node; 00125 } 00126 00127 void TCPConnection::rejectTransmission() 00128 { 00129 _stream.reject(); 00130 } 00131 00132 void TCPConnection::eventShutdown(StreamConnection::ConnectionShutdownCases) 00133 { 00134 } 00135 00136 void TCPConnection::eventTimeout() 00137 { 00138 // event 00139 ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node); 00140 00141 // stop the receiver thread 00142 this->stop(); 00143 } 00144 00145 void TCPConnection::eventError() 00146 { 00147 } 00148 00149 void TCPConnection::eventConnectionUp(const StreamContactHeader &header) 00150 { 00151 _peer = header; 00152 00153 // copy old attributes and urls to the new node object 00154 Node n_old = _node; 00155 _node = Node(header._localeid); 00156 _node += n_old; 00157 00158 _keepalive_timeout = header._keepalive * 1000; 00159 00160 #ifdef WITH_TLS 00161 /* if both nodes support TLS, activate it */ 00162 if((_peer._flags & dtn::streams::StreamContactHeader::REQUEST_TLS) 00163 && (_flags & dtn::streams::StreamContactHeader::REQUEST_TLS)){ 00164 try{ 00165 X509 *peer_cert = _tlsstream.activate(); 00166 if(!dtn::security::SecurityCertificateManager::validateSubject(peer_cert, _peer.getEID())){ 00167 IBRCOMMON_LOGGER(warning) << "TCPConnection: certificate does not fit the EID." << IBRCOMMON_LOGGER_ENDL; 00168 throw ibrcommon::TLSCertificateVerificationException("certificate does not fit the EID"); 00169 } 00170 } catch(...){ 00171 if(dtn::daemon::Configuration::getInstance().getSecurity().TLSRequired()){ 00172 /* close the connection */ 00173 IBRCOMMON_LOGGER(notice) << "TCPConnection: TLS failed, closing the connection." << IBRCOMMON_LOGGER_ENDL; 00174 throw; 00175 } else { 00176 IBRCOMMON_LOGGER(notice) << "TCPConnection: TLS failed, continuing unauthenticated." << IBRCOMMON_LOGGER_ENDL; 00177 } 00178 } 00179 } else { 00180 /* TLS not supported by both Nodes, check if its required */ 00181 if(dtn::daemon::Configuration::getInstance().getSecurity().TLSRequired()){ 00182 /* close the connection */ 00183 IBRCOMMON_LOGGER(notice) << "TCPConnection: TLS not supported by both Peers. Closing the connection." << IBRCOMMON_LOGGER_ENDL; 00184 throw ibrcommon::TLSException("TLS not supported by peer."); 00185 } else if(_flags & dtn::streams::StreamContactHeader::REQUEST_TLS){ 00186 IBRCOMMON_LOGGER(notice) << "TCPConnection: TLS not supported by peer. Continuing without TLS." << IBRCOMMON_LOGGER_ENDL; 00187 } 00188 /* else: this node does not support TLS, should have already printed a warning */ 00189 } 00190 #endif 00191 00192 // set the incoming timer if set (> 0) 00193 if (_peer._keepalive > 0) 00194 { 00195 // set the timer 00196 _tcpstream->setTimeout(header._keepalive * 2); 00197 } 00198 00199 // enable idle timeout 00200 size_t _idle_timeout = dtn::daemon::Configuration::getInstance().getNetwork().getTCPIdleTimeout(); 00201 if (_idle_timeout > 0) 00202 { 00203 _stream.enableIdleTimeout(_idle_timeout); 00204 } 00205 00206 // raise up event 00207 ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node); 00208 } 00209 00210 void TCPConnection::eventConnectionDown() 00211 { 00212 IBRCOMMON_LOGGER_DEBUG(40) << "TCPConnection::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL; 00213 00214 try { 00215 // shutdown the keepalive sender thread 00216 _keepalive_sender.stop(); 00217 00218 // stop the sender 00219 _sender.stop(); 00220 } catch (const ibrcommon::ThreadException &ex) { 00221 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00222 } 00223 00224 if (_peer._localeid != dtn::data::EID()) 00225 { 00226 // event 00227 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node); 00228 } 00229 } 00230 00231 void TCPConnection::eventBundleRefused() 00232 { 00233 try { 00234 const dtn::data::BundleID bundle = _sentqueue.getnpop(); 00235 00236 // requeue the bundle 00237 TransferAbortedEvent::raise(EID(_node.getEID()), bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED); 00238 00239 // set ACK to zero 00240 _lastack = 0; 00241 00242 } catch (const ibrcommon::QueueUnblockedException&) { 00243 // pop on empty queue! 00244 IBRCOMMON_LOGGER(error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL; 00245 } 00246 } 00247 00248 void TCPConnection::eventBundleForwarded() 00249 { 00250 try { 00251 const dtn::data::MetaBundle bundle = _sentqueue.getnpop(); 00252 00253 // signal completion of the transfer 00254 TransferCompletedEvent::raise(_node.getEID(), bundle); 00255 00256 // raise bundle event 00257 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED); 00258 00259 // set ACK to zero 00260 _lastack = 0; 00261 } catch (const ibrcommon::QueueUnblockedException&) { 00262 // pop on empty queue! 00263 IBRCOMMON_LOGGER(error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL; 00264 } 00265 } 00266 00267 void TCPConnection::eventBundleAck(size_t ack) 00268 { 00269 _lastack = ack; 00270 } 00271 00272 void TCPConnection::initialize() 00273 { 00274 // start the receiver for incoming bundles + handshake 00275 try { 00276 start(); 00277 } catch (const ibrcommon::ThreadException &ex) { 00278 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00279 } 00280 } 00281 00282 void TCPConnection::shutdown() 00283 { 00284 // shutdown 00285 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00286 00287 try { 00288 // abort the connection thread 00289 ibrcommon::DetachedThread::stop(); 00290 } catch (const ibrcommon::ThreadException &ex) { 00291 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00292 } 00293 } 00294 00295 void TCPConnection::__cancellation() 00296 { 00297 // mark the connection as aborted 00298 _aborted = true; 00299 00300 // close the stream 00301 try { 00302 (*_tcpstream).close(); 00303 } catch (const ibrcommon::ConnectionClosedException&) { }; 00304 } 00305 00306 void TCPConnection::finally() 00307 { 00308 IBRCOMMON_LOGGER_DEBUG(60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL; 00309 00310 try { 00311 // shutdown the keepalive sender thread 00312 _keepalive_sender.stop(); 00313 00314 // shutdown the sender thread 00315 _sender.stop(); 00316 } catch (const std::exception&) { }; 00317 00318 // close the tcpstream 00319 try { 00320 _tcpstream->close(); 00321 } catch (const ibrcommon::ConnectionClosedException&) { }; 00322 00323 try { 00324 _callback.connectionDown(this); 00325 } catch (const ibrcommon::MutexException&) { }; 00326 00327 // clear the queue 00328 clearQueue(); 00329 } 00330 00331 void TCPConnection::setup() 00332 { 00333 // nothing to do here 00334 } 00335 00336 void TCPConnection::connect() 00337 { 00338 // variables for address and port 00339 std::string address = "0.0.0.0"; 00340 unsigned int port = 0; 00341 00342 // try to connect to the other side 00343 try { 00344 const std::list<dtn::core::Node::URI> uri_list = _node.get(dtn::core::Node::CONN_TCPIP); 00345 00346 for (std::list<dtn::core::Node::URI>::const_iterator iter = uri_list.begin(); iter != uri_list.end(); iter++) 00347 { 00348 // break-out if the connection has been aborted 00349 if (_aborted) throw ibrcommon::tcpclient::SocketException("connection has been aborted"); 00350 00351 try { 00352 // decode address and port 00353 const dtn::core::Node::URI &uri = (*iter); 00354 uri.decode(address, port); 00355 00356 ibrcommon::tcpclient &client = dynamic_cast<ibrcommon::tcpclient&>(*_tcpstream); 00357 00358 IBRCOMMON_LOGGER_DEBUG(15) << "Initiate TCP connection to " << address << ":" << port << IBRCOMMON_LOGGER_ENDL; 00359 client.open(address, port, _timeout); 00360 00361 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() ) 00362 { 00363 _tcpstream->enableNoDelay(); 00364 } 00365 00366 // add TCP connection descriptor to the node object 00367 _node.clear(); 00368 _node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, uri.value, 0, 30) ); 00369 00370 // connection successful 00371 return; 00372 } catch (const ibrcommon::tcpclient::SocketException&) { }; 00373 } 00374 00375 // no connection has been established 00376 throw ibrcommon::tcpclient::SocketException("no address available to connect"); 00377 00378 } catch (const ibrcommon::tcpclient::SocketException&) { 00379 // error on open, requeue all bundles in the queue 00380 IBRCOMMON_LOGGER(warning) << "connection to " << _node.toString() << " failed" << IBRCOMMON_LOGGER_ENDL; 00381 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00382 throw; 00383 } catch (const bad_cast&) { }; 00384 } 00385 00386 void TCPConnection::run() 00387 { 00388 try { 00389 // connect to the peer 00390 connect(); 00391 00392 // do the handshake 00393 _stream.handshake(_name, _timeout, _flags); 00394 00395 // start the sender 00396 _sender.start(); 00397 00398 // start keepalive sender 00399 _keepalive_sender.start(); 00400 00401 while (!_stream.eof()) 00402 { 00403 try { 00404 // create a new empty bundle 00405 dtn::data::Bundle bundle; 00406 00407 // deserialize the bundle 00408 (*this) >> bundle; 00409 00410 // check the bundle 00411 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) ) 00412 { 00413 // invalid bundle! 00414 throw dtn::data::Validator::RejectedException("destination or source EID is null"); 00415 } 00416 00417 // increment value in the scope control hop limit block 00418 try { 00419 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>(); 00420 schl.increment(); 00421 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { } 00422 00423 // raise default bundle received event 00424 dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle, false, true); 00425 } 00426 catch (const dtn::data::Validator::RejectedException &ex) 00427 { 00428 // bundle rejected 00429 rejectTransmission(); 00430 00431 // display the rejection 00432 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00433 } 00434 catch (const dtn::InvalidDataException &ex) { 00435 // bundle rejected 00436 rejectTransmission(); 00437 00438 // display the rejection 00439 IBRCOMMON_LOGGER(warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00440 } 00441 00442 yield(); 00443 } 00444 } catch (const ibrcommon::ThreadException &ex) { 00445 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00446 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00447 } catch (const std::exception &ex) { 00448 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00449 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00450 } 00451 } 00452 00453 00454 TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle) 00455 { 00456 std::iostream &stream = conn._stream; 00457 00458 // check if the stream is still good 00459 if (!stream.good()) throw ibrcommon::IOException("stream went bad"); 00460 00461 // create a deserializer for next bundle 00462 dtn::data::DefaultDeserializer deserializer(stream, dtn::core::BundleCore::getInstance()); 00463 00464 // enable/disable fragmentation support according to the contact header. 00465 deserializer.setFragmentationSupport(conn._peer._flags & StreamContactHeader::REQUEST_FRAGMENTATION); 00466 00467 // read the bundle (or the fragment if fragmentation is enabled) 00468 deserializer >> bundle; 00469 00470 return conn; 00471 } 00472 00473 TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle) 00474 { 00475 // prepare a measurement 00476 ibrcommon::TimeMeasurement m; 00477 00478 std::iostream &stream = conn._stream; 00479 00480 // get the offset, if this bundle has been reactively fragmented before 00481 size_t offset = 0; 00482 if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation() 00483 && !bundle.get(dtn::data::PrimaryBlock::DONT_FRAGMENT)) 00484 { 00485 offset = dtn::core::FragmentManager::getOffset(conn.getNode().getEID(), bundle); 00486 } 00487 00488 // create a serializer 00489 dtn::data::DefaultSerializer serializer(stream); 00490 00491 // put the bundle into the sentqueue 00492 conn._sentqueue.push(bundle); 00493 00494 // start the measurement 00495 m.start(); 00496 00497 try { 00498 // activate exceptions for this method 00499 if (!stream.good()) throw ibrcommon::IOException("stream went bad"); 00500 00501 if (offset > 0) 00502 { 00503 // transmit the fragment 00504 serializer << dtn::data::BundleFragment(bundle, offset, -1); 00505 } 00506 else 00507 { 00508 // transmit the bundle 00509 serializer << bundle; 00510 } 00511 00512 // flush the stream 00513 stream << std::flush; 00514 00515 // stop the time measurement 00516 m.stop(); 00517 00518 // get throughput 00519 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024; 00520 00521 // print out throughput 00522 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with " 00523 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL; 00524 00525 } catch (const ibrcommon::Exception &ex) { 00526 // the connection not available 00527 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00528 00529 // forward exception 00530 throw; 00531 } 00532 00533 return conn; 00534 } 00535 00536 TCPConnection::KeepaliveSender::KeepaliveSender(TCPConnection &connection, size_t &keepalive_timeout) 00537 : _connection(connection), _keepalive_timeout(keepalive_timeout) 00538 { 00539 00540 } 00541 00542 TCPConnection::KeepaliveSender::~KeepaliveSender() 00543 { 00544 } 00545 00546 void TCPConnection::KeepaliveSender::run() 00547 { 00548 try { 00549 ibrcommon::MutexLock l(_wait); 00550 while (true) 00551 { 00552 try { 00553 _wait.wait(_keepalive_timeout); 00554 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) { 00555 if (ex.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT) 00556 { 00557 // send a keepalive 00558 _connection.keepalive(); 00559 } 00560 else 00561 { 00562 throw; 00563 } 00564 } 00565 } 00566 } catch (const std::exception&) { }; 00567 } 00568 00569 void TCPConnection::KeepaliveSender::__cancellation() 00570 { 00571 ibrcommon::MutexLock l(_wait); 00572 _wait.abort(); 00573 } 00574 00575 TCPConnection::Sender::Sender(TCPConnection &connection) 00576 : _connection(connection) 00577 { 00578 } 00579 00580 TCPConnection::Sender::~Sender() 00581 { 00582 } 00583 00584 void TCPConnection::Sender::__cancellation() 00585 { 00586 // cancel the main thread in here 00587 ibrcommon::Queue<dtn::data::BundleID>::abort(); 00588 } 00589 00590 void TCPConnection::Sender::run() 00591 { 00592 try { 00593 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00594 00595 while (_connection.good()) 00596 { 00597 dtn::data::BundleID transfer = ibrcommon::Queue<dtn::data::BundleID>::getnpop(true); 00598 00599 try { 00600 // read the bundle out of the storage 00601 dtn::data::Bundle bundle = storage.get(transfer); 00602 00603 #ifdef WITH_BUNDLE_SECURITY 00604 const dtn::daemon::Configuration::Security::Level seclevel = 00605 dtn::daemon::Configuration::getInstance().getSecurity().getLevel(); 00606 00607 if (seclevel & dtn::daemon::Configuration::Security::SECURITY_LEVEL_AUTHENTICATED) 00608 { 00609 try { 00610 dtn::security::SecurityManager::getInstance().auth(bundle); 00611 } catch (const dtn::security::SecurityManager::KeyMissingException&) { 00612 // sign requested, but no key is available 00613 IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL; 00614 } 00615 } 00616 #endif 00617 // send bundle 00618 _connection << bundle; 00619 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { 00620 // send transfer aborted event 00621 TransferAbortedEvent::raise(_connection._node.getEID(), transfer, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED); 00622 } 00623 00624 // idle a little bit 00625 yield(); 00626 } 00627 } catch (const ibrcommon::QueueUnblockedException &ex) { 00628 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL; 00629 return; 00630 } catch (const std::exception &ex) { 00631 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00632 } 00633 00634 _connection.stop(); 00635 } 00636 00637 void TCPConnection::clearQueue() 00638 { 00639 // requeue all bundles still queued 00640 try { 00641 while (true) 00642 { 00643 const dtn::data::BundleID id = _sender.getnpop(); 00644 00645 // raise transfer abort event for all bundles without an ACK 00646 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id); 00647 } 00648 } catch (const ibrcommon::QueueUnblockedException&) { 00649 // queue emtpy 00650 } 00651 00652 // requeue all bundles still in transit 00653 try { 00654 while (true) 00655 { 00656 const dtn::data::BundleID id = _sentqueue.getnpop(); 00657 00658 if ((_lastack > 0) && (_peer._flags & dtn::streams::StreamContactHeader::REQUEST_FRAGMENTATION)) 00659 { 00660 // some data are already acknowledged 00661 // store this information in the fragment manager 00662 dtn::core::FragmentManager::setOffset(_peer.getEID(), id, _lastack); 00663 00664 // raise transfer abort event for all bundles without an ACK 00665 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id); 00666 } 00667 else 00668 { 00669 // raise transfer abort event for all bundles without an ACK 00670 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id); 00671 } 00672 00673 // set last ack to zero 00674 _lastack = 0; 00675 } 00676 } catch (const ibrcommon::QueueUnblockedException&) { 00677 // queue emtpy 00678 } 00679 } 00680 00681 #ifdef WITH_TLS 00682 void dtn::net::TCPConnection::enableTLS() 00683 { 00684 _flags |= dtn::streams::StreamContactHeader::REQUEST_TLS; 00685 } 00686 #endif 00687 00688 void TCPConnection::keepalive() 00689 { 00690 _stream.keepalive(); 00691 } 00692 00693 bool TCPConnection::good() const 00694 { 00695 return _tcpstream->good(); 00696 } 00697 00698 void TCPConnection::Sender::finally() 00699 { 00700 } 00701 00702 bool TCPConnection::match(const dtn::core::Node &n) const 00703 { 00704 return (_node == n); 00705 } 00706 00707 bool TCPConnection::match(const dtn::data::EID &destination) const 00708 { 00709 return (_node.getEID() == destination.getNode()); 00710 } 00711 00712 bool TCPConnection::match(const NodeEvent &evt) const 00713 { 00714 const dtn::core::Node &n = evt.getNode(); 00715 return match(n); 00716 } 00717 } 00718 }