IBR-DTNSuite  0.8
daemon/src/net/DHTNameService.cpp
Go to the documentation of this file.
00001 /*
00002  * DHTNameService.cpp
00003  *
00004  *  Created on: 21.11.2011
00005  *      Author: Till Lorentzen
00006  */
00007 
00008 #include "config.h"
00009 
00010 #include "DHTNameService.h"
00011 #include "ibrcommon/net/vsocket.h"
00012 #include "core/BundleCore.h"
00013 #include "core/NodeEvent.h"
00014 #include "routing/QueueBundleEvent.h"
00015 
00016 #include <ibrdtn/utils/Utils.h>
00017 
00018 #include <ibrcommon/Logger.h>
00019 #include <ibrcommon/thread/MutexLock.h>
00020 
00021 #include <sys/socket.h>
00022 #include <arpa/inet.h>
00023 #include <set>
00024 #include <cstdlib>
00025 #include <sys/time.h>
00026 #include <fcntl.h>
00027 #include <string.h>
00028 
00029 dtn::dht::DHTNameService::DHTNameService() :
00030         _exiting(false), _initialized(false), _announced(false), _foundNodes(-1),
00031                         _bootstrapped(0),
00032                         _config(daemon::Configuration::getInstance().getDHT()) {
00033 }
00034 
00035 dtn::dht::DHTNameService::~DHTNameService() {
00036         join();
00037 }
00038 
00039 const std::string dtn::dht::DHTNameService::getName() const {
00040         return "DHT Naming Service";
00041 }
00042 
00043 void dtn::dht::DHTNameService::__cancellation() {
00044 }
00045 
00046 bool dtn::dht::DHTNameService::setNonBlockingInterruptPipe() {
00047         for (int i = 0; i < 2; i++) {
00048                 int opts;
00049                 opts = fcntl(_interrupt_pipe[i], F_GETFL);
00050                 if (opts < 0) {
00051                         return false;
00052                 }
00053                 opts |= O_NONBLOCK;
00054                 if (fcntl(_interrupt_pipe[i], F_SETFL, opts) < 0) {
00055                         return false;
00056                 }
00057         }
00058         return true;
00059 }
00060 
00061 void dtn::dht::DHTNameService::componentUp() {
00062         std::string eid = dtn::core::BundleCore::local.getNode().getString();
00063         // creating interrupt pipe
00064         if (pipe(_interrupt_pipe) < 0) {
00065                 IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe"
00066                                         << IBRCOMMON_LOGGER_ENDL;
00067                 return;
00068         }
00069         // set the pipe to non-blocking
00070         if (!this->setNonBlockingInterruptPipe()) {
00071                 IBRCOMMON_LOGGER(error) << "Error " << errno
00072                                         << " setting pipe to non-blocking mode"
00073                                         << IBRCOMMON_LOGGER_ENDL;
00074                 return;
00075         }
00076         // init DHT
00077         int rc = dtn_dht_initstruct(&_context);
00078         if (rc < 0) {
00079                 IBRCOMMON_LOGGER(error) << "DHT Context creation failed: " << rc
00080                                         << IBRCOMMON_LOGGER_ENDL;
00081                 return;
00082         }
00083         if (_config.randomPort()) {
00084                 srand( time(NULL));
00085                 _context.port = rand() % 64000 + 1024;
00086         } else {
00087                 _context.port = _config.getPort();
00088         }
00089         if (_config.getIPv4Binding().size() > 0) {
00090                 _context.bind = _config.getIPv4Binding().c_str();
00091         }
00092         if (_config.getIPv6Binding().size() > 0) {
00093                 _context.bind6 = _config.getIPv6Binding().c_str();
00094         }
00095         if (_config.isIPv4Enabled() && _config.isIPv6Enabled()) {
00096                 _context.type = BINDBOTH;
00097         } else if (_config.isIPv4Enabled()) {
00098                 _context.type = IPV4ONLY;
00099         } else if (_config.isIPv6Enabled()) {
00100                 _context.type = IPV6ONLY;
00101         } else {
00102                 _context.type = BINDNONE;
00103         }
00104         string myid = _config.getID();
00105         if (!_config.randomID()) {
00106                 dtn_dht_build_id_from_str(_context.id, myid.c_str(), myid.size());
00107         }
00108 
00109         rc = dtn_dht_init_sockets(&_context);
00110         if (rc != 0) {
00111                 IBRCOMMON_LOGGER(error) << "DHT Sockets couldn't be initialized"
00112                                         <<IBRCOMMON_LOGGER_ENDL;
00113                 dtn_dht_close_sockets(&_context);
00114                 return;
00115         }
00116         IBRCOMMON_LOGGER_DEBUG(50) << "Sockets for DHT initialized"
00117                                 <<IBRCOMMON_LOGGER_ENDL;
00118         if (!_config.isBlacklistEnabled()) {
00119                 IBRCOMMON_LOGGER_DEBUG(50) << "DHT Blacklist disabled"
00120                                         <<IBRCOMMON_LOGGER_ENDL;
00121                 dtn_dht_blacklist(0);
00122         } else {
00123                 IBRCOMMON_LOGGER_DEBUG(50) << "DHT Blacklist enabled"
00124                                         <<IBRCOMMON_LOGGER_ENDL;
00125         }
00126         {
00127                 ibrcommon::MutexLock l(this->_libmutex);
00128                 rc = dtn_dht_init(&_context);
00129         }
00130         if (rc < 0) {
00131                 IBRCOMMON_LOGGER(error) << "DHT initialization failed"
00132                                         << IBRCOMMON_LOGGER_ENDL;
00133                 return;
00134         }
00135         IBRCOMMON_LOGGER(info) << "DHT initialized on Port: " << _context.port
00136                                 << " with ID: " << myid << IBRCOMMON_LOGGER_ENDL;
00137         this->_initialized = true;
00138 }
00139 
00140 void dtn::dht::DHTNameService::componentRun() {
00141         if (!this->_initialized) {
00142                 IBRCOMMON_LOGGER(error) << "DHT is not initialized"
00143                                         <<IBRCOMMON_LOGGER_ENDL;
00144                 return;
00145         }
00146         std::string cltype_;
00147         std::string port_;
00148         const std::list<dtn::daemon::Configuration::NetConfig>
00149                         &nets =
00150                                         dtn::daemon::Configuration::getInstance().getNetwork().getInterfaces();
00151 
00152         // for every available interface, build the correct struct
00153         for (std::list<dtn::daemon::Configuration::NetConfig>::const_iterator iter =
00154                         nets.begin(); iter != nets.end(); iter++) {
00155                 try {
00156                         cltype_ = getConvergenceLayerName((*iter));
00157                         std::stringstream ss;
00158                         ss << (*iter).port;
00159                         port_ = ss.str();
00160                         struct dtn_convergence_layer * clstruct =
00161                                         (struct dtn_convergence_layer*) malloc(
00162                                                         sizeof(struct dtn_convergence_layer));
00163                         clstruct->clname = (char*) malloc(cltype_.size());
00164                         clstruct-> clnamelen = cltype_.size();
00165                         memcpy(clstruct->clname, cltype_.c_str(), cltype_.size());
00166                         struct dtn_convergence_layer_arg * arg =
00167                                         (struct dtn_convergence_layer_arg*) malloc(
00168                                                         sizeof(struct dtn_convergence_layer_arg));
00169                         arg->key = (char*) malloc(5);
00170                         arg->key[4] = '\n';
00171                         memcpy(arg->key, "port", 4);
00172                         arg->keylen = 4;
00173                         arg->value = (char*) malloc(port_.size());
00174                         memcpy(arg->value, port_.c_str(), port_.size());
00175                         arg->valuelen = port_.size();
00176                         arg->next = NULL;
00177                         clstruct->args = arg;
00178                         clstruct->next = _context.clayer;
00179                         _context.clayer = clstruct;
00180                 } catch (const std::exception&) {
00181                         continue;
00182                 }
00183         }
00184 
00185         // Bootstrapping the DHT
00186         int rc, numberOfRandomRequests = 0;
00187         bindEvent(dtn::routing::QueueBundleEvent::className);
00188         bindEvent(dtn::core::NodeEvent::className);
00189 
00190         // DHT main loop
00191         time_t tosleep = 0;
00192         struct sockaddr_storage from;
00193         socklen_t fromlen;
00194         while (!this->_exiting) {
00195                 if (this->_foundNodes == 0) {
00196                         bootstrapping();
00197                 }
00198                 //Announce myself
00199                 if (!this->_announced && dtn_dht_ready_for_work(&_context) > 2) {
00200                         this->_announced = true;
00201                         if (!_config.isSelfAnnouncingEnabled())
00202                                 return;
00203                         announce(dtn::core::BundleCore::local, SINGLETON);
00204                         // Read all unknown EIDs and lookup them
00205                         dtn::core::BundleCore &core = dtn::core::BundleCore::getInstance();
00206                         dtn::storage::BundleStorage & storage = core.getStorage();
00207                         std::set<dtn::data::EID> eids_ = storage.getDistinctDestinations();
00208                         std::set<dtn::data::EID>::iterator iterator;
00209                         for (iterator = eids_.begin(); iterator != eids_.end(); iterator++) {
00210                                 lookup(*iterator);
00211                         }
00212                         std::set<dtn::core::Node> neighbours_ = core.getNeighbors();
00213                         std::set<dtn::core::Node>::iterator neighbouriterator;
00214                         for (neighbouriterator = neighbours_.begin(); neighbouriterator
00215                                         != neighbours_.end(); neighbouriterator++) {
00216                                 if (isNeighbourAnnouncable(*neighbouriterator))
00217                                         announce(*neighbouriterator, NEIGHBOUR);
00218                         }
00219                         while (!this->cachedLookups.empty()) {
00220                                 lookup(this->cachedLookups.front());
00221                                 this->cachedLookups.pop_front();
00222                         }
00223                 }
00224                 // Faster Bootstrapping by searching for random Keys
00225                 if (dtn_dht_ready_for_work(&_context) > 0 && dtn_dht_ready_for_work(
00226                                 &_context) <= 2 && numberOfRandomRequests < 40) {
00227                         dtn_dht_start_random_lookup(&_context);
00228                         numberOfRandomRequests++;
00229                 }
00230                 struct timeval tv;
00231                 fd_set readfds;
00232                 tv.tv_sec = tosleep;
00233                 tv.tv_usec = random() % 1000000;
00234                 int high_fd = _interrupt_pipe[0];
00235                 FD_ZERO(&readfds);
00236                 FD_SET(_interrupt_pipe[0], &readfds);
00237                 if (_context.ipv4socket >= 0) {
00238                         FD_SET(_context.ipv4socket, &readfds);
00239                         high_fd = max(high_fd, _context.ipv4socket);
00240                 }
00241                 if (_context.ipv6socket >= 0) {
00242                         FD_SET(_context.ipv6socket, &readfds);
00243                         high_fd = max(high_fd, _context.ipv6socket);
00244                 }
00245                 rc = select(high_fd + 1, &readfds, NULL, NULL, &tv);
00246                 if (rc < 0) {
00247                         if (errno != EINTR) {
00248                                 IBRCOMMON_LOGGER(error)
00249                                                         << "select of DHT Sockets failed with error: "
00250                                                         << errno << IBRCOMMON_LOGGER_ENDL;
00251                                 sleep(1);
00252                         }
00253                 }
00254                 if (FD_ISSET(_interrupt_pipe[0], &readfds)) {
00255                         IBRCOMMON_LOGGER_DEBUG(25) << "unblocked by self-pipe-trick"
00256                                                 << IBRCOMMON_LOGGER_ENDL;
00257                         // this was an interrupt with the self-pipe-trick
00258                         char buf[2];
00259                         int read = ::read(_interrupt_pipe[0], buf, 2);
00260                         if (read <= 2 || _exiting)
00261                                 break;
00262                 }
00263                 if (rc > 0) {
00264                         fromlen = sizeof(from);
00265                         if (_context.ipv4socket >= 0 && FD_ISSET(_context.ipv4socket,
00266                                         &readfds))
00267                                 rc = recvfrom(_context.ipv4socket, _buf, sizeof(_buf) - 1, 0,
00268                                                 (struct sockaddr*) &from, &fromlen);
00269                         else if (_context.ipv6socket >= 0 && FD_ISSET(_context.ipv6socket,
00270                                         &readfds))
00271                                 rc = recvfrom(_context.ipv6socket, _buf, sizeof(_buf) - 1, 0,
00272                                                 (struct sockaddr*) &from, &fromlen);
00273                 }
00274                 if (rc > 0) {
00275                         _buf[rc] = '\0';
00276                         {
00277                                 ibrcommon::MutexLock l(this->_libmutex);
00278                                 rc = dtn_dht_periodic(&_context, _buf, rc,
00279                                                 (struct sockaddr*) &from, fromlen, &tosleep);
00280                         }
00281                 } else {
00282                         {
00283                                 ibrcommon::MutexLock l(this->_libmutex);
00284                                 rc = dtn_dht_periodic(&_context, NULL, 0, NULL, 0, &tosleep);
00285                         }
00286                 }
00287                 int numberOfHosts = 0;
00288                 int numberOfGoodHosts = 0;
00289                 int numberOfGood6Hosts = 0;
00290                 unsigned int numberOfBlocksHosts = 0;
00291                 unsigned int numberOfBlocksHostsIPv4 = 0;
00292                 unsigned int numberOfBlocksHostsIPv6 = 0;
00293                 {
00294                         ibrcommon::MutexLock l(this->_libmutex);
00295                         if (_context.ipv4socket >= 0)
00296                                 numberOfHosts
00297                                                 = dtn_dht_nodes(AF_INET, &numberOfGoodHosts, NULL);
00298                         if (_context.ipv6socket >= 0)
00299                                 numberOfHosts += dtn_dht_nodes(AF_INET6, &numberOfGood6Hosts,
00300                                                 NULL);
00301                         numberOfBlocksHosts = dtn_dht_blacklisted_nodes(
00302                                         &numberOfBlocksHostsIPv4, &numberOfBlocksHostsIPv6);
00303                 }
00304                 if (this->_foundNodes != numberOfHosts) {
00305                         if (_config.isBlacklistEnabled()) {
00306                                 IBRCOMMON_LOGGER(info) << "DHT Nodes available: "
00307                                                         << numberOfHosts << "(Good:" << numberOfGoodHosts
00308                                                         << "+" << numberOfGood6Hosts << ") Blocked: "
00309                                                         << numberOfBlocksHosts << "("
00310                                                         << numberOfBlocksHostsIPv4 << "+"
00311                                                         << numberOfBlocksHostsIPv6 << ")"
00312                                                         << IBRCOMMON_LOGGER_ENDL;
00313 
00314                         } else {
00315                                 IBRCOMMON_LOGGER(info) << "DHT Nodes available: "
00316                                                         << numberOfHosts << "(Good:" << numberOfGoodHosts
00317                                                         << "+" << numberOfGood6Hosts << ")"
00318                                                         << IBRCOMMON_LOGGER_ENDL;
00319 
00320                         }
00321                         this->_foundNodes = numberOfHosts;
00322                 }
00323                 if (rc < 0) {
00324                         if (errno == EINTR) {
00325                                 continue;
00326                         } else {
00327                                 IBRCOMMON_LOGGER(error) << "dtn_dht_periodic failed"
00328                                                         << IBRCOMMON_LOGGER_ENDL;
00329                                 if (rc == EINVAL || rc == EFAULT) {
00330                                         IBRCOMMON_LOGGER(error) << "DHT failed -> stopping DHT"
00331                                                                 << IBRCOMMON_LOGGER_ENDL;
00332                                         break;
00333                                 }
00334                                 tosleep = 1;
00335                         }
00336                 }
00337         }
00338         unbindEvent(dtn::routing::QueueBundleEvent::className);
00339         unbindEvent(dtn::core::NodeEvent::className);
00340         this->_initialized = false;
00341         if (_config.getPathToNodeFiles().size() > 0) {
00342                 ibrcommon::MutexLock l(this->_libmutex);
00343                 int save = dtn_dht_save_conf(_config.getPathToNodeFiles().c_str());
00344                 if (save != 0) {
00345                         IBRCOMMON_LOGGER(warning) << "DHT could not save nodes"
00346                                                 << IBRCOMMON_LOGGER_ENDL;
00347                 } else {
00348                         IBRCOMMON_LOGGER_DEBUG(25) << "DHT saved nodes"
00349                                                 << IBRCOMMON_LOGGER_ENDL;
00350                 }
00351         }
00352         if (this->_announced && _config.isSelfAnnouncingEnabled())
00353                 deannounce(dtn::core::BundleCore::local);
00354         dtn_dht_uninit();
00355         IBRCOMMON_LOGGER(info) << "DHT shut down" << IBRCOMMON_LOGGER_ENDL;
00356         // Closes all sockets of the DHT
00357         dtn_dht_close_sockets(&_context);
00358 
00359         dtn_dht_free_convergence_layer_struct(_context.clayer);
00360 
00361         IBRCOMMON_LOGGER_DEBUG(25) << "DHT sockets are closed"
00362                                 << IBRCOMMON_LOGGER_ENDL;
00363         ::close(_interrupt_pipe[0]);
00364         ::close(_interrupt_pipe[1]);
00365 }
00366 
00367 void dtn::dht::DHTNameService::componentDown() {
00368         this->_exiting = true;
00369         IBRCOMMON_LOGGER_DEBUG(25) << "DHT will be shut down"
00370                                 << IBRCOMMON_LOGGER_ENDL;
00371         ssize_t written = ::write(_interrupt_pipe[1], "i", 1);
00372         if (written < 1) {
00373                 IBRCOMMON_LOGGER_DEBUG(25) << "DHT pipeline trick failed"
00374                                         << IBRCOMMON_LOGGER_ENDL;
00375         }
00376 }
00377 
00378 void dtn::dht::DHTNameService::lookup(const dtn::data::EID &eid) {
00379         if (dtn::core::BundleCore::local != eid.getNode()) {
00380                 if (this->_announced) {
00381                         std::string eid_ = eid.getNode().getString();
00382                         IBRCOMMON_LOGGER_DEBUG(30) << "DHT Lookup: " << eid_
00383                                                 << IBRCOMMON_LOGGER_ENDL;
00384                         ibrcommon::MutexLock l(this->_libmutex);
00385                         dtn_dht_lookup(&_context, eid_.c_str(), eid_.size());
00386                 } else {
00387                         this->cachedLookups.push_front(eid);
00388                 }
00389         }
00390 }
00391 
00395 void dtn::dht::DHTNameService::announce(const dtn::core::Node &n,
00396                 enum dtn_dht_lookup_type type) {
00397         if (this->_announced) {
00398                 std::string eid_ = n.getEID().getNode().getString();
00399                 ibrcommon::MutexLock l(this->_libmutex);
00400                 int rc = dtn_dht_announce(&_context, eid_.c_str(), eid_.size(), type);
00401                 if (rc > 0) {
00402                         IBRCOMMON_LOGGER(info) << "DHT Announcing: " << eid_
00403                                                 << IBRCOMMON_LOGGER_ENDL;
00404                 }
00405         }
00406 }
00407 
00408 bool dtn::dht::DHTNameService::isNeighbourAnnouncable(
00409                 const dtn::core::Node &node) {
00410         if (!_config.isNeighbourAnnouncementEnabled())
00411                 return false;
00412 
00413         // get the merged node object
00414         const dtn::core::Node n = dtn::core::BundleCore::getInstance().getNeighbor(
00415                         node.getEID());
00416 
00417         // Ignore all none discovered and none static nodes
00418         // They could only be discovered by the DHT,
00419         // so they are not really close neighbours and could be found directly in the DHT.
00420         // This prevents the node to be announced by all neighbours, how found this node
00421         std::set<dtn::core::Node::Type> types = n.getTypes();
00422         if (types.find(dtn::core::Node::NODE_DISCOVERED) == types.end()
00423                         && (types.find(dtn::core::Node::NODE_STATIC) == types.end())) {
00424                 return false;
00425         }
00426         // Proof, if the neighbour has told us, that he don't want to be published on the DHT
00427         std::list<dtn::core::Node::Attribute> services = n.get("dhtns");
00428         if (!services.empty()) {
00429                 for (std::list<dtn::core::Node::Attribute>::const_iterator service =
00430                                 services.begin(); service != services.end(); service++) {
00431                         bool proxy = true;
00432                         std::vector < string > parameters = dtn::utils::Utils::tokenize(
00433                                         ";", (*service).value);
00434                         std::vector<string>::const_iterator param_iter = parameters.begin();
00435 
00436                         while (param_iter != parameters.end()) {
00437                                 std::vector < string > p = dtn::utils::Utils::tokenize("=",
00438                                                 (*param_iter));
00439                                 if (p[0].compare("proxy") == 0) {
00440                                         std::stringstream proxy_stream;
00441                                         proxy_stream << p[1];
00442                                         proxy_stream >> proxy;
00443                                 }
00444                                 param_iter++;
00445                         }
00446                         if (!proxy)
00447                                 return false;
00448                 }
00449         }
00450         return true;
00451 }
00452 
00453 void dtn::dht::DHTNameService::deannounce(const dtn::core::Node &n) {
00454         std::string eid_ = n.getEID().getNode().getString();
00455         ibrcommon::MutexLock l(this->_libmutex);
00456         dtn_dht_deannounce(eid_.c_str(), eid_.size());
00457         IBRCOMMON_LOGGER_DEBUG(25) << "DHT Deannounced: " << eid_
00458                                 << IBRCOMMON_LOGGER_ENDL;
00459 }
00460 
00461 std::string dtn::dht::DHTNameService::getConvergenceLayerName(
00462                 const dtn::daemon::Configuration::NetConfig &net) {
00463         std::string cltype_ = "";
00464         switch (net.type) {
00465         case dtn::daemon::Configuration::NetConfig::NETWORK_TCP:
00466                 cltype_ = "tcp";
00467                 break;
00468         case dtn::daemon::Configuration::NetConfig::NETWORK_UDP:
00469                 cltype_ = "udp";
00470                 break;
00471                 //      case dtn::daemon::Configuration::NetConfig::NETWORK_HTTP:
00472                 //              cltype_ = "http";
00473                 //              break;
00474         default:
00475                 throw ibrcommon::Exception("type of convergence layer not supported");
00476         }
00477         return cltype_;
00478 }
00479 
00480 void dtn::dht::DHTNameService::raiseEvent(const dtn::core::Event *evt) {
00481         try {
00482                 const dtn::routing::QueueBundleEvent &event =
00483                                 dynamic_cast<const dtn::routing::QueueBundleEvent&> (*evt);
00484                 dtn::data::EID none;
00485                 if (event.bundle.destination != dtn::core::BundleCore::local.getNode()
00486                                 && event.bundle.destination != none) {
00487                         lookup(event.bundle.destination);
00488                 }
00489         } catch (const std::bad_cast&) {
00490         };
00491         try {
00492                 const dtn::core::NodeEvent &nodeevent =
00493                                 dynamic_cast<const dtn::core::NodeEvent&> (*evt);
00494                 const dtn::core::Node &n = nodeevent.getNode();
00495                 if (n.getEID() != dtn::core::BundleCore::local.getNode()) {
00496                         switch (nodeevent.getAction()) {
00497                         case NODE_AVAILABLE:
00498                         case NODE_INFO_UPDATED:
00499                                 if (isNeighbourAnnouncable(n))
00500                                         announce(n, NEIGHBOUR);
00501                                 pingNode(n);
00502                                 break;
00503                         case NODE_UNAVAILABLE:
00504                                 deannounce(n);
00505                         default:
00506                                 break;
00507                         }
00508                 }
00509         } catch (const std::bad_cast&) {
00510         };
00511 }
00512 
00513 void dtn::dht::DHTNameService::pingNode(const dtn::core::Node &n) {
00514         int rc;
00515         std::list<dtn::core::Node::Attribute> services = n.get("dhtns");
00516         std::string address = "0.0.0.0";
00517         unsigned int port = 9999;
00518         if (!services.empty()) {
00519                 for (std::list<dtn::core::Node::Attribute>::const_iterator service =
00520                                 services.begin(); service != services.end(); service++) {
00521                         std::vector < string > parameters = dtn::utils::Utils::tokenize(
00522                                         ";", (*service).value);
00523                         std::vector<string>::const_iterator param_iter = parameters.begin();
00524                         bool portFound = false;
00525                         while (param_iter != parameters.end()) {
00526                                 std::vector < string > p = dtn::utils::Utils::tokenize("=",
00527                                                 (*param_iter));
00528                                 if (p[0].compare("port") == 0) {
00529                                         std::stringstream port_stream;
00530                                         port_stream << p[1];
00531                                         port_stream >> port;
00532                                         portFound = true;
00533                                 }
00534                                 param_iter++;
00535                         }
00536                         // Do not ping the node, if he doesn't tell us, which port he has
00537                         if(!portFound){
00538                                 continue;
00539                         }
00540                         IBRCOMMON_LOGGER_DEBUG(25) << "trying to ping node "
00541                                                 << n.toString() << IBRCOMMON_LOGGER_ENDL;
00542                         const std::list<std::string> ips = getAllIPAddresses(n);
00543                         IBRCOMMON_LOGGER_DEBUG(25) << ips.size() << " IP Addresses found!"
00544                                                 << IBRCOMMON_LOGGER_ENDL;
00545                         std::string lastip = "";
00546                         for (std::list<std::string>::const_iterator iter = ips.begin(); iter
00547                                         != ips.end(); iter++) {
00548                                 const std::string ip = (*iter);
00549                                 // Ignoring double existence of ip's
00550                                 if (ip == lastip) {
00551                                         continue;
00552                                 }
00553                                 // decode address and port
00554                                 struct sockaddr_in sin;
00555                                 memset(&sin, 0, sizeof(sin));
00556                                 sin.sin_family = AF_INET;
00557                                 sin.sin_port = htons(port);
00558                                 IBRCOMMON_LOGGER_DEBUG(26) << " --- Using IP address: " << ip
00559                                                         <<IBRCOMMON_LOGGER_ENDL;
00560                                 rc = inet_pton(AF_INET, ip.c_str(), &(sin.sin_addr));
00561                                 if (rc == 1) {
00562                                         IBRCOMMON_LOGGER_DEBUG(26) << "Pinging node "
00563                                                                 << n.toString() << " on " << ip << ":" << port
00564                                                                 << IBRCOMMON_LOGGER_ENDL;
00565                                         ibrcommon::MutexLock l(this->_libmutex);
00566                                         dtn_dht_ping_node((struct sockaddr*) &sin, sizeof(sin));
00567                                 } else {
00568                                         IBRCOMMON_LOGGER(error) << " --- ERROR pton: " << rc
00569                                                                 <<IBRCOMMON_LOGGER_ENDL;
00570                                 }
00571                                 lastip = ip;
00572                         }
00573                 }
00574         }
00575 }
00576 
00577 std::list<std::string> dtn::dht::DHTNameService::getAllIPAddresses(
00578                 const dtn::core::Node &n) {
00579         std::string address = "0.0.0.0";
00580         unsigned int port = 0;
00581         std::list < std::string > ret;
00582         const std::list<dtn::core::Node::URI> uri_list = n.get(
00583                         dtn::core::Node::CONN_TCPIP);
00584         for (std::list<dtn::core::Node::URI>::const_iterator iter =
00585                         uri_list.begin(); iter != uri_list.end(); iter++) {
00586                 const dtn::core::Node::URI &uri = (*iter);
00587                 uri.decode(address, port);
00588                 ret.push_front(address);
00589         }
00590         const std::list<dtn::core::Node::URI> udp_uri_list = n.get(
00591                         dtn::core::Node::CONN_UDPIP);
00592         for (std::list<dtn::core::Node::URI>::const_iterator iter =
00593                         udp_uri_list.begin(); iter != udp_uri_list.end(); iter++) {
00594                 const dtn::core::Node::URI &udp_uri = (*iter);
00595                 udp_uri.decode(address, port);
00596                 ret.push_front(address);
00597         }
00598         return ret;
00599 }
00600 
00601 void dtn::dht::DHTNameService::bootstrapping() {
00602         if (this->_bootstrapped > time(NULL) + 30) {
00603                 return;
00604         }
00605         this->_bootstrapped = time(NULL);
00606         if (_config.getPathToNodeFiles().size() > 0) {
00607                 bootstrappingFile();
00608         }
00609         if (_config.isDNSBootstrappingEnabled()) {
00610                 bootstrappingDNS();
00611         }
00612         if (_config.isIPBootstrappingEnabled()) {
00613                 bootstrappingIPs();
00614         }
00615 }
00616 
00617 void dtn::dht::DHTNameService::bootstrappingFile() {
00618         int rc;
00619         {
00620                 ibrcommon::MutexLock l(this->_libmutex);
00621                 rc = dtn_dht_load_prev_conf(_config.getPathToNodeFiles().c_str());
00622         }
00623         if (rc != 0) {
00624                 IBRCOMMON_LOGGER(warning) << "DHT loading of saved nodes failed"
00625                                         << IBRCOMMON_LOGGER_ENDL;
00626         } else {
00627                 IBRCOMMON_LOGGER_DEBUG(25) << "DHT loading of saved nodes done"
00628                                         << IBRCOMMON_LOGGER_ENDL;
00629         }
00630 }
00631 
00632 void dtn::dht::DHTNameService::bootstrappingDNS() {
00633         int rc;
00634         std::vector < string > dns = _config.getDNSBootstrappingNames();
00635         if (dns.size() > 0) {
00636                 std::vector<string>::const_iterator dns_iter = dns.begin();
00637                 while (dns_iter != dns.end()) {
00638                         const string &dn = (*dns_iter);
00639                         {
00640                                 ibrcommon::MutexLock l(this->_libmutex);
00641                                 rc = dtn_dht_dns_bootstrap(&_context, dn.c_str(), NULL);
00642                         }
00643                         if (rc != 0) {
00644                                 IBRCOMMON_LOGGER(warning) << "bootstrapping from domain " << dn
00645                                                         << " failed with error: " << rc
00646                                                         << IBRCOMMON_LOGGER_ENDL;
00647                         } else {
00648                                 IBRCOMMON_LOGGER_DEBUG(25)
00649                                                         << "DHT Bootstrapping done for domain" << dn
00650                                                         << IBRCOMMON_LOGGER_ENDL;
00651                         }
00652                         dns_iter++;
00653                 }
00654         } else {
00655                 {
00656                         ibrcommon::MutexLock l(this->_libmutex);
00657                         rc = dtn_dht_dns_bootstrap(&_context, NULL, NULL);
00658                 }
00659                 if (rc != 0) {
00660                         IBRCOMMON_LOGGER(warning)
00661                                                 << "bootstrapping from default domain failed with error: "
00662                                                 << rc << IBRCOMMON_LOGGER_ENDL;
00663                 } else {
00664                         IBRCOMMON_LOGGER_DEBUG(25)
00665                                                 << "DHT Bootstrapping done for default domain"
00666                                                 << IBRCOMMON_LOGGER_ENDL;
00667                 }
00668         }
00669 
00670 }
00671 
00672 void dtn::dht::DHTNameService::bootstrappingIPs() {
00673         int rc;
00674         std::vector < string > ips = _config.getIPBootstrappingIPs();
00675         std::vector<string>::const_iterator ip_iter = ips.begin();
00676         while (ip_iter != ips.end()) {
00677                 std::vector < string > ip
00678                                 = dtn::utils::Utils::tokenize(" ", (*ip_iter));
00679                 int size, ipversion = AF_INET;
00680                 struct sockaddr *wellknown_node;
00681                 struct sockaddr_in sin;
00682                 struct sockaddr_in6 sin6;
00683                 memset(&sin, 0, sizeof(sin));
00684                 int port = 9999;
00685                 switch (ip.size()) {
00686                 case 2:
00687                         port = atoi(ip[1].c_str());
00688                 case 1:
00689                         rc = inet_pton(ipversion, ip[0].c_str(), &(sin.sin_addr));
00690                         if (rc <= 0) {
00691                                 ipversion = AF_INET6;
00692                                 rc = inet_pton(ipversion, ip[0].c_str(), &(sin6.sin6_addr));
00693                                 if (rc <= 0) {
00694                                         break;
00695                                 } else {
00696                                         sin6.sin6_family = ipversion;
00697                                         sin6.sin6_port = htons(port);
00698                                         size = sizeof(sin6);
00699                                         wellknown_node = (struct sockaddr *) &sin6;
00700                                 }
00701                         } else {
00702                                 sin.sin_family = ipversion;
00703                                 sin.sin_port = htons(port);
00704                                 size = sizeof(sin);
00705                                 wellknown_node = (struct sockaddr *) &sin;
00706                         }
00707                         if (rc == 1) {
00708                                 ibrcommon::MutexLock l(this->_libmutex);
00709                                 dtn_dht_ping_node(wellknown_node, size);
00710                         }
00711                         break;
00712                 default:
00713                         break;
00714                 }
00715                 ip_iter++;
00716         }
00717 }
00718 
00719 // TODO Nur für Interfaces zulassen, auf denen ich gebunden bin!
00720 
00721 void dtn::dht::DHTNameService::update(const ibrcommon::vinterface &iface,
00722                 std::string &name, std::string &params)
00723                 throw (dtn::net::DiscoveryServiceProvider::NoServiceHereException) {
00724         if (this->_initialized) {
00725                 name = "dhtns";
00726                 stringstream service;
00727                 service << "port=" << this->_context.port << ";";
00728                 if (!this->_config.isNeighbourAllowedToAnnounceMe()) {
00729                         service << "proxy=false;";
00730                 }
00731                 params = service.str();
00732         } else {
00733                 if(!this->_config.isNeighbourAllowedToAnnounceMe()) {
00734                         name = "dhtns";
00735                         params = "proxy=false;";
00736                 } else {
00737                         throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00738                 }
00739         }
00740 }
00741 
00742 void dtn_dht_handle_lookup_result(const struct dtn_dht_lookup_result *result) {
00743         if (result == NULL || result->eid == NULL || result->clayer == NULL) {
00744                 return;
00745         }
00746 
00747         // Extracting the EID of the answering node
00748         std::string eid__;
00749         std::string clname__;
00750         struct dtn_eid * eid = result->eid;
00751         unsigned int i;
00752         for (i = 0; i < eid->eidlen; i++) {
00753                 eid__ += eid->eid[i];
00754         }
00755 
00756         // Extracting the convergence layer of the node
00757         stringstream ss;
00758         std::string cl__;
00759         struct dtn_convergence_layer * cl = result->clayer;
00760         if (cl == NULL)
00761                 return;
00762         dtn::core::Node node(eid__);
00763         ss << "Adding Node " << eid__ << ": ";
00764         // Iterating over the list of convergence layer
00765         bool hasCL = false;
00766         while (cl != NULL) {
00767                 enum Node::Protocol proto__;
00768                 stringstream service;
00769                 clname__ = "";
00770                 for (i = 0; i < cl->clnamelen; i++) {
00771                         clname__ += cl->clname[i];
00772                 }
00773                 if (clname__ == "tcp") {
00774                         proto__ = Node::CONN_TCPIP;
00775                 } else if (clname__ == "udp") {
00776                         proto__ = Node::CONN_UDPIP;
00777                 } else {
00778                         proto__ = Node::CONN_UNDEFINED;
00779                         //TODO find the right string to be added to service string
00780                 }
00781                 // Extracting the arguments of the convergence layer
00782                 struct dtn_convergence_layer_arg * arg = cl->args;
00783                 std::string argstring__;
00784                 while (arg != NULL) {
00785                         if (arg->keylen <= 0 || arg->valuelen <= 0) {
00786                                 arg = arg->next;
00787                                 continue;
00788                         }
00789                         argstring__ = "";
00790                         for (i = 0; i < arg->keylen; i++) {
00791                                 argstring__ += arg->key[i];
00792                         }
00793                         service << argstring__ << "=";
00794                         argstring__ = "";
00795                         for (i = 0; i < arg->valuelen && arg->value[i] != '\0'; i++) {
00796                                 argstring__ += arg->value[i];
00797                         }
00798                         service << argstring__ << ";";
00799                         arg = arg->next;
00800                 }
00801                 ss << clname__ << "(" << service.str() << ") ";
00802                 // Add the found convergence layer to the node object
00803                 node.add(
00804                                 Node::URI(Node::NODE_DHT_DISCOVERED, proto__, service.str(),
00805                                                 DHT_RESULTS_EXPIRE_TIMEOUT,
00806                                                 DHT_DISCOVERED_NODE_PRIORITY));
00807                 hasCL = true;
00808                 cl = cl->next;
00809         }
00810 
00811         // Adding the new node to the BundleCore to make it available to the daemon
00812         dtn::core::BundleCore &core = dtn::core::BundleCore::getInstance();
00813         if (hasCL) {
00814                 IBRCOMMON_LOGGER(info) << ss.str() << IBRCOMMON_LOGGER_ENDL;
00815                 core.addConnection(node);
00816         }
00817 
00818         // Extracting all neighbours of the new node if usage is allowed
00819         if (!dtn::daemon::Configuration::getInstance().getDHT().ignoreDHTNeighbourInformations()) {
00820                 struct dtn_eid * neighbour = result->neighbours;
00821                 std::string neighbour__;
00822                 while (neighbour) {
00823                         neighbour__ = "";
00824                         for (i = 0; i < neighbour->eidlen; i++) {
00825                                 neighbour__ += neighbour->eid[i];
00826                         }
00827                         dtn::data::EID n(neighbour__);
00828                         dtn::core::Node neighbourNode(n);
00829                         if (dtn::core::BundleCore::local != n.getNode()
00830                                         && !core.isNeighbor(n)) {
00831                                 dtn::core::BundleCore::getInstance().addRoute(n, node.getEID(),
00832                                                 DHT_PATH_EXPIRE_TIMEOUT);
00833                         }
00834                         neighbour = neighbour->next;
00835                 }
00836         }
00837         //TODO GROUP HANDLING SHOULD BE IMPLEMENTED HERE!!!
00838         struct dtn_eid * group = result->groups;
00839         while (group) {
00840                 group = group->next;
00841         }
00842         return;
00843 }
00844 
// Called when the DHT has finished a search or an announcement.
// The notification is deliberately ignored for now.
void dtn_dht_operation_done(const unsigned char *info_hash) {
        (void) info_hash; // intentionally unused
}