39 const std::string SQLiteDatabase::TAG =
"SQLiteDatabase";
// Column lists spliced into the SELECT statements of this file.
//  [0] full bundle column set; its order matches the column indexes
//      (offset + 0 ... offset + 11) used when a bundle row is decoded.
//  [1] short list used by the "expiretime <= ?" bundle query; its first four
//      columns are what get_bundleid() reads back.
// (The closing brace of the initialiser is not part of this listing.)
41 const std::string SQLiteDatabase::_select_names[2] = {
42 "source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength, hopcount, netpriority",
43 "source, timestamp, sequencenumber, fragmentoffset, procflags"
46 const std::string SQLiteDatabase::_tables[SQL_TABLE_END] =
47 {
"bundles",
"blocks",
"routing",
"routing_bundles",
"routing_nodes",
"properties" };
50 const int SQLiteDatabase::DBSCHEMA_FRESH_VERSION = 3;
52 const int SQLiteDatabase::DBSCHEMA_VERSION = 3;
54 const std::string SQLiteDatabase::QUERY_SCHEMAVERSION =
"SELECT `value` FROM " + SQLiteDatabase::_tables[SQLiteDatabase::SQL_TABLE_PROPERTIES] +
" WHERE `key` = 'version' LIMIT 0,1;";
55 const std::string SQLiteDatabase::SET_SCHEMAVERSION =
"INSERT INTO " + SQLiteDatabase::_tables[SQLiteDatabase::SQL_TABLE_PROPERTIES] +
" (`key`, `value`) VALUES ('version', ?);";
// Text of the predefined SQL statements, indexed by the query enumerators
// (BUNDLE_GET_ID, BUNDLE_STORE, ...). The enumerator order is declared
// elsewhere and has to match the order of the entries below.
// Every '?' is a positional parameter bound at the call site.
// (The braces of the initialiser and any entries after the last one shown
// here are not part of this listing.)
57 const std::string SQLiteDatabase::_sql_queries[SQL_QUERIES_END] =
// all bundles, full column set, no ordering
59 "SELECT " + _select_names[0] +
" FROM " + _tables[SQL_TABLE_BUNDLE],
// all bundles in priority order, paged with LIMIT <offset>,<count>
60 "SELECT " + _select_names[0] +
" FROM " + _tables[SQL_TABLE_BUNDLE] +
" ORDER BY priority DESC, timestamp, sequencenumber, fragmentoffset LIMIT ?,?;",
// a single bundle selected by its id (source_id, timestamp, sequencenumber, fragmentoffset)
61 "SELECT " + _select_names[0] +
" FROM "+ _tables[SQL_TABLE_BUNDLE] +
" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? LIMIT 1;",
// every distinct destination stored in the bundle table
62 "SELECT DISTINCT destination FROM " + _tables[SQL_TABLE_BUNDLE],
// id columns of all bundles whose expiretime is <= the given time
64 "SELECT " + _select_names[1] +
" FROM "+ _tables[SQL_TABLE_BUNDLE] +
" WHERE expiretime <= ?;",
// filenames of the blocks that belong to bundles whose expiretime is <= the given time
65 "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +
" as a, "+ _tables[SQL_TABLE_BLOCK] +
" as b WHERE a.source_id = b.source_id AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND a.fragmentoffset = b.fragmentoffset AND a.expiretime <= ?;",
// delete all bundles whose expiretime is <= the given time
66 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +
" WHERE expiretime <= ?;",
// smallest expiretime in the bundle table
67 "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +
" ORDER BY expiretime ASC LIMIT 1;",
// at most one ROWID of the bundle table (yields no row when the table is empty)
69 "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +
" LIMIT 1;",
// number of rows in the bundle table
70 "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +
";",
// delete a single bundle selected by its id
72 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +
" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
// delete all bundles
73 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +
";",
// insert a bundle row; 15 parameters in the column order listed in the statement
74 "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +
" (source_id, timestamp, sequencenumber, fragmentoffset, source, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority, hopcount, netpriority) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);",
// set the custodian of one bundle (parameter 1 = custodian, 2-5 = bundle id)
75 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +
" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
// set the procflags of one bundle (parameter 1 = procflags, 2-5 = bundle id)
77 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +
" SET procflags = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
// all blocks of one bundle in ascending ordernumber
79 "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +
" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? ORDER BY ordernumber ASC;",
// one block of a bundle selected by its ordernumber
80 "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +
" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND ordernumber = ?;",
// delete all blocks
81 "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +
";",
// insert a block row; 7 parameters in the column order listed in the statement
82 "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +
" (source_id, timestamp, sequencenumber, fragmentoffset, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?);",
// Statements that create the database structure (tables, a trigger and
// indexes). The upgrade code executes the 11 array elements one after another.
// (The braces of the initialiser are not part of this listing.)
87 const std::string SQLiteDatabase::_db_structure[11] =
// block table: one row per stored block, identified by the id of its bundle
89 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] +
"` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL DEFAULT 0, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);",
// bundle table: one row per stored bundle
90 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] +
"` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL DEFAULT 0, `appdatalength` INTEGER NOT NULL DEFAULT 0, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL, `hopcount` INTEGER DEFAULT NULL, `netpriority` INTEGER NOT NULL DEFAULT 0);",
// routing tables
91 "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +
" (INTEGER PRIMARY KEY ASC, Key int, Routing text);",
92 "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +
" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);",
93 "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +
" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);",
// trigger: deleting a bundle row also deletes the block rows carrying the same bundle id
94 "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] +
" FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] +
" WHERE " + _tables[SQL_TABLE_BLOCK] +
".source_id = OLD.source_id AND " + _tables[SQL_TABLE_BLOCK] +
".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] +
".sequencenumber = OLD.sequencenumber AND " + _tables[SQL_TABLE_BLOCK] +
".fragmentoffset = old.fragmentoffset; END;",
// indexes
95 "CREATE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] +
" (source_id, timestamp, sequencenumber, fragmentoffset);",
96 "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] +
" (destination);",
97 "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] +
" (destination, priority);",
// NOTE(review): the next entry does not end with a comma, so its last string
// literal is concatenated with the first literal of the bundles_expire entry
// and both statements end up in ONE array element. The statement is prepared
// with a NULL tail pointer, so presumably only the bundles_id index is created
// and bundles_expire never is -- verify. Adding the comma yields 12 elements,
// so the declared array size and the loop bound in the upgrade code would
// have to change as well.
98 "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] +
" (source_id, timestamp, sequencenumber, fragmentoffset);"
99 "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] +
" (source_id, timestamp, sequencenumber, fragmentoffset, expiretime);",
// properties table: key/value store that holds the schema version
100 "CREATE TABLE IF NOT EXISTS '" + _tables[SQL_TABLE_PROPERTIES] +
"' ( `key` TEXT PRIMARY KEY ASC ON CONFLICT REPLACE, `value` TEXT NOT NULL);"
// Fragments of the Statement helper (the enclosing function headers are not
// part of this listing).
// Member initialisation: remembers the database handle and the query text;
// no compiled statement exists yet (_st is NULL).
111 : _database(database), _st(NULL), _query(query)
// Releases the compiled statement held in _st.
119 sqlite3_finalize(_st);
// Removes all parameter bindings from the compiled statement.
132 sqlite3_clear_bindings(_st);
// Fragment of the routine that advances the statement by one step
// (presumably Statement::step(); header, conditions and most case labels are
// not part of this listing).
// Raised when no compiled statement is available.
139 throw SQLiteQueryException(
"statement not prepared");
// Execute one step; the result code is examined below.
141 int ret = sqlite3_step(_st);
// Error results are converted into exceptions that carry the message text
// reported by sqlite3_errmsg() for this database handle.
147 throw SQLiteQueryException(
"Database is corrupt: " + std::string(sqlite3_errmsg(_database)));
149 case SQLITE_INTERRUPT:
150 throw SQLiteQueryException(
"Database interrupt: " + std::string(sqlite3_errmsg(_database)));
153 throw SQLiteQueryException(
"Database schema error: " + std::string(sqlite3_errmsg(_database)));
156 throw SQLiteQueryException(
"Database error: " + std::string(sqlite3_errmsg(_database)));
// Fragment of the routine that compiles the query text into _st
// (presumably Statement::prepare(); header and guard condition are not part
// of this listing).
// Raised when a statement has already been compiled.
165 throw SQLiteQueryException(
"already prepared");
// Compile _query; the last argument (tail pointer) is 0, so text following
// the first statement is not reported back to the caller.
167 int err = sqlite3_prepare_v2(_database, _query.c_str(),
static_cast<int>(_query.length()), &_st, 0);
// Any result other than SQLITE_OK is reported together with the query text.
169 if ( err != SQLITE_OK )
170 throw SQLiteQueryException(
"failed to prepare statement: " + _query);
// Constructor initialiser list (the constructor header is not part of this
// listing): stores the database file and the listener, starts without an open
// database handle, with no pending expiration (0) and with _faulty cleared.
176 : _file(file), _database(NULL), _next_expiration(0), _listener(listener), _faulty(false)
// Reads the schema version stored in the database.
// Declared to throw SQLiteQueryException.
// (Only parts of the body are included in this listing.)
184 int SQLiteDatabase::getVersion() throw (
SQLiteDatabase::SQLiteQueryException)
// Run the schema-version query.
190 Statement st(_database, QUERY_SCHEMAVERSION);
// A result row means a version entry exists.
196 if (err == SQLITE_ROW)
// The value is stored as text; it is wrapped in a string stream
// (presumably to parse it into the returned int -- the parsing line is not shown).
198 std::string dbversion = (
const char*) sqlite3_column_text(*st, 0);
199 std::stringstream ss(dbversion);
// Stores the given schema version in the properties table.
// @param version  schema version to record
// Declared to throw SQLiteQueryException.
// (Only parts of the body are included in this listing.)
206 void SQLiteDatabase::setVersion(
int version)
throw (SQLiteDatabase::SQLiteQueryException)
// The version is stored as text, so format it first.
208 std::stringstream ss; ss << version;
209 Statement st(_database, SET_SCHEMAVERSION);
// SQLITE_TRANSIENT makes SQLite copy the text, so the temporary string may go away.
212 sqlite3_bind_text(*st, 1, ss.str().c_str(),
static_cast<int>(ss.str().length()), SQLITE_TRANSIENT);
// Anything but SQLITE_DONE means the insert did not complete.
215 if(err != SQLITE_DONE)
// Fragment of the schema upgrade routine (called as doUpgrade(old, new)
// when the database is opened; header and branch bodies are not part of this listing).
// A database newer than the requested version is handled separately.
223 if (oldVersion > newVersion)
// Databases older than DBSCHEMA_FRESH_VERSION (but not brand new, i.e. != 0)
// get their own treatment.
229 if ((oldVersion != 0) && (oldVersion < DBSCHEMA_FRESH_VERSION))
// Walk through the versions between the old and the new one.
236 for (
int j = oldVersion; j < newVersion; ++j)
// Drop every known table ...
242 for (
size_t i = 0; i < SQL_TABLE_END; ++i)
244 Statement st(_database,
"DROP TABLE IF EXISTS " + _tables[i] +
";");
246 if(err != SQLITE_DONE)
// ... and execute the 11 structure statements of _db_structure.
253 for (
size_t i = 0; i < 11; ++i)
255 Statement st(_database, _db_structure[i]);
257 if(err != SQLITE_DONE)
// The re-created database is at DBSCHEMA_FRESH_VERSION; continue the
// version loop from there.
264 setVersion(DBSCHEMA_FRESH_VERSION);
265 j = DBSCHEMA_FRESH_VERSION;
// Fragment of the routine that opens the database (presumably open();
// header and branch bodies are not part of this listing).
// sqlite3_threadsafe() == 0 means the library was built without mutex support.
285 if (sqlite3_threadsafe() == 0)
// Open (or create) the database file read/write with full mutex protection;
// a non-zero result indicates failure.
292 if (sqlite3_open_v2(_file.
getPath().c_str(), &_database, SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL))
295 sqlite3_close(_database);
// Compare the stored schema version with the one this code expects and
// upgrade when they differ.
301 int version = getVersion();
305 if (version != DBSCHEMA_VERSION)
307 doUpgrade(version, DBSCHEMA_VERSION);
// Upgrade from version 0 (presumably the path taken when the version could
// not be read -- the surrounding handler is not shown).
311 doUpgrade(0, DBSCHEMA_VERSION);
// Switch off synchronous writes for this connection.
315 sqlite3_exec(_database,
"PRAGMA synchronous = OFF;", NULL, NULL, NULL);
// Initialise _next_expiration from the stored bundles.
324 update_expire_time();
// Fragment of the routine that closes the database (header and branch body
// are not part of this listing): a result other than SQLITE_OK is treated as failure.
330 if (sqlite3_close(_database) != SQLITE_OK)
// Fragment of a lookup by bundle id (header not part of this listing).
342 Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
// Bind the bundle id to the statement parameters.
345 set_bundleid(st,
id);
// No row, or a database flagged as faulty, is reported as "not found".
348 if ((st.
step() != SQLITE_ROW) || _faulty)
351 error <<
"No Bundle found with BundleID: " <<
id.toString();
353 throw SQLiteQueryException(error.str());
// Fragment of the routine that fills a bundle meta object from the current
// result row (header, try/catch and if/else lines are not part of this
// listing). Column indexes are relative to `offset` and follow the column
// order of _select_names[0].
363 bundle.source =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 0) );
364 bundle.destination =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 1) );
365 bundle.reportto =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 2) );
366 bundle.custodian =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 3) );
// Raised when the EID columns could not be converted.
369 throw SQLiteDatabase::SQLiteQueryException(
"unable to read EIDs from database");
372 bundle.procflags = sqlite3_column_int(*st, offset + 4);
373 bundle.timestamp = sqlite3_column_int64(*st, offset + 5);
374 bundle.sequencenumber = sqlite3_column_int64(*st, offset + 6);
375 bundle.lifetime = sqlite3_column_int64(*st, offset + 7);
// Fragment case: offset and application data length come from columns 8 and 9.
380 bundle.fragment =
true;
381 bundle.offset = sqlite3_column_int64(*st, offset + 8);
382 bundle.appdatalength = sqlite3_column_int64(*st, offset + 9);
// Non-fragment case.
386 bundle.fragment =
false;
// NOTE(review): the reads of columns 9, 10 and 11 below do not add `offset`,
// unlike every other column access here (including the NULL check on
// offset + 10). This only gives the intended result when offset is 0 --
// they should presumably be offset + 9, offset + 10 and offset + 11.
388 bundle.appdatalength = sqlite3_column_int64(*st, 9);
391 bundle.payloadlength = sqlite3_column_int64(*st, 9);
// hopcount is nullable in the schema, so it is only read when present.
393 if (sqlite3_column_type(*st, offset + 10) != SQLITE_NULL)
395 bundle.hopcount = sqlite3_column_int64(*st, 10);
399 bundle.net_priority = sqlite3_column_int(*st, 11);
// Fragment of the routine that fills a bundle object from the current result
// row (header and try/catch lines are not part of this listing). Column
// indexes are relative to `offset` and follow the column order of
// _select_names[0].
405 bundle.source =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 0) );
406 bundle.destination =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 1) );
407 bundle.reportto =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 2) );
408 bundle.custodian =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 3) );
// Raised when the EID columns could not be converted.
411 throw SQLiteDatabase::SQLiteQueryException(
"unable to read EIDs from database");
414 bundle.procflags = sqlite3_column_int(*st, offset + 4);
415 bundle.timestamp = sqlite3_column_int64(*st, offset + 5);
416 bundle.sequencenumber = sqlite3_column_int64(*st, offset + 6);
417 bundle.lifetime = sqlite3_column_int64(*st, offset + 7);
// Fragment data (the guarding condition is not shown).
421 bundle.fragmentoffset = sqlite3_column_int64(*st, offset + 8);
422 bundle.appdatalength = sqlite3_column_int64(*st, offset + 9);
// Fragment of a routine that walks over the rows returned by the
// BUNDLE_GET_ITERATOR query (header and loop body are not part of this listing).
428 Statement st(_database, _sql_queries[BUNDLE_GET_ITERATOR]);
// One iteration per result row.
431 while (st.
step() == SQLITE_ROW)
// Fragment of the filtered bundle query (header, try line and loop bodies are
// not part of this listing). Results are fetched page by page through __get().
// Number of bundles handed to the callback so far.
447 size_t items_added = 0;
449 const std::string base_query =
450 "SELECT " + _select_names[0] +
" FROM " + _tables[SQL_TABLE_BUNDLE];
// A callback limit <= 0 means "no limit".
453 const bool unlimited = (cb.limit() <= 0);
// Page size of a single query.
454 const size_t query_limit = 50;
// The callback supplies its own WHERE clause; ordering and paging match the
// predefined filter query.
461 std::string query_string = base_query +
" WHERE " + query.
getWhere() +
" ORDER BY priority DESC, timestamp, sequencenumber, fragmentoffset LIMIT ?,?;";
466 while (unlimited || (items_added < query_limit))
// Bind the parameters of the custom WHERE clause starting at index 1; the
// returned value is passed on as the index of the first LIMIT parameter.
469 int bind_offset = query.
bind(*st, 1);
472 __get(cb, st, ret, items_added, bind_offset, offset, query_limit);
// Advance to the next page.
475 offset += query_limit;
477 }
catch (
const std::bad_cast&) {
// Fallback when the cast above fails (the cast itself is not shown): use the
// predefined BUNDLE_GET_FILTER query, whose LIMIT parameters start at index 1.
478 Statement st(_database, _sql_queries[BUNDLE_GET_FILTER]);
480 while (unlimited || (items_added < query_limit))
483 __get(cb, st, ret, items_added, 1, offset, query_limit);
486 offset += query_limit;
// Fragment of the page-fetch helper called as
// __get(cb, st, ret, items_added, bind_offset, offset, query_limit)
// (header and loop body are not part of this listing).
// A callback limit <= 0 means "no limit".
498 const bool unlimited = (cb.limit() <= 0);
// Bind the two LIMIT parameters: row offset and row count.
501 sqlite3_bind_int64(*st, bind_offset, offset);
502 sqlite3_bind_int64(*st, bind_offset + 1, query_limit);
// No first row (or a faulty database) is handled separately.
505 if ((st.step() == SQLITE_DONE) || _faulty)
509 while (unlimited || (items_added < query_limit))
// Stop as soon as the page has no further rows.
530 if (st.step() != SQLITE_ROW)
break;
// Fragment of a bundle lookup by id (header and branch body are not part of
// this listing).
543 Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
// Bind the bundle id to the statement parameters.
546 set_bundleid(st,
id);
// The result code is kept in err; anything but a row, or a faulty database,
// takes the failure branch.
549 if (((err = st.
step()) != SQLITE_ROW) || _faulty)
// Fragment of the routine that collects the blocks of a bundle (header and
// branch structure are not part of this listing).
563 Statement st(_database, _sql_queries[BLOCK_GET_ID]);
// Bind the bundle id to the statement parameters.
566 set_bundleid(st,
id);
// One iteration per block row; the last result code stays in err.
569 while ((err = st.
step()) == SQLITE_ROW)
// Column 1 of the block query is the block type.
572 int blocktyp = sqlite3_column_int(*st, 1);
// SQLITE_DONE: the query ran to completion.
577 if (err == SQLITE_DONE)
579 if (blocks.size() == 0)
582 throw SQLiteQueryException(
"no blocks found");
// Reported for a failed query (the guarding branch is not shown).
588 throw SQLiteQueryException(
"can not query for blocks");
// Fragment of the routine that stores a bundle row (header, branch structure
// and some bind calls are not part of this listing). Parameter numbers
// presumably follow the column order of the 15-parameter INSERT statement in
// _sql_queries -- confirm against the BUNDLE_STORE entry.
604 Statement st(_database, _sql_queries[BUNDLE_STORE]);
// Binds the bundle id (see set_bundleid: parameters offset + 1 ... offset + 4).
606 set_bundleid(st, bundle);
// Endpoint ids; SQLITE_TRANSIENT makes SQLite copy the text.
608 sqlite3_bind_text(*st, 5, bundle.source.getString().c_str(),
static_cast<int>(bundle.source.getString().length()), SQLITE_TRANSIENT);
609 sqlite3_bind_text(*st, 6, bundle.destination.getString().c_str(),
static_cast<int>(bundle.destination.getString().length()), SQLITE_TRANSIENT);
610 sqlite3_bind_text(*st, 7, bundle.reportto.getString().c_str(),
static_cast<int>(bundle.reportto.getString().length()), SQLITE_TRANSIENT);
611 sqlite3_bind_text(*st, 8, bundle.custodian.getString().c_str(),
static_cast<int>(bundle.custodian.getString().length()), SQLITE_TRANSIENT);
612 sqlite3_bind_int(*st, 9, bundle.procflags.get<uint32_t>());
613 sqlite3_bind_int64(*st, 10, bundle.lifetime.get<uint64_t>());
// Parameter 11 is bound in one of three ways: the bundle's appdatalength,
// the length of pblock, or 0 (the selecting conditions are not shown).
617 sqlite3_bind_int64(*st, 11, bundle.appdatalength.get<uint64_t>());
623 sqlite3_bind_int64(*st, 11, pblock.
getLength() );
625 sqlite3_bind_int64(*st, 11, 0 );
// Parameter 12 receives TTL, which is also passed to new_expire_time() below.
629 sqlite3_bind_int64(*st, 12, TTL.
get<uint64_t>());
// Parameter 14 bound to NULL and parameter 15 bound to 0 (the alternative
// bindings are not shown).
636 sqlite3_bind_null(*st, 14 );
643 sqlite3_bind_int64(*st, 15, 0 );
// A constraint violation is reported with the result code and SQLite's message.
648 if (err == SQLITE_CONSTRAINT)
653 error <<
"store() failure: " << err <<
" " << sqlite3_errmsg(_database);
654 throw SQLiteQueryException(error.str());
// Any other incomplete insert, or a faulty database, is reported the same way.
656 else if ((err != SQLITE_DONE) || _faulty)
659 error <<
"store() failure: " << err <<
" " << sqlite3_errmsg(_database);
662 throw SQLiteQueryException(error.str());
// Let the cached next-expiration value take the new bundle into account.
666 new_expire_time(TTL);
// Fragment of the routine that stores a block row (header is not part of this
// listing). Parameter numbers presumably follow the column order of the
// 7-parameter block INSERT statement in _sql_queries -- confirm against the
// BLOCK_STORE entry.
671 int blocktyp = (int)block.getType();
674 Statement st(_database, _sql_queries[BLOCK_STORE]);
// Binds the id of the owning bundle (parameters 1-4 with the default offset).
677 set_bundleid(st,
id);
// Block type.
680 sqlite3_bind_int(*st, 5, blocktyp);
// Path of the file; SQLITE_TRANSIENT makes SQLite copy the text.
683 sqlite3_bind_text(*st, 6, file.getPath().c_str(),
static_cast<int>(file.getPath().size()), SQLITE_TRANSIENT);
// Value of `index`, bound as parameter 7.
686 sqlite3_bind_int(*st, 7, index);
// Anything but SQLITE_DONE means the insert did not complete.
689 if (st.
step() != SQLITE_DONE)
691 throw SQLiteQueryException(
"can not store block of bundle");
700 int ret = sqlite3_exec(_database,
"BEGIN TRANSACTION;", NULL, NULL, &zErrMsg);
703 if ( ret != SQLITE_OK )
705 sqlite3_free( zErrMsg );
706 throw SQLiteQueryException( zErrMsg );
715 int ret = sqlite3_exec(_database,
"ROLLBACK TRANSACTION;", NULL, NULL, &zErrMsg);
718 if ( ret != SQLITE_OK )
720 sqlite3_free( zErrMsg );
721 throw SQLiteQueryException( zErrMsg );
730 int ret = sqlite3_exec(_database,
"END TRANSACTION;", NULL, NULL, &zErrMsg);
733 if ( ret != SQLITE_OK )
735 sqlite3_free( zErrMsg );
736 throw SQLiteQueryException( zErrMsg );
// Fragment of a routine that walks over the block rows of one bundle (header
// and loop body are not part of this listing; the routine's name is not shown).
744 Statement st(_database, _sql_queries[BLOCK_GET_ID]);
// Bind the bundle id to the statement parameters.
747 set_bundleid(st,
id);
// One iteration per block row.
750 while (st.
step() == SQLITE_ROW)
// Fragment of the routine that deletes one bundle (header and the step call
// are not part of this listing).
760 Statement st(_database, _sql_queries[BUNDLE_DELETE]);
// Bind the id of the bundle to delete.
763 set_bundleid(st,
id);
// Re-read the next expiration time from the database.
770 update_expire_time();
// Fragment of the routine that empties the storage (header, the step calls of
// the two clear statements and the declaration of `vacuum` are not part of
// this listing).
776 Statement bundle_clear(_database, _sql_queries[BUNDLE_CLEAR]);
777 Statement block_clear(_database, _sql_queries[BLOCK_CLEAR]);
// The vacuum statement has to run to completion.
782 if (SQLITE_DONE != vacuum.
step())
784 throw SQLiteQueryException(
"SQLiteBundleStore: clear(): vacuum failed.");
// Fragment of the emptiness check (header and branch bodies are not part of
// this listing).
792 Statement st(_database, _sql_queries[EMPTY_CHECK]);
// SQLITE_DONE on the first step means the query produced no row.
794 if (SQLITE_DONE == st.
step())
// Fragment of the routine that counts the stored entries (header and branch
// structure are not part of this listing).
809 Statement st(_database, _sql_queries[COUNT_ENTRIES]);
// The result code is kept in err for the error message below.
811 if ((err = st.
step()) == SQLITE_ROW)
// Column 0 of the result row is read as the count.
813 rows = sqlite3_column_int(*st, 0);
// Failure path: report the result code and SQLite's message.
818 error <<
"count: failure " << err <<
" " << sqlite3_errmsg(_database);
819 throw SQLiteQueryException(error.str());
// Fragment of the routine that collects the destinations returned by the
// GET_DISTINCT_DESTINATIONS query (header and return statement are not part
// of this listing).
827 std::set<dtn::data::EID> ret;
829 Statement st(_database, _sql_queries[GET_DISTINCT_DESTINATIONS]);
// One iteration per result row.
832 while (st.
step() == SQLITE_ROW)
// Column 0 holds the destination text; inserting into the set converts it to an EID.
835 const std::string destination( (
const char*)sqlite3_column_text(*st, 0) );
836 ret.insert(destination);
// Refreshes _next_expiration from the database.
// Declared to throw SQLiteQueryException.
// (Only parts of the body are included in this listing.)
842 void SQLiteDatabase::update_expire_time() throw (
SQLiteDatabase::SQLiteQueryException)
844 Statement st(_database, _sql_queries[EXPIRE_NEXT_TIMESTAMP]);
// A row was returned: take its first column as the next expiration time.
848 if (err == SQLITE_ROW)
850 _next_expiration = sqlite3_column_int64(*st, 0);
// Otherwise (branch line not shown) the value is reset to 0.
854 _next_expiration = 0;
// Fragment of the routine that removes expired bundles (header and loop
// bodies are not part of this listing).
// Nothing to do while the given time is before exp_time, or when exp_time is 0.
864 if ((timestamp < exp_time) || (exp_time == 0))
return;
// Step 1: walk over the block filenames of all bundles that expire up to `timestamp`.
873 Statement st(_database, _sql_queries[EXPIRE_BUNDLE_FILENAMES]);
876 sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
877 while (st.
step() == SQLITE_ROW)
// Step 2: read the id of every expiring bundle and notify the listener.
887 Statement st(_database, _sql_queries[EXPIRE_BUNDLES]);
890 sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
891 while (st.
step() == SQLITE_ROW)
894 get_bundleid(st,
id);
898 _listener.eventBundleExpired(
id);
// Step 3: the EXPIRE_BUNDLE_DELETE statement, bound to the same timestamp
// (its step call is not shown).
905 Statement st(_database, _sql_queries[EXPIRE_BUNDLE_DELETE]);
908 sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
// Re-read the next expiration time from the database.
916 update_expire_time();
// Fragment of a routine that uses the VACUUM entry of _sql_queries (header
// and the step call are not part of this listing).
924 Statement st(_database, _sql_queries[VACUUM]);
// Fragment of the routine that updates a stored bundle (header and the throw
// statement are not part of this listing). Only the custodian update is visible.
930 if (mode == UPDATE_CUSTODIAN)
933 Statement st(_database, _sql_queries[BUNDLE_UPDATE_CUSTODIAN]);
// Parameter 1: text of `eid`; SQLITE_TRANSIENT makes SQLite copy it.
935 sqlite3_bind_text(*st, 1, eid.getString().c_str(),
static_cast<int>(eid.getString().length()), SQLITE_TRANSIENT);
// The bundle id follows with an offset of 1, i.e. as parameters 2-5.
936 set_bundleid(st,
id, 1);
// Anything but SQLITE_DONE means the update did not complete.
941 if (err != SQLITE_DONE)
944 error <<
"update_custodian() failure: " << err <<
" " << sqlite3_errmsg(_database);
// Fragment of the routine called as new_expire_time(ttl) (header not shown):
// keep the smaller of the cached expiration time and `ttl`; a cached value of
// 0 is always replaced.
952 if (_next_expiration == 0 || ttl < _next_expiration)
954 _next_expiration = ttl;
// Resets the cached expiration time to 0. Declared not to throw.
958 void SQLiteDatabase::reset_expire_time() throw ()
960 _next_expiration = 0;
// Fragment of a getter for the cached expiration time (header not shown).
965 return _next_expiration;
// Binds a bundle id to four consecutive statement parameters.
// @param st      statement to bind to
// @param id      bundle id providing source, timestamp, sequencenumber and offset
// @param offset  added to the parameter indexes; the id occupies parameters
//                offset + 1 ... offset + 4
// Declared to throw SQLiteQueryException.
// (The condition choosing between the two fragment-offset bindings is not
// part of this listing.)
968 void SQLiteDatabase::set_bundleid(Statement &st,
const dtn::data::BundleID &
id,
int offset)
const throw (SQLiteDatabase::SQLiteQueryException)
// Source endpoint as text; SQLITE_TRANSIENT makes SQLite copy it.
970 const std::string source_id =
id.source.getString();
971 sqlite3_bind_text(*st, offset + 1, source_id.c_str(),
static_cast<int>(source_id.length()), SQLITE_TRANSIENT);
972 sqlite3_bind_int64(*st, offset + 2,
id.timestamp.get<uint64_t>());
973 sqlite3_bind_int64(*st, offset + 3,
id.sequencenumber.get<uint64_t>());
// Fourth parameter: either the id's offset ...
977 sqlite3_bind_int64(*st, offset + 4,
id.offset.get<uint64_t>());
// ... or -1 (presumably for bundles that are not fragments -- confirm).
981 sqlite3_bind_int64(*st, offset + 4, -1);
// Reads a bundle id from four consecutive columns of the current result row.
// @param st      statement positioned on a result row
// @param id      bundle id to fill
// @param offset  index of the first id column; the columns read are
//                offset + 0 ... offset + 3 (source, timestamp,
//                sequencenumber, fragment offset)
// Declared to throw SQLiteQueryException.
985 void SQLiteDatabase::get_bundleid(Statement &st,
dtn::data::BundleID &
id,
int offset)
const throw (SQLiteDatabase::SQLiteQueryException)
987 id.source =
dtn::data::EID((
const char*)sqlite3_column_text(*st, offset + 0));
988 id.timestamp = sqlite3_column_int64(*st, offset + 1);
989 id.sequencenumber = sqlite3_column_int64(*st, offset + 2);
// NOTE(review): the fragment flag is derived from a NULL test, but the bundle
// table declares fragmentoffset NOT NULL and set_bundleid() stores -1 in one
// of its branches, so this test may always be true -- confirm the intended
// encoding of "not a fragment".
991 id.fragment = (sqlite3_column_text(*st, offset + 3) != NULL);
992 id.offset = sqlite3_column_int64(*st, offset + 3);