IBR-DTNSuite
0.8
|
00001 #include "storage/SimpleBundleStorage.h" 00002 #include "core/TimeEvent.h" 00003 #include "core/GlobalEvent.h" 00004 #include "core/BundleExpiredEvent.h" 00005 #include "core/BundleEvent.h" 00006 00007 #include <ibrdtn/data/AgeBlock.h> 00008 #include <ibrdtn/utils/Utils.h> 00009 #include <ibrcommon/thread/RWLock.h> 00010 #include <ibrcommon/Logger.h> 00011 00012 #include <string.h> 00013 #include <stdlib.h> 00014 #include <iostream> 00015 #include <fstream> 00016 #include <cstring> 00017 #include <cerrno> 00018 00019 namespace dtn 00020 { 00021 namespace storage 00022 { 00023 SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, size_t maxsize, size_t buffer_limit) 00024 : _datastore(*this, workdir, buffer_limit), _maxsize(maxsize), _currentsize(0) 00025 { 00026 // load persistent bundles 00027 _datastore.iterateAll(); 00028 00029 // some output 00030 IBRCOMMON_LOGGER(info) << _bundles.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL; 00031 } 00032 00033 SimpleBundleStorage::~SimpleBundleStorage() 00034 { 00035 } 00036 00037 void SimpleBundleStorage::eventDataStorageStored(const dtn::storage::DataStorage::Hash &hash) 00038 { 00039 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE); 00040 dtn::data::MetaBundle meta(_pending_bundles[hash]); 00041 _pending_bundles.erase(hash); 00042 _stored_bundles[meta] = hash; 00043 } 00044 00045 void SimpleBundleStorage::eventDataStorageStoreFailed(const dtn::storage::DataStorage::Hash &hash, const ibrcommon::Exception &ex) 00046 { 00047 IBRCOMMON_LOGGER(error) << "store failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00048 00049 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE); 00050 00051 // get the reference to the bundle 00052 const dtn::data::Bundle &b = _pending_bundles[hash]; 00053 00054 // decrement the storage size 00055 _currentsize -= _bundle_size[b]; 00056 00057 // cleanup bundle sizes 00058 _bundle_size.erase(b); 00059 00060 // delete the pending bundle 00061 _pending_bundles.erase(hash); 00062 } 00063 00064 void SimpleBundleStorage::eventDataStorageRemoved(const dtn::storage::DataStorage::Hash &hash) 00065 { 00066 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE); 00067 00068 for (std::map<dtn::data::MetaBundle, DataStorage::Hash>::iterator iter = _stored_bundles.begin(); 00069 iter != _stored_bundles.end(); iter++) 00070 { 00071 if (iter->second == hash) 00072 { 00073 // decrement the storage size 00074 _currentsize -= _bundle_size[iter->first]; 00075 00076 _bundle_size.erase(iter->first); 00077 00078 _stored_bundles.erase(iter); 00079 return; 00080 } 00081 } 00082 } 00083 00084 void SimpleBundleStorage::eventDataStorageRemoveFailed(const dtn::storage::DataStorage::Hash&, const ibrcommon::Exception &ex) 00085 { 00086 IBRCOMMON_LOGGER(error) << "remove failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00087 } 00088 00089 void SimpleBundleStorage::iterateDataStorage(const dtn::storage::DataStorage::Hash &hash, dtn::storage::DataStorage::istream &stream) 00090 { 00091 try { 00092 dtn::data::Bundle bundle; 00093 dtn::data::DefaultDeserializer ds(*stream); 00094 00095 // load a bundle into the storage 00096 ds >> bundle; 00097 00098 // extract meta data 00099 dtn::data::MetaBundle meta(bundle); 00100 00101 // lock the bundle lists 00102 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE); 00103 00104 // add the bundle to the stored bundles 00105 _stored_bundles[meta] = hash; 00106 00107 // increment the storage size 00108 _bundle_size[meta] = (*stream).tellg(); 00109 _currentsize += (*stream).tellg(); 00110 00111 // add it to the bundle list 00112 dtn::data::BundleList::add(meta); 00113 _priority_index.insert(meta); 00114 00115 } catch (const std::exception&) { 00116 // report this error to the console 00117 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << hash.value << IBRCOMMON_LOGGER_ENDL; 00118 00119 // error while reading file 00120 _datastore.remove(hash); 00121 } 00122 } 00123 00124 dtn::data::Bundle SimpleBundleStorage::__get(const dtn::data::MetaBundle &meta) 00125 { 00126 DataStorage::Hash hash(meta.toString()); 00127 std::map<DataStorage::Hash, dtn::data::Bundle>::iterator it = _pending_bundles.find(hash); 00128 00129 if (_pending_bundles.end() != it) 00130 { 00131 return it->second; 00132 } 00133 00134 try { 00135 DataStorage::istream stream = _datastore.retrieve(hash); 00136 00137 // load the bundle from the storage 00138 dtn::data::Bundle bundle; 00139 00140 // load the bundle from file 00141 try { 00142 dtn::data::DefaultDeserializer(*stream) >> bundle; 00143 } catch (const std::exception &ex) { 00144 throw dtn::SerializationFailedException("bundle get failed: " + std::string(ex.what())); 00145 } 00146 00147 try { 00148 dtn::data::AgeBlock &agebl = bundle.getBlock<dtn::data::AgeBlock>(); 00149 00150 // modify the AgeBlock with the age of the file 00151 time_t age = stream.lastaccess() - stream.lastmodify(); 00152 00153 agebl.addSeconds(age); 00154 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00155 00156 return bundle; 00157 } catch (const DataStorage::DataNotAvailableException&) { 00158 throw BundleStorage::NoBundleFoundException(); 00159 } 00160 } 00161 00162 void SimpleBundleStorage::componentUp() 00163 { 00164 bindEvent(dtn::core::TimeEvent::className); 00165 _datastore.start(); 00166 } 00167 00168 void SimpleBundleStorage::componentDown() 00169 { 00170 unbindEvent(dtn::core::TimeEvent::className); 00171 _datastore.stop(); 00172 _datastore.join(); 00173 } 00174 00175 void SimpleBundleStorage::raiseEvent(const dtn::core::Event *evt) 00176 { 00177 try { 00178 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00179 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 00180 { 00181 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE); 00182 dtn::data::BundleList::expire(time.getTimestamp()); 00183 } 00184 } catch (const std::bad_cast&) { } 00185 } 00186 00187 const std::string SimpleBundleStorage::getName() const 00188 { 00189 return "SimpleBundleStorage"; 00190 } 00191 00192 bool SimpleBundleStorage::empty() 00193 { 00194 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY); 00195 return dtn::data::BundleList::empty(); 00196 } 00197 00198 void SimpleBundleStorage::releaseCustody(const dtn::data::EID&, const dtn::data::BundleID&) 00199 { 00200 // custody is successful transferred to another node. 00201 // it is safe to delete this bundle now. (depending on the routing algorithm.) 00202 } 00203 00204 unsigned int SimpleBundleStorage::count() 00205 { 00206 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY); 00207 return dtn::data::BundleList::size(); 00208 } 00209 00210 const std::list<dtn::data::MetaBundle> SimpleBundleStorage::get(BundleFilterCallback &cb) 00211 { 00212 // result list 00213 std::list<dtn::data::MetaBundle> result; 00214 00215 // we have to iterate through all bundles 00216 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY); 00217 00218 for (std::set<dtn::data::MetaBundle, CMP_BUNDLE_PRIORITY>::const_iterator iter = _priority_index.begin(); (iter != _priority_index.end()) && ((cb.limit() == 0) || (result.size() < cb.limit())); iter++) 00219 { 00220 const dtn::data::MetaBundle &meta = (*iter); 00221 00222 if ( cb.shouldAdd(meta) ) 00223 { 00224 result.push_back(meta); 00225 } 00226 } 00227 00228 return result; 00229 } 00230 00231 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::BundleID &id) 00232 { 00233 try { 00234 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY); 00235 00236 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00237 { 00238 const dtn::data::MetaBundle &meta = (*iter); 00239 if (id == meta) 00240 { 00241 return __get(meta); 00242 } 00243 } 00244 } catch (const dtn::SerializationFailedException &ex) { 00245 // bundle loading failed 00246 IBRCOMMON_LOGGER(error) << "Error while loading bundle data: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00247 00248 // the bundle is broken, delete it 00249 remove(id); 00250 00251 throw BundleStorage::BundleLoadException(); 00252 } 00253 00254 throw BundleStorage::NoBundleFoundException(); 00255 } 00256 00257 const std::set<dtn::data::EID> SimpleBundleStorage::getDistinctDestinations() 00258 { 00259 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READONLY); 00260 std::set<dtn::data::EID> ret; 00261 return ret; 00262 } 00263 00264 void SimpleBundleStorage::store(const dtn::data::Bundle &bundle) 00265 { 00266 // get the bundle size 00267 dtn::data::DefaultSerializer s(std::cout); 00268 size_t bundle_size = s.getLength(bundle); 00269 00270 // store the bundle 00271 BundleContainer *bc = new BundleContainer(bundle); 00272 DataStorage::Hash hash(*bc); 00273 00274 // check if this container is too big for us. 00275 { 00276 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE); 00277 if ((_maxsize > 0) && (_currentsize + bundle_size > _maxsize)) 00278 { 00279 throw StorageSizeExeededException(); 00280 } 00281 00282 // create meta data object 00283 dtn::data::MetaBundle meta(bundle); 00284 00285 // accept custody if requested 00286 try { 00287 dtn::data::EID custodian = BundleStorage::acceptCustody(bundle); 00288 00289 // container for the custody accepted bundle 00290 dtn::data::Bundle ca_bundle = bundle; 00291 00292 // set the new custodian 00293 ca_bundle._custodian = custodian; 00294 00295 // create meta data object 00296 meta = ca_bundle; 00297 00298 // add the bundle to the stored bundles 00299 _pending_bundles[hash] = ca_bundle; 00300 } catch (const ibrcommon::Exception&) { 00301 // no custody requested 00302 // add the bundle to the stored bundles 00303 _pending_bundles[hash] = bundle; 00304 } 00305 00306 // increment the storage size 00307 _bundle_size[meta] = bundle_size; 00308 _currentsize += bundle_size; 00309 00310 // add it to the bundle list 00311 dtn::data::BundleList::add(meta); 00312 _priority_index.insert(meta); 00313 } 00314 00315 // put the bundle into the data store 00316 _datastore.store(hash, bc); 00317 } 00318 00319 void SimpleBundleStorage::remove(const dtn::data::BundleID &id) 00320 { 00321 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE); 00322 00323 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00324 { 00325 if ((*iter) == id) 00326 { 00327 // remove item in the bundlelist 00328 dtn::data::MetaBundle meta = (*iter); 00329 00330 // remove it from the bundle list 00331 dtn::data::BundleList::remove(meta); 00332 _priority_index.erase(meta); 00333 00334 DataStorage::Hash hash(meta.toString()); 00335 00336 // create a background task for removing the bundle 00337 _datastore.remove(hash); 00338 00339 return; 00340 } 00341 } 00342 00343 throw BundleStorage::NoBundleFoundException(); 00344 } 00345 00346 dtn::data::MetaBundle SimpleBundleStorage::remove(const ibrcommon::BloomFilter &filter) 00347 { 00348 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE); 00349 00350 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00351 { 00352 // remove item in the bundlelist 00353 const dtn::data::MetaBundle meta = (*iter); 00354 00355 if ( filter.contains(meta.toString()) ) 00356 { 00357 // remove it from the bundle list 00358 dtn::data::BundleList::remove(meta); 00359 _priority_index.erase(meta); 00360 00361 DataStorage::Hash hash(meta.toString()); 00362 00363 // create a background task for removing the bundle 00364 _datastore.remove(hash); 00365 00366 return meta; 00367 } 00368 } 00369 00370 throw BundleStorage::NoBundleFoundException(); 00371 } 00372 00373 size_t SimpleBundleStorage::size() const 00374 { 00375 return _currentsize; 00376 } 00377 00378 void SimpleBundleStorage::clear() 00379 { 00380 ibrcommon::RWLock l(_bundleslock, ibrcommon::RWMutex::LOCK_READWRITE); 00381 00382 // mark all bundles for deletion 00383 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00384 { 00385 // remove item in the bundlelist 00386 const dtn::data::MetaBundle meta = (*iter); 00387 00388 DataStorage::Hash hash(meta.toString()); 00389 00390 // create a background task for removing the bundle 00391 _datastore.remove(hash); 00392 } 00393 00394 _bundles.clear(); 00395 _priority_index.clear(); 00396 dtn::data::BundleList::clear(); 00397 00398 // set the storage size to zero 00399 _currentsize = 0; 00400 } 00401 00402 void SimpleBundleStorage::eventBundleExpired(const ExpiringBundle &b) 00403 { 00404 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00405 { 00406 if ((*iter) == b.bundle) 00407 { 00408 // remove the bundle 00409 const dtn::data::MetaBundle &meta = (*iter); 00410 00411 DataStorage::Hash hash(meta.toString()); 00412 00413 // create a background task for removing the bundle 00414 _datastore.remove(hash); 00415 00416 // remove the bundle off the index 00417 _priority_index.erase(meta); 00418 00419 break; 00420 } 00421 } 00422 00423 // raise bundle event 00424 dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED); 00425 00426 // raise an event 00427 dtn::core::BundleExpiredEvent::raise( b.bundle ); 00428 } 00429 00430 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b) 00431 : _bundle(b) 00432 { } 00433 00434 SimpleBundleStorage::BundleContainer::~BundleContainer() 00435 { } 00436 00437 std::string SimpleBundleStorage::BundleContainer::getKey() const 00438 { 00439 return dtn::data::BundleID(_bundle).toString(); 00440 } 00441 00442 std::ostream& SimpleBundleStorage::BundleContainer::serialize(std::ostream &stream) 00443 { 00444 // get an serializer for bundles 00445 dtn::data::DefaultSerializer s(stream); 00446 00447 // length of the bundle 00448 unsigned int size = s.getLength(_bundle); 00449 00450 // serialize the bundle 00451 s << _bundle; stream.flush(); 00452 00453 // check the streams health 00454 if (!stream.good()) 00455 { 00456 std::stringstream ss; ss << "Output stream went bad [" << std::strerror(errno) << "]"; 00457 throw dtn::SerializationFailedException(ss.str()); 00458 } 00459 00460 // get the write position 00461 if (size > stream.tellp()) 00462 { 00463 std::stringstream ss; ss << "Not all data were written [" << stream.tellp() << " of " << size << " bytes]"; 00464 throw dtn::SerializationFailedException(ss.str()); 00465 } 00466 00467 // return the stream, this allows stacking 00468 return stream; 00469 } 00470 } 00471 }