IBR-DTNSuite  0.10
DataStorage.cpp
Go to the documentation of this file.
1 /*
2  * DataStorage.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "storage/DataStorage.h"
23 #include <typeinfo>
24 #include <sstream>
25 #include <iomanip>
26 #include <list>
27 
28 #include <string.h>
29 #include <stdlib.h>
30 #include <iostream>
31 #include <fstream>
32 #include <cstring>
33 #include <cerrno>
34 
35 namespace dtn
36 {
37  namespace storage
38  {
40  : value("this-hash-value-is-empty")
41  {}
42 
43  DataStorage::Hash::Hash(const std::string &key)
44  : value(DataStorage::Hash::hash(key))
45  { }
46 
48  : value(DataStorage::Hash::hash(container.getKey()))
49  { }
50 
51  DataStorage::Hash::Hash(const ibrcommon::File &file) : value(file.getBasename()) {}
53 
55  {
56  return (value == other.value);
57  }
58 
60  {
61  return (value < other.value);
62  }
63 
64  std::string DataStorage::Hash::hash(const std::string &value)
65  {
66  std::stringstream ss;
67  for (std::string::const_iterator iter = value.begin(); iter != value.end(); ++iter)
68  {
69  ss << std::hex << std::setw( 2 ) << std::setfill( '0' ) << (int)(*iter);
70  }
71  return ss.str();
72  }
73 
75  : ibrcommon::File(file), _stream(NULL), _lock(mutex)
76  {
77  _lock.enter();
78  _stream = new std::ifstream(getPath().c_str(), ios_base::in | ios_base::binary);
79  }
80 
82  {
83  if (_stream != NULL)
84  {
85  delete _stream;
86  _lock.leave();
87  }
88  }
89 
91  { return *_stream; }
92 
/**
 * Set up a data storage working on the given path.
 *
 * @param callback     Stored by reference in _callback.
 * @param path         Stored in _path; the location the storage works on.
 * @param write_buffer Initial value handed to _store_sem; the store limit
 *                     (_store_limited) is only active if this is greater
 *                     than zero.
 * @param initialize   If true, the files found in an already existing path
 *                     are removed.
 *
 * The fault flag (_faulty) starts out as false.
 */
93  DataStorage::DataStorage(Callback &callback, const ibrcommon::File &path, unsigned int write_buffer, bool initialize)
94  : _callback(callback), _path(path), _tasks(), _store_sem(write_buffer), _store_limited(write_buffer > 0), _faulty(false)
95  // limit the number of bundles in the write buffer
96  {
97  // initialize the storage
98  if (initialize)
99  {
100  if (_path.exists())
101  {
102  // remove all files in the path
103  std::list<ibrcommon::File> files;
104  _path.getFiles(files);
105 
106  for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); ++iter)
107  {
// NOTE(review): the meaning of the `true` argument is defined by
// ibrcommon::File::remove and not visible here - presumably a
// recursive removal; confirm.
108  (*iter).remove(true);
109  }
110  }
111  else
112  {
113  // create the path
// NOTE(review): this branch is empty in this listing although the comment
// announces that the path gets created; listing line 114 is absent, so the
// statement doing that was most likely lost - restore it from the original
// source file.
115  }
116  }
117  }
118 
120  {
121  _tasks.abort();
122  join();
123 
124  // delete all task objects
125  try {
126  while (true)
127  {
128  Task *t = _tasks.getnpop(false);
129  delete t;
130  }
131  } catch (const ibrcommon::QueueUnblockedException&) {
132  // exit
133  }
134  }
135 
137  {
138  JoinableThread::reset();
139  }
140 
141  void DataStorage::setFaulty(bool mode)
142  {
143  _faulty = mode;
144  }
145 
147  {
148  std::list<ibrcommon::File> files;
149  _path.getFiles(files);
150 
151  for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
152  {
153  if (!(*iter).isSystem())
154  {
155  DataStorage::Hash hash(*iter);
156  DataStorage::istream stream(_global_mutex, *iter);
157 
158  _callback.iterateDataStorage(hash, stream);
159  }
160  }
161  }
162 
164  {
165  // wait for resources
166  if (_store_limited) _store_sem.wait();
167 
168  // put the task into the queue
169  _tasks.push( new StoreDataTask(hash, data) );
170  }
171 
173  {
174  // create a corresponding hash
175  DataStorage::Hash hash(*data);
176  store(hash, data);
177  return hash;
178  }
179 
181  {
182  ibrcommon::File file = _path.get(hash.value);
183 
184  if (!file.exists())
185  {
186  throw DataNotAvailableException("file " + file.getPath() + " not found");
187  }
188 
189  return DataStorage::istream(_global_mutex, file);
190  }
191 
193  {
194  _tasks.push( new RemoveDataTask(hash) );
195  }
196 
198  {
200  }
201 
203  {
204  _tasks.abort();
205  }
206 
/**
 * Worker loop processing the queued tasks.
 *
 * Each round takes the task at the front of _tasks, handles it as a store
 * task and/or a remove task (decided with dynamic_cast on the task
 * reference), then deletes the task object and pops it from the queue.
 * The outcome of every task is reported through _callback. The loop ends
 * when an ibrcommon::QueueUnblockedException reaches the outer handler.
 *
 * The empty exception specification declares that no exception leaves this
 * method.
 * NOTE(review): only ibrcommon::Exception, std::bad_cast and
 * QueueUnblockedException are caught below; anything else thrown by a
 * callback or by serialize() would violate the specification - confirm
 * that this cannot happen.
 */
207  void DataStorage::run() throw ()
208  {
209  try {
210  while (true)
211  {
// fetch the next task; it stays in the queue until the pop() at the end
// of this round (presumably get(true) blocks while the queue is empty -
// confirm against ibrcommon::Queue)
212  Task *t = _tasks.get(true);
213 
// --- store task: a failed cast throws std::bad_cast, which is swallowed
// below, meaning "this is not a store task"
214  try {
215  StoreDataTask &store = dynamic_cast<StoreDataTask&>(*t);
216 
217  try {
// the target file is named after the hash value, located in _path
218  ibrcommon::File destination = _path.get(store.hash.value);
219 
// inner scope: _global_mutex is only held while the file is written
220  {
221  ibrcommon::MutexLock l(_global_mutex);
222  std::ofstream stream(destination.getPath().c_str(), ios::out | ios::binary | ios::trunc);
223 
224  // check the streams health
// the fault flag forces this error path even if the stream is fine
// NOTE(review): in that case errno was not set by a failed open, so the
// strerror() text in the message may be unrelated to the failure.
225  if (!stream.good() || _faulty)
226  {
227  std::stringstream ss; ss << "unable to open filestream [" << std::strerror(errno) << "]";
228  throw ibrcommon::IOException(ss.str());
229  }
230 
231  store._container->serialize(stream);
232  stream.close();
233  }
234 
235  // release resources
// counterpart of the wait on _store_sem; only done if the limit is active
236  if (_store_limited) _store_sem.post();
237 
238  // notify the stored item
239  _callback.eventDataStorageStored(store.hash);
240  } catch (const ibrcommon::Exception &ex) {
241  // release resources
// the semaphore is posted on the failure path too, so each store task
// releases it exactly once
242  if (_store_limited) _store_sem.post();
243 
244  // notify the fail of store action
245  _callback.eventDataStorageStoreFailed(store.hash, ex);
246  }
247  } catch (const std::bad_cast&) {
248  }
249 
// --- remove task: same cast-and-ignore scheme as for the store task
250  try {
251  RemoveDataTask &remove = dynamic_cast<RemoveDataTask&>(*t);
252 
253  try {
254  ibrcommon::File destination = _path.get(remove.hash.value);
// inner scope: _global_mutex is only held for the check and the removal
255  {
256  ibrcommon::MutexLock l(_global_mutex);
257  if (!destination.exists())
258  {
// NOTE(review): this branch is empty in this listing and listing line 259
// is absent; the statement handling a missing file was most likely lost -
// restore it from the original source file.
260  }
261  destination.remove();
262  }
// reported after the lock scope has been left
263  _callback.eventDataStorageRemoved(remove.hash);
264  } catch (const ibrcommon::Exception &ex) {
265  _callback.eventDataStorageRemoveFailed(remove.hash, ex);
266  }
267  } catch (const std::bad_cast&) {
268 
269  }
270 
// the task is done: free the object and take its pointer off the queue
271  delete t;
272  _tasks.pop();
273  }
274  } catch (const ibrcommon::QueueUnblockedException&) {
275  // exit
276  }
277  }
278 
280  DataStorage::Task::~Task() {}
281 
282  DataStorage::StoreDataTask::StoreDataTask(const Hash &h, Container *c)
283  : hash(h), _container(c)
284  {}
285 
286  DataStorage::StoreDataTask::~StoreDataTask()
287  {
288  }
289 
290  DataStorage::RemoveDataTask::RemoveDataTask(const Hash &h) : hash(h)
291  {}
292 
293  DataStorage::RemoveDataTask::~RemoveDataTask()
294  {
295  }
296  }
297 }