IBR-DTNSuite
0.8
|
00001 /* 00002 * DTNTPWorker.cpp 00003 * 00004 * Created on: 05.05.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "DTNTPWorker.h" 00009 #include "core/NodeEvent.h" 00010 #include "core/BundleCore.h" 00011 #include "core/TimeEvent.h" 00012 #include <ibrdtn/utils/Clock.h> 00013 #include <ibrdtn/utils/Utils.h> 00014 #include <ibrdtn/data/AgeBlock.h> 00015 #include <ibrdtn/data/PayloadBlock.h> 00016 #include <ibrdtn/data/SDNV.h> 00017 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00018 #include <ibrcommon/TimeMeasurement.h> 00019 #include <ibrcommon/Logger.h> 00020 00021 #include <sys/time.h> 00022 00023 namespace dtn 00024 { 00025 namespace daemon 00026 { 00027 const unsigned int DTNTPWorker::PROTO_VERSION = 1; 00028 00029 DTNTPWorker::DTNTPWorker() 00030 : _conf(dtn::daemon::Configuration::getInstance().getTimeSync()), _qot_current_tic(0), _sigma(_conf.getSigma()), 00031 _epsilon(1 / _sigma), _quality_diff(1 - _conf.getSyncLevel()), _sync_age(0) 00032 { 00033 AbstractWorker::initialize("/dtntp", 60, true); 00034 00035 if (_conf.hasReference()) 00036 { 00037 _last_sync.origin_quality = 0.0; 00038 _last_sync.peer_quality = 1.0; 00039 _last_sync.peer_timestamp = _last_sync.origin_timestamp; 00040 } 00041 else 00042 { 00043 _last_sync.origin_quality = 0.0; 00044 _last_sync.peer_quality = 0.0; 00045 } 00046 00047 // set current quality to last sync quality 00048 dtn::utils::Clock::quality = _last_sync.peer_quality; 00049 00050 if (_conf.syncOnDiscovery()) 00051 { 00052 bindEvent(dtn::core::NodeEvent::className); 00053 } 00054 00055 bindEvent(dtn::core::TimeEvent::className); 00056 00057 // debug quality of time 00058 IBRCOMMON_LOGGER_DEBUG(10) << "quality of time is " << dtn::utils::Clock::quality << IBRCOMMON_LOGGER_ENDL; 00059 } 00060 00061 DTNTPWorker::~DTNTPWorker() 00062 { 00063 unbindEvent(dtn::core::NodeEvent::className); 00064 unbindEvent(dtn::core::TimeEvent::className); 00065 } 00066 00067 DTNTPWorker::TimeSyncMessage::TimeSyncMessage() 00068 : type(TIMESYNC_REQUEST), origin_quality(dtn::utils::Clock::quality), peer_quality(0.0) 00069 { 00070 timerclear(&origin_timestamp); 00071 timerclear(&peer_timestamp); 00072 00073 dtn::utils::Clock::gettimeofday(&origin_timestamp); 00074 } 00075 00076 DTNTPWorker::TimeSyncMessage::~TimeSyncMessage() 00077 { 00078 } 00079 00080 std::ostream &operator<<(std::ostream &stream, const DTNTPWorker::TimeSyncMessage &obj) 00081 { 00082 std::stringstream ss; 00083 00084 stream << (char)obj.type; 00085 00086 ss.clear(); ss.str(""); ss << obj.origin_quality; 00087 stream << dtn::data::BundleString(ss.str()); 00088 00089 stream << dtn::data::SDNV(obj.origin_timestamp.tv_sec); 00090 stream << dtn::data::SDNV(obj.origin_timestamp.tv_usec); 00091 00092 ss.clear(); ss.str(""); ss << obj.peer_quality; 00093 stream << dtn::data::BundleString(ss.str()); 00094 00095 stream << dtn::data::SDNV(obj.peer_timestamp.tv_sec); 00096 stream << dtn::data::SDNV(obj.peer_timestamp.tv_usec); 00097 00098 return stream; 00099 } 00100 00101 std::istream &operator>>(std::istream &stream, DTNTPWorker::TimeSyncMessage &obj) 00102 { 00103 char type = 0; 00104 std::stringstream ss; 00105 dtn::data::BundleString bs; 00106 dtn::data::SDNV sdnv; 00107 00108 stream >> type; 00109 obj.type = DTNTPWorker::TimeSyncMessage::MSG_TYPE(type); 00110 00111 stream >> bs; 00112 ss.clear(); 00113 ss.str((const std::string&)bs); 00114 ss >> obj.origin_quality; 00115 00116 stream >> sdnv; obj.origin_timestamp.tv_sec = sdnv.getValue(); 00117 stream >> sdnv; obj.origin_timestamp.tv_usec = sdnv.getValue(); 00118 00119 stream >> bs; 00120 ss.clear(); 00121 ss.str((const std::string&)bs); 00122 ss >> obj.peer_quality; 00123 00124 stream >> sdnv; obj.peer_timestamp.tv_sec = sdnv.getValue(); 00125 stream >> sdnv; obj.peer_timestamp.tv_usec = sdnv.getValue(); 00126 00127 return stream; 00128 } 00129 00130 void DTNTPWorker::raiseEvent(const dtn::core::Event *evt) 00131 { 00132 try { 00133 const dtn::core::TimeEvent &t = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00134 00135 if (t.getAction() == dtn::core::TIME_SECOND_TICK) 00136 { 00137 ibrcommon::MutexLock l(_sync_lock); 00138 00139 // remove outdated blacklist entries 00140 { 00141 ibrcommon::MutexLock l(_blacklist_lock); 00142 for (std::map<EID, size_t>::iterator iter = _sync_blacklist.begin(); iter != _sync_blacklist.end(); iter++) 00143 { 00144 size_t bl_age = (*iter).second; 00145 00146 // do not query again if the blacklist entry is valid 00147 if (bl_age < t.getUnixTimestamp()) 00148 { 00149 _sync_blacklist.erase((*iter).first); 00150 } 00151 } 00152 } 00153 00154 if ((_conf.getQualityOfTimeTick() > 0) && !_conf.hasReference()) 00155 { 00159 if (_qot_current_tic == _conf.getQualityOfTimeTick()) 00160 { 00161 // get current time values 00162 _sync_age++; 00163 00164 // adjust own quality of time 00165 dtn::utils::Clock::quality = _last_sync.peer_quality * (1 / ::pow(_sigma, _sync_age) ); 00166 00167 // debug quality of time 00168 IBRCOMMON_LOGGER_DEBUG(25) << "new quality = " << _last_sync.peer_quality << " * (1 / (" << _sigma << " ^ " << _sync_age << "))" << IBRCOMMON_LOGGER_ENDL; 00169 IBRCOMMON_LOGGER_DEBUG(25) << "new quality of time is " << dtn::utils::Clock::quality << IBRCOMMON_LOGGER_ENDL; 00170 00171 // reset the tick counter 00172 _qot_current_tic = 0; 00173 } 00174 else 00175 { 00176 // increment the tick counter 00177 _qot_current_tic++; 00178 } 00179 } 00180 else 00181 { 00185 if (dtn::utils::Clock::quality == 0) 00186 { 00187 if (t.getTimestamp() > 0) 00188 { 00189 dtn::utils::Clock::quality = 1; 00190 IBRCOMMON_LOGGER(warning) << "The local clock seems to be okay again. Expiration enabled." << IBRCOMMON_LOGGER_ENDL; 00191 } 00192 } 00193 } 00194 } 00195 } catch (const std::bad_cast&) { }; 00196 00197 try { 00198 const dtn::core::NodeEvent &n = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00199 const dtn::core::Node &node = n.getNode(); 00200 00201 if (n.getAction() == dtn::core::NODE_INFO_UPDATED) 00202 { 00203 // only query for time sync if the other node supports this 00204 if (!node.has("dtntp")) return; 00205 00206 // get discovery attribute 00207 const std::list<dtn::core::Node::Attribute> attrs = node.get("dtntp"); 00208 00209 // decode attribute parameter 00210 unsigned int version = 0; 00211 size_t timestamp = 0; 00212 float quality = 0.0; 00213 decode(attrs.front(), version, timestamp, quality); 00214 00215 // we do only support version = 1 00216 if (version != 1) return; 00217 00218 // do not sync if the quality is worse than ours 00219 if ((quality * _quality_diff) <= dtn::utils::Clock::quality) return; 00220 00221 // get the EID of the peer 00222 const dtn::data::EID &peer = n.getNode().getEID(); 00223 00224 // check sync blacklist 00225 { 00226 ibrcommon::MutexLock l(_blacklist_lock); 00227 if (_sync_blacklist.find(peer) != _sync_blacklist.end()) 00228 { 00229 size_t bl_age = _sync_blacklist[peer]; 00230 00231 // do not query again if the blacklist entry is valid 00232 if (bl_age > dtn::utils::Clock::getUnixTimestamp()) 00233 { 00234 return; 00235 } 00236 } 00237 00238 // create a new blacklist entry 00239 _sync_blacklist[peer] = dtn::utils::Clock::getUnixTimestamp() + 60; 00240 } 00241 00242 // send a time sync bundle 00243 dtn::data::Bundle b; 00244 00245 // add an age block 00246 b.push_back<dtn::data::AgeBlock>(); 00247 00248 ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create(); 00249 00250 // create the payload of the message 00251 { 00252 ibrcommon::BLOB::iostream stream = ref.iostream(); 00253 00254 // create a new timesync request 00255 TimeSyncMessage msg; 00256 00257 // write the message 00258 (*stream) << msg; 00259 } 00260 00261 // add the payload to the message 00262 b.push_back(ref); 00263 00264 // set the source and destination 00265 b._source = dtn::core::BundleCore::local + "/dtntp"; 00266 b._destination = peer + "/dtntp"; 00267 00268 // set high priority 00269 b.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false); 00270 b.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true); 00271 00272 // set the the destination as singleton receiver 00273 b.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00274 00275 // set the lifetime of the bundle to 60 seconds 00276 b._lifetime = 60; 00277 00278 // add a schl block 00279 dtn::data::ScopeControlHopLimitBlock &schl = b.push_front<dtn::data::ScopeControlHopLimitBlock>(); 00280 schl.setLimit(1); 00281 00282 transmit(b); 00283 } 00284 } catch (const std::bad_cast&) { }; 00285 } 00286 00287 void DTNTPWorker::update(const ibrcommon::vinterface&, std::string &name, std::string &data) throw(NoServiceHereException) 00288 { 00289 if (!_conf.sendDiscoveryAnnouncements()) throw NoServiceHereException("Discovery of time sync mechanisms disabled."); 00290 00291 std::stringstream ss; 00292 ss << "version=" << PROTO_VERSION << ";quality=" << dtn::utils::Clock::quality << ";timestamp=" << dtn::utils::Clock::getTime() << ";"; 00293 name = "dtntp"; 00294 data = ss.str(); 00295 } 00296 00297 void DTNTPWorker::decode(const dtn::core::Node::Attribute &attr, unsigned int &version, size_t ×tamp, float &quality) 00298 { 00299 // parse parameters 00300 std::vector<std::string> parameters = dtn::utils::Utils::tokenize(";", attr.value); 00301 std::vector<std::string>::const_iterator param_iter = parameters.begin(); 00302 00303 while (param_iter != parameters.end()) 00304 { 00305 std::vector<std::string> p = dtn::utils::Utils::tokenize("=", (*param_iter)); 00306 00307 if (p[0].compare("version") == 0) 00308 { 00309 std::stringstream ss(p[1]); 00310 ss >> version; 00311 } 00312 00313 if (p[0].compare("timestamp") == 0) 00314 { 00315 std::stringstream ss(p[1]); 00316 ss >> timestamp; 00317 } 00318 00319 if (p[0].compare("quality") == 0) 00320 { 00321 std::stringstream ss(p[1]); 00322 ss >> quality; 00323 } 00324 00325 param_iter++; 00326 } 00327 } 00328 00329 // void DTNTPWorker::shared_sync(const TimeBeacon &beacon) 00330 // { 00331 // // do not sync if we are a reference 00332 // if (_conf.hasReference()) return; 00333 // 00334 // // adjust own quality of time 00335 // TimeBeacon current; get_time(current); 00336 // long int tdiff = current.sec - _last_sync.sec; 00337 // 00338 // ibrcommon::MutexLock l(_sync_lock); 00339 // 00340 // // if we have no time, take it 00341 // if (_last_sync.quality == 0) 00342 // { 00343 // dtn::utils::Clock::quality = beacon.quality * _epsilon; 00344 // } 00345 // // if our last sync is older than one second... 00346 // else if (tdiff > 0) 00347 // { 00348 // // sync our clock 00349 // double ext_faktor = beacon.quality / (beacon.quality + dtn::utils::Clock::quality); 00350 // double int_faktor = dtn::utils::Clock::quality / (beacon.quality + dtn::utils::Clock::quality); 00351 // 00352 // // set the new time values 00353 // set_time( (beacon.sec * ext_faktor) + (current.sec * int_faktor), 00354 // (beacon.usec * ext_faktor) + (current.usec * int_faktor) 00355 // ); 00356 // } 00357 // else 00358 // { 00359 // return; 00360 // } 00361 // } 00362 00363 void DTNTPWorker::sync(const TimeSyncMessage &msg, struct timeval &offset) 00364 { 00365 // do not sync if we are a reference 00366 if (_conf.hasReference()) return; 00367 00368 ibrcommon::MutexLock l(_sync_lock); 00369 00370 // if the received quality of time is worse than ours, ignore it 00371 if (dtn::utils::Clock::quality >= msg.peer_quality) return; 00372 00373 // the values are better, adapt them 00374 dtn::utils::Clock::quality = msg.peer_quality * _epsilon; 00375 00376 // set the local clock to the new timestamp 00377 dtn::utils::Clock::setOffset(offset); 00378 00379 IBRCOMMON_LOGGER(info) << "time adjusted to " << msg.peer_timestamp.tv_sec << "." << msg.peer_timestamp.tv_usec << "; quality: " << dtn::utils::Clock::quality << IBRCOMMON_LOGGER_ENDL; 00380 00381 // remember the last sync 00382 _last_sync = msg; 00383 _sync_age = 0; 00384 } 00385 00386 void DTNTPWorker::callbackBundleReceived(const Bundle &b) 00387 { 00388 // do not sync with ourselves 00389 if (b._source.getNode() == dtn::core::BundleCore::local) return; 00390 00391 try { 00392 // read payload block 00393 const dtn::data::PayloadBlock &p = b.getBlock<dtn::data::PayloadBlock>(); 00394 00395 // read the type of the message 00396 char type = 0; (*p.getBLOB().iostream()).get(type); 00397 00398 switch (type) 00399 { 00400 case TimeSyncMessage::TIMESYNC_REQUEST: 00401 { 00402 dtn::data::Bundle response = b; 00403 response.relabel(); 00404 00405 // set the lifetime of the bundle to 60 seconds 00406 response._lifetime = 60; 00407 00408 // switch the source and destination 00409 response._source = b._destination; 00410 response._destination = b._source; 00411 00412 // set high priority 00413 response.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false); 00414 response.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true); 00415 00416 // set the the destination as singleton receiver 00417 response.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00418 00419 // modify the payload - locked 00420 { 00421 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00422 ibrcommon::BLOB::iostream stream = ref.iostream(); 00423 00424 // read the timesync message 00425 TimeSyncMessage msg; 00426 (*stream) >> msg; 00427 00428 // clear the payload 00429 stream.clear(); 00430 00431 // fill in the own values 00432 msg.type = TimeSyncMessage::TIMESYNC_RESPONSE; 00433 msg.peer_quality = dtn::utils::Clock::quality; 00434 dtn::utils::Clock::gettimeofday(&msg.peer_timestamp); 00435 00436 // write the response 00437 (*stream) << msg; 00438 } 00439 00440 // add a second age block 00441 response.push_front<dtn::data::AgeBlock>(); 00442 00443 // modify the old schl block or add a new one 00444 try { 00445 dtn::data::ScopeControlHopLimitBlock &schl = response.getBlock<dtn::data::ScopeControlHopLimitBlock>(); 00446 schl.setLimit(1); 00447 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { 00448 dtn::data::ScopeControlHopLimitBlock &schl = response.push_front<dtn::data::ScopeControlHopLimitBlock>(); 00449 schl.setLimit(1); 00450 }; 00451 00452 // send the response 00453 transmit(response); 00454 break; 00455 } 00456 00457 case TimeSyncMessage::TIMESYNC_RESPONSE: 00458 { 00459 // read the ageblock of the bundle 00460 const std::list<const dtn::data::AgeBlock*> ageblocks = b.getBlocks<dtn::data::AgeBlock>(); 00461 const dtn::data::AgeBlock &peer_age = (*ageblocks.front()); 00462 const dtn::data::AgeBlock &origin_age = (*ageblocks.back()); 00463 00464 timeval tv_age; timerclear(&tv_age); 00465 tv_age.tv_usec = origin_age.getMicroseconds(); 00466 00467 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00468 ibrcommon::BLOB::iostream stream = ref.iostream(); 00469 00470 TimeSyncMessage msg; (*stream) >> msg; 00471 00472 timeval tv_local, rtt; 00473 dtn::utils::Clock::gettimeofday(&tv_local); 00474 00475 // get the RTT 00476 timersub(&tv_local, &msg.origin_timestamp, &rtt); 00477 00478 // get the propagation delay 00479 timeval prop_delay; 00480 timersub(&rtt, &tv_age, &prop_delay); 00481 00482 // half the prop delay 00483 prop_delay.tv_sec /= 2; 00484 prop_delay.tv_usec /= 2; 00485 00486 timeval sync_delay; 00487 timerclear(&sync_delay); 00488 sync_delay.tv_usec = peer_age.getMicroseconds() + prop_delay.tv_usec; 00489 00490 timeval peer_timestamp; 00491 timeradd(&msg.peer_timestamp, &sync_delay, &peer_timestamp); 00492 00493 timeval offset; 00494 timersub(&tv_local, &peer_timestamp, &offset); 00495 00496 // print out offset to the local clock 00497 IBRCOMMON_LOGGER(info) << "DT-NTP bundle received; rtt = " << rtt.tv_sec << "s " << rtt.tv_usec << "us; prop. delay = " << prop_delay.tv_sec << "s " << prop_delay.tv_usec << "us; clock of " << b._source.getNode().getString() << " has a offset of " << offset.tv_sec << "s " << offset.tv_usec << "us" << IBRCOMMON_LOGGER_ENDL; 00498 00499 // sync to this time message 00500 sync(msg, offset); 00501 00502 // remove the blacklist entry 00503 ibrcommon::MutexLock l(_blacklist_lock); 00504 _sync_blacklist.erase(b._source.getNode()); 00505 00506 break; 00507 } 00508 } 00509 } catch (const ibrcommon::Exception&) { }; 00510 } 00511 } 00512 }