IBR-DTNSuite  0.10
IPNDAgent.cpp
Go to the documentation of this file.
1 /*
2  * IPNDAgent.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 
24 #include "net/IPNDAgent.h"
25 #include "net/P2PDialupEvent.h"
26 #include "core/BundleCore.h"
27 #include "core/EventDispatcher.h"
28 
29 #include <ibrdtn/data/Exceptions.h>
30 
31 #include <ibrcommon/Logger.h>
33 #include <ibrcommon/net/socket.h>
34 #include <ibrcommon/net/vaddress.h>
36 
37 #include <sstream>
38 #include <string.h>
39 #include <typeinfo>
40 #include <time.h>
41 
42 namespace dtn
43 {
44  namespace net
45  {
// Tag passed to the logger macros for most messages emitted by this class.
46  const std::string IPNDAgent::TAG = "IPNDAgent";
47 
/*
 * Initializer list and body of a definition whose signature line is not
 * visible in this listing (presumably the IPNDAgent constructor - confirm).
 *
 * The base DiscoveryAgent is initialized with the discovery section of the
 * daemon configuration, the announcement version defaults to
 * DISCO_VERSION_01 and the send sockets are flagged as not yet up.
 */
49  : DiscoveryAgent(dtn::daemon::Configuration::getInstance().getDiscovery()),
50  _version(DiscoveryAnnouncement::DISCO_VERSION_01), _send_socket_state(false)
51  {
52  // bind to receive socket
    // NOTE(review): `port` is presumably a parameter declared on the
    // signature line that is missing here - confirm.
53  _recv_socket.add(new ibrcommon::multicastsocket(port));
54 
    // NOTE(review): the statements belonging to the cases below are not
    // visible in this listing; presumably each one selects the matching
    // announcement version - confirm against the original file.
55  switch (_config.version())
56  {
57  case 2:
59  break;
60 
61  case 1:
63  break;
64 
65  case 0:
66  IBRCOMMON_LOGGER_TAG("DiscoveryAgent", info) << "DTN2 compatibility mode" << IBRCOMMON_LOGGER_ENDL;
68  break;
69  };
70  }
71 
/*
 * Body of a definition whose signature line is not visible in this listing
 * (presumably the destructor - confirm). Destroys the send socket set and
 * the receive socket set.
 */
73  {
74  _send_socket.destroy();
75  _recv_socket.destroy();
76  }
77 
78  void IPNDAgent::add(const ibrcommon::vaddress &address) {
79  IBRCOMMON_LOGGER_TAG("DiscoveryAgent", info) << "listen to " << address.toString() << IBRCOMMON_LOGGER_ENDL;
80  _destinations.insert(address);
81  }
82 
/*
 * Body of a definition whose signature line is not visible in this listing
 * (presumably IPNDAgent::bind - confirm); `net` is presumably its interface
 * parameter.
 *
 * Logs the interface and stores it in _interfaces while holding
 * _interface_lock. An interface that is already stored is left untouched.
 */
84  {
85  IBRCOMMON_LOGGER_TAG("DiscoveryAgent", info) << "add interface " << net.toString() << IBRCOMMON_LOGGER_ENDL;
86 
87  // add the interface to the stored set
    // the lock is held until the end of this scope
88  ibrcommon::MutexLock l(_interface_lock);
89 
90  // only add the interface once
91  if (_interfaces.find(net) != _interfaces.end()) return;
92 
93  // store the new interface in the list of interfaces
94  _interfaces.insert(net);
95  }
96 
97  void IPNDAgent::leave_interface(const ibrcommon::vinterface &iface) throw ()
98  {
99  ibrcommon::multicastsocket &msock = dynamic_cast<ibrcommon::multicastsocket&>(**_recv_socket.getAll().begin());
100 
101  for (std::set<ibrcommon::vaddress>::const_iterator it_addr = _destinations.begin(); it_addr != _destinations.end(); ++it_addr)
102  {
103  try {
104  msock.leave(*it_addr, iface);
105  } catch (const ibrcommon::socket_raw_error &e) {
106  if (e.error() != EADDRNOTAVAIL) {
107  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << "leave failed on " << iface.toString() << "; " << e.what() << IBRCOMMON_LOGGER_ENDL;
108  }
109  } catch (const ibrcommon::socket_exception &e) {
110  IBRCOMMON_LOGGER_DEBUG_TAG(IPNDAgent::TAG, 10) << "can not leave " << (*it_addr).toString() << " on " << iface.toString() << IBRCOMMON_LOGGER_ENDL;
111  } catch (const ibrcommon::Exception&) {
112  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << "can not leave " << (*it_addr).toString() << " on " << iface.toString() << IBRCOMMON_LOGGER_ENDL;
113  }
114  }
115  }
116 
117  void IPNDAgent::join_interface(const ibrcommon::vinterface &iface) throw ()
118  {
119  ibrcommon::multicastsocket &msock = dynamic_cast<ibrcommon::multicastsocket&>(**_recv_socket.getAll().begin());
120 
121  for (std::set<ibrcommon::vaddress>::const_iterator it_addr = _destinations.begin(); it_addr != _destinations.end(); ++it_addr)
122  {
123  try {
124  msock.join(*it_addr, iface);
125  } catch (const ibrcommon::socket_raw_error &e) {
126  if (e.error() != EADDRINUSE) {
127  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << "join failed on " << iface.toString() << "; " << e.what() << IBRCOMMON_LOGGER_ENDL;
128  }
129  } catch (const ibrcommon::socket_exception &e) {
130  IBRCOMMON_LOGGER_DEBUG_TAG(IPNDAgent::TAG, 10) << "can not join " << (*it_addr).toString() << " on " << iface.toString() << "; " << e.what() << IBRCOMMON_LOGGER_ENDL;
131  }
132  }
133  }
134 
/**
 * Serialize a discovery announcement and send it to one destination address
 * through the send sockets registered for the given interface.
 *
 * Errors are handled per socket, so a failure on one socket does not stop
 * the remaining sockets from being tried.
 *
 * @param a     Announcement to serialize and transmit.
 * @param iface Interface whose send sockets are used.
 * @param addr  Destination address of the datagram.
 */
135  void IPNDAgent::send(const DiscoveryAnnouncement &a, const ibrcommon::vinterface &iface, const ibrcommon::vaddress &addr)
136  {
137  // serialize announcement
     // NOTE(review): `stringstream` is used unqualified; this relies on a
     // using-declaration/directive that is not visible in this file.
138  stringstream ss; ss << a;
139  const std::string data = ss.str();
140 
141  ibrcommon::socketset fds = _send_socket.get(iface);
142 
143  // send out discovery announcements on all bound sockets
144  // (hopefully only one per interface)
145  for (ibrcommon::socketset::const_iterator iter = fds.begin(); iter != fds.end(); ++iter)
146  {
147  try {
     // reference cast: throws std::bad_cast (handled below) if the socket
     // is not a udpsocket
148  ibrcommon::udpsocket &sock = dynamic_cast<ibrcommon::udpsocket&>(**iter);
149 
150  try {
151  // prevent broadcasting in the wrong address family
152  if (addr.family() != sock.get_family()) continue;
153 
154  sock.sendto(data.c_str(), data.length(), 0, addr);
155  } catch (const ibrcommon::socket_exception &e) {
156  IBRCOMMON_LOGGER_DEBUG_TAG(IPNDAgent::TAG, 5) << "can not send message to " << addr.toString() << " via " << sock.get_address().toString() << "/" << iface.toString() << "; socket exception: " << e.what() << IBRCOMMON_LOGGER_ENDL;
157  } catch (const ibrcommon::vaddress::address_exception &ex) {
     // NOTE(review): the body of this handler is not visible in this
     // listing; as shown here, address errors are ignored.
159  }
160  } catch (const std::bad_cast&) {
161  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << "Socket for sending isn't a udpsocket." << IBRCOMMON_LOGGER_ENDL;
162  }
163  }
164  }
165 
/**
 * Build a discovery announcement and send it on every stored interface to
 * every configured destination address.
 *
 * The announcement is rebuilt per interface: its service list is cleared
 * and, unless short beacons are configured, each provider is asked to
 * update the announcement for that interface.
 *
 * @param sn        Sequence number placed into the announcement.
 * @param providers Service providers asked to update the announcement.
 */
166  void IPNDAgent::sendAnnoucement(const uint16_t &sn, std::list<dtn::net::DiscoveryServiceProvider*> &providers)
167  {
168  DiscoveryAnnouncement announcement(_version, dtn::core::BundleCore::local);
169 
170  // set sequencenumber
171  announcement.setSequencenumber(sn);
172 
     // the interface set stays locked for the whole send round
173  ibrcommon::MutexLock l(_interface_lock);
174 
175  for (std::set<ibrcommon::vinterface>::const_iterator it_iface = _interfaces.begin(); it_iface != _interfaces.end(); ++it_iface)
176  {
177  const ibrcommon::vinterface &iface = (*it_iface);
178 
179  // clear all services
180  announcement.clearServices();
181 
182  if (!_config.shortbeacon())
183  {
184  // add services
185  for (std::list<DiscoveryServiceProvider*>::iterator iter = providers.begin(); iter != providers.end(); ++iter)
186  {
187  DiscoveryServiceProvider &provider = (**iter);
188 
189  try {
190  // update service information
191  provider.update(iface, announcement);
     // NOTE(review): the catch clause that closes this try block is not
     // visible in this listing; which exception it handles cannot be told
     // from here.
193 
194  }
195  }
196  }
197 
     // transmit the announcement for this interface to all destinations
198  for (std::set<ibrcommon::vaddress>::iterator iter = _destinations.begin(); iter != _destinations.end(); ++iter) {
199  send(announcement, iface, (*iter));
200  }
201  }
202  }
203 
204  void IPNDAgent::listen(const ibrcommon::vinterface &iface) throw ()
205  {
206  // create sockets for all addresses on the interface
207  std::list<ibrcommon::vaddress> addrs = iface.getAddresses();
208 
209  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
210  ibrcommon::vaddress &addr = (*iter);
211 
212  try {
213  // handle the addresses according to their family
214  switch (addr.family()) {
215  case AF_INET:
216  case AF_INET6:
217  {
218  ibrcommon::udpsocket *sock = new ibrcommon::udpsocket(addr);
219  if (_send_socket_state) sock->up();
220  _send_socket.add(sock, iface);
221  break;
222  }
223  default:
224  break;
225  }
226  } catch (const ibrcommon::vaddress::address_exception &ex) {
228  } catch (const ibrcommon::socket_exception &ex) {
230  }
231  }
232  }
233 
234  void IPNDAgent::unlisten(const ibrcommon::vinterface &iface) throw ()
235  {
236  ibrcommon::socketset socks = _send_socket.get(iface);
237  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
238  ibrcommon::udpsocket *sock = dynamic_cast<ibrcommon::udpsocket*>(*iter);
239  _send_socket.remove(sock);
240  try {
241  sock->down();
242  } catch (const ibrcommon::socket_exception &ex) {
244  }
245  delete sock;
246  }
247  }
248 
/**
 * Event handler; reacts only to P2PDialupEvent and ignores every other
 * event type.
 *
 * Depending on the dial-up event type, an interface is either added to the
 * stored set (followed by listen() and join_interface()) or removed from it
 * (followed by leave_interface() and unlisten()).
 *
 * @param evt Event to inspect.
 */
249  void IPNDAgent::raiseEvent(const Event *evt) throw ()
250  {
251  try {
     // reference cast: throws std::bad_cast for any other event type
252  const dtn::net::P2PDialupEvent &dialup = dynamic_cast<const dtn::net::P2PDialupEvent&>(*evt);
253 
254  switch (dialup.type)
255  {
     // NOTE(review): the case label of this branch is not visible in this
     // listing; the branch handles an interface that became available.
257  {
258  // add the interface to the stored set
259  {
260  ibrcommon::MutexLock l(_interface_lock);
261 
262  // only add the interface once
263  if (_interfaces.find(dialup.iface) != _interfaces.end()) break;
264 
265  // store the new interface in the list of interfaces
266  _interfaces.insert(dialup.iface);
267  }
268 
269  // subscribe to NetLink events on our interfaces
     // NOTE(review): the statement that belongs to the comment above is not
     // visible in this listing.
271 
272  // listen to the new interface
273  listen(dialup.iface);
274 
275  // join to all multicast addresses on this interface
276  join_interface(dialup.iface);
277  break;
278  }
279 
     // NOTE(review): the case label of this branch is not visible in this
     // listing; the branch handles an interface that went away.
281  {
282  // check if the interface is bound by us
283  {
284  ibrcommon::MutexLock l(_interface_lock);
285 
286  // only remove the interface if it exists
287  if (_interfaces.find(dialup.iface) == _interfaces.end()) break;
288 
289  // remove the interface from the stored set
290  _interfaces.erase(dialup.iface);
291  }
292 
293  // NOTE(review): the statement that followed here is not visible in this
     // listing; since this branch removes the interface, it presumably
     // unsubscribes from NetLink events rather than subscribing - confirm.
295 
296  // leave the multicast groups on the interface
297  leave_interface(dialup.iface);
298 
299  // remove all sockets on this interface
300  unlisten(dialup.iface);
301  break;
302  }
303  }
304  } catch (std::bad_cast&) {
     // not a P2PDialupEvent: nothing to do
305 
306  }
307  }
308 
/*
 * Body of a definition whose signature line is not visible in this listing
 * (presumably the link-event callback IPNDAgent::onUpdateNetLink - confirm);
 * `evt` is presumably its event parameter.
 *
 * Events for interfaces that are not in the stored set are ignored. For the
 * others, send sockets are added or removed according to the action of the
 * event.
 */
310  {
311  // check first if we are really bound to the interface
312  {
313  ibrcommon::MutexLock l(_interface_lock);
314  if (_interfaces.find(evt.getInterface()) == _interfaces.end()) return;
315  }
316 
317  switch (evt.getAction())
318  {
     // NOTE(review): the case label of this branch is not visible in this
     // listing; the branch creates a send socket bound to the event address.
320  {
321  const ibrcommon::vaddress &bindaddr = evt.getAddress();
322  ibrcommon::udpsocket *sock = new ibrcommon::udpsocket(bindaddr);
323  try {
324  sock->up();
325  _send_socket.add(sock, evt.getInterface());
326  join_interface(evt.getInterface());
327  } catch (const ibrcommon::socket_exception&) {
     // the socket could not be used; release it again
328  delete sock;
329  }
330  break;
331  }
332 
     // NOTE(review): the case label of this branch is not visible in this
     // listing; the branch removes the send socket bound to the event address.
334  {
335  ibrcommon::socketset socks = _send_socket.get(evt.getInterface());
336  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
     // NOTE(review): the result of this pointer cast is dereferenced on the
     // next line without a NULL check.
337  ibrcommon::udpsocket *sock = dynamic_cast<ibrcommon::udpsocket*>(*iter);
338  if (sock->get_address().address() == evt.getAddress().address()) {
339  // leave interfaces if a interface is totally gone
340  if (socks.size() == 1) {
341  leave_interface(evt.getInterface());
342  }
343 
344  _send_socket.remove(sock);
     // NOTE(review): unlike unlisten(), down() is not guarded by a
     // try/catch here - confirm that it cannot throw in this context.
345  sock->down();
346  delete sock;
347  break;
348  }
349  }
350  break;
351  }
352 
     // NOTE(review): the case label of this branch is not visible in this
     // listing; the branch drops everything bound to the interface.
354  {
355  // leave the multicast groups on the interface
356  leave_interface(evt.getInterface());
357 
358  // remove all sockets on this interface
359  unlisten(evt.getInterface());
360  break;
361  }
362 
363  default:
364  break;
365  }
366  }
367 
/**
 * Bring the agent up: set up the receive sockets, create send sockets and
 * join the multicast groups on every stored interface, then set up the send
 * sockets and mark them as up.
 */
368  void IPNDAgent::componentUp() throw ()
369  {
370  // routine checked for throw() on 15.02.2013
371 
372  try {
373  // setup the receive socket
374  _recv_socket.up();
375  } catch (const ibrcommon::socket_exception &ex) {
     // NOTE(review): the body of this handler is not visible in this listing.
377  }
378 
379  // join multicast groups
380  try {
381  ibrcommon::MutexLock l(_interface_lock);
382 
383  for (std::set<ibrcommon::vinterface>::const_iterator it_iface = _interfaces.begin(); it_iface != _interfaces.end(); ++it_iface)
384  {
385  const ibrcommon::vinterface &iface = (*it_iface);
386 
387  // subscribe to NetLink events on our interfaces
     // NOTE(review): the statement that belongs to the comment above is not
     // visible in this listing.
389 
390  // listen on the interface
391  listen(iface);
392 
393  // join to all multicast addresses on this interface
394  join_interface(iface);
395  }
396  } catch (std::bad_cast&) {
     // NOTE(review): this method is declared throw(), so throwing here
     // violates its own exception specification. listen() and
     // join_interface() are declared throw() as well, so a std::bad_cast
     // raised inside them cannot reach this handler - confirm the intent.
397  throw ibrcommon::socket_exception("no multicast socket found");
398  }
399 
400  try {
401  // setup all send sockets
402  _send_socket.up();
403 
404  // mark the send socket as up
405  _send_socket_state = true;
406  } catch (const ibrcommon::socket_exception &ex) {
     // NOTE(review): the body of this handler is not visible in this listing;
     // _send_socket_state stays false when up() fails.
408  }
409 
410  // listen on P2P dial-up events
     // NOTE(review): the statement that belongs to the comment above is not
     // visible in this listing.
412  }
413 
/**
 * Bring the agent down: destroy the send sockets, mark them as down, shut
 * down the receive sockets, then call stop() and join().
 */
414  void IPNDAgent::componentDown() throw ()
415  {
416  // un-listen on P2P dial-up events
     // NOTE(review): the statement that belongs to the comment above is not
     // visible in this listing.
418 
419  // unsubscribe to NetLink events
     // NOTE(review): the statement that belongs to the comment above is not
     // visible in this listing.
421 
422  // shutdown the send socket
423  _send_socket.destroy();
424 
425  // mark the send socket as down
426  _send_socket_state = false;
427 
428  // shutdown the receive socket
429  _recv_socket.down();
430 
431  stop();
432  join();
433  }
434 
/**
 * Receive loop of the agent.
 *
 * Each iteration waits up to 100 ms for readable receive sockets, parses
 * every received datagram as a discovery announcement and passes the source
 * EID and its services to received(). After the wait, timeout() is called
 * once more than 1000 ms have elapsed since the timer was last started.
 *
 * The loop ends when the select is interrupted or when a receive call
 * reports a negative length.
 */
435  void IPNDAgent::componentRun() throw ()
436  {
     // NOTE(review): the declaration of `tm` is not visible in this listing;
     // it is used as a stopwatch (start/stop/getMilliseconds) below.
438  tm.start();
439 
440  while (true)
441  {
     // NOTE(review): the declaration of `fds` is not visible in this listing;
     // it receives the sockets selected as readable below.
443 
444  struct timeval tv;
445 
446  // wait at most 100 ms (100000 usec) per select, so the one-second timer below is checked regularly
447  tv.tv_sec = 0;
448  tv.tv_usec = 100000;
449 
450  try {
451  // select on all bound sockets
452  _recv_socket.select(&fds, NULL, NULL, &tv);
453 
454  // receive from all sockets
455  for (ibrcommon::socketset::const_iterator iter = fds.begin(); iter != fds.end(); ++iter)
456  {
     // NOTE(review): a std::bad_cast from this reference cast is not caught
     // anywhere in this throw() method.
457  ibrcommon::multicastsocket &sock = dynamic_cast<ibrcommon::multicastsocket&>(**iter);
458 
459  char data[1500];
460  ibrcommon::vaddress sender;
461  DiscoveryAnnouncement announce(_version);
462 
463  std::list<DiscoveryService> ret_services;
464  dtn::data::EID ret_source;
465 
466  ssize_t len = sock.recvfrom(data, 1500, 0, sender);
467 
     // a negative length ends the whole receive loop, not just this datagram
468  if (len < 0) return;
469 
470  stringstream ss;
471  ss.write(data, len);
472 
473  try {
474  ss >> announce;
475 
476  if (announce.isShort())
477  {
478  // generate name with the sender address
479  ret_source = dtn::data::EID("udp://[" + sender.address() + "]:4556");
480  }
481  else
482  {
483  ret_source = announce.getEID();
484  }
485 
486  // get the list of services
487  const std::list<DiscoveryService> &services = announce.getServices();
488 
     // a short beacon without services gets a default "tcpcl" service on
     // the sender address, port 4556
489  if (services.empty() && announce.isShort())
490  {
491  ret_services.push_back(dtn::net::DiscoveryService("tcpcl", "ip=" + sender.address() + ";port=4556;"));
492  }
493 
494  // add all services to the return set
495  for (std::list<DiscoveryService>::const_iterator iter = services.begin(); iter != services.end(); ++iter) {
496  const DiscoveryService &service = (*iter);
497 
498  // add source address if not set
499  if ( (service.getParameters().find("port=") != std::string::npos) &&
500  (service.getParameters().find("ip=") == std::string::npos) ) {
501  // create a new service object
502  dtn::net::DiscoveryService ret_service(service.getName(), "ip=" + sender.address() + ";" + service.getParameters());
503 
504  // add service to the return set
505  ret_services.push_back(ret_service);
506  }
507  else
508  {
509  // add service to the return set
510  ret_services.push_back(service);
511  }
512  }
513 
514  // announce the received services
515  received(ret_source, ret_services);
     // datagrams that cannot be parsed are dropped silently
516  } catch (const dtn::InvalidDataException&) {
517  } catch (const ibrcommon::IOException&) {
518  }
519 
520  yield();
521  }
522  } catch (const ibrcommon::vsocket_interrupt&) {
     // the select was interrupted: leave the receive loop
523  return;
     // a select timeout is the normal idle case; fall through to the timer check
524  } catch (const ibrcommon::vsocket_timeout&) { };
525 
526  // trigger timeout, if one second is elapsed
527  tm.stop(); if (tm.getMilliseconds() > 1000.0)
528  {
529  tm.start();
530  timeout();
531  }
532  }
533  }
534 
/*
 * Body of a definition whose signature line is not visible in this listing
 * (presumably the thread cancellation hook - confirm). Shuts down the
 * receive sockets, which is intended to interrupt the receiving thread.
 */
536  {
537  // shutdown and interrupt the receiving thread
538  _recv_socket.down();
539  }
540 
541  const std::string IPNDAgent::getName() const
542  {
543  return "IPNDAgent";
544  }
545  }
546 }