IBR-DTNSuite  0.12
SQLiteDatabase.cpp
Go to the documentation of this file.
1 /*
2  * SQLiteDatabase.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "storage/SQLiteDatabase.h"
26 #include <ibrdtn/utils/Clock.h>
27 #include <ibrcommon/Logger.h>
28 #include <stdint.h>
29 
30 namespace dtn
31 {
32  namespace storage
33  {
34  void sql_tracer(void*, const char * pQuery)
35  {
36  IBRCOMMON_LOGGER_DEBUG_TAG("SQLiteDatabase", 50) << "sqlite trace: " << pQuery << IBRCOMMON_LOGGER_ENDL;
37  }
38 
// Tag passed to the IBRCOMMON_LOGGER_* macros by this class.
39  const std::string SQLiteDatabase::TAG = "SQLiteDatabase";
40 
// Column lists spliced into the SELECT statements below.
//  [0] full bundle row; get(Statement&, MetaBundle&, int) reads these columns
//      by position (source = offset + 0 ... payloadlength = offset + 13), and
//      iterateAll() reads column 14 (bytes), so the order must not change.
//  [1] columns selected by the "expiretime <= ?" query on the bundle table.
//  [2] columns selected from the bundle-set table.
41  const std::string SQLiteDatabase::_select_names[] = {
42  "source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, expiretime, fragmentoffset, appdatalength, hopcount, netpriority, payloadlength, bytes",
43  "source, timestamp, sequencenumber, fragmentoffset, payloadlength, bytes",
44  "`source`, `timestamp`, `sequencenumber`, `fragmentoffset`, `fragmentlength`, `expiretime`"
45  };
46 
// WHERE fragments that identify one bundle by its five key columns.
//  [0] parameterised form (five '?' placeholders).
//  [1] join form matching rows of two tables aliased as "a" and "b".
47  const std::string SQLiteDatabase::_where_filter[] = {
48  "source = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND fragmentlength = ?",
49  "a.source = b.source AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND a.fragmentoffset = b.fragmentoffset AND a.fragmentlength = b.fragmentlength"
50  };
51 
// Table names, indexed by the SQL_TABLE_* constants used throughout this file.
// NOTE(review): the SQL_TABLE_* declaration is not visible here; presumably its
// order matches this list - confirm before reordering.
52  const std::string SQLiteDatabase::_tables[] =
53  { "bundles", "blocks", "routing", "routing_bundles", "routing_nodes", "properties", "bundle_set", "bundle_set_names" };
54 
55  // this is the version of a fresh created db scheme
56  const int SQLiteDatabase::DBSCHEMA_FRESH_VERSION = 8;
57 
// Schema version open() compares against the stored version; a mismatch
// triggers doUpgrade().
58  const int SQLiteDatabase::DBSCHEMA_VERSION = 8;
59 
// Statements used by getVersion() / setVersion() to read and write the
// 'version' row of the properties table.
60  const std::string SQLiteDatabase::QUERY_SCHEMAVERSION = "SELECT `value` FROM " + SQLiteDatabase::_tables[SQLiteDatabase::SQL_TABLE_PROPERTIES] + " WHERE `key` = 'version' LIMIT 0,1;";
61  const std::string SQLiteDatabase::SET_SCHEMAVERSION = "INSERT INTO " + SQLiteDatabase::_tables[SQLiteDatabase::SQL_TABLE_PROPERTIES] + " (`key`, `value`) VALUES ('version', ?);";
62 
63  const std::string SQLiteDatabase::_sql_queries[SQL_QUERIES_END] =
64  {
65  "SELECT " + _select_names[0] + " FROM " + _tables[SQL_TABLE_BUNDLE],
66  "SELECT " + _select_names[0] + " FROM " + _tables[SQL_TABLE_BUNDLE] + " ORDER BY priority DESC, timestamp, sequencenumber, fragmentoffset, fragmentlength LIMIT ?,?;",
67  "SELECT " + _select_names[0] + " FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE " + _where_filter[0] + " LIMIT 1;",
68  "SELECT bytes FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE " + _where_filter[0] + " LIMIT 1;",
69  "SELECT DISTINCT destination FROM " + _tables[SQL_TABLE_BUNDLE],
70 
71  //EXPIRE_*
72  "SELECT " + _select_names[1] + " FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
73  "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +" as a, "+ _tables[SQL_TABLE_BLOCK] +" as b WHERE " + _where_filter[1] + " AND a.expiretime <= ?;",
74  "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
75  "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY expiretime ASC LIMIT 1;",
76 
77  "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +" LIMIT 1;",
78  "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
79 
80  "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE " + _where_filter[0] + ";",
81  "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
82  "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +" (source, timestamp, sequencenumber, fragmentoffset, fragmentlength, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority, hopcount, netpriority, payloadlength, bytes) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);",
83  "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE " + _where_filter[0] + ";",
84 
85  "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET procflags = ? WHERE " + _where_filter[0] + ";",
86 
87  //BLOCK_*
88  "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE " + _where_filter[0] + " ORDER BY ordernumber ASC;",
89  "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE " + _where_filter[0] + " AND ordernumber = ?;",
90  "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +";",
91  "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +" (source, timestamp, sequencenumber, fragmentoffset, fragmentlength, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?,?);",
92 
93  //BUNDLE_SET_*
94  "INSERT INTO " + _tables[SQL_TABLE_BUNDLE_SET] + " (set_id, source, timestamp, sequencenumber, fragmentoffset, fragmentlength, expiretime) VALUES (?,?,?,?,?,?,?);",
95  "DELETE FROM " + _tables[SQL_TABLE_BUNDLE_SET] + " WHERE set_id = ?;",
96  "SELECT " + _select_names[2] + " FROM " + _tables[SQL_TABLE_BUNDLE_SET] + " WHERE set_id = ? AND " + _where_filter[0] + " LIMIT 1;",
97  "SELECT " + _select_names[2] + " FROM " + _tables[SQL_TABLE_BUNDLE_SET] + " WHERE set_id = ? AND expiretime <= ?;",
98  "DELETE FROM " + _tables[SQL_TABLE_BUNDLE_SET] + " WHERE set_id = ? AND expiretime <= ?;",
99  "SELECT " + _select_names[2] + " FROM " + _tables[SQL_TABLE_BUNDLE_SET] + " WHERE set_id = ?;",
100  "SELECT COUNT(*) FROM " + _tables[SQL_TABLE_BUNDLE_SET] + " WHERE set_id = ?;",
101  "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE_SET] +" WHERE set_id = ? ORDER BY expiretime ASC LIMIT 1;",
102  "INSERT INTO " + _tables[SQL_TABLE_BUNDLE_SET] + " SELECT " + _select_names[2] + ", ? FROM " + _tables[SQL_TABLE_BUNDLE_SET] + " WHERE set_id = ?;",
103 
104  //BUNDLE_SET_NAME_*
105  "INSERT INTO " + _tables[SQL_TABLE_BUNDLE_SET_NAME] + " (name, persistent) VALUES (?, ?);",
106  "SELECT id FROM " + _tables[SQL_TABLE_BUNDLE_SET_NAME] + " WHERE name = ? AND persistent = ? LIMIT 0, 1;",
107  "DELETE FROM " + _tables[SQL_TABLE_BUNDLE_SET_NAME] + " WHERE id = ? LIMIT 0, 1;",
108 
109  "VACUUM;"
110  };
111 
// DDL statements (tables, trigger, indexes) executed one by one by doUpgrade()
// when a fresh database is created.
//
// NOTE(review): the "bundles_id" entry (listing line 123) is NOT followed by a
// comma, so its last string literal and the first literal of the
// "bundles_expire" entry are concatenated by the compiler. The array therefore
// holds one element fewer than the number of statements written here, and that
// merged element contains two SQL statements. sqlite3_prepare_v2() compiles
// only the first statement of its input, so the bundles_expire index is
// presumably never created - verify. doUpgrade() iterates up to
// DB_STRUCTURE_END - 1, which looks like it compensates for the missing
// element; adding the comma requires revisiting that loop bound and the
// declaration of DB_STRUCTURE_END (not visible in this file).
112  const std::string SQLiteDatabase::_db_structure[SQLiteDatabase::DB_STRUCTURE_END] =
113  {
114  "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] + "` ( `key` INTEGER PRIMARY KEY ASC, `source` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL DEFAULT 0, `fragmentlength` INTEGER NOT NULL DEFAULT 0, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);",
115  "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] + "` ( `key` INTEGER PRIMARY KEY ASC, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL DEFAULT 0, `appdatalength` INTEGER NOT NULL DEFAULT 0, `fragmentlength` INTEGER NOT NULL DEFAULT 0, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL, `hopcount` INTEGER DEFAULT NULL, `netpriority` INTEGER NOT NULL DEFAULT 0, `payloadlength` INTEGER NOT NULL DEFAULT 0, `bytes` INTEGER NOT NULL DEFAULT 0);",
116  "CREATE TABLE IF NOT EXISTS "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, KEY INT, Routing TEXT);",
117  "CREATE TABLE IF NOT EXISTS "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID TEXT, KEY INT, Routing TEXT);",
118  "CREATE TABLE IF NOT EXISTS "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, KEY INT, Routing TEXT);",
// Trigger: deleting a bundle row also deletes the block rows with the same key columns.
119  "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] + " FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] + " WHERE " + _tables[SQL_TABLE_BLOCK] + ".source = OLD.source AND " + _tables[SQL_TABLE_BLOCK] + ".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] + ".sequencenumber = OLD.sequencenumber AND " + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset = OLD.fragmentoffset AND " + _tables[SQL_TABLE_BLOCK] + ".fragmentlength = OLD.fragmentlength; END;",
120  "CREATE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] + " (source, timestamp, sequencenumber, fragmentoffset, fragmentlength);",
121  "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] + " (destination);",
122  "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] + " (destination, priority);",
// NOTE(review): missing trailing comma on the next line - see the note at the top of this array.
123  "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] + " (source, timestamp, sequencenumber, fragmentoffset, fragmentlength);"
124  "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] + " (source, timestamp, sequencenumber, fragmentoffset, fragmentlength, expiretime);",
125  "CREATE TABLE IF NOT EXISTS '" + _tables[SQL_TABLE_PROPERTIES] + "' ( `key` TEXT PRIMARY KEY ASC ON CONFLICT REPLACE, `value` TEXT NOT NULL);",
126  "CREATE TABLE IF NOT EXISTS " + _tables[SQL_TABLE_BUNDLE_SET] + " (`source` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL, `fragmentlength` INTEGER NOT NULL, `expiretime` INTEGER, `set_id` INTEGER, PRIMARY KEY(`set_id`, `source`, `timestamp`, `sequencenumber`, `fragmentoffset`, `fragmentlength`));",
127  "CREATE TABLE IF NOT EXISTS " + _tables[SQL_TABLE_BUNDLE_SET_NAME] + " (`id` INTEGER PRIMARY KEY, `name` TEXT NOT NULL, `persistent` INTEGER NOT NULL);",
128  "CREATE UNIQUE INDEX IF NOT EXISTS bundle_set_names_index ON " + _tables[SQL_TABLE_BUNDLE_SET_NAME] + " (`name`, `persistent`);"
129  };
130 
132  { }
133 
135  {
136  }
137 
138  SQLiteDatabase::Statement::Statement(sqlite3 *database, const std::string &query)
139  : _database(database), _st(NULL), _query(query)
140  {
141  prepare();
142  }
143 
145  {
146  if (_st != NULL) {
147  sqlite3_finalize(_st);
148  }
149  }
150 
152  {
153  return _st;
154  }
155 
157  {
158  if (_st != NULL) {
159  sqlite3_reset(_st);
160  sqlite3_clear_bindings(_st);
161  }
162  }
163 
165  {
166  if (_st == NULL)
167  throw SQLiteQueryException("statement not prepared");
168 
169  int ret = sqlite3_step(_st);
170 
171  // check if the return value signals an error
172  switch (ret)
173  {
174  case SQLITE_CORRUPT:
175  throw SQLiteQueryException("Database is corrupt: " + std::string(sqlite3_errmsg(_database)));
176 
177  case SQLITE_INTERRUPT:
178  throw SQLiteQueryException("Database interrupt: " + std::string(sqlite3_errmsg(_database)));
179 
180  case SQLITE_SCHEMA:
181  throw SQLiteQueryException("Database schema error: " + std::string(sqlite3_errmsg(_database)));
182 
183  case SQLITE_ERROR:
184  throw SQLiteQueryException("Database error: " + std::string(sqlite3_errmsg(_database)));
185 
186  default:
187  return ret;
188  }
189  }
190 
192  {
193  if (_st != NULL)
194  throw SQLiteQueryException("already prepared");
195 
196  int err = sqlite3_prepare_v2(_database, _query.c_str(), static_cast<int>(_query.length()), &_st, 0);
197 
198  if ( err != SQLITE_OK )
199  throw SQLiteQueryException("failed to prepare statement: " + _query);
200  }
201 
203 
205  : _file(file), _database(NULL), _next_expiration(0), _listener(listener), _faulty(false)
206  {
207  }
208 
210  {
211  }
212 
213  int SQLiteDatabase::getVersion() throw (SQLiteDatabase::SQLiteQueryException)
214  {
215  // Check version of SQLiteDB
216  int version = 0;
217 
218  // prepare the statement
219  Statement st(_database, QUERY_SCHEMAVERSION);
220 
221  // execute statement for version query
222  int err = st.step();
223 
224  // Query finished no table found
225  if (err == SQLITE_ROW)
226  {
227  std::string dbversion = (const char*) sqlite3_column_text(*st, 0);
228  std::stringstream ss(dbversion);
229  ss >> version;
230  }
231 
232  return version;
233  }
234 
235  void SQLiteDatabase::setVersion(int version) throw (SQLiteDatabase::SQLiteQueryException)
236  {
237  std::stringstream ss; ss << version;
238  Statement st(_database, SET_SCHEMAVERSION);
239 
240  // bind version text to the statement
241  sqlite3_bind_text(*st, 1, ss.str().c_str(), static_cast<int>(ss.str().length()), SQLITE_TRANSIENT);
242 
243  int err = st.step();
244  if(err != SQLITE_DONE)
245  {
246  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "failed to set version " << err << IBRCOMMON_LOGGER_ENDL;
247  }
248  }
249 
250  void SQLiteDatabase::doUpgrade(int oldVersion, int newVersion) throw (ibrcommon::Exception)
251  {
252  if (oldVersion > newVersion)
253  {
254  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "Downgrade from version " << oldVersion << " to version " << newVersion << " is not possible." << IBRCOMMON_LOGGER_ENDL;
255  throw ibrcommon::Exception("Downgrade not possible.");
256  }
257 
258  if ((oldVersion != 0) && (oldVersion < DBSCHEMA_FRESH_VERSION))
259  {
260  throw ibrcommon::Exception("Re-creation required.");
261  }
262 
263  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, info) << "Upgrade from version " << oldVersion << " to version " << newVersion << IBRCOMMON_LOGGER_ENDL;
264 
265  for (int j = oldVersion; j < newVersion; ++j)
266  {
267  switch (j)
268  {
269  // if there is no version field, drop all tables
270  case 0:
271  for (size_t i = 0; i < SQL_TABLE_END; ++i)
272  {
273  Statement st(_database, "DROP TABLE IF EXISTS " + _tables[i] + ";");
274  int err = st.step();
275  if(err != SQLITE_DONE)
276  {
277  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "drop table " << _tables[i] << " failed " << err << IBRCOMMON_LOGGER_ENDL;
278  }
279  }
280 
281  // create all tables
282  for (size_t i = 0; i < (DB_STRUCTURE_END - 1); ++i)
283  {
284  Statement st(_database, _db_structure[i]);
285  int err = st.step();
286  if(err != SQLITE_DONE)
287  {
288  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "failed to create table " << _tables[i] << "; err: " << err << IBRCOMMON_LOGGER_ENDL;
289  }
290  }
291 
292  // set new database version
293  setVersion(DBSCHEMA_FRESH_VERSION);
294  j = DBSCHEMA_FRESH_VERSION;
295  break;
296 
297  default:
298  // NO UPGRADE PATH HERE
299  if (DBSCHEMA_FRESH_VERSION > j)
300  throw ibrcommon::Exception("Re-creation required.");
301  }
302  }
303  }
304 
305  void SQLiteDatabase::open() throw (SQLiteDatabase::SQLiteQueryException)
306  {
307  //Configure SQLite Library
309 
310  // check if SQLite is thread-safe
311  if (sqlite3_threadsafe() == 0)
312  {
313  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", critical) << "sqlite library has not compiled with threading support." << IBRCOMMON_LOGGER_ENDL;
314  throw ibrcommon::Exception("need threading support in sqlite!");
315  }
316 
317  //open the database
318  if (sqlite3_open_v2(_file.getPath().c_str(), &_database, SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL))
319  {
320  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "Can't open database: " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
321  sqlite3_close(_database);
322  throw ibrcommon::Exception("Unable to open sqlite database");
323  }
324  try {
325  // check database version and upgrade if necessary
326  int version = getVersion();
327 
328  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, info) << "Database version " << version << " found." << IBRCOMMON_LOGGER_ENDL;
329 
330  if (version != DBSCHEMA_VERSION)
331  {
332  doUpgrade(version, DBSCHEMA_VERSION);
333  }
334  } catch (const ibrcommon::Exception &ex) {
335  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, warning) << "upgrade failed, start-over with a fresh database" << IBRCOMMON_LOGGER_ENDL;
336  doUpgrade(0, DBSCHEMA_VERSION);
337  }
338 
339  // disable synchronous mode
340  sqlite3_exec(_database, "PRAGMA synchronous = OFF;", NULL, NULL, NULL);
341 
342  // enable sqlite tracing if debug level is higher than 50
343  if (IBRCOMMON_LOGGER_LEVEL >= 50)
344  {
345  sqlite3_trace(_database, &sql_tracer, NULL);
346  }
347 
348  // calculate next Bundleexpiredtime
349  update_expire_time();
350  }
351 
353  {
354  //close Databaseconnection
355  if (sqlite3_close(_database) != SQLITE_OK)
356  {
357  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", error) << "unable to close database" << IBRCOMMON_LOGGER_ENDL;
358  }
359 
360  // shutdown sqlite library
362  }
363 
365  {
366  // lock the prepared statement
367  Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
368 
369  // bind bundle id to the statement
370  set_bundleid(st, id);
371 
372  // execute the query and check for error
373  if ((st.step() != SQLITE_ROW) || _faulty)
374  {
375  stringstream error;
376  error << "No Bundle found with BundleID: " << id.toString();
377  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteDatabase::TAG, 15) << error.str() << IBRCOMMON_LOGGER_ENDL;
378  throw SQLiteQueryException(error.str());
379  }
380 
381  // query bundle data
382  get(st, meta);
383  }
384 
385  void SQLiteDatabase::get(Statement &st, dtn::data::MetaBundle &bundle, int offset) const throw (SQLiteDatabase::SQLiteQueryException)
386  {
387  try {
388  bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset) );
389  bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 1) );
390  bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 2) );
391  bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 3) );
392  } catch (const dtn::InvalidDataException&) {
393  IBRCOMMON_LOGGER_TAG(TAG, warning) << "unable to read EIDs from database" << IBRCOMMON_LOGGER_ENDL;
394  throw SQLiteDatabase::SQLiteQueryException("unable to read EIDs from database");
395  }
396 
397  bundle.procflags = sqlite3_column_int(*st, offset + 4);
398  bundle.timestamp = sqlite3_column_int64(*st, offset + 5);
399  bundle.sequencenumber = sqlite3_column_int64(*st, offset + 6);
400  bundle.lifetime = sqlite3_column_int64(*st, offset + 7);
401  bundle.expiretime = sqlite3_column_int64(*st, offset + 8);
402 
403  if (bundle.procflags & data::Bundle::FRAGMENT)
404  {
405  bundle.setFragment(true);
406  bundle.fragmentoffset = sqlite3_column_int64(*st, offset + 9);
407  bundle.appdatalength = sqlite3_column_int64(*st, offset + 10);
408  }
409  else
410  {
411  bundle.setFragment(false);
412  bundle.fragmentoffset = 0;
413  bundle.appdatalength = 0;
414  }
415 
416  if (sqlite3_column_type(*st, offset + 11) != SQLITE_NULL)
417  {
418  bundle.hopcount = sqlite3_column_int64(*st, 11);
419  }
420  else
421  {
422  bundle.hopcount = dtn::data::Number::max();
423  }
424 
425  // restore net priority
426  bundle.net_priority = sqlite3_column_int(*st, 12);
427 
428  // set payload length
429  bundle.setPayloadLength(sqlite3_column_int64(*st, offset + 13));
430  }
431 
432  void SQLiteDatabase::get(Statement &st, dtn::data::Bundle &bundle, const int offset) const throw (SQLiteDatabase::SQLiteQueryException)
433  {
434  try {
435  bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 0) );
436  bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 1) );
437  bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 2) );
438  bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 3) );
439  } catch (const dtn::InvalidDataException&) {
440  IBRCOMMON_LOGGER_TAG(TAG, warning) << "unable to read EIDs from database" << IBRCOMMON_LOGGER_ENDL;
441  throw SQLiteDatabase::SQLiteQueryException("unable to read EIDs from database");
442  }
443 
444  bundle.procflags = sqlite3_column_int(*st, offset + 4);
445  bundle.timestamp = sqlite3_column_int64(*st, offset + 5);
446  bundle.sequencenumber = sqlite3_column_int64(*st, offset + 6);
447  bundle.lifetime = sqlite3_column_int64(*st, offset + 7);
448  // offset = 8 -> expiretime
449 
450  if (bundle.procflags & data::Bundle::FRAGMENT)
451  {
452  bundle.fragmentoffset = sqlite3_column_int64(*st, offset + 9);
453  bundle.appdatalength = sqlite3_column_int64(*st, offset + 10);
454  }
455  }
456 
457  void SQLiteDatabase::iterateAll() throw (SQLiteDatabase::SQLiteQueryException)
458  {
459  Statement st(_database, _sql_queries[BUNDLE_GET_ITERATOR]);
460  // abort if enough bundles are found
461  while (st.step() == SQLITE_ROW)
462  {
464 
465  // extract the primary values and set them in the bundle object
466  get(st, m, 0);
467 
468  // call iteration callback
469  _listener.iterateDatabase(m, sqlite3_column_int(*st, 14));
470  }
471 
472  st.reset();
473  }
474 
476  {
477  size_t items_added = 0;
478 
479  const std::string base_query =
480  "SELECT " + _select_names[0] + " FROM " + _tables[SQL_TABLE_BUNDLE];
481 
482  size_t offset = 0;
483  const bool unlimited = (cb.limit() <= 0);
484  const size_t query_limit = 50;
485 
486  try {
487  try {
488  const SQLBundleQuery &query = dynamic_cast<const SQLBundleQuery&>(cb);
489 
490  // custom query string
491  const std::string query_string = base_query + " WHERE " + query.getWhere() + " ORDER BY priority DESC, timestamp, sequencenumber, fragmentoffset, fragmentlength LIMIT ?,?;";
492 
493  // create statement for custom query
494  Statement st(_database, query_string);
495 
496  while (unlimited || (items_added < query_limit))
497  {
498  // bind the statement parameter
499  int bind_offset = query.bind(*st, 1);
500 
501  // query the database
502  __get(cb, st, ret, items_added, bind_offset, offset, query_limit);
503 
504  // increment the offset, because we might not have enough
505  offset += query_limit;
506  }
507  } catch (const std::bad_cast&) {
508  Statement st(_database, _sql_queries[BUNDLE_GET_FILTER]);
509 
510  while (unlimited || (items_added < query_limit))
511  {
512  // query the database
513  __get(cb, st, ret, items_added, 1, offset, query_limit);
514 
515  // increment the offset, because we might not have enough
516  offset += query_limit;
517  }
518  }
519  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
520  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
521  } catch (const dtn::storage::NoBundleFoundException&) { }
522 
523  if (items_added == 0) throw dtn::storage::NoBundleFoundException();
524  }
525 
526  void SQLiteDatabase::__get(const BundleSelector &cb, Statement &st, BundleResult &ret, size_t &items_added, const int bind_offset, const size_t offset, const size_t query_limit) const throw (SQLiteDatabase::SQLiteQueryException, NoBundleFoundException, BundleSelectorException)
527  {
528  const bool unlimited = (cb.limit() <= 0);
529 
530  // limit result according to callback result
531  sqlite3_bind_int64(*st, bind_offset, offset);
532  sqlite3_bind_int64(*st, bind_offset + 1, query_limit);
533 
534  // returned no result
535  if ((st.step() == SQLITE_DONE) || _faulty)
537 
538  // abort if enough bundles are found
539  while (unlimited || (items_added < query_limit))
540  {
542 
543  // extract the primary values and set them in the bundle object
544  get(st, m, 0);
545 
546  // check if the bundle is already expired
547  if ( !dtn::utils::Clock::isExpired( m ) )
548  {
549  // ask the filter if this bundle should be added to the return list
550  if (cb.shouldAdd(m))
551  {
552  IBRCOMMON_LOGGER_DEBUG_TAG("SQLiteDatabase", 40) << "add bundle to query selection list: " << m.toString() << IBRCOMMON_LOGGER_ENDL;
553 
554  // add the bundle to the list
555  ret.put(m);
556  items_added++;
557  }
558  }
559 
560  if (st.step() != SQLITE_ROW) break;
561  }
562 
563  st.reset();
564  }
565 
567  {
568  int err = 0;
569 
570  IBRCOMMON_LOGGER_DEBUG_TAG("SQLiteDatabase", 25) << "get bundle from sqlite storage " << id.toString() << IBRCOMMON_LOGGER_ENDL;
571 
572  // do this while db is locked
573  Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
574 
575  // set the bundle key values
576  set_bundleid(st, id);
577 
578  // execute the query and check for error
579  if (((err = st.step()) != SQLITE_ROW) || _faulty)
580  {
581  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteDatabase::TAG, 15) << "sql error: " << err << "; No bundle found with id: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
583  }
584 
585  // read bundle data
586  get(st, bundle);
587 
588  try {
589 
590  int err = 0;
591  string file;
592 
593  Statement st(_database, _sql_queries[BLOCK_GET_ID]);
594 
595  // set the bundle key values
596  set_bundleid(st, id);
597 
598  // query the database and step through all blocks
599  while ((err = st.step()) == SQLITE_ROW)
600  {
601  const ibrcommon::File f( (const char*) sqlite3_column_text(*st, 0) );
602  int blocktyp = sqlite3_column_int(*st, 1);
603 
604  blocks.push_back( blocklist_entry(blocktyp, f) );
605  }
606 
607  if (err == SQLITE_DONE)
608  {
609  if (blocks.size() == 0)
610  {
611  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", error) << "get_blocks: no blocks found for: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
612  throw SQLiteQueryException("no blocks found");
613  }
614  }
615  else
616  {
617  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", error) << "get_blocks() failure: "<< err << " " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
618  throw SQLiteQueryException("can not query for blocks");
619  }
620 
621  } catch (const ibrcommon::Exception &ex) {
622  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", error) << "could not get bundle blocks: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
624  }
625  }
626 
628  {
629  int err;
630 
631  Statement st(_database, _sql_queries[BUNDLE_STORE]);
632 
633  set_bundleid(st, bundle);
634 
635  sqlite3_bind_text(*st, 6, bundle.destination.getString().c_str(), static_cast<int>(bundle.destination.getString().length()), SQLITE_TRANSIENT);
636  sqlite3_bind_text(*st, 7, bundle.reportto.getString().c_str(), static_cast<int>(bundle.reportto.getString().length()), SQLITE_TRANSIENT);
637  sqlite3_bind_text(*st, 8, bundle.custodian.getString().c_str(), static_cast<int>(bundle.custodian.getString().length()), SQLITE_TRANSIENT);
638  sqlite3_bind_int(*st, 9, bundle.procflags.get<uint32_t>());
639  sqlite3_bind_int64(*st, 10, bundle.lifetime.get<uint64_t>());
640 
641  if (bundle.get(dtn::data::Bundle::FRAGMENT))
642  {
643  sqlite3_bind_int64(*st, 11, bundle.appdatalength.get<uint64_t>());
644  }
645  else
646  {
647  sqlite3_bind_int64(*st, 11, -1);
648  }
649 
651  sqlite3_bind_int64(*st, 12, expire_time.get<uint64_t>());
652 
653  sqlite3_bind_int64(*st, 13, bundle.getPriority());
654 
655  try {
657  sqlite3_bind_int64(*st, 14, schl.getHopsToLive().get<uint64_t>() );
659  sqlite3_bind_null(*st, 14 );
660  }
661 
662  try {
663  const dtn::data::SchedulingBlock &sched = bundle.find<dtn::data::SchedulingBlock>();
664  sqlite3_bind_int(*st, 15, sched.getPriority().get<int>() );
666  sqlite3_bind_int64(*st, 15, 0 );
667  }
668 
669  // set payload length
670  sqlite3_bind_int64(*st, 16, bundle.getPayloadLength());
671 
672  // set bundle size
673  sqlite3_bind_int64(*st, 17, size);
674 
675  err = st.step();
676 
677  if (err == SQLITE_CONSTRAINT)
678  {
679  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, warning) << "Bundle is already in the storage" << IBRCOMMON_LOGGER_ENDL;
680 
681  stringstream error;
682  error << "store() failure: " << err << " " << sqlite3_errmsg(_database);
683  throw SQLiteQueryException(error.str());
684  }
685  else if ((err != SQLITE_DONE) || _faulty)
686  {
687  stringstream error;
688  error << "store() failure: " << err << " " << sqlite3_errmsg(_database);
689  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << error.str() << IBRCOMMON_LOGGER_ENDL;
690 
691  throw SQLiteQueryException(error.str());
692  }
693 
694  // set new expire time
695  new_expire_time(expire_time);
696  }
697 
699  {
700  int blocktyp = (int)block.getType();
701 
702  // protect this query from concurrent access and enable the auto-reset feature
703  Statement st(_database, _sql_queries[BLOCK_STORE]);
704 
705  // set bundle key data
706  set_bundleid(st, id);
707 
708  // set the block type
709  sqlite3_bind_int(*st, 6, blocktyp);
710 
711  // the filename of the block data
712  sqlite3_bind_text(*st, 7, file.getPath().c_str(), static_cast<int>(file.getPath().size()), SQLITE_TRANSIENT);
713 
714  // the ordering number
715  sqlite3_bind_int(*st, 8, index);
716 
717  // execute the query and store the block in the database
718  if (st.step() != SQLITE_DONE)
719  {
720  throw SQLiteQueryException("can not store block of bundle");
721  }
722  }
723 
724  void SQLiteDatabase::transaction() throw (SQLiteDatabase::SQLiteQueryException)
725  {
726  char *zErrMsg = 0;
727 
728  // start a transaction
729  int ret = sqlite3_exec(_database, "BEGIN TRANSACTION;", NULL, NULL, &zErrMsg);
730 
731  // check if the return value signals an error
732  if ( ret != SQLITE_OK )
733  {
734  sqlite3_free( zErrMsg );
735  throw SQLiteQueryException( zErrMsg );
736  }
737  }
738 
739  void SQLiteDatabase::rollback() throw (SQLiteDatabase::SQLiteQueryException)
740  {
741  char *zErrMsg = 0;
742 
743  // rollback the whole transaction
744  int ret = sqlite3_exec(_database, "ROLLBACK TRANSACTION;", NULL, NULL, &zErrMsg);
745 
746  // check if the return value signals an error
747  if ( ret != SQLITE_OK )
748  {
749  sqlite3_free( zErrMsg );
750  throw SQLiteQueryException( zErrMsg );
751  }
752  }
753 
754  void SQLiteDatabase::commit() throw (SQLiteDatabase::SQLiteQueryException)
755  {
756  char *zErrMsg = 0;
757 
758  // commit the transaction
759  int ret = sqlite3_exec(_database, "END TRANSACTION;", NULL, NULL, &zErrMsg);
760 
761  // check if the return value signals an error
762  if ( ret != SQLITE_OK )
763  {
764  sqlite3_free( zErrMsg );
765  throw SQLiteQueryException( zErrMsg );
766  }
767  }
768 
770  {
771  // return value (size of the bundle in bytes)
772  dtn::data::Length ret = 0;
773 
774  {
775  // lock the database
776  Statement st(_database, _sql_queries[BUNDLE_GET_LENGTH_ID]);
777 
778  // bind bundle id to the statement
779  set_bundleid(st, id);
780 
781  // execute the query and check for error
782  if (st.step() != SQLITE_ROW)
783  {
784  // no bundle found - stop here
785  return ret;
786  }
787  else
788  {
789  ret = sqlite3_column_int(*st, 0);
790  }
791  }
792 
793  {
794  // lock the database
795  Statement st(_database, _sql_queries[BLOCK_GET_ID]);
796 
797  // set the bundle key values
798  set_bundleid(st, id);
799 
800  // step through all blocks
801  while (st.step() == SQLITE_ROW)
802  {
803  // delete each referenced block file
804  ibrcommon::File blockfile( (const char*)sqlite3_column_text(*st, 0) );
805  blockfile.remove();
806  }
807  }
808 
809  {
810  // lock the database
811  Statement st(_database, _sql_queries[BUNDLE_DELETE]);
812 
813  // then remove the bundle data
814  set_bundleid(st, id);
815  st.step();
816 
817  IBRCOMMON_LOGGER_DEBUG_TAG("SQLiteDatabase", 10) << "bundle " << id.toString() << " deleted" << IBRCOMMON_LOGGER_ENDL;
818  }
819 
820  //update deprecated timer
821  update_expire_time();
822 
823  // return the size of the removed bundle
824  return ret;
825  }
826 
827  void SQLiteDatabase::clear() throw (SQLiteDatabase::SQLiteQueryException)
828  {
829  Statement vacuum(_database, _sql_queries[VACUUM]);
830  Statement bundle_clear(_database, _sql_queries[BUNDLE_CLEAR]);
831  Statement block_clear(_database, _sql_queries[BLOCK_CLEAR]);
832 
833  bundle_clear.step();
834  block_clear.step();
835 
836  if (SQLITE_DONE != vacuum.step())
837  {
838  throw SQLiteQueryException("SQLiteBundleStore: clear(): vacuum failed.");
839  }
840 
841  reset_expire_time();
842  }
843 
845  {
 // NOTE(review): the signature line (listing line 844) is missing from this
 // extract. The body uses a bundle id `id` and returns a boolean -- confirm
 // the exact declaration against the header.
846  // lock the prepared statement
847  Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
848 
849  // bind bundle id to the statement
850  set_bundleid(st, id);
851 
852  // true only if the query yields a row AND the _faulty flag is not set;
 // the statement is always stepped, even when _faulty is set
853  return !((st.step() != SQLITE_ROW) || _faulty);
854  }
855 
856  bool SQLiteDatabase::empty() const throw (SQLiteDatabase::SQLiteQueryException)
857  {
858  Statement st(_database, _sql_queries[EMPTY_CHECK]);
859 
860  if (SQLITE_DONE == st.step())
861  {
862  return true;
863  }
864  else
865  {
866  return false;
867  }
868  }
869 
870  size_t SQLiteDatabase::count() const throw (SQLiteDatabase::SQLiteQueryException)
871  {
872  size_t rows = 0;
873  int err = 0;
874 
875  Statement st(_database, _sql_queries[COUNT_ENTRIES]);
876 
877  if ((err = st.step()) == SQLITE_ROW)
878  {
879  rows = sqlite3_column_int(*st, 0);
880  }
881  else
882  {
883  stringstream error;
884  error << "count: failure " << err << " " << sqlite3_errmsg(_database);
885  throw SQLiteQueryException(error.str());
886  }
887 
888  return rows;
889  }
890 
891  const std::set<dtn::data::EID> SQLiteDatabase::getDistinctDestinations() throw (SQLiteDatabase::SQLiteQueryException)
892  {
893  std::set<dtn::data::EID> ret;
894 
895  Statement st(_database, _sql_queries[GET_DISTINCT_DESTINATIONS]);
896 
897  // step through all blocks
898  while (st.step() == SQLITE_ROW)
899  {
900  // delete each referenced block file
901  const std::string destination( (const char*)sqlite3_column_text(*st, 0) );
902  ret.insert(destination);
903  }
904 
905  return ret;
906  }
907 
908  void SQLiteDatabase::update_expire_time() throw (SQLiteDatabase::SQLiteQueryException)
909  {
910  Statement st(_database, _sql_queries[EXPIRE_NEXT_TIMESTAMP]);
911 
912  int err = st.step();
913 
914  if (err == SQLITE_ROW)
915  {
916  _next_expiration = sqlite3_column_int64(*st, 0);
917  }
918  else
919  {
920  _next_expiration = 0;
921  }
922  }
923 
/**
 * Removes bundles that are expired at the given timestamp.
 *
 * Returns immediately when the cached next-expiration time is 0 or lies
 * after `timestamp`. Otherwise four steps run in sequence, each in its own
 * try block so that a SQLiteQueryException is logged and the next step still
 * runs:
 *  1. remove the files named by the EXPIRE_BUNDLE_FILENAMES result rows
 *  2. report every row of EXPIRE_BUNDLES to _listener.eventBundleExpired()
 *  3. execute EXPIRE_BUNDLE_DELETE
 *  4. refresh the cached next-expiration time
 *
 * @param timestamp bound as parameter 1 of all three EXPIRE_* statements
 */
924  void SQLiteDatabase::expire(const dtn::data::Timestamp &timestamp) throw ()
925  {
926  /*
927  * deleteexpired is only called if the current time is greater than or equal to the time at which the next bundle expires.
928  */
929  dtn::data::Timestamp exp_time = get_expire_time();
930  if ((timestamp < exp_time) || (exp_time == 0)) return;
931 
932  /*
933  * Performance improvement: so that the queries do not have to be executed every second, the point in time at which
934  * the next bundle is due for deletion is stored in a variable, and deleteexpired is only run once a bundle has expired.
935  * After the deletion the database is searched and the next expiration time is stored in the variable.
936  */
937 
938  try {
939  Statement st(_database, _sql_queries[EXPIRE_BUNDLE_FILENAMES]);
940 
941  // query for blocks of expired bundles and remove the file named in column 0 of each row
942  sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
943  while (st.step() == SQLITE_ROW)
944  {
945  ibrcommon::File block((const char*)sqlite3_column_text(*st,0));
946  block.remove();
947  }
948  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
949  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
950  }
951 
952  try {
953  Statement st(_database, _sql_queries[EXPIRE_BUNDLES]);
954 
 // NOTE(review): listing line 955 is missing from this extract; the declaration
 // of `id`, which is filled in below, is therefore not visible here.
956 
957  // query expired bundles
958  sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
959  while (st.step() == SQLITE_ROW)
960  {
 // columns 0..2: source, timestamp and sequence number of the bundle
961  id.source = dtn::data::EID((const char*)sqlite3_column_text(*st, 0));
962  id.timestamp = sqlite3_column_int64(*st, 1);
963  id.sequencenumber = sqlite3_column_int64(*st, 2);
964 
 // a non-negative value in column 3 marks the row as a fragment
965  id.setFragment(sqlite3_column_int64(*st, 3) >= 0);
966 
967  if (id.isFragment()) {
968  id.fragmentoffset = sqlite3_column_int64(*st, 3);
969  } else {
970  id.fragmentoffset = 0;
971  }
972 
973  id.setPayloadLength(sqlite3_column_int64(*st, 4));
974 
 // NOTE(review): listing line 975 is missing from this extract.
976 
977  // raise bundle removed event; column 5 is passed as the second argument
978  _listener.eventBundleExpired(id, sqlite3_column_int(*st, 5));
979  }
980  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
981  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
982  }
983 
984  try {
985  Statement st(_database, _sql_queries[EXPIRE_BUNDLE_DELETE]);
986 
987  // delete all expired db entries (bundles and blocks); the result of step() is not checked
988  sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
989  st.step();
990  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
991  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
992  }
993 
994  try {
995  // re-query the database for the next expiration timestamp
996  update_expire_time();
997  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
998  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
999  }
1000  }
1001 
1002  void SQLiteDatabase::vacuum() throw (SQLiteDatabase::SQLiteQueryException)
1003  {
1004  Statement st(_database, _sql_queries[VACUUM]);
1005  st.step();
1006  }
1007 
1009  {
 // NOTE(review): the signature line (listing line 1008) is missing from this
 // extract. The body reads `mode`, a bundle id `id` and an EID `eid` --
 // confirm the exact declaration against the header.
 //
 // Only UPDATE_CUSTODIAN is handled; any other mode falls through and does nothing.
1010  if (mode == UPDATE_CUSTODIAN)
1011  {
1012  // statement that writes the new custodian for one bundle
1013  Statement st(_database, _sql_queries[BUNDLE_UPDATE_CUSTODIAN]);
1014 
 // parameter 1 is the new custodian; the bundle id is bound with offset 1,
 // i.e. it occupies the parameters that follow (starting at index 2)
1015  sqlite3_bind_text(*st, 1, eid.getString().c_str(), static_cast<int>(eid.getString().length()), SQLITE_TRANSIENT);
1016  set_bundleid(st, id, 1);
1017 
1018  // update the custodian in the database
1019  int err = st.step();
1020 
 // any result other than SQLITE_DONE is reported as an exception
1021  if (err != SQLITE_DONE)
1022  {
1023  stringstream error;
1024  error << "update_custodian() failure: " << err << " " << sqlite3_errmsg(_database);
1025  throw SQLiteDatabase::SQLiteQueryException(error.str());
1026  }
1027  }
1028  }
1029 
1030  void SQLiteDatabase::new_expire_time(const dtn::data::Timestamp &ttl) throw ()
1031  {
1032  if (_next_expiration == 0 || ttl < _next_expiration)
1033  {
1034  _next_expiration = ttl;
1035  }
1036  }
1037 
1038  void SQLiteDatabase::reset_expire_time() throw ()
1039  {
1040  _next_expiration = 0;
1041  }
1042 
1043  const dtn::data::Timestamp& SQLiteDatabase::get_expire_time() const throw ()
1044  {
1045  return _next_expiration;
1046  }
1047 
1048  void SQLiteDatabase::set_bundleid(Statement &st, const dtn::data::BundleID &id, int offset) const throw (SQLiteDatabase::SQLiteQueryException)
1049  {
1050  sqlite3_bind_text(*st, offset + 1, id.source.getString().c_str(), static_cast<int>(id.source.getString().length()), SQLITE_TRANSIENT);
1051  sqlite3_bind_int64(*st, offset + 2, id.timestamp.get<uint64_t>());
1052  sqlite3_bind_int64(*st, offset + 3, id.sequencenumber.get<uint64_t>());
1053 
1054  if (id.isFragment())
1055  {
1056  sqlite3_bind_int64(*st, offset + 4, id.fragmentoffset.get<uint64_t>());
1057  sqlite3_bind_int64(*st, offset + 5, id.getPayloadLength());
1058  }
1059  else
1060  {
1061  sqlite3_bind_int64(*st, offset + 4, -1);
1062  sqlite3_bind_int64(*st, offset + 5, -1);
1063  }
1064  }
1065 
1067  {
 // NOTE(review): the signature line (listing line 1066) is missing from this
 // extract; the body only shows that a value `mode` is stored in _faulty.
 // While _faulty is set, the BUNDLE_GET_ID lookup earlier in this file
 // returns false even if a matching row exists.
1068  _faulty = mode;
1069  }
1070  } /* namespace storage */
1071 } /* namespace dtn */