IBR-DTNSuite  0.10
TCPConnection.cpp
Go to the documentation of this file.
1 /*
2  * TCPConnection.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "core/BundleCore.h"
25 #include "core/BundleEvent.h"
26 #include "storage/BundleStorage.h"
27 #include "core/FragmentManager.h"
28 
31 #include "net/ConnectionEvent.h"
33 
34 #include <ibrcommon/net/socket.h>
38 #include <ibrcommon/Logger.h>
39 
40 #include <iostream>
41 #include <iomanip>
42 #include <memory>
43 
44 #ifdef WITH_TLS
46 #include <openssl/x509.h>
49 #endif
50 
51 #ifdef WITH_BUNDLE_SECURITY
53 #endif
54 
55 namespace dtn
56 {
57  namespace net
58  {
// Tag handed to the IBRCOMMON_LOGGER_TAG / IBRCOMMON_LOGGER_DEBUG_TAG macros
// by every log statement of this class.
59  const std::string TCPConnection::TAG = "TCPConnection";
60 
61  /*
62  * class TCPConnection
63  */
65  : _peer(), _node(node), _socket(sock), _socket_stream(NULL), _sec_stream(NULL), _protocol_stream(NULL), _sender(*this),
66  _keepalive_sender(*this, _keepalive_timeout), _timeout(timeout), _lastack(0), _keepalive_timeout(0),
67  _callback(tcpsrv), _flags(0), _aborted(false)
68  {
69  }
70 
72  {
73  // join the keepalive sender thread
74  _keepalive_sender.join();
75 
76  // wait until the sender thread is finished
77  _sender.join();
78 
79  // clean-up
80  {
81  ibrcommon::RWLock l(_protocol_stream_mutex, ibrcommon::RWMutex::LOCK_READWRITE);
82  delete _protocol_stream;
83  _protocol_stream = NULL;
84  }
85 
86  if (_sec_stream != NULL) {
87  delete _sec_stream;
88  }
89 
90  if ((_socket != NULL) && (_socket_stream == NULL)) {
91  delete _socket;
92  } else if (_socket_stream != NULL) {
93  delete _socket_stream;
94  }
95  }
96 
98  {
99  _sender.push(job);
100  }
101 
103  {
104  return _peer;
105  }
106 
108  {
109  return _node;
110  }
111 
113  {
114  (*getProtocolStream()).reject();
115  }
116 
118  {
119  }
120 
122  {
123  // event
125 
126  try {
127  // stop the receiver thread
128  this->stop();
129  } catch (const ibrcommon::ThreadException &ex) {
130  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
131  }
132  }
133 
135  {
136  }
137 
/**
 * Perform the extended handshake; the only extension visible here is TLS.
 *
 * Without WITH_TLS the function body is empty. With WITH_TLS the visible
 * code activates the TLS stream, takes the peer certificate returned by
 * activate() and, according to the comments, checks it first against the
 * full EID and then against the bare host name.
 *
 * NOTE(review): several statements of this function are not present in
 * this listing (the embedded numbering skips 142-143, 154, 165, 172, 175
 * and 188), among them the condition that opens the TLS branch and the
 * certificate validation calls. Confirm against the original file.
 *
 * @throws ibrcommon::Exception per the exception specification; the visible
 *         code throws ibrcommon::TLSException when TLS is required by the
 *         configuration but not available, and re-throws TLS failures.
 */
138  void TCPConnection::initiateExtendedHandshake() throw (ibrcommon::Exception)
139  {
140 #ifdef WITH_TLS
141  /* if both nodes support TLS, activate it */
144  {
145  try{
// _sec_stream is created as a TLSStream in __setup_socket(); a failing
// dynamic_cast raises std::bad_cast, which the std::exception handler
// below catches.
146  ibrcommon::TLSStream &tls = dynamic_cast<ibrcommon::TLSStream&>(*_sec_stream);
147  X509 *peer_cert = tls.activate();
148 
149  // check the full EID first
150  const std::string cn = _peer.getEID().getString();
151 
152  try {
153  try {
155  } catch (const dtn::security::SecurityCertificateException &ex) {
156  // check using the hostname
157  std::string weak_cn = _peer.getEID().getHost();
158 
159  // strip leading "//"
// NOTE(review): find_first_of("//") returns the position of the first
// '/' character (it matches any character of the given set), so this
// test is also true for a host that starts with a single '/'. A prefix
// test such as compare(0, 2, "//") == 0 is probably what was intended.
160  if (weak_cn.find_first_of("//") == 0) {
161  weak_cn = weak_cn.substr(2, weak_cn.length() - 2);
162  }
163 
164  try {
166  } catch (const dtn::security::SecurityCertificateException &ex_weak) {
// report the failure of the first (full EID) check, not the weak one
167  throw ex;
168  }
169  }
170  } catch (const dtn::security::SecurityCertificateException &ex) {
171  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
173  }
174  } catch (const std::exception&) {
176  /* close the connection */
177  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 20) << "TLS failed, closing the connection." << IBRCOMMON_LOGGER_ENDL;
178  throw;
179  } else {
180  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 20) << "TLS failed, continuing unauthenticated." << IBRCOMMON_LOGGER_ENDL;
181  }
182  }
183  } else {
184  /* TLS is not supported by both nodes, check if it is required */
185  if (dtn::daemon::Configuration::getInstance().getSecurity().TLSRequired()){
186  /* close the connection */
187  throw ibrcommon::TLSException("TLS not supported by peer.");
189  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, notice) << "TLS not supported by peer. Continuing without TLS." << IBRCOMMON_LOGGER_ENDL;
190  }
191  /* else: this node does not support TLS, should have already printed a warning */
192  }
193 #endif
194  }
195 
197  {
198  _peer = header;
199 
200  // copy old attributes and urls to the new node object
201  Node n_old = _node;
202  _node = Node(header._localeid);
203  _node += n_old;
204 
205  _keepalive_timeout = header._keepalive * 1000;
206 
207  try {
208  // initiate extended handshake (e.g. TLS)
209  initiateExtendedHandshake();
210  } catch (const ibrcommon::Exception &ex) {
211  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
212 
213  // abort the connection
214  shutdown();
215  return;
216  }
217 
218  // set the incoming timer if set (> 0)
219  if (_peer._keepalive > 0)
220  {
221  // set the timer
222  timeval timeout;
223  timerclear(&timeout);
224  timeout.tv_sec = header._keepalive * 2;
225  _socket_stream->setTimeout(timeout);
226  }
227 
228  try {
229  // enable idle timeout
231  if (_idle_timeout > 0)
232  {
233  (*getProtocolStream()).enableIdleTimeout(_idle_timeout);
234  }
235  } catch (const ibrcommon::Exception&) {};
236 
237  // raise up event
239  }
240 
242  {
243  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 40) << "eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
244 
245  try {
246  // shutdown the keepalive sender thread
247  _keepalive_sender.stop();
248 
249  // stop the sender
250  _sender.stop();
251  } catch (const ibrcommon::ThreadException &ex) {
252  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
253  }
254 
255  if (_peer._localeid != dtn::data::EID())
256  {
257  // event
259  }
260  }
261 
263  {
265 
266  // stop here if the queue is already empty
267  if (l.empty()) {
268  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
269  return;
270  }
271 
272  // get the job on top of the sent queue
273  dtn::net::BundleTransfer &job = l.front();
274 
275  // abort the transmission
277 
278  // set ACK to zero
279  _lastack = 0;
280 
281  // release the job
282  l.pop();
283  }
284 
286  {
288 
289  // stop here if the queue is already empty
290  if (l.empty()) {
291  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
292  return;
293  }
294 
295  // get the job on top of the sent queue
296  dtn::net::BundleTransfer &job = l.front();
297 
298  // mark job as complete
299  job.complete();
300 
301  // set ACK to zero
302  _lastack = 0;
303 
304  // release the job
305  l.pop();
306  }
307 
309  {
310  _lastack = ack;
311  }
312 
314  {
315  // start the receiver for incoming bundles + handshake
316  try {
317  start();
318  } catch (const ibrcommon::ThreadException &ex) {
319  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
320  }
321  }
322 
/**
 * Shut the connection down without letting any exception escape
 * (declared throw()).
 *
 * Two independent best-effort steps are made, each in its own try block, so
 * a failure of the first does not prevent the second.
 *
 * NOTE(review): the statements inside both try blocks are not present in
 * this listing (the embedded numbering skips 327 and 332); only the
 * comments and the handlers are visible.
 */
323  void TCPConnection::shutdown() throw ()
324  {
325  try {
326  // shutdown
// any ibrcommon::Exception raised by this step is deliberately ignored
328  } catch (const ibrcommon::Exception&) {};
329 
330  try {
331  // abort the connection thread
333  } catch (const ibrcommon::ThreadException &ex) {
334  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "shutdown failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
335  }
336  }
337 
339  {
340  // mark the connection as aborted
341  _aborted = true;
342 
343  // close the stream
344  _socket_stream->close();
345  }
346 
/**
 * Final clean-up of the connection; declared throw().
 *
 * Stops the keepalive sender and the sender, closes the socket stream if
 * one exists, reports the connection as down to _callback and clears the
 * queue.
 *
 * NOTE(review): presumably invoked by the thread framework once run() has
 * ended - confirm against the thread base class.
 */
347  void TCPConnection::finally() throw ()
348  {
349  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
350 
351  try {
352  // shutdown the keepalive sender thread
353  _keepalive_sender.stop();
354 
355  // shutdown the sender thread
356  _sender.stop();
// failures while stopping the helper threads are ignored on purpose;
// note that an exception from the first stop() skips the second one
357  } catch (const std::exception&) { };
358 
359  // close the tcpstream
// _socket_stream is still NULL if no socket was ever set up
360  if (_socket_stream != NULL) _socket_stream->close();
361 
362  try {
363  _callback.connectionDown(this);
// only a MutexException is tolerated here
364  } catch (const ibrcommon::MutexException&) { };
365 
366  // clear the queue
367  clearQueue();
368  }
369 
/**
 * Set-up hook of the connection; declared throw().
 *
 * The visible code only tests the doFragmentation() network option.
 *
 * NOTE(review): the statements before the test and inside the branch are
 * not present in this listing (the embedded numbering skips 372-373 and
 * 377), so what is configured here cannot be told from this view.
 */
370  void TCPConnection::setup() throw ()
371  {
374 
375  if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation())
376  {
378  }
379  }
380 
/**
 * Build the stream stack on top of a client socket.
 *
 * Creates the socket stream for sock, with WITH_TLS additionally a
 * TLSStream on top of it, and finally a StreamConnection that replaces
 * _protocol_stream while _protocol_stream_mutex is locked for read/write.
 *
 * NOTE(review): the statement inside the TCP no-delay branch and the
 * declaration of chunksize are not present in this listing (the embedded
 * numbering skips 385 and 398).
 *
 * @param sock   socket to wrap
 * @param server true to put the TLS stream into server mode, false for
 *               client mode (only used with WITH_TLS)
 */
381  void TCPConnection::__setup_socket(ibrcommon::clientsocket *sock, bool server)
382  {
383  if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
384  {
386  }
387 
388  _socket_stream = new ibrcommon::socketstream(sock);
389 
390 #ifdef WITH_TLS
391  // initialize security layer if available
392  _sec_stream = new ibrcommon::TLSStream(_socket_stream);
393  if (server) dynamic_cast<ibrcommon::TLSStream&>(*_sec_stream).setServer(true);
394  else dynamic_cast<ibrcommon::TLSStream&>(*_sec_stream).setServer(false);
395 #endif
396 
397  // create a new stream connection
399 
// exclusive access while the protocol stream is exchanged
400  ibrcommon::RWLock l(_protocol_stream_mutex, ibrcommon::RWMutex::LOCK_READWRITE);
401  if (_protocol_stream != NULL) delete _protocol_stream;
// the protocol runs over the security stream when one exists, otherwise
// directly over the socket stream
402  _protocol_stream = new dtn::streams::StreamConnection(*this, (_sec_stream == NULL) ? *_socket_stream : *_sec_stream, chunksize);
// let the stream throw on badbit and eofbit
403  _protocol_stream->exceptions(std::ios::badbit | std::ios::eofbit);
404 
405  if (dtn::daemon::Configuration::getInstance().enableTrafficStats()) {
406  // enable traffic monitoring
407  _protocol_stream->setMonitor(true);
408  }
409  }
410 
412  {
413  // do not connect to other hosts if we are in server
414  if (_socket != NULL) return;
415 
416  // do not connect to anyone if we are already connected
417  if (_socket_stream != NULL) return;
418 
419  // variables for address and port
420  std::string address = "0.0.0.0";
421  unsigned int port = 0;
422 
423  // try to connect to the other side
424  try {
425  const std::list<dtn::core::Node::URI> uri_list = _node.get(dtn::core::Node::CONN_TCPIP);
426 
427  for (std::list<dtn::core::Node::URI>::const_iterator iter = uri_list.begin(); iter != uri_list.end(); ++iter)
428  {
429  // break-out if the connection has been aborted
430  if (_aborted) throw ibrcommon::socket_exception("connection has been aborted");
431 
432  try {
433  // decode address and port
434  const dtn::core::Node::URI &uri = (*iter);
435  uri.decode(address, port);
436 
437  // create a virtual address to connect to
438  ibrcommon::vaddress addr(address, port);
439 
440  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 15) << "Initiate TCP connection to " << address << ":" << port << IBRCOMMON_LOGGER_ENDL;
441 
442  // create a new tcpsocket
443  timeval tv;
444  timerclear(&tv);
445  tv.tv_sec = _timeout;
446 
447  // create a new tcp connection via the tcpsocket object
448  ibrcommon::tcpsocket *client = new ibrcommon::tcpsocket(addr, &tv);
449 
450  try {
451  // connect to the node
452  client->up();
453 
454  // setup a new tcp connection
455  __setup_socket(client, false);
456 
457  // add TCP connection descriptor to the node object
458  _node.clear();
459  _node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, uri.value, 0, 30) );
460 
461  // connection successful
462  return;
463  } catch (const ibrcommon::socket_exception&) {
464  delete client;
465  }
466  } catch (const ibrcommon::socket_exception&) { };
467  }
468 
469  // no connection has been established
470  throw ibrcommon::socket_exception("no address available to connect");
471 
472  } catch (const ibrcommon::socket_exception&) {
473  // error on open, requeue all bundles in the queue
474  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "connection to " << _node.toString() << " failed" << IBRCOMMON_LOGGER_ENDL;
476  throw;
477  } catch (const bad_cast&) { };
478  }
479 
/**
 * Main loop of the connection; declared throw().
 *
 * Connects to the peer when no socket was handed over, otherwise sets up
 * the existing socket in server mode. It then does the handshake, starts
 * the sender and the keepalive sender, and reads bundles from the protocol
 * stream until it reports eof, raising a BundleReceivedEvent for each valid
 * bundle.
 *
 * NOTE(review): several statements are not present in this listing (the
 * embedded numbering skips 504, 516, 531, 534, 541, 552 and 557), among
 * them the declaration of deserializer, the header of the first catch
 * clause inside the loop and the statements inside the two outer handlers.
 */
480  void TCPConnection::run() throw ()
481  {
482  try {
483  if (_socket == NULL) {
484  // connect to the peer
485  connect();
486  } else {
487  // accept remote connection as server
488  __setup_socket(_socket, true);
489  }
490 
491  TCPConnection::safe_streamconnection sc = getProtocolStream();
492  std::iostream &stream = (*sc);
493 
494  // do the handshake
495  (*sc).handshake(dtn::core::BundleCore::local, _timeout, _flags);
496 
497  // start the sender
498  _sender.start();
499 
500  // start keepalive sender
501  _keepalive_sender.start();
502 
503  // create a deserializer for next bundle
505 
506  while (!(*sc).eof())
507  {
508  try {
509  // create a new empty bundle
510  dtn::data::Bundle bundle;
511 
512  // check if the stream is still good
// IOException is not handled inside the loop; it ends the loop and is
// handled by one of the outer handlers
513  if (!stream.good()) throw ibrcommon::IOException("stream went bad");
514 
515  // enable/disable fragmentation support according to the contact header.
517 
518  // read the bundle (or the fragment if fragmentation is enabled)
519  deserializer >> bundle;
520 
521  // check the bundle
// a default-constructed EID is treated as "not set"
522  if ( ( bundle.destination == EID() ) || ( bundle.source == EID() ) )
523  {
524  // invalid bundle!
525  throw dtn::data::Validator::RejectedException("destination or source EID is null");
526  }
527 
528  // raise default bundle received event
529  dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle, false);
530  }
532  {
533  // bundle rejected
535 
536  // display the rejection
537  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
538  }
539  catch (const dtn::InvalidDataException &ex) {
540  // bundle rejected
542 
543  // display the rejection
544  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
545  }
546 
// rejected or invalid bundles do not end the loop
547  yield();
548  }
549  } catch (const ibrcommon::ThreadException &ex) {
550  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
551  try {
553  } catch (const ibrcommon::Exception&) {};
554  } catch (const std::exception &ex) {
555  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) << "run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
556  try {
558  } catch (const ibrcommon::Exception&) {};
559  }
560  }
561 
/**
 * Hand out the protocol stream wrapped in a safe_streamconnection that is
 * built from _protocol_stream and _protocol_stream_mutex.
 *
 * NOTE(review): presumably the wrapper holds the mutex for its lifetime and
 * raises an exception when the stream is NULL - confirm against the
 * declaration of safe_streamconnection.
 *
 * @return wrapper that is dereferenced to reach the StreamConnection
 * @throws ibrcommon::Exception per the exception specification
 */
562  TCPConnection::safe_streamconnection TCPConnection::getProtocolStream() throw (ibrcommon::Exception)
563  {
564  return safe_streamconnection(_protocol_stream, _protocol_stream_mutex);
565  }
566 
/**
 * Create the keepalive sender for a connection.
 *
 * @param connection        connection on which keepalive() is called
 * @param keepalive_timeout wait time used between keepalives; taken by
 *                          non-const reference (NOTE(review): presumably the
 *                          member is a reference too, so that later changes
 *                          of the connection's value are seen - confirm in
 *                          the header)
 */
567  TCPConnection::KeepaliveSender::KeepaliveSender(TCPConnection &connection, size_t &keepalive_timeout)
568  : _connection(connection), _keepalive_timeout(keepalive_timeout)
569  {
570 
571  }
572 
/**
 * Destructor; nothing to release here.
 */
573  TCPConnection::KeepaliveSender::~KeepaliveSender()
574  {
575  }
576 
/**
 * Keepalive loop; declared throw().
 *
 * Holds the lock on _wait for the whole loop, waits on it with
 * _keepalive_timeout and sends a keepalive over the connection in the
 * visible branch of the handler. Any other outcome is re-thrown and ends
 * the loop; the outer handler then swallows the exception.
 *
 * NOTE(review): the catch header and the condition of the branch are not
 * present in this listing (the embedded numbering skips 585-586);
 * presumably the keepalive is sent when the wait timed out - confirm.
 */
577  void TCPConnection::KeepaliveSender::run() throw ()
578  {
579  try {
580  ibrcommon::MutexLock l(_wait);
581  while (true)
582  {
583  try {
584  _wait.wait(_keepalive_timeout);
587  {
588  // send a keepalive
589  _connection.keepalive();
590  }
591  else
592  {
// not the expected condition: pass the exception on to the outer handler
593  throw;
594  }
595  }
596  }
// every std::exception silently terminates the keepalive thread
597  } catch (const std::exception&) { };
598  }
599 
/**
 * Cancellation hook of the keepalive sender; declared throw().
 *
 * Locks _wait and aborts it, which is the object run() waits on.
 */
600  void TCPConnection::KeepaliveSender::__cancellation() throw ()
601  {
602  ibrcommon::MutexLock l(_wait);
603  _wait.abort();
604  }
605 
/**
 * Create the sender for a connection.
 *
 * @param connection connection whose protocol stream and sent queue are
 *                   used by run()
 */
606  TCPConnection::Sender::Sender(TCPConnection &connection)
607  : _connection(connection)
608  {
609  }
610 
/**
 * Destructor; nothing to release here.
 */
611  TCPConnection::Sender::~Sender()
612  {
613  }
614 
/**
 * Cancellation hook of the sender; declared throw().
 *
 * NOTE(review): the statement that performs the cancellation is not present
 * in this listing (the embedded numbering skips 618); only the comment is
 * visible.
 */
615  void TCPConnection::Sender::__cancellation() throw ()
616  {
617  // cancel the main thread in here
619  }
620 
/**
 * Transmission loop of the sender; declared throw().
 *
 * While the protocol stream is good, a bundle is read from the storage,
 * the transfer is pushed to the connection's sent queue and the bundle (or
 * the remaining fragment, if an offset is known) is serialized to the
 * stream and flushed. A QueueUnblockedException ends the method silently;
 * every other exit path stops the connection.
 *
 * NOTE(review): several statements are not present in this listing (the
 * embedded numbering skips 624, 634, 641, 643, 646-647, 658 and 692), among
 * them the declarations of storage and transfer, the bundle security code
 * and the second half of the fragmentation condition.
 */
621  void TCPConnection::Sender::run() throw ()
622  {
623  try {
625 
626  TCPConnection::safe_streamconnection sc = _connection.getProtocolStream();
627  std::iostream &stream = (*sc);
628 
629  // create a serializer
630  dtn::data::DefaultSerializer serializer(stream);
631 
632  while (stream.good())
633  {
635 
636  try {
637  // read the bundle out of the storage
638  dtn::data::Bundle bundle = storage.get(transfer.getBundle());
639 
640 #ifdef WITH_BUNDLE_SECURITY
642 
644  {
645  try {
648  // sign requested, but no key is available
649  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
650  }
651  }
652 #endif
653 
654  // send bundle
655  // get the offset, if this bundle has been reactively fragmented before
656  size_t offset = 0;
657  if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()
659  {
660  offset = dtn::core::FragmentManager::getOffset(_connection.getNode().getEID(), bundle);
661  }
662 
663  // put the bundle into the sentqueue
// queued before the data is written
664  _connection._sentqueue.push(transfer);
665 
666  try {
667  // activate exceptions for this method
668  if (!stream.good()) throw ibrcommon::IOException("stream went bad");
669 
670  if (offset > 0)
671  {
672  // transmit the fragment
// NOTE(review): -1 is presumably "no length limit" - confirm against
// the BundleFragment constructor
673  serializer << dtn::data::BundleFragment(bundle, offset, -1);
674  }
675  else
676  {
677  // transmit the bundle
678  serializer << bundle;
679  }
680 
681  // flush the stream
682  stream << std::flush;
683  } catch (const ibrcommon::Exception &ex) {
684  // the connection is not available
685  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
686 
687  // forward exception
688  throw;
689  }
690  } catch (const dtn::storage::NoBundleFoundException&) {
691  // send transfer aborted event
693  }
694 
695  // idle a little bit
696  yield();
697  }
698  } catch (const ibrcommon::QueueUnblockedException &ex) {
699  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 50) << "Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
// leave without stopping the connection
700  return;
701  } catch (const std::exception &ex) {
702  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) << "Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
703  }
704 
// reached when the stream went bad or an exception ended the loop
705  _connection.stop();
706  }
707 
709  {
710  // requeue all bundles still in transit
712 
713  while (!l.empty())
714  {
715  // get the job on top of the sent queue
716  const dtn::net::BundleTransfer &job = l.front();
717 
719  {
720  // some data are already acknowledged
721  // store this information in the fragment manager
722  dtn::core::FragmentManager::setOffset(_peer.getEID(), job.getBundle(), _lastack);
723  }
724 
725  // set last ack to zero
726  _lastack = 0;
727 
728  // release the job
729  l.pop();
730  }
731  }
732 
733 #ifdef WITH_TLS
/**
 * Enable TLS for this connection; only compiled when WITH_TLS is defined.
 *
 * NOTE(review): the statement of the body is not present in this listing
 * (the embedded numbering skips 736).
 */
734  void dtn::net::TCPConnection::enableTLS()
735  {
737  }
738 #endif
739 
741  {
742  (*getProtocolStream()).keepalive();
743  }
744 
/**
 * Report whether the underlying socket stream is usable.
 *
 * _socket_stream is initialised to NULL and only created by
 * __setup_socket(), so a connection without a stream is reported as not
 * good instead of dereferencing a NULL pointer; finally() guards the same
 * pointer in the same way.
 *
 * @return true if a socket stream exists and its good() state is set,
 *         false otherwise
 */
745  bool TCPConnection::good() const
746  {
747  return (_socket_stream != NULL) && _socket_stream->good();
748  }
749 
/**
 * Final hook of the sender; declared throw(). Intentionally empty.
 */
750  void TCPConnection::Sender::finally() throw ()
751  {
752  }
753 
755  {
756  return (_node == n);
757  }
758 
/**
 * Check whether this connection leads to the given destination.
 *
 * The EID of _node is compared with destination.getNode(), not with the
 * complete destination EID (NOTE(review): presumably getNode() strips the
 * application part - confirm against dtn::data::EID).
 *
 * @param destination EID to test
 * @return true if both values are equal
 */
759  bool TCPConnection::match(const dtn::data::EID &destination) const
760  {
761  return (_node.getEID() == destination.getNode());
762  }
763 
/**
 * Check whether a node event concerns the node of this connection.
 *
 * @param evt event whose node is tested
 * @return result of match() for the node carried by the event
 */
764  bool TCPConnection::match(const NodeEvent &evt) const
765  {
766  // delegate to the node overload with the node carried by the event
767  return match(evt.getNode());
768  }
769 
771  {
772  try {
773  safe_streamconnection sc = getProtocolStream();
774  return (*sc).getMonitorStat(index);
775  } catch (const ibrcommon::Exception&) {
776  return 0;
777  }
778  }
779 
781  {
782  try {
783  safe_streamconnection sc = getProtocolStream();
784  return (*sc).resetMonitorStats();
785  } catch (const ibrcommon::Exception&) {
786  }
787  }
788  }
789 }