IBR-DTNSuite
0.8
|
00001 /* 00002 * DataStorage.cpp 00003 * 00004 * Created on: 22.11.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "storage/DataStorage.h" 00009 #include <typeinfo> 00010 #include <sstream> 00011 #include <iomanip> 00012 #include <list> 00013 00014 #include <string.h> 00015 #include <stdlib.h> 00016 #include <iostream> 00017 #include <fstream> 00018 #include <cstring> 00019 #include <cerrno> 00020 00021 namespace dtn 00022 { 00023 namespace storage 00024 { 00025 DataStorage::Hash::Hash() 00026 : value("this-hash-value-is-empty") 00027 {} 00028 00029 DataStorage::Hash::Hash(const std::string &key) 00030 : value(DataStorage::Hash::hash(key)) 00031 { } 00032 00033 DataStorage::Hash::Hash(const DataStorage::Container &container) 00034 : value(DataStorage::Hash::hash(container.getKey())) 00035 { }; 00036 00037 DataStorage::Hash::Hash(const ibrcommon::File &file) : value(file.getBasename()) {}; 00038 DataStorage::Hash::~Hash() {}; 00039 00040 bool DataStorage::Hash::operator==(const DataStorage::Hash &other) const 00041 { 00042 return (value == other.value); 00043 } 00044 00045 bool DataStorage::Hash::operator<(const DataStorage::Hash &other) const 00046 { 00047 return (value < other.value); 00048 } 00049 00050 std::string DataStorage::Hash::hash(const std::string &value) 00051 { 00052 std::stringstream ss; 00053 for (std::string::const_iterator iter = value.begin(); iter != value.end(); iter++) 00054 { 00055 ss << std::hex << std::setw( 2 ) << std::setfill( '0' ) << (int)(*iter); 00056 } 00057 return ss.str(); 00058 } 00059 00060 DataStorage::istream::istream(ibrcommon::Mutex &mutex, const ibrcommon::File &file) 00061 : ibrcommon::File(file), _stream(NULL), _lock(mutex) 00062 { 00063 _lock.enter(); 00064 _stream = new std::ifstream(getPath().c_str(), ios_base::in | ios_base::binary); 00065 }; 00066 00067 DataStorage::istream::~istream() 00068 { 00069 if (_stream != NULL) 00070 { 00071 delete _stream; 00072 _lock.leave(); 00073 } 00074 }; 00075 00076 std::istream& DataStorage::istream::operator*() 00077 { return *_stream; } 00078 00079 DataStorage::DataStorage(Callback &callback, const ibrcommon::File &path, size_t write_buffer, bool initialize) 00080 : _callback(callback), _path(path), _tasks(), _store_sem(write_buffer), _store_limited(write_buffer > 0) 00081 // limit the number of bundles in the write buffer 00082 { 00083 // initialize the storage 00084 if (initialize) 00085 { 00086 if (_path.exists()) 00087 { 00088 // remove all files in the path 00089 std::list<ibrcommon::File> files; 00090 _path.getFiles(files); 00091 00092 for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); iter++) 00093 { 00094 (*iter).remove(true); 00095 } 00096 } 00097 else 00098 { 00099 // create the path 00100 ibrcommon::File::createDirectory(_path); 00101 } 00102 } 00103 } 00104 00105 DataStorage::~DataStorage() 00106 { 00107 _tasks.abort(); 00108 join(); 00109 00110 // delete all task objects 00111 try { 00112 while (true) 00113 { 00114 Task *t = _tasks.getnpop(false); 00115 delete t; 00116 } 00117 } catch (const ibrcommon::QueueUnblockedException&) { 00118 // exit 00119 } 00120 } 00121 00122 void DataStorage::iterateAll() 00123 { 00124 std::list<ibrcommon::File> files; 00125 _path.getFiles(files); 00126 00127 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++) 00128 { 00129 if (!(*iter).isSystem()) 00130 { 00131 DataStorage::Hash hash(*iter); 00132 DataStorage::istream stream(_global_mutex, *iter); 00133 00134 _callback.iterateDataStorage(hash, stream); 00135 } 00136 } 00137 } 00138 00139 void DataStorage::store(const DataStorage::Hash &hash, DataStorage::Container *data) 00140 { 00141 // wait for resources 00142 if (_store_limited) _store_sem.wait(); 00143 00144 // put the task into the queue 00145 _tasks.push( new StoreDataTask(hash, data) ); 00146 } 00147 00148 const DataStorage::Hash DataStorage::store(DataStorage::Container *data) 00149 { 00150 // create a corresponding hash 00151 DataStorage::Hash hash(*data); 00152 store(hash, data); 00153 return hash; 00154 } 00155 00156 DataStorage::istream DataStorage::retrieve(const DataStorage::Hash &hash) throw (DataNotAvailableException) 00157 { 00158 ibrcommon::File file = _path.get(hash.value); 00159 00160 if (!file.exists()) 00161 { 00162 throw DataNotAvailableException(); 00163 } 00164 00165 return DataStorage::istream(_global_mutex, file); 00166 } 00167 00168 void DataStorage::remove(const DataStorage::Hash &hash) 00169 { 00170 _tasks.push( new RemoveDataTask(hash) ); 00171 } 00172 00173 void DataStorage::__cancellation() 00174 { 00175 _tasks.abort(); 00176 } 00177 00178 void DataStorage::run() 00179 { 00180 try { 00181 while (true) 00182 { 00183 Task *t = _tasks.getnpop(true); 00184 00185 try { 00186 StoreDataTask &store = dynamic_cast<StoreDataTask&>(*t); 00187 00188 try { 00189 ibrcommon::File destination = _path.get(store.hash.value); 00190 00191 { 00192 ibrcommon::MutexLock l(_global_mutex); 00193 std::ofstream stream(destination.getPath().c_str(), ios::out | ios::binary | ios::trunc); 00194 00195 // check the streams health 00196 if (!stream.good()) 00197 { 00198 std::stringstream ss; ss << "unable to open filestream [" << std::strerror(errno) << "]"; 00199 throw ibrcommon::IOException(ss.str()); 00200 } 00201 00202 store.container->serialize(stream); 00203 stream.close(); 00204 } 00205 00206 // release resources 00207 if (_store_limited) _store_sem.post(); 00208 00209 // notify the stored item 00210 _callback.eventDataStorageStored(store.hash); 00211 } catch (const ibrcommon::Exception &ex) { 00212 // release resources 00213 if (_store_limited) _store_sem.post(); 00214 00215 // notify the fail of store action 00216 _callback.eventDataStorageStoreFailed(store.hash, ex); 00217 } 00218 } catch (const std::bad_cast&) { 00219 } 00220 00221 try { 00222 RemoveDataTask &remove = dynamic_cast<RemoveDataTask&>(*t); 00223 00224 try { 00225 ibrcommon::File destination = _path.get(remove.hash.value); 00226 { 00227 ibrcommon::MutexLock l(_global_mutex); 00228 if (!destination.exists()) 00229 { 00230 throw DataNotAvailableException(); 00231 } 00232 destination.remove(); 00233 } 00234 _callback.eventDataStorageRemoved(remove.hash); 00235 } catch (const ibrcommon::Exception &ex) { 00236 _callback.eventDataStorageRemoveFailed(remove.hash, ex); 00237 } 00238 } catch (const std::bad_cast&) { 00239 00240 } 00241 00242 delete t; 00243 } 00244 } catch (const ibrcommon::QueueUnblockedException&) { 00245 // exit 00246 } 00247 } 00248 00249 DataStorage::Container::~Container() {}; 00250 DataStorage::Task::~Task() {}; 00251 00252 DataStorage::StoreDataTask::StoreDataTask(const Hash &h, Container *c) 00253 : hash(h), container(c) 00254 {} 00255 00256 DataStorage::StoreDataTask::~StoreDataTask() 00257 { 00258 delete container; 00259 } 00260 00261 DataStorage::RemoveDataTask::RemoveDataTask(const Hash &h) : hash(h) 00262 {} 00263 00264 DataStorage::RemoveDataTask::~RemoveDataTask() 00265 { 00266 } 00267 } 00268 }