IBR-DTNSuite  0.10
SQLiteDatabase.cpp
Go to the documentation of this file.
1 /*
2  * SQLiteDatabase.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "storage/SQLiteDatabase.h"
26 #include <ibrdtn/utils/Clock.h>
27 #include <ibrcommon/Logger.h>
28 #include <stdint.h>
29 
30 namespace dtn
31 {
32  namespace storage
33  {
34  void sql_tracer(void*, const char * pQuery)
35  {
36  IBRCOMMON_LOGGER_DEBUG_TAG("SQLiteDatabase", 50) << "sqlite trace: " << pQuery << IBRCOMMON_LOGGER_ENDL;
37  }
38 
39  const std::string SQLiteDatabase::TAG = "SQLiteDatabase";
40 
41  const std::string SQLiteDatabase::_select_names[2] = {
42  "source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength, hopcount, netpriority",
43  "source, timestamp, sequencenumber, fragmentoffset, procflags"
44  };
45 
46  const std::string SQLiteDatabase::_tables[SQL_TABLE_END] =
47  { "bundles", "blocks", "routing", "routing_bundles", "routing_nodes", "properties" };
48 
49  // this is the version of a fresh created db scheme
50  const int SQLiteDatabase::DBSCHEMA_FRESH_VERSION = 3;
51 
52  const int SQLiteDatabase::DBSCHEMA_VERSION = 3;
53 
54  const std::string SQLiteDatabase::QUERY_SCHEMAVERSION = "SELECT `value` FROM " + SQLiteDatabase::_tables[SQLiteDatabase::SQL_TABLE_PROPERTIES] + " WHERE `key` = 'version' LIMIT 0,1;";
55  const std::string SQLiteDatabase::SET_SCHEMAVERSION = "INSERT INTO " + SQLiteDatabase::_tables[SQLiteDatabase::SQL_TABLE_PROPERTIES] + " (`key`, `value`) VALUES ('version', ?);";
56 
57  const std::string SQLiteDatabase::_sql_queries[SQL_QUERIES_END] =
58  {
59  "SELECT " + _select_names[0] + " FROM " + _tables[SQL_TABLE_BUNDLE],
60  "SELECT " + _select_names[0] + " FROM " + _tables[SQL_TABLE_BUNDLE] + " ORDER BY priority DESC, timestamp, sequencenumber, fragmentoffset LIMIT ?,?;",
61  "SELECT " + _select_names[0] + " FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? LIMIT 1;",
62  "SELECT DISTINCT destination FROM " + _tables[SQL_TABLE_BUNDLE],
63 
64  "SELECT " + _select_names[1] + " FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
65  "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +" as a, "+ _tables[SQL_TABLE_BLOCK] +" as b WHERE a.source_id = b.source_id AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND a.fragmentoffset = b.fragmentoffset AND a.expiretime <= ?;",
66  "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
67  "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY expiretime ASC LIMIT 1;",
68 
69  "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +" LIMIT 1;",
70  "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
71 
72  "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
73  "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
74  "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +" (source_id, timestamp, sequencenumber, fragmentoffset, source, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority, hopcount, netpriority) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);",
75  "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
76 
77  "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET procflags = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
78 
79  "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? ORDER BY ordernumber ASC;",
80  "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND ordernumber = ?;",
81  "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +";",
82  "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +" (source_id, timestamp, sequencenumber, fragmentoffset, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?);",
83 
84  "VACUUM;",
85  };
86 
87  const std::string SQLiteDatabase::_db_structure[11] =
88  {
89  "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL DEFAULT 0, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);",
90  "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL DEFAULT 0, `appdatalength` INTEGER NOT NULL DEFAULT 0, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL, `hopcount` INTEGER DEFAULT NULL, `netpriority` INTEGER NOT NULL DEFAULT 0);",
91  "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);",
92  "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);",
93  "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);",
94  "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] + " FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] + " WHERE " + _tables[SQL_TABLE_BLOCK] + ".source_id = OLD.source_id AND " + _tables[SQL_TABLE_BLOCK] + ".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] + ".sequencenumber = OLD.sequencenumber AND " + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset = old.fragmentoffset; END;",
95  "CREATE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] + " (source_id, timestamp, sequencenumber, fragmentoffset);",
96  "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] + " (destination);",
97  "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] + " (destination, priority);",
98  "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset);"
99  "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset, expiretime);",
100  "CREATE TABLE IF NOT EXISTS '" + _tables[SQL_TABLE_PROPERTIES] + "' ( `key` TEXT PRIMARY KEY ASC ON CONFLICT REPLACE, `value` TEXT NOT NULL);"
101  };
102 
104  { }
105 
107  {
108  }
109 
110  SQLiteDatabase::Statement::Statement(sqlite3 *database, const std::string &query)
111  : _database(database), _st(NULL), _query(query)
112  {
113  prepare();
114  }
115 
117  {
118  if (_st != NULL) {
119  sqlite3_finalize(_st);
120  }
121  }
122 
124  {
125  return _st;
126  }
127 
129  {
130  if (_st != NULL) {
131  sqlite3_reset(_st);
132  sqlite3_clear_bindings(_st);
133  }
134  }
135 
137  {
138  if (_st == NULL)
139  throw SQLiteQueryException("statement not prepared");
140 
141  int ret = sqlite3_step(_st);
142 
143  // check if the return value signals an error
144  switch (ret)
145  {
146  case SQLITE_CORRUPT:
147  throw SQLiteQueryException("Database is corrupt: " + std::string(sqlite3_errmsg(_database)));
148 
149  case SQLITE_INTERRUPT:
150  throw SQLiteQueryException("Database interrupt: " + std::string(sqlite3_errmsg(_database)));
151 
152  case SQLITE_SCHEMA:
153  throw SQLiteQueryException("Database schema error: " + std::string(sqlite3_errmsg(_database)));
154 
155  case SQLITE_ERROR:
156  throw SQLiteQueryException("Database error: " + std::string(sqlite3_errmsg(_database)));
157  default:
158  return ret;
159  }
160  }
161 
163  {
164  if (_st != NULL)
165  throw SQLiteQueryException("already prepared");
166 
167  int err = sqlite3_prepare_v2(_database, _query.c_str(), static_cast<int>(_query.length()), &_st, 0);
168 
169  if ( err != SQLITE_OK )
170  throw SQLiteQueryException("failed to prepare statement: " + _query);
171  }
172 
174 
176  : _file(file), _database(NULL), _next_expiration(0), _listener(listener), _faulty(false)
177  {
178  }
179 
181  {
182  }
183 
184  int SQLiteDatabase::getVersion() throw (SQLiteDatabase::SQLiteQueryException)
185  {
186  // Check version of SQLiteDB
187  int version = 0;
188 
189  // prepare the statement
190  Statement st(_database, QUERY_SCHEMAVERSION);
191 
192  // execute statement for version query
193  int err = st.step();
194 
195  // Query finished no table found
196  if (err == SQLITE_ROW)
197  {
198  std::string dbversion = (const char*) sqlite3_column_text(*st, 0);
199  std::stringstream ss(dbversion);
200  ss >> version;
201  }
202 
203  return version;
204  }
205 
206  void SQLiteDatabase::setVersion(int version) throw (SQLiteDatabase::SQLiteQueryException)
207  {
208  std::stringstream ss; ss << version;
209  Statement st(_database, SET_SCHEMAVERSION);
210 
211  // bind version text to the statement
212  sqlite3_bind_text(*st, 1, ss.str().c_str(), static_cast<int>(ss.str().length()), SQLITE_TRANSIENT);
213 
214  int err = st.step();
215  if(err != SQLITE_DONE)
216  {
217  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "failed to set version " << err << IBRCOMMON_LOGGER_ENDL;
218  }
219  }
220 
221  void SQLiteDatabase::doUpgrade(int oldVersion, int newVersion) throw (ibrcommon::Exception)
222  {
223  if (oldVersion > newVersion)
224  {
225  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "Downgrade from version " << oldVersion << " to version " << newVersion << " is not possible." << IBRCOMMON_LOGGER_ENDL;
226  throw ibrcommon::Exception("Downgrade not possible.");
227  }
228 
229  if ((oldVersion != 0) && (oldVersion < DBSCHEMA_FRESH_VERSION))
230  {
231  throw ibrcommon::Exception("Re-creation required.");
232  }
233 
234  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, info) << "Upgrade from version " << oldVersion << " to version " << newVersion << IBRCOMMON_LOGGER_ENDL;
235 
236  for (int j = oldVersion; j < newVersion; ++j)
237  {
238  switch (j)
239  {
240  // if there is no version field, drop all tables
241  case 0:
242  for (size_t i = 0; i < SQL_TABLE_END; ++i)
243  {
244  Statement st(_database, "DROP TABLE IF EXISTS " + _tables[i] + ";");
245  int err = st.step();
246  if(err != SQLITE_DONE)
247  {
248  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "drop table " << _tables[i] << " failed " << err << IBRCOMMON_LOGGER_ENDL;
249  }
250  }
251 
252  // create all tables
253  for (size_t i = 0; i < 11; ++i)
254  {
255  Statement st(_database, _db_structure[i]);
256  int err = st.step();
257  if(err != SQLITE_DONE)
258  {
259  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "failed to create table " << _tables[i] << "; err: " << err << IBRCOMMON_LOGGER_ENDL;
260  }
261  }
262 
263  // set new database version
264  setVersion(DBSCHEMA_FRESH_VERSION);
265  j = DBSCHEMA_FRESH_VERSION;
266  break;
267 
268  case 1:
269  // NO UPGRADE PATH HERE
270  throw ibrcommon::Exception("Re-creation required.");
271 
272  case 2:
273  // NO UPGRADE PATH HERE
274  throw ibrcommon::Exception("Re-creation required.");
275  }
276  }
277  }
278 
279  void SQLiteDatabase::open() throw (SQLiteDatabase::SQLiteQueryException)
280  {
281  //Configure SQLite Library
283 
284  // check if SQLite is thread-safe
285  if (sqlite3_threadsafe() == 0)
286  {
287  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", critical) << "sqlite library has not compiled with threading support." << IBRCOMMON_LOGGER_ENDL;
288  throw ibrcommon::Exception("need threading support in sqlite!");
289  }
290 
291  //open the database
292  if (sqlite3_open_v2(_file.getPath().c_str(), &_database, SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL))
293  {
294  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << "Can't open database: " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
295  sqlite3_close(_database);
296  throw ibrcommon::Exception("Unable to open sqlite database");
297  }
298 
299  try {
300  // check database version and upgrade if necessary
301  int version = getVersion();
302 
303  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, info) << "Database version " << version << " found." << IBRCOMMON_LOGGER_ENDL;
304 
305  if (version != DBSCHEMA_VERSION)
306  {
307  doUpgrade(version, DBSCHEMA_VERSION);
308  }
309  } catch (const ibrcommon::Exception &ex) {
310  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, warning) << "upgrade failed, start-over with a fresh database" << IBRCOMMON_LOGGER_ENDL;
311  doUpgrade(0, DBSCHEMA_VERSION);
312  }
313 
314  // disable synchronous mode
315  sqlite3_exec(_database, "PRAGMA synchronous = OFF;", NULL, NULL, NULL);
316 
317  // enable sqlite tracing if debug level is higher than 50
318  if (IBRCOMMON_LOGGER_LEVEL >= 50)
319  {
320  sqlite3_trace(_database, &sql_tracer, NULL);
321  }
322 
323  // calculate next Bundleexpiredtime
324  update_expire_time();
325  }
326 
328  {
329  //close Databaseconnection
330  if (sqlite3_close(_database) != SQLITE_OK)
331  {
332  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", error) << "unable to close database" << IBRCOMMON_LOGGER_ENDL;
333  }
334 
335  // shutdown sqlite library
337  }
338 
340  {
341  // lock the prepared statement
342  Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
343 
344  // bind bundle id to the statement
345  set_bundleid(st, id);
346 
347  // execute the query and check for error
348  if ((st.step() != SQLITE_ROW) || _faulty)
349  {
350  stringstream error;
351  error << "No Bundle found with BundleID: " << id.toString();
352  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteDatabase::TAG, 15) << error.str() << IBRCOMMON_LOGGER_ENDL;
353  throw SQLiteQueryException(error.str());
354  }
355 
356  // query bundle data
357  get(st, meta);
358  }
359 
360  void SQLiteDatabase::get(Statement &st, dtn::data::MetaBundle &bundle, int offset) const throw (SQLiteDatabase::SQLiteQueryException)
361  {
362  try {
363  bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 0) );
364  bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 1) );
365  bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 2) );
366  bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 3) );
367  } catch (const dtn::InvalidDataException&) {
368  IBRCOMMON_LOGGER_TAG(TAG, warning) << "unable to read EIDs from database" << IBRCOMMON_LOGGER_ENDL;
369  throw SQLiteDatabase::SQLiteQueryException("unable to read EIDs from database");
370  }
371 
372  bundle.procflags = sqlite3_column_int(*st, offset + 4);
373  bundle.timestamp = sqlite3_column_int64(*st, offset + 5);
374  bundle.sequencenumber = sqlite3_column_int64(*st, offset + 6);
375  bundle.lifetime = sqlite3_column_int64(*st, offset + 7);
376  bundle.expiretime = dtn::utils::Clock::getExpireTime(bundle.timestamp, bundle.lifetime);
377 
378  if (bundle.procflags & data::Bundle::FRAGMENT)
379  {
380  bundle.fragment = true;
381  bundle.offset = sqlite3_column_int64(*st, offset + 8);
382  bundle.appdatalength = sqlite3_column_int64(*st, offset + 9);
383  }
384  else
385  {
386  bundle.fragment = false;
387  bundle.offset = 0;
388  bundle.appdatalength = sqlite3_column_int64(*st, 9);
389  }
390 
391  bundle.payloadlength = sqlite3_column_int64(*st, 9);
392 
393  if (sqlite3_column_type(*st, offset + 10) != SQLITE_NULL)
394  {
395  bundle.hopcount = sqlite3_column_int64(*st, 10);
396  }
397 
398  // restore net priority
399  bundle.net_priority = sqlite3_column_int(*st, 11);
400  }
401 
402  void SQLiteDatabase::get(Statement &st, dtn::data::Bundle &bundle, const int offset) const throw (SQLiteDatabase::SQLiteQueryException)
403  {
404  try {
405  bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 0) );
406  bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 1) );
407  bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 2) );
408  bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(*st, offset + 3) );
409  } catch (const dtn::InvalidDataException&) {
410  IBRCOMMON_LOGGER_TAG(TAG, warning) << "unable to read EIDs from database" << IBRCOMMON_LOGGER_ENDL;
411  throw SQLiteDatabase::SQLiteQueryException("unable to read EIDs from database");
412  }
413 
414  bundle.procflags = sqlite3_column_int(*st, offset + 4);
415  bundle.timestamp = sqlite3_column_int64(*st, offset + 5);
416  bundle.sequencenumber = sqlite3_column_int64(*st, offset + 6);
417  bundle.lifetime = sqlite3_column_int64(*st, offset + 7);
418 
419  if (bundle.procflags & data::Bundle::FRAGMENT)
420  {
421  bundle.fragmentoffset = sqlite3_column_int64(*st, offset + 8);
422  bundle.appdatalength = sqlite3_column_int64(*st, offset + 9);
423  }
424  }
425 
426  void SQLiteDatabase::iterateAll() throw (SQLiteDatabase::SQLiteQueryException)
427  {
428  Statement st(_database, _sql_queries[BUNDLE_GET_ITERATOR]);
429 
430  // abort if enough bundles are found
431  while (st.step() == SQLITE_ROW)
432  {
434 
435  // extract the primary values and set them in the bundle object
436  get(st, m, 0);
437 
438  // call iteration callback
439  _listener.iterateDatabase(m);
440  }
441 
442  st.reset();
443  }
444 
446  {
447  size_t items_added = 0;
448 
449  const std::string base_query =
450  "SELECT " + _select_names[0] + " FROM " + _tables[SQL_TABLE_BUNDLE];
451 
452  size_t offset = 0;
453  const bool unlimited = (cb.limit() <= 0);
454  const size_t query_limit = 50;
455 
456  try {
457  try {
458  const SQLBundleQuery &query = dynamic_cast<const SQLBundleQuery&>(cb);
459 
460  // custom query string
461  std::string query_string = base_query + " WHERE " + query.getWhere() + " ORDER BY priority DESC, timestamp, sequencenumber, fragmentoffset LIMIT ?,?;";
462 
463  // create statement for custom query
464  Statement st(_database, query_string);
465 
466  while (unlimited || (items_added < query_limit))
467  {
468  // bind the statement parameter
469  int bind_offset = query.bind(*st, 1);
470 
471  // query the database
472  __get(cb, st, ret, items_added, bind_offset, offset, query_limit);
473 
474  // increment the offset, because we might not have enough
475  offset += query_limit;
476  }
477  } catch (const std::bad_cast&) {
478  Statement st(_database, _sql_queries[BUNDLE_GET_FILTER]);
479 
480  while (unlimited || (items_added < query_limit))
481  {
482  // query the database
483  __get(cb, st, ret, items_added, 1, offset, query_limit);
484 
485  // increment the offset, because we might not have enough
486  offset += query_limit;
487  }
488  }
489  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
490  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
491  } catch (const dtn::storage::NoBundleFoundException&) { }
492 
493  if (items_added == 0) throw dtn::storage::NoBundleFoundException();
494  }
495 
496  void SQLiteDatabase::__get(const BundleSelector &cb, Statement &st, BundleResult &ret, size_t &items_added, const int bind_offset, const size_t offset, const size_t query_limit) const throw (SQLiteDatabase::SQLiteQueryException, NoBundleFoundException, BundleSelectorException)
497  {
498  const bool unlimited = (cb.limit() <= 0);
499 
500  // limit result according to callback result
501  sqlite3_bind_int64(*st, bind_offset, offset);
502  sqlite3_bind_int64(*st, bind_offset + 1, query_limit);
503 
504  // returned no result
505  if ((st.step() == SQLITE_DONE) || _faulty)
507 
508  // abort if enough bundles are found
509  while (unlimited || (items_added < query_limit))
510  {
512 
513  // extract the primary values and set them in the bundle object
514  get(st, m, 0);
515 
516  // check if the bundle is already expired
517  if ( !dtn::utils::Clock::isExpired( m.timestamp, m.lifetime ) )
518  {
519  // ask the filter if this bundle should be added to the return list
520  if (cb.shouldAdd(m))
521  {
522  IBRCOMMON_LOGGER_DEBUG_TAG("SQLiteDatabase", 40) << "add bundle to query selection list: " << m.toString() << IBRCOMMON_LOGGER_ENDL;
523 
524  // add the bundle to the list
525  ret.put(m);
526  items_added++;
527  }
528  }
529 
530  if (st.step() != SQLITE_ROW) break;
531  }
532 
533  st.reset();
534  }
535 
537  {
538  int err = 0;
539 
540  IBRCOMMON_LOGGER_DEBUG_TAG("SQLiteDatabase", 25) << "get bundle from sqlite storage " << id.toString() << IBRCOMMON_LOGGER_ENDL;
541 
542  // do this while db is locked
543  Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
544 
545  // set the bundle key values
546  set_bundleid(st, id);
547 
548  // execute the query and check for error
549  if (((err = st.step()) != SQLITE_ROW) || _faulty)
550  {
551  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteDatabase::TAG, 15) << "sql error: " << err << "; No bundle found with id: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
553  }
554 
555  // read bundle data
556  get(st, bundle);
557 
558  try {
559 
560  int err = 0;
561  string file;
562 
563  Statement st(_database, _sql_queries[BLOCK_GET_ID]);
564 
565  // set the bundle key values
566  set_bundleid(st, id);
567 
568  // query the database and step through all blocks
569  while ((err = st.step()) == SQLITE_ROW)
570  {
571  const ibrcommon::File f( (const char*) sqlite3_column_text(*st, 0) );
572  int blocktyp = sqlite3_column_int(*st, 1);
573 
574  blocks.push_back( blocklist_entry(blocktyp, f) );
575  }
576 
577  if (err == SQLITE_DONE)
578  {
579  if (blocks.size() == 0)
580  {
581  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", error) << "get_blocks: no blocks found for: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
582  throw SQLiteQueryException("no blocks found");
583  }
584  }
585  else
586  {
587  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", error) << "get_blocks() failure: "<< err << " " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
588  throw SQLiteQueryException("can not query for blocks");
589  }
590 
591  } catch (const ibrcommon::Exception &ex) {
592  IBRCOMMON_LOGGER_TAG("SQLiteDatabase", error) << "could not get bundle blocks: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
594  }
595  }
596 
598  {
599  int err;
600 
601  const dtn::data::EID _sourceid = bundle.source;
602  dtn::data::Number TTL = bundle.timestamp + bundle.lifetime;
603 
604  Statement st(_database, _sql_queries[BUNDLE_STORE]);
605 
606  set_bundleid(st, bundle);
607 
608  sqlite3_bind_text(*st, 5, bundle.source.getString().c_str(), static_cast<int>(bundle.source.getString().length()), SQLITE_TRANSIENT);
609  sqlite3_bind_text(*st, 6, bundle.destination.getString().c_str(), static_cast<int>(bundle.destination.getString().length()), SQLITE_TRANSIENT);
610  sqlite3_bind_text(*st, 7, bundle.reportto.getString().c_str(), static_cast<int>(bundle.reportto.getString().length()), SQLITE_TRANSIENT);
611  sqlite3_bind_text(*st, 8, bundle.custodian.getString().c_str(), static_cast<int>(bundle.custodian.getString().length()), SQLITE_TRANSIENT);
612  sqlite3_bind_int(*st, 9, bundle.procflags.get<uint32_t>());
613  sqlite3_bind_int64(*st, 10, bundle.lifetime.get<uint64_t>());
614 
615  if (bundle.get(dtn::data::Bundle::FRAGMENT))
616  {
617  sqlite3_bind_int64(*st, 11, bundle.appdatalength.get<uint64_t>());
618  }
619  else
620  {
621  try {
622  const dtn::data::PayloadBlock &pblock = bundle.find<const dtn::data::PayloadBlock>();
623  sqlite3_bind_int64(*st, 11, pblock.getLength() );
625  sqlite3_bind_int64(*st, 11, 0 );
626  }
627  }
628 
629  sqlite3_bind_int64(*st, 12, TTL.get<uint64_t>());
630  sqlite3_bind_int64(*st, 13, dtn::data::MetaBundle(bundle).getPriority());
631 
632  try {
634  sqlite3_bind_int64(*st, 14, schl.getHopsToLive().get<uint64_t>() );
636  sqlite3_bind_null(*st, 14 );
637  }
638 
639  try {
640  const dtn::data::SchedulingBlock &sched = bundle.find<const dtn::data::SchedulingBlock>();
641  sqlite3_bind_int(*st, 15, sched.getPriority().get<int>() );
643  sqlite3_bind_int64(*st, 15, 0 );
644  }
645 
646  err = st.step();
647 
648  if (err == SQLITE_CONSTRAINT)
649  {
650  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, warning) << "Bundle is already in the storage" << IBRCOMMON_LOGGER_ENDL;
651 
652  stringstream error;
653  error << "store() failure: " << err << " " << sqlite3_errmsg(_database);
654  throw SQLiteQueryException(error.str());
655  }
656  else if ((err != SQLITE_DONE) || _faulty)
657  {
658  stringstream error;
659  error << "store() failure: " << err << " " << sqlite3_errmsg(_database);
660  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << error.str() << IBRCOMMON_LOGGER_ENDL;
661 
662  throw SQLiteQueryException(error.str());
663  }
664 
665  // set new expire time
666  new_expire_time(TTL);
667  }
668 
670  {
671  int blocktyp = (int)block.getType();
672 
673  // protect this query from concurrent access and enable the auto-reset feature
674  Statement st(_database, _sql_queries[BLOCK_STORE]);
675 
676  // set bundle key data
677  set_bundleid(st, id);
678 
679  // set the block type
680  sqlite3_bind_int(*st, 5, blocktyp);
681 
682  // the filename of the block data
683  sqlite3_bind_text(*st, 6, file.getPath().c_str(), static_cast<int>(file.getPath().size()), SQLITE_TRANSIENT);
684 
685  // the ordering number
686  sqlite3_bind_int(*st, 7, index);
687 
688  // execute the query and store the block in the database
689  if (st.step() != SQLITE_DONE)
690  {
691  throw SQLiteQueryException("can not store block of bundle");
692  }
693  }
694 
695  void SQLiteDatabase::transaction() throw (SQLiteDatabase::SQLiteQueryException)
696  {
697  char *zErrMsg = 0;
698 
699  // start a transaction
700  int ret = sqlite3_exec(_database, "BEGIN TRANSACTION;", NULL, NULL, &zErrMsg);
701 
702  // check if the return value signals an error
703  if ( ret != SQLITE_OK )
704  {
705  sqlite3_free( zErrMsg );
706  throw SQLiteQueryException( zErrMsg );
707  }
708  }
709 
710  void SQLiteDatabase::rollback() throw (SQLiteDatabase::SQLiteQueryException)
711  {
712  char *zErrMsg = 0;
713 
714  // rollback the whole transaction
715  int ret = sqlite3_exec(_database, "ROLLBACK TRANSACTION;", NULL, NULL, &zErrMsg);
716 
717  // check if the return value signals an error
718  if ( ret != SQLITE_OK )
719  {
720  sqlite3_free( zErrMsg );
721  throw SQLiteQueryException( zErrMsg );
722  }
723  }
724 
725  void SQLiteDatabase::commit() throw (SQLiteDatabase::SQLiteQueryException)
726  {
727  char *zErrMsg = 0;
728 
729  // commit the transaction
730  int ret = sqlite3_exec(_database, "END TRANSACTION;", NULL, NULL, &zErrMsg);
731 
732  // check if the return value signals an error
733  if ( ret != SQLITE_OK )
734  {
735  sqlite3_free( zErrMsg );
736  throw SQLiteQueryException( zErrMsg );
737  }
738  }
739 
741  {
742  {
743  // lock the database
744  Statement st(_database, _sql_queries[BLOCK_GET_ID]);
745 
746  // set the bundle key values
747  set_bundleid(st, id);
748 
749  // step through all blocks
750  while (st.step() == SQLITE_ROW)
751  {
752  // delete each referenced block file
753  ibrcommon::File blockfile( (const char*)sqlite3_column_text(*st, 0) );
754  blockfile.remove();
755  }
756  }
757 
758  {
759  // lock the database
760  Statement st(_database, _sql_queries[BUNDLE_DELETE]);
761 
762  // then remove the bundle data
763  set_bundleid(st, id);
764  st.step();
765 
766  IBRCOMMON_LOGGER_DEBUG_TAG("SQLiteDatabase", 10) << "bundle " << id.toString() << " deleted" << IBRCOMMON_LOGGER_ENDL;
767  }
768 
769  //update deprecated timer
770  update_expire_time();
771  }
772 
773  void SQLiteDatabase::clear() throw (SQLiteDatabase::SQLiteQueryException)
774  {
775  Statement vacuum(_database, _sql_queries[VACUUM]);
776  Statement bundle_clear(_database, _sql_queries[BUNDLE_CLEAR]);
777  Statement block_clear(_database, _sql_queries[BLOCK_CLEAR]);
778 
779  bundle_clear.step();
780  block_clear.step();
781 
782  if (SQLITE_DONE != vacuum.step())
783  {
784  throw SQLiteQueryException("SQLiteBundleStore: clear(): vacuum failed.");
785  }
786 
787  reset_expire_time();
788  }
789 
790  bool SQLiteDatabase::empty() const throw (SQLiteDatabase::SQLiteQueryException)
791  {
792  Statement st(_database, _sql_queries[EMPTY_CHECK]);
793 
794  if (SQLITE_DONE == st.step())
795  {
796  return true;
797  }
798  else
799  {
800  return false;
801  }
802  }
803 
804  size_t SQLiteDatabase::count() const throw (SQLiteDatabase::SQLiteQueryException)
805  {
806  size_t rows = 0;
807  int err = 0;
808 
809  Statement st(_database, _sql_queries[COUNT_ENTRIES]);
810 
811  if ((err = st.step()) == SQLITE_ROW)
812  {
813  rows = sqlite3_column_int(*st, 0);
814  }
815  else
816  {
817  stringstream error;
818  error << "count: failure " << err << " " << sqlite3_errmsg(_database);
819  throw SQLiteQueryException(error.str());
820  }
821 
822  return rows;
823  }
824 
825  const std::set<dtn::data::EID> SQLiteDatabase::getDistinctDestinations() throw (SQLiteDatabase::SQLiteQueryException)
826  {
827  std::set<dtn::data::EID> ret;
828 
829  Statement st(_database, _sql_queries[GET_DISTINCT_DESTINATIONS]);
830 
831  // step through all blocks
832  while (st.step() == SQLITE_ROW)
833  {
834  // delete each referenced block file
835  const std::string destination( (const char*)sqlite3_column_text(*st, 0) );
836  ret.insert(destination);
837  }
838 
839  return ret;
840  }
841 
842  void SQLiteDatabase::update_expire_time() throw (SQLiteDatabase::SQLiteQueryException)
843  {
844  Statement st(_database, _sql_queries[EXPIRE_NEXT_TIMESTAMP]);
845 
846  int err = st.step();
847 
848  if (err == SQLITE_ROW)
849  {
850  _next_expiration = sqlite3_column_int64(*st, 0);
851  }
852  else
853  {
854  _next_expiration = 0;
855  }
856  }
857 
858  void SQLiteDatabase::expire(const dtn::data::Timestamp &timestamp) throw ()
859  {
860  /*
861  * Only if the actual time is bigger or equal than the time when the next bundle expires, deleteexpired is called.
862  */
863  dtn::data::Timestamp exp_time = get_expire_time();
864  if ((timestamp < exp_time) || (exp_time == 0)) return;
865 
866  /*
867  * Performance improvement: so that the queries do not have to be executed every second, the point in time at which
868  * the next bundle has to be deleted is stored in a variable, and deleteexpired is only executed once a bundle has expired.
869  * After the deletion the database is searched and the next expiration time is written to that variable.
870  */
871 
872  try {
873  Statement st(_database, _sql_queries[EXPIRE_BUNDLE_FILENAMES]);
874 
875  // query for blocks of expired bundles
876  sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
877  while (st.step() == SQLITE_ROW)
878  {
879  ibrcommon::File block((const char*)sqlite3_column_text(*st,0));
880  block.remove();
881  }
882  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
883  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
884  }
885 
886  try {
887  Statement st(_database, _sql_queries[EXPIRE_BUNDLES]);
888 
889  // query expired bundles
890  sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
891  while (st.step() == SQLITE_ROW)
892  {
894  get_bundleid(st, id);
896 
897  // raise bundle removed event
898  _listener.eventBundleExpired(id);
899  }
900  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
901  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
902  }
903 
904  try {
905  Statement st(_database, _sql_queries[EXPIRE_BUNDLE_DELETE]);
906 
907  // delete all expired db entries (bundles and blocks)
908  sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
909  st.step();
910  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
911  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
912  }
913 
914  try {
915  //update deprecated timer
916  update_expire_time();
917  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
918  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
919  }
920  }
921 
922  void SQLiteDatabase::vacuum() throw (SQLiteDatabase::SQLiteQueryException)
923  {
924  Statement st(_database, _sql_queries[VACUUM]);
925  st.step();
926  }
927 
929  {
930  if (mode == UPDATE_CUSTODIAN)
931  {
932  // select query with or without fragmentation extension
933  Statement st(_database, _sql_queries[BUNDLE_UPDATE_CUSTODIAN]);
934 
935  sqlite3_bind_text(*st, 1, eid.getString().c_str(), static_cast<int>(eid.getString().length()), SQLITE_TRANSIENT);
936  set_bundleid(st, id, 1);
937 
938  // update the custodian in the database
939  int err = st.step();
940 
941  if (err != SQLITE_DONE)
942  {
943  stringstream error;
944  error << "update_custodian() failure: " << err << " " << sqlite3_errmsg(_database);
945  throw SQLiteDatabase::SQLiteQueryException(error.str());
946  }
947  }
948  }
949 
950  void SQLiteDatabase::new_expire_time(const dtn::data::Timestamp &ttl) throw ()
951  {
952  if (_next_expiration == 0 || ttl < _next_expiration)
953  {
954  _next_expiration = ttl;
955  }
956  }
957 
958  void SQLiteDatabase::reset_expire_time() throw ()
959  {
960  _next_expiration = 0;
961  }
962 
963  const dtn::data::Timestamp& SQLiteDatabase::get_expire_time() const throw ()
964  {
965  return _next_expiration;
966  }
967 
968  void SQLiteDatabase::set_bundleid(Statement &st, const dtn::data::BundleID &id, int offset) const throw (SQLiteDatabase::SQLiteQueryException)
969  {
970  const std::string source_id = id.source.getString();
971  sqlite3_bind_text(*st, offset + 1, source_id.c_str(), static_cast<int>(source_id.length()), SQLITE_TRANSIENT);
972  sqlite3_bind_int64(*st, offset + 2, id.timestamp.get<uint64_t>());
973  sqlite3_bind_int64(*st, offset + 3, id.sequencenumber.get<uint64_t>());
974 
975  if (id.fragment)
976  {
977  sqlite3_bind_int64(*st, offset + 4, id.offset.get<uint64_t>());
978  }
979  else
980  {
981  sqlite3_bind_int64(*st, offset + 4, -1);
982  }
983  }
984 
985  void SQLiteDatabase::get_bundleid(Statement &st, dtn::data::BundleID &id, int offset) const throw (SQLiteDatabase::SQLiteQueryException)
986  {
987  id.source = dtn::data::EID((const char*)sqlite3_column_text(*st, offset + 0));
988  id.timestamp = sqlite3_column_int64(*st, offset + 1);
989  id.sequencenumber = sqlite3_column_int64(*st, offset + 2);
990 
991  id.fragment = (sqlite3_column_text(*st, offset + 3) != NULL);
992  id.offset = sqlite3_column_int64(*st, offset + 3);
993  }
994 
996  {
997  _faulty = mode;
998  }
999  } /* namespace storage */
1000 } /* namespace dtn */