// Preprocessor alias: every later use of the name suseconds_t is replaced by long.
// NOTE(review): presumably a fallback for platforms whose headers do not declare
// suseconds_t; the enclosing conditional is not visible here -- confirm before relying on it.
41 #define suseconds_t long
// Out-of-class definitions of the static data members of DTNTPWorker.

// Protocol version constant, fixed to 1.
// NOTE(review): presumably the version advertised to and expected from peers -- confirm against its users.
48 const unsigned int DTNTPWorker::PROTO_VERSION = 1;
// Constant tag string "DTNTPWorker".
// NOTE(review): presumably used as the logging tag of this class -- confirm.
49 const std::string DTNTPWorker::TAG =
"DTNTPWorker";
// Single, class-wide TimeSyncState instance (static storage, default-constructed).
50 DTNTPWorker::TimeSyncState DTNTPWorker::_sync_state;
53 : sync_threshold(0.15f), base_rating(0.0), psi(0.99), sigma(1.0), last_sync_set(false)
65 return static_cast<double>(val.tv_sec) + (static_cast<double>(val.tv_nsec) / 1000000000.0);
69 : _announce_rating(false), _sync(false)
71 AbstractWorker::initialize(
"dtntp");
103 if (_sync || _announce_rating) {
122 : type(TIMESYNC_REQUEST), origin_rating(dtn::utils::Clock::getRating()), peer_rating(0.0)
136 std::stringstream ss;
138 stream << (char)obj.
type;
158 std::stringstream ss;
167 ss.str((
const std::string&)bs);
178 ss.str((
const std::string&)bs);
205 for (peer_map::iterator iter = _peers.begin(); iter != _peers.end();)
207 const SyncPeer &peer = (*iter).second;
210 if (peer.isExpired()) {
211 _peers.erase(iter++);
240 struct timespec ts_now, ts_diff;
260 for (std::set<dtn::core::Node>::const_iterator iter = nodes.begin(); iter != nodes.end(); ++iter) {
261 if (shouldSyncWith(*iter)) {
266 }
catch (
const std::bad_cast&) { };
272 if (!node.
has(
"dtntp"))
return false;
275 const std::list<dtn::core::Node::Attribute> attrs = node.
get(
"dtntp");
277 if (attrs.empty())
return false;
280 unsigned int version = 0;
283 decode(attrs.front(), version, timestamp, quality);
286 if (version != 1)
return false;
305 const peer_map::const_iterator it = _peers.find(peer);
306 if (it != _peers.end())
308 const SyncPeer &peer = (*it).second;
311 if (peer.state != SyncPeer::STATE_IDLE)
318 SyncPeer &p = _peers[peer];
320 p.state = SyncPeer::STATE_PREPARE;
339 SyncPeer &p = _peers[peer];
340 p.request_timestamp = msg.origin_timestamp;
342 p.state = SyncPeer::STATE_REQUEST;
358 b.destination = peer;
361 b.destination.setApplication(
"dtntp");
387 std::stringstream ss;
396 std::vector<std::string>::const_iterator param_iter = parameters.begin();
398 while (param_iter != parameters.end())
402 if (p[0].compare(
"version") == 0)
404 std::stringstream ss(p[1]);
408 if (p[0].compare(
"timestamp") == 0)
413 if (p[0].compare(
"quality") == 0)
415 std::stringstream ss(p[1]);
423 bool DTNTPWorker::hasReference()
const {
424 return (_sync_state.
sigma == 1.0);
427 void DTNTPWorker::sync(
const TimeSyncMessage &msg,
const struct timeval &tv_offset,
const struct timeval &tv_local,
const struct timeval &tv_remote)
430 if (hasReference())
return;
443 struct timespec ts_now, ts_diff;
450 if (t_stable > 0.0) {
451 double sigma_base = (1 / ::pow(_sync_state.
psi, 1/t_stable));
452 double sigma_adjustment = ::fabs(remote_time - local_time) / t_stable * msg.peer_rating;
453 _sync_state.
sigma = sigma_base + sigma_adjustment;
459 if (local_time > remote_time) {
461 _sync_state.
base_rating = msg.peer_rating * (remote_time / local_time);
464 _sync_state.
base_rating = msg.peer_rating * (local_time / remote_time);
565 timeval tv_rtt_measured, tv_local_timestamp, tv_rtt, tv_prop_delay, tv_sync_delay, tv_peer_timestamp, tv_offset;
567 timerclear(&tv_rtt_measured);
568 tv_rtt_measured.tv_sec = origin_age.
getSeconds().
get<time_t>();
582 const peer_map::const_iterator it = _peers.find(b.
source.
getNode());
583 if (it == _peers.end())
break;
585 const SyncPeer &p = (*it).second;
588 if (p.state != SyncPeer::STATE_REQUEST)
592 if ((p.request_timestamp.tv_sec != msg.origin_timestamp.tv_sec) ||
593 (p.request_timestamp.tv_usec != msg.origin_timestamp.tv_usec))
597 struct timespec diff, now;
601 tv_rtt.tv_sec = diff.tv_sec;
602 tv_rtt.tv_usec = diff.tv_nsec / 1000;
617 double prop_delay = 0.0;
622 if (rtt <= rtt_measured) {
623 timerclear(&tv_prop_delay);
626 timersub(&tv_rtt, &tv_rtt_measured, &tv_prop_delay);
631 tv_prop_delay.tv_sec /= 2;
632 tv_prop_delay.tv_usec /= 2;
635 timerclear(&tv_sync_delay);
640 timeradd(&msg.peer_timestamp, &tv_sync_delay, &tv_peer_timestamp);
643 timeradd(&msg.peer_timestamp, &tv_prop_delay, &tv_peer_timestamp);
646 timersub(&tv_local_timestamp, &tv_peer_timestamp, &tv_offset);
652 sync(msg, tv_offset, tv_local_timestamp, tv_peer_timestamp);
657 p.state = SyncPeer::STATE_SYNC;
668 return DTNTPWorker::_sync_state;
671 DTNTPWorker::SyncPeer::SyncPeer()
674 request_monotonic_time.tv_sec = 0;
675 request_monotonic_time.tv_nsec = 0;
677 request_timestamp.tv_sec = 0;
678 request_timestamp.tv_usec = 0;
683 DTNTPWorker::SyncPeer::~SyncPeer()
687 void DTNTPWorker::SyncPeer::touch()
692 bool DTNTPWorker::SyncPeer::isExpired()
const