IBR-DTNSuite  0.10
DTNTPWorker.cpp
Go to the documentation of this file.
1 /*
2  * DTNTPWorker.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "DTNTPWorker.h"
23 #include "core/EventDispatcher.h"
24 #include "core/NodeEvent.h"
25 #include "core/BundleCore.h"
26 #include "core/TimeEvent.h"
28 #include <ibrdtn/utils/Clock.h>
29 #include <ibrdtn/utils/Utils.h>
30 #include <ibrdtn/data/AgeBlock.h>
32 #include <ibrdtn/data/SDNV.h>
35 #include <ibrcommon/Logger.h>
36 
37 #include <sys/time.h>
38 
39 namespace dtn
40 {
41  namespace daemon
42  {
// Protocol version of the time-sync exchange; it is written into the
// "version=" field of the discovery announcement built further down in this file.
43  const unsigned int DTNTPWorker::PROTO_VERSION = 1;
// Tag handed to the IBRCOMMON_LOGGER_TAG macros for every log line of this worker.
44  const std::string DTNTPWorker::TAG = "DTNTPWorker";
45 
// Member-initializer list and body of a constructor-style definition.
// NOTE(review): the signature line that precedes this initializer list is not
// part of this listing (presumably DTNTPWorker::DTNTPWorker - confirm against
// the original file).
//
// Defaults set here: sync threshold 0.15, no rating announcement, base rating
// 0.0, psi 0.99, sigma 1.0, synchronization disabled. Most of them are
// overwritten from `conf` below.
47  : _sync_threshold(0.15f), _announce_rating(false), _base_rating(0.0), _psi(0.99), _sigma(1.0), _sync(false)
48  {
// NOTE(review): "/dtntp" and 60 match the two destination suffixes used in
// syncWith(); presumably endpoint name and compressed service number - confirm.
49  AbstractWorker::initialize("/dtntp", 60, true);
50 
51  // initialize the last sync time to zero
52  timerclear(&_last_sync_time);
53 
54  // get global configuration for time synchronization
// NOTE(review): the declaration of `conf` (listing line 55) is missing here.
56 
57  if (conf.hasReference())
58  {
59  // set clock rating to 1 since this node has a reference clock
60  _base_rating = 1.0;
61 
62  // evaluate the current local time
63  if (dtn::utils::Clock::getTime() > 0) {
// NOTE(review): the statement of this branch (listing line 64) is missing.
65  } else {
// NOTE(review): listing line 66 is missing; only the warning below is visible.
67  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) << "The local clock seems to be wrong. Expiration disabled." << IBRCOMMON_LOGGER_ENDL;
68  }
69  } else {
// without a reference clock the aging parameters come from the configuration
// NOTE(review): listing line 70 is missing.
71  _sigma = conf.getSigma();
72  _psi = conf.getPsi();
73  }
74 
75  // check if we should announce our own rating via discovery
76  _announce_rating = conf.sendDiscoveryAnnouncements();
77 
78  // store the sync threshold locally
79  _sync_threshold = conf.getSyncLevel();
80 
81  // synchronize with other nodes
82  _sync = conf.doSync();
83 
84  if (_sync) {
85  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, info) << "Time-Synchronization enabled: " << (conf.hasReference() ? "master mode" : "slave mode") << IBRCOMMON_LOGGER_ENDL;
86  }
87 
// NOTE(review): listing line 88 is missing.
89  }
90 
92  {
94  }
95 
// Member-initializer list and body of a constructor-style definition.
// NOTE(review): the signature line is not part of this listing (presumably
// the TimeSyncMessage constructor - confirm against the original file).
//
// A fresh message is a request that carries the current local clock rating as
// origin rating and a peer rating of 0.0.
97  : type(TIMESYNC_REQUEST), origin_rating(dtn::utils::Clock::getRating()), peer_rating(0.0)
98  {
// both timestamps start out as zero
99  timerclear(&origin_timestamp);
100  timerclear(&peer_timestamp);
101 
// NOTE(review): listing line 102 is missing.
103  }
104 
106  {
107  }
108 
109  std::ostream &operator<<(std::ostream &stream, const DTNTPWorker::TimeSyncMessage &obj)
110  {
111  std::stringstream ss;
112 
113  stream << (char)obj.type;
114 
115  ss.clear(); ss.str(""); ss << obj.origin_rating;
116  stream << dtn::data::BundleString(ss.str());
117 
118  stream << dtn::data::Number(obj.origin_timestamp.tv_sec);
119  stream << dtn::data::Number(obj.origin_timestamp.tv_usec);
120 
121  ss.clear(); ss.str(""); ss << obj.peer_rating;
122  stream << dtn::data::BundleString(ss.str());
123 
124  stream << dtn::data::Number(obj.peer_timestamp.tv_sec);
125  stream << dtn::data::Number(obj.peer_timestamp.tv_usec);
126 
127  return stream;
128  }
129 
// Parses a time-sync message from a stream.
//
// Fields are consumed in the same order in which operator<< writes them:
// type (one char), origin rating (text), origin seconds, origin microseconds,
// peer rating (text), peer seconds, peer microseconds.
//
// NOTE(review): this listing lacks two lines of the original definition
// (listing lines 134 and 138). `bs` is used below without a visible
// declaration, and `type` is read but never stored into `obj` in the visible
// code - confirm against the original file.
130  std::istream &operator>>(std::istream &stream, DTNTPWorker::TimeSyncMessage &obj)
131  {
132  char type = 0;
133  std::stringstream ss;
135  dtn::data::Number sdnv;
136 
// leading byte: the message type
137  stream >> type;
139 
// origin rating: transferred as text and converted via the string stream
140  stream >> bs;
141  ss.clear();
142  ss.str((const std::string&)bs);
143  ss >> obj.origin_rating;
144 
// origin timestamp, seconds part
145  stream >> sdnv;
146  obj.origin_timestamp.tv_sec = sdnv.get<time_t>();
147 
// origin timestamp, microseconds part
148  stream >> sdnv;
149  obj.origin_timestamp.tv_usec = sdnv.get<suseconds_t>();
150 
// peer rating: ss.clear() resets the stream state left by the previous extraction
151  stream >> bs;
152  ss.clear();
153  ss.str((const std::string&)bs);
154  ss >> obj.peer_rating;
155 
// peer timestamp, seconds part
156  stream >> sdnv;
157  obj.peer_timestamp.tv_sec = sdnv.get<time_t>();
158 
// peer timestamp, microseconds part
159  stream >> sdnv;
160  obj.peer_timestamp.tv_usec = sdnv.get<suseconds_t>();
161 
162  return stream;
163  }
164 
// Event handler; acts only on TimeEvent instances whose action is
// TIME_SECOND_TICK. Any other event type makes the dynamic_cast throw
// std::bad_cast, which is swallowed at the end of the function.
//
// Per tick, in this order:
//  1. erase blacklist entries whose stored timestamp is older than the tick,
//  2. reference nodes: plausibility check of the local clock,
//     other nodes: age the clock rating based on the time since the last sync,
//  3. if synchronization is enabled, call syncWith() for every neighbor that
//     shouldSyncWith() accepts.
//
// NOTE(review): listing lines 193-196, 201 and 213 of the original definition
// are missing here; see the notes at the affected places.
165  void DTNTPWorker::raiseEvent(const dtn::core::Event *evt) throw ()
166  {
167  try {
168  const dtn::core::TimeEvent &t = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
169 
170  if (t.getAction() != dtn::core::TIME_SECOND_TICK) return;
171 
// _sync_lock is held for the rest of the handler (sync() takes the same lock)
172  ibrcommon::MutexLock l(_sync_lock);
173 
174  // remove outdated blacklist entries
175  {
176  ibrcommon::MutexLock l(_blacklist_lock);
// no increment in the loop header: erase(iter++) advances before the node is removed
177  for (blacklist_map::iterator iter = _sync_blacklist.begin(); iter != _sync_blacklist.end();)
178  {
179  const dtn::data::Timestamp &bl_age = (*iter).second;
180 
181  // drop the entry once its stored timestamp lies before the current tick
182  if (bl_age < t.getUnixTimestamp()) {
183  _sync_blacklist.erase(iter++);
184  } else {
185  ++iter;
186  }
187  }
188  }
189 
190  // if we are a reference node, we have to watch on our clock
191  // do some plausibility checks here
192  if (hasReference())
193  {
// NOTE(review): listing lines 194-196 are missing.
197  if (dtn::utils::Clock::getRating() == 0)
198  {
199  if (t.getTimestamp() > 0)
200  {
// NOTE(review): listing line 201 is missing; only the log statement is visible.
202  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) << "The local clock seems to be okay again. Expiration enabled." << IBRCOMMON_LOGGER_ENDL;
203  }
204  }
205  }
206  // otherwise (no reference clock) let the local clock rating decay over time
207  else
208  {
209  // before we can age our rating we should have been synchronized at least one time
210  if (timerisset(&_last_sync_time))
211  {
212  timeval tv_now;
// NOTE(review): the statement that fills tv_now (listing line 213) is missing
// here; as shown, tv_now would be read uninitialized.
214 
215  double last_sync = dtn::utils::Clock::toDouble(_last_sync_time);
216  double now = dtn::utils::Clock::toDouble(tv_now);
217 
218  // the last sync must be in the past
219  if (last_sync < now)
220  {
221  // calculate the new clock rating: rating = base_rating / sigma^(seconds since last sync)
222  double timediff = now - last_sync;
223  dtn::utils::Clock::setRating(_base_rating * (1.0 / (::pow(_sigma, timediff))));
224  }
225  }
226  }
227 
228  // if synchronization is enabled
229  if (_sync)
230  {
231  // search for other nodes with better credentials
232  const std::set<dtn::core::Node> nodes = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
233  for (std::set<dtn::core::Node>::const_iterator iter = nodes.begin(); iter != nodes.end(); ++iter) {
234  if (shouldSyncWith(*iter)) {
235  syncWith(*iter);
236  }
237  }
238  }
// events that are not TimeEvents end up here and are ignored on purpose
239  } catch (const std::bad_cast&) { };
240  }
241 
242  bool DTNTPWorker::shouldSyncWith(const dtn::core::Node &node) const
243  {
244  // only query for time sync if the other node supports this
245  if (!node.has("dtntp")) return false;
246 
247  // get discovery attribute
248  const std::list<dtn::core::Node::Attribute> attrs = node.get("dtntp");
249 
250  if (attrs.empty()) return false;
251 
252  // decode attribute parameter
253  unsigned int version = 0;
254  dtn::data::Timestamp timestamp = 0;
255  float quality = 0.0;
256  decode(attrs.front(), version, timestamp, quality);
257 
258  // we do only support version = 1
259  if (version != 1) return false;
260 
261  // do not sync if the timestamps are equal in seconds
262  if (timestamp == dtn::utils::Clock::getTime()) return false;
263 
264  // do not sync if the quality is worse than ours
265  if ((quality * (1 - _sync_threshold)) <= dtn::utils::Clock::getRating()) return false;
266 
267  return true;
268  }
269 
// Sends a time-sync request bundle to the given neighbor.
//
// A peer is asked at most once per blacklist period: when a request is sent,
// the peer is entered into _sync_blacklist with "now + 60" as timestamp.
// I/O errors during bundle creation or transmission are logged and swallowed.
//
// NOTE(review): this listing lacks several lines of the original definition
// (listing lines 283, 294, 297, 300, 318, 320, 331-332, 335 and 341). The
// declarations of `b`, `ref` and `schl` are therefore not visible; see the
// notes at the affected places.
270  void DTNTPWorker::syncWith(const dtn::core::Node &node)
271  {
272  // get the EID of the peer
273  const dtn::data::EID &peer = node.getEID();
274 
275  // check sync blacklist
276  {
277  ibrcommon::MutexLock l(_blacklist_lock);
278  if (_sync_blacklist.find(peer) != _sync_blacklist.end())
279  {
280  const dtn::data::Timestamp &bl_age = _sync_blacklist[peer];
281 
282  // do not query again if the blacklist entry is valid
// NOTE(review): the condition guarding this return (listing line 283) is missing.
284  {
285  return;
286  }
287  }
288 
289  // create a new blacklist entry
290  _sync_blacklist[peer] = dtn::utils::Clock::getUnixTimestamp() + 60;
291  }
292 
293  // send a time sync bundle
// NOTE(review): the declaration of the bundle `b` (listing line 294) is missing.
295 
296  // add an age block
// NOTE(review): the statement adding the age block (listing line 297) is missing.
298 
299  try {
// NOTE(review): the declaration of the payload BLOB reference `ref` (listing line 300) is missing.
301 
302  // create the payload of the message
303  {
304  ibrcommon::BLOB::iostream stream = ref.iostream();
305 
306  // create a new timesync request
307  TimeSyncMessage msg;
308 
309  // write the message
310  (*stream) << msg;
311  }
312 
313  // add the payload to the message
314  b.push_back(ref);
315 
316  // set the source
317  if (dtn::core::BundleCore::local.isCompressable()) {
// NOTE(review): source assignment for this branch (listing line 318) is missing.
319  } else {
// NOTE(review): source assignment for this branch (listing line 320) is missing.
321  }
322 
323  // set the destination: "60" for compressable EIDs, "dtntp" otherwise
324  if (peer.isCompressable()) {
325  b.destination = peer.add(peer.getDelimiter() + "60");
326  } else {
327  b.destination = peer.add(peer.getDelimiter() + "dtntp");
328  }
329 
330  // set high priority
// NOTE(review): the statements for this step (listing lines 331-332) are missing.
333 
334  // set the destination as singleton receiver
// NOTE(review): the statement for this step (listing line 335) is missing.
336 
337  // set the lifetime of the bundle to 60 seconds
338  b.lifetime = 60;
339 
340  // add a schl block
// NOTE(review): the declaration of `schl` (listing line 341) is missing.
342  schl.setLimit(1);
343 
344  transmit(b);
345  } catch (const ibrcommon::IOException &ex) {
346  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, error) << "error while synchronizing, Exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
347  }
348  }
349 
350 
// Body of the definition that adds the "dtntp" service to a discovery
// announcement.
// NOTE(review): the signature line is not part of this listing; the function
// name and the declaration of `announcement` have to be taken from the
// original file.
//
// Throws NoServiceHereException if rating announcements are disabled.
// Otherwise the service parameters are
//   "version=<PROTO_VERSION>;quality=<clock rating>;timestamp=<clock time>;"
// which is the format decode() parses.
352  {
353  if (!_announce_rating) throw NoServiceHereException("Discovery of time sync mechanisms disabled.");
354 
355  std::stringstream ss;
356  ss << "version=" << PROTO_VERSION << ";quality=" << dtn::utils::Clock::getRating() << ";timestamp=" << dtn::utils::Clock::getTime().toString() << ";";
357  announcement.addService( DiscoveryService("dtntp", ss.str()));
358  }
359 
360  void DTNTPWorker::decode(const dtn::core::Node::Attribute &attr, unsigned int &version, dtn::data::Timestamp &timestamp, float &quality) const
361  {
362  // parse parameters
363  std::vector<std::string> parameters = dtn::utils::Utils::tokenize(";", attr.value);
364  std::vector<std::string>::const_iterator param_iter = parameters.begin();
365 
366  while (param_iter != parameters.end())
367  {
368  std::vector<std::string> p = dtn::utils::Utils::tokenize("=", (*param_iter));
369 
370  if (p[0].compare("version") == 0)
371  {
372  std::stringstream ss(p[1]);
373  ss >> version;
374  }
375 
376  if (p[0].compare("timestamp") == 0)
377  {
378  timestamp.fromString(p[1]);
379  }
380 
381  if (p[0].compare("quality") == 0)
382  {
383  std::stringstream ss(p[1]);
384  ss >> quality;
385  }
386 
387  ++param_iter;
388  }
389  }
390 
391  bool DTNTPWorker::hasReference() const {
392  return (_sigma == 1.0);
393  }
394 
395  void DTNTPWorker::sync(const TimeSyncMessage &msg, const struct timeval &tv_offset, const struct timeval &tv_local, const struct timeval &tv_remote)
396  {
397  // do not sync if we are a reference
398  if (hasReference()) return;
399 
400  ibrcommon::MutexLock l(_sync_lock);
401 
402  // if the received quality of time is worse than ours, ignore it
403  if (dtn::utils::Clock::getRating() >= msg.peer_rating) return;
404 
405  double local_time = dtn::utils::Clock::toDouble(tv_local);
406  double remote_time = dtn::utils::Clock::toDouble(tv_remote);
407 
408  // adjust sigma if we sync'd at least twice
409  if (timerisset(&_last_sync_time))
410  {
411  double lastsync_time = dtn::utils::Clock::toDouble(_last_sync_time);
412 
413  // adjust sigma
414  double t_stable = local_time - lastsync_time;
415 
416  if (t_stable > 0.0) {
417  double sigma_base = (1 / ::pow(_psi, 1/t_stable));
418  double sigma_adjustment = ::fabs(remote_time - local_time) / t_stable * msg.peer_rating;
419  _sigma = sigma_base + sigma_adjustment;
420 
421  IBRCOMMON_LOGGER_DEBUG_TAG(DTNTPWorker::TAG, 25) << "new sigma: " << _sigma << IBRCOMMON_LOGGER_ENDL;
422  }
423  }
424 
425  if (local_time > remote_time) {
426  // determine the new base rating
427  _base_rating = msg.peer_rating * (remote_time / local_time);
428  } else {
429  // determine the new base rating
430  _base_rating = msg.peer_rating * (local_time / remote_time);
431  }
432 
433  // trigger time adjustment event
434  dtn::core::TimeAdjustmentEvent::raise(tv_offset, _base_rating);
435 
436  // store the timestamp of the last synchronization
437  dtn::utils::Clock::gettimeofday(&_last_sync_time);
438  }
439 
// Body of the handler for received time-sync bundles (`b` is the received bundle).
// NOTE(review): the signature line is not part of this listing (presumably the
// bundle reception callback of the worker - confirm against the original
// file). Listing lines 447, 454, 467-468, 471, 475, 486, 499, 501-502, 511,
// 514 and 528 are missing as well; among them the case labels of the switch
// and the declarations of `p`, `ref`, `schl` and `age_it`.
//
// Visible behavior:
//  - bundles whose source node is the local node are ignored,
//  - first switch branch (presumably the request case): a response bundle is
//    built from the request with source and destination swapped, the payload
//    is rewritten with the local rating and timestamp, and it is transmitted,
//  - second switch branch (presumably the response case): RTT, propagation
//    delay and clock offset are computed from the message and the two age
//    blocks, sync() is called and the sender is removed from the blacklist,
//  - every ibrcommon::Exception is swallowed.
441  {
442  // do not sync with ourselves
443  if (b.source.getNode() == dtn::core::BundleCore::local) return;
444 
445  try {
446  // read payload block
// NOTE(review): the declaration of the payload block `p` (listing line 447) is missing.
448 
449  // read the type of the message
450  char type = 0; (*p.getBLOB().iostream()).get(type);
451 
452  switch (type)
453  {
// NOTE(review): the case label of this branch (listing line 454) is missing.
455  {
456  dtn::data::Bundle response = b;
457  response.relabel();
458 
459  // set the lifetime of the bundle to 60 seconds
460  response.lifetime = 60;
461 
462  // switch the source and destination
463  response.source = b.destination;
464  response.destination = b.source;
465 
466  // set high priority
// NOTE(review): the statements for this step (listing lines 467-468) are missing.
469 
470  // set the destination as singleton receiver
// NOTE(review): the statement for this step (listing line 471) is missing.
472 
473  // modify the payload - locked
474  {
// NOTE(review): the declaration of `ref` (listing line 475) is missing.
476  ibrcommon::BLOB::iostream stream = ref.iostream();
477 
478  // read the timesync message
479  TimeSyncMessage msg;
480  (*stream) >> msg;
481 
482  // clear the payload
483  stream.clear();
484 
485  // fill in the own values
// NOTE(review): listing line 486 is missing.
487  msg.peer_rating = dtn::utils::Clock::getRating();
488  dtn::utils::Clock::gettimeofday(&msg.peer_timestamp);
489 
490  // write the response
491  (*stream) << msg;
492  }
493 
494  // add a second age block
495  response.push_front<dtn::data::AgeBlock>();
496 
497  // modify the old schl block or add a new one
498  try {
// NOTE(review): listing lines 499 and 501-502 are missing; presumably the
// lookup of an existing block, the catch clause and the creation of a new
// block - confirm.
500  schl.setLimit(1);
503  schl.setLimit(1);
504  };
505 
506  // send the response
507  transmit(response);
508  break;
509  }
510 
// NOTE(review): the case label of this branch (listing line 511) is missing.
512  {
513  // read the ageblock of the bundle
// NOTE(review): the declaration of the iterator `age_it` (listing line 514) is missing.
515 
// the first age block found is taken as the peer's, the second as the origin's
516  if (!age_it.next(b.end())) throw ibrcommon::Exception("first ageblock missing");
517  const dtn::data::AgeBlock &peer_age = dynamic_cast<const dtn::data::AgeBlock&>(**age_it);
518 
519  if (!age_it.next(b.end())) throw ibrcommon::Exception("second ageblock missing");
520  const dtn::data::AgeBlock &origin_age = dynamic_cast<const dtn::data::AgeBlock&>(**age_it);
521 
522  timeval tv_rtt_measured, tv_local_timestamp, tv_rtt, tv_prop_delay, tv_sync_delay, tv_peer_timestamp, tv_offset;
523 
// time tracked by the origin age block, split into seconds and the sub-second remainder
524  timerclear(&tv_rtt_measured);
525  tv_rtt_measured.tv_sec = origin_age.getSeconds().get<time_t>();
526  tv_rtt_measured.tv_usec = origin_age.getMicroseconds().get<suseconds_t>() % 1000000;
527 
// NOTE(review): the declaration of `ref` (listing line 528) is missing.
529  ibrcommon::BLOB::iostream stream = ref.iostream();
530 
531  // parse the received time sync message
532  TimeSyncMessage msg; (*stream) >> msg;
533 
534  // store the current time in tv_local
535  dtn::utils::Clock::gettimeofday(&tv_local_timestamp);
536 
537  // determine the RTT of the message exchange
538  timersub(&tv_local_timestamp, &msg.origin_timestamp, &tv_rtt);
539  double rtt = dtn::utils::Clock::toDouble(tv_rtt);
540 
541  // abort here if the rtt is negative or zero!
542  if (rtt <= 0.0) {
543  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) << "RTT " << rtt << " is too small" << IBRCOMMON_LOGGER_ENDL;
544  break;
545  }
546 
547  double prop_delay = 0.0;
548 
549  // assume zero prop. delay if rtt is smaller than the
550  // time measured by the age block
551  double rtt_measured = dtn::utils::Clock::toDouble(tv_rtt_measured);
552  if (rtt <= rtt_measured) {
553  timerclear(&tv_prop_delay);
// NOTE(review): prop_delay is still 0.0 at this point, so this message always
// prints "Prop. delay 0"; the value compared above is rtt.
554  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, warning) << "Prop. delay " << prop_delay << " is smaller than the tracked time (" << rtt_measured << ")" << IBRCOMMON_LOGGER_ENDL;
555  } else {
556  timersub(&tv_rtt, &tv_rtt_measured, &tv_prop_delay);
557  prop_delay = dtn::utils::Clock::toDouble(tv_prop_delay);
558  }
559 
560  // half the prop delay
// NOTE(review): both fields are halved independently; for an odd tv_sec the
// integer division drops half a second that is not carried into tv_usec.
561  tv_prop_delay.tv_sec /= 2;
562  tv_prop_delay.tv_usec /= 2;
563 
564  // copy time interval tracked with the ageblock of the peer
565  timerclear(&tv_sync_delay);
566  tv_sync_delay.tv_sec = peer_age.getSeconds().get<time_t>();
567  tv_sync_delay.tv_usec = peer_age.getMicroseconds().get<suseconds_t>() % 1000000;
568 
569  // add sync delay to the peer timestamp
570  timeradd(&msg.peer_timestamp, &tv_sync_delay, &tv_peer_timestamp);
571 
572  // add propagation delay to the peer timestamp
// NOTE(review): this call starts from msg.peer_timestamp again and writes to
// tv_peer_timestamp, so the sum computed by the previous timeradd is
// overwritten and tv_sync_delay does not end up in tv_peer_timestamp. The
// first operand was probably meant to be &tv_peer_timestamp - confirm.
573  timeradd(&msg.peer_timestamp, &tv_prop_delay, &tv_peer_timestamp);
574 
575  // calculate offset
576  timersub(&tv_local_timestamp, &tv_peer_timestamp, &tv_offset);
577 
578  // print out offset to the local clock
579  IBRCOMMON_LOGGER_TAG(DTNTPWorker::TAG, info) << "DT-NTP bundle received; rtt = " << rtt << "s; prop. delay = " << prop_delay << "s; clock of " << b.source.getNode().getString() << " has a offset of " << dtn::utils::Clock::toDouble(tv_offset) << "s" << IBRCOMMON_LOGGER_ENDL;
580 
581  // sync to this time message
582  sync(msg, tv_offset, tv_local_timestamp, tv_peer_timestamp);
583 
584  // remove the blacklist entry
585  ibrcommon::MutexLock l(_blacklist_lock);
586  _sync_blacklist.erase(b.source.getNode());
587 
588  break;
589  }
590  }
// parse errors and missing blocks end up here and are ignored
591  } catch (const ibrcommon::Exception&) { };
592  }
593  }
594 }