36 #include <sys/socket.h>
37 #include <arpa/inet.h>
46 _exiting(false), _initialized(false), _announced(false), _foundNodes(-1),
48 _config(daemon::Configuration::getInstance().getDHT()) {
56 return "DHT Naming Service";
62 bool dtn::dht::DHTNameService::setNonBlockingInterruptPipe() {
63 for (
int i = 0; i < 2; ++i) {
65 opts = fcntl(_interrupt_pipe[i], F_GETFL);
70 if (fcntl(_interrupt_pipe[i], F_SETFL, opts) < 0) {
82 if (pipe(_interrupt_pipe) < 0) {
88 if (!this->setNonBlockingInterruptPipe()) {
90 <<
" setting pipe to non-blocking mode"
95 int rc = dtn_dht_initstruct(&_context);
101 if (_config.randomPort()) {
103 _context.port = rand() % 64000 + 1024;
105 _context.port = _config.getPort();
107 if (_config.getIPv4Binding().size() > 0) {
108 _context.bind = _config.getIPv4Binding().c_str();
110 if (_config.getIPv6Binding().size() > 0) {
111 _context.bind6 = _config.getIPv6Binding().c_str();
113 if (_config.isIPv4Enabled() && _config.isIPv6Enabled()) {
114 _context.type = BINDBOTH;
115 }
else if (_config.isIPv4Enabled()) {
116 _context.type = IPV4ONLY;
117 }
else if (_config.isIPv6Enabled()) {
118 _context.type = IPV6ONLY;
120 _context.type = BINDNONE;
122 string myid = _config.getID();
123 if (!_config.randomID()) {
124 dtn_dht_build_id_from_str(_context.id, myid.c_str(), myid.size());
127 rc = dtn_dht_init_sockets(&_context);
129 if(_context.type == BINDBOTH) {
137 dtn_dht_close_sockets(&_context);
139 _context.type = IPV4ONLY;
140 rc = dtn_dht_init_sockets(&_context);
144 dtn_dht_close_sockets(&_context);
146 _context.type = IPV6ONLY;
147 rc = dtn_dht_init_sockets(&_context);
153 dtn_dht_close_sockets(&_context);
160 dtn_dht_close_sockets(&_context);
166 if (!_config.isBlacklistEnabled()) {
169 dtn_dht_blacklist(0);
176 rc = dtn_dht_init(&_context);
185 this->_initialized =
true;
187 if(_config.isNeighbourAnnouncementEnabled() &&
190 <<
"announcement enabled but routing forwarding is disabled"
196 if (!this->_initialized) {
203 const std::list<dtn::daemon::Configuration::NetConfig>
208 for (std::list<dtn::daemon::Configuration::NetConfig>::const_iterator iter =
209 nets.begin(); iter != nets.end(); ++iter) {
211 cltype_ = getConvergenceLayerName((*iter));
212 std::stringstream ss;
215 struct dtn_convergence_layer * clstruct =
216 (
struct dtn_convergence_layer*) malloc(
217 sizeof(
struct dtn_convergence_layer));
218 clstruct->clname = (
char*) malloc(cltype_.size());
219 clstruct->clnamelen = cltype_.size();
220 memcpy(clstruct->clname, cltype_.c_str(), cltype_.size());
221 struct dtn_convergence_layer_arg * arg =
222 (
struct dtn_convergence_layer_arg*) malloc(
223 sizeof(
struct dtn_convergence_layer_arg));
224 arg->key = (
char*) malloc(5);
228 if(cltype_ ==
"email") {
229 memcpy(arg->key,
"email", 5);
231 const std::string &email =
233 arg->value = (
char*) malloc(email.size());
234 memcpy(arg->value, email.c_str(), email.size());
235 arg->valuelen = email.size();
239 memcpy(arg->key,
"port", 4);
241 arg->value = (
char*) malloc(port_.size());
242 memcpy(arg->value, port_.c_str(), port_.size());
243 arg->valuelen = port_.size();
250 clstruct->args = arg;
251 clstruct->next = _context.clayer;
252 _context.clayer = clstruct;
253 }
catch (
const std::exception&) {
259 int rc, numberOfRandomRequests = 0;
265 struct sockaddr_storage from;
266 socklen_t fromlen =
sizeof(sockaddr_storage);
267 ::memset(&from, 0, fromlen);
269 while (!this->_exiting) {
270 if (this->_foundNodes == 0) {
274 if (!this->_announced && dtn_dht_ready_for_work(&_context) > 2) {
275 this->_announced =
true;
276 if (!_config.isSelfAnnouncingEnabled())
283 std::set<dtn::data::EID>::iterator iterator;
284 for (iterator = eids_.begin(); iterator != eids_.end(); ++iterator) {
288 std::set<dtn::core::Node>::iterator neighbouriterator;
289 for (neighbouriterator = neighbours_.begin(); neighbouriterator
290 != neighbours_.end(); ++neighbouriterator) {
291 if (isNeighbourAnnouncable(*neighbouriterator))
292 announce(*neighbouriterator, NEIGHBOUR);
294 while (!this->cachedLookups.empty()) {
295 lookup(this->cachedLookups.front());
296 this->cachedLookups.pop_front();
300 if (dtn_dht_ready_for_work(&_context) > 0 && dtn_dht_ready_for_work(
301 &_context) <= 2 && numberOfRandomRequests < 40) {
302 dtn_dht_start_random_lookup(&_context);
303 numberOfRandomRequests++;
308 tv.tv_usec = random() % 1000000;
309 int high_fd = _interrupt_pipe[0];
311 FD_SET(_interrupt_pipe[0], &readfds);
312 if (_context.ipv4socket >= 0) {
313 FD_SET(_context.ipv4socket, &readfds);
314 high_fd = max(high_fd, _context.ipv4socket);
316 if (_context.ipv6socket >= 0) {
317 FD_SET(_context.ipv6socket, &readfds);
318 high_fd = max(high_fd, _context.ipv6socket);
320 rc = select(high_fd + 1, &readfds, NULL, NULL, &tv);
322 if (errno != EINTR) {
324 <<
"select of DHT Sockets failed with error: "
329 if (FD_ISSET(_interrupt_pipe[0], &readfds)) {
334 int read = ::read(_interrupt_pipe[0], buf, 2);
335 if (read <= 2 || _exiting)
339 fromlen =
sizeof(from);
340 if (_context.ipv4socket >= 0 && FD_ISSET(_context.ipv4socket,
342 rc = recvfrom(_context.ipv4socket, _buf,
sizeof(_buf) - 1, 0,
343 (
struct sockaddr*) &from, &fromlen);
344 else if (_context.ipv6socket >= 0 && FD_ISSET(_context.ipv6socket,
346 rc = recvfrom(_context.ipv6socket, _buf,
sizeof(_buf) - 1, 0,
347 (
struct sockaddr*) &from, &fromlen);
353 rc = dtn_dht_periodic(&_context, _buf, rc,
354 (
struct sockaddr*) &from, fromlen, &tosleep);
359 rc = dtn_dht_periodic(&_context, NULL, 0, NULL, 0, &tosleep);
362 int numberOfHosts = 0;
363 int numberOfGoodHosts = 0;
364 int numberOfGood6Hosts = 0;
365 unsigned int numberOfBlocksHosts = 0;
366 unsigned int numberOfBlocksHostsIPv4 = 0;
367 unsigned int numberOfBlocksHostsIPv6 = 0;
370 if (_context.ipv4socket >= 0)
372 = dtn_dht_nodes(AF_INET, &numberOfGoodHosts, NULL);
373 if (_context.ipv6socket >= 0)
374 numberOfHosts += dtn_dht_nodes(AF_INET6, &numberOfGood6Hosts,
376 numberOfBlocksHosts = dtn_dht_blacklisted_nodes(
377 &numberOfBlocksHostsIPv4, &numberOfBlocksHostsIPv6);
379 if (this->_foundNodes != numberOfHosts) {
380 if (_config.isBlacklistEnabled()) {
382 << numberOfHosts <<
"(Good:" << numberOfGoodHosts
383 <<
"+" << numberOfGood6Hosts <<
") Blocked: "
384 << numberOfBlocksHosts <<
"("
385 << numberOfBlocksHostsIPv4 <<
"+"
386 << numberOfBlocksHostsIPv6 <<
")"
391 << numberOfHosts <<
"(Good:" << numberOfGoodHosts
392 <<
"+" << numberOfGood6Hosts <<
")"
396 this->_foundNodes = numberOfHosts;
399 if (errno == EINTR) {
404 if (rc == EINVAL || rc == EFAULT) {
415 this->_initialized =
false;
416 if (_config.getPathToNodeFiles().size() > 0) {
418 int save = dtn_dht_save_conf(_config.getPathToNodeFiles().c_str());
427 if (this->_announced && _config.isSelfAnnouncingEnabled())
432 dtn_dht_close_sockets(&_context);
434 dtn_dht_free_convergence_layer_struct(_context.clayer);
438 ::close(_interrupt_pipe[0]);
439 ::close(_interrupt_pipe[1]);
446 this->_exiting =
true;
449 ssize_t written = ::write(_interrupt_pipe[1],
"i", 1);
456 void dtn::dht::DHTNameService::lookup(
const dtn::data::EID &eid) {
458 if (this->_announced) {
463 dtn_dht_lookup(&_context, eid_.c_str(), eid_.size());
465 this->cachedLookups.push_front(eid);
474 enum dtn_dht_lookup_type type) {
475 if (this->_announced) {
478 int rc = dtn_dht_announce(&_context, eid_.c_str(), eid_.size(), type);
486 bool dtn::dht::DHTNameService::isNeighbourAnnouncable(
488 if (!_config.isNeighbourAnnouncementEnabled())
505 std::set<dtn::core::Node::Type> types = n.
getTypes();
511 std::list<dtn::core::Node::Attribute> services = n.
get(
"dhtns");
512 if (!services.empty()) {
513 for (std::list<dtn::core::Node::Attribute>::const_iterator service =
514 services.begin(); service != services.end(); ++service) {
517 ";", (*service).value);
518 std::vector<string>::const_iterator param_iter = parameters.begin();
520 while (param_iter != parameters.end()) {
523 if (p[0].compare(
"proxy") == 0) {
524 std::stringstream proxy_stream;
525 proxy_stream << p[1];
526 proxy_stream >> proxy;
540 dtn_dht_deannounce(eid_.c_str(), eid_.size());
545 std::string dtn::dht::DHTNameService::getConvergenceLayerName(
547 std::string cltype_ =
"";
572 && !
event.bundle.destination.isNone()) {
573 lookup(event.bundle.destination);
575 }
catch (
const std::bad_cast&) {
584 if (isNeighbourAnnouncable(n))
585 announce(n, NEIGHBOUR);
594 }
catch (
const std::bad_cast&) {
600 std::list<dtn::core::Node::Attribute> services = n.
get(
"dhtns");
601 unsigned int port = 9999;
602 if (!services.empty()) {
603 for (std::list<dtn::core::Node::Attribute>::const_iterator service =
604 services.begin(); service != services.end(); ++service) {
606 ";", (*service).value);
607 std::vector<string>::const_iterator param_iter = parameters.begin();
608 bool portFound =
false;
609 while (param_iter != parameters.end()) {
612 if (p[0].compare(
"port") == 0) {
613 std::stringstream port_stream;
626 const std::list<std::string> ips = getAllIPAddresses(n);
629 std::string lastip =
"";
630 for (std::list<std::string>::const_iterator iter = ips.begin(); iter
631 != ips.end(); ++iter) {
632 const std::string ip = (*iter);
638 struct sockaddr_in sin;
639 memset(&sin, 0,
sizeof(sin));
640 sin.sin_family = AF_INET;
641 sin.sin_port = htons(port);
644 rc = inet_pton(AF_INET, ip.c_str(), &(sin.sin_addr));
647 << n.
toString() <<
" on " << ip <<
":" << port
650 dtn_dht_ping_node((
struct sockaddr*) &sin,
sizeof(sin));
661 std::list<std::string> dtn::dht::DHTNameService::getAllIPAddresses(
663 std::string address =
"0.0.0.0";
664 unsigned int port = 0;
665 std::list < std::string > ret;
666 const std::list<dtn::core::Node::URI> uri_list = n.
get(
668 for (std::list<dtn::core::Node::URI>::const_iterator iter =
669 uri_list.begin(); iter != uri_list.end(); ++iter) {
671 uri.
decode(address, port);
672 ret.push_front(address);
674 const std::list<dtn::core::Node::URI> udp_uri_list = n.
get(
676 for (std::list<dtn::core::Node::URI>::const_iterator iter =
677 udp_uri_list.begin(); iter != udp_uri_list.end(); ++iter) {
679 udp_uri.
decode(address, port);
680 ret.push_front(address);
685 void dtn::dht::DHTNameService::bootstrapping() {
686 if (this->_bootstrapped > time(NULL) + 30) {
689 this->_bootstrapped = time(NULL);
690 if (_config.getPathToNodeFiles().size() > 0) {
693 if (_config.isDNSBootstrappingEnabled()) {
696 if (_config.isIPBootstrappingEnabled()) {
701 void dtn::dht::DHTNameService::bootstrappingFile() {
705 rc = dtn_dht_load_prev_conf(_config.getPathToNodeFiles().c_str());
716 void dtn::dht::DHTNameService::bootstrappingDNS() {
718 std::vector < string > dns = _config.getDNSBootstrappingNames();
720 std::vector<string>::const_iterator dns_iter = dns.begin();
721 while (dns_iter != dns.end()) {
722 const string &dn = (*dns_iter);
725 rc = dtn_dht_dns_bootstrap(&_context, dn.c_str(), NULL);
729 <<
" failed with error: " << rc
733 <<
"DHT Bootstrapping done for domain" << dn
741 rc = dtn_dht_dns_bootstrap(&_context, NULL, NULL);
745 <<
"bootstrapping from default domain failed with error: "
749 <<
"DHT Bootstrapping done for default domain"
756 void dtn::dht::DHTNameService::bootstrappingIPs() {
758 std::vector < string > ips = _config.getIPBootstrappingIPs();
759 std::vector<string>::const_iterator ip_iter = ips.begin();
760 while (ip_iter != ips.end()) {
761 std::vector < string > ip
763 int size, ipversion = AF_INET;
764 struct sockaddr *wellknown_node;
765 struct sockaddr_in sin;
766 struct sockaddr_in6 sin6;
767 memset(&sin, 0,
sizeof(sin));
772 port = atoi(ip[1].c_str());
777 rc = inet_pton(ipversion, ip[0].c_str(), &(sin.sin_addr));
779 ipversion = AF_INET6;
780 rc = inet_pton(ipversion, ip[0].c_str(), &(sin6.sin6_addr));
784 sin6.sin6_family = ipversion;
785 sin6.sin6_port = htons(port);
787 wellknown_node = (
struct sockaddr *) &sin6;
790 sin.sin_family = ipversion;
791 sin.sin_port = htons(port);
793 wellknown_node = (
struct sockaddr *) &sin;
797 dtn_dht_ping_node(wellknown_node, size);
809 if (this->_initialized) {
810 stringstream service;
811 service <<
"port=" << this->_context.port <<
";";
812 if (!this->_config.isNeighbourAllowedToAnnounceMe()) {
813 service <<
"proxy=false;";
817 if(!this->_config.isNeighbourAllowedToAnnounceMe()) {
826 if (result == NULL || result->eid == NULL || result->clayer == NULL) {
832 std::string clname__;
833 struct dtn_eid * eid = result->eid;
835 for (i = 0; i < eid->eidlen; ++i) {
836 eid__ += eid->eid[i];
841 struct dtn_convergence_layer * cl = result->clayer;
845 ss <<
"Adding Node " << eid__ <<
": ";
850 stringstream service;
852 for (i = 0; i < cl->clnamelen; ++i) {
853 clname__ += cl->clname[i];
855 if (clname__ ==
"tcp") {
856 proto__ = Node::CONN_TCPIP;
857 }
else if (clname__ ==
"udp") {
858 proto__ = Node::CONN_UDPIP;
859 }
else if (clname__ ==
"email") {
860 proto__ = Node::CONN_EMAIL;
862 proto__ = Node::CONN_UNDEFINED;
866 struct dtn_convergence_layer_arg * arg = cl->args;
867 std::string argstring__;
868 while (arg != NULL) {
869 if (arg->keylen <= 0 || arg->valuelen <= 0) {
874 for (i = 0; i < arg->keylen; ++i) {
875 argstring__ += arg->key[i];
877 service << argstring__ <<
"=";
879 for (i = 0; i < arg->valuelen && arg->value[i] !=
'\0'; ++i) {
880 argstring__ += arg->value[i];
882 service << argstring__ <<
";";
885 ss << clname__ <<
"(" << service.str() <<
") ";
888 Node::URI(Node::NODE_DHT_DISCOVERED, proto__, service.str(),
904 struct dtn_eid * neighbour = result->neighbours;
905 std::string neighbour__;
908 for (i = 0; i < neighbour->eidlen; ++i) {
909 neighbour__ += neighbour->eid[i];
918 neighbour = neighbour->next;
922 struct dtn_eid *
group = result->groups;