// Out-of-class definitions of two DTNTPWorker static constants:
// PROTO_VERSION is the numeric protocol version (1) and TAG is the string
// "DTNTPWorker" (presumably used as a log tag - confirm against callers).
43 const unsigned int DTNTPWorker::PROTO_VERSION = 1;
44 const std::string DTNTPWorker::TAG =
"DTNTPWorker";
// Constructor fragment (the signature line is not part of this excerpt).
// Member-initializer list: sync threshold 0.15, rating announcement disabled,
// base rating 0.0, psi 0.99, sigma 1.0 and _sync false.
47 : _sync_threshold(0.15f), _announce_rating(false), _base_rating(0.0), _psi(0.99), _sigma(1.0), _sync(false)
// Hand the "/dtntp" string plus the values 60 and true to the base-class
// initialize(); the meaning of the last two arguments is defined by
// AbstractWorker and cannot be confirmed from this excerpt.
49 AbstractWorker::initialize(
"/dtntp", 60,
true);
// Zero _last_sync_time so that later timerisset() checks on it report
// "not set" until a value is stored.
52 timerclear(&_last_sync_time);
// Member-initializer list (presumably of the TimeSyncMessage constructor -
// the declaration line is not visible): type starts as TIMESYNC_REQUEST,
// origin_rating is taken from dtn::utils::Clock::getRating(), and
// peer_rating starts at 0.0.
97 : type(TIMESYNC_REQUEST), origin_rating(dtn::utils::Clock::getRating()), peer_rating(0.0)
// Serialization fragment: declares a scratch stringstream and writes
// obj.type to the output stream converted to a single char.
// NOTE(review): (char)obj.type is a C-style cast; static_cast<char> would
// make the narrowing explicit and greppable.
111 std::stringstream ss;
113 stream << (char)obj.
type;
// Deserialization fragment: a scratch stringstream is loaded twice via
// ss.str(...), each time with bs viewed as a const std::string&.
// NOTE(review): the type of bs and the code that refills it between the two
// calls are not visible here - presumably a string-like wrapper; confirm.
133 std::stringstream ss;
142 ss.str((
const std::string&)bs);
153 ss.str((
const std::string&)bs);
// Sweep over _sync_blacklist. The for-header has no increment expression
// because entries are removed while iterating: erase(iter++) hands the old
// position to erase() after the iterator has already moved on, which is safe
// assuming blacklist_map is a node-based container such as std::map - confirm.
// NOTE(review): the condition selecting entries for removal and the
// increment for kept entries are outside this excerpt.
177 for (blacklist_map::iterator iter = _sync_blacklist.begin(); iter != _sync_blacklist.end();)
183 _sync_blacklist.erase(iter++);
// Only evaluated when _last_sync_time holds a non-zero value.
210 if (timerisset(&_last_sync_time))
// Difference between now and last_sync; both are computed outside this
// excerpt, so their unit cannot be confirmed here.
222 double timediff = now - last_sync;
// Visit every node in the nodes set and act only on those accepted by
// shouldSyncWith(); the action itself is not part of this excerpt.
233 for (std::set<dtn::core::Node>::const_iterator iter = nodes.begin(); iter != nodes.end(); ++iter) {
234 if (shouldSyncWith(*iter)) {
// A std::bad_cast raised inside the enclosing try block is caught and
// ignored; the ';' after the empty handler is a harmless null statement.
// NOTE(review): swallowing the exception silently hides failures - consider
// logging it if this is not a deliberate best-effort path.
239 }
catch (
const std::bad_cast&) { };
// Eligibility check fragment (presumably the body of shouldSyncWith - the
// signature is not visible). A node without a "dtntp" attribute is rejected.
245 if (!node.
has(
"dtntp"))
return false;
// Keep the node's "dtntp" attribute list in a local const list.
248 const std::list<dtn::core::Node::Attribute> attrs = node.
get(
"dtntp");
// An empty attribute list is rejected as well.
250 if (attrs.empty())
return false;
253 unsigned int version = 0;
// Parse the first attribute into version, timestamp and quality; the latter
// two are declared on lines outside this excerpt.
256 decode(attrs.front(), version, timestamp, quality);
// Reject any advertised protocol version other than 1.
// NOTE(review): the literal 1 duplicates DTNTPWorker::PROTO_VERSION; comparing
// against the constant would keep both in step if the version is ever bumped.
259 if (version != 1)
return false;
// Condition is true when peer has an entry in _sync_blacklist; the branch
// body is outside this excerpt.
278 if (_sync_blacklist.find(peer) != _sync_blacklist.end())
// Attribute parsing fragment: walks the parameters vector and dispatches on
// the key in p[0], reading the value from p[1] through a stringstream.
// NOTE(review): the code that builds parameters and p is not visible; if p
// can hold fewer than two elements, p[1] is out of range - confirm that a
// size check exists outside this excerpt.
355 std::stringstream ss;
364 std::vector<std::string>::const_iterator param_iter = parameters.begin();
366 while (param_iter != parameters.end())
// Key "version": value is parsed from a stringstream built on p[1].
370 if (p[0].compare(
"version") == 0)
372 std::stringstream ss(p[1]);
// Key "timestamp": handling is outside this excerpt.
376 if (p[0].compare(
"timestamp") == 0)
// Key "quality": value is parsed from a stringstream built on p[1].
381 if (p[0].compare(
"quality") == 0)
383 std::stringstream ss(p[1]);
// Returns true exactly when _sigma equals 1.0; sync() uses this to return
// early without touching the rating state.
// NOTE(review): this is an exact floating-point comparison. It holds for the
// literal 1.0 set in the constructor's initializer list, but a computed value
// that is merely close to 1.0 will not match - confirm that is intended.
391 bool DTNTPWorker::hasReference()
const {
392 return (_sigma == 1.0);
// Fragment of sync(): updates _sigma and _base_rating from one time-sync
// exchange. Parameters are the received message plus three timevals (offset,
// local time, remote time); how tv_offset is consumed is outside this excerpt.
395 void DTNTPWorker::sync(
const TimeSyncMessage &msg,
const struct timeval &tv_offset,
const struct timeval &tv_local,
const struct timeval &tv_remote)
// Nothing is changed when hasReference() reports true.
398 if (hasReference())
return;
// The sigma update only runs when an earlier sync time is on record.
409 if (timerisset(&_last_sync_time))
// Span between the previous sync and the current local time; local_time and
// lastsync_time are derived on lines outside this excerpt.
414 double t_stable = local_time - lastsync_time;
// Guard: both expressions below divide by t_stable.
416 if (t_stable > 0.0) {
// sigma_base = 1 / psi^(1 / t_stable)
417 double sigma_base = (1 / ::pow(_psi, 1/t_stable));
// Absolute remote/local difference per unit of t_stable, scaled by the
// rating carried in the message.
418 double sigma_adjustment = ::fabs(remote_time - local_time) / t_stable * msg.peer_rating;
419 _sigma = sigma_base + sigma_adjustment;
// New base rating: the peer's rating multiplied by the ratio of the two
// clock values. The first assignment is used when local_time is larger; the
// second is presumably the else branch (the else keyword is not visible).
// For positive times both ratios are at most 1.
425 if (local_time > remote_time) {
427 _base_rating = msg.peer_rating * (remote_time / local_time);
430 _base_rating = msg.peer_rating * (local_time / remote_time);
// Response-handling fragment (the enclosing function's signature is not
// visible): derives a propagation delay and a clock offset from a time-sync
// exchange, then passes them to sync().
522 timeval tv_rtt_measured, tv_local_timestamp, tv_rtt, tv_prop_delay, tv_sync_delay, tv_peer_timestamp, tv_offset;
// tv_rtt_measured: zeroed, then its seconds field is taken from origin_age.
524 timerclear(&tv_rtt_measured);
525 tv_rtt_measured.tv_sec = origin_age.
getSeconds().
get<time_t>();
// tv_rtt = local timestamp - origin timestamp carried in the message.
538 timersub(&tv_local_timestamp, &msg.origin_timestamp, &tv_rtt);
547 double prop_delay = 0.0;
// rtt and rtt_measured are computed outside this excerpt. When rtt does not
// exceed rtt_measured the propagation delay is zeroed ...
552 if (rtt <= rtt_measured) {
553 timerclear(&tv_prop_delay);
// ... otherwise (presumably the else branch) it is tv_rtt - tv_rtt_measured.
556 timersub(&tv_rtt, &tv_rtt_measured, &tv_prop_delay);
// Halve the delay.
// NOTE(review): dividing tv_sec and tv_usec independently loses half a second
// whenever tv_sec is odd (3 s 0 us becomes 1 s 0 us, not 1 s 500000 us).
// Carrying (tv_sec % 2) * 1000000 into tv_usec before halving would fix it.
561 tv_prop_delay.tv_sec /= 2;
562 tv_prop_delay.tv_usec /= 2;
565 timerclear(&tv_sync_delay);
// Two alternative corrections write the same tv_peer_timestamp: peer
// timestamp plus tv_sync_delay, or peer timestamp plus tv_prop_delay. The
// condition choosing between them is outside this excerpt.
570 timeradd(&msg.peer_timestamp, &tv_sync_delay, &tv_peer_timestamp);
573 timeradd(&msg.peer_timestamp, &tv_prop_delay, &tv_peer_timestamp);
// tv_offset = local timestamp - corrected peer timestamp.
576 timersub(&tv_local_timestamp, &tv_peer_timestamp, &tv_offset);
582 sync(msg, tv_offset, tv_local_timestamp, tv_peer_timestamp);