IBR-DTNSuite  0.12
TCPConnection.cpp
Go to the documentation of this file.
1 /*
2  * TCPConnection.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "core/BundleCore.h"
25 #include "core/BundleEvent.h"
26 #include "storage/BundleStorage.h"
27 #include "core/FragmentManager.h"
28 
31 #include "net/ConnectionEvent.h"
33 
34 #include <ibrcommon/net/socket.h>
38 #include <ibrcommon/Logger.h>
39 
40 #include <iostream>
41 #include <iomanip>
42 #include <memory>
43 
44 #ifdef WITH_TLS
46 #include <openssl/x509.h>
49 #endif
50 
51 #ifdef WITH_BUNDLE_SECURITY
53 #endif
54 
55 namespace dtn
56 {
57  namespace net
58  {
59  const std::string TCPConnection::TAG = "TCPConnection";
60 
61  /*
62  * class TCPConnection
63  */
65  : _peer(), _node(node), _socket(sock), _socket_stream(NULL), _sec_stream(NULL), _protocol_stream(NULL), _sender(*this),
66  _keepalive_sender(*this, _keepalive_timeout), _timeout(timeout), _lastack(0), _resume_offset(0), _keepalive_timeout(0),
67  _callback(tcpsrv), _flags(0), _aborted(false)
68  {
69  }
70 
72  {
73  // join the keepalive sender thread
74  _keepalive_sender.join();
75 
76  // wait until the sender thread is finished
77  _sender.join();
78 
79  // clean-up
80  {
81  ibrcommon::RWLock l(_protocol_stream_mutex, ibrcommon::RWMutex::LOCK_READWRITE);
82  delete _protocol_stream;
83  _protocol_stream = NULL;
84  }
85 
86  if (_sec_stream != NULL) {
87  delete _sec_stream;
88  }
89 
90  if ((_socket != NULL) && (_socket_stream == NULL)) {
91  delete _socket;
92  } else if (_socket_stream != NULL) {
93  delete _socket_stream;
94  }
95  }
96 
98  {
99  _sender.push(job);
100  }
101 
103  {
104  return _peer;
105  }
106 
108  {
109  return _node;
110  }
111 
113  {
114  (*getProtocolStream()).reject();
115  }
116 
118  {
119  }
120 
122  {
123  // event
125 
126  try {
127  // stop the receiver thread
128  this->stop();
129  } catch (const ibrcommon::ThreadException &ex) {
130  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
131  }
132  }
133 
135  {
136  }
137 
138  void TCPConnection::initiateExtendedHandshake() throw (ibrcommon::Exception)
139  {
140 #ifdef WITH_TLS
141  /* if both nodes support TLS, activate it */
144  {
145  try{
146  ibrcommon::TLSStream &tls = dynamic_cast<ibrcommon::TLSStream&>(*_sec_stream);
147  X509 *peer_cert = tls.activate();
148 
149  // check the full EID first
150  const std::string cn = _peer.getEID().getString();
151 
152  try {
153  try {
155  } catch (const dtn::security::SecurityCertificateException &ex) {
156  // check using the hostname
157  std::string weak_cn = _peer.getEID().getHost();
158 
159  // strip leading "//"
160  if (weak_cn.find_first_of("//") == 0) {
161  weak_cn = weak_cn.substr(2, weak_cn.length() - 2);
162  }
163 
164  try {
166  } catch (const dtn::security::SecurityCertificateException &ex_weak) {
167  throw ex;
168  }
169  }
170  } catch (const dtn::security::SecurityCertificateException &ex) {
171  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
173  }
174  } catch (const std::exception&) {
176  /* close the connection */
177  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 20) << "TLS failed, closing the connection." << IBRCOMMON_LOGGER_ENDL;
178  throw;
179  } else {
180  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 20) << "TLS failed, continuing unauthenticated." << IBRCOMMON_LOGGER_ENDL;
181  }
182  }
183  } else {
184  /* TLS not supported by both Nodes, check if its required */
185  if (dtn::daemon::Configuration::getInstance().getSecurity().TLSRequired()){
186  /* close the connection */
187  throw ibrcommon::TLSException("TLS not supported by peer.");
189  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, notice) << "TLS not supported by peer. Continuing without TLS." << IBRCOMMON_LOGGER_ENDL;
190  }
191  /* else: this node does not support TLS, should have already printed a warning */
192  }
193 #endif
194  }
195 
197  {
198  _peer = header;
199 
200  // copy old attributes and urls to the new node object
201  Node n_old = _node;
202  _node = Node(header._localeid);
203  _node += n_old;
204 
205  // check if the peer has the same EID
206  if (_node.getEID() == dtn::core::BundleCore::getInstance().local)
207  {
208  // abort the connection
209  shutdown();
210 
211  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "connection to local endpoint rejected" << IBRCOMMON_LOGGER_ENDL;
212  return;
213  }
214 
215  _keepalive_timeout = header._keepalive * 1000;
216 
217  try {
218  // initiate extended handshake (e.g. TLS)
219  initiateExtendedHandshake();
220  } catch (const ibrcommon::Exception &ex) {
221  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
222 
223  // abort the connection
224  shutdown();
225  return;
226  }
227 
228  // set the incoming timer if set (> 0)
229  if (_peer._keepalive > 0)
230  {
231  // set the timer
232  timeval timeout;
233  timerclear(&timeout);
234  timeout.tv_sec = header._keepalive * 2;
235  _socket_stream->setTimeout(timeout);
236  }
237 
238  try {
239  // enable idle timeout
241  if (_idle_timeout > 0)
242  {
243  (*getProtocolStream()).enableIdleTimeout(_idle_timeout);
244  }
245  } catch (const ibrcommon::Exception&) {};
246 
247  // raise up event
249  }
250 
252  {
253  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 40) << "eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
254 
255  try {
256  // shutdown the keepalive sender thread
257  _keepalive_sender.stop();
258 
259  // stop the sender
260  _sender.stop();
261  } catch (const ibrcommon::ThreadException &ex) {
262  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
263  }
264 
265  if (_peer._localeid != dtn::data::EID())
266  {
267  // event
269  }
270  }
271 
273  {
275 
276  // stop here if the queue is already empty
277  if (l.empty()) {
278  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
279  return;
280  }
281 
282  // get the job on top of the sent queue
283  dtn::net::BundleTransfer &job = l.front();
284 
285  // abort the transmission
287 
288  // set ACK to zero
289  _lastack = 0;
290 
291  // release the job
292  l.pop();
293  }
294 
296  {
298 
299  // stop here if the queue is already empty
300  if (l.empty()) {
301  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
302  return;
303  }
304 
305  // get the job on top of the sent queue
306  dtn::net::BundleTransfer &job = l.front();
307 
308  // mark job as complete
309  job.complete();
310 
311  // set ACK to zero
312  _lastack = 0;
313 
314  // release the job
315  l.pop();
316  }
317 
319  {
320  _lastack = ack;
321  }
322 
323  void TCPConnection::addTrafficIn(size_t amount) throw ()
324  {
325  _callback.addTrafficIn(amount);
326  }
327 
328  void TCPConnection::addTrafficOut(size_t amount) throw ()
329  {
330  _callback.addTrafficOut(amount);
331  }
332 
334  {
335  // start the receiver for incoming bundles + handshake
336  try {
337  start();
338  } catch (const ibrcommon::ThreadException &ex) {
339  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
340  }
341  }
342 
343  void TCPConnection::shutdown() throw ()
344  {
345  try {
346  // shutdown
348  } catch (const ibrcommon::Exception&) {};
349 
350  try {
351  // abort the connection thread
353  } catch (const ibrcommon::ThreadException &ex) {
354  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "shutdown failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
355  }
356  }
357 
359  {
360  // mark the connection as aborted
361  _aborted = true;
362 
363  // close the stream
364  if (_socket_stream != NULL) _socket_stream->close();
365  }
366 
367  void TCPConnection::finally() throw ()
368  {
369  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
370 
371  try {
372  // shutdown the keepalive sender thread
373  _keepalive_sender.stop();
374 
375  // shutdown the sender thread
376  _sender.stop();
377  } catch (const std::exception&) { };
378 
379  // close the tcpstream
380  if (_socket_stream != NULL) _socket_stream->close();
381 
382  try {
383  _callback.connectionDown(this);
384  } catch (const ibrcommon::MutexException&) { };
385 
386  // clear the queue
387  clearQueue();
388  }
389 
390  void TCPConnection::setup() throw ()
391  {
394 
395  if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation())
396  {
398  }
399  }
400 
401  void TCPConnection::__setup_socket(ibrcommon::clientsocket *sock, bool server)
402  {
403  if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
404  {
406  }
407 
408  _socket_stream = new ibrcommon::socketstream(sock);
409 
410 #ifdef WITH_TLS
411  // initialize security layer if available
412  _sec_stream = new ibrcommon::TLSStream(_socket_stream);
413  if (server) dynamic_cast<ibrcommon::TLSStream&>(*_sec_stream).setServer(true);
414  else dynamic_cast<ibrcommon::TLSStream&>(*_sec_stream).setServer(false);
415 #endif
416 
417  // create a new stream connection
419 
420  ibrcommon::RWLock l(_protocol_stream_mutex, ibrcommon::RWMutex::LOCK_READWRITE);
421  if (_protocol_stream != NULL) delete _protocol_stream;
422  _protocol_stream = new dtn::streams::StreamConnection(*this, (_sec_stream == NULL) ? *_socket_stream : *_sec_stream, chunksize);
423  _protocol_stream->exceptions(std::ios::badbit | std::ios::eofbit);
424  }
425 
427  {
428  // do not connect to other hosts if we are in server
429  if (_socket != NULL) return;
430 
431  // do not connect to anyone if we are already connected
432  if (_socket_stream != NULL) return;
433 
434  // variables for address and port
435  std::string address = "0.0.0.0";
436  unsigned int port = 0;
437 
438  // try to connect to the other side
439  try {
440  const std::list<dtn::core::Node::URI> uri_list = _node.get(dtn::core::Node::CONN_TCPIP);
441 
442  for (std::list<dtn::core::Node::URI>::const_iterator iter = uri_list.begin(); iter != uri_list.end(); ++iter)
443  {
444  // break-out if the connection has been aborted
445  if (_aborted) throw ibrcommon::socket_exception("connection has been aborted");
446 
447  try {
448  // decode address and port
449  const dtn::core::Node::URI &uri = (*iter);
450  uri.decode(address, port);
451 
452  // create a virtual address to connect to
453  ibrcommon::vaddress addr(address, port);
454 
455  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 15) << "Initiate TCP connection to " << address << ":" << port << IBRCOMMON_LOGGER_ENDL;
456 
457  // create a new tcpsocket
458  timeval tv;
459  timerclear(&tv);
460  tv.tv_sec = _timeout;
461 
462  // create a new tcp connection via the tcpsocket object
463  ibrcommon::tcpsocket *client = new ibrcommon::tcpsocket(addr, &tv);
464 
465  try {
466  // connect to the node
467  client->up();
468 
469  // setup a new tcp connection
470  __setup_socket(client, false);
471 
472  // add TCP connection descriptor to the node object
473  _node.clear();
474  _node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, uri.value, 0, 30) );
475 
476  // connection successful
477  return;
478  } catch (const ibrcommon::socket_exception&) {
479  delete client;
480  }
481  } catch (const ibrcommon::socket_exception&) { };
482  }
483 
484  // no connection has been established
485  throw ibrcommon::socket_exception("no address available to connect");
486 
487  } catch (const ibrcommon::socket_exception&) {
488  // error on open, requeue all bundles in the queue
489  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "connection to " << _node.toString() << " failed" << IBRCOMMON_LOGGER_ENDL;
490  try {
492  } catch (const ibrcommon::Exception&) {};
493  throw;
494  } catch (const bad_cast&) { };
495  }
496 
497  void TCPConnection::run() throw ()
498  {
499  try {
500  if (_socket == NULL) {
501  // connect to the peer
502  connect();
503  } else {
504  // accept remote connection as server
505  __setup_socket(_socket, true);
506  }
507 
508  TCPConnection::safe_streamconnection sc = getProtocolStream();
509  std::iostream &stream = (*sc);
510 
511  // do the handshake
512  (*sc).handshake(dtn::core::BundleCore::local, _timeout, _flags);
513 
514  // start the sender
515  _sender.start();
516 
517  // start keepalive sender
518  _keepalive_sender.start();
519 
520  // create a deserializer for next bundle
522 
523  while (!(*sc).eof())
524  {
525  try {
526  // create a new empty bundle
527  dtn::data::Bundle bundle;
528 
529  // check if the stream is still good
530  if (!stream.good()) throw ibrcommon::IOException("stream went bad");
531 
532  // enable/disable fragmentation support according to the contact header.
534 
535  // read the bundle (or the fragment if fragmentation is enabled)
536  deserializer >> bundle;
537 
538  // check the bundle
539  if ( ( bundle.destination == EID() ) || ( bundle.source == EID() ) )
540  {
541  // invalid bundle!
542  throw dtn::data::Validator::RejectedException("destination or source EID is null");
543  }
544 
545  // raise default bundle received event
546  dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle, false);
547  }
549  {
550  // bundle rejected
552 
553  // display the rejection
554  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
555  }
556  catch (const dtn::InvalidDataException &ex) {
557  // bundle rejected
559 
560  // display the rejection
561  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
562  }
563 
564  yield();
565  }
566  } catch (const ibrcommon::ThreadException &ex) {
567  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
568  try {
570  } catch (const ibrcommon::Exception&) {};
571  } catch (const std::exception &ex) {
572  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) << "run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
573  try {
575  } catch (const ibrcommon::Exception&) {};
576  }
577  }
578 
579  TCPConnection::safe_streamconnection TCPConnection::getProtocolStream() throw (ibrcommon::Exception)
580  {
581  return safe_streamconnection(_protocol_stream, _protocol_stream_mutex);
582  }
583 
584  TCPConnection::KeepaliveSender::KeepaliveSender(TCPConnection &connection, size_t &keepalive_timeout)
585  : _connection(connection), _keepalive_timeout(keepalive_timeout)
586  {
587 
588  }
589 
590  TCPConnection::KeepaliveSender::~KeepaliveSender()
591  {
592  }
593 
594  void TCPConnection::KeepaliveSender::run() throw ()
595  {
596  try {
597  ibrcommon::MutexLock l(_wait);
598  while (true)
599  {
600  try {
601  _wait.wait(_keepalive_timeout);
604  {
605  // send a keepalive
606  _connection.keepalive();
607  }
608  else
609  {
610  throw;
611  }
612  }
613  }
614  } catch (const std::exception&) { };
615  }
616 
617  void TCPConnection::KeepaliveSender::__cancellation() throw ()
618  {
619  ibrcommon::MutexLock l(_wait);
620  _wait.abort();
621  }
622 
623  TCPConnection::Sender::Sender(TCPConnection &connection)
624  : _connection(connection)
625  {
626  }
627 
628  TCPConnection::Sender::~Sender()
629  {
630  }
631 
632  void TCPConnection::Sender::__cancellation() throw ()
633  {
634  // cancel the main thread in here
636  }
637 
638  void TCPConnection::Sender::run() throw ()
639  {
640  try {
642 
643  TCPConnection::safe_streamconnection sc = _connection.getProtocolStream();
644  std::iostream &stream = (*sc);
645 
646  // create a serializer
647  dtn::data::DefaultSerializer serializer(stream);
648 
649  while (stream.good())
650  {
652 
653  // check if the transfer is directed to the connected neighbor
654  if (transfer.getNeighbor() != _connection.getNode().getEID()) continue;
655 
656  try {
657  // read the bundle out of the storage
658  dtn::data::Bundle bundle = storage.get(transfer.getBundle());
659 
660 #ifdef WITH_BUNDLE_SECURITY
662 
664  {
665  try {
668  // sign requested, but no key is available
669  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
670  }
671  }
672 #endif
673 
674  // send bundle
675  // get the offset, if this bundle has been reactively fragmented before
676  if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()
678  {
679  _connection._resume_offset = dtn::core::FragmentManager::getOffset(_connection.getNode().getEID(), bundle);
680  }
681  else
682  {
683  _connection._resume_offset = 0;
684  }
685 
686  // put the bundle into the sentqueue
687  _connection._sentqueue.push(transfer);
688 
689  try {
690  // activate exceptions for this method
691  if (!stream.good()) throw ibrcommon::IOException("stream went bad");
692 
693  if (_connection._resume_offset > 0)
694  {
695  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 4) << "Resume transfer of bundle " << bundle.toString() << " to " << _connection.getNode().getEID().getString() << ", offset: " << _connection._resume_offset << IBRCOMMON_LOGGER_ENDL;
696 
697  // transmit the fragment
698  serializer << dtn::data::BundleFragment(bundle, _connection._resume_offset, -1);
699  }
700  else
701  {
702  // transmit the bundle
703  serializer << bundle;
704  }
705 
706  // flush the stream
707  stream << std::flush;
708  } catch (const ibrcommon::Exception &ex) {
709  // the connection not available
710  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
711 
712  // forward exception
713  throw;
714  }
715  } catch (const dtn::storage::NoBundleFoundException&) {
716  // send transfer aborted event
718  }
719 
720  // idle a little bit
721  yield();
722  }
723  } catch (const ibrcommon::QueueUnblockedException &ex) {
724  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 50) << "Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
725  return;
726  } catch (const std::exception &ex) {
727  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) << "Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
728  }
729 
730  _connection.stop();
731  }
732 
734  {
735  // requeue all bundles still in transit
737 
738  while (!l.empty())
739  {
740  // get the job on top of the sent queue
741  const dtn::net::BundleTransfer &job = l.front();
742 
744  {
745  // some data are already acknowledged
746  // store this information in the fragment manager
747  dtn::core::FragmentManager::setOffset(_peer.getEID(), job.getBundle(), _lastack, _resume_offset);
748  }
749 
750  // set last ack to zero
751  _lastack = 0;
752 
753  // release the job
754  l.pop();
755  }
756  }
757 
758 #ifdef WITH_TLS
759  void dtn::net::TCPConnection::enableTLS()
760  {
762  }
763 #endif
764 
766  {
767  (*getProtocolStream()).keepalive();
768  }
769 
770  bool TCPConnection::good() const
771  {
772  return _socket_stream->good();
773  }
774 
775  void TCPConnection::Sender::finally() throw ()
776  {
777  }
778 
780  {
781  return (_node == n);
782  }
783 
784  bool TCPConnection::match(const dtn::data::EID &destination) const
785  {
786  return _node.getEID().sameHost(destination);
787  }
788 
789  bool TCPConnection::match(const NodeEvent &evt) const
790  {
791  const dtn::core::Node &n = evt.getNode();
792  return match(n);
793  }
794  }
795 }