IBR-DTNSuite  0.12
IPNDAgent.cpp
Go to the documentation of this file.
1 /*
2  * IPNDAgent.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 
24 #include "net/IPNDAgent.h"
25 #include "net/DiscoveryAgent.h"
26 #include "net/P2PDialupEvent.h"
27 #include "core/BundleCore.h"
28 #include "core/EventDispatcher.h"
29 
30 #include <ibrdtn/data/Exceptions.h>
31 
32 #include <ibrcommon/Logger.h>
34 #include <ibrcommon/net/socket.h>
35 #include <ibrcommon/net/vaddress.h>
37 
38 #ifdef __WIN32__
39 #define EADDRNOTAVAIL WSAEADDRNOTAVAIL
40 #define EADDRINUSE WSAEADDRINUSE
41 #endif
42 
43 #include <sstream>
44 #include <string.h>
45 #include <typeinfo>
46 #include <time.h>
47 
48 namespace dtn
49 {
50  namespace net
51  {
52  const std::string IPNDAgent::TAG = "IPNDAgent";
53 
55  :
56 #ifndef __WIN32__
57  _virtual_mcast_iface("__virtual_multicast_interface__"),
58 #endif
59  _state(false), _port(port)
60  {
61  }
62 
64  {
65  _socket.destroy();
66  }
67 
68  void IPNDAgent::add(const ibrcommon::vaddress &address) {
69  IBRCOMMON_LOGGER_TAG("DiscoveryAgent", info) << "listen to " << address.toString() << IBRCOMMON_LOGGER_ENDL;
70  _destinations.insert(address);
71  }
72 
74  {
75  IBRCOMMON_LOGGER_TAG("DiscoveryAgent", info) << "add interface " << net.toString() << IBRCOMMON_LOGGER_ENDL;
76 
77  // add the interface to the stored set
78  ibrcommon::MutexLock l(_interface_lock);
79 
80  // only add the interface once
81  if (_interfaces.find(net) != _interfaces.end()) return;
82 
83  // store the new interface in the list of interfaces
84  _interfaces.insert(net);
85  }
86 
87  void IPNDAgent::join(const ibrcommon::vinterface &iface, const ibrcommon::vaddress &addr) throw ()
88  {
89  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) << "Join on " << iface.toString() << " (" << addr.toString() << ", family: " << addr.family() << ")" << IBRCOMMON_LOGGER_ENDL;
90 
91  // only join IPv6 and IPv4 addresses
92  if ((addr.family() != AF_INET) && (addr.family() != AF_INET6)) return;
93 
94  // do not join on loopback interfaces
95  if (addr.isLocal()) return;
96 
97  // create a multicast socket and bind to given addr
99 
100  // if we are in UP state
101  if (_state) {
102  try {
103  // bring up
104  msock->up();
105 
106  // listen to multicast addresses
107  for (std::set<ibrcommon::vaddress>::const_iterator it_addr = _destinations.begin(); it_addr != _destinations.end(); ++it_addr)
108  {
109  try {
110  msock->join(*it_addr, iface);
111  } catch (const ibrcommon::socket_raw_error &e) {
112  if (e.error() == EADDRINUSE) {
113  // silent error
114  } else if (e.error() == 92) {
115  // silent error - protocol not available
116  } else {
117  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, warning) << "Join to " << (*it_addr).toString() << " failed on " << iface.toString() << "; " << e.what() << IBRCOMMON_LOGGER_ENDL;
118  }
119  } catch (const ibrcommon::socket_exception &e) {
120  IBRCOMMON_LOGGER_DEBUG_TAG(IPNDAgent::TAG, 10) << "Join to " << (*it_addr).toString() << " failed on " << iface.toString() << "; " << e.what() << IBRCOMMON_LOGGER_ENDL;
121  }
122  }
123  } catch (const ibrcommon::socket_exception &ex) {
124  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << "Join failed on " << iface.toString() << " (" << addr.toString() << ", family: " << addr.family() << ")" << "; " << ex.what() << IBRCOMMON_LOGGER_ENDL;
125  delete msock;
126  return;
127  }
128  }
129 
130  // add multicast socket to _socket
131  _socket.add(msock, iface);
132  }
133 
134  void IPNDAgent::leave(const ibrcommon::vinterface &iface, const ibrcommon::vaddress &addr) throw ()
135  {
136  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) << "Leave " << iface.toString() << " (" << addr.toString() << ", family: " << addr.family() << ")" << IBRCOMMON_LOGGER_ENDL;
137 
138  // get all sockets bound to the given interface
139  ibrcommon::socketset ifsocks = _socket.get(iface);
140 
141  for (ibrcommon::socketset::iterator it = ifsocks.begin(); it != ifsocks.end(); ++it)
142  {
143  ibrcommon::multicastsocket *msock = dynamic_cast<ibrcommon::multicastsocket*>(*it);
144  if (msock == NULL) continue;
145 
146  if (msock->get_address() == addr) {
147  // remove the socket
148  _socket.remove(msock);
149 
150  // shutdown the socket
151  try {
152  msock->down();
153  } catch (const ibrcommon::socket_exception &ex) {
154  IBRCOMMON_LOGGER_DEBUG_TAG(IPNDAgent::TAG, 10) << "leave failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
155  }
156 
157  // delete the socket
158  delete msock;
159 
160  return;
161  }
162  }
163  }
164 
165  void IPNDAgent::leave(const ibrcommon::vinterface &iface) throw ()
166  {
167  // get all sockets bound to the given interface
168  ibrcommon::socketset ifsocks = _socket.get(iface);
169 
170  for (ibrcommon::socketset::iterator it = ifsocks.begin(); it != ifsocks.end(); ++it)
171  {
172  ibrcommon::multicastsocket *msock = dynamic_cast<ibrcommon::multicastsocket*>(*it);
173  if (msock == NULL) continue;
174 
175  // remove the socket
176  _socket.remove(msock);
177 
178  // shutdown the socket
179  try {
180  msock->down();
181  } catch (const ibrcommon::socket_exception &ex) {
182  IBRCOMMON_LOGGER_DEBUG_TAG(IPNDAgent::TAG, 10) << "leave failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
183  }
184 
185  // delete the socket
186  delete msock;
187  }
188  }
189 
190  void IPNDAgent::join(const ibrcommon::vinterface &iface) throw ()
191  {
192  std::list<ibrcommon::vaddress> addrs = iface.getAddresses();
193 
194  for (std::list<ibrcommon::vaddress>::const_iterator it = addrs.begin(); it != addrs.end(); ++it)
195  {
196  ibrcommon::vaddress addr = (*it);
197  addr.setService(_port);
198  join(iface, addr);
199  }
200  }
201 
202  void IPNDAgent::onAdvertiseBeacon(const ibrcommon::vinterface &iface, const DiscoveryBeacon &beacon) throw ()
203  {
204  // serialize announcement
205  stringstream ss; ss << beacon;
206  const std::string data = ss.str();
207 
208  // get all sockets for the given interface
209  ibrcommon::socketset fds = _socket.get(iface);
210 
211  // send out discovery announcements on all bound sockets
212  // (hopefully only one per interface)
213  for (ibrcommon::socketset::const_iterator iter = fds.begin(); iter != fds.end(); ++iter)
214  {
215  try {
216  ibrcommon::udpsocket &sock = dynamic_cast<ibrcommon::udpsocket&>(**iter);
217 
218  // send beacon to all addresses
219  for (std::set<ibrcommon::vaddress>::const_iterator addr_it = _destinations.begin(); addr_it != _destinations.end(); ++addr_it)
220  {
221  const ibrcommon::vaddress &addr = (*addr_it);
222 
223  try {
224  // prevent broadcasting in the wrong address family
225  if (addr.family() != sock.get_family()) continue;
226 
227  sock.sendto(data.c_str(), data.length(), 0, addr);
228  } catch (const ibrcommon::socket_exception &e) {
229  IBRCOMMON_LOGGER_DEBUG_TAG(IPNDAgent::TAG, 5) << "can not send message to " << addr.toString() << " via " << sock.get_address().toString() << "/" << iface.toString() << "; socket exception: " << e.what() << IBRCOMMON_LOGGER_ENDL;
230  } catch (const ibrcommon::vaddress::address_exception &ex) {
231  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
232  }
233  }
234  } catch (const std::bad_cast&) {
235  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << "Socket for sending isn't a udpsocket." << IBRCOMMON_LOGGER_ENDL;
236  }
237  }
238  }
239 
240  void IPNDAgent::raiseEvent(const dtn::core::Event *evt) throw ()
241  {
242  try {
243  const dtn::net::P2PDialupEvent &dialup = dynamic_cast<const dtn::net::P2PDialupEvent&>(*evt);
244 
245  switch (dialup.type)
246  {
248  {
249  // add the interface to the stored set
250  {
251  ibrcommon::MutexLock l(_interface_lock);
252 
253  // only add the interface once
254  if (_interfaces.find(dialup.iface) != _interfaces.end()) break;
255 
256  // store the new interface in the list of interfaces
257  _interfaces.insert(dialup.iface);
258  }
259 
260  // subscribe to NetLink events on our interfaces
262 
263  // register as discovery handler for this interface
265 
266  // join to all multicast addresses on this interface
267  join(dialup.iface);
268  break;
269  }
270 
272  {
273  // check if the interface is bound by us
274  {
275  ibrcommon::MutexLock l(_interface_lock);
276 
277  // only remove the interface if it exists
278  if (_interfaces.find(dialup.iface) == _interfaces.end()) break;
279 
280  // remove the interface from the stored set
281  _interfaces.erase(dialup.iface);
282  }
283 
284  // subscribe to NetLink events on our interfaces
286 
287  // un-register as discovery handler for this interface
289 
290  // leave the multicast groups on the interface
291  leave(dialup.iface);
292  break;
293  }
294  }
295  } catch (std::bad_cast&) {
296 
297  }
298  }
299 
301  {
302  // check first if we are really bound to the interface
303  {
304  ibrcommon::MutexLock l(_interface_lock);
305  if (_interfaces.find(evt.getInterface()) == _interfaces.end()) return;
306  }
307 
308  switch (evt.getAction())
309  {
311  {
312  ibrcommon::vaddress addr = evt.getAddress();
313  addr.setService(_port);
314  join(evt.getInterface(), addr);
315  break;
316  }
317 
319  {
320  ibrcommon::vaddress addr = evt.getAddress();
321  addr.setService(_port);
322  leave(evt.getInterface(), addr);
323  break;
324  }
325 
327  {
328  // leave the multicast groups on the interface
329  leave(evt.getInterface());
330  break;
331  }
332 
333  default:
334  break;
335  }
336  }
337 
338  void IPNDAgent::componentUp() throw ()
339  {
340  // routine checked for throw() on 15.02.2013
341 
342  try {
343  // setup the sockets
344  _socket.up();
345 
346  // set state to UP
347  _state = true;
348  } catch (const ibrcommon::socket_exception &ex) {
349  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
350  }
351 
352 #ifndef __WIN32__
353  std::set<sa_family_t> bound_set;
354 
355  // create a socket for each multicast address and bind explicit
356  // to the multicast addresses
357  for (std::set<ibrcommon::vaddress>::const_iterator it_addr = _destinations.begin(); it_addr != _destinations.end(); ++it_addr)
358  {
359  const ibrcommon::vaddress &addr = (*it_addr);
360 
361  try {
362  sa_family_t fam = addr.family();
363 
364  if (bound_set.find(fam) == bound_set.end()) {
365  const ibrcommon::vaddress any_addr(_port, fam);
366 
367  // create a multicast socket and bind to given addr
369 
370  try {
371  // bring up
372  msock->up();
373 
374  // add mcast socket to vsocket
375  _socket.add(msock, _virtual_mcast_iface);
376  } catch (const ibrcommon::socket_exception &ex) {
377  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << "failed to set-up multicast socket on " << any_addr.toString() << ": " << ex.what() << IBRCOMMON_LOGGER_ENDL;
378  delete msock;
379  }
380 
381  bound_set.insert(fam);
382  }
383  } catch (const ibrcommon::vaddress::address_exception &ex) {
384  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << "unsupported multicast address " << addr.toString() << ": " << ex.what() << IBRCOMMON_LOGGER_ENDL;
385  }
386  }
387 #endif
388 
389  // listen to P2P dial-up events
391 
392  // join multicast groups and register as discovery handler
393  ibrcommon::MutexLock l(_interface_lock);
394 
395  for (std::set<ibrcommon::vinterface>::const_iterator it_iface = _interfaces.begin(); it_iface != _interfaces.end(); ++it_iface)
396  {
397  const ibrcommon::vinterface &iface = (*it_iface);
398 
399  // subscribe to NetLink events on our interfaces
401 
402  // register as discovery handler for this interface
404 
405  // join to all multicast addresses on this interface
406  join(iface);
407  }
408  }
409 
410  void IPNDAgent::componentDown() throw ()
411  {
412  // un-listen to P2P dial-up events
414 
415  // unsubscribe to NetLink events
417 
418  // un-register as discovery handler for this interface
420 
421  // mark the send socket as down
422  _state = false;
423 
424  // shutdown and destroy all sockets
425  _socket.destroy();
426 
429  }
430 
431  void IPNDAgent::componentRun() throw ()
432  {
433  struct timeval tv;
434 
435  // every second we want to transmit a discovery message, timeout of 1 seconds
436  tv.tv_sec = 1;
437  tv.tv_usec = 0;
438 
439  // get the reference to the discovery agent
441 
442  try {
443  while (true)
444  {
446 
447  try {
448  // select on all bound sockets
449  _socket.select(&fds, NULL, NULL, &tv);
450 
451  // receive from all sockets
452  for (ibrcommon::socketset::const_iterator iter = fds.begin(); iter != fds.end(); ++iter)
453  {
454  ibrcommon::multicastsocket &sock = dynamic_cast<ibrcommon::multicastsocket&>(**iter);
455 
456  char data[1500];
457  ibrcommon::vaddress sender;
458  DiscoveryBeacon beacon = agent.obtainBeacon();
459 
460  ssize_t len = sock.recvfrom(data, 1500, 0, sender);
461 
462  if (len < 0) return;
463 
464  stringstream ss;
465  ss.write(data, len);
466 
467  try {
468  ss >> beacon;
469 
470  if (beacon.isShort())
471  {
472  // generate name with the sender address
473  beacon.setEID( dtn::data::EID("udp://[" + sender.address() + "]:4556") );
474 
475  // add generated tcpcl service if the services list is empty
476  beacon.addService(dtn::net::DiscoveryService(dtn::core::Node::CONN_TCPIP, "ip=" + sender.address() + ";port=4556;"));
477  }
478 
479  DiscoveryBeacon::service_list &services = beacon.getServices();
480 
481  // add source address if not set
482  for (dtn::net::DiscoveryBeacon::service_list::iterator iter = services.begin(); iter != services.end(); ++iter) {
483  DiscoveryService &service = (*iter);
484 
485  if ( (service.getParameters().find("port=") != std::string::npos) &&
486  (service.getParameters().find("ip=") == std::string::npos) ) {
487 
488  // update service entry
489  service.update("ip=" + sender.address() + ";" + service.getParameters());
490  }
491  }
492 
493  // announce the received beacon
494  agent.onBeaconReceived(beacon);
495  } catch (const dtn::InvalidDataException&) {
496  } catch (const ibrcommon::IOException&) {
497  }
498  }
499 
500  // trigger an artificial timeout if the remaining timeout value is zero or below
501  if ( tv.tv_sec <= 0 && tv.tv_usec <= 0 )
502  throw ibrcommon::vsocket_timeout("timeout");
503 
504  } catch (const ibrcommon::vsocket_timeout&) {
505  // reset timeout to 1 second
506  tv.tv_sec = 1;
507  tv.tv_usec = 0;
508  }
509 
510  yield();
511  }
512  } catch (const ibrcommon::vsocket_interrupt&) {
513  // vsocket has been interrupted - exit now
514  } catch (const ibrcommon::socket_exception &ex) {
515  // unexpected error - exit now
516  IBRCOMMON_LOGGER_TAG(IPNDAgent::TAG, error) << "unexpected error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
517  }
518  }
519 
521  {
522  // shutdown and interrupt the receiving thread
523  _socket.down();
524  }
525 
526  const std::string IPNDAgent::getName() const
527  {
528  return "IPNDAgent";
529  }
530  }
531 }