IBR-DTNSuite  0.10
Registration.cpp
Go to the documentation of this file.
1 /*
2  * Registration.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "api/Registration.h"
25 #include "storage/BundleStorage.h"
26 #include "core/BundleCore.h"
27 #include "core/BundleEvent.h"
28 #include "core/BundlePurgeEvent.h"
29 #include "core/FragmentManager.h"
31 
32 #ifdef HAVE_SQLITE
34 #endif
35 
36 #ifdef WITH_COMPRESSION
38 #endif
39 
40 #ifdef WITH_BUNDLE_SECURITY
42 #endif
43 
45 #include <ibrdtn/data/AgeBlock.h>
46 
47 #include <ibrdtn/utils/Clock.h>
48 #include <ibrdtn/utils/Random.h>
49 #include <ibrcommon/Logger.h>
50 
51 #include <limits.h>
52 #include <stdint.h>
53 
54 namespace dtn
55 {
56  namespace api
57  {
58  const std::string Registration::TAG = "Registration";
59  std::set<std::string> Registration::_handles;
60 
61  const std::string Registration::alloc_handle()
62  {
63  static dtn::utils::Random rand;
64 
65  std::string new_handle = rand.gen_chars(16);
66 
67  // if the local host is configured with an IPN address
68  if (dtn::core::BundleCore::local.isCompressable())
69  {
70  // .. then use 32-bit numbers only
71  uint32_t *int_handle = (uint32_t*)new_handle.c_str();
72  std::stringstream ss;
73  ss << *int_handle;
74  new_handle = ss.str();
75  }
76 
77  while (_handles.find(new_handle) != _handles.end())
78  {
79  new_handle = rand.gen_chars(16);
80 
81  // if the local host is configured with an IPN address
82  if (dtn::core::BundleCore::local.isCompressable())
83  {
84  // .. then use 32-bit numbers only
85  uint32_t *int_handle = (uint32_t*)new_handle.c_str();
86  std::stringstream ss;
87  ss << *int_handle;
88  new_handle = ss.str();
89  }
90  }
91 
92  Registration::_handles.insert(new_handle);
93 
94  return new_handle;
95  }
96 
97  void Registration::free_handle(const std::string &handle)
98  {
99  Registration::_handles.erase(handle);
100  }
101 
103  : _handle(alloc_handle()),
104  _default_eid(core::BundleCore::local.add( dtn::core::BundleCore::local.getDelimiter() + _handle) ),
105  _persistent(false), _detached(false), _expiry(0), _filter_fragments(true)
106  {
107  }
108 
110  {
111  free_handle(_handle);
112  }
113 
115  {
116  ibrcommon::MutexLock l(_wait_for_cond);
117  if (call == NOTIFY_BUNDLE_AVAILABLE)
118  {
119  _no_more_bundles = false;
120  _wait_for_cond.signal(true);
121  }
122  else
123  {
124  _notify_queue.push(call);
125  }
126  }
127 
128  void Registration::wait_for_bundle(size_t timeout)
129  {
130  ibrcommon::MutexLock l(_wait_for_cond);
131 
132  while (_no_more_bundles)
133  {
134  if (timeout > 0)
135  {
136  _wait_for_cond.wait(timeout);
137  }
138  else
139  {
140  _wait_for_cond.wait();
141  }
142  }
143  }
144 
146  {
147  return _notify_queue.getnpop(true);
148  }
149 
151  {
152  ibrcommon::MutexLock l(_endpoints_lock);
153  return (_endpoints.find(endpoint) != _endpoints.end());
154  }
155 
156  const std::set<dtn::data::EID> Registration::getSubscriptions()
157  {
158  ibrcommon::MutexLock l(_endpoints_lock);
159  return _endpoints;
160  }
161 
163  {
164  // raise bundle event
166 
168  {
170  }
171  }
172 
173  dtn::data::Bundle Registration::receive() throw (dtn::storage::NoBundleFoundException)
174  {
175  // get the global storage
177 
178  // get the next bundles as MetaBundle
180 
181  // load the bundle
182  return storage.get(b);
183  }
184 
185  dtn::data::MetaBundle Registration::receiveMetaBundle() throw (dtn::storage::NoBundleFoundException)
186  {
187  ibrcommon::MutexLock l(_receive_lock);
188 
189  while(true)
190  {
191  try {
192  // get the first bundle in the queue
193  dtn::data::MetaBundle b = _queue.pop();
194  return b;
195  } catch (const ibrcommon::QueueUnblockedException &e) {
197  {
198  IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 25) << "search for more bundles" << IBRCOMMON_LOGGER_ENDL;
199 
200  // query for new bundles
201  underflow();
202  }
203  }
204  catch(const dtn::storage::NoBundleFoundException & ){
205  }
206  }
207 
209  }
210 
212  {
214 
215  // expire outdated bundles in the list
216  _queue.expire(dtn::utils::Clock::getTime());
217 
221 #ifdef HAVE_SQLITE
223 #else
224  class BundleFilter : public dtn::storage::BundleSelector
225 #endif
226  {
227  public:
228  BundleFilter(const std::set<dtn::data::EID> endpoints, const RegistrationQueue &queue, bool loopback, bool fragment_filter)
229  : _endpoints(endpoints), _queue(queue), _loopback(loopback), _fragment_filter(fragment_filter)
230  {};
231 
232  virtual ~BundleFilter() {};
233 
234  virtual dtn::data::Size limit() const throw () { return dtn::core::BundleCore::max_bundles_in_transit; };
235 
236  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
237  {
238  // filter fragments if requested
239  if (meta.fragment && _fragment_filter)
240  {
241  return false;
242  }
243 
244  if (_endpoints.find(meta.destination) == _endpoints.end())
245  {
246  return false;
247  }
248 
249  // filter own bundles
250  if (!_loopback)
251  {
252  if (_endpoints.find(meta.source) != _endpoints.end())
253  {
254  return false;
255  }
256  }
257 
258  IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 30) << "search bundle in the list of delivered bundles: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
259 
260  if (_queue.has(meta))
261  {
262  return false;
263  }
264 
265  return true;
266  };
267 
268 #ifdef HAVE_SQLITE
269  const std::string getWhere() const throw ()
270  {
271  if (_endpoints.size() > 1)
272  {
273  std::string where = "(";
274 
275  for (size_t i = _endpoints.size() - 1; i > 0; i--)
276  {
277  where += "destination = ? OR ";
278  }
279 
280  return where + "destination = ?)";
281  }
282  else if (_endpoints.size() == 1)
283  {
284  return "destination = ?";
285  }
286  else
287  {
288  return "destination = null";
289  }
290  };
291 
292  int bind(sqlite3_stmt *st, int offset) const throw ()
293  {
294  int o = offset;
295 
296  for (std::set<dtn::data::EID>::const_iterator iter = _endpoints.begin(); iter != _endpoints.end(); ++iter)
297  {
298  const std::string data = (*iter).getString();
299 
300  sqlite3_bind_text(st, o, data.c_str(), static_cast<int>(data.size()), SQLITE_TRANSIENT);
301  o++;
302  }
303 
304  return o;
305  }
306 #endif
307 
308  private:
309  const std::set<dtn::data::EID> _endpoints;
310  const RegistrationQueue &_queue;
311  const bool _loopback;
312  const bool _fragment_filter;
313  } filter(_endpoints, _queue, false, fragment_conf && _filter_fragments);
314 
315  // query the database for more bundles
316  ibrcommon::MutexLock l(_endpoints_lock);
317 
318  try {
319  dtn::core::BundleCore::getInstance().getSeeker().get( filter, _queue );
320  } catch (const dtn::storage::NoBundleFoundException&) {
321  _no_more_bundles = true;
322  throw;
323  }
324  }
325 
326  Registration::RegistrationQueue::RegistrationQueue()
327  {
328  }
329 
330  Registration::RegistrationQueue::~RegistrationQueue()
331  {
332  }
333 
334  void Registration::RegistrationQueue::put(const dtn::data::MetaBundle &bundle) throw ()
335  {
336  try {
337  _queue.push(bundle);
338 
339  ibrcommon::MutexLock l(_lock);
340  _recv_bundles.add(bundle);
341 
342  IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 10) << "[RegistrationQueue] add bundle to list of delivered bundles: " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
343  } catch (const ibrcommon::Exception&) { }
344  }
345 
346  dtn::data::MetaBundle Registration::RegistrationQueue::pop() throw (const ibrcommon::QueueUnblockedException)
347  {
348  return _queue.getnpop(false);
349  }
350 
351  bool Registration::RegistrationQueue::has(const dtn::data::BundleID &bundle) const throw ()
352  {
353  ibrcommon::MutexLock l(const_cast<ibrcommon::Mutex&>(_lock));
354  return _recv_bundles.has(bundle);
355  }
356 
357  void Registration::RegistrationQueue::expire(const dtn::data::Timestamp &timestamp) throw ()
358  {
359  ibrcommon::MutexLock l(_lock);
360  _recv_bundles.expire(timestamp);
361  }
362 
363  void Registration::RegistrationQueue::abort() throw ()
364  {
365  _queue.abort();
366  }
367 
368  void Registration::RegistrationQueue::reset() throw ()
369  {
370  _queue.reset();
371  }
372 
374  {
375  {
376  ibrcommon::MutexLock l(_endpoints_lock);
377 
378  // add endpoint to the local set
379  _endpoints.insert(endpoint);
380  }
381 
382  // trigger the search for new bundles
384  }
385 
387  {
388  ibrcommon::MutexLock l(_endpoints_lock);
389  _endpoints.erase(endpoint);
390  }
391 
395  bool Registration::operator==(const std::string &other) const
396  {
397  return (_handle == other);
398  }
399 
403  bool Registration::operator==(const Registration &other) const
404  {
405  return (_handle == other._handle);
406  }
407 
411  bool Registration::operator<(const Registration &other) const
412  {
413  return (_handle < other._handle);
414  }
415 
417  {
418  _queue.abort();
419  _notify_queue.abort();
420 
421  ibrcommon::MutexLock l(_wait_for_cond);
422  _wait_for_cond.abort();
423  }
424 
426  {
427  return _default_eid;
428  }
429 
430  const std::string& Registration::getHandle() const
431  {
432  return _handle;
433  }
434 
436  {
437  _expiry = lifetime + ibrcommon::Timer::get_current_time();
438  _persistent = true;
439  }
440 
442  {
443  _persistent = false;
444  }
445 
447  {
448  if(_expiry <= ibrcommon::Timer::get_current_time())
449  {
450  _persistent = false;
451  }
452 
453  return _persistent;
454  }
455 
457  {
458  if(_expiry <= ibrcommon::Timer::get_current_time())
459  {
460  return false;
461  }
462 
463  return _persistent;
464  }
465 
467  {
468  _filter_fragments = val;
469  }
470 
472  {
473  if(!isPersistent()) throw NotPersistentException("Registration is not persistent.");
474 
475  return _expiry;
476 
477  }
478 
480  {
481  ibrcommon::MutexLock l(_attach_lock);
482  if(!_detached) throw AlreadyAttachedException("Registration is already attached to a client.");
483 
484  _detached = false;
485  }
486 
488  {
489  ibrcommon::MutexLock l1(_wait_for_cond);
490  ibrcommon::MutexLock l2(_attach_lock);
491 
492  _detached = true;
493 
494  _queue.reset();
495  _notify_queue.reset();
496 
497  _wait_for_cond.reset();
498  }
499 
501  {
502  // check address fields for "api:me", this has to be replaced
503  static const dtn::data::EID clienteid("api:me");
504 
505  // set the source address to the sending EID
506  bundle.source = source;
507 
508  if (bundle.destination == clienteid) bundle.destination = source;
509  if (bundle.reportto == clienteid) bundle.reportto = source;
510  if (bundle.custodian == clienteid) bundle.custodian = source;
511 
512  // if the timestamp is not set, add an age block
513  if (bundle.timestamp == 0)
514  {
515  // check for ageblock
516  try {
517  bundle.find<dtn::data::AgeBlock>();
519  // add a new ageblock
521  }
522  }
523 
524  // modify TrackingBlock
525  try {
529 
530 #ifdef WITH_COMPRESSION
531  // if the compression bit is set, then compress the bundle
533  {
534  try {
536  } catch (const ibrcommon::Exception &ex) {
537  IBRCOMMON_LOGGER_TAG(Registration::TAG, warning) << "compression of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
538  };
539  }
540 #endif
541 
542 #ifdef WITH_BUNDLE_SECURITY
543  // if the encrypt bit is set, then try to encrypt the bundle
545  {
546  try {
548 
551  // encryption requested, but no key is available
552  IBRCOMMON_LOGGER_TAG(Registration::TAG, warning) << "No key available for encrypt process." << IBRCOMMON_LOGGER_ENDL;
554  IBRCOMMON_LOGGER_TAG(Registration::TAG, warning) << "Encryption of bundle failed." << IBRCOMMON_LOGGER_ENDL;
555  }
556  }
557 
558  // if the sign bit is set, then try to sign the bundle
560  {
561  try {
563 
566  // sign requested, but no key is available
567  IBRCOMMON_LOGGER_TAG(Registration::TAG, warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
568  }
569  }
570 #endif
571 
572  // get the payload size maximum
573  size_t maxPayloadLength = dtn::daemon::Configuration::getInstance().getLimit("payload");
574 
575  // check if fragmentation is enabled
576  // do not try pro-active fragmentation if the payload length is not limited
577  if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation() && (maxPayloadLength > 0))
578  {
579  try {
580  std::list<dtn::data::Bundle> fragments;
581 
582  dtn::core::FragmentManager::split(bundle, maxPayloadLength, fragments);
583 
584  //for each fragment raise bundle received event
585  for(std::list<dtn::data::Bundle>::iterator it = fragments.begin(); it != fragments.end(); ++it)
586  {
587  // raise default bundle received event
588  dtn::net::BundleReceivedEvent::raise(source, *it, true);
589  }
590 
591  return;
592  } catch (const FragmentationProhibitedException&) {
593  } catch (const FragmentationNotNecessaryException&) {
594  } catch (const FragmentationAbortedException&) {
595  // drop the bundle
596  return;
597  }
598  }
599 
600  // raise default bundle received event
601  dtn::net::BundleReceivedEvent::raise(source, bundle, true);
602  }
603  }
604 }