IBR-DTNSuite  0.10
SimpleBundleStorage.cpp
Go to the documentation of this file.
1 /*
2  * SimpleBundleStorage.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include "core/EventDispatcher.h"
24 #include "core/TimeEvent.h"
25 #include "core/GlobalEvent.h"
27 #include "core/BundleEvent.h"
28 
29 #include <ibrdtn/data/AgeBlock.h>
30 #include <ibrdtn/utils/Utils.h>
31 #include <ibrdtn/utils/Clock.h>
33 #include <ibrcommon/Logger.h>
34 
35 #include <memory>
36 #include <string.h>
37 #include <stdlib.h>
38 #include <iostream>
39 #include <fstream>
40 #include <cstring>
41 #include <cerrno>
42 
43 namespace dtn
44 {
45  namespace storage
46  {
47  const std::string SimpleBundleStorage::TAG = "SimpleBundleStorage"; // tag handed to the IBRCOMMON_LOGGER_* macros in this file
48 
49  SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, const dtn::data::Length maxsize, const unsigned int buffer_limit)
50  : BundleStorage(maxsize), _datastore(*this, workdir, buffer_limit), _metastore(*this)
51  {
52  }
53 
55  {
    // intentionally empty
    // NOTE(review): the signature line of this body (listing line 54) is missing from this extract — restore it from the original file.
56  }
57 
59  {
    // Writes a debug log entry naming hash.value and then drops the entry
    // for that hash from _pending_bundles.
    // NOTE(review): listing lines 58 (signature) and 62 are missing from this extract;
    // presumably this is the data-store "stored" callback — confirm against the original file.
60  IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 30) << "element successfully stored: " << hash.value << IBRCOMMON_LOGGER_ENDL;
61 
63  _pending_bundles.erase(hash);
64  }
65 
67  {
    // Handles a failed store: logs the error, derives the MetaBundle from the
    // pending bundle, drops the pending entry, then gives back the size that
    // the meta store recorded for the bundle and removes its meta entry.
    // NOTE(review): listing lines 66 (signature), 70, 73, 81 and 87 are missing from this
    // extract; `meta` is assigned below but its declaration is not visible here.
68  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "store of element " << hash.value << " failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
69 
71 
72  {
74 
75  // get the reference to the bundle
    // NOTE(review): operator[] presumably default-inserts an entry when the hash is unknown
    // (if pending_map is a std::map) — the entry is erased again in the next scope.
76  const dtn::data::Bundle &b = _pending_bundles[hash];
77  meta = dtn::data::MetaBundle(b);
78  }
79 
80  {
82 
83  // delete the pending bundle
84  _pending_bundles.erase(hash);
85  }
86 
88 
89  // decrement the storage size
90  freeSpace( _metastore.getSize(meta) );
91 
92  // remove it
93  _metastore.remove(meta);
94  }
95 
97  {
    // Writes a debug log entry naming hash.value, then scans the meta store
    // linearly: for each entry a data-store hash is rebuilt from meta.toString()
    // and compared with the given hash. The first match is released and removed.
    // NOTE(review): listing lines 96 (signature) and 100 are missing from this extract.
98  IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 30) << "element successfully removed: " << hash.value << IBRCOMMON_LOGGER_ENDL;
99 
101 
102  for (MetaStorage::const_iterator it = _metastore.begin(); it != _metastore.end(); ++it)
103  {
104  const dtn::data::MetaBundle &meta = (*it);
105  DataStorage::Hash it_hash(meta.toString());
106 
107  if (it_hash == hash)
108  {
109  // decrement the storage size
110  freeSpace( _metastore.getSize(meta) );
111 
112  // remove it
113  _metastore.remove(meta);
114 
     // leave right away: the iterator is not touched again after the removal
115  return;
116  }
117  }
118  }
119 
121  {
     // Logs the failed removal of the element named by hash.value.
     // NOTE(review): listing lines 120 (signature) and 125 are missing from this extract;
     // the statement announced by the comment below is therefore not visible here.
122  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "remove of element " << hash.value << " failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
123 
124  // forward this to eventDataStorageRemoved
126  }
127 
129  {
     // Restores one bundle from the given stream: deserializes it, reserves
     // space for it, records its meta data and announces it. Any std::exception
     // on the way is logged and the element is removed from the data store.
     // NOTE(review): listing lines 128 (signature) and 145 are missing from this extract.
130  try {
131  dtn::data::Bundle bundle;
132  dtn::data::DefaultDeserializer ds(*stream);
133 
134  // load a bundle into the storage
135  ds >> bundle;
136 
137  // allocate space for the bundle
     // the read position after deserialization is taken as the bundle size
     // NOTE(review): assumes the stream was read from offset 0 and tellg() did not fail — confirm
138  dtn::data::Length bundle_size = static_cast<dtn::data::Length>( (*stream).tellg() );
139  allocSpace(bundle_size);
140 
141  // extract meta data
142  dtn::data::MetaBundle meta(bundle);
143 
144  // lock the bundle lists
146 
147  // add the bundle to the stored bundles
148  _metastore.store(meta, bundle_size);
149 
150  // raise bundle added event
151  eventBundleAdded(bundle);
152 
153  IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 10) << "bundle restored " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
154  } catch (const std::exception&) {
155  // report this error to the console
156  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "Unable to restore bundle in file " << hash.value << IBRCOMMON_LOGGER_ENDL;
157 
158  // error while reading file
159  _datastore.remove(hash);
160  }
161  }
162 
164  {
     // Start-up sequence: iterate over everything in the data store, log how
     // many bundles the meta store holds afterwards, then start the data store.
     // NOTE(review): listing lines 163 (signature), 172 and 176 are missing from this extract.
165  // routine checked for throw() on 15.02.2013
166 
167  // load persistent bundles
168  _datastore.iterateAll();
169 
170  // some output
171  {
173  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, info) << _metastore.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL;
174  }
175 
177 
178  try {
179  _datastore.start();
180  } catch (const ibrcommon::ThreadException &ex) {
     // a failed start is only logged; the tag is spelled out here, with the same text as TAG
181  IBRCOMMON_LOGGER_TAG("SimpleBundleStorage", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
182  }
183  }
184 
186  {
     // Shutdown sequence: wait for the data store, stop and join it, reset it
     // and clear the meta store. Any ibrcommon::Exception is only logged.
     // NOTE(review): listing lines 185 (signature), 189 and 199 are missing from this extract.
187  // routine checked for throw() on 15.02.2013
188 
190  try {
191  _datastore.wait();
192  _datastore.stop();
193  _datastore.join();
194 
195  // reset datastore
196  _datastore.reset();
197 
198  // clear all data structures
200  _metastore.clear();
201  } catch (const ibrcommon::Exception &ex) {
     // the tag is spelled out here, with the same text as TAG
202  IBRCOMMON_LOGGER_TAG("SimpleBundleStorage", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
203  }
204  }
205 
207  {
     // Event handler: only dtn::core::TimeEvent is processed. The reference
     // dynamic_cast throws std::bad_cast for every other event type, and that
     // exception is silently ignored by the empty handler below.
     // NOTE(review): listing lines 206 (signature), 210 and 212 are missing from this extract.
208  try {
209  const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
211  {
     // expire meta-store entries against the timestamp carried by the event
213  _metastore.expire(time.getTimestamp());
214  }
215  } catch (const std::bad_cast&) { }
216  }
217 
218  const std::string SimpleBundleStorage::getName() const
219  {
220  return "SimpleBundleStorage";
221  }
222 
224  {
     // Reports whether the meta store is empty.
     // NOTE(review): listing lines 223 (signature) and 225 are missing from this extract.
226  return _metastore.empty();
227  }
228 
230  {
     // NOTE(review): the signature line (listing line 229) is missing from this extract.
     // This body contains no statements.
231  // custody is successfully transferred to another node.
232  // it is safe to delete this bundle now. (depending on the routing algorithm.)
233  }
234 
236  {
     // Returns the number of entries in the meta store.
     // NOTE(review): listing lines 235 (signature) and 237 are missing from this extract.
238  return _metastore.size();
239  }
240 
242  {
     // Delegates to the data store's wait().
     // NOTE(review): the signature line (listing line 241) is missing from this extract.
243  _datastore.wait();
244  }
245 
247  {
     // Stores the flag in _faulty and passes the same value on to the data store.
     // NOTE(review): the signature line (listing line 246) is missing from this extract.
248  _faulty = mode;
249  _datastore.setFaulty(mode);
250  }
251 
253  {
     // Walks the meta store and puts every non-expired entry accepted by
     // cb.shouldAdd() into result. A cb.limit() of 0 means no limit; otherwise
     // the walk stops once that many items were added.
     // Throws NoBundleFoundException when nothing was added.
     // NOTE(review): listing lines 252 (signature) and 257 are missing from this extract.
254  size_t items_added = 0;
255 
256  // we have to iterate through all bundles
258 
259  for (MetaStorage::const_iterator iter = _metastore.begin(); (iter != _metastore.end()) && ((cb.limit() == 0) || (items_added < cb.limit())); ++iter)
260  {
261  const dtn::data::MetaBundle &meta = (*iter);
262 
263  // skip expired bundles
264  if ( dtn::utils::Clock::isExpired( meta.timestamp, meta.lifetime ) ) continue;
265 
266  if ( cb.shouldAdd(meta) )
267  {
268  result.put(meta);
269  items_added++;
270  }
271  }
272 
273  if (items_added == 0) throw NoBundleFoundException();
274  }
275 
277  {
     // Loads one bundle: looks up its meta data, returns the in-memory copy if
     // the bundle is still pending, otherwise reads it from the data store and
     // adds the file age to its AgeBlock. Falls through to
     // NoBundleFoundException at the end.
     // NOTE(review): listing lines 276 (signature), 279, 294, 324, 328 and 337 are missing
     // from this extract, so the try/catch nesting shown here is incomplete — restore the
     // lines from the original file before changing this body.
278  try {
280 
281  // faulty mechanism for unit-testing
282  if (_faulty) {
283  throw dtn::SerializationFailedException("bundle get failed due to faulty setting");
284  }
285 
286  // search for the bundle in the meta storage
287  const dtn::data::MetaBundle &meta = _metastore.find(dtn::data::MetaBundle::mockUp(id));
288 
289  // create a hash for the data storage
290  DataStorage::Hash hash(meta.toString());
291 
292  // check pending bundles
293  {
295 
296  pending_map::iterator it = _pending_bundles.find(hash);
297 
298  if (_pending_bundles.end() != it)
299  {
     // still pending: hand out the in-memory bundle
300  return it->second;
301  }
302  }
303 
304  try {
305  DataStorage::istream stream = _datastore.retrieve(hash);
306 
307  // load the bundle from the storage
308  dtn::data::Bundle bundle;
309 
310  // load the bundle from file
311  try {
312  dtn::data::DefaultDeserializer(*stream) >> bundle;
313  } catch (const std::exception &ex) {
     // every deserialization error is rethrown as SerializationFailedException
314  throw dtn::SerializationFailedException(ex.what());
315  }
316 
317  try {
318  dtn::data::AgeBlock &agebl = bundle.find<dtn::data::AgeBlock>();
319 
320  // modify the AgeBlock with the age of the file
     // age is computed as last access time minus last modification time of the stream
321  time_t age = stream.lastaccess() - stream.lastmodify();
322 
323  agebl.addSeconds(age);
325 
326  return bundle;
327  } catch (const DataStorage::DataNotAvailableException &ex) {
329  }
330  } catch (const dtn::SerializationFailedException &ex) {
331  // bundle loading failed
332  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "failed to load bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
333 
334  // the bundle is broken, delete it
335  remove(id);
336 
338  }
339 
340  throw NoBundleFoundException();
341  }
342 
344  {
     // Returns whatever the meta store reports as distinct destinations.
     // NOTE(review): listing lines 343 (signature) and 345 are missing from this extract.
346  return _metastore.getDistinctDestinations();
347  }
348 
350  {
     // Stores a bundle: computes its serialized length, reserves that much
     // space, then stores either a copy with a new custodian or the bundle as-is.
     // NOTE(review): listing lines 349 (signature) and 363 are missing from this extract;
     // `custodian` is used below but its declaration is not visible here.
351  // get the bundle size
     // the serializer is bound to std::cout, but only getLength() is called on it in this body
352  dtn::data::DefaultSerializer s(std::cout);
353  dtn::data::Length bundle_size = s.getLength(bundle);
354 
355  // allocate space for the bundle
356  allocSpace(bundle_size);
357 
358  // accept custody if requested
359  try {
360  // create meta data object
361  dtn::data::MetaBundle meta(bundle);
362 
364 
365  // container for the custody accepted bundle
366  dtn::data::Bundle ca_bundle = bundle;
367 
368  // set the new custodian
369  ca_bundle.custodian = custodian;
370 
371  // store the bundle with the custodian
372  __store(ca_bundle, bundle_size);
373  } catch (const ibrcommon::Exception&) {
374  // no custody has been requested - go on with standard store procedure
     // NOTE(review): this handler also sees any ibrcommon::Exception thrown by the
     // __store() call inside the try block; in that case __store() runs a second time
     // with the unmodified bundle — confirm this is intended.
375  __store(bundle, bundle_size);
376  }
377  }
378 
380  {
     // Removes the bundle with the given id: unless it is already marked as
     // removed, marks it, queues its removal in the data store and raises the
     // bundle-removed event.
     // NOTE(review): listing lines 379 (signature) and 381 are missing from this extract.
382  const dtn::data::MetaBundle &meta = _metastore.find(dtn::data::MetaBundle::mockUp(id));
383 
384  // first check if the bundle is already marked as removed
385  if (!_metastore.isRemoved(meta))
386  {
387  // remove it from the meta storage
388  _metastore.markRemoved(meta);
389 
390  // create the hash for data storage removal
391  DataStorage::Hash hash(meta.toString());
392 
393  // create a background task for removing the bundle
394  _datastore.remove(hash);
395 
396  // raise bundle removed event
397  eventBundleRemoved(meta);
398  }
399  }
400 
401  void SimpleBundleStorage::__store(const dtn::data::Bundle &bundle, const dtn::data::Length &bundle_size)
402  {
403  // create meta bundle object
404  const dtn::data::MetaBundle meta(bundle);
405 
406  // create a new container and hash
407  std::auto_ptr<BundleContainer> bc(new BundleContainer(bundle));
408  DataStorage::Hash hash(*bc);
409 
410  // enter critical section - lock pending bundles
411  {
413 
414  // add bundle to the pending bundles
415  _pending_bundles[hash] = bundle;
416  }
417 
418  // enter critical section - lock all data structures
419  {
421 
422  // add the new bundles to the meta storage
423  _metastore.store(meta, bundle_size);
424  }
425 
426  // put the bundle into the data store
427  _datastore.store(hash, bc.get());
428  bc.release();
429 
430  // raise bundle added event
431  eventBundleAdded(meta);
432  }
433 
435  {
     // Removes the bundle found by the given filter and returns a copy of its
     // meta data. Unless the bundle is already marked as removed, it is marked,
     // its removal is queued in the data store and the bundle-removed event is raised.
     // NOTE(review): listing lines 434 (signature) and 436 are missing from this extract.
437  const dtn::data::MetaBundle meta = _metastore.find(filter);
438 
439  // first check if the bundle is already marked as removed
440  if (!_metastore.isRemoved(meta))
441  {
442  // remove it from the meta storage
443  _metastore.markRemoved(meta);
444 
445  // create the hash for data storage removal
446  DataStorage::Hash hash(meta.toString());
447 
448  // create a background task for removing the bundle
449  _datastore.remove(hash);
450 
451  // raise bundle removed event
452  eventBundleRemoved(meta);
453  }
454 
455  return meta;
456  }
457 
459  {
     // Queues the removal of every bundle known to the meta store in the data
     // store, then clears the meta store.
     // NOTE(review): listing lines 458 (signature) and 460 are missing from this extract.
461 
462  // mark all bundles for deletion
463  for (MetaStorage::const_iterator iter = _metastore.begin(); iter != _metastore.end(); ++iter)
464  {
465  // remove item in the bundlelist
466  const dtn::data::MetaBundle &meta = (*iter);
467 
468  DataStorage::Hash hash(meta.toString());
469 
470  // create a background task for removing the bundle
471  _datastore.remove(hash);
472  }
473 
474  _metastore.clear();
475  }
476 
478  {
     // Queues the removal of bundle b in the data store and raises the
     // bundle-removed event for it.
     // NOTE(review): listing lines 477 (signature), 485 and 488 are missing from this extract;
     // the statements announced by the two "raise ..." comments below are not visible here.
479  DataStorage::Hash hash(b.toString());
480 
481  // create a background task for removing the bundle
482  _datastore.remove(hash);
483 
484  // raise bundle event
486 
487  // raise an event
489 
490  // raise bundle removed event
491  eventBundleRemoved(b);
492  }
493 
494  SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b)
495  : _bundle(b)
496  { }
497 
498  SimpleBundleStorage::BundleContainer::~BundleContainer()
499  { }
500 
501  std::string SimpleBundleStorage::BundleContainer::getKey() const
502  {
503  return dtn::data::BundleID(_bundle).toString();
504  }
505 
506  std::ostream& SimpleBundleStorage::BundleContainer::serialize(std::ostream &stream)
507  {
508  // get an serializer for bundles
510 
511  // length of the bundle
512  dtn::data::Length size = s.getLength(_bundle);
513 
514  // serialize the bundle
515  s << _bundle; stream.flush();
516 
517  // check the streams health
518  if (!stream.good())
519  {
520  std::stringstream ss; ss << "Output stream went bad [" << std::strerror(errno) << "]";
521  throw dtn::SerializationFailedException(ss.str());
522  }
523 
524  // get the write position
525  if (static_cast<std::streamoff>(size) > stream.tellp())
526  {
527  std::stringstream ss; ss << "Not all data were written [" << stream.tellp() << " of " << size << " bytes]";
528  throw dtn::SerializationFailedException(ss.str());
529  }
530 
531  // return the stream, this allows stacking
532  return stream;
533  }
534  }
535 }