IBR-DTNSuite  0.12
SimpleBundleStorage.cpp
Go to the documentation of this file.
1 /*
2  * SimpleBundleStorage.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include "core/EventDispatcher.h"
24 #include "core/TimeEvent.h"
25 #include "core/GlobalEvent.h"
27 #include "core/BundleEvent.h"
28 
29 #include <ibrdtn/data/AgeBlock.h>
30 #include <ibrdtn/utils/Utils.h>
31 #include <ibrdtn/utils/Clock.h>
33 #include <ibrcommon/Logger.h>
34 
35 #include <memory>
36 #include <string.h>
37 #include <stdlib.h>
38 #include <iostream>
39 #include <iomanip>
40 #include <fstream>
41 #include <cstring>
42 #include <cerrno>
43 
44 namespace dtn
45 {
46  namespace storage
47  {
// Tag string handed to the IBRCOMMON_LOGGER_*TAG macros used throughout this file.
48  const std::string SimpleBundleStorage::TAG = "SimpleBundleStorage";
49 
50  SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, const dtn::data::Length maxsize, const unsigned int buffer_limit)
51  : BundleStorage(maxsize), _datastore(*this, workdir, buffer_limit), _metastore(this)
52  {
53  }
54 
// NOTE(review): the signature line of this definition (listing line 55) is not
// part of this listing. The visible body is empty.
56  {
57  }
58 
// NOTE(review): the signature line (listing line 59) is not part of this listing,
// so the declaration of `hash` is not visible; listing line 63 is missing as well.
// Writes a debug log entry with hash.value and erases the entry for `hash`
// from _pending_bundles.
60  {
61  IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 30) << "element successfully stored: " << hash.value << IBRCOMMON_LOGGER_ENDL;
62 
64  _pending_bundles.erase(hash);
65  }
66 
// NOTE(review): the signature line (listing line 67) and listing lines 71, 74, 78,
// 82 and 88 are not part of this listing. `hash`, `ex` and `meta` are used below
// but their declarations are not visible here.
// Logs the failed store with ex.what(), looks up the pending bundle for `hash`,
// erases it from _pending_bundles and releases the space reported by
// _metastore.remove(meta).
68  {
69  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "store of element " << hash.value << " failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
70 
72 
73  {
75 
76  // get the reference to the bundle
 // NOTE(review): `b` is not used by any line visible in this listing
77  const dtn::data::Bundle &b = _pending_bundles[hash];
79  }
80 
81  {
83 
84  // delete the pending bundle
85  _pending_bundles.erase(hash);
86  }
87 
89 
90  // remove bundle and decrement the storage size
91  freeSpace( _metastore.remove(meta) );
92  }
93 
// NOTE(review): the signature line (listing line 94) and listing line 98 are not
// part of this listing; the declaration of `hash` is not visible here.
// Writes a debug log entry, then walks the meta store looking for the entry
// whose generated id hashes to `hash`. The first match is removed from the
// meta store and its size is handed to freeSpace(); nothing happens if no
// entry matches.
95  {
96  IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 30) << "element successfully removed: " << hash.value << IBRCOMMON_LOGGER_ENDL;
97 
99 
100  for (MetaStorage::const_iterator it = _metastore.begin(); it != _metastore.end(); ++it)
101  {
102  const dtn::data::MetaBundle &meta = (*it);
 // rebuild the data storage hash of this entry for the comparison below
103  DataStorage::Hash it_hash(BundleContainer::createId(meta));
104 
105  if (it_hash == hash)
106  {
107  // remove bundle and decrement the storage size
108  freeSpace( _metastore.remove(meta) );
109 
 // leave right after the removal, the iterator is not used again
110  return;
111  }
112  }
113  }
114 
// NOTE(review): the signature line (listing line 115) and listing line 120 are
// not part of this listing. Only the error log entry is visible; the forwarding
// call announced by the comment below is on the missing line.
116  {
117  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "remove of element " << hash.value << " failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
118 
119  // forward this to eventDataStorageRemoved
121  }
122 
// NOTE(review): the signature line (listing line 123) and listing lines 133 and
// 153 are not part of this listing. `hash`, `stream` and `meta` are used below
// but their declarations are not visible here.
// Deserializes one bundle from `stream` and registers it in the meta store.
// If the id generated from the bundle does not hash to `hash`, the element is
// removed from the data store and the bundle is stored again via store().
// Any std::exception is logged and the element is removed from the data store.
124  {
125  try {
126  dtn::data::Bundle bundle;
127  dtn::data::DefaultDeserializer ds(*stream);
128 
129  // load a bundle into the storage
130  ds >> bundle;
131 
132  // extract meta data
134 
135  // check if the hash is different
136  DataStorage::Hash hash2(BundleContainer::createId(meta));
137  if (hash != hash2)
138  {
139  // if hash does not match, remove old file
140  _datastore.remove(hash);
141 
142  // and store bundle again
143  store(bundle);
144 
145  return;
146  }
147 
148  // allocate space for the bundle
 // the read position after deserialization is taken as the size of the bundle
149  const dtn::data::Length bundle_size = static_cast<dtn::data::Length>( (*stream).tellg() );
150  allocSpace(bundle_size);
151 
152  // lock the bundle lists
154 
155  // add the bundle to the stored bundles
156  _metastore.store(meta, bundle_size);
157 
158  // raise bundle added event
159  eventBundleAdded(meta);
160 
161  IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 10) << "bundle restored " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
162  } catch (const std::exception&) {
163  // report this error to the console
164  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "Unable to restore bundle from file " << hash.value << IBRCOMMON_LOGGER_ENDL;
165 
166  // error while reading file
167  _datastore.remove(hash);
168  }
169  }
170 
// NOTE(review): the signature line (listing line 171) and listing lines 180 and
// 184 are not part of this listing.
// Calls _datastore.iterateAll(), logs the number of entries in the meta store
// and starts the data store. A ThreadException thrown by start() is logged
// and not propagated.
172  {
173  // routine checked for throw() on 15.02.2013
174 
175  // load persistent bundles
176  _datastore.iterateAll();
177 
178  // some output
179  {
181  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, info) << _metastore.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL;
182  }
183 
185 
186  try {
187  _datastore.start();
188  } catch (const ibrcommon::ThreadException &ex) {
189  IBRCOMMON_LOGGER_TAG("SimpleBundleStorage", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
190  }
191  }
192 
// NOTE(review): the signature line (listing line 193) and listing lines 197 and
// 207 are not part of this listing.
// Calls wait(), stop() and join() on the data store, resets it, then clears
// the meta store and calls clearSpace(). An ibrcommon::Exception raised by any
// of these steps is logged and not propagated; the remaining steps are skipped
// in that case.
194  {
195  // routine checked for throw() on 15.02.2013
196 
198  try {
199  _datastore.wait();
200  _datastore.stop();
201  _datastore.join();
202 
203  // reset datastore
204  _datastore.reset();
205 
206  // clear all data structures
208  _metastore.clear();
209  clearSpace();
210  } catch (const ibrcommon::Exception &ex) {
211  IBRCOMMON_LOGGER_TAG("SimpleBundleStorage", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
212  }
213  }
214 
// NOTE(review): the signature line (listing line 215) and listing lines 219 and
// 221 are not part of this listing; the declaration of `evt` is not visible.
// Casts *evt to a TimeEvent and passes its timestamp to _metastore.expire().
// Events of any other type make the dynamic_cast throw std::bad_cast, which is
// swallowed on purpose.
216  {
217  try {
218  const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
220  {
222  _metastore.expire(time.getTimestamp());
223  }
224  } catch (const std::bad_cast&) { }
225  }
226 
227  const std::string SimpleBundleStorage::getName() const
228  {
229  return "SimpleBundleStorage";
230  }
231 
// NOTE(review): the signature line (listing line 232) and listing line 234 are
// not part of this listing.
// Returns whatever _metastore.empty() reports.
233  {
235  return _metastore.empty();
236  }
237 
// NOTE(review): the signature line (listing line 238) is not part of this
// listing. The visible body contains comments only and performs no action.
239  {
240  // custody has been successfully transferred to another node.
241  // it is safe to delete this bundle now. (depending on the routing algorithm.)
242  }
243 
// NOTE(review): the signature line (listing line 244) and listing line 246 are
// not part of this listing.
// Returns the number of entries reported by _metastore.size().
245  {
247  return _metastore.size();
248  }
249 
// NOTE(review): the signature line (listing line 250) is not part of this listing.
// Delegates to _datastore.wait().
251  {
252  _datastore.wait();
253  }
254 
// NOTE(review): the signature line (listing line 255) is not part of this
// listing; the declaration of `mode` is not visible here.
// Stores `mode` in _faulty and forwards it to _datastore.setFaulty().
256  {
257  _faulty = mode;
258  _datastore.setFaulty(mode);
259  }
260 
// NOTE(review): the signature line (listing line 261) and listing line 266 are
// not part of this listing; the declarations of `cb` and `result` are not
// visible here.
// Walks the meta store and puts every non-expired entry accepted by
// cb.shouldAdd() into `result`. A cb.limit() of 0 means no limit; otherwise
// the walk stops once that many entries were added.
// Throws NoBundleFoundException if nothing was added.
262  {
263  size_t items_added = 0;
264 
265  // we have to iterate through all bundles
267 
268  for (MetaStorage::const_iterator iter = _metastore.begin(); (iter != _metastore.end()) && ((cb.limit() == 0) || (items_added < cb.limit())); ++iter)
269  {
270  const dtn::data::MetaBundle &meta = (*iter);
271 
272  // skip expired bundles
273  if ( dtn::utils::Clock::isExpired( meta ) ) continue;
274 
275  if ( cb.shouldAdd(meta) )
276  {
277  result.put(meta);
278  items_added++;
279  }
280  }
281 
282  if (items_added == 0) throw NoBundleFoundException();
283  }
284 
// NOTE(review): the signature line (listing line 285) and listing lines 288, 303,
// 333, 337 and 346 are not part of this listing. As a result the try/catch
// nesting shown here does not balance; read it together with the original file.
// Visible behaviour: looks up the meta data for `id`, returns the bundle from
// _pending_bundles if it is still pending, otherwise deserializes it from the
// data store and adds the difference between the stream's last access and last
// modification time to the bundle's AgeBlock. A SerializationFailedException
// is logged and the bundle is removed. The last visible statement throws
// NoBundleFoundException.
286  {
287  try {
289 
290  // faulty mechanism for unit-testing
291  if (_faulty) {
292  throw dtn::SerializationFailedException("bundle get failed due to faulty setting");
293  }
294 
295  // search for the bundle in the meta storage
296  const dtn::data::MetaBundle &meta = _metastore.find(dtn::data::MetaBundle::create(id));
297 
298  // create a hash for the data storage
299  DataStorage::Hash hash(BundleContainer::createId(meta));
300 
301  // check pending bundles
302  {
304 
305  pending_map::iterator it = _pending_bundles.find(hash);
306 
307  if (_pending_bundles.end() != it)
308  {
309  return it->second;
310  }
311  }
312 
313  try {
314  DataStorage::istream stream = _datastore.retrieve(hash);
315 
316  // load the bundle from the storage
317  dtn::data::Bundle bundle;
318 
319  // load the bundle from file
320  try {
321  dtn::data::DefaultDeserializer(*stream) >> bundle;
322  } catch (const std::exception &ex) {
 // translate any deserialization error into the type handled further below
323  throw dtn::SerializationFailedException(ex.what());
324  }
325 
326  try {
327  dtn::data::AgeBlock &agebl = bundle.find<dtn::data::AgeBlock>();
328 
329  // modify the AgeBlock with the age of the file
330  time_t age = stream.lastaccess() - stream.lastmodify();
331 
332  agebl.addSeconds(age);
334 
335  return bundle;
336  } catch (const DataStorage::DataNotAvailableException &ex) {
338  }
339  } catch (const dtn::SerializationFailedException &ex) {
340  // bundle loading failed
341  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "failed to load bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
342 
343  // the bundle is broken, delete it
344  remove(id);
345 
347  }
348 
349  throw NoBundleFoundException();
350  }
351 
// NOTE(review): the signature line (listing line 352) and listing line 354 are
// not part of this listing.
// Returns the result of _metastore.getDistinctDestinations().
353  {
355  return _metastore.getDistinctDestinations();
356  }
357 
// NOTE(review): the signature line (listing line 358) and listing line 370 are
// not part of this listing; `bundle` and `meta` are used below but their
// declarations are not visible here.
// Determines the serialized length of the bundle, reserves that much space
// with allocSpace() and stores the bundle through __store(). If
// acceptCustody() succeeds, a copy with the returned custodian set is stored;
// if an ibrcommon::Exception is thrown inside the try block, the bundle is
// stored unmodified.
359  {
360  // get the bundle size
 // the serializer is bound to std::cout, but only getLength() is called on it here
361  dtn::data::DefaultSerializer s(std::cout);
362  const dtn::data::Length bundle_size = s.getLength(bundle);
363 
364  // allocate space for the bundle
365  allocSpace(bundle_size);
366 
367  // accept custody if requested
368  try {
369  // create meta data object
371 
372  // accept custody
373  const dtn::data::EID custodian = BundleStorage::acceptCustody(meta);
374 
375  // container for the custody accepted bundle
376  dtn::data::Bundle ca_bundle = bundle;
377 
378  // set the new custodian
379  ca_bundle.custodian = custodian;
380 
381  // store the bundle with the custodian
382  __store(ca_bundle, bundle_size);
383  } catch (const ibrcommon::Exception&) {
384  // no custody has been requested - go on with standard store procedure
 // NOTE(review): an ibrcommon::Exception thrown by the __store() call above would
 // also end up here and trigger a second __store() - confirm this is intended
385  __store(bundle, bundle_size);
386  }
387  }
388 
// NOTE(review): the signature line (listing line 389) and listing line 391 are
// not part of this listing; the declaration of `id` is not visible here.
// Returns the result of _metastore.contains(id).
390  {
392 
393  // search for the bundle in the meta storage
394  return _metastore.contains(id);
395  }
396 
// NOTE(review): the signature line (listing line 397) and listing line 399 are
// not part of this listing; the declaration of `id` is not visible here.
// Builds a MetaBundle from `id` and returns the matching meta store entry
// found by _metastore.find().
398  {
400 
401  // search for the bundle in the meta storage
402  return _metastore.find(dtn::data::MetaBundle::create(id));
403  }
404 
// NOTE(review): the signature line (listing line 405) and listing line 407 are
// not part of this listing; the declaration of `id` is not visible here.
// Looks up the meta data for `id`. Unless the entry is already marked as
// removed, it is marked, its removal is requested from the data store and
// eventBundleRemoved() is raised. Calling this again for the same entry does
// nothing.
406  {
408  const dtn::data::MetaBundle &meta = _metastore.find(dtn::data::MetaBundle::create(id));
409 
410  // first check if the bundle is already marked as removed
411  if (!_metastore.isRemoved(meta))
412  {
413  // mark it as removed in the meta storage
414  _metastore.markRemoved(meta);
415 
416  // create the hash for data storage removal
417  DataStorage::Hash hash(BundleContainer::createId(meta));
418 
419  // create a background task for removing the bundle
420  _datastore.remove(hash);
421 
422  // raise bundle removed event
423  eventBundleRemoved(meta);
424  }
425  }
426 
// Registers a bundle as pending, records it in the meta store and hands a
// newly allocated BundleContainer to the data store.
//
// bundle      - bundle to store; a copy is kept in _pending_bundles
// bundle_size - size passed to _metastore.store() together with the meta data
//
// NOTE(review): listing lines 430, 438 and 446 are not part of this listing.
// `meta` is used below but its declaration is not visible, and the two blocks
// announced as critical sections show no lock statement here.
427  void SimpleBundleStorage::__store(const dtn::data::Bundle &bundle, const dtn::data::Length &bundle_size)
428  {
429  // create meta bundle object
431 
432  // create a new container and hash
 // the auto_ptr keeps ownership of the container until it is released below
433  std::auto_ptr<BundleContainer> bc(new BundleContainer(bundle));
434  DataStorage::Hash hash(*bc);
435 
436  // enter critical section - lock pending bundles
437  {
439 
440  // add bundle to the pending bundles
441  _pending_bundles[hash] = bundle;
442  }
443 
444  // enter critical section - lock all data structures
445  {
447 
448  // add the new bundles to the meta storage
449  _metastore.store(meta, bundle_size);
450  }
451 
452  // put the bundle into the data store
 // release() runs only after store() returned, so the container is still
 // deleted by the auto_ptr if store() throws
453  _datastore.store(hash, bc.get());
454  bc.release();
455 
456  // raise bundle added event
457  eventBundleAdded(meta);
458  }
459 
// NOTE(review): the signature line (listing line 460) and listing line 462 are
// not part of this listing.
// Requests removal from the data store for every entry currently in the meta
// store. The meta store itself is not modified by the lines visible here.
461  {
463 
464  // mark all bundles for deletion
465  for (MetaStorage::const_iterator iter = _metastore.begin(); iter != _metastore.end(); ++iter)
466  {
467  // remove item in the bundlelist
468  const dtn::data::MetaBundle &meta = (*iter);
469 
470  DataStorage::Hash hash(BundleContainer::createId(meta));
471 
472  // create a background task for removing the bundle
473  _datastore.remove(hash);
474  }
475  }
476 
// NOTE(review): the signature line (listing line 477) and listing lines 485 and
// 488 are not part of this listing; the declaration of `b` is not visible, and
// the two "raise ..." comments below have lost their statements.
// Requests removal of the element generated from `b` from the data store and
// raises eventBundleRemoved(b).
478  {
479  DataStorage::Hash hash(BundleContainer::createId(b));
480 
481  // create a background task for removing the bundle
482  _datastore.remove(hash);
483 
484  // raise bundle event
486 
487  // raise an event
489 
490  // raise bundle removed event
491  eventBundleRemoved(b);
492  }
493 
494  SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b)
495  : _bundle(b)
496  { }
497 
498  SimpleBundleStorage::BundleContainer::~BundleContainer()
499  { }
500 
501  std::string SimpleBundleStorage::BundleContainer::getId() const
502  {
503  return createId(_bundle);
504  }
505 
506  std::string SimpleBundleStorage::BundleContainer::createId(const dtn::data::BundleID &id)
507  {
508  std::stringstream ss_hash, ss_raw;
509  ss_raw << id;
510 
511  int c = 0xff & ss_raw.get();
512  while (ss_raw.good())
513  {
514  ss_hash << std::hex << std::setw( 2 ) << std::setfill( '0' ) << c;
515  c = 0xff & ss_raw.get();
516  }
517 
518  return ss_hash.str();
519  }
520 
// Writes the contained bundle to `stream` and verifies the result.
//
// stream - output stream the bundle is written to; it is flushed afterwards
// returns the same stream, which allows stacking of calls
// throws dtn::SerializationFailedException if the stream is not good() after
//        the flush, or if the write position is smaller than the length
//        computed for the bundle
//
// NOTE(review): listing line 524 is not part of this listing; the serializer
// `s` is used below but its declaration is not visible here.
521  std::ostream& SimpleBundleStorage::BundleContainer::serialize(std::ostream &stream)
522  {
523  // get a serializer for bundles
525 
526  // length of the bundle
527  dtn::data::Length size = s.getLength(_bundle);
528 
529  // serialize the bundle
530  s << _bundle; stream.flush();
531 
532  // check the streams health
533  if (!stream.good())
534  {
 // the message carries the text for the current value of errno
535  std::stringstream ss; ss << "Output stream went bad [" << std::strerror(errno) << "]";
536  throw dtn::SerializationFailedException(ss.str());
537  }
538 
539  // get the write position
 // a write position below the expected length is reported as a failure
540  if (static_cast<std::streamoff>(size) > stream.tellp())
541  {
542  std::stringstream ss; ss << "Not all data were written [" << stream.tellp() << " of " << size << " bytes]";
543  throw dtn::SerializationFailedException(ss.str());
544  }
545 
546  // return the stream, this allows stacking
547  return stream;
548  }
549  }
550 }