36 #include <sys/socket.h>
37 #include <arpa/inet.h>
46 _exiting(false), _initialized(false), _announced(false), _foundNodes(-1),
48 _config(daemon::Configuration::getInstance().getDHT()) {
56 return "DHT Naming Service";
// Adjusts the file status flags of both descriptors in _interrupt_pipe via
// fcntl() and reports the outcome as a bool.
// NOTE(review): this excerpt omits several original lines (the declaration of
// `opts`, whatever modifies the flags between F_GETFL and F_SETFL, and the
// return statements). Judging by the function name the flag added is
// presumably O_NONBLOCK - confirm against the full file.
62 bool dtn::dht::DHTNameService::setNonBlockingInterruptPipe() {
// Visit both pipe descriptors (index 0 and index 1).
63 for (
int i = 0; i < 2; ++i) {
// Fetch the descriptor's current file status flags.
65 opts = fcntl(_interrupt_pipe[i], F_GETFL);
// Write the flags back; a negative fcntl() result enters this branch
// (branch body not visible here - presumably the failure path).
70 if (fcntl(_interrupt_pipe[i], F_SETFL, opts) < 0) {
79 if (pipe(_interrupt_pipe) < 0) {
85 if (!this->setNonBlockingInterruptPipe()) {
87 <<
" setting pipe to non-blocking mode"
92 int rc = dtn_dht_initstruct(&_context);
98 if (_config.randomPort()) {
100 _context.port = rand() % 64000 + 1024;
102 _context.port = _config.getPort();
104 if (_config.getIPv4Binding().size() > 0) {
105 _context.bind = _config.getIPv4Binding().c_str();
107 if (_config.getIPv6Binding().size() > 0) {
108 _context.bind6 = _config.getIPv6Binding().c_str();
110 if (_config.isIPv4Enabled() && _config.isIPv6Enabled()) {
111 _context.type = BINDBOTH;
112 }
else if (_config.isIPv4Enabled()) {
113 _context.type = IPV4ONLY;
114 }
else if (_config.isIPv6Enabled()) {
115 _context.type = IPV6ONLY;
117 _context.type = BINDNONE;
119 string myid = _config.getID();
120 if (!_config.randomID()) {
121 dtn_dht_build_id_from_str(_context.id, myid.c_str(), myid.size());
124 rc = dtn_dht_init_sockets(&_context);
126 if(_context.type == BINDBOTH) {
134 dtn_dht_close_sockets(&_context);
136 _context.type = IPV4ONLY;
137 rc = dtn_dht_init_sockets(&_context);
141 dtn_dht_close_sockets(&_context);
143 _context.type = IPV6ONLY;
144 rc = dtn_dht_init_sockets(&_context);
150 dtn_dht_close_sockets(&_context);
157 dtn_dht_close_sockets(&_context);
163 if (!_config.isBlacklistEnabled()) {
166 dtn_dht_blacklist(0);
173 rc = dtn_dht_init(&_context);
182 this->_initialized =
true;
184 if(_config.isNeighbourAnnouncementEnabled() &&
187 <<
"announcement enabled but routing forwarding is disabled"
193 if (!this->_initialized) {
200 const std::list<dtn::daemon::Configuration::NetConfig>
205 for (std::list<dtn::daemon::Configuration::NetConfig>::const_iterator iter =
206 nets.begin(); iter != nets.end(); ++iter) {
208 cltype_ = getConvergenceLayerName((*iter));
209 std::stringstream ss;
212 struct dtn_convergence_layer * clstruct =
213 (
struct dtn_convergence_layer*) malloc(
214 sizeof(
struct dtn_convergence_layer));
215 clstruct->clname = (
char*) malloc(cltype_.size());
216 clstruct-> clnamelen = cltype_.size();
217 memcpy(clstruct->clname, cltype_.c_str(), cltype_.size());
218 struct dtn_convergence_layer_arg * arg =
219 (
struct dtn_convergence_layer_arg*) malloc(
220 sizeof(
struct dtn_convergence_layer_arg));
221 arg->key = (
char*) malloc(5);
223 memcpy(arg->key,
"port", 4);
225 arg->value = (
char*) malloc(port_.size());
226 memcpy(arg->value, port_.c_str(), port_.size());
227 arg->valuelen = port_.size();
229 clstruct->args = arg;
230 clstruct->next = _context.clayer;
231 _context.clayer = clstruct;
232 }
catch (
const std::exception&) {
238 int rc, numberOfRandomRequests = 0;
244 struct sockaddr_storage from;
245 socklen_t fromlen =
sizeof(sockaddr_storage);
246 ::memset(&from, 0, fromlen);
248 while (!this->_exiting) {
249 if (this->_foundNodes == 0) {
253 if (!this->_announced && dtn_dht_ready_for_work(&_context) > 2) {
254 this->_announced =
true;
255 if (!_config.isSelfAnnouncingEnabled())
262 std::set<dtn::data::EID>::iterator iterator;
263 for (iterator = eids_.begin(); iterator != eids_.end(); ++iterator) {
267 std::set<dtn::core::Node>::iterator neighbouriterator;
268 for (neighbouriterator = neighbours_.begin(); neighbouriterator
269 != neighbours_.end(); ++neighbouriterator) {
270 if (isNeighbourAnnouncable(*neighbouriterator))
271 announce(*neighbouriterator, NEIGHBOUR);
273 while (!this->cachedLookups.empty()) {
274 lookup(this->cachedLookups.front());
275 this->cachedLookups.pop_front();
279 if (dtn_dht_ready_for_work(&_context) > 0 && dtn_dht_ready_for_work(
280 &_context) <= 2 && numberOfRandomRequests < 40) {
281 dtn_dht_start_random_lookup(&_context);
282 numberOfRandomRequests++;
287 tv.tv_usec = random() % 1000000;
288 int high_fd = _interrupt_pipe[0];
290 FD_SET(_interrupt_pipe[0], &readfds);
291 if (_context.ipv4socket >= 0) {
292 FD_SET(_context.ipv4socket, &readfds);
293 high_fd = max(high_fd, _context.ipv4socket);
295 if (_context.ipv6socket >= 0) {
296 FD_SET(_context.ipv6socket, &readfds);
297 high_fd = max(high_fd, _context.ipv6socket);
299 rc = select(high_fd + 1, &readfds, NULL, NULL, &tv);
301 if (errno != EINTR) {
303 <<
"select of DHT Sockets failed with error: "
308 if (FD_ISSET(_interrupt_pipe[0], &readfds)) {
313 int read = ::read(_interrupt_pipe[0], buf, 2);
314 if (read <= 2 || _exiting)
318 fromlen =
sizeof(from);
319 if (_context.ipv4socket >= 0 && FD_ISSET(_context.ipv4socket,
321 rc = recvfrom(_context.ipv4socket, _buf,
sizeof(_buf) - 1, 0,
322 (
struct sockaddr*) &from, &fromlen);
323 else if (_context.ipv6socket >= 0 && FD_ISSET(_context.ipv6socket,
325 rc = recvfrom(_context.ipv6socket, _buf,
sizeof(_buf) - 1, 0,
326 (
struct sockaddr*) &from, &fromlen);
332 rc = dtn_dht_periodic(&_context, _buf, rc,
333 (
struct sockaddr*) &from, fromlen, &tosleep);
338 rc = dtn_dht_periodic(&_context, NULL, 0, NULL, 0, &tosleep);
341 int numberOfHosts = 0;
342 int numberOfGoodHosts = 0;
343 int numberOfGood6Hosts = 0;
344 unsigned int numberOfBlocksHosts = 0;
345 unsigned int numberOfBlocksHostsIPv4 = 0;
346 unsigned int numberOfBlocksHostsIPv6 = 0;
349 if (_context.ipv4socket >= 0)
351 = dtn_dht_nodes(AF_INET, &numberOfGoodHosts, NULL);
352 if (_context.ipv6socket >= 0)
353 numberOfHosts += dtn_dht_nodes(AF_INET6, &numberOfGood6Hosts,
355 numberOfBlocksHosts = dtn_dht_blacklisted_nodes(
356 &numberOfBlocksHostsIPv4, &numberOfBlocksHostsIPv6);
358 if (this->_foundNodes != numberOfHosts) {
359 if (_config.isBlacklistEnabled()) {
361 << numberOfHosts <<
"(Good:" << numberOfGoodHosts
362 <<
"+" << numberOfGood6Hosts <<
") Blocked: "
363 << numberOfBlocksHosts <<
"("
364 << numberOfBlocksHostsIPv4 <<
"+"
365 << numberOfBlocksHostsIPv6 <<
")"
370 << numberOfHosts <<
"(Good:" << numberOfGoodHosts
371 <<
"+" << numberOfGood6Hosts <<
")"
375 this->_foundNodes = numberOfHosts;
378 if (errno == EINTR) {
383 if (rc == EINVAL || rc == EFAULT) {
394 this->_initialized =
false;
395 if (_config.getPathToNodeFiles().size() > 0) {
397 int save = dtn_dht_save_conf(_config.getPathToNodeFiles().c_str());
406 if (this->_announced && _config.isSelfAnnouncingEnabled())
411 dtn_dht_close_sockets(&_context);
413 dtn_dht_free_convergence_layer_struct(_context.clayer);
417 ::close(_interrupt_pipe[0]);
418 ::close(_interrupt_pipe[1]);
422 this->_exiting =
true;
425 ssize_t written = ::write(_interrupt_pipe[1],
"i", 1);
// Requests a DHT lookup for the given EID.
// @param eid  endpoint identifier to look up.
432 void dtn::dht::DHTNameService::lookup(
const dtn::data::EID &eid) {
// Lookups are only passed to the DHT library once _announced has been set.
434 if (this->_announced) {
// `eid_` is defined on a line not visible in this excerpt; presumably the
// string form of `eid` - confirm.
439 dtn_dht_lookup(&_context, eid_.c_str(), eid_.size());
// Otherwise (presumably the else branch - the branch keyword is not visible)
// the EID is queued in cachedLookups.
// NOTE(review): entries are added with push_front() here while the run loop
// drains the queue with front()/pop_front(), so cached lookups are replayed
// newest-first. Confirm that ordering is acceptable.
441 this->cachedLookups.push_front(eid);
450 enum dtn_dht_lookup_type type) {
451 if (this->_announced) {
454 int rc = dtn_dht_announce(&_context, eid_.c_str(), eid_.size(), type);
// Decides whether a neighbour node may be announced on the DHT.
// NOTE(review): the parameter list and a number of body lines are missing from
// this excerpt; the comments below describe only the visible statements.
462 bool dtn::dht::DHTNameService::isNeighbourAnnouncable(
// Neighbour announcement must be enabled in the DHT configuration.
464 if (!_config.isNeighbourAnnouncementEnabled())
// Node types of the neighbour; how they are evaluated is not visible here.
481 std::set<dtn::core::Node::Type> types = n.
getTypes();
// Attributes the neighbour published under the "dhtns" key.
487 std::list<dtn::core::Node::Attribute> services = n.
get(
"dhtns");
488 if (!services.empty()) {
489 for (std::list<dtn::core::Node::Attribute>::const_iterator service =
490 services.begin(); service != services.end(); ++service) {
// The attribute value is split on ';' into individual parameters (the call
// that receives these arguments starts on a line not shown).
493 ";", (*service).value);
494 std::vector<string>::const_iterator param_iter = parameters.begin();
496 while (param_iter != parameters.end()) {
// `p` holds the pieces of one parameter (its construction is not visible);
// p[0] is compared against the key "proxy" and p[1] is streamed into `proxy`.
// NOTE(review): p[1] is read without a visible size check - confirm that a
// parameter such as "proxy" with no value cannot reach this point.
499 if (p[0].compare(
"proxy") == 0) {
500 std::stringstream proxy_stream;
501 proxy_stream << p[1];
502 proxy_stream >> proxy;
516 dtn_dht_deannounce(eid_.c_str(), eid_.size());
// Returns a convergence layer name as a std::string.
// NOTE(review): the parameter list and the logic that fills in the name are
// not visible in this excerpt; only the empty-string initialisation is.
521 std::string dtn::dht::DHTNameService::getConvergenceLayerName(
// Result starts out empty.
523 std::string cltype_ =
"";
546 &&
event.bundle.destination != none) {
547 lookup(event.bundle.destination);
549 }
catch (
const std::bad_cast&) {
558 if (isNeighbourAnnouncable(n))
559 announce(n, NEIGHBOUR);
568 }
catch (
const std::bad_cast&) {
574 std::list<dtn::core::Node::Attribute> services = n.
get(
"dhtns");
575 unsigned int port = 9999;
576 if (!services.empty()) {
577 for (std::list<dtn::core::Node::Attribute>::const_iterator service =
578 services.begin(); service != services.end(); ++service) {
580 ";", (*service).value);
581 std::vector<string>::const_iterator param_iter = parameters.begin();
582 bool portFound =
false;
583 while (param_iter != parameters.end()) {
586 if (p[0].compare(
"port") == 0) {
587 std::stringstream port_stream;
600 const std::list<std::string> ips = getAllIPAddresses(n);
603 std::string lastip =
"";
604 for (std::list<std::string>::const_iterator iter = ips.begin(); iter
605 != ips.end(); ++iter) {
606 const std::string ip = (*iter);
612 struct sockaddr_in sin;
613 memset(&sin, 0,
sizeof(sin));
614 sin.sin_family = AF_INET;
615 sin.sin_port = htons(port);
618 rc = inet_pton(AF_INET, ip.c_str(), &(sin.sin_addr));
621 << n.
toString() <<
" on " << ip <<
":" << port
624 dtn_dht_ping_node((
struct sockaddr*) &sin,
sizeof(sin));
// Collects address strings decoded from two URI lists of a node and returns
// them as a std::list<std::string>.
// NOTE(review): the parameter list and the arguments of both n.get(...) calls
// are missing from this excerpt, so which URI protocols are queried cannot be
// stated from here (the second list is named udp_uri_list).
635 std::list<std::string> dtn::dht::DHTNameService::getAllIPAddresses(
// Scratch outputs for decode(); `address` starts as "0.0.0.0", `port` as 0.
637 std::string address =
"0.0.0.0";
638 unsigned int port = 0;
639 std::list < std::string > ret;
640 const std::list<dtn::core::Node::URI> uri_list = n.
get(
642 for (std::list<dtn::core::Node::URI>::const_iterator iter =
643 uri_list.begin(); iter != uri_list.end(); ++iter) {
// decode() fills `address` and `port`; only the address is kept, the decoded
// port is not used in the visible code.
// NOTE(review): `address` is reused across iterations, so if decode() leaves
// it untouched the previous value would be pushed again - confirm decode()'s
// contract.
645 uri.
decode(address, port);
// push_front: the result list ends up in reverse iteration order.
646 ret.push_front(address);
// Same procedure for the second URI list.
648 const std::list<dtn::core::Node::URI> udp_uri_list = n.
get(
650 for (std::list<dtn::core::Node::URI>::const_iterator iter =
651 udp_uri_list.begin(); iter != udp_uri_list.end(); ++iter) {
653 udp_uri.
decode(address, port);
654 ret.push_front(address);
// Runs the configured bootstrapping methods: node file, DNS, and static IPs,
// each guarded by its own configuration check.
// NOTE(review): the bodies of all branches are missing from this excerpt.
659 void dtn::dht::DHTNameService::bootstrapping() {
// NOTE(review): in the visible code _bootstrapped is only ever assigned
// time(NULL) (next statement), so it can exceed "now + 30" only if the clock
// jumps backwards. If this is meant as a 30 second rate limit, the test was
// probably intended as `_bootstrapped + 30 > time(NULL)` - confirm what the
// (not shown) branch body does before changing it.
660 if (this->_bootstrapped > time(NULL) + 30) {
// Record the time of this bootstrapping attempt.
663 this->_bootstrapped = time(NULL);
// A non-empty node-file path enables this branch.
664 if (_config.getPathToNodeFiles().size() > 0) {
// DNS-based bootstrapping, if enabled in the configuration.
667 if (_config.isDNSBootstrappingEnabled()) {
// Bootstrapping from configured IP addresses, if enabled.
670 if (_config.isIPBootstrappingEnabled()) {
// Bootstraps from the node file whose path comes from the DHT configuration.
// NOTE(review): the declaration of `rc` and the handling of its value are not
// visible in this excerpt.
675 void dtn::dht::DHTNameService::bootstrappingFile() {
// Hand the configured path to the DHT library's loader.
679 rc = dtn_dht_load_prev_conf(_config.getPathToNodeFiles().c_str());
// Bootstraps the DHT via DNS: once per configured domain name, and also with a
// NULL name (the log text calls this the "default domain").
// NOTE(review): the conditions selecting between the per-domain loop and the
// default-domain call, the iterator increment and the logging statement heads
// are missing from this excerpt.
690 void dtn::dht::DHTNameService::bootstrappingDNS() {
// Copy of the configured bootstrap domain names.
692 std::vector < string > dns = _config.getDNSBootstrappingNames();
694 std::vector<string>::const_iterator dns_iter = dns.begin();
695 while (dns_iter != dns.end()) {
696 const string &dn = (*dns_iter);
// Third argument is NULL; its meaning is defined by the DHT library.
699 rc = dtn_dht_dns_bootstrap(&_context, dn.c_str(), NULL);
// Failure message includes the library's return code.
703 <<
" failed with error: " << rc
// NOTE(review): the literal has no trailing space, so the domain name is
// appended directly after the word "domain" in the log output.
707 <<
"DHT Bootstrapping done for domain" << dn
// Bootstrapping without an explicit domain name.
715 rc = dtn_dht_dns_bootstrap(&_context, NULL, NULL);
719 <<
"bootstrapping from default domain failed with error: "
723 <<
"DHT Bootstrapping done for default domain"
// Pings every configured bootstrap address so the DHT library learns about it.
// Each entry is tried as an IPv4 literal first and as an IPv6 literal second.
// NOTE(review): several lines are missing from this excerpt (how an entry is
// split into `ip`, the declarations of `port` and `rc`, the checks on `rc`,
// the assignments to `size`, and the iterator increment).
730 void dtn::dht::DHTNameService::bootstrappingIPs() {
// Copy of the configured bootstrap addresses.
732 std::vector < string > ips = _config.getIPBootstrappingIPs();
733 std::vector<string>::const_iterator ip_iter = ips.begin();
734 while (ip_iter != ips.end()) {
// `ip` holds the pieces of one entry: ip[0] is used as the address and ip[1]
// as the port below (initialiser not visible).
735 std::vector < string > ip
// NOTE(review): `size` is declared uninitialised here; the lines that assign it
// are not shown - confirm it is set on every path that reaches the ping call.
737 int size, ipversion = AF_INET;
738 struct sockaddr *wellknown_node;
739 struct sockaddr_in sin;
740 struct sockaddr_in6 sin6;
// NOTE(review): only `sin` is visibly zeroed; confirm `sin6` is cleared on a
// line not shown, otherwise its remaining fields are indeterminate.
741 memset(&sin, 0,
sizeof(sin));
// atoi() yields 0 for non-numeric text and offers no error indication.
746 port = atoi(ip[1].c_str());
// First attempt: parse as IPv4 (ipversion is AF_INET at this point).
751 rc = inet_pton(ipversion, ip[0].c_str(), &(sin.sin_addr));
// Second attempt: parse as IPv6.
753 ipversion = AF_INET6;
754 rc = inet_pton(ipversion, ip[0].c_str(), &(sin6.sin6_addr));
// IPv6 result: fill family and port (network byte order) and select sin6.
758 sin6.sin6_family = ipversion;
759 sin6.sin6_port = htons(port);
761 wellknown_node = (
struct sockaddr *) &sin6;
// IPv4 result: fill family and port (network byte order) and select sin.
764 sin.sin_family = ipversion;
765 sin.sin_port = htons(port);
767 wellknown_node = (
struct sockaddr *) &sin;
// Hand the selected socket address to the DHT library.
771 dtn_dht_ping_node(wellknown_node, size);
783 if (this->_initialized) {
784 stringstream service;
785 service <<
"port=" << this->_context.port <<
";";
786 if (!this->_config.isNeighbourAllowedToAnnounceMe()) {
787 service <<
"proxy=false;";
791 if(!this->_config.isNeighbourAllowedToAnnounceMe()) {
800 if (result == NULL || result->eid == NULL || result->clayer == NULL) {
806 std::string clname__;
807 struct dtn_eid * eid = result->eid;
809 for (i = 0; i < eid->eidlen; ++i) {
810 eid__ += eid->eid[i];
815 struct dtn_convergence_layer * cl = result->clayer;
819 ss <<
"Adding Node " << eid__ <<
": ";
824 stringstream service;
826 for (i = 0; i < cl->clnamelen; ++i) {
827 clname__ += cl->clname[i];
829 if (clname__ ==
"tcp") {
830 proto__ = Node::CONN_TCPIP;
831 }
else if (clname__ ==
"udp") {
832 proto__ = Node::CONN_UDPIP;
834 proto__ = Node::CONN_UNDEFINED;
838 struct dtn_convergence_layer_arg * arg = cl->args;
839 std::string argstring__;
840 while (arg != NULL) {
841 if (arg->keylen <= 0 || arg->valuelen <= 0) {
846 for (i = 0; i < arg->keylen; ++i) {
847 argstring__ += arg->key[i];
849 service << argstring__ <<
"=";
851 for (i = 0; i < arg->valuelen && arg->value[i] !=
'\0'; ++i) {
852 argstring__ += arg->value[i];
854 service << argstring__ <<
";";
857 ss << clname__ <<
"(" << service.str() <<
") ";
860 Node::URI(Node::NODE_DHT_DISCOVERED, proto__, service.str(),
876 struct dtn_eid * neighbour = result->neighbours;
877 std::string neighbour__;
880 for (i = 0; i < neighbour->eidlen; ++i) {
881 neighbour__ += neighbour->eid[i];
890 neighbour = neighbour->next;
894 struct dtn_eid *
group = result->groups;