IBR-DTNSuite  0.12
TCPConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * TCPConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 #include "net/P2PDialupEvent.h"
25 #include "net/ConnectionEvent.h"
26 #include "net/DiscoveryAgent.h"
27 #include "core/BundleCore.h"
28 #include "core/EventDispatcher.h"
29 
32 #include <ibrcommon/Logger.h>
33 #include <ibrcommon/net/socket.h>
34 #include <ibrcommon/Logger.h>
35 
36 #include <streambuf>
37 #include <functional>
38 #include <list>
39 #include <algorithm>
40 
41 #ifdef WITH_TLS
43 #endif
44 
45 namespace dtn
46 {
47  namespace net
48  {
49  /*
50  * class TCPConvergenceLayer
51  */
// Tag string passed to the IBRCOMMON_LOGGER_* macros throughout this file;
// it is also the value returned by getName().
52  const std::string TCPConvergenceLayer::TAG = "TCPConvergenceLayer";
53 
// Port constant of this convergence layer.
// NOTE(review): the dial-up handler further down passes the literal 4556 to
// listen() instead of this constant - presumably both should agree; confirm.
54  const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
55 
// NOTE(review): the signature line of this definition is missing from the
// listing (numbering gap at 56); only the initializer list and body are visible.
// Initial state: socket set marked as not up, no wildcard port bound,
// both traffic counters at zero.
57  : _vsocket_state(false), _any_port(0), _stats_in(0), _stats_out(0)
58  {
59  }
60 
// NOTE(review): the signature line (61) and the statements that followed the
// two comments below (64 and 67) are missing from the listing.
62  {
63  // unsubscribe to NetLink events
65 
66  // un-register as discovery handler
68 
// presumably waits for the component's worker thread before the sockets are
// destroyed - confirm against the base class
69  join();
70 
71  // delete all sockets
72  _vsocket.destroy();
73  }
74 
75  void TCPConvergenceLayer::add(const ibrcommon::vinterface &net, int port) throw ()
76  {
77  // do not allow any futher binding if we already bound to any interface
78  if (_any_port > 0) return;
79 
80  if (net.isAny()) {
81  // bind to any interface
82  _vsocket.add(new ibrcommon::tcpserversocket(port));
83  _any_port = port;
84  } else {
85  listen(net, port);
86  }
87  }
88 
// Starts listening on every IPv4/IPv6 address of interface `net` at `port`.
// The interface is stored in _interfaces (at most once) and its port in
// _portmap. Address and socket errors are logged as warnings per address and
// do not abort the loop; an interface_not_set error aborts the whole call
// with a warning.
// NOTE(review): embedded lines 104, 107 and 133 are missing from this listing;
// line 133 presumably declared and allocated `sock` - confirm against the
// original file.
89  void TCPConvergenceLayer::listen(const ibrcommon::vinterface &net, int port) throw ()
90  {
91  try {
92  // add the new interface to internal data-structures
93  {
94  ibrcommon::MutexLock l(_interface_lock);
95 
96  // only add the interface once
// (an already known interface makes the whole call a no-op)
97  if (_interfaces.find(net) != _interfaces.end()) return;
98 
99  // store the new interface in the list of interfaces
100  _interfaces.insert(net);
101  }
102 
103  // subscribe to NetLink events on our interfaces
105 
106  // register as discovery handler for this interface
108 
109  // store port of the interface
110  {
111  ibrcommon::MutexLock l(_portmap_lock);
112  _portmap[net] = port;
113  }
114 
115  // create sockets for all addresses on the interface
116  // may throw "interface_not_set"
117  std::list<ibrcommon::vaddress> addrs = net.getAddresses();
118 
119  // convert the port into a string
120  std::stringstream ss; ss << port;
121 
122  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
123  ibrcommon::vaddress &addr = (*iter);
124 
125  // handle the addresses according to their family
126  // may throw "address_exception"
127  try {
128  switch (addr.family()) {
129  case AF_INET:
130  case AF_INET6:
131  {
// the port string becomes the service part of the address
132  addr.setService(ss.str());
// bring the socket up right away only if the socket set is already up
// NOTE(review): if up() throws socket_exception, `sock` is not released by the
// handlers below - possible leak; confirm once the missing line 133 is known.
134  if (_vsocket_state) sock->up();
135  _vsocket.add(sock, net);
136 
137  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << "bound to " << net.toString() << " (" << addr.toString() << ", family: " << addr.family() << ")" << IBRCOMMON_LOGGER_ENDL;
138  break;
139  }
140  default:
// addresses of any other family are ignored
141  break;
142  }
143  } catch (const ibrcommon::vaddress::address_exception &ex) {
144  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
145  } catch (const ibrcommon::socket_exception &ex) {
146  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
147  }
148  }
149  } catch (const ibrcommon::vinterface::interface_not_set &ex) {
150  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
151  }
152  }
153 
154  void TCPConvergenceLayer::unlisten(const ibrcommon::vinterface &iface) throw ()
155  {
156  ibrcommon::socketset socks = _vsocket.get(iface);
157  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
158  ibrcommon::tcpserversocket *sock = dynamic_cast<ibrcommon::tcpserversocket*>(*iter);
159  _vsocket.remove(sock);
160  try {
161  sock->down();
162  } catch (const ibrcommon::socket_exception &ex) {
163  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
164  }
165  delete sock;
166  }
167 
168  {
169  ibrcommon::MutexLock l(_portmap_lock);
170  _portmap.erase(iface);
171  }
172 
173  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << "unbound from " << iface.toString() << IBRCOMMON_LOGGER_ENDL;
174  }
175 
177  {
179  }
180 
// Adds this convergence layer's service description(s) for interface `iface`
// to `beacon`: either "ip=<addr>;port=<port>;" per announced address or a
// port-only entry as fallback.
// NOTE(review): the signature line (181), the declaration of `crosslayer`
// (196) and the statement after the search loop (257) are missing from this
// listing.
182  {
183  ibrcommon::MutexLock l(_interface_lock);
184 
185  // announce port only if we are bound to any interface
186  if (_interfaces.empty() && (_any_port > 0)) {
187  std::stringstream service;
188  // ... set the port only
// (this `l` shadows the _interface_lock guard declared above)
189  ibrcommon::MutexLock l(_portmap_lock);
// NOTE(review): add() stores the wildcard port in _any_port and does not
// write _portmap, so _portmap[iface] presumably yields a default value here
// rather than the bound port; _any_port looks like the intended value - confirm.
190  service << "port=" << _portmap[iface] << ";";
191  beacon.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
192  return;
193  }
194 
195  // determine if we should enable crosslayer discovery by sending out our own address
197 
198  // this marker should set to true if we added an service description
199  bool announced = false;
200 
201  // search for the matching interface
202  for (std::set<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); ++it)
203  {
204  const ibrcommon::vinterface &it_iface = *it;
205  if (it_iface == iface)
206  {
// exceptions are used as control flow here: both throws below end up in the
// catch block and from there in the port-only fallback
207  try {
208  // check if cross layer discovery is disabled
209  if (!crosslayer) throw ibrcommon::Exception("crosslayer discovery disabled!");
210 
211  // get all addresses of this interface
212  std::list<ibrcommon::vaddress> list = it_iface.getAddresses();
213 
214  // if no address is returned... (goto catch block)
215  if (list.empty()) throw ibrcommon::Exception("no address found");
216 
217  for (std::list<ibrcommon::vaddress>::const_iterator addr_it = list.begin(); addr_it != list.end(); ++addr_it)
218  {
219  const ibrcommon::vaddress &addr = (*addr_it);
220 
// only addresses with global scope are announced
221  if (addr.scope() != ibrcommon::vaddress::SCOPE_GLOBAL) continue;
222 
223  try {
224  // do not announce non-IP addresses
225  sa_family_t f = addr.family();
226  if ((f != AF_INET) && (f != AF_INET6)) continue;
227 
228  std::stringstream service;
229  // fill in the ip address
230  ibrcommon::MutexLock l(_portmap_lock);
231  service << "ip=" << addr.address() << ";port=" << _portmap[iface] << ";";
232  beacon.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
233 
234  // set the announce mark
235  announced = true;
236  } catch (const ibrcommon::vaddress::address_exception &ex) {
// a single bad address is logged and skipped; the loop continues
237  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << ex.what() << IBRCOMMON_LOGGER_ENDL;
238  }
239  }
240  } catch (const ibrcommon::Exception &ex) {
241  // address collection process aborted
242  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 65) << "Address collection aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
243  };
244 
245  // if we still not announced anything...
246  if (!announced) {
247  std::stringstream service;
248  // ... set the port only
249  ibrcommon::MutexLock l(_portmap_lock);
250  service << "port=" << _portmap[iface] << ";";
251  beacon.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
252  }
// the first matching interface ends the search
253  return;
254  }
255  }
256 
// reached only when `iface` is not one of the stored interfaces
258  }
259 
260  const std::string TCPConvergenceLayer::getName() const
261  {
262  return TCPConvergenceLayer::TAG;
263  }
264 
// Handles P2P dial-up events: one branch starts listening on the event's
// interface, the other removes the interface from _interfaces and releases
// its sockets. Events of any other type are ignored.
// NOTE(review): the signature line (265), both case labels (272, 279) and the
// statements after the two "un-..." comments (293, 296) are missing from this
// listing.
266  {
267  try {
// throws std::bad_cast for every event that is not a P2PDialupEvent
268  const dtn::net::P2PDialupEvent &dialup = dynamic_cast<const dtn::net::P2PDialupEvent&>(*evt);
269 
270  switch (dialup.type)
271  {
273  {
274  // listen to the new interface
// NOTE(review): literal 4556 equals DEFAULT_PORT defined at the top of this
// file; presumably the constant was meant - confirm.
275  listen(dialup.iface, 4556);
276  break;
277  }
278 
280  {
281  // check if the interface is bound by us
282  {
283  ibrcommon::MutexLock l(_interface_lock);
284 
285  // only remove the interface if it exists
286  if (_interfaces.find(dialup.iface) == _interfaces.end()) return;
287 
288  // remove the interface from the stored set
289  _interfaces.erase(dialup.iface);
290  }
291 
292  // un-subscribe to NetLink events on our interfaces
294 
295  // un-register as discovery handler for this interface
297 
298  // remove all sockets on this interface
299  unlisten(dialup.iface);
300  break;
301  }
302  }
303  } catch (std::bad_cast&) {
// not a dial-up event: nothing to do
304 
305  }
306  }
307 
// Reacts to link events on interfaces stored in _interfaces: binds a socket
// for a new address, releases the socket of a removed address, or releases
// all sockets of an interface. Nothing happens while bound to the wildcard
// interface or for interfaces that are not stored.
// NOTE(review): the signature line (308), the three case labels (320, 337,
// 352) and the declaration of `sock` in the first branch (327) are missing
// from this listing.
309  {
310  // do not do anything if we are bound on all interfaces
311  if (_any_port > 0) return;
312 
313  {
314  ibrcommon::MutexLock l(_interface_lock);
315  if (_interfaces.find(evt.getInterface()) == _interfaces.end()) return;
316  }
317 
318  switch (evt.getAction())
319  {
321  {
322  ibrcommon::vaddress bindaddr = evt.getAddress();
323  // convert the port into a string
// the lock stays held until the end of this case block, including sock->up()
324  ibrcommon::MutexLock l(_portmap_lock);
325  std::stringstream ss; ss << _portmap[evt.getInterface()];
326  bindaddr.setService(ss.str());
328  try {
329  sock->up();
330  _vsocket.add(sock, evt.getInterface());
331  } catch (const ibrcommon::socket_exception&) {
// the socket could not be brought up: release it silently
332  delete sock;
333  }
334  break;
335  }
336 
338  {
// look through all sockets for the one bound to the event's address
339  ibrcommon::socketset socks = _vsocket.getAll();
340  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
// NOTE(review): the cast result is dereferenced without a NULL check.
341  ibrcommon::tcpserversocket *sock = dynamic_cast<ibrcommon::tcpserversocket*>(*iter);
342  if (sock->get_address().address() == evt.getAddress().address()) {
343  _vsocket.remove(sock);
// NOTE(review): unlike the branch below, down() is not wrapped in a try
// block here; a socket_exception would skip the delete - confirm intent.
344  sock->down();
345  delete sock;
346  break;
347  }
348  }
349  break;
350  }
351 
353  {
354  // remove all sockets on this interface
355  const ibrcommon::vinterface &iface = evt.getInterface();
356 
357  ibrcommon::socketset socks = _vsocket.get(iface);
358  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
// NOTE(review): the cast result is used without a NULL check.
359  ibrcommon::tcpserversocket *sock = dynamic_cast<ibrcommon::tcpserversocket*>(*iter);
360  _vsocket.remove(sock);
361  try {
362  sock->down();
363  } catch (const ibrcommon::socket_exception &ex) {
364  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
365  }
366  delete sock;
367  }
368  break;
369  }
370 
371  default:
372  break;
373  }
374  }
375 
// Ensures a TCPConnection to node `n` exists: returns immediately when a
// stored connection matches `n`, otherwise creates one, stores it in
// _connections, initializes it and signals _connections_cond.
// NOTE(review): the signature line (376), the condition guarding enableTLS()
// (397) and the statement after "raise setup event" (404) are missing from
// this listing.
377  {
378  // search for an existing connection
// the lock is held for the whole function, including conn->initialize()
379  ibrcommon::MutexLock l(_connections_cond);
380 
381  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
382  {
383  TCPConnection &conn = *(*iter);
384 
385  if (conn.match(n))
386  {
387  return;
388  }
389  }
390 
391  try {
392  // create a connection
393  TCPConnection *conn = new TCPConnection(*this, n, NULL, 10);
394 
395 #ifdef WITH_TLS
396  // enable TLS Support
398  {
399  conn->enableTLS();
400  }
401 #endif
402 
403  // raise setup event
405 
406  // add connection as pending
407  _connections.push_back( conn );
408 
409  // start the ClientHandler (service)
410  conn->initialize();
411 
412  // signal that there is a new connection
413  _connections_cond.signal(true);
// NOTE(review): an ibrcommon::Exception raised after the allocation is
// swallowed here without releasing `conn` or removing it from _connections;
// confirm who owns the object in that case.
414  } catch (const ibrcommon::Exception&) { };
415 
416  return;
417  }
418 
// Queues `job` on the connection to node `n`: an existing matching connection
// is reused, otherwise a new TCPConnection is created, stored, initialized
// and given the job.
// NOTE(review): the signature line (419), the condition guarding enableTLS()
// (443) and the statement after "raise setup event" (450) are missing from
// this listing.
420  {
421  // search for an existing connection
// the lock is held for the whole function
422  ibrcommon::MutexLock l(_connections_cond);
423 
424  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
425  {
426  TCPConnection &conn = *(*iter);
427 
428  if (conn.match(n))
429  {
430  conn.queue(job);
431  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "queued bundle to an existing tcp connection (" << conn.getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
432 
433  return;
434  }
435  }
436 
437  try {
438  // create a connection
439  TCPConnection *conn = new TCPConnection(*this, n, NULL, 10);
440 
441 #ifdef WITH_TLS
442  // enable TLS Support
444  {
445  conn->enableTLS();
446  }
447 #endif
448 
449  // raise setup event
451 
452  // add connection as pending
453  _connections.push_back( conn );
454 
455  // start the ClientHandler (service)
456  conn->initialize();
457 
458  // queue the bundle
459  conn->queue(job);
460 
461  // signal that there is a new connection
462  _connections_cond.signal(true);
463 
464  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "queued bundle to an new tcp connection (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
465  } catch (const ibrcommon::Exception&) {
// NOTE(review): the failure is swallowed silently; the job is not reported
// as failed and `conn` is not released here - confirm the intended handling.
466 
467  }
468  }
469 
470  void TCPConvergenceLayer::connectionUp(TCPConnection *conn)
471  {
472  ibrcommon::MutexLock l(_connections_cond);
473  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
474  {
475  if (conn == (*iter))
476  {
477  // put pending connection to the active connections
478  return;
479  }
480  }
481 
482  _connections.push_back( conn );
483 
484  // signal that there is a new connection
485  _connections_cond.signal(true);
486 
487  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "tcp connection added (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
488  }
489 
490  void TCPConvergenceLayer::connectionDown(TCPConnection *conn)
491  {
492  ibrcommon::MutexLock l(_connections_cond);
493  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
494  {
495  if (conn == (*iter))
496  {
497  _connections.erase(iter);
498  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "tcp connection removed (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
499 
500  // signal that there is a connection less
501  _connections_cond.signal(true);
502  return;
503  }
504  }
505  }
506 
507  void TCPConvergenceLayer::addTrafficIn(size_t amount) throw ()
508  {
509  ibrcommon::MutexLock l(_stats_lock);
510  _stats_in += amount;
511  }
512 
513  void TCPConvergenceLayer::addTrafficOut(size_t amount) throw ()
514  {
515  ibrcommon::MutexLock l(_stats_lock);
516  _stats_out += amount;
517  }
518 
// Accept loop: waits on the socket set, accepts one client per readable
// server socket, wraps it in a TCPConnection, registers it via connectionUp()
// and initializes it. The loop only ends when an exception derived from
// std::exception reaches the outer handler.
// NOTE(review): the signature line (519), the condition guarding enableTLS()
// (553) and the statement after "breakpoint" (570) are missing from this
// listing.
520  {
521  try {
522  while (true)
523  {
524  ibrcommon::socketset readfds;
525 
526  // wait for incoming connections
// no timeout argument is passed (NULL) - presumably this blocks until a
// socket is readable; confirm against the vsocket API
527  _vsocket.select(&readfds, NULL, NULL, NULL);
528 
529  for (ibrcommon::socketset::iterator iter = readfds.begin(); iter != readfds.end(); ++iter) {
530  try {
531  // assume that all sockets are serversockets
532  ibrcommon::serversocket &sock = dynamic_cast<ibrcommon::serversocket&>(**iter);
533 
534  // wait for incoming connections
535  ibrcommon::vaddress peeraddr;
536  ibrcommon::clientsocket *client = sock.accept(peeraddr);
537 
538  // create a EID based on the peer address
539  dtn::data::EID source("tcp://" + peeraddr.address() + ":" + peeraddr.service());
540 
541  // create a new node object
542  dtn::core::Node node(source);
543 
544  // add TCP connection
545  const std::string uri = "ip=" + peeraddr.address() + ";port=" + peeraddr.service() + ";";
546  node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, uri, 0, 30) );
547 
548  // create a new TCPConnection and return the pointer
// NOTE(review): if anything between accept() and this line throws, `client`
// is not released here - confirm ownership of the accepted socket.
549  TCPConnection *obj = new TCPConnection(*this, node, client, 10);
550 
551 #ifdef WITH_TLS
552  // enable TLS Support
554  {
555  obj->enableTLS();
556  }
557 #endif
558 
559  // add the connection to the connection list
560  connectionUp(obj);
561 
562  // initialize the object
563  obj->initialize();
564  } catch (const std::bad_cast&) {
// a readable socket that is not a serversocket is skipped
565 
566  }
567  }
568 
569  // breakpoint
571  }
572  } catch (const std::exception&) {
573  // ignore all errors
// any other exception, including one from a single accept(), ends the loop
574  return;
575  }
576  }
577 
// NOTE(review): the signature line (578) is missing from this listing.
// Takes the whole socket set down; presumably this is what makes the blocking
// select() in the accept loop return so the thread can end - confirm.
579  {
580  _vsocket.down();
581  }
582 
583  void TCPConvergenceLayer::closeAll()
584  {
585  // search for an existing connection
586  ibrcommon::MutexLock l(_connections_cond);
587  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
588  {
589  TCPConnection &conn = *(*iter);
590 
591  // close the connection immediately
592  conn.shutdown();
593  }
594  }
595 
// Brings all configured sockets up and records that state in _vsocket_state.
// A socket_exception is logged as an error and leaves _vsocket_state unchanged.
// NOTE(review): the signature line (596) and the statement after the first
// comment (599) are missing from this listing.
597  {
598  // listen on P2P dial-up events
600 
601  // routine checked for throw() on 15.02.2013
602  try {
603  // listen on the socket
604  _vsocket.up();
// from now on listen() brings newly created sockets up immediately
605  _vsocket_state = true;
606  } catch (const ibrcommon::socket_exception &ex) {
607  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, error) << "bind failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
608  }
609  }
610 
// Takes the sockets down, asks every connection to shut down and then blocks
// until _connections is empty.
// NOTE(review): the signature line (611) and the statement after the first
// comment (614) are missing from this listing.
612  {
613  // un-listen on P2P dial-up events
615 
616  // routine checked for throw() on 15.02.2013
617  try {
618  // shutdown all sockets
619  _vsocket.down();
620  _vsocket_state = false;
621  } catch (const ibrcommon::socket_exception &ex) {
// on failure _vsocket_state keeps its previous value
622  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, error) << "shutdown failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
623  }
624 
625  // close all active connections
626  closeAll();
627 
628  // wait until all tcp connections are down
// connectionDown() erases entries and signals _connections_cond, which is
// what lets this loop make progress
629  {
630  ibrcommon::MutexLock l(_connections_cond);
631  while (_connections.size() > 0) _connections_cond.wait();
632  }
633  }
634  }
635 
636  void TCPConvergenceLayer::getStats(ConvergenceLayer::stats_data &data) const
637  {
638  std::stringstream ss_format;
639 
640  static const std::string IN_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|in";
641  static const std::string OUT_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|out";
642 
643  ss_format << _stats_in;
644  data[IN_TAG] = ss_format.str();
645  ss_format.str("");
646 
647  ss_format << _stats_out;
648  data[OUT_TAG] = ss_format.str();
649  }
650 
651  void TCPConvergenceLayer::resetStats()
652  {
653  _stats_in = 0;
654  _stats_out = 0;
655  }
656 }