IBR-DTNSuite  0.8
daemon/src/DTNTPWorker.cpp
Go to the documentation of this file.
00001 /*
00002  * DTNTPWorker.cpp
00003  *
00004  *  Created on: 05.05.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "DTNTPWorker.h"
00009 #include "core/NodeEvent.h"
00010 #include "core/BundleCore.h"
00011 #include "core/TimeEvent.h"
00012 #include <ibrdtn/utils/Clock.h>
00013 #include <ibrdtn/utils/Utils.h>
00014 #include <ibrdtn/data/AgeBlock.h>
00015 #include <ibrdtn/data/PayloadBlock.h>
00016 #include <ibrdtn/data/SDNV.h>
00017 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00018 #include <ibrcommon/TimeMeasurement.h>
00019 #include <ibrcommon/Logger.h>
00020 
00021 #include <sys/time.h>
00022 
00023 namespace dtn
00024 {
00025         namespace daemon
00026         {
00027                 const unsigned int DTNTPWorker::PROTO_VERSION = 1;
00028 
00029                 DTNTPWorker::DTNTPWorker()
00030                  : _conf(dtn::daemon::Configuration::getInstance().getTimeSync()), _qot_current_tic(0), _sigma(_conf.getSigma()),
00031                    _epsilon(1 / _sigma), _quality_diff(1 - _conf.getSyncLevel()), _sync_age(0)
00032                 {
00033                         AbstractWorker::initialize("/dtntp", 60, true);
00034 
00035                         if (_conf.hasReference())
00036                         {
00037                                 _last_sync.origin_quality = 0.0;
00038                                 _last_sync.peer_quality = 1.0;
00039                                 _last_sync.peer_timestamp = _last_sync.origin_timestamp;
00040                         }
00041                         else
00042                         {
00043                                 _last_sync.origin_quality = 0.0;
00044                                 _last_sync.peer_quality = 0.0;
00045                         }
00046 
00047                         // set current quality to last sync quality
00048                         dtn::utils::Clock::quality = _last_sync.peer_quality;
00049 
00050                         if (_conf.syncOnDiscovery())
00051                         {
00052                                 bindEvent(dtn::core::NodeEvent::className);
00053                         }
00054 
00055                         bindEvent(dtn::core::TimeEvent::className);
00056 
00057                         // debug quality of time
00058                         IBRCOMMON_LOGGER_DEBUG(10) << "quality of time is " << dtn::utils::Clock::quality << IBRCOMMON_LOGGER_ENDL;
00059                 }
00060 
00061                 DTNTPWorker::~DTNTPWorker()
00062                 {
00063                         unbindEvent(dtn::core::NodeEvent::className);
00064                         unbindEvent(dtn::core::TimeEvent::className);
00065                 }
00066 
00067                 DTNTPWorker::TimeSyncMessage::TimeSyncMessage()
00068                  : type(TIMESYNC_REQUEST), origin_quality(dtn::utils::Clock::quality), peer_quality(0.0)
00069                 {
00070                         timerclear(&origin_timestamp);
00071                         timerclear(&peer_timestamp);
00072 
00073                         dtn::utils::Clock::gettimeofday(&origin_timestamp);
00074                 }
00075 
00076                 DTNTPWorker::TimeSyncMessage::~TimeSyncMessage()
00077                 {
00078                 }
00079 
00080                 std::ostream &operator<<(std::ostream &stream, const DTNTPWorker::TimeSyncMessage &obj)
00081                 {
00082                         std::stringstream ss;
00083 
00084                         stream << (char)obj.type;
00085 
00086                         ss.clear(); ss.str(""); ss << obj.origin_quality;
00087                         stream << dtn::data::BundleString(ss.str());
00088 
00089                         stream << dtn::data::SDNV(obj.origin_timestamp.tv_sec);
00090                         stream << dtn::data::SDNV(obj.origin_timestamp.tv_usec);
00091 
00092                         ss.clear(); ss.str(""); ss << obj.peer_quality;
00093                         stream << dtn::data::BundleString(ss.str());
00094 
00095                         stream << dtn::data::SDNV(obj.peer_timestamp.tv_sec);
00096                         stream << dtn::data::SDNV(obj.peer_timestamp.tv_usec);
00097 
00098                         return stream;
00099                 }
00100 
00101                 std::istream &operator>>(std::istream &stream, DTNTPWorker::TimeSyncMessage &obj)
00102                 {
00103                         char type = 0;
00104                         std::stringstream ss;
00105                         dtn::data::BundleString bs;
00106                         dtn::data::SDNV sdnv;
00107 
00108                         stream >> type;
00109                         obj.type = DTNTPWorker::TimeSyncMessage::MSG_TYPE(type);
00110 
00111                         stream >> bs;
00112                         ss.clear();
00113                         ss.str((const std::string&)bs);
00114                         ss >> obj.origin_quality;
00115 
00116                         stream >> sdnv; obj.origin_timestamp.tv_sec = sdnv.getValue();
00117                         stream >> sdnv; obj.origin_timestamp.tv_usec = sdnv.getValue();
00118 
00119                         stream >> bs;
00120                         ss.clear();
00121                         ss.str((const std::string&)bs);
00122                         ss >> obj.peer_quality;
00123 
00124                         stream >> sdnv; obj.peer_timestamp.tv_sec = sdnv.getValue();
00125                         stream >> sdnv; obj.peer_timestamp.tv_usec = sdnv.getValue();
00126 
00127                         return stream;
00128                 }
00129 
00130                 void DTNTPWorker::raiseEvent(const dtn::core::Event *evt)
00131                 {
00132                         try {
00133                                 const dtn::core::TimeEvent &t = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00134 
00135                                 if (t.getAction() == dtn::core::TIME_SECOND_TICK)
00136                                 {
00137                                         ibrcommon::MutexLock l(_sync_lock);
00138 
00139                                         // remove outdated blacklist entries
00140                                         {
00141                                                 ibrcommon::MutexLock l(_blacklist_lock);
00142                                                 for (std::map<EID, size_t>::iterator iter = _sync_blacklist.begin(); iter != _sync_blacklist.end(); iter++)
00143                                                 {
00144                                                         size_t bl_age = (*iter).second;
00145 
00146                                                         // do not query again if the blacklist entry is valid
00147                                                         if (bl_age < t.getUnixTimestamp())
00148                                                         {
00149                                                                 _sync_blacklist.erase((*iter).first);
00150                                                         }
00151                                                 }
00152                                         }
00153 
00154                                         if ((_conf.getQualityOfTimeTick() > 0) && !_conf.hasReference())
00155                                         {
00159                                                 if (_qot_current_tic == _conf.getQualityOfTimeTick())
00160                                                 {
00161                                                         // get current time values
00162                                                         _sync_age++;
00163 
00164                                                         // adjust own quality of time
00165                                                         dtn::utils::Clock::quality = _last_sync.peer_quality * (1 / ::pow(_sigma, _sync_age) );
00166 
00167                                                         // debug quality of time
00168                                                         IBRCOMMON_LOGGER_DEBUG(25) << "new quality = " << _last_sync.peer_quality << " * (1 / (" << _sigma << " ^ " << _sync_age << "))" << IBRCOMMON_LOGGER_ENDL;
00169                                                         IBRCOMMON_LOGGER_DEBUG(25) << "new quality of time is " << dtn::utils::Clock::quality << IBRCOMMON_LOGGER_ENDL;
00170 
00171                                                         // reset the tick counter
00172                                                         _qot_current_tic = 0;
00173                                                 }
00174                                                 else
00175                                                 {
00176                                                         // increment the tick counter
00177                                                         _qot_current_tic++;
00178                                                 }
00179                                         }
00180                                         else
00181                                         {
00185                                                 if (dtn::utils::Clock::quality == 0)
00186                                                 {
00187                                                         if (t.getTimestamp() > 0)
00188                                                         {
00189                                                                 dtn::utils::Clock::quality = 1;
00190                                                                 IBRCOMMON_LOGGER(warning) << "The local clock seems to be okay again. Expiration enabled." << IBRCOMMON_LOGGER_ENDL;
00191                                                         }
00192                                                 }
00193                                         }
00194                                 }
00195                         } catch (const std::bad_cast&) { };
00196 
00197                         try {
00198                                 const dtn::core::NodeEvent &n = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00199                                 const dtn::core::Node &node = n.getNode();
00200 
00201                                 if (n.getAction() == dtn::core::NODE_INFO_UPDATED)
00202                                 {
00203                                         // only query for time sync if the other node supports this
00204                                         if (!node.has("dtntp")) return;
00205 
00206                                         // get discovery attribute
00207                                         const std::list<dtn::core::Node::Attribute> attrs = node.get("dtntp");
00208 
00209                                         // decode attribute parameter
00210                                         unsigned int version = 0;
00211                                         size_t timestamp = 0;
00212                                         float quality = 0.0;
00213                                         decode(attrs.front(), version, timestamp, quality);
00214 
00215                                         // we do only support version = 1
00216                                         if (version != 1) return;
00217 
00218                                         // do not sync if the quality is worse than ours
00219                                         if ((quality * _quality_diff) <= dtn::utils::Clock::quality) return;
00220 
00221                                         // get the EID of the peer
00222                                         const dtn::data::EID &peer = n.getNode().getEID();
00223 
00224                                         // check sync blacklist
00225                                         {
00226                                                 ibrcommon::MutexLock l(_blacklist_lock);
00227                                                 if (_sync_blacklist.find(peer) != _sync_blacklist.end())
00228                                                 {
00229                                                         size_t bl_age = _sync_blacklist[peer];
00230 
00231                                                         // do not query again if the blacklist entry is valid
00232                                                         if (bl_age > dtn::utils::Clock::getUnixTimestamp())
00233                                                         {
00234                                                                 return;
00235                                                         }
00236                                                 }
00237 
00238                                                 // create a new blacklist entry
00239                                                 _sync_blacklist[peer] = dtn::utils::Clock::getUnixTimestamp() + 60;
00240                                         }
00241 
00242                                         // send a time sync bundle
00243                                         dtn::data::Bundle b;
00244 
00245                                         // add an age block
00246                                         b.push_back<dtn::data::AgeBlock>();
00247 
00248                                         ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create();
00249 
00250                                         // create the payload of the message
00251                                         {
00252                                                 ibrcommon::BLOB::iostream stream = ref.iostream();
00253 
00254                                                 // create a new timesync request
00255                                                 TimeSyncMessage msg;
00256 
00257                                                 // write the message
00258                                                 (*stream) << msg;
00259                                         }
00260 
00261                                         // add the payload to the message
00262                                         b.push_back(ref);
00263 
00264                                         // set the source and destination
00265                                         b._source = dtn::core::BundleCore::local + "/dtntp";
00266                                         b._destination = peer + "/dtntp";
00267 
00268                                         // set high priority
00269                                         b.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false);
00270                                         b.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00271 
00272                                         // set the the destination as singleton receiver
00273                                         b.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00274 
00275                                         // set the lifetime of the bundle to 60 seconds
00276                                         b._lifetime = 60;
00277 
00278                                         // add a schl block
00279                                         dtn::data::ScopeControlHopLimitBlock &schl = b.push_front<dtn::data::ScopeControlHopLimitBlock>();
00280                                         schl.setLimit(1);
00281 
00282                                         transmit(b);
00283                                 }
00284                         } catch (const std::bad_cast&) { };
00285                 }
00286 
00287                 void DTNTPWorker::update(const ibrcommon::vinterface&, std::string &name, std::string &data) throw(NoServiceHereException)
00288                 {
00289                         if (!_conf.sendDiscoveryAnnouncements()) throw NoServiceHereException("Discovery of time sync mechanisms disabled.");
00290 
00291                         std::stringstream ss;
00292                         ss << "version=" << PROTO_VERSION << ";quality=" << dtn::utils::Clock::quality << ";timestamp=" << dtn::utils::Clock::getTime() << ";";
00293                         name = "dtntp";
00294                         data = ss.str();
00295                 }
00296 
00297                 void DTNTPWorker::decode(const dtn::core::Node::Attribute &attr, unsigned int &version, size_t &timestamp, float &quality)
00298                 {
00299                         // parse parameters
00300                         std::vector<std::string> parameters = dtn::utils::Utils::tokenize(";", attr.value);
00301                         std::vector<std::string>::const_iterator param_iter = parameters.begin();
00302 
00303                         while (param_iter != parameters.end())
00304                         {
00305                                 std::vector<std::string> p = dtn::utils::Utils::tokenize("=", (*param_iter));
00306 
00307                                 if (p[0].compare("version") == 0)
00308                                 {
00309                                         std::stringstream ss(p[1]);
00310                                         ss >> version;
00311                                 }
00312 
00313                                 if (p[0].compare("timestamp") == 0)
00314                                 {
00315                                         std::stringstream ss(p[1]);
00316                                         ss >> timestamp;
00317                                 }
00318 
00319                                 if (p[0].compare("quality") == 0)
00320                                 {
00321                                         std::stringstream ss(p[1]);
00322                                         ss >> quality;
00323                                 }
00324 
00325                                 param_iter++;
00326                         }
00327                 }
00328 
00329 //              void DTNTPWorker::shared_sync(const TimeBeacon &beacon)
00330 //              {
00331 //                      // do not sync if we are a reference
00332 //                      if (_conf.hasReference()) return;
00333 //
00334 //                      // adjust own quality of time
00335 //                      TimeBeacon current; get_time(current);
00336 //                      long int tdiff = current.sec - _last_sync.sec;
00337 //
00338 //                      ibrcommon::MutexLock l(_sync_lock);
00339 //
00340 //                      // if we have no time, take it
00341 //                      if (_last_sync.quality == 0)
00342 //                      {
00343 //                              dtn::utils::Clock::quality = beacon.quality * _epsilon;
00344 //                      }
00345 //                      // if our last sync is older than one second...
00346 //                      else if (tdiff > 0)
00347 //                      {
00348 //                              // sync our clock
00349 //                              double ext_faktor = beacon.quality / (beacon.quality + dtn::utils::Clock::quality);
00350 //                              double int_faktor = dtn::utils::Clock::quality / (beacon.quality + dtn::utils::Clock::quality);
00351 //
00352 //                              // set the new time values
00353 //                              set_time(       (beacon.sec * ext_faktor) + (current.sec * int_faktor),
00354 //                                                      (beacon.usec * ext_faktor) + (current.usec * int_faktor)
00355 //                                              );
00356 //                      }
00357 //                      else
00358 //                      {
00359 //                              return;
00360 //                      }
00361 //              }
00362 
00363                 void DTNTPWorker::sync(const TimeSyncMessage &msg, struct timeval &offset)
00364                 {
00365                         // do not sync if we are a reference
00366                         if (_conf.hasReference()) return;
00367 
00368                         ibrcommon::MutexLock l(_sync_lock);
00369 
00370                         // if the received quality of time is worse than ours, ignore it
00371                         if (dtn::utils::Clock::quality >= msg.peer_quality) return;
00372 
00373                         // the values are better, adapt them
00374                         dtn::utils::Clock::quality = msg.peer_quality * _epsilon;
00375 
00376                         // set the local clock to the new timestamp
00377                         dtn::utils::Clock::setOffset(offset);
00378 
00379                         IBRCOMMON_LOGGER(info) << "time adjusted to " << msg.peer_timestamp.tv_sec << "." << msg.peer_timestamp.tv_usec << "; quality: " << dtn::utils::Clock::quality << IBRCOMMON_LOGGER_ENDL;
00380 
00381                         // remember the last sync
00382                         _last_sync = msg;
00383                         _sync_age = 0;
00384                 }
00385 
00386                 void DTNTPWorker::callbackBundleReceived(const Bundle &b)
00387                 {
00388                         // do not sync with ourselves
00389                         if (b._source.getNode() == dtn::core::BundleCore::local) return;
00390 
00391                         try {
00392                                 // read payload block
00393                                 const dtn::data::PayloadBlock &p = b.getBlock<dtn::data::PayloadBlock>();
00394 
00395                                 // read the type of the message
00396                                 char type = 0; (*p.getBLOB().iostream()).get(type);
00397 
00398                                 switch (type)
00399                                 {
00400                                         case TimeSyncMessage::TIMESYNC_REQUEST:
00401                                         {
00402                                                 dtn::data::Bundle response = b;
00403                                                 response.relabel();
00404 
00405                                                 // set the lifetime of the bundle to 60 seconds
00406                                                 response._lifetime = 60;
00407 
00408                                                 // switch the source and destination
00409                                                 response._source = b._destination;
00410                                                 response._destination = b._source;
00411                                                 
00412                                                 // set high priority
00413                                                 response.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false);
00414                                                 response.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00415 
00416                                                 // set the the destination as singleton receiver
00417                                                 response.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00418 
00419                                                 // modify the payload - locked
00420                                                 {
00421                                                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00422                                                         ibrcommon::BLOB::iostream stream = ref.iostream();
00423 
00424                                                         // read the timesync message
00425                                                         TimeSyncMessage msg;
00426                                                         (*stream) >> msg;
00427 
00428                                                         // clear the payload
00429                                                         stream.clear();
00430 
00431                                                         // fill in the own values
00432                                                         msg.type = TimeSyncMessage::TIMESYNC_RESPONSE;
00433                                                         msg.peer_quality = dtn::utils::Clock::quality;
00434                                                         dtn::utils::Clock::gettimeofday(&msg.peer_timestamp);
00435 
00436                                                         // write the response
00437                                                         (*stream) << msg;
00438                                                 }
00439 
00440                                                 // add a second age block
00441                                                 response.push_front<dtn::data::AgeBlock>();
00442 
00443                                                 // modify the old schl block or add a new one
00444                                                 try {
00445                                                         dtn::data::ScopeControlHopLimitBlock &schl = response.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00446                                                         schl.setLimit(1);
00447                                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) {
00448                                                         dtn::data::ScopeControlHopLimitBlock &schl = response.push_front<dtn::data::ScopeControlHopLimitBlock>();
00449                                                         schl.setLimit(1);
00450                                                 };
00451 
00452                                                 // send the response
00453                                                 transmit(response);
00454                                                 break;
00455                                         }
00456 
00457                                         case TimeSyncMessage::TIMESYNC_RESPONSE:
00458                                         {
00459                                                 // read the ageblock of the bundle
00460                                                 const std::list<const dtn::data::AgeBlock*> ageblocks = b.getBlocks<dtn::data::AgeBlock>();
00461                                                 const dtn::data::AgeBlock &peer_age = (*ageblocks.front());
00462                                                 const dtn::data::AgeBlock &origin_age = (*ageblocks.back());
00463 
00464                                                 timeval tv_age; timerclear(&tv_age);
00465                                                 tv_age.tv_usec = origin_age.getMicroseconds();
00466 
00467                                                 ibrcommon::BLOB::Reference ref = p.getBLOB();
00468                                                 ibrcommon::BLOB::iostream stream = ref.iostream();
00469 
00470                                                 TimeSyncMessage msg; (*stream) >> msg;
00471 
00472                                                 timeval tv_local, rtt;
00473                                                 dtn::utils::Clock::gettimeofday(&tv_local);
00474 
00475                                                 // get the RTT
00476                                                 timersub(&tv_local, &msg.origin_timestamp, &rtt);
00477 
00478                                                 // get the propagation delay
00479                                                 timeval prop_delay;
00480                                                 timersub(&rtt, &tv_age, &prop_delay);
00481 
00482                                                 // half the prop delay
00483                                                 prop_delay.tv_sec /= 2;
00484                                                 prop_delay.tv_usec /= 2;
00485 
00486                                                 timeval sync_delay;
00487                                                 timerclear(&sync_delay);
00488                                                 sync_delay.tv_usec = peer_age.getMicroseconds() + prop_delay.tv_usec;
00489 
00490                                                 timeval peer_timestamp;
00491                                                 timeradd(&msg.peer_timestamp, &sync_delay, &peer_timestamp);
00492 
00493                                                 timeval offset;
00494                                                 timersub(&tv_local, &peer_timestamp, &offset);
00495 
00496                                                 // print out offset to the local clock
00497                                                 IBRCOMMON_LOGGER(info) << "DT-NTP bundle received; rtt = " << rtt.tv_sec << "s " << rtt.tv_usec << "us; prop. delay = " << prop_delay.tv_sec << "s " << prop_delay.tv_usec << "us; clock of " << b._source.getNode().getString() << " has a offset of " << offset.tv_sec << "s " << offset.tv_usec << "us" << IBRCOMMON_LOGGER_ENDL;
00498 
00499                                                 // sync to this time message
00500                                                 sync(msg, offset);
00501 
00502                                                 // remove the blacklist entry
00503                                                 ibrcommon::MutexLock l(_blacklist_lock);
00504                                                 _sync_blacklist.erase(b._source.getNode());
00505 
00506                                                 break;
00507                                         }
00508                                 }
00509                         } catch (const ibrcommon::Exception&) { };
00510                 }
00511         }
00512 }