IBR-DTNSuite
0.8
|
00001 /* 00002 * MemoryBundleStorage.cpp 00003 * 00004 * Created on: 22.11.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "storage/MemoryBundleStorage.h" 00009 #include "core/TimeEvent.h" 00010 #include "core/GlobalEvent.h" 00011 #include "core/BundleExpiredEvent.h" 00012 #include "core/BundleEvent.h" 00013 00014 #include <ibrcommon/Logger.h> 00015 #include <ibrcommon/thread/MutexLock.h> 00016 00017 namespace dtn 00018 { 00019 namespace storage 00020 { 00021 MemoryBundleStorage::MemoryBundleStorage(size_t maxsize) 00022 : _maxsize(maxsize), _currentsize(0) 00023 { 00024 } 00025 00026 MemoryBundleStorage::~MemoryBundleStorage() 00027 { 00028 } 00029 00030 void MemoryBundleStorage::componentUp() 00031 { 00032 bindEvent(dtn::core::TimeEvent::className); 00033 } 00034 00035 void MemoryBundleStorage::componentDown() 00036 { 00037 unbindEvent(dtn::core::TimeEvent::className); 00038 } 00039 00040 void MemoryBundleStorage::raiseEvent(const dtn::core::Event *evt) 00041 { 00042 try { 00043 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00044 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 00045 { 00046 // do expiration of bundles 00047 ibrcommon::MutexLock l(_bundleslock); 00048 dtn::data::BundleList::expire(time.getTimestamp()); 00049 } 00050 } catch (const std::bad_cast&) { } 00051 } 00052 00053 const std::string MemoryBundleStorage::getName() const 00054 { 00055 return "MemoryBundleStorage"; 00056 } 00057 00058 bool MemoryBundleStorage::empty() 00059 { 00060 ibrcommon::MutexLock l(_bundleslock); 00061 return _bundles.empty(); 00062 } 00063 00064 void MemoryBundleStorage::releaseCustody(const dtn::data::EID&, const dtn::data::BundleID&) 00065 { 00066 // custody is successful transferred to another node. 00067 // it is safe to delete this bundle now. (depending on the routing algorithm.) 00068 } 00069 00070 unsigned int MemoryBundleStorage::count() 00071 { 00072 ibrcommon::MutexLock l(_bundleslock); 00073 return _bundles.size(); 00074 } 00075 00076 const std::list<dtn::data::MetaBundle> MemoryBundleStorage::get(BundleFilterCallback &cb) 00077 { 00078 // result list 00079 std::list<dtn::data::MetaBundle> result; 00080 00081 // we have to iterate through all bundles 00082 ibrcommon::MutexLock l(_bundleslock); 00083 00084 for (std::set<dtn::data::MetaBundle, CMP_BUNDLE_PRIORITY>::const_iterator iter = _priority_index.begin(); (iter != _priority_index.end()) && ((cb.limit() == 0) || (result.size() < cb.limit())); iter++) 00085 { 00086 const dtn::data::MetaBundle &bundle = (*iter); 00087 00088 if ( cb.shouldAdd(bundle) ) 00089 { 00090 result.push_back(bundle); 00091 } 00092 } 00093 00094 return result; 00095 } 00096 00097 dtn::data::Bundle MemoryBundleStorage::get(const dtn::data::BundleID &id) 00098 { 00099 try { 00100 ibrcommon::MutexLock l(_bundleslock); 00101 00102 for (std::set<dtn::data::Bundle>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++) 00103 { 00104 const dtn::data::Bundle &bundle = (*iter); 00105 if (id == bundle) 00106 { 00107 return bundle; 00108 } 00109 } 00110 } catch (const dtn::SerializationFailedException &ex) { 00111 // bundle loading failed 00112 IBRCOMMON_LOGGER(error) << "Error while loading bundle data: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00113 00114 // the bundle is broken, delete it 00115 remove(id); 00116 00117 throw BundleStorage::BundleLoadException(); 00118 } 00119 00120 throw BundleStorage::NoBundleFoundException(); 00121 } 00122 00123 const std::set<dtn::data::EID> MemoryBundleStorage::getDistinctDestinations() 00124 { 00125 std::set<dtn::data::EID> ret; 00126 00127 ibrcommon::MutexLock l(_bundleslock); 00128 00129 for (std::set<dtn::data::Bundle>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++) 00130 { 00131 const dtn::data::Bundle &bundle = (*iter); 00132 ret.insert(bundle._destination); 00133 } 00134 00135 return ret; 00136 } 00137 00138 void MemoryBundleStorage::store(const dtn::data::Bundle &bundle) 00139 { 00140 ibrcommon::MutexLock l(_bundleslock); 00141 00142 // get size of the bundle 00143 dtn::data::DefaultSerializer s(std::cout); 00144 size_t size = s.getLength(bundle); 00145 00146 // check if this container is too big for us. 00147 if ((_maxsize > 0) && (_currentsize + size > _maxsize)) 00148 { 00149 throw StorageSizeExeededException(); 00150 } 00151 00152 // increment the storage size 00153 _currentsize += size; 00154 _bundle_lengths[bundle] = size; 00155 00156 // insert Container 00157 pair<set<dtn::data::Bundle>::iterator,bool> ret = _bundles.insert( bundle ); 00158 00159 if (ret.second) 00160 { 00161 dtn::data::BundleList::add(dtn::data::MetaBundle(bundle)); 00162 _priority_index.insert( bundle ); 00163 } 00164 else 00165 { 00166 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: got bundle duplicate " << bundle.toString() << IBRCOMMON_LOGGER_ENDL; 00167 } 00168 } 00169 00170 void MemoryBundleStorage::remove(const dtn::data::BundleID &id) 00171 { 00172 ibrcommon::MutexLock l(_bundleslock); 00173 00174 for (std::set<dtn::data::Bundle>::iterator iter = _bundles.begin(); iter != _bundles.end(); iter++) 00175 { 00176 if ( id == (*iter) ) 00177 { 00178 // remove item in the bundlelist 00179 dtn::data::Bundle bundle = (*iter); 00180 00181 // remove it from the bundle list 00182 _priority_index.erase(bundle); 00183 dtn::data::BundleList::remove(bundle); 00184 00185 // get size of the bundle 00186 dtn::data::DefaultSerializer s(std::cout); 00187 size_t size = s.getLength(bundle); 00188 00189 // decrement the storage size 00190 _currentsize -= size; 00191 00192 // remove the container 00193 _bundles.erase(iter); 00194 00195 return; 00196 } 00197 } 00198 00199 throw BundleStorage::NoBundleFoundException(); 00200 } 00201 00202 dtn::data::MetaBundle MemoryBundleStorage::remove(const ibrcommon::BloomFilter &filter) 00203 { 00204 ibrcommon::MutexLock l(_bundleslock); 00205 00206 for (std::set<dtn::data::Bundle>::iterator iter = _bundles.begin(); iter != _bundles.end(); iter++) 00207 { 00208 const dtn::data::Bundle &bundle = (*iter); 00209 00210 if ( filter.contains(bundle.toString()) ) 00211 { 00212 // remove it from the bundle list 00213 _priority_index.erase(bundle); 00214 dtn::data::BundleList::remove(bundle); 00215 00216 // get size of the bundle 00217 dtn::data::DefaultSerializer s(std::cout); 00218 00219 // decrement the storage size 00220 ssize_t len = _bundle_lengths[bundle]; 00221 _bundle_lengths.erase(bundle); 00222 _currentsize -= len; 00223 00224 // remove the container 00225 _bundles.erase(iter); 00226 00227 return (MetaBundle)bundle; 00228 } 00229 } 00230 00231 throw BundleStorage::NoBundleFoundException(); 00232 } 00233 00234 size_t MemoryBundleStorage::size() const 00235 { 00236 return _currentsize; 00237 } 00238 00239 void MemoryBundleStorage::clear() 00240 { 00241 ibrcommon::MutexLock l(_bundleslock); 00242 00243 _bundles.clear(); 00244 _priority_index.clear(); 00245 dtn::data::BundleList::clear(); 00246 _bundle_lengths.clear(); 00247 00248 // set the storage size to zero 00249 _currentsize = 0; 00250 } 00251 00252 void MemoryBundleStorage::eventBundleExpired(const ExpiringBundle &b) 00253 { 00254 for (std::set<dtn::data::Bundle>::iterator iter = _bundles.begin(); iter != _bundles.end(); iter++) 00255 { 00256 if ( b.bundle == (*iter) ) 00257 { 00258 dtn::data::Bundle bundle = (*iter); 00259 00260 // get size of the bundle 00261 dtn::data::DefaultSerializer s(std::cout); 00262 00263 // decrement the storage size 00264 ssize_t len = _bundle_lengths[bundle]; 00265 _bundle_lengths.erase(bundle); 00266 _currentsize -= len; 00267 00268 _priority_index.erase(bundle); 00269 _bundles.erase(iter); 00270 break; 00271 } 00272 } 00273 00274 // raise bundle event 00275 dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED); 00276 00277 // raise an event 00278 dtn::core::BundleExpiredEvent::raise( b.bundle ); 00279 } 00280 } 00281 }