// IBR-DTNSuite 0.8
00001 /* 00002 * DHTNameService.cpp 00003 * 00004 * Created on: 21.11.2011 00005 * Author: Till Lorentzen 00006 */ 00007 00008 #include "config.h" 00009 00010 #include "DHTNameService.h" 00011 #include "ibrcommon/net/vsocket.h" 00012 #include "core/BundleCore.h" 00013 #include "core/NodeEvent.h" 00014 #include "routing/QueueBundleEvent.h" 00015 00016 #include <ibrdtn/utils/Utils.h> 00017 00018 #include <ibrcommon/Logger.h> 00019 #include <ibrcommon/thread/MutexLock.h> 00020 00021 #include <sys/socket.h> 00022 #include <arpa/inet.h> 00023 #include <set> 00024 #include <cstdlib> 00025 #include <sys/time.h> 00026 #include <fcntl.h> 00027 #include <string.h> 00028 00029 dtn::dht::DHTNameService::DHTNameService() : 00030 _exiting(false), _initialized(false), _announced(false), _foundNodes(-1), 00031 _bootstrapped(0), 00032 _config(daemon::Configuration::getInstance().getDHT()) { 00033 } 00034 00035 dtn::dht::DHTNameService::~DHTNameService() { 00036 join(); 00037 } 00038 00039 const std::string dtn::dht::DHTNameService::getName() const { 00040 return "DHT Naming Service"; 00041 } 00042 00043 void dtn::dht::DHTNameService::__cancellation() { 00044 } 00045 00046 bool dtn::dht::DHTNameService::setNonBlockingInterruptPipe() { 00047 for (int i = 0; i < 2; i++) { 00048 int opts; 00049 opts = fcntl(_interrupt_pipe[i], F_GETFL); 00050 if (opts < 0) { 00051 return false; 00052 } 00053 opts |= O_NONBLOCK; 00054 if (fcntl(_interrupt_pipe[i], F_SETFL, opts) < 0) { 00055 return false; 00056 } 00057 } 00058 return true; 00059 } 00060 00061 void dtn::dht::DHTNameService::componentUp() { 00062 std::string eid = dtn::core::BundleCore::local.getNode().getString(); 00063 // creating interrupt pipe 00064 if (pipe(_interrupt_pipe) < 0) { 00065 IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe" 00066 << IBRCOMMON_LOGGER_ENDL; 00067 return; 00068 } 00069 // set the pipe to non-blocking 00070 if (!this->setNonBlockingInterruptPipe()) { 00071 IBRCOMMON_LOGGER(error) << "Error " << 
errno 00072 << " setting pipe to non-blocking mode" 00073 << IBRCOMMON_LOGGER_ENDL; 00074 return; 00075 } 00076 // init DHT 00077 int rc = dtn_dht_initstruct(&_context); 00078 if (rc < 0) { 00079 IBRCOMMON_LOGGER(error) << "DHT Context creation failed: " << rc 00080 << IBRCOMMON_LOGGER_ENDL; 00081 return; 00082 } 00083 if (_config.randomPort()) { 00084 srand( time(NULL)); 00085 _context.port = rand() % 64000 + 1024; 00086 } else { 00087 _context.port = _config.getPort(); 00088 } 00089 if (_config.getIPv4Binding().size() > 0) { 00090 _context.bind = _config.getIPv4Binding().c_str(); 00091 } 00092 if (_config.getIPv6Binding().size() > 0) { 00093 _context.bind6 = _config.getIPv6Binding().c_str(); 00094 } 00095 if (_config.isIPv4Enabled() && _config.isIPv6Enabled()) { 00096 _context.type = BINDBOTH; 00097 } else if (_config.isIPv4Enabled()) { 00098 _context.type = IPV4ONLY; 00099 } else if (_config.isIPv6Enabled()) { 00100 _context.type = IPV6ONLY; 00101 } else { 00102 _context.type = BINDNONE; 00103 } 00104 string myid = _config.getID(); 00105 if (!_config.randomID()) { 00106 dtn_dht_build_id_from_str(_context.id, myid.c_str(), myid.size()); 00107 } 00108 00109 rc = dtn_dht_init_sockets(&_context); 00110 if (rc != 0) { 00111 IBRCOMMON_LOGGER(error) << "DHT Sockets couldn't be initialized" 00112 <<IBRCOMMON_LOGGER_ENDL; 00113 dtn_dht_close_sockets(&_context); 00114 return; 00115 } 00116 IBRCOMMON_LOGGER_DEBUG(50) << "Sockets for DHT initialized" 00117 <<IBRCOMMON_LOGGER_ENDL; 00118 if (!_config.isBlacklistEnabled()) { 00119 IBRCOMMON_LOGGER_DEBUG(50) << "DHT Blacklist disabled" 00120 <<IBRCOMMON_LOGGER_ENDL; 00121 dtn_dht_blacklist(0); 00122 } else { 00123 IBRCOMMON_LOGGER_DEBUG(50) << "DHT Blacklist enabled" 00124 <<IBRCOMMON_LOGGER_ENDL; 00125 } 00126 { 00127 ibrcommon::MutexLock l(this->_libmutex); 00128 rc = dtn_dht_init(&_context); 00129 } 00130 if (rc < 0) { 00131 IBRCOMMON_LOGGER(error) << "DHT initialization failed" 00132 << IBRCOMMON_LOGGER_ENDL; 00133 return; 
00134 } 00135 IBRCOMMON_LOGGER(info) << "DHT initialized on Port: " << _context.port 00136 << " with ID: " << myid << IBRCOMMON_LOGGER_ENDL; 00137 this->_initialized = true; 00138 } 00139 00140 void dtn::dht::DHTNameService::componentRun() { 00141 if (!this->_initialized) { 00142 IBRCOMMON_LOGGER(error) << "DHT is not initialized" 00143 <<IBRCOMMON_LOGGER_ENDL; 00144 return; 00145 } 00146 std::string cltype_; 00147 std::string port_; 00148 const std::list<dtn::daemon::Configuration::NetConfig> 00149 &nets = 00150 dtn::daemon::Configuration::getInstance().getNetwork().getInterfaces(); 00151 00152 // for every available interface, build the correct struct 00153 for (std::list<dtn::daemon::Configuration::NetConfig>::const_iterator iter = 00154 nets.begin(); iter != nets.end(); iter++) { 00155 try { 00156 cltype_ = getConvergenceLayerName((*iter)); 00157 std::stringstream ss; 00158 ss << (*iter).port; 00159 port_ = ss.str(); 00160 struct dtn_convergence_layer * clstruct = 00161 (struct dtn_convergence_layer*) malloc( 00162 sizeof(struct dtn_convergence_layer)); 00163 clstruct->clname = (char*) malloc(cltype_.size()); 00164 clstruct-> clnamelen = cltype_.size(); 00165 memcpy(clstruct->clname, cltype_.c_str(), cltype_.size()); 00166 struct dtn_convergence_layer_arg * arg = 00167 (struct dtn_convergence_layer_arg*) malloc( 00168 sizeof(struct dtn_convergence_layer_arg)); 00169 arg->key = (char*) malloc(5); 00170 arg->key[4] = '\n'; 00171 memcpy(arg->key, "port", 4); 00172 arg->keylen = 4; 00173 arg->value = (char*) malloc(port_.size()); 00174 memcpy(arg->value, port_.c_str(), port_.size()); 00175 arg->valuelen = port_.size(); 00176 arg->next = NULL; 00177 clstruct->args = arg; 00178 clstruct->next = _context.clayer; 00179 _context.clayer = clstruct; 00180 } catch (const std::exception&) { 00181 continue; 00182 } 00183 } 00184 00185 // Bootstrapping the DHT 00186 int rc, numberOfRandomRequests = 0; 00187 bindEvent(dtn::routing::QueueBundleEvent::className); 00188 
bindEvent(dtn::core::NodeEvent::className); 00189 00190 // DHT main loop 00191 time_t tosleep = 0; 00192 struct sockaddr_storage from; 00193 socklen_t fromlen; 00194 while (!this->_exiting) { 00195 if (this->_foundNodes == 0) { 00196 bootstrapping(); 00197 } 00198 //Announce myself 00199 if (!this->_announced && dtn_dht_ready_for_work(&_context) > 2) { 00200 this->_announced = true; 00201 if (!_config.isSelfAnnouncingEnabled()) 00202 return; 00203 announce(dtn::core::BundleCore::local, SINGLETON); 00204 // Read all unknown EIDs and lookup them 00205 dtn::core::BundleCore &core = dtn::core::BundleCore::getInstance(); 00206 dtn::storage::BundleStorage & storage = core.getStorage(); 00207 std::set<dtn::data::EID> eids_ = storage.getDistinctDestinations(); 00208 std::set<dtn::data::EID>::iterator iterator; 00209 for (iterator = eids_.begin(); iterator != eids_.end(); iterator++) { 00210 lookup(*iterator); 00211 } 00212 std::set<dtn::core::Node> neighbours_ = core.getNeighbors(); 00213 std::set<dtn::core::Node>::iterator neighbouriterator; 00214 for (neighbouriterator = neighbours_.begin(); neighbouriterator 00215 != neighbours_.end(); neighbouriterator++) { 00216 if (isNeighbourAnnouncable(*neighbouriterator)) 00217 announce(*neighbouriterator, NEIGHBOUR); 00218 } 00219 while (!this->cachedLookups.empty()) { 00220 lookup(this->cachedLookups.front()); 00221 this->cachedLookups.pop_front(); 00222 } 00223 } 00224 // Faster Bootstrapping by searching for random Keys 00225 if (dtn_dht_ready_for_work(&_context) > 0 && dtn_dht_ready_for_work( 00226 &_context) <= 2 && numberOfRandomRequests < 40) { 00227 dtn_dht_start_random_lookup(&_context); 00228 numberOfRandomRequests++; 00229 } 00230 struct timeval tv; 00231 fd_set readfds; 00232 tv.tv_sec = tosleep; 00233 tv.tv_usec = random() % 1000000; 00234 int high_fd = _interrupt_pipe[0]; 00235 FD_ZERO(&readfds); 00236 FD_SET(_interrupt_pipe[0], &readfds); 00237 if (_context.ipv4socket >= 0) { 00238 FD_SET(_context.ipv4socket, 
&readfds); 00239 high_fd = max(high_fd, _context.ipv4socket); 00240 } 00241 if (_context.ipv6socket >= 0) { 00242 FD_SET(_context.ipv6socket, &readfds); 00243 high_fd = max(high_fd, _context.ipv6socket); 00244 } 00245 rc = select(high_fd + 1, &readfds, NULL, NULL, &tv); 00246 if (rc < 0) { 00247 if (errno != EINTR) { 00248 IBRCOMMON_LOGGER(error) 00249 << "select of DHT Sockets failed with error: " 00250 << errno << IBRCOMMON_LOGGER_ENDL; 00251 sleep(1); 00252 } 00253 } 00254 if (FD_ISSET(_interrupt_pipe[0], &readfds)) { 00255 IBRCOMMON_LOGGER_DEBUG(25) << "unblocked by self-pipe-trick" 00256 << IBRCOMMON_LOGGER_ENDL; 00257 // this was an interrupt with the self-pipe-trick 00258 char buf[2]; 00259 int read = ::read(_interrupt_pipe[0], buf, 2); 00260 if (read <= 2 || _exiting) 00261 break; 00262 } 00263 if (rc > 0) { 00264 fromlen = sizeof(from); 00265 if (_context.ipv4socket >= 0 && FD_ISSET(_context.ipv4socket, 00266 &readfds)) 00267 rc = recvfrom(_context.ipv4socket, _buf, sizeof(_buf) - 1, 0, 00268 (struct sockaddr*) &from, &fromlen); 00269 else if (_context.ipv6socket >= 0 && FD_ISSET(_context.ipv6socket, 00270 &readfds)) 00271 rc = recvfrom(_context.ipv6socket, _buf, sizeof(_buf) - 1, 0, 00272 (struct sockaddr*) &from, &fromlen); 00273 } 00274 if (rc > 0) { 00275 _buf[rc] = '\0'; 00276 { 00277 ibrcommon::MutexLock l(this->_libmutex); 00278 rc = dtn_dht_periodic(&_context, _buf, rc, 00279 (struct sockaddr*) &from, fromlen, &tosleep); 00280 } 00281 } else { 00282 { 00283 ibrcommon::MutexLock l(this->_libmutex); 00284 rc = dtn_dht_periodic(&_context, NULL, 0, NULL, 0, &tosleep); 00285 } 00286 } 00287 int numberOfHosts = 0; 00288 int numberOfGoodHosts = 0; 00289 int numberOfGood6Hosts = 0; 00290 unsigned int numberOfBlocksHosts = 0; 00291 unsigned int numberOfBlocksHostsIPv4 = 0; 00292 unsigned int numberOfBlocksHostsIPv6 = 0; 00293 { 00294 ibrcommon::MutexLock l(this->_libmutex); 00295 if (_context.ipv4socket >= 0) 00296 numberOfHosts 00297 = 
dtn_dht_nodes(AF_INET, &numberOfGoodHosts, NULL); 00298 if (_context.ipv6socket >= 0) 00299 numberOfHosts += dtn_dht_nodes(AF_INET6, &numberOfGood6Hosts, 00300 NULL); 00301 numberOfBlocksHosts = dtn_dht_blacklisted_nodes( 00302 &numberOfBlocksHostsIPv4, &numberOfBlocksHostsIPv6); 00303 } 00304 if (this->_foundNodes != numberOfHosts) { 00305 if (_config.isBlacklistEnabled()) { 00306 IBRCOMMON_LOGGER(info) << "DHT Nodes available: " 00307 << numberOfHosts << "(Good:" << numberOfGoodHosts 00308 << "+" << numberOfGood6Hosts << ") Blocked: " 00309 << numberOfBlocksHosts << "(" 00310 << numberOfBlocksHostsIPv4 << "+" 00311 << numberOfBlocksHostsIPv6 << ")" 00312 << IBRCOMMON_LOGGER_ENDL; 00313 00314 } else { 00315 IBRCOMMON_LOGGER(info) << "DHT Nodes available: " 00316 << numberOfHosts << "(Good:" << numberOfGoodHosts 00317 << "+" << numberOfGood6Hosts << ")" 00318 << IBRCOMMON_LOGGER_ENDL; 00319 00320 } 00321 this->_foundNodes = numberOfHosts; 00322 } 00323 if (rc < 0) { 00324 if (errno == EINTR) { 00325 continue; 00326 } else { 00327 IBRCOMMON_LOGGER(error) << "dtn_dht_periodic failed" 00328 << IBRCOMMON_LOGGER_ENDL; 00329 if (rc == EINVAL || rc == EFAULT) { 00330 IBRCOMMON_LOGGER(error) << "DHT failed -> stopping DHT" 00331 << IBRCOMMON_LOGGER_ENDL; 00332 break; 00333 } 00334 tosleep = 1; 00335 } 00336 } 00337 } 00338 unbindEvent(dtn::routing::QueueBundleEvent::className); 00339 unbindEvent(dtn::core::NodeEvent::className); 00340 this->_initialized = false; 00341 if (_config.getPathToNodeFiles().size() > 0) { 00342 ibrcommon::MutexLock l(this->_libmutex); 00343 int save = dtn_dht_save_conf(_config.getPathToNodeFiles().c_str()); 00344 if (save != 0) { 00345 IBRCOMMON_LOGGER(warning) << "DHT could not save nodes" 00346 << IBRCOMMON_LOGGER_ENDL; 00347 } else { 00348 IBRCOMMON_LOGGER_DEBUG(25) << "DHT saved nodes" 00349 << IBRCOMMON_LOGGER_ENDL; 00350 } 00351 } 00352 if (this->_announced && _config.isSelfAnnouncingEnabled()) 00353 deannounce(dtn::core::BundleCore::local); 
00354 dtn_dht_uninit(); 00355 IBRCOMMON_LOGGER(info) << "DHT shut down" << IBRCOMMON_LOGGER_ENDL; 00356 // Closes all sockets of the DHT 00357 dtn_dht_close_sockets(&_context); 00358 00359 dtn_dht_free_convergence_layer_struct(_context.clayer); 00360 00361 IBRCOMMON_LOGGER_DEBUG(25) << "DHT sockets are closed" 00362 << IBRCOMMON_LOGGER_ENDL; 00363 ::close(_interrupt_pipe[0]); 00364 ::close(_interrupt_pipe[1]); 00365 } 00366 00367 void dtn::dht::DHTNameService::componentDown() { 00368 this->_exiting = true; 00369 IBRCOMMON_LOGGER_DEBUG(25) << "DHT will be shut down" 00370 << IBRCOMMON_LOGGER_ENDL; 00371 ssize_t written = ::write(_interrupt_pipe[1], "i", 1); 00372 if (written < 1) { 00373 IBRCOMMON_LOGGER_DEBUG(25) << "DHT pipeline trick failed" 00374 << IBRCOMMON_LOGGER_ENDL; 00375 } 00376 } 00377 00378 void dtn::dht::DHTNameService::lookup(const dtn::data::EID &eid) { 00379 if (dtn::core::BundleCore::local != eid.getNode()) { 00380 if (this->_announced) { 00381 std::string eid_ = eid.getNode().getString(); 00382 IBRCOMMON_LOGGER_DEBUG(30) << "DHT Lookup: " << eid_ 00383 << IBRCOMMON_LOGGER_ENDL; 00384 ibrcommon::MutexLock l(this->_libmutex); 00385 dtn_dht_lookup(&_context, eid_.c_str(), eid_.size()); 00386 } else { 00387 this->cachedLookups.push_front(eid); 00388 } 00389 } 00390 } 00391 00395 void dtn::dht::DHTNameService::announce(const dtn::core::Node &n, 00396 enum dtn_dht_lookup_type type) { 00397 if (this->_announced) { 00398 std::string eid_ = n.getEID().getNode().getString(); 00399 ibrcommon::MutexLock l(this->_libmutex); 00400 int rc = dtn_dht_announce(&_context, eid_.c_str(), eid_.size(), type); 00401 if (rc > 0) { 00402 IBRCOMMON_LOGGER(info) << "DHT Announcing: " << eid_ 00403 << IBRCOMMON_LOGGER_ENDL; 00404 } 00405 } 00406 } 00407 00408 bool dtn::dht::DHTNameService::isNeighbourAnnouncable( 00409 const dtn::core::Node &node) { 00410 if (!_config.isNeighbourAnnouncementEnabled()) 00411 return false; 00412 00413 // get the merged node object 00414 const 
dtn::core::Node n = dtn::core::BundleCore::getInstance().getNeighbor( 00415 node.getEID()); 00416 00417 // Ignore all none discovered and none static nodes 00418 // They could only be discovered by the DHT, 00419 // so they are not really close neighbours and could be found directly in the DHT. 00420 // This prevents the node to be announced by all neighbours, how found this node 00421 std::set<dtn::core::Node::Type> types = n.getTypes(); 00422 if (types.find(dtn::core::Node::NODE_DISCOVERED) == types.end() 00423 && (types.find(dtn::core::Node::NODE_STATIC) == types.end())) { 00424 return false; 00425 } 00426 // Proof, if the neighbour has told us, that he don't want to be published on the DHT 00427 std::list<dtn::core::Node::Attribute> services = n.get("dhtns"); 00428 if (!services.empty()) { 00429 for (std::list<dtn::core::Node::Attribute>::const_iterator service = 00430 services.begin(); service != services.end(); service++) { 00431 bool proxy = true; 00432 std::vector < string > parameters = dtn::utils::Utils::tokenize( 00433 ";", (*service).value); 00434 std::vector<string>::const_iterator param_iter = parameters.begin(); 00435 00436 while (param_iter != parameters.end()) { 00437 std::vector < string > p = dtn::utils::Utils::tokenize("=", 00438 (*param_iter)); 00439 if (p[0].compare("proxy") == 0) { 00440 std::stringstream proxy_stream; 00441 proxy_stream << p[1]; 00442 proxy_stream >> proxy; 00443 } 00444 param_iter++; 00445 } 00446 if (!proxy) 00447 return false; 00448 } 00449 } 00450 return true; 00451 } 00452 00453 void dtn::dht::DHTNameService::deannounce(const dtn::core::Node &n) { 00454 std::string eid_ = n.getEID().getNode().getString(); 00455 ibrcommon::MutexLock l(this->_libmutex); 00456 dtn_dht_deannounce(eid_.c_str(), eid_.size()); 00457 IBRCOMMON_LOGGER_DEBUG(25) << "DHT Deannounced: " << eid_ 00458 << IBRCOMMON_LOGGER_ENDL; 00459 } 00460 00461 std::string dtn::dht::DHTNameService::getConvergenceLayerName( 00462 const 
dtn::daemon::Configuration::NetConfig &net) { 00463 std::string cltype_ = ""; 00464 switch (net.type) { 00465 case dtn::daemon::Configuration::NetConfig::NETWORK_TCP: 00466 cltype_ = "tcp"; 00467 break; 00468 case dtn::daemon::Configuration::NetConfig::NETWORK_UDP: 00469 cltype_ = "udp"; 00470 break; 00471 // case dtn::daemon::Configuration::NetConfig::NETWORK_HTTP: 00472 // cltype_ = "http"; 00473 // break; 00474 default: 00475 throw ibrcommon::Exception("type of convergence layer not supported"); 00476 } 00477 return cltype_; 00478 } 00479 00480 void dtn::dht::DHTNameService::raiseEvent(const dtn::core::Event *evt) { 00481 try { 00482 const dtn::routing::QueueBundleEvent &event = 00483 dynamic_cast<const dtn::routing::QueueBundleEvent&> (*evt); 00484 dtn::data::EID none; 00485 if (event.bundle.destination != dtn::core::BundleCore::local.getNode() 00486 && event.bundle.destination != none) { 00487 lookup(event.bundle.destination); 00488 } 00489 } catch (const std::bad_cast&) { 00490 }; 00491 try { 00492 const dtn::core::NodeEvent &nodeevent = 00493 dynamic_cast<const dtn::core::NodeEvent&> (*evt); 00494 const dtn::core::Node &n = nodeevent.getNode(); 00495 if (n.getEID() != dtn::core::BundleCore::local.getNode()) { 00496 switch (nodeevent.getAction()) { 00497 case NODE_AVAILABLE: 00498 case NODE_INFO_UPDATED: 00499 if (isNeighbourAnnouncable(n)) 00500 announce(n, NEIGHBOUR); 00501 pingNode(n); 00502 break; 00503 case NODE_UNAVAILABLE: 00504 deannounce(n); 00505 default: 00506 break; 00507 } 00508 } 00509 } catch (const std::bad_cast&) { 00510 }; 00511 } 00512 00513 void dtn::dht::DHTNameService::pingNode(const dtn::core::Node &n) { 00514 int rc; 00515 std::list<dtn::core::Node::Attribute> services = n.get("dhtns"); 00516 std::string address = "0.0.0.0"; 00517 unsigned int port = 9999; 00518 if (!services.empty()) { 00519 for (std::list<dtn::core::Node::Attribute>::const_iterator service = 00520 services.begin(); service != services.end(); service++) { 00521 
std::vector < string > parameters = dtn::utils::Utils::tokenize( 00522 ";", (*service).value); 00523 std::vector<string>::const_iterator param_iter = parameters.begin(); 00524 bool portFound = false; 00525 while (param_iter != parameters.end()) { 00526 std::vector < string > p = dtn::utils::Utils::tokenize("=", 00527 (*param_iter)); 00528 if (p[0].compare("port") == 0) { 00529 std::stringstream port_stream; 00530 port_stream << p[1]; 00531 port_stream >> port; 00532 portFound = true; 00533 } 00534 param_iter++; 00535 } 00536 // Do not ping the node, if he doesn't tell us, which port he has 00537 if(!portFound){ 00538 continue; 00539 } 00540 IBRCOMMON_LOGGER_DEBUG(25) << "trying to ping node " 00541 << n.toString() << IBRCOMMON_LOGGER_ENDL; 00542 const std::list<std::string> ips = getAllIPAddresses(n); 00543 IBRCOMMON_LOGGER_DEBUG(25) << ips.size() << " IP Addresses found!" 00544 << IBRCOMMON_LOGGER_ENDL; 00545 std::string lastip = ""; 00546 for (std::list<std::string>::const_iterator iter = ips.begin(); iter 00547 != ips.end(); iter++) { 00548 const std::string ip = (*iter); 00549 // Ignoring double existence of ip's 00550 if (ip == lastip) { 00551 continue; 00552 } 00553 // decode address and port 00554 struct sockaddr_in sin; 00555 memset(&sin, 0, sizeof(sin)); 00556 sin.sin_family = AF_INET; 00557 sin.sin_port = htons(port); 00558 IBRCOMMON_LOGGER_DEBUG(26) << " --- Using IP address: " << ip 00559 <<IBRCOMMON_LOGGER_ENDL; 00560 rc = inet_pton(AF_INET, ip.c_str(), &(sin.sin_addr)); 00561 if (rc == 1) { 00562 IBRCOMMON_LOGGER_DEBUG(26) << "Pinging node " 00563 << n.toString() << " on " << ip << ":" << port 00564 << IBRCOMMON_LOGGER_ENDL; 00565 ibrcommon::MutexLock l(this->_libmutex); 00566 dtn_dht_ping_node((struct sockaddr*) &sin, sizeof(sin)); 00567 } else { 00568 IBRCOMMON_LOGGER(error) << " --- ERROR pton: " << rc 00569 <<IBRCOMMON_LOGGER_ENDL; 00570 } 00571 lastip = ip; 00572 } 00573 } 00574 } 00575 } 00576 00577 std::list<std::string> 
dtn::dht::DHTNameService::getAllIPAddresses( 00578 const dtn::core::Node &n) { 00579 std::string address = "0.0.0.0"; 00580 unsigned int port = 0; 00581 std::list < std::string > ret; 00582 const std::list<dtn::core::Node::URI> uri_list = n.get( 00583 dtn::core::Node::CONN_TCPIP); 00584 for (std::list<dtn::core::Node::URI>::const_iterator iter = 00585 uri_list.begin(); iter != uri_list.end(); iter++) { 00586 const dtn::core::Node::URI &uri = (*iter); 00587 uri.decode(address, port); 00588 ret.push_front(address); 00589 } 00590 const std::list<dtn::core::Node::URI> udp_uri_list = n.get( 00591 dtn::core::Node::CONN_UDPIP); 00592 for (std::list<dtn::core::Node::URI>::const_iterator iter = 00593 udp_uri_list.begin(); iter != udp_uri_list.end(); iter++) { 00594 const dtn::core::Node::URI &udp_uri = (*iter); 00595 udp_uri.decode(address, port); 00596 ret.push_front(address); 00597 } 00598 return ret; 00599 } 00600 00601 void dtn::dht::DHTNameService::bootstrapping() { 00602 if (this->_bootstrapped > time(NULL) + 30) { 00603 return; 00604 } 00605 this->_bootstrapped = time(NULL); 00606 if (_config.getPathToNodeFiles().size() > 0) { 00607 bootstrappingFile(); 00608 } 00609 if (_config.isDNSBootstrappingEnabled()) { 00610 bootstrappingDNS(); 00611 } 00612 if (_config.isIPBootstrappingEnabled()) { 00613 bootstrappingIPs(); 00614 } 00615 } 00616 00617 void dtn::dht::DHTNameService::bootstrappingFile() { 00618 int rc; 00619 { 00620 ibrcommon::MutexLock l(this->_libmutex); 00621 rc = dtn_dht_load_prev_conf(_config.getPathToNodeFiles().c_str()); 00622 } 00623 if (rc != 0) { 00624 IBRCOMMON_LOGGER(warning) << "DHT loading of saved nodes failed" 00625 << IBRCOMMON_LOGGER_ENDL; 00626 } else { 00627 IBRCOMMON_LOGGER_DEBUG(25) << "DHT loading of saved nodes done" 00628 << IBRCOMMON_LOGGER_ENDL; 00629 } 00630 } 00631 00632 void dtn::dht::DHTNameService::bootstrappingDNS() { 00633 int rc; 00634 std::vector < string > dns = _config.getDNSBootstrappingNames(); 00635 if (dns.size() > 0) { 
00636 std::vector<string>::const_iterator dns_iter = dns.begin(); 00637 while (dns_iter != dns.end()) { 00638 const string &dn = (*dns_iter); 00639 { 00640 ibrcommon::MutexLock l(this->_libmutex); 00641 rc = dtn_dht_dns_bootstrap(&_context, dn.c_str(), NULL); 00642 } 00643 if (rc != 0) { 00644 IBRCOMMON_LOGGER(warning) << "bootstrapping from domain " << dn 00645 << " failed with error: " << rc 00646 << IBRCOMMON_LOGGER_ENDL; 00647 } else { 00648 IBRCOMMON_LOGGER_DEBUG(25) 00649 << "DHT Bootstrapping done for domain" << dn 00650 << IBRCOMMON_LOGGER_ENDL; 00651 } 00652 dns_iter++; 00653 } 00654 } else { 00655 { 00656 ibrcommon::MutexLock l(this->_libmutex); 00657 rc = dtn_dht_dns_bootstrap(&_context, NULL, NULL); 00658 } 00659 if (rc != 0) { 00660 IBRCOMMON_LOGGER(warning) 00661 << "bootstrapping from default domain failed with error: " 00662 << rc << IBRCOMMON_LOGGER_ENDL; 00663 } else { 00664 IBRCOMMON_LOGGER_DEBUG(25) 00665 << "DHT Bootstrapping done for default domain" 00666 << IBRCOMMON_LOGGER_ENDL; 00667 } 00668 } 00669 00670 } 00671 00672 void dtn::dht::DHTNameService::bootstrappingIPs() { 00673 int rc; 00674 std::vector < string > ips = _config.getIPBootstrappingIPs(); 00675 std::vector<string>::const_iterator ip_iter = ips.begin(); 00676 while (ip_iter != ips.end()) { 00677 std::vector < string > ip 00678 = dtn::utils::Utils::tokenize(" ", (*ip_iter)); 00679 int size, ipversion = AF_INET; 00680 struct sockaddr *wellknown_node; 00681 struct sockaddr_in sin; 00682 struct sockaddr_in6 sin6; 00683 memset(&sin, 0, sizeof(sin)); 00684 int port = 9999; 00685 switch (ip.size()) { 00686 case 2: 00687 port = atoi(ip[1].c_str()); 00688 case 1: 00689 rc = inet_pton(ipversion, ip[0].c_str(), &(sin.sin_addr)); 00690 if (rc <= 0) { 00691 ipversion = AF_INET6; 00692 rc = inet_pton(ipversion, ip[0].c_str(), &(sin6.sin6_addr)); 00693 if (rc <= 0) { 00694 break; 00695 } else { 00696 sin6.sin6_family = ipversion; 00697 sin6.sin6_port = htons(port); 00698 size = sizeof(sin6); 
00699 wellknown_node = (struct sockaddr *) &sin6; 00700 } 00701 } else { 00702 sin.sin_family = ipversion; 00703 sin.sin_port = htons(port); 00704 size = sizeof(sin); 00705 wellknown_node = (struct sockaddr *) &sin; 00706 } 00707 if (rc == 1) { 00708 ibrcommon::MutexLock l(this->_libmutex); 00709 dtn_dht_ping_node(wellknown_node, size); 00710 } 00711 break; 00712 default: 00713 break; 00714 } 00715 ip_iter++; 00716 } 00717 } 00718 00719 // TODO Nur für Interfaces zulassen, auf denen ich gebunden bin! 00720 00721 void dtn::dht::DHTNameService::update(const ibrcommon::vinterface &iface, 00722 std::string &name, std::string ¶ms) 00723 throw (dtn::net::DiscoveryServiceProvider::NoServiceHereException) { 00724 if (this->_initialized) { 00725 name = "dhtns"; 00726 stringstream service; 00727 service << "port=" << this->_context.port << ";"; 00728 if (!this->_config.isNeighbourAllowedToAnnounceMe()) { 00729 service << "proxy=false;"; 00730 } 00731 params = service.str(); 00732 } else { 00733 if(!this->_config.isNeighbourAllowedToAnnounceMe()) { 00734 name = "dhtns"; 00735 params = "proxy=false;"; 00736 } else { 00737 throw dtn::net::DiscoveryServiceProvider::NoServiceHereException(); 00738 } 00739 } 00740 } 00741 00742 void dtn_dht_handle_lookup_result(const struct dtn_dht_lookup_result *result) { 00743 if (result == NULL || result->eid == NULL || result->clayer == NULL) { 00744 return; 00745 } 00746 00747 // Extracting the EID of the answering node 00748 std::string eid__; 00749 std::string clname__; 00750 struct dtn_eid * eid = result->eid; 00751 unsigned int i; 00752 for (i = 0; i < eid->eidlen; i++) { 00753 eid__ += eid->eid[i]; 00754 } 00755 00756 // Extracting the convergence layer of the node 00757 stringstream ss; 00758 std::string cl__; 00759 struct dtn_convergence_layer * cl = result->clayer; 00760 if (cl == NULL) 00761 return; 00762 dtn::core::Node node(eid__); 00763 ss << "Adding Node " << eid__ << ": "; 00764 // Iterating over the list of convergence layer 
00765 bool hasCL = false; 00766 while (cl != NULL) { 00767 enum Node::Protocol proto__; 00768 stringstream service; 00769 clname__ = ""; 00770 for (i = 0; i < cl->clnamelen; i++) { 00771 clname__ += cl->clname[i]; 00772 } 00773 if (clname__ == "tcp") { 00774 proto__ = Node::CONN_TCPIP; 00775 } else if (clname__ == "udp") { 00776 proto__ = Node::CONN_UDPIP; 00777 } else { 00778 proto__ = Node::CONN_UNDEFINED; 00779 //TODO find the right string to be added to service string 00780 } 00781 // Extracting the arguments of the convergence layer 00782 struct dtn_convergence_layer_arg * arg = cl->args; 00783 std::string argstring__; 00784 while (arg != NULL) { 00785 if (arg->keylen <= 0 || arg->valuelen <= 0) { 00786 arg = arg->next; 00787 continue; 00788 } 00789 argstring__ = ""; 00790 for (i = 0; i < arg->keylen; i++) { 00791 argstring__ += arg->key[i]; 00792 } 00793 service << argstring__ << "="; 00794 argstring__ = ""; 00795 for (i = 0; i < arg->valuelen && arg->value[i] != '\0'; i++) { 00796 argstring__ += arg->value[i]; 00797 } 00798 service << argstring__ << ";"; 00799 arg = arg->next; 00800 } 00801 ss << clname__ << "(" << service.str() << ") "; 00802 // Add the found convergence layer to the node object 00803 node.add( 00804 Node::URI(Node::NODE_DHT_DISCOVERED, proto__, service.str(), 00805 DHT_RESULTS_EXPIRE_TIMEOUT, 00806 DHT_DISCOVERED_NODE_PRIORITY)); 00807 hasCL = true; 00808 cl = cl->next; 00809 } 00810 00811 // Adding the new node to the BundleCore to make it available to the daemon 00812 dtn::core::BundleCore &core = dtn::core::BundleCore::getInstance(); 00813 if (hasCL) { 00814 IBRCOMMON_LOGGER(info) << ss.str() << IBRCOMMON_LOGGER_ENDL; 00815 core.addConnection(node); 00816 } 00817 00818 // Extracting all neighbours of the new node if usage is allowed 00819 if (!dtn::daemon::Configuration::getInstance().getDHT().ignoreDHTNeighbourInformations()) { 00820 struct dtn_eid * neighbour = result->neighbours; 00821 std::string neighbour__; 00822 while (neighbour) 
{ 00823 neighbour__ = ""; 00824 for (i = 0; i < neighbour->eidlen; i++) { 00825 neighbour__ += neighbour->eid[i]; 00826 } 00827 dtn::data::EID n(neighbour__); 00828 dtn::core::Node neighbourNode(n); 00829 if (dtn::core::BundleCore::local != n.getNode() 00830 && !core.isNeighbor(n)) { 00831 dtn::core::BundleCore::getInstance().addRoute(n, node.getEID(), 00832 DHT_PATH_EXPIRE_TIMEOUT); 00833 } 00834 neighbour = neighbour->next; 00835 } 00836 } 00837 //TODO GROUP HANDLING SHOULD BE IMPLEMENTED HERE!!! 00838 struct dtn_eid * group = result->groups; 00839 while (group) { 00840 group = group->next; 00841 } 00842 return; 00843 } 00844 00845 // This function is called, if the DHT has done a search or announcement. 00846 // At this moment, it is just ignored 00847 void dtn_dht_operation_done(const unsigned char *info_hash){ 00848 return; 00849 }