IBR-DTNSuite  0.8
daemon/src/storage/DataStorage.cpp
Go to the documentation of this file.
00001 /*
00002  * DataStorage.cpp
00003  *
00004  *  Created on: 22.11.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "storage/DataStorage.h"
00009 #include <typeinfo>
00010 #include <sstream>
00011 #include <iomanip>
00012 #include <list>
00013 
00014 #include <string.h>
00015 #include <stdlib.h>
00016 #include <iostream>
00017 #include <fstream>
00018 #include <cstring>
00019 #include <cerrno>
00020 
00021 namespace dtn
00022 {
00023         namespace storage
00024         {
00025                 DataStorage::Hash::Hash()
00026                  : value("this-hash-value-is-empty")
00027                 {}
00028 
00029                 DataStorage::Hash::Hash(const std::string &key)
00030                  : value(DataStorage::Hash::hash(key))
00031                 { }
00032 
00033                 DataStorage::Hash::Hash(const DataStorage::Container &container)
00034                  : value(DataStorage::Hash::hash(container.getKey()))
00035                 { };
00036 
00037                 DataStorage::Hash::Hash(const ibrcommon::File &file) : value(file.getBasename()) {};
00038                 DataStorage::Hash::~Hash() {};
00039 
00040                 bool DataStorage::Hash::operator==(const DataStorage::Hash &other) const
00041                 {
00042                         return (value == other.value);
00043                 }
00044 
00045                 bool DataStorage::Hash::operator<(const DataStorage::Hash &other) const
00046                 {
00047                         return (value < other.value);
00048                 }
00049 
00050                 std::string DataStorage::Hash::hash(const std::string &value)
00051                 {
00052                         std::stringstream ss;
00053                         for (std::string::const_iterator iter = value.begin(); iter != value.end(); iter++)
00054                         {
00055                                 ss << std::hex << std::setw( 2 ) << std::setfill( '0' ) << (int)(*iter);
00056                         }
00057                         return ss.str();
00058                 }
00059 
00060                 DataStorage::istream::istream(ibrcommon::Mutex &mutex, const ibrcommon::File &file)
00061                  : ibrcommon::File(file), _stream(NULL), _lock(mutex)
00062                 {
00063                         _lock.enter();
00064                         _stream = new std::ifstream(getPath().c_str(), ios_base::in | ios_base::binary);
00065                 };
00066 
00067                 DataStorage::istream::~istream()
00068                 {
00069                         if (_stream != NULL)
00070                         {
00071                                 delete _stream;
00072                                 _lock.leave();
00073                         }
00074                 };
00075 
00076                 std::istream& DataStorage::istream::operator*()
00077                 { return *_stream; }
00078 
00079                 DataStorage::DataStorage(Callback &callback, const ibrcommon::File &path, size_t write_buffer, bool initialize)
00080                  : _callback(callback), _path(path), _tasks(), _store_sem(write_buffer), _store_limited(write_buffer > 0)
00081                 // limit the number of bundles in the write buffer
00082                 {
00083                         // initialize the storage
00084                         if (initialize)
00085                         {
00086                                 if (_path.exists())
00087                                 {
00088                                         // remove all files in the path
00089                                         std::list<ibrcommon::File> files;
00090                                         _path.getFiles(files);
00091 
00092                                         for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); iter++)
00093                                         {
00094                                                 (*iter).remove(true);
00095                                         }
00096                                 }
00097                                 else
00098                                 {
00099                                         // create the path
00100                                         ibrcommon::File::createDirectory(_path);
00101                                 }
00102                         }
00103                 }
00104 
00105                 DataStorage::~DataStorage()
00106                 {
00107                         _tasks.abort();
00108                         join();
00109 
00110                         // delete all task objects
00111                         try {
00112                                 while (true)
00113                                 {
00114                                         Task *t = _tasks.getnpop(false);
00115                                         delete t;
00116                                 }
00117                         } catch (const ibrcommon::QueueUnblockedException&) {
00118                                 // exit
00119                         }
00120                 }
00121 
00122                 void DataStorage::iterateAll()
00123                 {
00124                         std::list<ibrcommon::File> files;
00125                         _path.getFiles(files);
00126 
00127                         for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++)
00128                         {
00129                                 if (!(*iter).isSystem())
00130                                 {
00131                                         DataStorage::Hash hash(*iter);
00132                                         DataStorage::istream stream(_global_mutex, *iter);
00133 
00134                                         _callback.iterateDataStorage(hash, stream);
00135                                 }
00136                         }
00137                 }
00138 
00139                 void DataStorage::store(const DataStorage::Hash &hash, DataStorage::Container *data)
00140                 {
00141                         // wait for resources
00142                         if (_store_limited) _store_sem.wait();
00143 
00144                         // put the task into the queue
00145                         _tasks.push( new StoreDataTask(hash, data) );
00146                 }
00147 
00148                 const DataStorage::Hash DataStorage::store(DataStorage::Container *data)
00149                 {
00150                         // create a corresponding hash
00151                         DataStorage::Hash hash(*data);
00152                         store(hash, data);
00153                         return hash;
00154                 }
00155 
00156                 DataStorage::istream DataStorage::retrieve(const DataStorage::Hash &hash) throw (DataNotAvailableException)
00157                 {
00158                         ibrcommon::File file = _path.get(hash.value);
00159 
00160                         if (!file.exists())
00161                         {
00162                                 throw DataNotAvailableException();
00163                         }
00164 
00165                         return DataStorage::istream(_global_mutex, file);
00166                 }
00167 
00168                 void DataStorage::remove(const DataStorage::Hash &hash)
00169                 {
00170                         _tasks.push( new RemoveDataTask(hash) );
00171                 }
00172 
00173                 void DataStorage::__cancellation()
00174                 {
00175                         _tasks.abort();
00176                 }
00177 
00178                 void DataStorage::run()
00179                 {
00180                         try {
00181                                 while (true)
00182                                 {
00183                                         Task *t = _tasks.getnpop(true);
00184 
00185                                         try {
00186                                                 StoreDataTask &store = dynamic_cast<StoreDataTask&>(*t);
00187 
00188                                                 try {
00189                                                         ibrcommon::File destination = _path.get(store.hash.value);
00190 
00191                                                         {
00192                                                                 ibrcommon::MutexLock l(_global_mutex);
00193                                                                 std::ofstream stream(destination.getPath().c_str(), ios::out | ios::binary | ios::trunc);
00194 
00195                                                                 // check the streams health
00196                                                                 if (!stream.good())
00197                                                                 {
00198                                                                         std::stringstream ss; ss << "unable to open filestream [" << std::strerror(errno) << "]";
00199                                                                         throw ibrcommon::IOException(ss.str());
00200                                                                 }
00201 
00202                                                                 store.container->serialize(stream);
00203                                                                 stream.close();
00204                                                         }
00205 
00206                                                         // release resources
00207                                                         if (_store_limited) _store_sem.post();
00208 
00209                                                         // notify the stored item
00210                                                         _callback.eventDataStorageStored(store.hash);
00211                                                 } catch (const ibrcommon::Exception &ex) {
00212                                                         // release resources
00213                                                         if (_store_limited) _store_sem.post();
00214 
00215                                                         // notify the fail of store action
00216                                                         _callback.eventDataStorageStoreFailed(store.hash, ex);
00217                                                 }
00218                                         } catch (const std::bad_cast&) {
00219                                         }
00220 
00221                                         try {
00222                                                 RemoveDataTask &remove = dynamic_cast<RemoveDataTask&>(*t);
00223 
00224                                                 try {
00225                                                         ibrcommon::File destination = _path.get(remove.hash.value);
00226                                                         {
00227                                                                 ibrcommon::MutexLock l(_global_mutex);
00228                                                                 if (!destination.exists())
00229                                                                 {
00230                                                                         throw DataNotAvailableException();
00231                                                                 }
00232                                                                 destination.remove();
00233                                                         }
00234                                                         _callback.eventDataStorageRemoved(remove.hash);
00235                                                 } catch (const ibrcommon::Exception &ex) {
00236                                                         _callback.eventDataStorageRemoveFailed(remove.hash, ex);
00237                                                 }
00238                                         } catch (const std::bad_cast&) {
00239 
00240                                         }
00241 
00242                                         delete t;
00243                                 }
00244                         } catch (const ibrcommon::QueueUnblockedException&) {
00245                                 // exit
00246                         }
00247                 }
00248 
00249                 DataStorage::Container::~Container() {};
00250                 DataStorage::Task::~Task() {};
00251 
00252                 DataStorage::StoreDataTask::StoreDataTask(const Hash &h, Container *c)
00253                  : hash(h), container(c)
00254                 {}
00255 
00256                 DataStorage::StoreDataTask::~StoreDataTask()
00257                 {
00258                         delete container;
00259                 }
00260 
00261                 DataStorage::RemoveDataTask::RemoveDataTask(const Hash &h) : hash(h)
00262                 {}
00263 
00264                 DataStorage::RemoveDataTask::~RemoveDataTask()
00265                 {
00266                 }
00267         }
00268 }