IBR-DTNSuite  0.12
DHTNameService.cpp
Go to the documentation of this file.
1 /*
2  * DHTNameService.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Till Lorentzen <lorenzte@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 
24 #include "DHTNameService.h"
25 #include "ibrcommon/net/vsocket.h"
26 #include "core/EventDispatcher.h"
27 #include "core/BundleCore.h"
28 #include "core/NodeEvent.h"
30 
31 #include <ibrdtn/utils/Utils.h>
32 
33 #include <ibrcommon/Logger.h>
35 
36 #include <sys/socket.h>
37 #include <arpa/inet.h>
38 #include <set>
39 #include <cstdlib>
40 #include <sys/time.h>
41 #include <fcntl.h>
42 #include <string.h>
43 #include <unistd.h>
44 
46  _exiting(false), _initialized(false), _announced(false), _foundNodes(-1),
47  _bootstrapped(0),
48  _config(daemon::Configuration::getInstance().getDHT()) {
49 }
50 
52  join();
53 }
54 
55 const std::string dtn::dht::DHTNameService::getName() const {
56  return "DHT Naming Service";
57 }
58 
60 }
61 
62 bool dtn::dht::DHTNameService::setNonBlockingInterruptPipe() {
63  for (int i = 0; i < 2; ++i) {
64  int opts;
65  opts = fcntl(_interrupt_pipe[i], F_GETFL);
66  if (opts < 0) {
67  return false;
68  }
69  opts |= O_NONBLOCK;
70  if (fcntl(_interrupt_pipe[i], F_SETFL, opts) < 0) {
71  return false;
72  }
73  }
74  return true;
75 }
76 
78  // register as discovery beacon handler
80 
81  // creating interrupt pipe
82  if (pipe(_interrupt_pipe) < 0) {
83  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "Error " << errno << " creating pipe"
85  return;
86  }
87  // set the pipe to non-blocking
88  if (!this->setNonBlockingInterruptPipe()) {
89  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "Error " << errno
90  << " setting pipe to non-blocking mode"
92  return;
93  }
94  // init DHT
95  int rc = dtn_dht_initstruct(&_context);
96  if (rc < 0) {
97  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT Context creation failed: " << rc
99  return;
100  }
101  if (_config.randomPort()) {
102  srand( time(NULL));
103  _context.port = rand() % 64000 + 1024;
104  } else {
105  _context.port = _config.getPort();
106  }
107  if (_config.getIPv4Binding().size() > 0) {
108  _context.bind = _config.getIPv4Binding().c_str();
109  }
110  if (_config.getIPv6Binding().size() > 0) {
111  _context.bind6 = _config.getIPv6Binding().c_str();
112  }
113  if (_config.isIPv4Enabled() && _config.isIPv6Enabled()) {
114  _context.type = BINDBOTH;
115  } else if (_config.isIPv4Enabled()) {
116  _context.type = IPV4ONLY;
117  } else if (_config.isIPv6Enabled()) {
118  _context.type = IPV6ONLY;
119  } else {
120  _context.type = BINDNONE;
121  }
122  string myid = _config.getID();
123  if (!_config.randomID()) {
124  dtn_dht_build_id_from_str(_context.id, myid.c_str(), myid.size());
125  }
126 
127  rc = dtn_dht_init_sockets(&_context);
128  if (rc != 0) {
129  if(_context.type == BINDBOTH) {
130  if(rc == -2){
131  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT IPv6 socket couldn't be initialized"
133  } else {
134  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT socket(s) couldn't be initialized"
136  }
137  dtn_dht_close_sockets(&_context);
138  // Try only IPv4
139  _context.type = IPV4ONLY;
140  rc = dtn_dht_init_sockets(&_context);
141  if(rc != 0){
142  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT IPv4 socket couldn't be initialized"
144  dtn_dht_close_sockets(&_context);
145  // Try only IPv6
146  _context.type = IPV6ONLY;
147  rc = dtn_dht_init_sockets(&_context);
148  if(rc!=0){
149  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT IPv6 socket couldn't be initialized"
151  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT Sockets couldn't be initialized"
153  dtn_dht_close_sockets(&_context);
154  return;
155  }
156  }
157  } else {
158  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT Sockets couldn't be initialized"
160  dtn_dht_close_sockets(&_context);
161  return;
162  }
163  }
164  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 50) << "Sockets for DHT initialized"
166  if (!_config.isBlacklistEnabled()) {
167  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 50) << "DHT Blacklist disabled"
169  dtn_dht_blacklist(0);
170  } else {
171  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 50) << "DHT Blacklist enabled"
173  }
174  {
175  ibrcommon::MutexLock l(this->_libmutex);
176  rc = dtn_dht_init(&_context);
177  }
178  if (rc < 0) {
179  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT initialization failed"
181  return;
182  }
183  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << "DHT initialized on Port: " << _context.port
184  << " with ID: " << myid << IBRCOMMON_LOGGER_ENDL;
185  this->_initialized = true;
186  // Warning about possibly wrong configuration
187  if(_config.isNeighbourAnnouncementEnabled() &&
189  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT will not announce neighbor nodes: "
190  << "announcement enabled but routing forwarding is disabled"
192  }
193 }
194 
196  if (!this->_initialized) {
197  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT is not initialized"
199  return;
200  }
201  std::string cltype_;
202  std::string port_;
203  const std::list<dtn::daemon::Configuration::NetConfig>
204  &nets =
206 
207  // for every available interface, build the correct struct
208  for (std::list<dtn::daemon::Configuration::NetConfig>::const_iterator iter =
209  nets.begin(); iter != nets.end(); ++iter) {
210  try {
211  cltype_ = getConvergenceLayerName((*iter));
212  std::stringstream ss;
213  ss << (*iter).port;
214  port_ = ss.str();
215  struct dtn_convergence_layer * clstruct =
216  (struct dtn_convergence_layer*) malloc(
217  sizeof(struct dtn_convergence_layer));
218  clstruct->clname = (char*) malloc(cltype_.size());
219  clstruct->clnamelen = cltype_.size();
220  memcpy(clstruct->clname, cltype_.c_str(), cltype_.size());
221  struct dtn_convergence_layer_arg * arg =
222  (struct dtn_convergence_layer_arg*) malloc(
223  sizeof(struct dtn_convergence_layer_arg));
224  arg->key = (char*) malloc(5);
225  arg->key[4] = '\n';
226 
227 #ifdef HAVE_VMIME
228  if(cltype_ == "email") {
229  memcpy(arg->key, "email", 5);
230  arg->keylen = 5;
231  const std::string &email =
233  arg->value = (char*) malloc(email.size());
234  memcpy(arg->value, email.c_str(), email.size());
235  arg->valuelen = email.size();
236  }else{
237 #endif
238 
239  memcpy(arg->key, "port", 4);
240  arg->keylen = 4;
241  arg->value = (char*) malloc(port_.size());
242  memcpy(arg->value, port_.c_str(), port_.size());
243  arg->valuelen = port_.size();
244 
245 #ifdef HAVE_VMIME
246  }
247 #endif
248 
249  arg->next = NULL;
250  clstruct->args = arg;
251  clstruct->next = _context.clayer;
252  _context.clayer = clstruct;
253  } catch (const std::exception&) {
254  continue;
255  }
256  }
257 
258  // Bootstrapping the DHT
259  int rc, numberOfRandomRequests = 0;
262 
263  // DHT main loop
264  time_t tosleep = 0;
265  struct sockaddr_storage from;
266  socklen_t fromlen = sizeof(sockaddr_storage);
267  ::memset(&from, 0, fromlen);
268 
269  while (!this->_exiting) {
270  if (this->_foundNodes == 0) {
271  bootstrapping();
272  }
273  //Announce myself
274  if (!this->_announced && dtn_dht_ready_for_work(&_context) > 2) {
275  this->_announced = true;
276  if (!_config.isSelfAnnouncingEnabled())
277  return;
278  announce(dtn::core::BundleCore::local, SINGLETON);
279  // Read all unknown EIDs and lookup them
281  dtn::storage::BundleStorage & storage = core.getStorage();
282  std::set<dtn::data::EID> eids_ = storage.getDistinctDestinations();
283  std::set<dtn::data::EID>::iterator iterator;
284  for (iterator = eids_.begin(); iterator != eids_.end(); ++iterator) {
285  lookup(*iterator);
286  }
287  std::set<dtn::core::Node> neighbours_ = core.getConnectionManager().getNeighbors();
288  std::set<dtn::core::Node>::iterator neighbouriterator;
289  for (neighbouriterator = neighbours_.begin(); neighbouriterator
290  != neighbours_.end(); ++neighbouriterator) {
291  if (isNeighbourAnnouncable(*neighbouriterator))
292  announce(*neighbouriterator, NEIGHBOUR);
293  }
294  while (!this->cachedLookups.empty()) {
295  lookup(this->cachedLookups.front());
296  this->cachedLookups.pop_front();
297  }
298  }
299  // Faster Bootstrapping by searching for random Keys
300  if (dtn_dht_ready_for_work(&_context) > 0 && dtn_dht_ready_for_work(
301  &_context) <= 2 && numberOfRandomRequests < 40) {
302  dtn_dht_start_random_lookup(&_context);
303  numberOfRandomRequests++;
304  }
305  struct timeval tv;
306  fd_set readfds;
307  tv.tv_sec = tosleep;
308  tv.tv_usec = random() % 1000000;
309  int high_fd = _interrupt_pipe[0];
310  FD_ZERO(&readfds);
311  FD_SET(_interrupt_pipe[0], &readfds);
312  if (_context.ipv4socket >= 0) {
313  FD_SET(_context.ipv4socket, &readfds);
314  high_fd = max(high_fd, _context.ipv4socket);
315  }
316  if (_context.ipv6socket >= 0) {
317  FD_SET(_context.ipv6socket, &readfds);
318  high_fd = max(high_fd, _context.ipv6socket);
319  }
320  rc = select(high_fd + 1, &readfds, NULL, NULL, &tv);
321  if (rc < 0) {
322  if (errno != EINTR) {
323  IBRCOMMON_LOGGER_TAG("DHTNameService", error)
324  << "select of DHT Sockets failed with error: "
325  << errno << IBRCOMMON_LOGGER_ENDL;
327  }
328  }
329  if (FD_ISSET(_interrupt_pipe[0], &readfds)) {
330  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "unblocked by self-pipe-trick"
332  // this was an interrupt with the self-pipe-trick
333  char buf[2];
334  int read = ::read(_interrupt_pipe[0], buf, 2);
335  if (read <= 2 || _exiting)
336  break;
337  }
338  if (rc > 0) {
339  fromlen = sizeof(from);
340  if (_context.ipv4socket >= 0 && FD_ISSET(_context.ipv4socket,
341  &readfds))
342  rc = recvfrom(_context.ipv4socket, _buf, sizeof(_buf) - 1, 0,
343  (struct sockaddr*) &from, &fromlen);
344  else if (_context.ipv6socket >= 0 && FD_ISSET(_context.ipv6socket,
345  &readfds))
346  rc = recvfrom(_context.ipv6socket, _buf, sizeof(_buf) - 1, 0,
347  (struct sockaddr*) &from, &fromlen);
348  }
349  if (rc > 0) {
350  _buf[rc] = '\0';
351  {
352  ibrcommon::MutexLock l(this->_libmutex);
353  rc = dtn_dht_periodic(&_context, _buf, rc,
354  (struct sockaddr*) &from, fromlen, &tosleep);
355  }
356  } else {
357  {
358  ibrcommon::MutexLock l(this->_libmutex);
359  rc = dtn_dht_periodic(&_context, NULL, 0, NULL, 0, &tosleep);
360  }
361  }
362  int numberOfHosts = 0;
363  int numberOfGoodHosts = 0;
364  int numberOfGood6Hosts = 0;
365  unsigned int numberOfBlocksHosts = 0;
366  unsigned int numberOfBlocksHostsIPv4 = 0;
367  unsigned int numberOfBlocksHostsIPv6 = 0;
368  {
369  ibrcommon::MutexLock l(this->_libmutex);
370  if (_context.ipv4socket >= 0)
371  numberOfHosts
372  = dtn_dht_nodes(AF_INET, &numberOfGoodHosts, NULL);
373  if (_context.ipv6socket >= 0)
374  numberOfHosts += dtn_dht_nodes(AF_INET6, &numberOfGood6Hosts,
375  NULL);
376  numberOfBlocksHosts = dtn_dht_blacklisted_nodes(
377  &numberOfBlocksHostsIPv4, &numberOfBlocksHostsIPv6);
378  }
379  if (this->_foundNodes != numberOfHosts) {
380  if (_config.isBlacklistEnabled()) {
381  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 42) << "DHT Nodes available: "
382  << numberOfHosts << "(Good:" << numberOfGoodHosts
383  << "+" << numberOfGood6Hosts << ") Blocked: "
384  << numberOfBlocksHosts << "("
385  << numberOfBlocksHostsIPv4 << "+"
386  << numberOfBlocksHostsIPv6 << ")"
388 
389  } else {
390  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 42) << "DHT Nodes available: "
391  << numberOfHosts << "(Good:" << numberOfGoodHosts
392  << "+" << numberOfGood6Hosts << ")"
394 
395  }
396  this->_foundNodes = numberOfHosts;
397  }
398  if (rc < 0) {
399  if (errno == EINTR) {
400  continue;
401  } else {
402  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "dtn_dht_periodic failed"
404  if (rc == EINVAL || rc == EFAULT) {
405  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << "DHT failed -> stopping DHT"
407  break;
408  }
409  tosleep = 1;
410  }
411  }
412  }
415  this->_initialized = false;
416  if (_config.getPathToNodeFiles().size() > 0) {
417  ibrcommon::MutexLock l(this->_libmutex);
418  int save = dtn_dht_save_conf(_config.getPathToNodeFiles().c_str());
419  if (save != 0) {
420  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT could not save nodes"
422  } else {
423  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT saved nodes"
425  }
426  }
427  if (this->_announced && _config.isSelfAnnouncingEnabled())
428  deannounce(dtn::core::BundleCore::local);
429  dtn_dht_uninit();
430  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << "DHT shut down" << IBRCOMMON_LOGGER_ENDL;
431  // Closes all sockets of the DHT
432  dtn_dht_close_sockets(&_context);
433 
434  dtn_dht_free_convergence_layer_struct(_context.clayer);
435 
436  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT sockets are closed"
438  ::close(_interrupt_pipe[0]);
439  ::close(_interrupt_pipe[1]);
440 }
441 
443  // un-register as discovery beacon handler
445 
446  this->_exiting = true;
447  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT will be shut down"
449  ssize_t written = ::write(_interrupt_pipe[1], "i", 1);
450  if (written < 1) {
451  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT pipeline trick failed"
453  }
454 }
455 
456 void dtn::dht::DHTNameService::lookup(const dtn::data::EID &eid) {
457  if (!dtn::core::BundleCore::local.sameHost(eid)) {
458  if (this->_announced) {
459  std::string eid_ = eid.getNode().getString();
460  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 30) << "DHT Lookup: " << eid_
462  ibrcommon::MutexLock l(this->_libmutex);
463  dtn_dht_lookup(&_context, eid_.c_str(), eid_.size());
464  } else {
465  this->cachedLookups.push_front(eid);
466  }
467  }
468 }
469 
473 void dtn::dht::DHTNameService::announce(const dtn::core::Node &n,
474  enum dtn_dht_lookup_type type) {
475  if (this->_announced) {
476  std::string eid_ = n.getEID().getNode().getString();
477  ibrcommon::MutexLock l(this->_libmutex);
478  int rc = dtn_dht_announce(&_context, eid_.c_str(), eid_.size(), type);
479  if (rc > 0) {
480  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << "DHT Announcing: " << eid_
482  }
483  }
484 }
485 
486 bool dtn::dht::DHTNameService::isNeighbourAnnouncable(
487  const dtn::core::Node &node) {
488  if (!_config.isNeighbourAnnouncementEnabled())
489  return false;
490  // If forwarding of bundles is disabled, do not announce the neighbors to
491  // prevent receiving bundles for neighbor EIDs, which would not be forwarded
493  return false;
494  }
495 
496 
497  // get the merged node object
499  node.getEID());
500 
501  // Ignore all none discovered and none static nodes
502  // They could only be discovered by the DHT,
503  // so they are not really close neighbours and could be found directly in the DHT.
504  // This prevents the node to be announced by all neighbours, how found this node
505  std::set<dtn::core::Node::Type> types = n.getTypes();
506  if (types.find(dtn::core::Node::NODE_DISCOVERED) == types.end()
507  && (types.find(dtn::core::Node::NODE_STATIC_GLOBAL) == types.end())) {
508  return false;
509  }
510  // Proof, if the neighbour has told us, that he don't want to be published on the DHT
511  std::list<dtn::core::Node::Attribute> services = n.get("dhtns");
512  if (!services.empty()) {
513  for (std::list<dtn::core::Node::Attribute>::const_iterator service =
514  services.begin(); service != services.end(); ++service) {
515  bool proxy = true;
516  std::vector < string > parameters = dtn::utils::Utils::tokenize(
517  ";", (*service).value);
518  std::vector<string>::const_iterator param_iter = parameters.begin();
519 
520  while (param_iter != parameters.end()) {
521  std::vector < string > p = dtn::utils::Utils::tokenize("=",
522  (*param_iter));
523  if (p[0].compare("proxy") == 0) {
524  std::stringstream proxy_stream;
525  proxy_stream << p[1];
526  proxy_stream >> proxy;
527  }
528  ++param_iter;
529  }
530  if (!proxy)
531  return false;
532  }
533  }
534  return true;
535 }
536 
537 void dtn::dht::DHTNameService::deannounce(const dtn::core::Node &n) {
538  std::string eid_ = n.getEID().getNode().getString();
539  ibrcommon::MutexLock l(this->_libmutex);
540  dtn_dht_deannounce(eid_.c_str(), eid_.size());
541  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT Deannounced: " << eid_
543 }
544 
545 std::string dtn::dht::DHTNameService::getConvergenceLayerName(
547  std::string cltype_ = "";
548  switch (net.type) {
550  cltype_ = "tcp";
551  break;
553  cltype_ = "udp";
554  break;
556  cltype_ = "email";
557  break;
558  // case dtn::daemon::Configuration::NetConfig::NETWORK_HTTP:
559  // cltype_ = "http";
560  // break;
561  default:
562  throw ibrcommon::Exception("type of convergence layer not supported");
563  }
564  return cltype_;
565 }
566 
568  try {
569  const dtn::routing::QueueBundleEvent &event =
570  dynamic_cast<const dtn::routing::QueueBundleEvent&> (*evt);
571  if (!event.bundle.destination.sameHost(dtn::core::BundleCore::local)
572  && !event.bundle.destination.isNone()) {
573  lookup(event.bundle.destination);
574  }
575  } catch (const std::bad_cast&) {
576  };
577  try {
578  const dtn::core::NodeEvent &nodeevent =
579  dynamic_cast<const dtn::core::NodeEvent&> (*evt);
580  const dtn::core::Node &n = nodeevent.getNode();
582  switch (nodeevent.getAction()) {
583  case NODE_AVAILABLE:
584  if (isNeighbourAnnouncable(n))
585  announce(n, NEIGHBOUR);
586  pingNode(n);
587  break;
588  case NODE_UNAVAILABLE:
589  deannounce(n);
590  default:
591  break;
592  }
593  }
594  } catch (const std::bad_cast&) {
595  };
596 }
597 
598 void dtn::dht::DHTNameService::pingNode(const dtn::core::Node &n) {
599  int rc;
600  std::list<dtn::core::Node::Attribute> services = n.get("dhtns");
601  unsigned int port = 9999;
602  if (!services.empty()) {
603  for (std::list<dtn::core::Node::Attribute>::const_iterator service =
604  services.begin(); service != services.end(); ++service) {
605  std::vector < string > parameters = dtn::utils::Utils::tokenize(
606  ";", (*service).value);
607  std::vector<string>::const_iterator param_iter = parameters.begin();
608  bool portFound = false;
609  while (param_iter != parameters.end()) {
610  std::vector < string > p = dtn::utils::Utils::tokenize("=",
611  (*param_iter));
612  if (p[0].compare("port") == 0) {
613  std::stringstream port_stream;
614  port_stream << p[1];
615  port_stream >> port;
616  portFound = true;
617  }
618  ++param_iter;
619  }
620  // Do not ping the node, if he doesn't tell us, which port he has
621  if(!portFound){
622  continue;
623  }
624  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "trying to ping node "
626  const std::list<std::string> ips = getAllIPAddresses(n);
627  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << ips.size() << " IP Addresses found!"
629  std::string lastip = "";
630  for (std::list<std::string>::const_iterator iter = ips.begin(); iter
631  != ips.end(); ++iter) {
632  const std::string ip = (*iter);
633  // Ignoring double existence of ip's
634  if (ip == lastip) {
635  continue;
636  }
637  // decode address and port
638  struct sockaddr_in sin;
639  memset(&sin, 0, sizeof(sin));
640  sin.sin_family = AF_INET;
641  sin.sin_port = htons(port);
642  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 26) << " --- Using IP address: " << ip
644  rc = inet_pton(AF_INET, ip.c_str(), &(sin.sin_addr));
645  if (rc == 1) {
646  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 26) << "Pinging node "
647  << n.toString() << " on " << ip << ":" << port
649  ibrcommon::MutexLock l(this->_libmutex);
650  dtn_dht_ping_node((struct sockaddr*) &sin, sizeof(sin));
651  } else {
652  IBRCOMMON_LOGGER_TAG("DHTNameService", error) << " --- ERROR pton: " << rc
654  }
655  lastip = ip;
656  }
657  }
658  }
659 }
660 
661 std::list<std::string> dtn::dht::DHTNameService::getAllIPAddresses(
662  const dtn::core::Node &n) {
663  std::string address = "0.0.0.0";
664  unsigned int port = 0;
665  std::list < std::string > ret;
666  const std::list<dtn::core::Node::URI> uri_list = n.get(
668  for (std::list<dtn::core::Node::URI>::const_iterator iter =
669  uri_list.begin(); iter != uri_list.end(); ++iter) {
670  const dtn::core::Node::URI &uri = (*iter);
671  uri.decode(address, port);
672  ret.push_front(address);
673  }
674  const std::list<dtn::core::Node::URI> udp_uri_list = n.get(
676  for (std::list<dtn::core::Node::URI>::const_iterator iter =
677  udp_uri_list.begin(); iter != udp_uri_list.end(); ++iter) {
678  const dtn::core::Node::URI &udp_uri = (*iter);
679  udp_uri.decode(address, port);
680  ret.push_front(address);
681  }
682  return ret;
683 }
684 
685 void dtn::dht::DHTNameService::bootstrapping() {
686  if (this->_bootstrapped > time(NULL) + 30) {
687  return;
688  }
689  this->_bootstrapped = time(NULL);
690  if (_config.getPathToNodeFiles().size() > 0) {
691  bootstrappingFile();
692  }
693  if (_config.isDNSBootstrappingEnabled()) {
694  bootstrappingDNS();
695  }
696  if (_config.isIPBootstrappingEnabled()) {
697  bootstrappingIPs();
698  }
699 }
700 
701 void dtn::dht::DHTNameService::bootstrappingFile() {
702  int rc;
703  {
704  ibrcommon::MutexLock l(this->_libmutex);
705  rc = dtn_dht_load_prev_conf(_config.getPathToNodeFiles().c_str());
706  }
707  if (rc != 0) {
708  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "DHT loading of saved nodes failed"
710  } else {
711  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25) << "DHT loading of saved nodes done"
713  }
714 }
715 
716 void dtn::dht::DHTNameService::bootstrappingDNS() {
717  int rc;
718  std::vector < string > dns = _config.getDNSBootstrappingNames();
719  if (!dns.empty()) {
720  std::vector<string>::const_iterator dns_iter = dns.begin();
721  while (dns_iter != dns.end()) {
722  const string &dn = (*dns_iter);
723  {
724  ibrcommon::MutexLock l(this->_libmutex);
725  rc = dtn_dht_dns_bootstrap(&_context, dn.c_str(), NULL);
726  }
727  if (rc != 0) {
728  IBRCOMMON_LOGGER_TAG("DHTNameService", warning) << "bootstrapping from domain " << dn
729  << " failed with error: " << rc
731  } else {
732  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25)
733  << "DHT Bootstrapping done for domain" << dn
735  }
736  ++dns_iter;
737  }
738  } else {
739  {
740  ibrcommon::MutexLock l(this->_libmutex);
741  rc = dtn_dht_dns_bootstrap(&_context, NULL, NULL);
742  }
743  if (rc != 0) {
744  IBRCOMMON_LOGGER_TAG("DHTNameService", warning)
745  << "bootstrapping from default domain failed with error: "
746  << rc << IBRCOMMON_LOGGER_ENDL;
747  } else {
748  IBRCOMMON_LOGGER_DEBUG_TAG("DHTNameService", 25)
749  << "DHT Bootstrapping done for default domain"
751  }
752  }
753 
754 }
755 
756 void dtn::dht::DHTNameService::bootstrappingIPs() {
757  int rc;
758  std::vector < string > ips = _config.getIPBootstrappingIPs();
759  std::vector<string>::const_iterator ip_iter = ips.begin();
760  while (ip_iter != ips.end()) {
761  std::vector < string > ip
762  = dtn::utils::Utils::tokenize(" ", (*ip_iter));
763  int size, ipversion = AF_INET;
764  struct sockaddr *wellknown_node;
765  struct sockaddr_in sin;
766  struct sockaddr_in6 sin6;
767  memset(&sin, 0, sizeof(sin));
768  int port = 9999;
769 
770  // read port
771  if (ip.size() > 1) {
772  port = atoi(ip[1].c_str());
773  }
774 
775  // read address
776  if (!ip.empty()) {
777  rc = inet_pton(ipversion, ip[0].c_str(), &(sin.sin_addr));
778  if (rc <= 0) {
779  ipversion = AF_INET6;
780  rc = inet_pton(ipversion, ip[0].c_str(), &(sin6.sin6_addr));
781  if (rc <= 0) {
782  break;
783  } else {
784  sin6.sin6_family = ipversion;
785  sin6.sin6_port = htons(port);
786  size = sizeof(sin6);
787  wellknown_node = (struct sockaddr *) &sin6;
788  }
789  } else {
790  sin.sin_family = ipversion;
791  sin.sin_port = htons(port);
792  size = sizeof(sin);
793  wellknown_node = (struct sockaddr *) &sin;
794  }
795  if (rc == 1) {
796  ibrcommon::MutexLock l(this->_libmutex);
797  dtn_dht_ping_node(wellknown_node, size);
798  }
799  }
800 
801  ++ip_iter;
802  }
803 }
804 
805 // TODO Nur für Interfaces zulassen, auf denen ich gebunden bin!
806 
809  if (this->_initialized) {
810  stringstream service;
811  service << "port=" << this->_context.port << ";";
812  if (!this->_config.isNeighbourAllowedToAnnounceMe()) {
813  service << "proxy=false;";
814  }
815  beacon.addService( DiscoveryService("dhtns", service.str()));
816  } else {
817  if(!this->_config.isNeighbourAllowedToAnnounceMe()) {
818  beacon.addService( DiscoveryService("dhtns", "proxy=false;"));
819  } else {
821  }
822  }
823 }
824 
825 void dtn_dht_handle_lookup_result(const struct dtn_dht_lookup_result *result) {
826  if (result == NULL || result->eid == NULL || result->clayer == NULL) {
827  return;
828  }
829 
830  // Extracting the EID of the answering node
831  std::string eid__;
832  std::string clname__;
833  struct dtn_eid * eid = result->eid;
834  unsigned int i;
835  for (i = 0; i < eid->eidlen; ++i) {
836  eid__ += eid->eid[i];
837  }
838 
839  // Extracting the convergence layer of the node
840  stringstream ss;
841  struct dtn_convergence_layer * cl = result->clayer;
842  if (cl == NULL)
843  return;
844  dtn::core::Node node(eid__);
845  ss << "Adding Node " << eid__ << ": ";
846  // Iterating over the list of convergence layer
847  bool hasCL = false;
848  while (cl != NULL) {
849  enum Node::Protocol proto__;
850  stringstream service;
851  clname__ = "";
852  for (i = 0; i < cl->clnamelen; ++i) {
853  clname__ += cl->clname[i];
854  }
855  if (clname__ == "tcp") {
856  proto__ = Node::CONN_TCPIP;
857  } else if (clname__ == "udp") {
858  proto__ = Node::CONN_UDPIP;
859  } else if (clname__ == "email") {
860  proto__ = Node::CONN_EMAIL;
861  } else {
862  proto__ = Node::CONN_UNDEFINED;
863  //TODO find the right string to be added to service string
864  }
865  // Extracting the arguments of the convergence layer
866  struct dtn_convergence_layer_arg * arg = cl->args;
867  std::string argstring__;
868  while (arg != NULL) {
869  if (arg->keylen <= 0 || arg->valuelen <= 0) {
870  arg = arg->next;
871  continue;
872  }
873  argstring__ = "";
874  for (i = 0; i < arg->keylen; ++i) {
875  argstring__ += arg->key[i];
876  }
877  service << argstring__ << "=";
878  argstring__ = "";
879  for (i = 0; i < arg->valuelen && arg->value[i] != '\0'; ++i) {
880  argstring__ += arg->value[i];
881  }
882  service << argstring__ << ";";
883  arg = arg->next;
884  }
885  ss << clname__ << "(" << service.str() << ") ";
886  // Add the found convergence layer to the node object
887  node.add(
888  Node::URI(Node::NODE_DHT_DISCOVERED, proto__, service.str(),
891  hasCL = true;
892  cl = cl->next;
893  }
894 
895  // Adding the new node to the BundleCore to make it available to the daemon
897  if (hasCL) {
898  IBRCOMMON_LOGGER_TAG("DHTNameService", info) << ss.str() << IBRCOMMON_LOGGER_ENDL;
899  core.getConnectionManager().add(node);
900  }
901 
902  // Extracting all neighbours of the new node if usage is allowed
903  if (!dtn::daemon::Configuration::getInstance().getDHT().ignoreDHTNeighbourInformations()) {
904  struct dtn_eid * neighbour = result->neighbours;
905  std::string neighbour__;
906  while (neighbour) {
907  neighbour__ = "";
908  for (i = 0; i < neighbour->eidlen; ++i) {
909  neighbour__ += neighbour->eid[i];
910  }
911  dtn::data::EID n(neighbour__);
912  dtn::core::Node neighbourNode(n);
913  if (!dtn::core::BundleCore::local.sameHost(n)
914  && !core.getConnectionManager().isNeighbor(n)) {
917  }
918  neighbour = neighbour->next;
919  }
920  }
921  //TODO GROUP HANDLING SHOULD BE IMPLEMENTED HERE!!!
922  struct dtn_eid * group = result->groups;
923  while (group) {
924  group = group->next;
925  }
926  return;
927 }
928 
929 // This function is called, if the DHT has done a search or announcement.
930 // At this moment, it is just ignored
931 void dtn_dht_operation_done(const unsigned char *info_hash){
932  return;
933 }