IBR-DTNSuite  0.8
daemon/src/storage/SimpleBundleStorage.cpp
Go to the documentation of this file.
00001 #include "storage/SimpleBundleStorage.h"
00002 #include "core/TimeEvent.h"
00003 #include "core/GlobalEvent.h"
00004 #include "core/BundleExpiredEvent.h"
00005 #include "core/BundleEvent.h"
00006 
00007 #include <ibrdtn/data/AgeBlock.h>
00008 #include <ibrdtn/utils/Utils.h>
00009 #include <ibrcommon/thread/RWLock.h>
00010 #include <ibrcommon/Logger.h>
00011 
00012 #include <string.h>
00013 #include <stdlib.h>
00014 #include <iostream>
00015 #include <fstream>
00016 #include <cstring>
00017 #include <cerrno>
00018 
00019 namespace dtn
00020 {
00021         namespace storage
00022         {
00023                 SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, size_t maxsize, size_t buffer_limit)
00024                  : _datastore(*this, workdir, buffer_limit), _maxsize(maxsize), _currentsize(0)
00025                 {
00026                         // load persistent bundles
00027                         _datastore.iterateAll();
00028 
00029                         // some output
00030                         IBRCOMMON_LOGGER(info) << _bundles.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL;
00031                 }
00032 
00033                 SimpleBundleStorage::~SimpleBundleStorage()
00034                 {
00035                 }
00036 
00037                 void SimpleBundleStorage::eventDataStorageStored(const dtn::storage::DataStorage::Hash &hash)
00038                 {
00039                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE);
00040                         dtn::data::MetaBundle meta(_pending_bundles[hash]);
00041                         _pending_bundles.erase(hash);
00042                         _stored_bundles[meta] = hash;
00043                 }
00044 
00045                 void SimpleBundleStorage::eventDataStorageStoreFailed(const dtn::storage::DataStorage::Hash &hash, const ibrcommon::Exception &ex)
00046                 {
00047                         IBRCOMMON_LOGGER(error) << "store failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00048 
00049                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE);
00050 
00051                         // get the reference to the bundle
00052                         const dtn::data::Bundle &b = _pending_bundles[hash];
00053 
00054                         // decrement the storage size
00055                         _currentsize -= _bundle_size[b];
00056 
00057                         // cleanup bundle sizes
00058                         _bundle_size.erase(b);
00059 
00060                         // delete the pending bundle
00061                         _pending_bundles.erase(hash);
00062                 }
00063 
00064                 void SimpleBundleStorage::eventDataStorageRemoved(const dtn::storage::DataStorage::Hash &hash)
00065                 {
00066                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE);
00067 
00068                         for (std::map<dtn::data::MetaBundle, DataStorage::Hash>::iterator iter = _stored_bundles.begin();
00069                                         iter != _stored_bundles.end(); iter++)
00070                         {
00071                                 if (iter->second == hash)
00072                                 {
00073                                         // decrement the storage size
00074                                         _currentsize -= _bundle_size[iter->first];
00075 
00076                                         _bundle_size.erase(iter->first);
00077 
00078                                         _stored_bundles.erase(iter);
00079                                         return;
00080                                 }
00081                         }
00082                 }
00083 
00084                 void SimpleBundleStorage::eventDataStorageRemoveFailed(const dtn::storage::DataStorage::Hash&, const ibrcommon::Exception &ex)
00085                 {
00086                         IBRCOMMON_LOGGER(error) << "remove failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00087                 }
00088 
00089                 void SimpleBundleStorage::iterateDataStorage(const dtn::storage::DataStorage::Hash &hash, dtn::storage::DataStorage::istream &stream)
00090                 {
00091                         try {
00092                                 dtn::data::Bundle bundle;
00093                                 dtn::data::DefaultDeserializer ds(*stream);
00094 
00095                                 // load a bundle into the storage
00096                                 ds >> bundle;
00097 
00098                                 // extract meta data
00099                                 dtn::data::MetaBundle meta(bundle);
00100 
00101                                 // lock the bundle lists
00102                                 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE);
00103 
00104                                 // add the bundle to the stored bundles
00105                                 _stored_bundles[meta] = hash;
00106 
00107                                 // increment the storage size
00108                                 _bundle_size[meta] = (*stream).tellg();
00109                                 _currentsize += (*stream).tellg();
00110 
00111                                 // add it to the bundle list
00112                                 dtn::data::BundleList::add(meta);
00113                                 _priority_index.insert(meta);
00114 
00115                         } catch (const std::exception&) {
00116                                 // report this error to the console
00117                                 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << hash.value << IBRCOMMON_LOGGER_ENDL;
00118 
00119                                 // error while reading file
00120                                 _datastore.remove(hash);
00121                         }
00122                 }
00123 
00124                 dtn::data::Bundle SimpleBundleStorage::__get(const dtn::data::MetaBundle &meta)
00125                 {
00126                         DataStorage::Hash hash(meta.toString());
00127                         std::map<DataStorage::Hash, dtn::data::Bundle>::iterator it = _pending_bundles.find(hash);
00128 
00129                         if (_pending_bundles.end() != it)
00130                         {
00131                                 return it->second;
00132                         }
00133 
00134                         try {
00135                                 DataStorage::istream stream = _datastore.retrieve(hash);
00136 
00137                                 // load the bundle from the storage
00138                                 dtn::data::Bundle bundle;
00139 
00140                                 // load the bundle from file
00141                                 try {
00142                                         dtn::data::DefaultDeserializer(*stream) >> bundle;
00143                                 } catch (const std::exception &ex) {
00144                                         throw dtn::SerializationFailedException("bundle get failed: " + std::string(ex.what()));
00145                                 }
00146 
00147                                 try {
00148                                         dtn::data::AgeBlock &agebl = bundle.getBlock<dtn::data::AgeBlock>();
00149 
00150                                         // modify the AgeBlock with the age of the file
00151                                         time_t age = stream.lastaccess() - stream.lastmodify();
00152 
00153                                         agebl.addSeconds(age);
00154                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00155 
00156                                 return bundle;
00157                         } catch (const DataStorage::DataNotAvailableException&) {
00158                                 throw BundleStorage::NoBundleFoundException();
00159                         }
00160                 }
00161 
00162                 void SimpleBundleStorage::componentUp()
00163                 {
00164                         bindEvent(dtn::core::TimeEvent::className);
00165                         _datastore.start();
00166                 }
00167 
00168                 void SimpleBundleStorage::componentDown()
00169                 {
00170                         unbindEvent(dtn::core::TimeEvent::className);
00171                         _datastore.stop();
00172                         _datastore.join();
00173                 }
00174 
00175                 void SimpleBundleStorage::raiseEvent(const dtn::core::Event *evt)
00176                 {
00177                         try {
00178                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00179                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00180                                 {
00181                                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE);
00182                                         dtn::data::BundleList::expire(time.getTimestamp());
00183                                 }
00184                         } catch (const std::bad_cast&) { }
00185                 }
00186 
00187                 const std::string SimpleBundleStorage::getName() const
00188                 {
00189                         return "SimpleBundleStorage";
00190                 }
00191 
00192                 bool SimpleBundleStorage::empty()
00193                 {
00194                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY);
00195                         return dtn::data::BundleList::empty();
00196                 }
00197 
00198                 void SimpleBundleStorage::releaseCustody(const dtn::data::EID&, const dtn::data::BundleID&)
00199                 {
00200                         // custody is successful transferred to another node.
00201                         // it is safe to delete this bundle now. (depending on the routing algorithm.)
00202                 }
00203 
00204                 unsigned int SimpleBundleStorage::count()
00205                 {
00206                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY);
00207                         return dtn::data::BundleList::size();
00208                 }
00209 
00210                 const std::list<dtn::data::MetaBundle> SimpleBundleStorage::get(BundleFilterCallback &cb)
00211                 {
00212                         // result list
00213                         std::list<dtn::data::MetaBundle> result;
00214 
00215                         // we have to iterate through all bundles
00216                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY);
00217 
00218                         for (std::set<dtn::data::MetaBundle, CMP_BUNDLE_PRIORITY>::const_iterator iter = _priority_index.begin(); (iter != _priority_index.end()) && ((cb.limit() == 0) || (result.size() < cb.limit())); iter++)
00219                         {
00220                                 const dtn::data::MetaBundle &meta = (*iter);
00221 
00222                                 if ( cb.shouldAdd(meta) )
00223                                 {
00224                                         result.push_back(meta);
00225                                 }
00226                         }
00227 
00228                         return result;
00229                 }
00230 
00231                 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::BundleID &id)
00232                 {
00233                         try {
00234                                 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY);
00235 
00236                                 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00237                                 {
00238                                         const dtn::data::MetaBundle &meta = (*iter);
00239                                         if (id == meta)
00240                                         {
00241                                                 return __get(meta);
00242                                         }
00243                                 }
00244                         } catch (const dtn::SerializationFailedException &ex) {
00245                                 // bundle loading failed
00246                                 IBRCOMMON_LOGGER(error) << "Error while loading bundle data: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00247 
00248                                 // the bundle is broken, delete it
00249                                 remove(id);
00250 
00251                                 throw BundleStorage::BundleLoadException();
00252                         }
00253 
00254                         throw BundleStorage::NoBundleFoundException();
00255                 }
00256 
00257                 const std::set<dtn::data::EID> SimpleBundleStorage::getDistinctDestinations()
00258                 {
00259                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY);
00260                         std::set<dtn::data::EID> ret;
00261                         return ret;
00262                 }
00263 
00264                 void SimpleBundleStorage::store(const dtn::data::Bundle &bundle)
00265                 {
00266                         // get the bundle size
00267                         dtn::data::DefaultSerializer s(std::cout);
00268                         size_t bundle_size = s.getLength(bundle);
00269 
00270                         // store the bundle
00271                         BundleContainer *bc = new BundleContainer(bundle);
00272                         DataStorage::Hash hash(*bc);
00273 
00274                         // check if this container is too big for us.
00275                         {
00276                                 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE);
00277                                 if ((_maxsize > 0) && (_currentsize + bundle_size > _maxsize))
00278                                 {
00279                                         throw StorageSizeExeededException();
00280                                 }
00281 
00282                                 // create meta data object
00283                                 dtn::data::MetaBundle meta(bundle);
00284 
00285                                 // accept custody if requested
00286                                 try {
00287                                         dtn::data::EID custodian = BundleStorage::acceptCustody(bundle);
00288 
00289                                         // container for the custody accepted bundle
00290                                         dtn::data::Bundle ca_bundle = bundle;
00291 
00292                                         // set the new custodian
00293                                         ca_bundle._custodian = custodian;
00294 
00295                                         // create meta data object
00296                                         meta = ca_bundle;
00297 
00298                                         // add the bundle to the stored bundles
00299                                         _pending_bundles[hash] = ca_bundle;
00300                                 } catch (const ibrcommon::Exception&) {
00301                                         // no custody requested
00302                                         // add the bundle to the stored bundles
00303                                         _pending_bundles[hash] = bundle;
00304                                 }
00305 
00306                                 // increment the storage size
00307                                 _bundle_size[meta] = bundle_size;
00308                                 _currentsize += bundle_size;
00309 
00310                                 // add it to the bundle list
00311                                 dtn::data::BundleList::add(meta);
00312                                 _priority_index.insert(meta);
00313                         }
00314 
00315                         // put the bundle into the data store
00316                         _datastore.store(hash, bc);
00317                 }
00318 
00319                 void SimpleBundleStorage::remove(const dtn::data::BundleID &id)
00320                 {
00321                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE);
00322 
00323                         for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00324                         {
00325                                 if ((*iter) == id)
00326                                 {
00327                                         // remove item in the bundlelist
00328                                         dtn::data::MetaBundle meta = (*iter);
00329 
00330                                         // remove it from the bundle list
00331                                         dtn::data::BundleList::remove(meta);
00332                                         _priority_index.erase(meta);
00333 
00334                                         DataStorage::Hash hash(meta.toString());
00335 
00336                                         // create a background task for removing the bundle
00337                                         _datastore.remove(hash);
00338 
00339                                         return;
00340                                 }
00341                         }
00342 
00343                         throw BundleStorage::NoBundleFoundException();
00344                 }
00345 
00346                 dtn::data::MetaBundle SimpleBundleStorage::remove(const ibrcommon::BloomFilter &filter)
00347                 {
00348                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE);
00349 
00350                         for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00351                         {
00352                                 // remove item in the bundlelist
00353                                 const dtn::data::MetaBundle meta = (*iter);
00354 
00355                                 if ( filter.contains(meta.toString()) )
00356                                 {
00357                                         // remove it from the bundle list
00358                                         dtn::data::BundleList::remove(meta);
00359                                         _priority_index.erase(meta);
00360 
00361                                         DataStorage::Hash hash(meta.toString());
00362 
00363                                         // create a background task for removing the bundle
00364                                         _datastore.remove(hash);
00365 
00366                                         return meta;
00367                                 }
00368                         }
00369 
00370                         throw BundleStorage::NoBundleFoundException();
00371                 }
00372 
00373                 size_t SimpleBundleStorage::size() const
00374                 {
00375                         return _currentsize;
00376                 }
00377 
00378                 void SimpleBundleStorage::clear()
00379                 {
00380                         ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE);
00381 
00382                         // mark all bundles for deletion
00383                         for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00384                         {
00385                                 // remove item in the bundlelist
00386                                 const dtn::data::MetaBundle meta = (*iter);
00387 
00388                                 DataStorage::Hash hash(meta.toString());
00389 
00390                                 // create a background task for removing the bundle
00391                                 _datastore.remove(hash);
00392                         }
00393 
00394                         _bundles.clear();
00395                         _priority_index.clear();
00396                         dtn::data::BundleList::clear();
00397 
00398                         // set the storage size to zero
00399                         _currentsize = 0;
00400                 }
00401 
00402                 void SimpleBundleStorage::eventBundleExpired(const ExpiringBundle &b)
00403                 {
00404                         for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00405                         {
00406                                 if ((*iter) == b.bundle)
00407                                 {
00408                                         // remove the bundle
00409                                         const dtn::data::MetaBundle &meta = (*iter);
00410 
00411                                         DataStorage::Hash hash(meta.toString());
00412 
00413                                         // create a background task for removing the bundle
00414                                         _datastore.remove(hash);
00415 
00416                                         // remove the bundle off the index
00417                                         _priority_index.erase(meta);
00418 
00419                                         break;
00420                                 }
00421                         }
00422 
00423                         // raise bundle event
00424                         dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED);
00425 
00426                         // raise an event
00427                         dtn::core::BundleExpiredEvent::raise( b.bundle );
00428                 }
00429 
00430                 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b)
00431                  : _bundle(b)
00432                 { }
00433 
00434                 SimpleBundleStorage::BundleContainer::~BundleContainer()
00435                 { }
00436 
00437                 std::string SimpleBundleStorage::BundleContainer::getKey() const
00438                 {
00439                         return dtn::data::BundleID(_bundle).toString();
00440                 }
00441 
00442                 std::ostream& SimpleBundleStorage::BundleContainer::serialize(std::ostream &stream)
00443                 {
00444                         // get an serializer for bundles
00445                         dtn::data::DefaultSerializer s(stream);
00446 
00447                         // length of the bundle
00448                         unsigned int size = s.getLength(_bundle);
00449 
00450                         // serialize the bundle
00451                         s << _bundle; stream.flush();
00452 
00453                         // check the streams health
00454                         if (!stream.good())
00455                         {
00456                                 std::stringstream ss; ss << "Output stream went bad [" << std::strerror(errno) << "]";
00457                                 throw dtn::SerializationFailedException(ss.str());
00458                         }
00459 
00460                         // get the write position
00461                         if (size > stream.tellp())
00462                         {
00463                                 std::stringstream ss; ss << "Not all data were written [" << stream.tellp() << " of " << size << " bytes]";
00464                                 throw dtn::SerializationFailedException(ss.str());
00465                         }
00466 
00467                         // return the stream, this allows stacking
00468                         return stream;
00469                 }
00470         }
00471 }