IBR-DTNSuite  0.10
TCPConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * TCPConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 #include "net/P2PDialupEvent.h"
25 #include "net/ConnectionEvent.h"
26 #include "core/BundleCore.h"
27 #include "core/EventDispatcher.h"
28 
31 #include <ibrcommon/Logger.h>
32 #include <ibrcommon/net/socket.h>
33 #include <ibrcommon/Logger.h>
34 
35 #include <streambuf>
36 #include <functional>
37 #include <list>
38 #include <algorithm>
39 
40 #ifdef WITH_TLS
42 #endif
43 
44 namespace dtn
45 {
46  namespace net
47  {
48  /*
49  * class TCPConvergenceLayer
50  */
51  const std::string TCPConvergenceLayer::TAG = "TCPConvergenceLayer";
52 
53  const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
54 
56  : _vsocket_state(false), _any_port(0)
57  {
58  }
59 
61  {
62  // unsubscribe to NetLink events
64 
65  join();
66 
67  // delete all sockets
68  _vsocket.destroy();
69  }
70 
71  void TCPConvergenceLayer::add(const ibrcommon::vinterface &net, int port) throw ()
72  {
73  // do not allow any futher binding if we already bound to any interface
74  if (_any_port > 0) return;
75 
76  if (net.empty()) {
77  // bind to any interface
78  _vsocket.add(new ibrcommon::tcpserversocket(port));
79  _any_port = port;
80  } else {
81  listen(net, port);
82  }
83  }
84 
85  void TCPConvergenceLayer::listen(const ibrcommon::vinterface &net, int port) throw ()
86  {
87  try {
88  // add the new interface to internal data-structures
89  {
90  ibrcommon::MutexLock l(_interface_lock);
91 
92  // only add the interface once
93  if (_interfaces.find(net) != _interfaces.end()) return;
94 
95  // store the new interface in the list of interfaces
96  _interfaces.insert(net);
97  }
98 
99  // subscribe to NetLink events on our interfaces
101 
102  // store port of the interface
103  {
104  ibrcommon::MutexLock l(_portmap_lock);
105  _portmap[net] = port;
106  }
107 
108  // create sockets for all addresses on the interface
109  // may throw "interface_not_set"
110  std::list<ibrcommon::vaddress> addrs = net.getAddresses();
111 
112  // convert the port into a string
113  std::stringstream ss; ss << port;
114 
115  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
116  ibrcommon::vaddress &addr = (*iter);
117 
118  // handle the addresses according to their family
119  // may throw "address_exception"
120  try {
121  switch (addr.family()) {
122  case AF_INET:
123  case AF_INET6:
124  {
125  addr.setService(ss.str());
127  if (_vsocket_state) sock->up();
128  _vsocket.add(sock, net);
129  break;
130  }
131  default:
132  break;
133  }
134  } catch (const ibrcommon::vaddress::address_exception &ex) {
135  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
136  } catch (const ibrcommon::socket_exception &ex) {
137  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
138  }
139  }
140 
141  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << "bound to " << net.toString() << IBRCOMMON_LOGGER_ENDL;
142  } catch (const ibrcommon::vinterface::interface_not_set &ex) {
143  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
144  }
145  }
146 
147  void TCPConvergenceLayer::unlisten(const ibrcommon::vinterface &iface) throw ()
148  {
149  ibrcommon::socketset socks = _vsocket.get(iface);
150  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
151  ibrcommon::tcpserversocket *sock = dynamic_cast<ibrcommon::tcpserversocket*>(*iter);
152  _vsocket.remove(sock);
153  try {
154  sock->down();
155  } catch (const ibrcommon::socket_exception &ex) {
156  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
157  }
158  delete sock;
159  }
160 
161  {
162  ibrcommon::MutexLock l(_portmap_lock);
163  _portmap.erase(iface);
164  }
165 
166  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << "unbound from " << iface.toString() << IBRCOMMON_LOGGER_ENDL;
167  }
168 
170  {
172  }
173 
175  {
176  ibrcommon::MutexLock l(_interface_lock);
177 
178  // announce port only if we are bound to any interface
179  if (_interfaces.empty() && (_any_port > 0)) {
180  std::stringstream service;
181  // ... set the port only
182  ibrcommon::MutexLock l(_portmap_lock);
183  service << "port=" << _portmap[iface] << ";";
184  announcement.addService( DiscoveryService("tcpcl", service.str()));
185  return;
186  }
187 
188  // determine if we should enable crosslayer discovery by sending out our own address
190 
191  // this marker should set to true if we added an service description
192  bool announced = false;
193 
194  // search for the matching interface
195  for (std::set<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); ++it)
196  {
197  const ibrcommon::vinterface &it_iface = *it;
198  if (it_iface == iface)
199  {
200  try {
201  // check if cross layer discovery is disabled
202  if (!crosslayer) throw ibrcommon::Exception("crosslayer discovery disabled!");
203 
204  // get all addresses of this interface
205  std::list<ibrcommon::vaddress> list = it_iface.getAddresses();
206 
207  // if no address is returned... (goto catch block)
208  if (list.empty()) throw ibrcommon::Exception("no address found");
209 
210  for (std::list<ibrcommon::vaddress>::const_iterator addr_it = list.begin(); addr_it != list.end(); ++addr_it)
211  {
212  const ibrcommon::vaddress &addr = (*addr_it);
213 
214  if (addr.scope() != ibrcommon::vaddress::SCOPE_GLOBAL) continue;
215 
216  try {
217  // do not announce non-IP addresses
218  sa_family_t f = addr.family();
219  if ((f != AF_INET) && (f != AF_INET6)) continue;
220 
221  std::stringstream service;
222  // fill in the ip address
223  ibrcommon::MutexLock l(_portmap_lock);
224  service << "ip=" << addr.address() << ";port=" << _portmap[iface] << ";";
225  announcement.addService( DiscoveryService("tcpcl", service.str()));
226 
227  // set the announce mark
228  announced = true;
229  } catch (const ibrcommon::vaddress::address_exception &ex) {
230  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << ex.what() << IBRCOMMON_LOGGER_ENDL;
231  }
232  }
233  } catch (const ibrcommon::Exception &ex) {
234  // address collection process aborted
235  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 65) << "Address collection aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
236  };
237 
238  // if we still not announced anything...
239  if (!announced) {
240  std::stringstream service;
241  // ... set the port only
242  ibrcommon::MutexLock l(_portmap_lock);
243  service << "port=" << _portmap[iface] << ";";
244  announcement.addService( DiscoveryService("tcpcl", service.str()));
245  }
246  return;
247  }
248  }
249 
251  }
252 
253  const std::string TCPConvergenceLayer::getName() const
254  {
255  return TCPConvergenceLayer::TAG;
256  }
257 
258  void TCPConvergenceLayer::raiseEvent(const Event *evt) throw ()
259  {
260  try {
261  const dtn::net::P2PDialupEvent &dialup = dynamic_cast<const dtn::net::P2PDialupEvent&>(*evt);
262 
263  switch (dialup.type)
264  {
266  {
267  // listen to the new interface
268  listen(dialup.iface, 4556);
269  break;
270  }
271 
273  {
274  // check if the interface is bound by us
275  {
276  ibrcommon::MutexLock l(_interface_lock);
277 
278  // only remove the interface if it exists
279  if (_interfaces.find(dialup.iface) == _interfaces.end()) return;
280 
281  // remove the interface from the stored set
282  _interfaces.erase(dialup.iface);
283  }
284 
285  // un-subscribe to NetLink events on our interfaces
287 
288  // remove all sockets on this interface
289  unlisten(dialup.iface);
290  break;
291  }
292  }
293  } catch (std::bad_cast&) {
294 
295  }
296  }
297 
299  {
300  // do not do anything if we are bound on all interfaces
301  if (_any_port > 0) return;
302 
303  {
304  ibrcommon::MutexLock l(_interface_lock);
305  if (_interfaces.find(evt.getInterface()) == _interfaces.end()) return;
306  }
307 
308  switch (evt.getAction())
309  {
311  {
312  ibrcommon::vaddress bindaddr = evt.getAddress();
313  // convert the port into a string
314  ibrcommon::MutexLock l(_portmap_lock);
315  std::stringstream ss; ss << _portmap[evt.getInterface()];
316  bindaddr.setService(ss.str());
318  try {
319  sock->up();
320  _vsocket.add(sock, evt.getInterface());
321  } catch (const ibrcommon::socket_exception&) {
322  delete sock;
323  }
324  break;
325  }
326 
328  {
329  ibrcommon::socketset socks = _vsocket.getAll();
330  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
331  ibrcommon::tcpserversocket *sock = dynamic_cast<ibrcommon::tcpserversocket*>(*iter);
332  if (sock->get_address().address() == evt.getAddress().address()) {
333  _vsocket.remove(sock);
334  sock->down();
335  delete sock;
336  break;
337  }
338  }
339  break;
340  }
341 
343  {
344  // remove all sockets on this interface
345  const ibrcommon::vinterface &iface = evt.getInterface();
346 
347  ibrcommon::socketset socks = _vsocket.get(iface);
348  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
349  ibrcommon::tcpserversocket *sock = dynamic_cast<ibrcommon::tcpserversocket*>(*iter);
350  _vsocket.remove(sock);
351  try {
352  sock->down();
353  } catch (const ibrcommon::socket_exception &ex) {
354  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
355  }
356  delete sock;
357  }
358  break;
359  }
360 
361  default:
362  break;
363  }
364  }
365 
367  {
368  // search for an existing connection
369  ibrcommon::MutexLock l(_connections_cond);
370 
371  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
372  {
373  TCPConnection &conn = *(*iter);
374 
375  if (conn.match(n))
376  {
377  return;
378  }
379  }
380 
381  try {
382  // create a connection
383  TCPConnection *conn = new TCPConnection(*this, n, NULL, 10);
384 
385 #ifdef WITH_TLS
386  // enable TLS Support
388  {
389  conn->enableTLS();
390  }
391 #endif
392 
393  // raise setup event
395 
396  // add connection as pending
397  _connections.push_back( conn );
398 
399  // start the ClientHandler (service)
400  conn->initialize();
401 
402  // signal that there is a new connection
403  _connections_cond.signal(true);
404  } catch (const ibrcommon::Exception&) { };
405 
406  return;
407  }
408 
410  {
411  // search for an existing connection
412  ibrcommon::MutexLock l(_connections_cond);
413 
414  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
415  {
416  TCPConnection &conn = *(*iter);
417 
418  if (conn.match(n))
419  {
420  conn.queue(job);
421  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "queued bundle to an existing tcp connection (" << conn.getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
422 
423  return;
424  }
425  }
426 
427  try {
428  // create a connection
429  TCPConnection *conn = new TCPConnection(*this, n, NULL, 10);
430 
431 #ifdef WITH_TLS
432  // enable TLS Support
434  {
435  conn->enableTLS();
436  }
437 #endif
438 
439  // raise setup event
441 
442  // add connection as pending
443  _connections.push_back( conn );
444 
445  // start the ClientHandler (service)
446  conn->initialize();
447 
448  // queue the bundle
449  conn->queue(job);
450 
451  // signal that there is a new connection
452  _connections_cond.signal(true);
453 
454  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "queued bundle to an new tcp connection (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
455  } catch (const ibrcommon::Exception&) {
456 
457  }
458  }
459 
460  void TCPConvergenceLayer::connectionUp(TCPConnection *conn)
461  {
462  ibrcommon::MutexLock l(_connections_cond);
463  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
464  {
465  if (conn == (*iter))
466  {
467  // put pending connection to the active connections
468  return;
469  }
470  }
471 
472  _connections.push_back( conn );
473 
474  // signal that there is a new connection
475  _connections_cond.signal(true);
476 
477  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "tcp connection added (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
478  }
479 
480  void TCPConvergenceLayer::connectionDown(TCPConnection *conn)
481  {
482  ibrcommon::MutexLock l(_connections_cond);
483  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
484  {
485  if (conn == (*iter))
486  {
487  _connections.erase(iter);
488  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "tcp connection removed (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
489 
490  // signal that there is a connection less
491  _connections_cond.signal(true);
492  return;
493  }
494  }
495  }
496 
498  {
499  try {
500  while (true)
501  {
502  ibrcommon::socketset readfds;
503 
504  // wait for incoming connections
505  _vsocket.select(&readfds, NULL, NULL, NULL);
506 
507  for (ibrcommon::socketset::iterator iter = readfds.begin(); iter != readfds.end(); ++iter) {
508  try {
509  // assume that all sockets are serversockets
510  ibrcommon::serversocket &sock = dynamic_cast<ibrcommon::serversocket&>(**iter);
511 
512  // wait for incoming connections
513  ibrcommon::vaddress peeraddr;
514  ibrcommon::clientsocket *client = sock.accept(peeraddr);
515 
516  // create a EID based on the peer address
517  dtn::data::EID source("tcp://" + peeraddr.address() + ":" + peeraddr.service());
518 
519  // create a new node object
520  dtn::core::Node node(source);
521 
522  // add TCP connection
523  const std::string uri = "ip=" + peeraddr.address() + ";port=" + peeraddr.service() + ";";
524  node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, uri, 0, 30) );
525 
526  // create a new TCPConnection and return the pointer
527  TCPConnection *obj = new TCPConnection(*this, node, client, 10);
528 
529 #ifdef WITH_TLS
530  // enable TLS Support
532  {
533  obj->enableTLS();
534  }
535 #endif
536 
537  // add the connection to the connection list
538  connectionUp(obj);
539 
540  // initialize the object
541  obj->initialize();
542  } catch (const std::bad_cast&) {
543 
544  }
545  }
546 
547  // breakpoint
549  }
550  } catch (const std::exception&) {
551  // ignore all errors
552  return;
553  }
554  }
555 
557  {
558  _vsocket.down();
559  }
560 
561  void TCPConvergenceLayer::closeAll()
562  {
563  // search for an existing connection
564  ibrcommon::MutexLock l(_connections_cond);
565  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
566  {
567  TCPConnection &conn = *(*iter);
568 
569  // close the connection immediately
570  conn.shutdown();
571  }
572  }
573 
575  {
576  // listen on P2P dial-up events
578 
579  // routine checked for throw() on 15.02.2013
580  try {
581  // listen on the socket
582  _vsocket.up();
583  _vsocket_state = true;
584  } catch (const ibrcommon::socket_exception &ex) {
585  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, error) << "bind failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
586  }
587  }
588 
590  {
591  // un-listen on P2P dial-up events
593 
594  // routine checked for throw() on 15.02.2013
595  try {
596  // shutdown all sockets
597  _vsocket.down();
598  _vsocket_state = false;
599  } catch (const ibrcommon::socket_exception &ex) {
600  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, error) << "shutdown failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
601  }
602 
603  // close all active connections
604  closeAll();
605 
606  // wait until all tcp connections are down
607  {
608  ibrcommon::MutexLock l(_connections_cond);
609  while (_connections.size() > 0) _connections_cond.wait();
610  }
611  }
612  }
613 
614  const ConvergenceLayer::stats_map& TCPConvergenceLayer::getStats()
615  {
616  size_t in = 0;
617  size_t out = 0;
618 
619  // collect stats of all connections
620  ibrcommon::MutexLock l(_connections_cond);
621 
622  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
623  {
624  TCPConnection &conn = *(*iter);
625  in += conn.getTrafficStats(0);
626  out += conn.getTrafficStats(1);
627  conn.resetTrafficStats();
628  }
629 
630  addStats("in", in);
631  addStats("out", out);
632 
633  return ConvergenceLayer::getStats();
634  }
635 
636  void TCPConvergenceLayer::resetStats()
637  {
638  // reset the stats of all connections
639  ibrcommon::MutexLock l(_connections_cond);
640 
641  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
642  {
643  TCPConnection &conn = *(*iter);
644  conn.resetTrafficStats();
645  }
646  }
647 }