IBR-DTNSuite  0.10
DHTNameService.cpp
Go to the documentation of this file.
1 /*
2  * DHTNameService.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Till Lorentzen <lorenzte@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 
24 #include "DHTNameService.h"
25 #include "ibrcommon/net/vsocket.h"
26 #include "core/EventDispatcher.h"
27 #include "core/BundleCore.h"
28 #include "core/NodeEvent.h"
30 
31 #include <ibrdtn/utils/Utils.h>
32 
33 #include <ibrcommon/Logger.h>
35 
36 #include <sys/socket.h>
37 #include <arpa/inet.h>
38 #include <set>
39 #include <cstdlib>
40 #include <sys/time.h>
41 #include <fcntl.h>
42 #include <string.h>
43 #include <unistd.h>
44 
46  _exiting(false), _initialized(false), _announced(false), _foundNodes(-1),
47  _bootstrapped(0),
48  _config(daemon::Configuration::getInstance().getDHT()) {
49 }
50 
52  join();
53 }
54 
55 const std::string dtn::dht::DHTNameService::getName() const {
56  return "DHT Naming Service";
57 }
58 
60 }
61 
62 bool dtn::dht::DHTNameService::setNonBlockingInterruptPipe() {
63  for (int i = 0; i < 2; ++i) {
64  int opts;
65  opts = fcntl(_interrupt_pipe[i], F_GETFL);
66  if (opts < 0) {
67  return false;
68  }
69  opts |= O_NONBLOCK;
70  if (fcntl(_interrupt_pipe[i], F_SETFL, opts) < 0) {
71  return false;
72  }
73  }
74  return true;
75 }
76 
78  // creating interrupt pipe
79  if (pipe(_interrupt_pipe) < 0) {
80  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "Error " << errno << " creating pipe"
82  return;
83  }
84  // set the pipe to non-blocking
85  if (!this->setNonBlockingInterruptPipe()) {
86  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "Error " << errno
87  << " setting pipe to non-blocking mode"
89  return;
90  }
91  // init DHT
92  int rc = dtn_dht_initstruct(&_context);
93  if (rc < 0) {
94  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT Context creation failed: " << rc
96  return;
97  }
98  if (_config.randomPort()) {
99  srand( time(NULL));
100  _context.port = rand() % 64000 + 1024;
101  } else {
102  _context.port = _config.getPort();
103  }
104  if (_config.getIPv4Binding().size() > 0) {
105  _context.bind = _config.getIPv4Binding().c_str();
106  }
107  if (_config.getIPv6Binding().size() > 0) {
108  _context.bind6 = _config.getIPv6Binding().c_str();
109  }
110  if (_config.isIPv4Enabled() && _config.isIPv6Enabled()) {
111  _context.type = BINDBOTH;
112  } else if (_config.isIPv4Enabled()) {
113  _context.type = IPV4ONLY;
114  } else if (_config.isIPv6Enabled()) {
115  _context.type = IPV6ONLY;
116  } else {
117  _context.type = BINDNONE;
118  }
119  string myid = _config.getID();
120  if (!_config.randomID()) {
121  dtn_dht_build_id_from_str(_context.id, myid.c_str(), myid.size());
122  }
123 
124  rc = dtn_dht_init_sockets(&_context);
125  if (rc != 0) {
126  if(_context.type == BINDBOTH) {
127  if(rc == -2){
128  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT IPv6 socket couldn't be initialized"
130  } else {
131  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT socket(s) couldn't be initialized"
133  }
134  dtn_dht_close_sockets(&_context);
135  // Try only IPv4
136  _context.type = IPV4ONLY;
137  rc = dtn_dht_init_sockets(&_context);
138  if(rc != 0){
139  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT IPv4 socket couldn't be initialized"
141  dtn_dht_close_sockets(&_context);
142  // Try only IPv6
143  _context.type = IPV6ONLY;
144  rc = dtn_dht_init_sockets(&_context);
145  if(rc!=0){
146  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT IPv6 socket couldn't be initialized"
148  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT Sockets couldn't be initialized"
150  dtn_dht_close_sockets(&_context);
151  return;
152  }
153  }
154  } else {
155  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT Sockets couldn't be initialized"
157  dtn_dht_close_sockets(&_context);
158  return;
159  }
160  }
161  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 50) << "Sockets for DHT initialized"
163  if (!_config.isBlacklistEnabled()) {
164  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 50) << "DHT Blacklist disabled"
166  dtn_dht_blacklist(0);
167  } else {
168  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 50) << "DHT Blacklist enabled"
170  }
171  {
172  ibrcommon::MutexLock l(this->_libmutex);
173  rc = dtn_dht_init(&_context);
174  }
175  if (rc < 0) {
176  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT initialization failed"
178  return;
179  }
180  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << "DHT initialized on Port: " << _context.port
181  << " with ID: " << myid << IBRCOMMON_LOGGER_ENDL;
182  this->_initialized = true;
183  // Warning about possibly wrong configuration
184  if(_config.isNeighbourAnnouncementEnabled() &&
186  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT will not announce neighbor nodes: "
187  << "announcement enabled but routing forwarding is disabled"
189  }
190 }
191 
193  if (!this->_initialized) {
194  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT is not initialized"
196  return;
197  }
198  std::string cltype_;
199  std::string port_;
200  const std::list<dtn::daemon::Configuration::NetConfig>
201  &nets =
203 
204  // for every available interface, build the correct struct
205  for (std::list<dtn::daemon::Configuration::NetConfig>::const_iterator iter =
206  nets.begin(); iter != nets.end(); ++iter) {
207  try {
208  cltype_ = getConvergenceLayerName((*iter));
209  std::stringstream ss;
210  ss << (*iter).port;
211  port_ = ss.str();
212  struct dtn_convergence_layer * clstruct =
213  (struct dtn_convergence_layer*) malloc(
214  sizeof(struct dtn_convergence_layer));
215  clstruct->clname = (char*) malloc(cltype_.size());
216  clstruct-> clnamelen = cltype_.size();
217  memcpy(clstruct->clname, cltype_.c_str(), cltype_.size());
218  struct dtn_convergence_layer_arg * arg =
219  (struct dtn_convergence_layer_arg*) malloc(
220  sizeof(struct dtn_convergence_layer_arg));
221  arg->key = (char*) malloc(5);
222  arg->key[4] = '\n';
223  memcpy(arg->key, "port", 4);
224  arg->keylen = 4;
225  arg->value = (char*) malloc(port_.size());
226  memcpy(arg->value, port_.c_str(), port_.size());
227  arg->valuelen = port_.size();
228  arg->next = NULL;
229  clstruct->args = arg;
230  clstruct->next = _context.clayer;
231  _context.clayer = clstruct;
232  } catch (const std::exception&) {
233  continue;
234  }
235  }
236 
237  // Bootstrapping the DHT
238  int rc, numberOfRandomRequests = 0;
241 
242  // DHT main loop
243  time_t tosleep = 0;
244  struct sockaddr_storage from;
245  socklen_t fromlen = sizeof(sockaddr_storage);
246  ::memset(&from, 0, fromlen);
247 
248  while (!this->_exiting) {
249  if (this->_foundNodes == 0) {
250  bootstrapping();
251  }
252  //Announce myself
253  if (!this->_announced && dtn_dht_ready_for_work(&_context) > 2) {
254  this->_announced = true;
255  if (!_config.isSelfAnnouncingEnabled())
256  return;
257  announce(dtn::core::BundleCore::local, SINGLETON);
258  // Read all unknown EIDs and lookup them
260  dtn::storage::BundleStorage & storage = core.getStorage();
261  std::set<dtn::data::EID> eids_ = storage.getDistinctDestinations();
262  std::set<dtn::data::EID>::iterator iterator;
263  for (iterator = eids_.begin(); iterator != eids_.end(); ++iterator) {
264  lookup(*iterator);
265  }
266  std::set<dtn::core::Node> neighbours_ = core.getConnectionManager().getNeighbors();
267  std::set<dtn::core::Node>::iterator neighbouriterator;
268  for (neighbouriterator = neighbours_.begin(); neighbouriterator
269  != neighbours_.end(); ++neighbouriterator) {
270  if (isNeighbourAnnouncable(*neighbouriterator))
271  announce(*neighbouriterator, NEIGHBOUR);
272  }
273  while (!this->cachedLookups.empty()) {
274  lookup(this->cachedLookups.front());
275  this->cachedLookups.pop_front();
276  }
277  }
278  // Faster Bootstrapping by searching for random Keys
279  if (dtn_dht_ready_for_work(&_context) > 0 && dtn_dht_ready_for_work(
280  &_context) <= 2 && numberOfRandomRequests < 40) {
281  dtn_dht_start_random_lookup(&_context);
282  numberOfRandomRequests++;
283  }
284  struct timeval tv;
285  fd_set readfds;
286  tv.tv_sec = tosleep;
287  tv.tv_usec = random() % 1000000;
288  int high_fd = _interrupt_pipe[0];
289  FD_ZERO(&readfds);
290  FD_SET(_interrupt_pipe[0], &readfds);
291  if (_context.ipv4socket >= 0) {
292  FD_SET(_context.ipv4socket, &readfds);
293  high_fd = max(high_fd, _context.ipv4socket);
294  }
295  if (_context.ipv6socket >= 0) {
296  FD_SET(_context.ipv6socket, &readfds);
297  high_fd = max(high_fd, _context.ipv6socket);
298  }
299  rc = select(high_fd + 1, &readfds, NULL, NULL, &tv);
300  if (rc < 0) {
301  if (errno != EINTR) {
302  IBRCOMMON_LOGGER_TAG("DHTNameService", error)
303  << "select of DHT Sockets failed with error: "
304  << errno << IBRCOMMON_LOGGER_ENDL;
305  sleep(1);
306  }
307  }
308  if (FD_ISSET(_interrupt_pipe[0], &readfds)) {
309  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "unblocked by self-pipe-trick"
311  // this was an interrupt with the self-pipe-trick
312  char buf[2];
313  int read = ::read(_interrupt_pipe[0], buf, 2);
314  if (read <= 2 || _exiting)
315  break;
316  }
317  if (rc > 0) {
318  fromlen = sizeof(from);
319  if (_context.ipv4socket >= 0 && FD_ISSET(_context.ipv4socket,
320  &readfds))
321  rc = recvfrom(_context.ipv4socket, _buf, sizeof(_buf) - 1, 0,
322  (struct sockaddr*) &from, &fromlen);
323  else if (_context.ipv6socket >= 0 && FD_ISSET(_context.ipv6socket,
324  &readfds))
325  rc = recvfrom(_context.ipv6socket, _buf, sizeof(_buf) - 1, 0,
326  (struct sockaddr*) &from, &fromlen);
327  }
328  if (rc > 0) {
329  _buf[rc] = '\0';
330  {
331  ibrcommon::MutexLock l(this->_libmutex);
332  rc = dtn_dht_periodic(&_context, _buf, rc,
333  (struct sockaddr*) &from, fromlen, &tosleep);
334  }
335  } else {
336  {
337  ibrcommon::MutexLock l(this->_libmutex);
338  rc = dtn_dht_periodic(&_context, NULL, 0, NULL, 0, &tosleep);
339  }
340  }
341  int numberOfHosts = 0;
342  int numberOfGoodHosts = 0;
343  int numberOfGood6Hosts = 0;
344  unsigned int numberOfBlocksHosts = 0;
345  unsigned int numberOfBlocksHostsIPv4 = 0;
346  unsigned int numberOfBlocksHostsIPv6 = 0;
347  {
348  ibrcommon::MutexLock l(this->_libmutex);
349  if (_context.ipv4socket >= 0)
350  numberOfHosts
351  = dtn_dht_nodes(AF_INET, &numberOfGoodHosts, NULL);
352  if (_context.ipv6socket >= 0)
353  numberOfHosts += dtn_dht_nodes(AF_INET6, &numberOfGood6Hosts,
354  NULL);
355  numberOfBlocksHosts = dtn_dht_blacklisted_nodes(
356  &numberOfBlocksHostsIPv4, &numberOfBlocksHostsIPv6);
357  }
358  if (this->_foundNodes != numberOfHosts) {
359  if (_config.isBlacklistEnabled()) {
360  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 42) << "DHT Nodes available: "
361  << numberOfHosts << "(Good:" << numberOfGoodHosts
362  << "+" << numberOfGood6Hosts << ") Blocked: "
363  << numberOfBlocksHosts << "("
364  << numberOfBlocksHostsIPv4 << "+"
365  << numberOfBlocksHostsIPv6 << ")"
367 
368  } else {
369  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 42) << "DHT Nodes available: "
370  << numberOfHosts << "(Good:" << numberOfGoodHosts
371  << "+" << numberOfGood6Hosts << ")"
373 
374  }
375  this->_foundNodes = numberOfHosts;
376  }
377  if (rc < 0) {
378  if (errno == EINTR) {
379  continue;
380  } else {
381  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "dtn_dht_periodic failed"
383  if (rc == EINVAL || rc == EFAULT) {
384  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT failed -> stopping DHT"
386  break;
387  }
388  tosleep = 1;
389  }
390  }
391  }
394  this->_initialized = false;
395  if (_config.getPathToNodeFiles().size() > 0) {
396  ibrcommon::MutexLock l(this->_libmutex);
397  int save = dtn_dht_save_conf(_config.getPathToNodeFiles().c_str());
398  if (save != 0) {
399  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT could not save nodes"
401  } else {
402  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT saved nodes"
404  }
405  }
406  if (this->_announced && _config.isSelfAnnouncingEnabled())
407  deannounce(dtn::core::BundleCore::local);
408  dtn_dht_uninit();
409  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << "DHT shut down" << IBRCOMMON_LOGGER_ENDL;
410  // Closes all sockets of the DHT
411  dtn_dht_close_sockets(&_context);
412 
413  dtn_dht_free_convergence_layer_struct(_context.clayer);
414 
415  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT sockets are closed"
417  ::close(_interrupt_pipe[0]);
418  ::close(_interrupt_pipe[1]);
419 }
420 
422  this->_exiting = true;
423  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT will be shut down"
425  ssize_t written = ::write(_interrupt_pipe[1], "i", 1);
426  if (written < 1) {
427  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT pipeline trick failed"
429  }
430 }
431 
432 void dtn::dht::DHTNameService::lookup(const dtn::data::EID &eid) {
433  if (dtn::core::BundleCore::local != eid.getNode()) {
434  if (this->_announced) {
435  std::string eid_ = eid.getNode().getString();
436  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 30) << "DHT Lookup: " << eid_
438  ibrcommon::MutexLock l(this->_libmutex);
439  dtn_dht_lookup(&_context, eid_.c_str(), eid_.size());
440  } else {
441  this->cachedLookups.push_front(eid);
442  }
443  }
444 }
445 
449 void dtn::dht::DHTNameService::announce(const dtn::core::Node &n,
450  enum dtn_dht_lookup_type type) {
451  if (this->_announced) {
452  std::string eid_ = n.getEID().getNode().getString();
453  ibrcommon::MutexLock l(this->_libmutex);
454  int rc = dtn_dht_announce(&_context, eid_.c_str(), eid_.size(), type);
455  if (rc > 0) {
456  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << "DHT Announcing: " << eid_
458  }
459  }
460 }
461 
462 bool dtn::dht::DHTNameService::isNeighbourAnnouncable(
463  const dtn::core::Node &node) {
464  if (!_config.isNeighbourAnnouncementEnabled())
465  return false;
466  // If forwarding of bundles is disabled, do not announce the neighbors to
467  // prevent receiving bundles for neighbor EIDs, which would not be forwarded
469  return false;
470  }
471 
472 
473  // get the merged node object
475  node.getEID());
476 
477  // Ignore all none discovered and none static nodes
478  // They could only be discovered by the DHT,
479  // so they are not really close neighbours and could be found directly in the DHT.
480  // This prevents the node to be announced by all neighbours, how found this node
481  std::set<dtn::core::Node::Type> types = n.getTypes();
482  if (types.find(dtn::core::Node::NODE_DISCOVERED) == types.end()
483  && (types.find(dtn::core::Node::NODE_STATIC_GLOBAL) == types.end())) {
484  return false;
485  }
486  // Proof, if the neighbour has told us, that he don't want to be published on the DHT
487  std::list<dtn::core::Node::Attribute> services = n.get("dhtns");
488  if (!services.empty()) {
489  for (std::list<dtn::core::Node::Attribute>::const_iterator service =
490  services.begin(); service != services.end(); ++service) {
491  bool proxy = true;
492  std::vector < string > parameters = dtn::utils::Utils::tokenize(
493  ";", (*service).value);
494  std::vector<string>::const_iterator param_iter = parameters.begin();
495 
496  while (param_iter != parameters.end()) {
497  std::vector < string > p = dtn::utils::Utils::tokenize("=",
498  (*param_iter));
499  if (p[0].compare("proxy") == 0) {
500  std::stringstream proxy_stream;
501  proxy_stream << p[1];
502  proxy_stream >> proxy;
503  }
504  ++param_iter;
505  }
506  if (!proxy)
507  return false;
508  }
509  }
510  return true;
511 }
512 
513 void dtn::dht::DHTNameService::deannounce(const dtn::core::Node &n) {
514  std::string eid_ = n.getEID().getNode().getString();
515  ibrcommon::MutexLock l(this->_libmutex);
516  dtn_dht_deannounce(eid_.c_str(), eid_.size());
517  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT Deannounced: " << eid_
519 }
520 
521 std::string dtn::dht::DHTNameService::getConvergenceLayerName(
523  std::string cltype_ = "";
524  switch (net.type) {
526  cltype_ = "tcp";
527  break;
529  cltype_ = "udp";
530  break;
531  // case dtn::daemon::Configuration::NetConfig::NETWORK_HTTP:
532  // cltype_ = "http";
533  // break;
534  default:
535  throw ibrcommon::Exception("type of convergence layer not supported");
536  }
537  return cltype_;
538 }
539 
541  try {
542  const dtn::routing::QueueBundleEvent &event =
543  dynamic_cast<const dtn::routing::QueueBundleEvent&> (*evt);
544  dtn::data::EID none;
545  if (event.bundle.destination != dtn::core::BundleCore::local.getNode()
546  && event.bundle.destination != none) {
547  lookup(event.bundle.destination);
548  }
549  } catch (const std::bad_cast&) {
550  };
551  try {
552  const dtn::core::NodeEvent &nodeevent =
553  dynamic_cast<const dtn::core::NodeEvent&> (*evt);
554  const dtn::core::Node &n = nodeevent.getNode();
556  switch (nodeevent.getAction()) {
557  case NODE_AVAILABLE:
558  if (isNeighbourAnnouncable(n))
559  announce(n, NEIGHBOUR);
560  pingNode(n);
561  break;
562  case NODE_UNAVAILABLE:
563  deannounce(n);
564  default:
565  break;
566  }
567  }
568  } catch (const std::bad_cast&) {
569  };
570 }
571 
572 void dtn::dht::DHTNameService::pingNode(const dtn::core::Node &n) {
573  int rc;
574  std::list<dtn::core::Node::Attribute> services = n.get("dhtns");
575  unsigned int port = 9999;
576  if (!services.empty()) {
577  for (std::list<dtn::core::Node::Attribute>::const_iterator service =
578  services.begin(); service != services.end(); ++service) {
579  std::vector < string > parameters = dtn::utils::Utils::tokenize(
580  ";", (*service).value);
581  std::vector<string>::const_iterator param_iter = parameters.begin();
582  bool portFound = false;
583  while (param_iter != parameters.end()) {
584  std::vector < string > p = dtn::utils::Utils::tokenize("=",
585  (*param_iter));
586  if (p[0].compare("port") == 0) {
587  std::stringstream port_stream;
588  port_stream << p[1];
589  port_stream >> port;
590  portFound = true;
591  }
592  ++param_iter;
593  }
594  // Do not ping the node, if he doesn't tell us, which port he has
595  if(!portFound){
596  continue;
597  }
598  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "trying to ping node "
600  const std::list<std::string> ips = getAllIPAddresses(n);
601  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << ips.size() << " IP Addresses found!"
603  std::string lastip = "";
604  for (std::list<std::string>::const_iterator iter = ips.begin(); iter
605  != ips.end(); ++iter) {
606  const std::string ip = (*iter);
607  // Ignoring double existence of ip's
608  if (ip == lastip) {
609  continue;
610  }
611  // decode address and port
612  struct sockaddr_in sin;
613  memset(&sin, 0, sizeof(sin));
614  sin.sin_family = AF_INET;
615  sin.sin_port = htons(port);
616  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 26) << " --- Using IP address: " << ip
618  rc = inet_pton(AF_INET, ip.c_str(), &(sin.sin_addr));
619  if (rc == 1) {
620  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 26) << "Pinging node "
621  << n.toString() << " on " << ip << ":" << port
623  ibrcommon::MutexLock l(this->_libmutex);
624  dtn_dht_ping_node((struct sockaddr*) &sin, sizeof(sin));
625  } else {
626  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << " --- ERROR pton: " << rc
628  }
629  lastip = ip;
630  }
631  }
632  }
633 }
634 
635 std::list<std::string> dtn::dht::DHTNameService::getAllIPAddresses(
636  const dtn::core::Node &n) {
637  std::string address = "0.0.0.0";
638  unsigned int port = 0;
639  std::list < std::string > ret;
640  const std::list<dtn::core::Node::URI> uri_list = n.get(
642  for (std::list<dtn::core::Node::URI>::const_iterator iter =
643  uri_list.begin(); iter != uri_list.end(); ++iter) {
644  const dtn::core::Node::URI &uri = (*iter);
645  uri.decode(address, port);
646  ret.push_front(address);
647  }
648  const std::list<dtn::core::Node::URI> udp_uri_list = n.get(
650  for (std::list<dtn::core::Node::URI>::const_iterator iter =
651  udp_uri_list.begin(); iter != udp_uri_list.end(); ++iter) {
652  const dtn::core::Node::URI &udp_uri = (*iter);
653  udp_uri.decode(address, port);
654  ret.push_front(address);
655  }
656  return ret;
657 }
658 
659 void dtn::dht::DHTNameService::bootstrapping() {
660  if (this->_bootstrapped > time(NULL) + 30) {
661  return;
662  }
663  this->_bootstrapped = time(NULL);
664  if (_config.getPathToNodeFiles().size() > 0) {
665  bootstrappingFile();
666  }
667  if (_config.isDNSBootstrappingEnabled()) {
668  bootstrappingDNS();
669  }
670  if (_config.isIPBootstrappingEnabled()) {
671  bootstrappingIPs();
672  }
673 }
674 
675 void dtn::dht::DHTNameService::bootstrappingFile() {
676  int rc;
677  {
678  ibrcommon::MutexLock l(this->_libmutex);
679  rc = dtn_dht_load_prev_conf(_config.getPathToNodeFiles().c_str());
680  }
681  if (rc != 0) {
682  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT loading of saved nodes failed"
684  } else {
685  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT loading of saved nodes done"
687  }
688 }
689 
690 void dtn::dht::DHTNameService::bootstrappingDNS() {
691  int rc;
692  std::vector < string > dns = _config.getDNSBootstrappingNames();
693  if (!dns.empty()) {
694  std::vector<string>::const_iterator dns_iter = dns.begin();
695  while (dns_iter != dns.end()) {
696  const string &dn = (*dns_iter);
697  {
698  ibrcommon::MutexLock l(this->_libmutex);
699  rc = dtn_dht_dns_bootstrap(&_context, dn.c_str(), NULL);
700  }
701  if (rc != 0) {
702  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "bootstrapping from domain " << dn
703  << " failed with error: " << rc
705  } else {
706  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25)
707  << "DHT Bootstrapping done for domain" << dn
709  }
710  ++dns_iter;
711  }
712  } else {
713  {
714  ibrcommon::MutexLock l(this->_libmutex);
715  rc = dtn_dht_dns_bootstrap(&_context, NULL, NULL);
716  }
717  if (rc != 0) {
718  IBRCOMMON_LOGGER_TAG("DHTNameService", warning)
719  << "bootstrapping from default domain failed with error: "
720  << rc << IBRCOMMON_LOGGER_ENDL;
721  } else {
722  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25)
723  << "DHT Bootstrapping done for default domain"
725  }
726  }
727 
728 }
729 
730 void dtn::dht::DHTNameService::bootstrappingIPs() {
731  int rc;
732  std::vector < string > ips = _config.getIPBootstrappingIPs();
733  std::vector<string>::const_iterator ip_iter = ips.begin();
734  while (ip_iter != ips.end()) {
735  std::vector < string > ip
736  = dtn::utils::Utils::tokenize(" ", (*ip_iter));
737  int size, ipversion = AF_INET;
738  struct sockaddr *wellknown_node;
739  struct sockaddr_in sin;
740  struct sockaddr_in6 sin6;
741  memset(&sin, 0, sizeof(sin));
742  int port = 9999;
743 
744  // read port
745  if (ip.size() > 1) {
746  port = atoi(ip[1].c_str());
747  }
748 
749  // read address
750  if (!ip.empty()) {
751  rc = inet_pton(ipversion, ip[0].c_str(), &(sin.sin_addr));
752  if (rc <= 0) {
753  ipversion = AF_INET6;
754  rc = inet_pton(ipversion, ip[0].c_str(), &(sin6.sin6_addr));
755  if (rc <= 0) {
756  break;
757  } else {
758  sin6.sin6_family = ipversion;
759  sin6.sin6_port = htons(port);
760  size = sizeof(sin6);
761  wellknown_node = (struct sockaddr *) &sin6;
762  }
763  } else {
764  sin.sin_family = ipversion;
765  sin.sin_port = htons(port);
766  size = sizeof(sin);
767  wellknown_node = (struct sockaddr *) &sin;
768  }
769  if (rc == 1) {
770  ibrcommon::MutexLock l(this->_libmutex);
771  dtn_dht_ping_node(wellknown_node, size);
772  }
773  }
774 
775  ++ip_iter;
776  }
777 }
778 
779 // TODO: Only allow this for interfaces that I am bound to!
780 
783  if (this->_initialized) {
784  stringstream service;
785  service << "port=" << this->_context.port << ";";
786  if (!this->_config.isNeighbourAllowedToAnnounceMe()) {
787  service << "proxy=false;";
788  }
789  announcement.addService( DiscoveryService("dhtns", service.str()));
790  } else {
791  if(!this->_config.isNeighbourAllowedToAnnounceMe()) {
792  announcement.addService( DiscoveryService("dhtns", "proxy=false;"));
793  } else {
795  }
796  }
797 }
798 
799 void dtn_dht_handle_lookup_result(const struct dtn_dht_lookup_result *result) {
800  if (result == NULL || result->eid == NULL || result->clayer == NULL) {
801  return;
802  }
803 
804  // Extracting the EID of the answering node
805  std::string eid__;
806  std::string clname__;
807  struct dtn_eid * eid = result->eid;
808  unsigned int i;
809  for (i = 0; i < eid->eidlen; ++i) {
810  eid__ += eid->eid[i];
811  }
812 
813  // Extracting the convergence layer of the node
814  stringstream ss;
815  struct dtn_convergence_layer * cl = result->clayer;
816  if (cl == NULL)
817  return;
818  dtn::core::Node node(eid__);
819  ss << "Adding Node " << eid__ << ": ";
820  // Iterating over the list of convergence layer
821  bool hasCL = false;
822  while (cl != NULL) {
823  enum Node::Protocol proto__;
824  stringstream service;
825  clname__ = "";
826  for (i = 0; i < cl->clnamelen; ++i) {
827  clname__ += cl->clname[i];
828  }
829  if (clname__ == "tcp") {
830  proto__ = Node::CONN_TCPIP;
831  } else if (clname__ == "udp") {
832  proto__ = Node::CONN_UDPIP;
833  } else {
834  proto__ = Node::CONN_UNDEFINED;
835  //TODO find the right string to be added to service string
836  }
837  // Extracting the arguments of the convergence layer
838  struct dtn_convergence_layer_arg * arg = cl->args;
839  std::string argstring__;
840  while (arg != NULL) {
841  if (arg->keylen <= 0 || arg->valuelen <= 0) {
842  arg = arg->next;
843  continue;
844  }
845  argstring__ = "";
846  for (i = 0; i < arg->keylen; ++i) {
847  argstring__ += arg->key[i];
848  }
849  service << argstring__ << "=";
850  argstring__ = "";
851  for (i = 0; i < arg->valuelen && arg->value[i] != '\0'; ++i) {
852  argstring__ += arg->value[i];
853  }
854  service << argstring__ << ";";
855  arg = arg->next;
856  }
857  ss << clname__ << "(" << service.str() << ") ";
858  // Add the found convergence layer to the node object
859  node.add(
860  Node::URI(Node::NODE_DHT_DISCOVERED, proto__, service.str(),
863  hasCL = true;
864  cl = cl->next;
865  }
866 
867  // Adding the new node to the BundleCore to make it available to the daemon
869  if (hasCL) {
870  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << ss.str() << IBRCOMMON_LOGGER_ENDL;
871  core.getConnectionManager().add(node);
872  }
873 
874  // Extracting all neighbours of the new node if usage is allowed
875  if (!dtn::daemon::Configuration::getInstance().getDHT().ignoreDHTNeighbourInformations()) {
876  struct dtn_eid * neighbour = result->neighbours;
877  std::string neighbour__;
878  while (neighbour) {
879  neighbour__ = "";
880  for (i = 0; i < neighbour->eidlen; ++i) {
881  neighbour__ += neighbour->eid[i];
882  }
883  dtn::data::EID n(neighbour__);
884  dtn::core::Node neighbourNode(n);
886  && !core.getConnectionManager().isNeighbor(n)) {
889  }
890  neighbour = neighbour->next;
891  }
892  }
893  //TODO GROUP HANDLING SHOULD BE IMPLEMENTED HERE!!!
894  struct dtn_eid * group = result->groups;
895  while (group) {
896  group = group->next;
897  }
898  return;
899 }
900 
/**
 * Callback that is invoked once the DHT has finished a search or an
 * announcement. For now the notification is simply ignored.
 *
 * @param info_hash hash handed over by the caller; not used here
 */
void dtn_dht_operation_done(const unsigned char *info_hash) {
	// intentionally empty: completed operations need no follow-up yet
	(void) info_hash;
}