IBR-DTNSuite  0.12
DataStorage.cpp
Go to the documentation of this file.
1 /*
2  * DataStorage.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "storage/DataStorage.h"
23 #include <typeinfo>
24 #include <sstream>
25 #include <iomanip>
26 #include <list>
27 
28 #include <string.h>
29 #include <stdlib.h>
30 #include <iostream>
31 #include <fstream>
32 #include <cstring>
33 #include <cerrno>
34 
35 namespace dtn
36 {
37  namespace storage
38  {
40  : value("this-hash-value-is-empty")
41  {}
42 
43  DataStorage::Hash::Hash(const std::string &v)
44  : value(v)
45  {
46  }
47 
49  : value(container.getId())
50  { }
51 
52  DataStorage::Hash::Hash(const ibrcommon::File &file) : value(file.getBasename()) {}
54 
56  {
57  return (value != other.value);
58  }
59 
61  {
62  return (value == other.value);
63  }
64 
66  {
67  return (value < other.value);
68  }
69 
71  : ibrcommon::File(file), _stream(NULL), _lock(mutex)
72  {
73  _lock.enter();
74  _stream = new std::ifstream(getPath().c_str(), ios_base::in | ios_base::binary);
75  }
76 
78  {
79  if (_stream != NULL)
80  {
81  delete _stream;
82  _lock.leave();
83  }
84  }
85 
87  { return *_stream; }
88 
89  DataStorage::DataStorage(Callback &callback, const ibrcommon::File &path, unsigned int write_buffer, bool initialize)
90  : _callback(callback), _path(path), _tasks(), _store_sem(write_buffer), _store_limited(write_buffer > 0), _faulty(false)
91  // limit the number of bundles in the write buffer
92  {
93  // initialize the storage
94  if (initialize)
95  {
96  if (_path.exists())
97  {
98  // remove all files in the path
99  std::list<ibrcommon::File> files;
100  _path.getFiles(files);
101 
102  for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); ++iter)
103  {
104  (*iter).remove(true);
105  }
106  }
107  else
108  {
109  // create the path
111  }
112  }
113  }
114 
116  {
117  _tasks.abort();
118  join();
119 
120  // delete all task objects
121  try {
122  while (true)
123  {
124  Task *t = _tasks.getnpop(false);
125  delete t;
126  }
127  } catch (const ibrcommon::QueueUnblockedException&) {
128  // exit
129  }
130  }
131 
133  {
134  JoinableThread::reset();
135  }
136 
137  void DataStorage::setFaulty(bool mode)
138  {
139  _faulty = mode;
140  }
141 
143  {
144  std::list<ibrcommon::File> files;
145  _path.getFiles(files);
146 
147  for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
148  {
149  if (!(*iter).isSystem() && !(*iter).isDirectory())
150  {
151  DataStorage::Hash hash(*iter);
152  DataStorage::istream stream(_global_mutex, *iter);
153 
154  _callback.iterateDataStorage(hash, stream);
155  }
156  }
157  }
158 
160  {
161  // wait for resources
162  if (_store_limited) _store_sem.wait();
163 
164  // put the task into the queue
165  _tasks.push( new StoreDataTask(hash, data) );
166  }
167 
169  {
170  // create a corresponding hash
171  DataStorage::Hash hash(*data);
172  store(hash, data);
173  return hash;
174  }
175 
177  {
178  ibrcommon::File file = _path.get(hash.value);
179 
180  if (!file.exists())
181  {
182  throw DataNotAvailableException("file " + file.getPath() + " not found");
183  }
184 
185  return DataStorage::istream(_global_mutex, file);
186  }
187 
189  {
190  _tasks.push( new RemoveDataTask(hash) );
191  }
192 
194  {
196  }
197 
199  {
200  _tasks.abort();
201  }
202 
203  void DataStorage::run() throw ()
204  {
205  try {
206  while (true)
207  {
208  Task *t = _tasks.get(true);
209 
210  try {
211  StoreDataTask &store = dynamic_cast<StoreDataTask&>(*t);
212 
213  try {
214  ibrcommon::File destination = _path.get(store.hash.value);
215 
216  {
217  ibrcommon::MutexLock l(_global_mutex);
218  std::ofstream stream(destination.getPath().c_str(), ios::out | ios::binary | ios::trunc);
219 
220  // check the streams health
221  if (!stream.good() || _faulty)
222  {
223  std::stringstream ss; ss << "unable to open filestream [" << std::strerror(errno) << "]";
224  throw ibrcommon::IOException(ss.str());
225  }
226 
227  store._container->serialize(stream);
228  stream.close();
229  }
230 
231  // release resources
232  if (_store_limited) _store_sem.post();
233 
234  // notify the stored item
235  _callback.eventDataStorageStored(store.hash);
236  } catch (const ibrcommon::Exception &ex) {
237  // release resources
238  if (_store_limited) _store_sem.post();
239 
240  // notify the fail of store action
241  _callback.eventDataStorageStoreFailed(store.hash, ex);
242  }
243  } catch (const std::bad_cast&) {
244  }
245 
246  try {
247  RemoveDataTask &remove = dynamic_cast<RemoveDataTask&>(*t);
248 
249  try {
250  ibrcommon::File destination = _path.get(remove.hash.value);
251  {
252  ibrcommon::MutexLock l(_global_mutex);
253  if (!destination.exists())
254  {
256  }
257  destination.remove();
258  }
259  _callback.eventDataStorageRemoved(remove.hash);
260  } catch (const ibrcommon::Exception &ex) {
261  _callback.eventDataStorageRemoveFailed(remove.hash, ex);
262  }
263  } catch (const std::bad_cast&) {
264 
265  }
266 
267  delete t;
268  _tasks.pop();
269  }
270  } catch (const ibrcommon::QueueUnblockedException&) {
271  // exit
272  }
273  }
274 
276  DataStorage::Task::~Task() {}
277 
278  DataStorage::StoreDataTask::StoreDataTask(const Hash &h, Container *c)
279  : hash(h), _container(c)
280  {}
281 
282  DataStorage::StoreDataTask::~StoreDataTask()
283  {
284  }
285 
286  DataStorage::RemoveDataTask::RemoveDataTask(const Hash &h) : hash(h)
287  {}
288 
289  DataStorage::RemoveDataTask::~RemoveDataTask()
290  {
291  }
292  }
293 }