IBR-DTNSuite  0.8
daemon/src/storage/MemoryBundleStorage.cpp
Go to the documentation of this file.
00001 /*
00002  * MemoryBundleStorage.cpp
00003  *
00004  *  Created on: 22.11.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "storage/MemoryBundleStorage.h"
00009 #include "core/TimeEvent.h"
00010 #include "core/GlobalEvent.h"
00011 #include "core/BundleExpiredEvent.h"
00012 #include "core/BundleEvent.h"
00013 
00014 #include <ibrcommon/Logger.h>
00015 #include <ibrcommon/thread/MutexLock.h>
00016 
00017 namespace dtn
00018 {
00019         namespace storage
00020         {
00021                 MemoryBundleStorage::MemoryBundleStorage(size_t maxsize)
00022                  : _maxsize(maxsize), _currentsize(0)
00023                 {
00024                 }
00025 
00026                 MemoryBundleStorage::~MemoryBundleStorage()
00027                 {
00028                 }
00029 
00030                 void MemoryBundleStorage::componentUp()
00031                 {
00032                         bindEvent(dtn::core::TimeEvent::className);
00033                 }
00034 
00035                 void MemoryBundleStorage::componentDown()
00036                 {
00037                         unbindEvent(dtn::core::TimeEvent::className);
00038                 }
00039 
00040                 void MemoryBundleStorage::raiseEvent(const dtn::core::Event *evt)
00041                 {
00042                         try {
00043                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00044                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00045                                 {
00046                                         // do expiration of bundles
00047                                         ibrcommon::MutexLock l(_bundleslock);
00048                                         dtn::data::BundleList::expire(time.getTimestamp());
00049                                 }
00050                         } catch (const std::bad_cast&) { }
00051                 }
00052 
00053                 const std::string MemoryBundleStorage::getName() const
00054                 {
00055                         return "MemoryBundleStorage";
00056                 }
00057 
00058                 bool MemoryBundleStorage::empty()
00059                 {
00060                         ibrcommon::MutexLock l(_bundleslock);
00061                         return _bundles.empty();
00062                 }
00063 
00064                 void MemoryBundleStorage::releaseCustody(const dtn::data::EID&, const dtn::data::BundleID&)
00065                 {
00066                         // custody is successful transferred to another node.
00067                         // it is safe to delete this bundle now. (depending on the routing algorithm.)
00068                 }
00069 
00070                 unsigned int MemoryBundleStorage::count()
00071                 {
00072                         ibrcommon::MutexLock l(_bundleslock);
00073                         return _bundles.size();
00074                 }
00075 
00076                 const std::list<dtn::data::MetaBundle> MemoryBundleStorage::get(BundleFilterCallback &cb)
00077                 {
00078                         // result list
00079                         std::list<dtn::data::MetaBundle> result;
00080 
00081                         // we have to iterate through all bundles
00082                         ibrcommon::MutexLock l(_bundleslock);
00083 
00084                         for (std::set<dtn::data::MetaBundle, CMP_BUNDLE_PRIORITY>::const_iterator iter = _priority_index.begin(); (iter != _priority_index.end()) && ((cb.limit() == 0) || (result.size() < cb.limit())); iter++)
00085                         {
00086                                 const dtn::data::MetaBundle &bundle = (*iter);
00087 
00088                                 if ( cb.shouldAdd(bundle) )
00089                                 {
00090                                         result.push_back(bundle);
00091                                 }
00092                         }
00093 
00094                         return result;
00095                 }
00096 
00097                 dtn::data::Bundle MemoryBundleStorage::get(const dtn::data::BundleID &id)
00098                 {
00099                         try {
00100                                 ibrcommon::MutexLock l(_bundleslock);
00101 
00102                                 for (std::set<dtn::data::Bundle>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00103                                 {
00104                                         const dtn::data::Bundle &bundle = (*iter);
00105                                         if (id == bundle)
00106                                         {
00107                                                 return bundle;
00108                                         }
00109                                 }
00110                         } catch (const dtn::SerializationFailedException &ex) {
00111                                 // bundle loading failed
00112                                 IBRCOMMON_LOGGER(error) << "Error while loading bundle data: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00113 
00114                                 // the bundle is broken, delete it
00115                                 remove(id);
00116 
00117                                 throw BundleStorage::BundleLoadException();
00118                         }
00119 
00120                         throw BundleStorage::NoBundleFoundException();
00121                 }
00122 
00123                 const std::set<dtn::data::EID> MemoryBundleStorage::getDistinctDestinations()
00124                 {
00125                         std::set<dtn::data::EID> ret;
00126 
00127                         ibrcommon::MutexLock l(_bundleslock);
00128 
00129                         for (std::set<dtn::data::Bundle>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00130                         {
00131                                 const dtn::data::Bundle &bundle = (*iter);
00132                                 ret.insert(bundle._destination);
00133                         }
00134 
00135                         return ret;
00136                 }
00137 
00138                 void MemoryBundleStorage::store(const dtn::data::Bundle &bundle)
00139                 {
00140                         ibrcommon::MutexLock l(_bundleslock);
00141 
00142                         // get size of the bundle
00143                         dtn::data::DefaultSerializer s(std::cout);
00144                         size_t size = s.getLength(bundle);
00145 
00146                         // check if this container is too big for us.
00147                         if ((_maxsize > 0) && (_currentsize + size > _maxsize))
00148                         {
00149                                 throw StorageSizeExeededException();
00150                         }
00151 
00152                         // increment the storage size
00153                         _currentsize += size;
00154                         _bundle_lengths[bundle] = size;
00155 
00156                         // insert Container
00157                         pair<set<dtn::data::Bundle>::iterator,bool> ret = _bundles.insert( bundle );
00158 
00159                         if (ret.second)
00160                         {
00161                                 dtn::data::BundleList::add(dtn::data::MetaBundle(bundle));
00162                                 _priority_index.insert( bundle );
00163                         }
00164                         else
00165                         {
00166                                 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: got bundle duplicate " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00167                         }
00168                 }
00169 
00170                 void MemoryBundleStorage::remove(const dtn::data::BundleID &id)
00171                 {
00172                         ibrcommon::MutexLock l(_bundleslock);
00173 
00174                         for (std::set<dtn::data::Bundle>::iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00175                         {
00176                                 if ( id == (*iter) )
00177                                 {
00178                                         // remove item in the bundlelist
00179                                         dtn::data::Bundle bundle = (*iter);
00180 
00181                                         // remove it from the bundle list
00182                                         _priority_index.erase(bundle);
00183                                         dtn::data::BundleList::remove(bundle);
00184 
00185                                         // get size of the bundle
00186                                         dtn::data::DefaultSerializer s(std::cout);
00187                                         size_t size = s.getLength(bundle);
00188 
00189                                         // decrement the storage size
00190                                         _currentsize -= size;
00191 
00192                                         // remove the container
00193                                         _bundles.erase(iter);
00194 
00195                                         return;
00196                                 }
00197                         }
00198 
00199                         throw BundleStorage::NoBundleFoundException();
00200                 }
00201 
00202                 dtn::data::MetaBundle MemoryBundleStorage::remove(const ibrcommon::BloomFilter &filter)
00203                 {
00204                         ibrcommon::MutexLock l(_bundleslock);
00205 
00206                         for (std::set<dtn::data::Bundle>::iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00207                         {
00208                                 const dtn::data::Bundle &bundle = (*iter);
00209 
00210                                 if ( filter.contains(bundle.toString()) )
00211                                 {
00212                                         // remove it from the bundle list
00213                                         _priority_index.erase(bundle);
00214                                         dtn::data::BundleList::remove(bundle);
00215 
00216                                         // get size of the bundle
00217                                         dtn::data::DefaultSerializer s(std::cout);
00218 
00219                                         // decrement the storage size
00220                                         ssize_t len = _bundle_lengths[bundle];
00221                                         _bundle_lengths.erase(bundle);
00222                                         _currentsize -= len;
00223 
00224                                         // remove the container
00225                                         _bundles.erase(iter);
00226 
00227                                         return (MetaBundle)bundle;
00228                                 }
00229                         }
00230 
00231                         throw BundleStorage::NoBundleFoundException();
00232                 }
00233 
00234                 size_t MemoryBundleStorage::size() const
00235                 {
00236                         return _currentsize;
00237                 }
00238 
00239                 void MemoryBundleStorage::clear()
00240                 {
00241                         ibrcommon::MutexLock l(_bundleslock);
00242 
00243                         _bundles.clear();
00244                         _priority_index.clear();
00245                         dtn::data::BundleList::clear();
00246                         _bundle_lengths.clear();
00247 
00248                         // set the storage size to zero
00249                         _currentsize = 0;
00250                 }
00251 
00252                 void MemoryBundleStorage::eventBundleExpired(const ExpiringBundle &b)
00253                 {
00254                         for (std::set<dtn::data::Bundle>::iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00255                         {
00256                                 if ( b.bundle == (*iter) )
00257                                 {
00258                                         dtn::data::Bundle bundle = (*iter);
00259 
00260                                         // get size of the bundle
00261                                         dtn::data::DefaultSerializer s(std::cout);
00262 
00263                                         // decrement the storage size
00264                                         ssize_t len = _bundle_lengths[bundle];
00265                                         _bundle_lengths.erase(bundle);
00266                                         _currentsize -= len;
00267 
00268                                         _priority_index.erase(bundle);
00269                                         _bundles.erase(iter);
00270                                         break;
00271                                 }
00272                         }
00273 
00274                         // raise bundle event
00275                         dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED);
00276 
00277                         // raise an event
00278                         dtn::core::BundleExpiredEvent::raise( b.bundle );
00279                 }
00280         }
00281 }