IBR-DTNSuite  0.10
SQLiteBundleStorage.cpp
Go to the documentation of this file.
1 /*
2  * SQLiteBundleStorage.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  * Written-by: Matthias Myrtus
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  */
22 
23 
25 #include "core/EventDispatcher.h"
26 #include "core/TimeEvent.h"
27 #include "core/GlobalEvent.h"
28 #include "core/BundleCore.h"
29 #include "core/BundleEvent.h"
30 
33 #include <ibrdtn/data/AgeBlock.h>
34 #include <ibrdtn/data/Serializer.h>
35 #include <ibrdtn/data/Bundle.h>
36 #include <ibrdtn/data/BundleID.h>
37 
40 #include <ibrcommon/data/BLOB.h>
41 #include <ibrcommon/Logger.h>
42 #include <memory>
43 #include <unistd.h>
44 
45 namespace dtn
46 {
47  namespace storage
48  {
49  const std::string SQLiteBundleStorage::TAG = "SQLiteBundleStorage";
50 
51  ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex;
52  bool SQLiteBundleStorage::TaskIdle::_idle = false;
53 
54  SQLiteBundleStorage::SQLiteBLOB::SQLiteBLOB(const ibrcommon::File &path)
55  : _file(path, "blob")
56  {
57  }
58 
59  SQLiteBundleStorage::SQLiteBLOB::~SQLiteBLOB()
60  {
61  // delete the file if the last reference is destroyed
62  _file.remove();
63  }
64 
65  void SQLiteBundleStorage::SQLiteBLOB::clear()
66  {
67  // close the file
68  _filestream.close();
69 
70  // open temporary file
71  _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::trunc | ios::binary );
72 
73  if (!_filestream.is_open())
74  {
75  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
77  }
78  }
79 
80  void SQLiteBundleStorage::SQLiteBLOB::open()
81  {
83 
84  // open temporary file
85  _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::binary );
86 
87  if (!_filestream.is_open())
88  {
89  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
91  }
92  }
93 
94  void SQLiteBundleStorage::SQLiteBLOB::close()
95  {
96  // flush the filestream
97  _filestream.flush();
98 
99  // close the file
100  _filestream.close();
101 
103  }
104 
105  std::streamsize SQLiteBundleStorage::SQLiteBLOB::__get_size()
106  {
107  return _file.size();
108  }
109 
111  {
112  return ibrcommon::BLOB::Reference(new SQLiteBLOB(_blobPath));
113  }
114 
116  : BundleStorage(maxsize), _database(path.get("sqlite.db"), *this)
117  {
118  // use sqlite storage as BLOB provider, auto delete off
119  ibrcommon::BLOB::changeProvider(this, false);
120 
121  // set the block path
122  _blockPath = path.get("blocks");
123  _blobPath = path.get("blob");
124  }
125 
127  {
128  }
129 
131  {
132  // loop until aborted
133  try {
134  while (true)
135  {
136  Task *t = _tasks.getnpop(true);
137 
138  try {
139  BlockingTask &btask = dynamic_cast<BlockingTask&>(*t);
140  try {
141  btask.run(*this);
142  } catch (const std::exception&) {
143  btask.abort();
144  continue;
145  };
146  btask.done();
147  continue;
148  } catch (const std::bad_cast&) { };
149 
150  try {
151  std::auto_ptr<Task> killer(t);
152  t->run(*this);
153  } catch (const std::exception&) { };
154  }
155  } catch (const ibrcommon::QueueUnblockedException &ex) {
156  // we are aborted, abort all blocking tasks
157  }
158  }
159 
161  {
162  // routine checked for throw() on 15.02.2013
163 
164  //register Events
167 
168  try {
170 
171  // delete all old BLOB container
172  _blobPath.remove(true);
173 
174  // create BLOB folder
176 
177  // create the bundle folder
178  ibrcommon::File::createDirectory( _blockPath );
179 
180  // open the database and create all folders and files if needed
181  _database.open();
182  } catch (const ibrcommon::Exception &ex) {
183  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
184  }
185 
186  try {
187  // iterate through all bundles to generate indexes
188  _database.iterateAll();
189  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
190  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
191  }
192  }
193 
195  {
196  // routine checked for throw() on 15.02.2013
197 
198  //unregister Events
201 
202  try {
204 
205  // close the database
206  _database.close();
207  } catch (const ibrcommon::Exception &ex) {
208  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
209  }
210 
211  stop();
212  join();
213  }
214 
216  {
217  _tasks.abort();
218  }
219 
221  {
222  try {
224  return _database.getDistinctDestinations();
225  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
226  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
227  }
229  }
230 
232  {
234  _database.get(cb, result);
235  }
236 
238  {
240  dtn::data::Bundle bundle;
241 
242  try {
244 
245  // query the data base for the bundle
246  _database.get(id, bundle, blocks);
247 
248  for (SQLiteDatabase::blocklist::const_iterator iter = blocks.begin(); iter != blocks.end(); ++iter)
249  {
250  const SQLiteDatabase::blocklist_entry &entry = (*iter);
251  const int blocktyp = entry.first;
252  const ibrcommon::File &file = entry.second;
253 
254  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 50) << "add block: " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
255 
256  // load block from file
257  std::ifstream is(file.getPath().c_str(), std::ios::binary | std::ios::in);
258 
259  if (blocktyp == dtn::data::PayloadBlock::BLOCK_TYPE)
260  {
261  // create a new BLOB object
262  SQLiteBLOB *blob = new SQLiteBLOB(_blobPath);
263 
264  // create a reference of the BLOB
265  ibrcommon::BLOB::Reference ref(blob);
266 
267  try {
268  // remove the temporary file
269  blob->_file.remove();
270 
271  // generate a hard-link, pointing to the BLOB file
272  if ( ::link(file.getPath().c_str(), blob->_file.getPath().c_str()) != 0 )
273  {
274  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) << "hard-link failed (" << errno << ") " << blob->_file.getPath() << " -> " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
275 
276  // copy the BLOB into a new file if hard-links are not supported
277  std::ofstream fout(blob->_file.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
278 
279  // open the filestream
280  ibrcommon::BLOB::iostream stream = ref.iostream();
281 
282  const std::streamsize length = stream.size();
283  ibrcommon::BLOB::copy(fout, (*stream), length);
284  }
285 
286  // add payload block to the bundle
287  bundle.push_back(ref);
288  } catch (const ibrcommon::Exception &ex) {
289  // remove the temporary file
290  blob->_file.remove();
291 
292  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) << "unable to load bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
294  }
295  }
296  else
297  {
298  try {
299  // read the block
301 
302  // close the file
303  is.close();
304 
305  // modify the age block if present
306  try {
307  dtn::data::AgeBlock &agebl = dynamic_cast<dtn::data::AgeBlock&>(block);
308 
309  // modify the AgeBlock with the age of the file
310  time_t age = file.lastaccess() - file.lastmodify();
311 
312  agebl.addSeconds(age);
313  } catch (const std::bad_cast&) { };
315  // skip extensions block
316  }
317  }
318  }
319  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
320  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
321 
322  // try to purge all referenced data
323  purge(id);
324 
326  }
327 
328  return bundle;
329  }
330 
332  {
333  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
334 
336 
337  // start transaction to store the bundle
338  _database.transaction();
339 
340  try {
341  // store the bundle data in the database
342  _database.store(bundle);
343 
344  // create a bundle id
345  const dtn::data::BundleID id(bundle);
346 
347  // index number for order of the blocks
348  int index = 1;
349 
350  // number of bytes stored
351  dtn::data::Length storedBytes = 0;
352 
353  for(dtn::data::Bundle::const_iterator it = bundle.begin() ;it != bundle.end(); ++it)
354  {
355  const dtn::data::Block &block = (**it);
356 
358  {
359  // create a temporary file
360  ibrcommon::TemporaryFile tmpfile(_blockPath, "payload");
361 
362  try {
363  const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(block);
364  ibrcommon::BLOB::Reference ref = payload.getBLOB();
365  ibrcommon::BLOB::iostream stream = ref.iostream();
366 
367  try {
368  const SQLiteBLOB &blob = dynamic_cast<const SQLiteBLOB&>(*ref);
369 
370  // first remove the tmp file
371  tmpfile.remove();
372 
373  // make a hard-link to the origin blob file
374  if ( ::link(blob._file.getPath().c_str(), tmpfile.getPath().c_str()) != 0 )
375  {
376  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) << "hard-link failed (" << errno << ") " << tmpfile.getPath() << " -> " << blob._file.getPath() << IBRCOMMON_LOGGER_ENDL;
377 
378  // copy the BLOB into a new file if hard-links are not supported
379  std::ofstream fout(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
380 
381  const std::streamsize length = stream.size();
382  ibrcommon::BLOB::copy(fout, (*stream), length);
383  }
384  } catch (const std::bad_cast&) {
385  // copy the BLOB into a new file this isn't a sqlite block object
386  std::ofstream fout(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
387 
388  const std::streamsize length = stream.size();
389  ibrcommon::BLOB::copy(fout, (*stream), length);
390  }
391  } catch (const std::bad_cast&) {
392  // remove the tmp file
393  tmpfile.remove();
394  throw ibrcommon::Exception("not a payload block");
395  }
396 
397  // add determine the amount of stored bytes
398  storedBytes += tmpfile.size();
399 
400  // store the block into the database
401  _database.store(id, index, block, tmpfile);
402  }
403  else
404  {
405  ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
406 
407  std::ofstream filestream(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
408  dtn::data::SeparateSerializer serializer(filestream);
409  serializer << block;
410  filestream.close();
411 
412  // add determine the amount of stored bytes
413  storedBytes += tmpfile.size();
414 
415  // store the block into the database
416  _database.store(id, index, block, tmpfile);
417  }
418 
419  // increment index
420  index++;
421  }
422 
423  _database.commit();
424 
425  try {
426  // the bundle is stored sucessfully, we could accept custody if it is requested
427  const dtn::data::EID custodian = acceptCustody(bundle);
428 
429  // update the custody address of this bundle
430  _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, bundle, custodian);
431  } catch (const ibrcommon::Exception&) {
432  // this bundle has no request for custody transfers
433  }
434 
435  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL;
436 
437  // raise bundle added event
438  eventBundleAdded(bundle);
439  } catch (const ibrcommon::Exception &ex) {
440  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
441  _database.rollback();
442  }
443  }
444 
446  {
447  // remove the bundle in locked state
448  try {
450  _database.remove(id);
451 
452  // raise bundle removed event
453  eventBundleRemoved(id);
454  } catch (const ibrcommon::Exception &ex) {
455  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
456  }
457  }
458 
459  void SQLiteBundleStorage::purge(const dtn::data::BundleID &id) throw ()
460  {
461  try {
462  remove(id);
463  } catch (const ibrcommon::Exception &ex) {
464  // do nothing here.
465  }
466  }
467 
469  {
470  clear();
471  }
472 
474  {
476 
477  try {
478  _database.clear();
479  } catch (const ibrcommon::Exception &ex) {
480  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
481  }
482 
483  //Delete Folder SQL_TABLE_BLOCK containing Blocks
484  _blockPath.remove(true);
486  }
487 
489  {
490  try {
492  return _database.empty();
493  } catch (const ibrcommon::Exception &ex) {
494  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
495  }
496  return true;
497  }
498 
500  {
501  try {
503  return _database.count();
504  } catch (const ibrcommon::Exception &ex) {
505  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
506  }
507  return 0;
508  }
509 
511  {
512  try {
513  const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
514 
516  {
517  _tasks.push(new TaskExpire(time.getTimestamp()));
518  }
519  } catch (const std::bad_cast&) { }
520 
521  try {
522  const dtn::core::GlobalEvent &global = dynamic_cast<const dtn::core::GlobalEvent&>(*evt);
523 
525  {
526  // switch to idle mode
527  ibrcommon::MutexLock l(TaskIdle::_mutex);
528  TaskIdle::_idle = true;
529 
530  // generate an idle task
531  _tasks.push(new TaskIdle());
532  }
534  {
535  // switch back to non-idle mode
536  ibrcommon::MutexLock l(TaskIdle::_mutex);
537  TaskIdle::_idle = false;
538  }
539  } catch (const std::bad_cast&) { }
540  }
541 
542  void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage)
543  {
544  try {
545  ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
546  storage._database.expire(_timestamp);
547  } catch (const ibrcommon::Exception &ex) {
548  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
549  }
550  }
551 
552  void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage)
553  {
554  // until IDLE is false
555  while (true)
556  {
557  /*
558  * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space.
559  * This empty space will be reused the next time new information is added to the database. But in the meantime,
560  * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can
561  * cause the information in the database to become fragmented - scrattered out all across the database file rather
562  * than clustered together in one place.
563  * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous,
564  * and otherwise cleans up the database file structure.
565  */
566  try {
567  ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
568  storage._database.vacuum();
569  } catch (const ibrcommon::Exception &ex) {
570  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
571  }
572 
573  // here we can do some IDLE stuff...
574  ::sleep(1);
575 
576  ibrcommon::MutexLock l(TaskIdle::_mutex);
577  if (!TaskIdle::_idle) return;
578  }
579  }
580 
581  const std::string SQLiteBundleStorage::getName() const
582  {
583  return "SQLiteBundleStorage";
584  }
585 
587  {
588  try {
590 
591  // custody is successful transferred to another node.
592  // it is safe to delete this bundle now. (depending on the routing algorithm.)
593  // update the custodian of this bundle with the new one
594  _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, id, custodian);
595  } catch (const ibrcommon::Exception &ex) {
596  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
597  }
598  }
599 
601  {
602  // raise bundle added event
603  eventBundleAdded(bundle);
604  }
605 
607  {
608  // raise bundle removed event
609  eventBundleRemoved(id);
610  }
611 
613  {
615  }
616 
618  {
619  _faulty = mode;
620  _database.setFaulty(mode);
621  }
622  }
623 }