IBR-DTNSuite  0.12
SQLiteBundleStorage.cpp
Go to the documentation of this file.
1 /*
2  * SQLiteBundleStorage.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  * Written-by: Matthias Myrtus
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  */
22 
23 
25 #include "core/EventDispatcher.h"
26 #include "core/TimeEvent.h"
27 #include "core/GlobalEvent.h"
28 #include "core/BundleCore.h"
29 #include "core/BundleEvent.h"
30 
33 #include <ibrdtn/data/AgeBlock.h>
34 #include <ibrdtn/data/Serializer.h>
35 #include <ibrdtn/data/Bundle.h>
36 #include <ibrdtn/data/BundleID.h>
37 
40 #include <ibrcommon/data/BLOB.h>
41 #include <ibrcommon/Logger.h>
42 #include <memory>
43 #include <unistd.h>
44 
45 namespace dtn
46 {
47  namespace storage
48  {
// Log tag handed to the IBRCOMMON_LOGGER_TAG / IBRCOMMON_LOGGER_DEBUG_TAG
// macros throughout this file.
49  const std::string SQLiteBundleStorage::TAG = "SQLiteBundleStorage";
50 
// Idle state shared by all TaskIdle instances. Later in this file _idle is
// read and written only while a MutexLock on _mutex is held. It starts out
// false (not idle).
51  ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex;
52  bool SQLiteBundleStorage::TaskIdle::_idle = false;
53 
54  SQLiteBundleStorage::SQLiteBLOB::SQLiteBLOB(const ibrcommon::File &path)
55  : _file(path, "blob")
56  {
57  }
58 
59  SQLiteBundleStorage::SQLiteBLOB::~SQLiteBLOB()
60  {
61  // delete the file if the last reference is destroyed
62  _file.remove();
63  }
64 
// Reset the BLOB content: the stream is closed and the backing file is
// reopened with ios::trunc, which discards whatever it contained before.
// A failed reopen is logged with severity "error".
65  void SQLiteBundleStorage::SQLiteBLOB::clear()
66  {
67  // close the file
68  _filestream.close();
69 
70  // open temporary file
71  _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::trunc | ios::binary );
72 
73  if (!_filestream.is_open())
74  {
75  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
// NOTE(review): the embedded listing numbering jumps from 75 to 77, so a
// statement may be missing from this view - confirm against the upstream file.
77  }
78  }
79 
// Open the backing file for binary reading and writing. Unlike clear(), no
// ios::trunc flag is passed, so existing content is kept. A failed open is
// logged with severity "error".
80  void SQLiteBundleStorage::SQLiteBLOB::open()
81  {
// NOTE(review): listing line 82 is not present in this view - a statement may
// be missing here; confirm against the upstream file.
83 
84  // open temporary file
85  _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::binary );
86 
87  if (!_filestream.is_open())
88  {
89  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
// NOTE(review): listing line 90 is not present in this view - confirm whether
// an error is propagated to the caller at this point.
91  }
92  }
93 
// Flush pending output and close the stream of the backing file.
94  void SQLiteBundleStorage::SQLiteBLOB::close()
95  {
96  // flush the filestream
97  _filestream.flush();
98 
99  // close the file
100  _filestream.close();
101 
// NOTE(review): listing line 102 is not present in this view - a statement
// may be missing here; confirm against the upstream file.
103  }
104 
105  std::streamsize SQLiteBundleStorage::SQLiteBLOB::__get_size()
106  {
107  return _file.size();
108  }
109 
// NOTE(review): the signature line of this function (listing line 110) is not
// present in this view. The visible body allocates a new SQLiteBLOB built from
// _blobPath and returns it wrapped in an ibrcommon::BLOB::Reference.
111  {
112  return ibrcommon::BLOB::Reference(new SQLiteBLOB(_blobPath));
113  }
114 
// Set up the storage below `path`: the database object is created for
// "sqlite.db", this object is installed as BLOB provider, the "blocks" and
// "blob" sub-paths are derived, old BLOB files are removed, the block folder
// is created and the database is opened. Any ibrcommon::Exception raised
// during that setup is logged as critical and not propagated.
//
// path                     base path of the storage
// maxsize                  forwarded to the BundleStorage base class
// usePersistentBundleSets  guards a statement that is not visible here
115  SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const dtn::data::Length &maxsize, bool usePersistentBundleSets)
116  : BundleStorage(maxsize), _database(path.get("sqlite.db"), *this)
117  {
118  //let the factory create SQLiteBundleSets
119  if (usePersistentBundleSets)
// NOTE(review): listing line 120 - the statement controlled by the `if`
// above - is not present in this view. As shown, the next statement would
// become the `if` body; confirm against the upstream file.
121 
122  // use sqlite storage as BLOB provider, auto delete off
123  ibrcommon::BLOB::changeProvider(this, false);
124 
125  // set the block path
126  _blockPath = path.get("blocks");
127  _blobPath = path.get("blob");
128 
129  try {
// NOTE(review): listing line 130 is not present in this view.
131 
132  // delete all old BLOB container
133  _blobPath.remove(true);
134 
135  // create BLOB folder
// NOTE(review): listing line 136 (the statement announced by the comment
// above) is not present in this view.
137 
138  // create the bundle folder
139  ibrcommon::File::createDirectory( _blockPath );
140 
141  // open the database and create all folders and files if needed
142  _database.open();
143  } catch (const ibrcommon::Exception &ex) {
144  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
145  }
146  }
147 
// NOTE(review): the signature line (listing line 148) is not present in this
// view; the comments below suggest this is the tear-down counterpart of the
// constructor - confirm. The visible code closes the database and logs any
// ibrcommon::Exception as critical instead of propagating it.
149  {
150  // stop factory from creating SQLiteBundleSets
// NOTE(review): listing line 151 (the statement announced above) is not
// present in this view.
152 
153  try {
// NOTE(review): listing line 154 is not present in this view.
155 
156  // close the database
157  _database.close();
158  } catch (const ibrcommon::Exception &ex) {
159  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
160  }
161  }
162 
// NOTE(review): the signature line (listing line 163) is not present in this
// view. The visible body is the task loop: it pops tasks from _tasks and runs
// each one against this storage until an ibrcommon::QueueUnblockedException
// leaves the loop.
164  {
165  // loop until aborted
166  try {
167  while (true)
168  {
// presumably blocks until a task is available - TODO confirm getnpop(true)
169  Task *t = _tasks.getnpop(true);
170 
// A BlockingTask is signalled via abort() when run() throws and via done()
// otherwise. It is not deleted here, so its owner is presumably the caller
// that queued it - verify.
171  try {
172  BlockingTask &btask = dynamic_cast<BlockingTask&>(*t);
173  try {
174  btask.run(*this);
175  } catch (const std::exception&) {
176  btask.abort();
177  continue;
178  };
179  btask.done();
180  continue;
181  } catch (const std::bad_cast&) { };
182 
// Any other task is owned by this loop: the auto_ptr deletes it when the
// scope is left, whether run() returned or threw. Exceptions are swallowed.
183  try {
184  std::auto_ptr<Task> killer(t);
185  t->run(*this);
186  } catch (const std::exception&) { };
187  }
188  } catch (const ibrcommon::QueueUnblockedException &ex) {
189  // we are aborted, abort all blocking tasks
// NOTE(review): no statement follows the comment above in this view, so the
// handler only ends the loop.
190  }
191  }
192 
// NOTE(review): the signature line (listing line 193) is not present in this
// view. The visible body walks all stored bundles through
// _database.iterateAll() and logs a SQLiteQueryException as critical.
194  {
195  // routine checked for throw() on 15.02.2013
196 
197  //register Events
// NOTE(review): listing lines 198-199 (the registration announced above) are
// not present in this view.
200 
201  try {
202  // iterate through all bundles to generate indexes
203  _database.iterateAll();
204  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
205  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
206  }
207  }
208 
// NOTE(review): the signature line (listing line 209) is not present in this
// view. The visible body calls stop() followed by join().
210  {
211  // routine checked for throw() on 15.02.2013
212 
213  //unregister Events
// NOTE(review): listing lines 214-215 (the unregistration announced above)
// are not present in this view.
216 
217  stop();
218  join();
219  }
220 
// NOTE(review): the signature line (listing line 221) is not present in this
// view. The visible body aborts the task queue; presumably this is what makes
// the task loop see QueueUnblockedException - confirm.
222  {
223  _tasks.abort();
224  }
225 
// NOTE(review): the signature line (listing line 226) is not present in this
// view. The visible body returns _database.getDistinctDestinations() and logs
// a SQLiteQueryException as critical.
227  {
228  try {
// NOTE(review): listing line 229 is not present in this view.
230  return _database.getDistinctDestinations();
231  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
232  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
233  }
// NOTE(review): listing line 234 is not present in this view - as shown there
// is no return statement on the error path; confirm against the upstream file.
235  }
236 
// NOTE(review): the signature line (listing line 237) and listing line 239
// are not present in this view. The visible body forwards `cb` and `result`
// to _database.get().
238  {
240  _database.get(cb, result);
241  }
242 
// NOTE(review): the signature line (listing line 243) is not present in this
// view. The visible body rebuilds a dtn::data::Bundle for `id`: the database
// supplies the bundle data plus a list of (block type, file) entries, and each
// entry is then attached to the bundle. Several listing lines inside the body
// are missing as well; they are marked below.
244  {
// NOTE(review): listing line 245 is not present in this view.
246  dtn::data::Bundle bundle;
247 
248  try {
// NOTE(review): listing line 249 is not present in this view; `blocks`, used
// below, is presumably declared there - confirm.
250 
251  // query the data base for the bundle
252  _database.get(id, bundle, blocks);
253 
254  for (SQLiteDatabase::blocklist::const_iterator iter = blocks.begin(); iter != blocks.end(); ++iter)
255  {
256  const SQLiteDatabase::blocklist_entry &entry = (*iter);
257  const int blocktyp = entry.first;
258  const ibrcommon::File &file = entry.second;
259 
260  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 50) << "add block: " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
261 
262  // load block from file
// The stream is opened for every entry; in the visible code the payload
// branch below does not read from it.
263  std::ifstream is(file.getPath().c_str(), std::ios::binary | std::ios::in);
264 
265  if (blocktyp == dtn::data::PayloadBlock::BLOCK_TYPE)
266  {
267  // create a new BLOB object
268  SQLiteBLOB *blob = new SQLiteBLOB(_blobPath);
269 
270  // create a reference of the BLOB
271  ibrcommon::BLOB::Reference ref(blob);
272 
273  try {
274  // remove the temporary file
// the path must be free, because ::link() below creates a new name there
275  blob->_file.remove();
276 
277  // generate a hard-link, pointing to the BLOB file
278  if ( ::link(file.getPath().c_str(), blob->_file.getPath().c_str()) != 0 )
279  {
280  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) << "hard-link failed (" << errno << ") " << blob->_file.getPath() << " -> " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
281 
282  // copy the BLOB into a new file if hard-links are not supported
283  std::ofstream fout(blob->_file.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
284 
285  // open the filestream
286  ibrcommon::BLOB::iostream stream = ref.iostream();
287 
// NOTE(review): as shown, the copy source is the stream of the new BLOB
// (`ref`), not the stored block file - confirm this is intended.
288  const std::streamsize length = stream.size();
289  ibrcommon::BLOB::copy(fout, (*stream), length);
290  }
291  else
292  {
293  // update BLOB size
294  blob->update();
295  }
296 
297  // add payload block to the bundle
298  bundle.push_back(ref);
299  } catch (const ibrcommon::Exception &ex) {
300  // remove the temporary file
301  blob->_file.remove();
302 
303  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) << "unable to load bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
// NOTE(review): listing line 304 is not present in this view - confirm
// whether the error is propagated to the caller here.
305  }
306  }
307  else
308  {
309  try {
310  // read the block
// NOTE(review): listing line 311 is not present in this view; `block`, used
// below, is presumably introduced there - confirm.
312 
313  // close the file
314  is.close();
315 
316  // modify the age block if present
317  try {
318  dtn::data::AgeBlock &agebl = dynamic_cast<dtn::data::AgeBlock&>(block);
319 
320  // modify the AgeBlock with the age of the file
// the added age is the file's last-access time minus its last-modify time
321  time_t age = file.lastaccess() - file.lastmodify();
322 
323  agebl.addSeconds(age);
324  } catch (const std::bad_cast&) { };
// NOTE(review): listing line 325 is not present in this view; the handler
// that belongs to the `try` opened at listing line 309 is presumably there.
326  // skip extensions block
327  }
328  }
329  }
330  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
331  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
332 
333  // try to purge all referenced data
334  remove(id);
335 
// NOTE(review): listing line 336 is not present in this view - confirm
// whether an exception is thrown to the caller after the purge.
337  }
338 
339  return bundle;
340  }
341 
// NOTE(review): the signature line (listing line 342) is not present in this
// view. The visible body stores `bundle`: space for its serialized length is
// reserved, then inside a database transaction the bundle record and one file
// per block are written. After the commit custody is accepted when possible
// and the bundle-added event is raised. An ibrcommon::Exception rolls the
// transaction back and releases the reserved space.
343  {
344  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
345 
// NOTE(review): listing line 346 is not present in this view.
347 
348  // get size of the bundle
// In the visible code the serializer is only asked for the length.
349  dtn::data::DefaultSerializer s(std::cout);
350  dtn::data::Length size = s.getLength(bundle);
351 
352  // increment the storage size
353  allocSpace(size);
354 
355  // start transaction to store the bundle
356  _database.transaction();
357 
358  try {
359  // store the bundle data in the database
360  _database.store(bundle, size);
361 
362  // create a bundle id
363  const dtn::data::BundleID &id = bundle;
// NOTE(review): listing line 364 is not present in this view; `meta`, used
// further down, is presumably declared there - confirm.
365 
366  // index number for order of the blocks
367  int index = 1;
368 
369  // number of bytes stored
370  dtn::data::Length storedBytes = 0;
371 
372  for(dtn::data::Bundle::const_iterator it = bundle.begin() ;it != bundle.end(); ++it)
373  {
374  const dtn::data::Block &block = (**it);
375 
// NOTE(review): listing line 376 is not present in this view; it presumably
// holds the condition that selects the payload branch below - confirm.
377  {
378  // create a temporary file
379  ibrcommon::TemporaryFile tmpfile(_blockPath, "payload");
380 
381  try {
382  const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(block);
383  ibrcommon::BLOB::Reference ref = payload.getBLOB();
384  ibrcommon::BLOB::iostream stream = ref.iostream();
385 
386  try {
// A BLOB that is a SQLiteBLOB is hard-linked instead of copied; the cast
// throws std::bad_cast for any other BLOB type, handled further down.
387  const SQLiteBLOB &blob = dynamic_cast<const SQLiteBLOB&>(*ref);
388 
389  // first remove the tmp file
390  tmpfile.remove();
391 
392  // make a hard-link to the origin blob file
393  if ( ::link(blob._file.getPath().c_str(), tmpfile.getPath().c_str()) != 0 )
394  {
395  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) << "hard-link failed (" << errno << ") " << tmpfile.getPath() << " -> " << blob._file.getPath() << IBRCOMMON_LOGGER_ENDL;
396 
397  // copy the BLOB into a new file if hard-links are not supported
398  std::ofstream fout(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
399 
400  const std::streamsize length = stream.size();
401  ibrcommon::BLOB::copy(fout, (*stream), length);
402  }
403  } catch (const std::bad_cast&) {
404  // copy the BLOB into a new file this isn't a sqlite block object
405  std::ofstream fout(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
406 
407  const std::streamsize length = stream.size();
408  ibrcommon::BLOB::copy(fout, (*stream), length);
409  }
410  } catch (const std::bad_cast&) {
411  // remove the tmp file
412  tmpfile.remove();
413  throw ibrcommon::Exception("not a payload block");
414  }
415 
416  // determine the amount of stored bytes
417  storedBytes += tmpfile.size();
418 
419  // store the block into the database
420  _database.store(id, index, block, tmpfile);
421  }
422  else
423  {
// Every other block is serialized on its own into a separate file.
424  ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
425 
426  std::ofstream filestream(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
427  dtn::data::SeparateSerializer serializer(filestream);
428  serializer << block;
429  filestream.close();
430 
431  // determine the amount of stored bytes
432  storedBytes += tmpfile.size();
433 
434  // store the block into the database
435  _database.store(id, index, block, tmpfile);
436  }
437 
438  // increment index
439  index++;
440  }
441 
// NOTE(review): storedBytes is accumulated above but not read anywhere in
// the visible code.
442  _database.commit();
443 
444  try {
445  // the bundle is stored successfully, we could accept custody if it is requested
446  const dtn::data::EID custodian = acceptCustody(meta);
447 
448  // update the custody address of this bundle
449  _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, bundle, custodian);
450  } catch (const ibrcommon::Exception&) {
451  // this bundle has no request for custody transfers
452  }
453 
454  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL;
455 
456  // raise bundle added event
457  eventBundleAdded(meta);
458  } catch (const ibrcommon::Exception &ex) {
459  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
// NOTE(review): this handler also covers the code after commit() above, so
// rollback() can be reached after a successful commit - confirm that is safe.
460  _database.rollback();
461 
462  // free the previously allocated space
463  freeSpace(size);
464  }
465  }
466 
// NOTE(review): the signature line (listing line 467) is not present in this
// view. The visible body returns _database.contains(id); a
// SQLiteQueryException is turned into `false`.
468  {
469  try {
// NOTE(review): listing line 470 is not present in this view.
471  return _database.contains(id);
472  } catch (const SQLiteDatabase::SQLiteQueryException&) {
473  return false;
474  }
475  }
476 
// NOTE(review): the signature line (listing line 477) is not present in this
// view. The visible body lets _database.get(id, ret) fill `ret` and returns it.
478  {
479  try {
// NOTE(review): listing lines 480-481 are not present in this view; `ret` is
// presumably declared there - confirm.
482  _database.get(id, ret);
483  return ret;
484  } catch (const SQLiteDatabase::SQLiteQueryException&) {
// NOTE(review): listing line 485 is not present in this view - as shown the
// handler is empty and the function ends without a return; confirm upstream.
486  }
487  }
488 
// NOTE(review): the signature line (listing line 489) is not present in this
// view. The visible body removes `id` from the database, passes the value
// returned by _database.remove() to freeSpace() and raises the bundle-removed
// event. An ibrcommon::Exception is logged as critical and not propagated.
490  {
491  // remove the bundle in locked state
492  try {
// NOTE(review): listing line 493 is not present in this view; the lock
// mentioned in the comment above is presumably taken there - confirm.
494  freeSpace( _database.remove(id) );
495 
496  // raise bundle removed event
497  eventBundleRemoved(id);
498  } catch (const ibrcommon::Exception &ex) {
499  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
500  }
501  }
502 
// NOTE(review): the signature line (listing line 503) is not present in this
// view. The visible body clears the database (logging any
// ibrcommon::Exception as critical), removes the block folder and resets the
// accounted storage size.
504  {
// NOTE(review): listing line 505 is not present in this view.
506 
507  try {
508  _database.clear();
509  } catch (const ibrcommon::Exception &ex) {
510  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
511  }
512 
513  //Delete Folder SQL_TABLE_BLOCK containing Blocks
514  _blockPath.remove(true);
// NOTE(review): listing line 515 is not present in this view - confirm
// whether the block folder is re-created after the removal.
516 
517  // set the storage size to zero
518  clearSpace();
519  }
520 
// NOTE(review): the signature line (listing line 521) is not present in this
// view. The visible body returns _database.empty(); if an
// ibrcommon::Exception occurs it is logged as critical and `true` is returned.
522  {
523  try {
// NOTE(review): listing line 524 is not present in this view.
525  return _database.empty();
526  } catch (const ibrcommon::Exception &ex) {
527  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
528  }
529  return true;
530  }
531 
// NOTE(review): the signature line (listing line 532) is not present in this
// view. The visible body returns _database.count(); if an
// ibrcommon::Exception occurs it is logged as critical and 0 is returned.
533  {
534  try {
// NOTE(review): listing line 535 is not present in this view.
536  return _database.count();
537  } catch (const ibrcommon::Exception &ex) {
538  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
539  }
540  return 0;
541  }
542 
// NOTE(review): the signature line (listing line 543) is not present in this
// view. The visible body inspects `evt` with two dynamic_casts: a TimeEvent
// queues a TaskExpire, a GlobalEvent toggles the TaskIdle idle flag. The
// conditions that guard the three inner blocks (listing lines 548, 557 and
// 566) are missing from this view, so as shown the blocks are unconditional -
// confirm against the upstream file.
544  {
545  try {
546  const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
547 
// NOTE(review): listing line 548 (presumably a condition) is not shown.
549  {
550  _tasks.push(new TaskExpire(time.getTimestamp()));
551  }
552  } catch (const std::bad_cast&) { }
553 
554  try {
555  const dtn::core::GlobalEvent &global = dynamic_cast<const dtn::core::GlobalEvent&>(*evt);
556 
// NOTE(review): listing line 557 (presumably a condition) is not shown.
558  {
559  // switch to idle mode
560  ibrcommon::MutexLock l(TaskIdle::_mutex);
561  TaskIdle::_idle = true;
562 
563  // generate an idle task
564  _tasks.push(new TaskIdle());
565  }
// NOTE(review): listing line 566 (presumably the alternative condition) is
// not shown.
567  {
568  // switch back to non-idle mode
569  ibrcommon::MutexLock l(TaskIdle::_mutex);
570  TaskIdle::_idle = false;
571  }
572  } catch (const std::bad_cast&) { }
573  }
574 
575  void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage)
576  {
577  try {
578  ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
579  storage._database.expire(_timestamp);
580  } catch (const ibrcommon::Exception &ex) {
581  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
582  }
583  }
584 
// Idle-time maintenance: vacuum the database repeatedly for as long as the
// shared TaskIdle::_idle flag stays set. The storage-wide lock is held in
// read/write mode during each vacuum, and an ibrcommon::Exception is logged
// as critical without ending the loop.
585  void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage)
586  {
587  // until IDLE is false
588  while (true)
589  {
590  /*
591  * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space.
592  * This empty space will be reused the next time new information is added to the database. But in the meantime,
593  * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can
594  * cause the information in the database to become fragmented - scattered out all across the database file rather
595  * than clustered together in one place.
596  * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous,
597  * and otherwise cleans up the database file structure.
598  */
599  try {
600  ibrcommon::RWLock l(storage._global_lock, ibrcommon::RWMutex::LOCK_READWRITE);
601  storage._database.vacuum();
602  } catch (const ibrcommon::Exception &ex) {
603  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
604  }
605 
606  // here we can do some IDLE stuff...
// NOTE(review): listing line 607 is not present in this view. As shown the
// loop re-runs the vacuum immediately, without any pause, while the idle flag
// is set - confirm whether a wait belongs here.
608 
// the flag is read under the same mutex that the writers of _idle hold
609  ibrcommon::MutexLock l(TaskIdle::_mutex);
610  if (!TaskIdle::_idle) return;
611  }
612  }
613 
614  const std::string SQLiteBundleStorage::getName() const
615  {
616  return "SQLiteBundleStorage";
617  }
618 
// NOTE(review): the signature line (listing line 619) is not present in this
// view. The visible body updates the custodian stored for `id` to `custodian`
// and logs any ibrcommon::Exception as critical.
620  {
621  try {
// NOTE(review): listing line 622 is not present in this view.
623 
624  // custody is successful transferred to another node.
625  // it is safe to delete this bundle now. (depending on the routing algorithm.)
626  // update the custodian of this bundle with the new one
627  _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, id, custodian);
628  } catch (const ibrcommon::Exception &ex) {
629  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
630  }
631  }
632 
// NOTE(review): the signature line (listing line 633) is not present in this
// view. The visible body raises the bundle-added event for `bundle` and
// accounts `size` as used space.
634  {
635  // raise bundle added event
636  eventBundleAdded(bundle);
637 
638  // allocate consumed space of the bundle
639  allocSpace(size);
640  }
641 
// NOTE(review): the signature line (listing line 642) is not present in this
// view. The visible body raises the bundle-removed event for `id` and
// releases `size` from the accounted space.
643  {
644  // raise bundle removed event
645  eventBundleRemoved(id);
646 
647  // release consumed space of this bundle
648  freeSpace(size);
649  }
650 
// NOTE(review): the signature line (listing line 651) and the only body line
// (listing line 653) are not present in this view, so the behaviour of this
// function cannot be described from here.
652  {
654  }
655 
// NOTE(review): the signature line (listing line 656) is not present in this
// view. The visible body stores `mode` in _faulty and passes the same value
// on to the database object.
657  {
658  _faulty = mode;
659  _database.setFaulty(mode);
660  }
661  }
662 }