IBR-DTNSuite  0.12
DTNTPWorker.cpp
Go to the documentation of this file.
1 /*
2  * DTNTPWorker.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "DTNTPWorker.h"
24 #include "core/EventDispatcher.h"
25 #include "core/NodeEvent.h"
26 #include "core/BundleCore.h"
27 #include "core/TimeEvent.h"
29 #include <ibrdtn/utils/Clock.h>
30 #include <ibrdtn/utils/Utils.h>
31 #include <ibrdtn/data/AgeBlock.h>
33 #include <ibrdtn/data/SDNV.h>
36 #include <ibrcommon/Logger.h>
37 
38 #include <sys/time.h>
39 
40 #ifdef __WIN32__
41 #define suseconds_t long
42 #endif
43 
44 namespace dtn
45 {
46  namespace daemon
47  {
48  const unsigned int DTNTPWorker::PROTO_VERSION = 1;
49  const std::string DTNTPWorker::TAG = "DTNTPWorker";
50  DTNTPWorker::TimeSyncState DTNTPWorker::_sync_state;
51 
53  : sync_threshold(0.15f), base_rating(0.0), psi(0.99), sigma(1.0), last_sync_set(false)
54  {
55  // initialize the last sync time to zero
56  last_sync_time.tv_sec = 0;
57  last_sync_time.tv_nsec = 0;
58  }
59 
61  {
62  }
63 
64  double DTNTPWorker::TimeSyncState::toDouble(const timespec &val) {
65  return static_cast<double>(val.tv_sec) + (static_cast<double>(val.tv_nsec) / 1000000000.0);
66  }
67 
69  : _announce_rating(false), _sync(false)
70  {
71  AbstractWorker::initialize("dtntp");
72 
73  // get global configuration for time synchronization
75 
76  if (conf.hasReference())
77  {
78  // set clock rating to 1 since this node has a reference clock
79  _sync_state.base_rating = 1.0;
80 
81  // evaluate the current local time
82  if (dtn::utils::Clock::getTime() > 0) {
84  } else {
86  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) << "Expiration limited due to wrong local clock." << IBRCOMMON_LOGGER_ENDL;
87  }
88  } else {
90  _sync_state.sigma = conf.getSigma();
91  _sync_state.psi = conf.getPsi();
92  }
93 
94  // check if we should announce our own rating via discovery
95  _announce_rating = conf.sendDiscoveryBeacons();
96 
97  // store the sync threshold locally
98  _sync_state.sync_threshold = conf.getSyncLevel();
99 
100  // synchronize with other nodes
101  _sync = conf.doSync();
102 
103  if (_sync || _announce_rating) {
104  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, info) << "Time-Synchronization enabled: " << (conf.hasReference() ? "master mode" : "slave mode") << IBRCOMMON_LOGGER_ENDL;
105  }
106 
108 
109  // register as discovery handler for all interfaces
111  }
112 
114  {
115  // unregister as discovery handler for all interfaces
117 
119  }
120 
122  : type(TIMESYNC_REQUEST), origin_rating(dtn::utils::Clock::getRating()), peer_rating(0.0)
123  {
124  timerclear(&origin_timestamp);
125  timerclear(&peer_timestamp);
126 
128  }
129 
131  {
132  }
133 
134  std::ostream &operator<<(std::ostream &stream, const DTNTPWorker::TimeSyncMessage &obj)
135  {
136  std::stringstream ss;
137 
138  stream << (char)obj.type;
139 
140  ss.clear(); ss.str(""); ss << obj.origin_rating;
141  stream << dtn::data::BundleString(ss.str());
142 
143  stream << dtn::data::Number(obj.origin_timestamp.tv_sec);
144  stream << dtn::data::Number(obj.origin_timestamp.tv_usec);
145 
146  ss.clear(); ss.str(""); ss << obj.peer_rating;
147  stream << dtn::data::BundleString(ss.str());
148 
149  stream << dtn::data::Number(obj.peer_timestamp.tv_sec);
150  stream << dtn::data::Number(obj.peer_timestamp.tv_usec);
151 
152  return stream;
153  }
154 
// Deserialize a TimeSyncMessage from `stream`.
//
// Fields are read in this order: type (one char), origin rating, origin
// timestamp (seconds, microseconds), peer rating, peer timestamp (seconds,
// microseconds). Both ratings arrive as strings and are parsed through a
// std::stringstream; the timestamp parts are read as dtn::data::Number.
// Returns `stream` to allow chaining.
//
// NOTE(review): listing lines 159 and 163 are absent from this extract.
// `bs` is used below without a visible declaration, and the decoded `type`
// is never stored into `obj` in the visible lines - presumably both happen
// on the missing lines; confirm against the full file.
155  std::istream &operator>>(std::istream &stream, DTNTPWorker::TimeSyncMessage &obj)
156  {
157  char type = 0;
158  std::stringstream ss;
160  dtn::data::Number sdnv;
161 
   // message type
162  stream >> type;
164 
   // origin rating: clear() resets the stream state before the buffer is replaced
165  stream >> bs;
166  ss.clear();
167  ss.str((const std::string&)bs);
168  ss >> obj.origin_rating;
169 
   // origin timestamp, seconds
170  stream >> sdnv;
171  obj.origin_timestamp.tv_sec = sdnv.get<time_t>();
172 
   // origin timestamp, microseconds
173  stream >> sdnv;
174  obj.origin_timestamp.tv_usec = sdnv.get<suseconds_t>();
175 
   // peer rating, parsed the same way as the origin rating
176  stream >> bs;
177  ss.clear();
178  ss.str((const std::string&)bs);
179  ss >> obj.peer_rating;
180 
   // peer timestamp, seconds
181  stream >> sdnv;
182  obj.peer_timestamp.tv_sec = sdnv.get<time_t>();
183 
   // peer timestamp, microseconds
184  stream >> sdnv;
185  obj.peer_timestamp.tv_usec = sdnv.get<suseconds_t>();
186 
187  return stream;
188  }
189 
// Event handler; acts only on TimeEvents whose action is TIME_SECOND_TICK.
//
// On each such tick, while holding _sync_lock, it
//  1. erases expired entries from _peers (under _peer_lock),
//  2. on a reference node, checks the local clock; otherwise recomputes the
//     local clock rating from the time elapsed since the last sync,
//  3. if synchronization is enabled, starts a sync with every neighbor that
//     shouldSyncWith() accepts.
// Any other event type makes the dynamic_cast throw std::bad_cast, which is
// caught at the end and ignored.
//
// NOTE(review): listing lines 202, 222-224, 229 and 241 are absent from this
// extract, so some statements in this body are not visible here.
190  void DTNTPWorker::raiseEvent(const dtn::core::Event *evt) throw ()
191  {
192  try {
193  const dtn::core::TimeEvent &t = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
194 
   // ignore everything except the once-per-second tick
195  if (t.getAction() != dtn::core::TIME_SECOND_TICK) return;
196 
197  ibrcommon::MutexLock l(_sync_lock);
198 
199  // remove outdated blacklist entries
200  {
201  // get current monotonic timestamp
203 
   // this inner lock object shadows the outer `l`; both locks stay held until the end of this scope
204  ibrcommon::MutexLock l(_peer_lock);
205  for (peer_map::iterator iter = _peers.begin(); iter != _peers.end();)
206  {
207  const SyncPeer &peer = (*iter).second;
208 
209  // drop the entry once it has expired, otherwise keep it and move on
210  if (peer.isExpired()) {
   // post-increment hands the old position to erase() and advances first
211  _peers.erase(iter++);
212  } else {
213  ++iter;
214  }
215  }
216  }
217 
218  // if we are a reference node, we have to watch on our clock
219  // do some plausibility checks here
220  if (hasReference())
221  {
225  if (dtn::utils::Clock::getRating() == 0)
226  {
227  if (t.getTimestamp() > 0)
228  {
   // NOTE(review): line 229 is absent; presumably the rating is restored there - confirm
230  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) << "The local clock seems to be okay again." << IBRCOMMON_LOGGER_ENDL;
231  }
232  }
233  }
234  // otherwise (no reference clock) age the local clock rating over time
235  else
236  {
237  // before we can age our rating we should have been synchronized at least one time
238  if (_sync_state.last_sync_set)
239  {
   // NOTE(review): ts_now is not assigned in the visible lines; line 241 is absent and presumably fills it - confirm
240  struct timespec ts_now, ts_diff;
242  ibrcommon::MonotonicClock::diff(_sync_state.last_sync_time, ts_now, ts_diff);
243 
   // seconds elapsed since the last synchronization
244  double last_sync = TimeSyncState::toDouble(ts_diff);
245 
246  // the last sync must be in the past
   // NOTE(review): this tests for non-zero only, so a negative difference would pass as well - confirm intent
247  if (last_sync)
248  {
249  // calculate the new clock rating
   // rating = base_rating / sigma^(seconds since last sync)
250  dtn::utils::Clock::setRating(_sync_state.base_rating * (1.0 / (::pow(_sync_state.sigma, last_sync))));
251  }
252  }
253  }
254 
255  // if synchronization is enabled
256  if (_sync)
257  {
258  // search for other nodes with better credentials
259  const std::set<dtn::core::Node> nodes = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
260  for (std::set<dtn::core::Node>::const_iterator iter = nodes.begin(); iter != nodes.end(); ++iter) {
261  if (shouldSyncWith(*iter)) {
262  syncWith(*iter);
263  }
264  }
265  }
   // events that are not TimeEvents end up here and are ignored
266  } catch (const std::bad_cast&) { };
267  }
268 
269  bool DTNTPWorker::shouldSyncWith(const dtn::core::Node &node) const
270  {
271  // only query for time sync if the other node supports this
272  if (!node.has("dtntp")) return false;
273 
274  // get discovery attribute
275  const std::list<dtn::core::Node::Attribute> attrs = node.get("dtntp");
276 
277  if (attrs.empty()) return false;
278 
279  // decode attribute parameter
280  unsigned int version = 0;
281  dtn::data::Timestamp timestamp = 0;
282  float quality = 0.0;
283  decode(attrs.front(), version, timestamp, quality);
284 
285  // we do only support version = 1
286  if (version != 1) return false;
287 
288  // do not sync if the timestamps are equal in seconds
289  if (timestamp == dtn::utils::Clock::getTime()) return false;
290 
291  // do not sync if the quality is worse than ours
292  if ((quality * (1 - _sync_state.sync_threshold)) <= dtn::utils::Clock::getRating()) return false;
293 
294  return true;
295  }
296 
// Send a time sync request bundle to `node`.
//
// Steps visible in this body:
//  1. Under _peer_lock: return early if an entry for the peer exists and is
//     not in STATE_IDLE; otherwise create or refresh the entry and set it to
//     STATE_PREPARE.
//  2. Build a bundle whose payload is a fresh TimeSyncMessage. While writing
//     it, store the request timestamp and the monotonic send time in the
//     peer entry and set the entry to STATE_REQUEST.
//  3. Address the bundle from the local "dtntp" application to the peer's
//     "dtntp" application, set a lifetime of 60, and transmit it.
// An ibrcommon::IOException is logged and not propagated. The peer entry is
// not reset in that case; raiseEvent() in this file erases entries once
// isExpired() reports true.
//
// NOTE(review): listing lines 327, 364-365, 368 and 374 are absent from this
// extract. `ref` and `schl` are used without a visible declaration and the
// statements belonging to the "high priority" and "singleton receiver"
// comments are not visible - confirm against the full file.
297  void DTNTPWorker::syncWith(const dtn::core::Node &node)
298  {
299  // get the EID of the peer
300  const dtn::data::EID &peer = node.getEID();
301 
302  // check sync peers
303  {
304  ibrcommon::MutexLock l(_peer_lock);
305  const peer_map::const_iterator it = _peers.find(peer);
306  if (it != _peers.end())
307  {
   // note: this local reference shadows the outer EID `peer` inside this scope
308  const SyncPeer &peer = (*it).second;
309 
310  // do not query again if the peer was already queried
311  if (peer.state != SyncPeer::STATE_IDLE)
312  {
313  return;
314  }
315  }
316 
317  // create a new peer entry
   // operator[] inserts a default-constructed entry if none exists yet
318  SyncPeer &p = _peers[peer];
319  p.touch();
320  p.state = SyncPeer::STATE_PREPARE;
321  }
322 
323  // generate a time sync bundle with a zero timestamp (+age block)
324  dtn::data::Bundle b(true);
325 
326  try {
328 
329  // create the payload of the message
330  {
331  ibrcommon::BLOB::iostream stream = ref.iostream();
332 
333  // create a new timesync request
334  TimeSyncMessage msg;
335 
336  ibrcommon::MutexLock l(_peer_lock);
337 
338  // add sync parameters to the peer entry
339  SyncPeer &p = _peers[peer];
   // remembered so that a later response can be matched against this request
340  p.request_timestamp = msg.origin_timestamp;
341  ibrcommon::MonotonicClock::gettime(p.request_monotonic_time);
342  p.state = SyncPeer::STATE_REQUEST;
343 
344  // write the message
345  (*stream) << msg;
346  }
347 
348  // add the payload to the message
349  b.push_back(ref);
350 
351  // set the source
352  b.source = dtn::core::BundleCore::local;
353 
354  // set source application
355  b.source.setApplication("dtntp");
356 
357  // set the destination
358  b.destination = peer;
359 
360  // set destination application
361  b.destination.setApplication("dtntp");
362 
363  // set high priority
366 
367  // set the destination as singleton receiver
369 
370  // set the lifetime of the bundle to 60 seconds
371  b.lifetime = 60;
372 
373  // add a schl block
375  schl.setLimit(1);
376 
377  transmit(b);
378  } catch (const ibrcommon::IOException &ex) {
379  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, error) << "error while synchronizing, Exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
380  }
381  }
382 
384  {
385  if (!_announce_rating) throw NoServiceHereException("Discovery of time sync mechanisms disabled.");
386 
387  std::stringstream ss;
388  ss << "version=" << PROTO_VERSION << ";quality=" << dtn::utils::Clock::getRating() << ";timestamp=" << dtn::utils::Clock::getTime().toString() << ";";
389  announcement.addService( DiscoveryService("dtntp", ss.str()));
390  }
391 
392  void DTNTPWorker::decode(const dtn::core::Node::Attribute &attr, unsigned int &version, dtn::data::Timestamp &timestamp, float &quality) const
393  {
394  // parse parameters
395  std::vector<std::string> parameters = dtn::utils::Utils::tokenize(";", attr.value);
396  std::vector<std::string>::const_iterator param_iter = parameters.begin();
397 
398  while (param_iter != parameters.end())
399  {
400  std::vector<std::string> p = dtn::utils::Utils::tokenize("=", (*param_iter));
401 
402  if (p[0].compare("version") == 0)
403  {
404  std::stringstream ss(p[1]);
405  ss >> version;
406  }
407 
408  if (p[0].compare("timestamp") == 0)
409  {
410  timestamp.fromString(p[1]);
411  }
412 
413  if (p[0].compare("quality") == 0)
414  {
415  std::stringstream ss(p[1]);
416  ss >> quality;
417  }
418 
419  ++param_iter;
420  }
421  }
422 
423  bool DTNTPWorker::hasReference() const {
424  return (_sync_state.sigma == 1.0);
425  }
426 
// Apply the result of a time sync exchange to the local clock state.
//
// Parameters:
//  msg       - received sync message; only its peer_rating is read here
//  tv_offset - offset handed to Clock::setOffset() and to the adjustment event
//  tv_local  - local time, used for the rating and sigma computation
//  tv_remote - remote time, used for the rating and sigma computation
//
// Nothing happens if this node is a reference, or if the peer rating reduced
// by the sync threshold is not above the local clock rating. Otherwise, under
// _sync_lock, sigma is re-estimated (only if an earlier sync was recorded),
// a new base rating is derived, the clock offset and rating are set and a
// TimeAdjustmentEvent is raised.
//
// NOTE(review): listing lines 444 and 474 are absent from this extract.
// ts_now is read without a visible assignment, and no visible line stores
// last_sync_time even though a comment announces it - presumably both
// happen on the missing lines; confirm against the full file.
427  void DTNTPWorker::sync(const TimeSyncMessage &msg, const struct timeval &tv_offset, const struct timeval &tv_local, const struct timeval &tv_remote)
428  {
429  // do not sync if we are a reference
430  if (hasReference()) return;
431 
432  ibrcommon::MutexLock l(_sync_lock);
433 
434  // if the received quality of time is worse than ours, ignore it
435  if ((msg.peer_rating * (1 - _sync_state.sync_threshold)) <= dtn::utils::Clock::getRating()) return;
436 
437  double local_time = dtn::utils::Clock::toDouble(tv_local);
438  double remote_time = dtn::utils::Clock::toDouble(tv_remote);
439 
440  // adjust sigma if we sync'd at least twice
441  if (_sync_state.last_sync_set)
442  {
443  struct timespec ts_now, ts_diff;
445  ibrcommon::MonotonicClock::diff(_sync_state.last_sync_time, ts_now, ts_diff);
446 
447  // adjust sigma
   // t_stable: seconds between the previous sync and now
448  double t_stable = TimeSyncState::toDouble(ts_diff);
449 
   // guard against division by zero below
450  if (t_stable > 0.0) {
   // sigma = psi^(-1/t_stable) + |remote - local| / t_stable * peer_rating
451  double sigma_base = (1 / ::pow(_sync_state.psi, 1/t_stable));
452  double sigma_adjustment = ::fabs(remote_time - local_time) / t_stable * msg.peer_rating;
453  _sync_state.sigma = sigma_base + sigma_adjustment;
454 
455  IBRCOMMON_LOGGER_DEBUG_TAG(DTNTPWorker::TAG, 25) << "new sigma: " << _sync_state.sigma << IBRCOMMON_LOGGER_ENDL;
456  }
457  }
458 
   // scale the peer rating by the ratio of the smaller to the larger time value
   // NOTE(review): there is no guard for a zero divisor (e.g. both times zero) - confirm it cannot occur
459  if (local_time > remote_time) {
460  // determine the new base rating
461  _sync_state.base_rating = msg.peer_rating * (remote_time / local_time);
462  } else {
463  // determine the new base rating
464  _sync_state.base_rating = msg.peer_rating * (local_time / remote_time);
465  }
466 
467  // set the local clock to the new timestamp
468  dtn::utils::Clock::setOffset(tv_offset);
469 
470  // update the current local rating
   // rating = base_rating / sigma^0.001
471  dtn::utils::Clock::setRating(_sync_state.base_rating * (1.0 / (::pow(_sync_state.sigma, 0.001))));
472 
473  // store the timestamp of the last synchronization
475  _sync_state.last_sync_set = true;
476 
477  // trigger time adjustment event
478  dtn::core::TimeAdjustmentEvent::raise(tv_offset, _sync_state.base_rating);
479  }
480 
482  {
483  // do not sync with ourselves
485 
486  try {
487  // read payload block
489 
490  // read the type of the message
491  char type = 0; (*p.getBLOB().iostream()).get(type);
492 
493  switch (type)
494  {
496  {
497  dtn::data::Bundle response = b;
498 
499  // relabel with a zero timestamp
500  response.relabel(true);
501 
502  // set the lifetime of the bundle to 60 seconds
503  response.lifetime = 60;
504 
505  // switch the source and destination
506  response.source = b.destination;
507  response.destination = b.source;
508 
509  // set high priority
512 
513  // set the the destination as singleton receiver
515 
516  // modify the payload - locked
517  {
519  ibrcommon::BLOB::iostream stream = ref.iostream();
520 
521  // read the timesync message
522  TimeSyncMessage msg;
523  (*stream) >> msg;
524 
525  // clear the payload
526  stream.clear();
527 
528  // fill in the own values
530  msg.peer_rating = dtn::utils::Clock::getRating();
531  dtn::utils::Clock::gettimeofday(&msg.peer_timestamp);
532 
533  // write the response
534  (*stream) << msg;
535  }
536 
537  // add a second age block
538  response.push_front<dtn::data::AgeBlock>();
539 
540  // modify the old schl block or add a new one
541  try {
543  schl.setLimit(1);
546  schl.setLimit(1);
547  };
548 
549  // send the response
550  transmit(response);
551  break;
552  }
553 
555  {
556  // read the ageblock of the bundle
558 
559  if (!age_it.next(b.end())) throw ibrcommon::Exception("first ageblock missing");
560  const dtn::data::AgeBlock &peer_age = dynamic_cast<const dtn::data::AgeBlock&>(**age_it);
561 
562  if (!age_it.next(b.end())) throw ibrcommon::Exception("second ageblock missing");
563  const dtn::data::AgeBlock &origin_age = dynamic_cast<const dtn::data::AgeBlock&>(**age_it);
564 
565  timeval tv_rtt_measured, tv_local_timestamp, tv_rtt, tv_prop_delay, tv_sync_delay, tv_peer_timestamp, tv_offset;
566 
567  timerclear(&tv_rtt_measured);
568  tv_rtt_measured.tv_sec = origin_age.getSeconds().get<time_t>();
569  tv_rtt_measured.tv_usec = origin_age.getMicroseconds().get<suseconds_t>() % 1000000;
570 
572  ibrcommon::BLOB::iostream stream = ref.iostream();
573 
574  // parse the received time sync message
575  TimeSyncMessage msg; (*stream) >> msg;
576 
577  // do peer checks
578  {
579  ibrcommon::MutexLock l(_peer_lock);
580 
581  // check if the peer entry exists
582  const peer_map::const_iterator it = _peers.find(b.source.getNode());
583  if (it == _peers.end()) break;
584 
585  const SyncPeer &p = (*it).second;
586 
587  // check if the peer entry is in the right state
588  if (p.state != SyncPeer::STATE_REQUEST)
589  break;
590 
591  // check if response matches the request
592  if ((p.request_timestamp.tv_sec != msg.origin_timestamp.tv_sec) ||
593  (p.request_timestamp.tv_usec != msg.origin_timestamp.tv_usec))
594  break;
595 
596  // determine the RTT of the message exchange
597  struct timespec diff, now;
599  ibrcommon::MonotonicClock::diff(p.request_monotonic_time, now, diff);
600 
601  tv_rtt.tv_sec = diff.tv_sec;
602  tv_rtt.tv_usec = diff.tv_nsec / 1000;
603  }
604 
605  // store the current time in tv_local
606  dtn::utils::Clock::gettimeofday(&tv_local_timestamp);
607 
608  // convert timeval RTT into double value
609  double rtt = dtn::utils::Clock::toDouble(tv_rtt);
610 
611  // abort here if the rtt is negative or zero!
612  if (rtt <= 0.0) {
613  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) << "RTT " << rtt << " is too small" << IBRCOMMON_LOGGER_ENDL;
614  break;
615  }
616 
617  double prop_delay = 0.0;
618 
619  // assume zero prop. delay if rtt is smaller than the
620  // time measured by the age block
621  double rtt_measured = dtn::utils::Clock::toDouble(tv_rtt_measured);
622  if (rtt <= rtt_measured) {
623  timerclear(&tv_prop_delay);
624  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) << "Prop. delay " << prop_delay << " is smaller than the tracked time (" << rtt_measured << ")" << IBRCOMMON_LOGGER_ENDL;
625  } else {
626  timersub(&tv_rtt, &tv_rtt_measured, &tv_prop_delay);
627  prop_delay = dtn::utils::Clock::toDouble(tv_prop_delay);
628  }
629 
630  // half the prop delay
631  tv_prop_delay.tv_sec /= 2;
632  tv_prop_delay.tv_usec /= 2;
633 
634  // copy time interval tracked with the ageblock of the peer
635  timerclear(&tv_sync_delay);
636  tv_sync_delay.tv_sec = peer_age.getSeconds().get<time_t>();
637  tv_sync_delay.tv_usec = peer_age.getMicroseconds().get<suseconds_t>() % 1000000;
638 
639  // add sync delay to the peer timestamp
640  timeradd(&msg.peer_timestamp, &tv_sync_delay, &tv_peer_timestamp);
641 
642  // add propagation delay to the peer timestamp
643  timeradd(&msg.peer_timestamp, &tv_prop_delay, &tv_peer_timestamp);
644 
645  // calculate offset
646  timersub(&tv_local_timestamp, &tv_peer_timestamp, &tv_offset);
647 
648  // print out offset to the local clock
649  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, info) << "DT-NTP bundle received; rtt = " << rtt << "s; prop. delay = " << prop_delay << "s; clock of " << b.source.getNode().getString() << " has a offset of " << dtn::utils::Clock::toDouble(tv_offset) << "s" << IBRCOMMON_LOGGER_ENDL;
650 
651  // sync to this time message
652  sync(msg, tv_offset, tv_local_timestamp, tv_peer_timestamp);
653 
654  // update the blacklist entry
655  ibrcommon::MutexLock l(_peer_lock);
656  SyncPeer &p = _peers[b.source.getNode()];
657  p.state = SyncPeer::STATE_SYNC;
658  p.touch();
659 
660  break;
661  }
662  }
663  } catch (const ibrcommon::Exception&) { };
664  }
665 
667  {
668  return DTNTPWorker::_sync_state;
669  }
670 
671  DTNTPWorker::SyncPeer::SyncPeer()
672  : state(STATE_IDLE)
673  {
674  request_monotonic_time.tv_sec = 0;
675  request_monotonic_time.tv_nsec = 0;
676 
677  request_timestamp.tv_sec = 0;
678  request_timestamp.tv_usec = 0;
679 
680  touch();
681  }
682 
683  DTNTPWorker::SyncPeer::~SyncPeer()
684  {
685  }
686 
// Refresh this peer entry.
// NOTE(review): the only body line (listing line 689) is absent from this
// extract. isExpired() compares _touched + 60 against
// Clock::getMonotonicTimestamp(), so this presumably stores the current
// monotonic timestamp in _touched - confirm against the full file.
687  void DTNTPWorker::SyncPeer::touch()
688  {
690  }
691 
692  bool DTNTPWorker::SyncPeer::isExpired() const
693  {
694  return ((_touched + 60) < dtn::utils::Clock::getMonotonicTimestamp());
695  }
696  }
697 }