IBR-DTNSuite
0.8
|
00001 /* 00002 * SQLiteBundleStorage.cpp 00003 * 00004 * Created on: 09.01.2010 00005 * Author: Myrtus 00006 */ 00007 00008 00009 #include "storage/SQLiteBundleStorage.h" 00010 #include "core/TimeEvent.h" 00011 #include "core/GlobalEvent.h" 00012 #include "core/BundleCore.h" 00013 #include "core/BundleEvent.h" 00014 00015 #include <ibrdtn/data/PayloadBlock.h> 00016 #include <ibrdtn/data/AgeBlock.h> 00017 #include <ibrdtn/data/Serializer.h> 00018 #include <ibrdtn/data/Bundle.h> 00019 #include <ibrdtn/data/BundleID.h> 00020 00021 #include <ibrcommon/thread/MutexLock.h> 00022 #include <ibrcommon/thread/RWLock.h> 00023 #include <ibrcommon/data/BLOB.h> 00024 #include <ibrcommon/Logger.h> 00025 #include <memory> 00026 #include <unistd.h> 00027 00028 namespace dtn 00029 { 00030 namespace storage 00031 { 00032 ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex; 00033 bool SQLiteBundleStorage::TaskIdle::_idle = false; 00034 00035 SQLiteBundleStorage::SQLiteBLOB::SQLiteBLOB(const ibrcommon::File &path) 00036 : _blobPath(path) 00037 { 00038 // generate a new temporary file 00039 _file = ibrcommon::TemporaryFile(_blobPath, "blob"); 00040 } 00041 00042 SQLiteBundleStorage::SQLiteBLOB::~SQLiteBLOB() 00043 { 00044 // delete the file if the last reference is destroyed 00045 _file.remove(); 00046 } 00047 00048 void SQLiteBundleStorage::SQLiteBLOB::clear() 00049 { 00050 // close the file 00051 _filestream.close(); 00052 00053 // remove the old file 00054 _file.remove(); 00055 00056 // generate a new temporary file 00057 _file = ibrcommon::TemporaryFile(_blobPath, "blob"); 00058 00059 // open temporary file 00060 _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::trunc | ios::binary ); 00061 00062 if (!_filestream.is_open()) 00063 { 00064 IBRCOMMON_LOGGER(error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL; 00065 throw ibrcommon::CanNotOpenFileException(_file); 00066 } 00067 } 00068 00069 void SQLiteBundleStorage::SQLiteBLOB::open() 00070 { 00071 ibrcommon::BLOB::_filelimit.wait(); 00072 00073 // open temporary file 00074 _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::binary ); 00075 00076 if (!_filestream.is_open()) 00077 { 00078 IBRCOMMON_LOGGER(error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL; 00079 throw ibrcommon::CanNotOpenFileException(_file); 00080 } 00081 } 00082 00083 void SQLiteBundleStorage::SQLiteBLOB::close() 00084 { 00085 // flush the filestream 00086 _filestream.flush(); 00087 00088 // close the file 00089 _filestream.close(); 00090 00091 ibrcommon::BLOB::_filelimit.post(); 00092 } 00093 00094 size_t SQLiteBundleStorage::SQLiteBLOB::__get_size() 00095 { 00096 return _file.size(); 00097 } 00098 00099 ibrcommon::BLOB::Reference SQLiteBundleStorage::create() 00100 { 00101 return ibrcommon::BLOB::Reference(new SQLiteBLOB(_blobPath)); 00102 } 00103 00104 SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const size_t &size) 00105 : _database(path.get("sqlite.db"), size) 00106 { 00107 // set the block path 00108 _blockPath = path.get("blocks"); 00109 _blobPath = path.get("blob"); 00110 } 00111 00112 SQLiteBundleStorage::~SQLiteBundleStorage() 00113 { 00114 stop(); 00115 join(); 00116 } 00117 00118 void SQLiteBundleStorage::componentRun() 00119 { 00120 // loop until aborted 00121 try { 00122 while (true) 00123 { 00124 Task *t = _tasks.getnpop(true); 00125 00126 try { 00127 BlockingTask &btask = dynamic_cast<BlockingTask&>(*t); 00128 try { 00129 btask.run(*this); 00130 } catch (const std::exception&) { 00131 btask.abort(); 00132 continue; 00133 }; 00134 btask.done(); 00135 continue; 00136 } catch (const std::bad_cast&) { }; 00137 00138 try { 00139 std::auto_ptr<Task> killer(t); 00140 t->run(*this); 00141 } catch (const std::exception&) { }; 00142 } 00143 } catch (const ibrcommon::QueueUnblockedException &ex) { 00144 // we are aborted, abort all blocking tasks 00145 } 00146 } 00147 00148 void SQLiteBundleStorage::componentUp() 00149 { 00150 //register Events 00151 bindEvent(dtn::core::TimeEvent::className); 00152 bindEvent(dtn::core::GlobalEvent::className); 00153 00154 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READWRITE); 00155 00156 // delete all old BLOB container 00157 _blobPath.remove(true); 00158 00159 // create BLOB folder 00160 ibrcommon::File::createDirectory( _blobPath ); 00161 00162 // create the bundle folder 00163 ibrcommon::File::createDirectory( _blockPath ); 00164 00165 // open the database and create all folders and files if needed 00166 _database.open(); 00167 }; 00168 00169 void SQLiteBundleStorage::componentDown() 00170 { 00171 //unregister Events 00172 unbindEvent(dtn::core::TimeEvent::className); 00173 unbindEvent(dtn::core::GlobalEvent::className); 00174 00175 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READWRITE); 00176 00177 // close the database 00178 _database.close(); 00179 }; 00180 00181 void SQLiteBundleStorage::__cancellation() 00182 { 00183 _tasks.abort(); 00184 } 00185 00186 const std::set<dtn::data::EID> SQLiteBundleStorage::getDistinctDestinations() 00187 { 00188 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY); 00189 return _database.getDistinctDestinations(); 00190 } 00191 00192 const std::list<dtn::data::MetaBundle> SQLiteBundleStorage::get(BundleFilterCallback &cb) 00193 { 00194 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY); 00195 return _database.get(cb); 00196 } 00197 00198 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id) 00199 { 00200 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY); 00201 00202 SQLiteDatabase::blocklist blocks; 00203 dtn::data::Bundle bundle; 00204 00205 // query the data base for the bundle 00206 _database.get(id, bundle, blocks); 00207 00208 for (SQLiteDatabase::blocklist::const_iterator iter = blocks.begin(); iter != blocks.end(); iter++) 00209 { 00210 const SQLiteDatabase::blocklist_entry &entry = (*iter); 00211 const int blocktyp = entry.first; 00212 const ibrcommon::File &file = entry.second; 00213 00214 IBRCOMMON_LOGGER_DEBUG(50) << "add block: " << file.getPath() << IBRCOMMON_LOGGER_ENDL; 00215 00216 // load block from file 00217 std::ifstream is(file.getPath().c_str(), std::ios::binary | std::ios::in); 00218 00219 if ((blocktyp == dtn::data::PayloadBlock::BLOCK_TYPE) && 00220 (!bundle.get(dtn::data::PrimaryBlock::APPDATA_IS_ADMRECORD))) 00221 { 00222 // create a new BLOB object 00223 SQLiteBLOB *blob = new SQLiteBLOB(_blobPath); 00224 00225 // remove the corresponding file 00226 blob->_file.remove(); 00227 00228 // generate a hardlink, pointing to the BLOB file 00229 if ( ::link(file.getPath().c_str(), blob->_file.getPath().c_str()) == 0) 00230 { 00231 // create a reference of the BLOB 00232 ibrcommon::BLOB::Reference ref(blob); 00233 00234 // add payload block to the bundle 00235 bundle.push_back(ref); 00236 } 00237 else 00238 { 00239 delete blob; 00240 IBRCOMMON_LOGGER(error) << "unable to load bundle: failed to create a hard-link" << IBRCOMMON_LOGGER_ENDL; 00241 } 00242 } 00243 else 00244 { 00245 // read the block 00246 dtn::data::Block &block = dtn::data::SeparateDeserializer(is, bundle).readBlock(); 00247 00248 // close the file 00249 is.close(); 00250 00251 // modify the age block if present 00252 try { 00253 dtn::data::AgeBlock &agebl = dynamic_cast<dtn::data::AgeBlock&>(block); 00254 00255 // modify the AgeBlock with the age of the file 00256 time_t age = file.lastaccess() - file.lastmodify(); 00257 00258 agebl.addSeconds(age); 00259 } catch (const std::bad_cast&) { }; 00260 } 00261 } 00262 00263 return bundle; 00264 } 00265 00266 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle) 00267 { 00268 IBRCOMMON_LOGGER_DEBUG(25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL; 00269 00270 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READWRITE); 00271 00272 // start transaction to store the bundle 00273 _database.transaction(); 00274 00275 try { 00276 // store the bundle data in the database 00277 _database.store(bundle); 00278 00279 // create a bundle id 00280 const dtn::data::BundleID id(bundle); 00281 00282 // get all blocks of the bundle 00283 const list<const dtn::data::Block*> blocklist = bundle.getBlocks(); 00284 00285 // index number for order of the blocks 00286 int index = 1; 00287 00288 // number of bytes stored 00289 int storedBytes = 0; 00290 00291 for(std::list<const dtn::data::Block*>::const_iterator it = blocklist.begin() ;it != blocklist.end(); it++) 00292 { 00293 const dtn::data::Block &block = (**it); 00294 00295 // create a temporary file 00296 ibrcommon::TemporaryFile tmpfile(_blockPath, "block"); 00297 00298 try { 00299 try { 00300 const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(block); 00301 ibrcommon::BLOB::Reference ref = payload.getBLOB(); 00302 ibrcommon::BLOB::iostream stream = ref.iostream(); 00303 00304 const SQLiteBLOB &blob = dynamic_cast<const SQLiteBLOB&>(*ref); 00305 00306 // first remove the tmp file 00307 tmpfile.remove(); 00308 00309 // make a hardlink to the origin blob file 00310 if ( ::link(blob._file.getPath().c_str(), tmpfile.getPath().c_str()) != 0 ) 00311 { 00312 tmpfile = ibrcommon::TemporaryFile(_blockPath, "block"); 00313 throw ibrcommon::Exception("hard-link failed"); 00314 } 00315 } catch (const std::bad_cast&) { 00316 throw ibrcommon::Exception("not a Payload or SQLiteBLOB"); 00317 } 00318 00319 storedBytes += _blockPath.size(); 00320 } catch (const ibrcommon::Exception&) { 00321 std::ofstream filestream(tmpfile.getPath().c_str(), std::ios_base::out | std::ios::binary); 00322 dtn::data::SeparateSerializer serializer(filestream); 00323 serializer << block; 00324 storedBytes += serializer.getLength(block); 00325 filestream.close(); 00326 } 00327 00328 _database.store(id, index, block, tmpfile); 00329 00330 // increment index 00331 index++; 00332 } 00333 00334 _database.commit(); 00335 00336 try { 00337 // the bundle is stored sucessfully, we could accept custody if it is requested 00338 const dtn::data::EID custodian = acceptCustody(bundle); 00339 00340 // update the custody address of this bundle 00341 _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, bundle, custodian); 00342 } catch (const ibrcommon::Exception&) { 00343 // this bundle has no request for custody transfers 00344 } 00345 00346 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL; 00347 } catch (const ibrcommon::Exception&) { 00348 _database.rollback(); 00349 } 00350 } 00351 00352 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id) 00353 { 00354 _tasks.push(new TaskRemove(id)); 00355 } 00356 00357 void SQLiteBundleStorage::TaskRemove::run(SQLiteBundleStorage &storage) 00358 { 00359 ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE); 00360 storage._database.remove(_id); 00361 } 00362 00363 void SQLiteBundleStorage::clearAll() 00364 { 00365 clear(); 00366 } 00367 00368 void SQLiteBundleStorage::clear() 00369 { 00370 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READWRITE); 00371 00372 _database.clear(); 00373 00374 //Delete Folder SQL_TABLE_BLOCK containing Blocks 00375 _blockPath.remove(true); 00376 ibrcommon::File::createDirectory(_blockPath); 00377 } 00378 00379 bool SQLiteBundleStorage::empty() 00380 { 00381 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY); 00382 return _database.empty(); 00383 } 00384 00385 unsigned int SQLiteBundleStorage::count() 00386 { 00387 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY); 00388 return _database.count(); 00389 } 00390 00391 void SQLiteBundleStorage::raiseEvent(const dtn::core::Event *evt) 00392 { 00393 try { 00394 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00395 00396 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 00397 { 00398 _tasks.push(new TaskExpire(time.getTimestamp())); 00399 } 00400 } catch (const std::bad_cast&) { } 00401 00402 try { 00403 const dtn::core::GlobalEvent &global = dynamic_cast<const dtn::core::GlobalEvent&>(*evt); 00404 00405 if(global.getAction() == dtn::core::GlobalEvent::GLOBAL_IDLE) 00406 { 00407 // switch to idle mode 00408 ibrcommon::MutexLock l(TaskIdle::_mutex); 00409 TaskIdle::_idle = true; 00410 00411 // generate an idle task 00412 _tasks.push(new TaskIdle()); 00413 } 00414 else if(global.getAction() == dtn::core::GlobalEvent::GLOBAL_BUSY) 00415 { 00416 // switch back to non-idle mode 00417 ibrcommon::MutexLock l(TaskIdle::_mutex); 00418 TaskIdle::_idle = false; 00419 } 00420 } catch (const std::bad_cast&) { } 00421 } 00422 00423 void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage) 00424 { 00425 ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE); 00426 storage._database.expire(_timestamp); 00427 } 00428 00429 void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage) 00430 { 00431 // until IDLE is false 00432 while (true) 00433 { 00434 /* 00435 * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space. 00436 * This empty space will be reused the next time new information is added to the database. But in the meantime, 00437 * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can 00438 * cause the information in the database to become fragmented - scrattered out all across the database file rather 00439 * than clustered together in one place. 00440 * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous, 00441 * and otherwise cleans up the database file structure. 00442 */ 00443 { 00444 ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE); 00445 storage._database.vacuum(); 00446 } 00447 00448 // here we can do some IDLE stuff... 00449 ::sleep(1); 00450 00451 ibrcommon::MutexLock l(TaskIdle::_mutex); 00452 if (!TaskIdle::_idle) return; 00453 } 00454 } 00455 00456 const std::string SQLiteBundleStorage::getName() const 00457 { 00458 return "SQLiteBundleStorage"; 00459 } 00460 00461 void SQLiteBundleStorage::releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id) 00462 { 00463 ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY); 00464 00465 // custody is successful transferred to another node. 00466 // it is safe to delete this bundle now. (depending on the routing algorithm.) 00467 // update the custodian of this bundle with the new one 00468 _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, id, custodian); 00469 } 00470 } 00471 }