IBR-DTNSuite  0.8
daemon/src/storage/SQLiteBundleStorage.cpp
Go to the documentation of this file.
00001 /*
00002  * SQLiteBundleStorage.cpp
00003  *
00004  *  Created on: 09.01.2010
00005  *      Author: Myrtus
00006  */
00007 
00008 
00009 #include "storage/SQLiteBundleStorage.h"
00010 #include "core/TimeEvent.h"
00011 #include "core/GlobalEvent.h"
00012 #include "core/BundleCore.h"
00013 #include "core/BundleEvent.h"
00014 
00015 #include <ibrdtn/data/PayloadBlock.h>
00016 #include <ibrdtn/data/AgeBlock.h>
00017 #include <ibrdtn/data/Serializer.h>
00018 #include <ibrdtn/data/Bundle.h>
00019 #include <ibrdtn/data/BundleID.h>
00020 
00021 #include <ibrcommon/thread/MutexLock.h>
00022 #include <ibrcommon/thread/RWLock.h>
00023 #include <ibrcommon/data/BLOB.h>
00024 #include <ibrcommon/Logger.h>
00025 #include <memory>
00026 #include <unistd.h>
00027 
00028 namespace dtn
00029 {
00030         namespace storage
00031         {
                // Mutex protecting TaskIdle::_idle; locked by raiseEvent() and TaskIdle::run().
                ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex;
                // Idle flag shared by all TaskIdle objects: set on GLOBAL_IDLE, cleared on GLOBAL_BUSY.
                bool SQLiteBundleStorage::TaskIdle::_idle = false;
00034 
00035                 SQLiteBundleStorage::SQLiteBLOB::SQLiteBLOB(const ibrcommon::File &path)
00036                  : _blobPath(path)
00037                 {
00038                         // generate a new temporary file
00039                         _file = ibrcommon::TemporaryFile(_blobPath, "blob");
00040                 }
00041 
00042                 SQLiteBundleStorage::SQLiteBLOB::~SQLiteBLOB()
00043                 {
00044                         // delete the file if the last reference is destroyed
00045                         _file.remove();
00046                 }
00047 
00048                 void SQLiteBundleStorage::SQLiteBLOB::clear()
00049                 {
00050                         // close the file
00051                         _filestream.close();
00052 
00053                         // remove the old file
00054                         _file.remove();
00055 
00056                         // generate a new temporary file
00057                         _file = ibrcommon::TemporaryFile(_blobPath, "blob");
00058 
00059                         // open temporary file
00060                         _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::trunc | ios::binary );
00061 
00062                         if (!_filestream.is_open())
00063                         {
00064                                 IBRCOMMON_LOGGER(error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
00065                                 throw ibrcommon::CanNotOpenFileException(_file);
00066                         }
00067                 }
00068 
00069                 void SQLiteBundleStorage::SQLiteBLOB::open()
00070                 {
00071                         ibrcommon::BLOB::_filelimit.wait();
00072 
00073                         // open temporary file
00074                         _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::binary );
00075 
00076                         if (!_filestream.is_open())
00077                         {
00078                                 IBRCOMMON_LOGGER(error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
00079                                 throw ibrcommon::CanNotOpenFileException(_file);
00080                         }
00081                 }
00082 
00083                 void SQLiteBundleStorage::SQLiteBLOB::close()
00084                 {
00085                         // flush the filestream
00086                         _filestream.flush();
00087 
00088                         // close the file
00089                         _filestream.close();
00090 
00091                         ibrcommon::BLOB::_filelimit.post();
00092                 }
00093 
00094                 size_t SQLiteBundleStorage::SQLiteBLOB::__get_size()
00095                 {
00096                         return _file.size();
00097                 }
00098 
00099                 ibrcommon::BLOB::Reference SQLiteBundleStorage::create()
00100                 {
00101                         return ibrcommon::BLOB::Reference(new SQLiteBLOB(_blobPath));
00102                 }
00103 
00104                 SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const size_t &size)
00105                  : _database(path.get("sqlite.db"), size)
00106                 {
00107                         // set the block path
00108                         _blockPath = path.get("blocks");
00109                         _blobPath = path.get("blob");
00110                 }
00111 
00112                 SQLiteBundleStorage::~SQLiteBundleStorage()
00113                 {
00114                         stop();
00115                         join();
00116                 }
00117 
00118                 void SQLiteBundleStorage::componentRun()
00119                 {
00120                         // loop until aborted
00121                         try {
00122                                 while (true)
00123                                 {
00124                                         Task *t = _tasks.getnpop(true);
00125 
00126                                         try {
00127                                                 BlockingTask &btask = dynamic_cast<BlockingTask&>(*t);
00128                                                 try {
00129                                                         btask.run(*this);
00130                                                 } catch (const std::exception&) {
00131                                                         btask.abort();
00132                                                         continue;
00133                                                 };
00134                                                 btask.done();
00135                                                 continue;
00136                                         } catch (const std::bad_cast&) { };
00137 
00138                                         try {
00139                                                 std::auto_ptr<Task> killer(t);
00140                                                 t->run(*this);
00141                                         } catch (const std::exception&) { };
00142                                 }
00143                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00144                                 // we are aborted, abort all blocking tasks
00145                         }
00146                 }
00147 
00148                 void SQLiteBundleStorage::componentUp()
00149                 {
00150                         //register Events
00151                         bindEvent(dtn::core::TimeEvent::className);
00152                         bindEvent(dtn::core::GlobalEvent::className);
00153 
00154                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
00155 
00156                         // delete all old BLOB container
00157                         _blobPath.remove(true);
00158 
00159                         // create BLOB folder
00160                         ibrcommon::File::createDirectory( _blobPath );
00161 
00162                         // create the bundle folder
00163                         ibrcommon::File::createDirectory( _blockPath );
00164 
00165                         // open the database and create all folders and files if needed
00166                         _database.open();
00167                 };
00168 
00169                 void SQLiteBundleStorage::componentDown()
00170                 {
00171                         //unregister Events
00172                         unbindEvent(dtn::core::TimeEvent::className);
00173                         unbindEvent(dtn::core::GlobalEvent::className);
00174 
00175                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
00176 
00177                         // close the database
00178                         _database.close();
00179                 };
00180 
00181                 void SQLiteBundleStorage::__cancellation()
00182                 {
00183                         _tasks.abort();
00184                 }
00185 
00186                 const std::set<dtn::data::EID> SQLiteBundleStorage::getDistinctDestinations()
00187                 {
00188                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY);
00189                         return _database.getDistinctDestinations();
00190                 }
00191 
00192                 const std::list<dtn::data::MetaBundle> SQLiteBundleStorage::get(BundleFilterCallback &cb)
00193                 {
00194                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY);
00195                         return _database.get(cb);
00196                 }
00197 
00198                 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id)
00199                 {
00200                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY);
00201 
00202                         SQLiteDatabase::blocklist blocks;
00203                         dtn::data::Bundle bundle;
00204 
00205                         // query the data base for the bundle
00206                         _database.get(id, bundle, blocks);
00207 
00208                         for (SQLiteDatabase::blocklist::const_iterator iter = blocks.begin(); iter != blocks.end(); iter++)
00209                         {
00210                                 const SQLiteDatabase::blocklist_entry &entry = (*iter);
00211                                 const int blocktyp = entry.first;
00212                                 const ibrcommon::File &file = entry.second;
00213 
00214                                 IBRCOMMON_LOGGER_DEBUG(50) << "add block: " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
00215 
00216                                 // load block from file
00217                                 std::ifstream is(file.getPath().c_str(), std::ios::binary | std::ios::in);
00218 
00219                                 if ((blocktyp == dtn::data::PayloadBlock::BLOCK_TYPE) &&
00220                                                 (!bundle.get(dtn::data::PrimaryBlock::APPDATA_IS_ADMRECORD)))
00221                                 {
00222                                         // create a new BLOB object
00223                                         SQLiteBLOB *blob = new SQLiteBLOB(_blobPath);
00224 
00225                                         // remove the corresponding file
00226                                         blob->_file.remove();
00227 
00228                                         // generate a hardlink, pointing to the BLOB file
00229                                         if ( ::link(file.getPath().c_str(), blob->_file.getPath().c_str()) == 0)
00230                                         {
00231                                                 // create a reference of the BLOB
00232                                                 ibrcommon::BLOB::Reference ref(blob);
00233 
00234                                                 // add payload block to the bundle
00235                                                 bundle.push_back(ref);
00236                                         }
00237                                         else
00238                                         {
00239                                                 delete blob;
00240                                                 IBRCOMMON_LOGGER(error) << "unable to load bundle: failed to create a hard-link" << IBRCOMMON_LOGGER_ENDL;
00241                                         }
00242                                 }
00243                                 else
00244                                 {
00245                                         // read the block
00246                                         dtn::data::Block &block = dtn::data::SeparateDeserializer(is, bundle).readBlock();
00247 
00248                                         // close the file
00249                                         is.close();
00250 
00251                                         // modify the age block if present
00252                                         try {
00253                                                 dtn::data::AgeBlock &agebl = dynamic_cast<dtn::data::AgeBlock&>(block);
00254 
00255                                                 // modify the AgeBlock with the age of the file
00256                                                 time_t age = file.lastaccess() - file.lastmodify();
00257 
00258                                                 agebl.addSeconds(age);
00259                                         } catch (const std::bad_cast&) { };
00260                                 }
00261                         }
00262 
00263                         return bundle;
00264                 }
00265 
00266                 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle)
00267                 {
00268                         IBRCOMMON_LOGGER_DEBUG(25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00269 
00270                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
00271 
00272                         // start transaction to store the bundle
00273                         _database.transaction();
00274 
00275                         try {
00276                                 // store the bundle data in the database
00277                                 _database.store(bundle);
00278 
00279                                 // create a bundle id
00280                                 const dtn::data::BundleID id(bundle);
00281 
00282                                 // get all blocks of the bundle
00283                                 const list<const dtn::data::Block*> blocklist = bundle.getBlocks();
00284 
00285                                 // index number for order of the blocks
00286                                 int index = 1;
00287 
00288                                 // number of bytes stored
00289                                 int storedBytes = 0;
00290 
00291                                 for(std::list<const dtn::data::Block*>::const_iterator it = blocklist.begin() ;it != blocklist.end(); it++)
00292                                 {
00293                                         const dtn::data::Block &block = (**it);
00294 
00295                                         // create a temporary file
00296                                         ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
00297 
00298                                         try {
00299                                                 try {
00300                                                         const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(block);
00301                                                         ibrcommon::BLOB::Reference ref = payload.getBLOB();
00302                                                         ibrcommon::BLOB::iostream stream = ref.iostream();
00303 
00304                                                         const SQLiteBLOB &blob = dynamic_cast<const SQLiteBLOB&>(*ref);
00305 
00306                                                         // first remove the tmp file
00307                                                         tmpfile.remove();
00308 
00309                                                         // make a hardlink to the origin blob file
00310                                                         if ( ::link(blob._file.getPath().c_str(), tmpfile.getPath().c_str()) != 0 )
00311                                                         {
00312                                                                 tmpfile = ibrcommon::TemporaryFile(_blockPath, "block");
00313                                                                 throw ibrcommon::Exception("hard-link failed");
00314                                                         }
00315                                                 } catch (const std::bad_cast&) {
00316                                                         throw ibrcommon::Exception("not a Payload or SQLiteBLOB");
00317                                                 }
00318 
00319                                                 storedBytes += _blockPath.size();
00320                                         } catch (const ibrcommon::Exception&) {
00321                                                 std::ofstream filestream(tmpfile.getPath().c_str(), std::ios_base::out | std::ios::binary);
00322                                                 dtn::data::SeparateSerializer serializer(filestream);
00323                                                 serializer << block;
00324                                                 storedBytes += serializer.getLength(block);
00325                                                 filestream.close();
00326                                         }
00327 
00328                                         _database.store(id, index, block, tmpfile);
00329 
00330                                         // increment index
00331                                         index++;
00332                                 }
00333 
00334                                 _database.commit();
00335 
00336                                 try {
00337                                         // the bundle is stored sucessfully, we could accept custody if it is requested
00338                                         const dtn::data::EID custodian = acceptCustody(bundle);
00339 
00340                                         // update the custody address of this bundle
00341                                         _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, bundle, custodian);
00342                                 } catch (const ibrcommon::Exception&) {
00343                                         // this bundle has no request for custody transfers
00344                                 }
00345 
00346                                 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL;
00347                         } catch (const ibrcommon::Exception&) {
00348                                 _database.rollback();
00349                         }
00350                 }
00351 
00352                 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id)
00353                 {
00354                         _tasks.push(new TaskRemove(id));
00355                 }
00356 
00357                 void SQLiteBundleStorage::TaskRemove::run(SQLiteBundleStorage &storage)
00358                 {
00359                         ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
00360                         storage._database.remove(_id);
00361                 }
00362 
00363                 void SQLiteBundleStorage::clearAll()
00364                 {
00365                         clear();
00366                 }
00367 
00368                 void SQLiteBundleStorage::clear()
00369                 {
00370                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
00371 
00372                         _database.clear();
00373 
00374                         //Delete Folder SQL_TABLE_BLOCK containing Blocks
00375                         _blockPath.remove(true);
00376                         ibrcommon::File::createDirectory(_blockPath);
00377                 }
00378 
00379                 bool SQLiteBundleStorage::empty()
00380                 {
00381                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY);
00382                         return _database.empty();
00383                 }
00384 
00385                 unsigned int SQLiteBundleStorage::count()
00386                 {
00387                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY);
00388                         return _database.count();
00389                 }
00390 
00391                 void SQLiteBundleStorage::raiseEvent(const dtn::core::Event *evt)
00392                 {
00393                         try {
00394                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00395 
00396                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00397                                 {
00398                                         _tasks.push(new TaskExpire(time.getTimestamp()));
00399                                 }
00400                         } catch (const std::bad_cast&) { }
00401                         
00402                         try {
00403                                 const dtn::core::GlobalEvent &global = dynamic_cast<const dtn::core::GlobalEvent&>(*evt);
00404 
00405                                 if(global.getAction() == dtn::core::GlobalEvent::GLOBAL_IDLE)
00406                                 {
00407                                         // switch to idle mode
00408                                         ibrcommon::MutexLock l(TaskIdle::_mutex);
00409                                         TaskIdle::_idle = true;
00410 
00411                                         // generate an idle task
00412                                         _tasks.push(new TaskIdle());
00413                                 }
00414                                 else if(global.getAction() == dtn::core::GlobalEvent::GLOBAL_BUSY)
00415                                 {
00416                                         // switch back to non-idle mode
00417                                         ibrcommon::MutexLock l(TaskIdle::_mutex);
00418                                         TaskIdle::_idle = false;
00419                                 }
00420                         } catch (const std::bad_cast&) { }
00421                 }
00422 
00423                 void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage)
00424                 {
00425                         ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
00426                         storage._database.expire(_timestamp);
00427                 }
00428 
00429                 void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage)
00430                 {
00431                         // until IDLE is false
00432                         while (true)
00433                         {
00434                                 /*
00435                                  * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space.
00436                                  * This empty space will be reused the next time new information is added to the database. But in the meantime,
00437                                  * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can
00438                                  * cause the information in the database to become fragmented - scrattered out all across the database file rather
00439                                  * than clustered together in one place.
00440                                  * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous,
00441                                  * and otherwise cleans up the database file structure.
00442                                  */
00443                                 {
00444                                         ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
00445                                         storage._database.vacuum();
00446                                 }
00447 
00448                                 // here we can do some IDLE stuff...
00449                                 ::sleep(1);
00450 
00451                                 ibrcommon::MutexLock l(TaskIdle::_mutex);
00452                                 if (!TaskIdle::_idle) return;
00453                         }
00454                 }
00455 
00456                 const std::string SQLiteBundleStorage::getName() const
00457                 {
00458                         return "SQLiteBundleStorage";
00459                 }
00460 
00461                 void SQLiteBundleStorage::releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id)
00462                 {
00463                         ibrcommon::RWLock l(_global_lock, ibrcommon::RWMutex::LOCK_READONLY);
00464 
00465                         // custody is successful transferred to another node.
00466                         // it is safe to delete this bundle now. (depending on the routing algorithm.)
00467                         // update the custodian of this bundle with the new one
00468                         _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, id, custodian);
00469                 }
00470         }
00471 }