IBR-DTNSuite  0.12
Registration.cpp
Go to the documentation of this file.
1 /*
2  * Registration.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "api/Registration.h"
25 #include "storage/BundleStorage.h"
26 #include "core/BundleCore.h"
27 #include "core/BundleEvent.h"
28 #include "core/BundlePurgeEvent.h"
29 #include "core/FragmentManager.h"
31 
32 #ifdef HAVE_SQLITE
34 #endif
35 
36 #ifdef WITH_COMPRESSION
38 #endif
39 
40 #ifdef WITH_BUNDLE_SECURITY
42 #endif
43 
45 #include <ibrdtn/data/AgeBlock.h>
46 
47 #include <ibrdtn/utils/Clock.h>
48 #include <ibrdtn/utils/Random.h>
49 #include <ibrcommon/Logger.h>
50 
51 #include <limits.h>
52 #include <stdint.h>
53 
54 namespace dtn
55 {
56  namespace api
57  {
58  const std::string Registration::TAG = "Registration";
59  ibrcommon::Mutex Registration::_handle_lock;
60  std::set<std::string> Registration::_handles;
61 
62  const std::string Registration::gen_handle()
63  {
64  static dtn::utils::Random rand;
65  std::string new_handle = rand.gen_chars(16);
66 
67  // if the local host is configured with an IPN address
68  if (dtn::core::BundleCore::local.isCompressable())
69  {
70  // .. then use 32-bit numbers only
71  uint32_t *int_handle = (uint32_t*)new_handle.c_str();
72  std::stringstream ss;
73  ss << *int_handle;
74  new_handle = ss.str();
75  }
76 
77  return new_handle;
78  }
79 
80  const std::string& Registration::alloc_handle(const std::string &handle)
81  {
82  ibrcommon::MutexLock l(_handle_lock);
83  std::pair<std::set<std::string>::iterator, bool> ret = _handles.insert(handle);
84 
85  while (!ret.second) {
86  ret = _handles.insert(gen_handle());
87  }
88 
89  return (*ret.first);
90  }
91 
92  const std::string& Registration::alloc_handle()
93  {
94  return alloc_handle(gen_handle());
95  }
96 
97  void Registration::free_handle(const std::string &handle)
98  {
99  ibrcommon::MutexLock l(_handle_lock);
100  _handles.erase(handle);
101  }
102 
103  Registration::Registration(const std::string &handle)
104  : _handle(alloc_handle(handle)),
105  _default_eid(core::BundleCore::local), _no_more_bundles(false),
106  _persistent(false), _detached(false), _expiry(0), _filter_fragments(true)
107  {
108  _default_eid.setApplication(_handle);
109  }
110 
112  : _handle(alloc_handle()),
113  _default_eid(core::BundleCore::local), _no_more_bundles(false),
114  _persistent(false), _detached(false), _expiry(0), _filter_fragments(true)
115  {
116  _default_eid.setApplication(_handle);
117  }
118 
120  {
121  free_handle(_handle);
122  }
123 
125  {
126  ibrcommon::MutexLock l(_wait_for_cond);
127  if (call == NOTIFY_BUNDLE_AVAILABLE)
128  {
129  _no_more_bundles = false;
130  _wait_for_cond.signal(true);
131  }
132  else
133  {
134  _notify_queue.push(call);
135  }
136  }
137 
138  void Registration::wait_for_bundle(size_t timeout)
139  {
140  ibrcommon::MutexLock l(_wait_for_cond);
141 
142  while (_no_more_bundles)
143  {
144  if (timeout > 0)
145  {
146  _wait_for_cond.wait(timeout);
147  }
148  else
149  {
150  _wait_for_cond.wait();
151  }
152  }
153  }
154 
156  {
157  return _notify_queue.getnpop(true);
158  }
159 
161  {
162  ibrcommon::MutexLock l(_endpoints_lock);
163  return (_endpoints.find(endpoint) != _endpoints.end());
164  }
165 
166  const std::set<dtn::data::EID> Registration::getSubscriptions()
167  {
168  ibrcommon::MutexLock l(_endpoints_lock);
169  return _endpoints;
170  }
171 
173  {
174  // raise bundle event
176 
178  {
180  }
181  }
182 
183  dtn::data::Bundle Registration::receive() throw (dtn::storage::NoBundleFoundException)
184  {
185  // get the global storage
187 
188  // get the next bundles as MetaBundle
190 
191  // load the bundle
192  return storage.get(b);
193  }
194 
195  dtn::data::MetaBundle Registration::receiveMetaBundle() throw (dtn::storage::NoBundleFoundException)
196  {
197  ibrcommon::MutexLock l(_receive_lock);
198 
199  while(true)
200  {
201  try {
202  // get the first bundle in the queue
203  dtn::data::MetaBundle b = _queue.pop();
204  return b;
205  } catch (const ibrcommon::QueueUnblockedException &e) {
207  {
208  IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 25) << "search for more bundles" << IBRCOMMON_LOGGER_ENDL;
209 
210  // query for new bundles
211  underflow();
212  }
213  }
214  catch(const dtn::storage::NoBundleFoundException & ){
215  }
216  }
217 
219  }
220 
222  {
224 
225  // expire outdated bundles in the list
226  _queue.expire(dtn::utils::Clock::getTime());
227 
231 #ifdef HAVE_SQLITE
233 #else
234  class BundleFilter : public dtn::storage::BundleSelector
235 #endif
236  {
237  public:
238  BundleFilter(const std::set<dtn::data::EID> endpoints, const RegistrationQueue &queue, bool loopback, bool fragment_filter)
239  : _endpoints(endpoints), _queue(queue), _loopback(loopback), _fragment_filter(fragment_filter)
240  {};
241 
242  virtual ~BundleFilter() {};
243 
244  virtual dtn::data::Size limit() const throw () { return dtn::core::BundleCore::max_bundles_in_transit; };
245 
246  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
247  {
248  // filter fragments if requested
249  if (meta.isFragment() && _fragment_filter)
250  {
251  return false;
252  }
253 
254  if (_endpoints.find(meta.destination) == _endpoints.end())
255  {
256  return false;
257  }
258 
259  // filter own bundles
260  if (!_loopback)
261  {
262  if (_endpoints.find(meta.source) != _endpoints.end())
263  {
264  return false;
265  }
266  }
267 
268  IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 30) << "search bundle in the list of delivered bundles: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
269 
270  if (_queue.has(meta))
271  {
272  return false;
273  }
274 
275  return true;
276  };
277 
278 #ifdef HAVE_SQLITE
279  const std::string getWhere() const throw ()
280  {
281  if (_endpoints.size() > 1)
282  {
283  std::string where = "(";
284 
285  for (size_t i = _endpoints.size() - 1; i > 0; i--)
286  {
287  where += "destination = ? OR ";
288  }
289 
290  return where + "destination = ?)";
291  }
292  else if (_endpoints.size() == 1)
293  {
294  return "destination = ?";
295  }
296  else
297  {
298  return "destination = null";
299  }
300  };
301 
302  int bind(sqlite3_stmt *st, int offset) const throw ()
303  {
304  int o = offset;
305 
306  for (std::set<dtn::data::EID>::const_iterator iter = _endpoints.begin(); iter != _endpoints.end(); ++iter)
307  {
308  const std::string data = (*iter).getString();
309 
310  sqlite3_bind_text(st, o, data.c_str(), static_cast<int>(data.size()), SQLITE_TRANSIENT);
311  o++;
312  }
313 
314  return o;
315  }
316 #endif
317 
318  private:
319  const std::set<dtn::data::EID> _endpoints;
320  const RegistrationQueue &_queue;
321  const bool _loopback;
322  const bool _fragment_filter;
323  } filter(_endpoints, _queue, false, fragment_conf && _filter_fragments);
324 
325  // query the database for more bundles
326  ibrcommon::MutexLock l(_endpoints_lock);
327 
328  try {
329  dtn::core::BundleCore::getInstance().getSeeker().get( filter, _queue );
330  } catch (const dtn::storage::NoBundleFoundException&) {
331  _no_more_bundles = true;
332  throw;
333  }
334  }
335 
336  Registration::RegistrationQueue::RegistrationQueue()
337  {
338  }
339 
340  Registration::RegistrationQueue::~RegistrationQueue()
341  {
342  }
343 
344  void Registration::RegistrationQueue::put(const dtn::data::MetaBundle &bundle) throw ()
345  {
346  try {
347  _queue.push(bundle);
348 
349  ibrcommon::MutexLock l(_lock);
350  _recv_bundles.add(bundle);
351 
352  IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 10) << "[RegistrationQueue] add bundle to list of delivered bundles: " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
353  } catch (const ibrcommon::Exception&) { }
354  }
355 
356  dtn::data::MetaBundle Registration::RegistrationQueue::pop() throw (const ibrcommon::QueueUnblockedException)
357  {
358  return _queue.getnpop(false);
359  }
360 
361  bool Registration::RegistrationQueue::has(const dtn::data::BundleID &bundle) const throw ()
362  {
363  ibrcommon::MutexLock l(const_cast<ibrcommon::Mutex&>(_lock));
364  return _recv_bundles.has(bundle);
365  }
366 
367  void Registration::RegistrationQueue::expire(const dtn::data::Timestamp &timestamp) throw ()
368  {
369  ibrcommon::MutexLock l(_lock);
370  _recv_bundles.expire(timestamp);
371  }
372 
373  void Registration::RegistrationQueue::abort() throw ()
374  {
375  _queue.abort();
376  }
377 
378  void Registration::RegistrationQueue::reset() throw ()
379  {
380  _queue.reset();
381  }
382 
384  {
385  {
386  ibrcommon::MutexLock l(_endpoints_lock);
387 
388  // add endpoint to the local set
389  _endpoints.insert(endpoint);
390  }
391 
392  // trigger the search for new bundles
394  }
395 
397  {
398  ibrcommon::MutexLock l(_endpoints_lock);
399  _endpoints.erase(endpoint);
400  }
401 
405  bool Registration::operator==(const std::string &other) const
406  {
407  return (_handle == other);
408  }
409 
413  bool Registration::operator==(const Registration &other) const
414  {
415  return (_handle == other._handle);
416  }
417 
421  bool Registration::operator<(const Registration &other) const
422  {
423  return (_handle < other._handle);
424  }
425 
427  {
428  _queue.abort();
429  _notify_queue.abort();
430 
431  ibrcommon::MutexLock l(_wait_for_cond);
432  _wait_for_cond.abort();
433  }
434 
436  {
437  return _default_eid;
438  }
439 
440  const std::string& Registration::getHandle() const
441  {
442  return _handle;
443  }
444 
446  {
447  _expiry = lifetime + ibrcommon::Timer::get_current_time();
448  _persistent = true;
449  }
450 
452  {
453  _persistent = false;
454  }
455 
457  {
458  if(_expiry <= ibrcommon::Timer::get_current_time())
459  {
460  _persistent = false;
461  }
462 
463  return _persistent;
464  }
465 
467  {
468  if(_expiry <= ibrcommon::Timer::get_current_time())
469  {
470  return false;
471  }
472 
473  return _persistent;
474  }
475 
477  {
478  _filter_fragments = val;
479  }
480 
482  {
483  if(!isPersistent()) throw NotPersistentException("Registration is not persistent.");
484 
485  return _expiry;
486 
487  }
488 
490  {
491  ibrcommon::MutexLock l(_attach_lock);
492  if(!_detached) throw AlreadyAttachedException("Registration is already attached to a client.");
493 
494  _detached = false;
495  }
496 
498  {
499  ibrcommon::MutexLock l1(_wait_for_cond);
500  ibrcommon::MutexLock l2(_attach_lock);
501 
502  _detached = true;
503 
504  _queue.reset();
505  _notify_queue.reset();
506 
507  _wait_for_cond.reset();
508  }
509 
511  {
512  // check address fields for "api:me", this has to be replaced
513  static const dtn::data::EID clienteid("api:me");
514 
515  // create a new sequence number
516  bundle.relabel();
517 
518  // if the relabeling results in a zero timestamp, add an ageblock
519  if (bundle.timestamp == 0)
520  {
521  // check for ageblock
522  try {
523  bundle.find<dtn::data::AgeBlock>();
525  // add a new ageblock
527  }
528  }
529 
530  // set the source address to the sending EID
531  bundle.source = source;
532 
533  if (bundle.destination == clienteid) bundle.destination = source;
534  if (bundle.reportto == clienteid) bundle.reportto = source;
535  if (bundle.custodian == clienteid) bundle.custodian = source;
536 
537  // modify TrackingBlock
538  try {
542 
543 #ifdef WITH_COMPRESSION
544  // if the compression bit is set, then compress the bundle
546  {
547  try {
549  } catch (const ibrcommon::Exception &ex) {
550  IBRCOMMON_LOGGER_TAG(Registration::TAG, warning) << "compression of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
551  };
552  }
553 #endif
554 
555 #ifdef WITH_BUNDLE_SECURITY
556  // if the encrypt bit is set, then try to encrypt the bundle
558  {
559  try {
561 
564  // sign requested, but no key is available
565  IBRCOMMON_LOGGER_TAG(Registration::TAG, warning) << "No key available for encrypt process." << IBRCOMMON_LOGGER_ENDL;
566  } catch (const dtn::security::EncryptException&) {
567  IBRCOMMON_LOGGER_TAG(Registration::TAG, warning) << "Encryption of bundle failed." << IBRCOMMON_LOGGER_ENDL;
568  }
569  }
570 
571  // if the sign bit is set, then try to sign the bundle
573  {
574  try {
576 
579  // sign requested, but no key is available
580  IBRCOMMON_LOGGER_TAG(Registration::TAG, warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
581  }
582  }
583 #endif
584 
585  // get the payload size maximum
586  size_t maxPayloadLength = dtn::daemon::Configuration::getInstance().getLimit("payload");
587 
588  // check if fragmentation is enabled
589  // do not try pro-active fragmentation if the payload length is not limited
590  if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation() && (maxPayloadLength > 0))
591  {
592  try {
593  std::list<dtn::data::Bundle> fragments;
594 
595  dtn::core::FragmentManager::split(bundle, maxPayloadLength, fragments);
596 
597  //for each fragment raise bundle received event
598  for(std::list<dtn::data::Bundle>::iterator it = fragments.begin(); it != fragments.end(); ++it)
599  {
600  // raise default bundle received event
601  dtn::net::BundleReceivedEvent::raise(source, *it, true);
602  }
603 
604  return;
605  } catch (const FragmentationProhibitedException&) {
606  } catch (const FragmentationNotNecessaryException&) {
607  } catch (const FragmentationAbortedException&) {
608  // drop the bundle
609  return;
610  }
611  }
612 
613  // raise default bundle received event
614  dtn::net::BundleReceivedEvent::raise(source, bundle, true);
615  }
616  }
617 }