IBR-DTNSuite  0.12
SQLiteBundleSet.cpp
Go to the documentation of this file.
1 /*
2  * SQLiteBundleSet.cpp
3  *
4  * Copyright (C) 2013 IBR, TU Braunschweig
5  *
6  * Written-by: David Goltzsche <goltzsch@ibr.cs.tu-bs.de>
7  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Created on: 19.04.2013
22  */
23 
25 #include <ibrdtn/utils/Random.h>
26 #include <ibrcommon/Logger.h>
27 
28 namespace dtn
29 {
30  namespace storage
31  {
32  ibrcommon::Mutex SQLiteBundleSet::Factory::_create_lock;
33 
35  : _sqldb(db)
36  {
37  }
38 
40  {
41  }
42 
44  {
45  try {
46  return new SQLiteBundleSet(create(_sqldb), false, listener, bf_size, _sqldb);
47  } catch (const SQLiteDatabase::SQLiteQueryException&) {
48  return NULL;
49  }
50  }
51 
53  {
54  try {
55  return new SQLiteBundleSet(create(_sqldb, name), true, listener, bf_size, _sqldb);
56  } catch (const SQLiteDatabase::SQLiteQueryException&) {
57  return NULL;
58  }
59  }
60 
62  {
63  ibrcommon::MutexLock l(_create_lock);
64 
65  std::string name;
66  dtn::utils::Random rand;
67  do {
68  name = rand.gen_chars(32);
69  } while (__exists(db, name, false));
70 
71  return __create(db, name, false);
72  }
73 
75  {
76  ibrcommon::MutexLock l(_create_lock);
77  return __create(db, name, true);
78  }
79 
80  size_t SQLiteBundleSet::Factory::__create(SQLiteDatabase &db, const std::string &name, bool persistent) throw (SQLiteDatabase::SQLiteQueryException)
81  {
82  // create a new name (fails, if the name already exists)
83  SQLiteDatabase::Statement st1(db._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_NAME_ADD]);
84  sqlite3_bind_text(*st1, 1, name.c_str(), static_cast<int>(name.length()), SQLITE_TRANSIENT);
85  sqlite3_bind_int(*st1, 2, persistent ? 1 : 0);
86  st1.step();
87 
88  // get the ID of the name
89  SQLiteDatabase::Statement st2(db._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_NAME_GET_ID]);
90  sqlite3_bind_text(*st2, 1, name.c_str(), static_cast<int>(name.length()), SQLITE_TRANSIENT);
91  sqlite3_bind_int(*st2, 2, persistent ? 1 : 0);
92 
93  if (st2.step() == SQLITE_ROW) {
94  return sqlite3_column_int64(*st2, 0);
95  }
96 
97  throw SQLiteDatabase::SQLiteQueryException("could not create the bundle-set name");
98  }
99 
100  bool SQLiteBundleSet::Factory::__exists(SQLiteDatabase &db, const std::string &name, bool persistent) throw (SQLiteDatabase::SQLiteQueryException)
101  {
102  SQLiteDatabase::Statement st(db._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_NAME_GET_ID]);
103  sqlite3_bind_text(*st, 1, name.c_str(), static_cast<int>(name.length()), SQLITE_TRANSIENT);
104  sqlite3_bind_int(*st, 2, persistent ? 1 : 0);
105 
106  return ( st.step() == SQLITE_ROW);
107  }
108 
110  : _set_id(id), _bf_size(bf_size), _bf(bf_size * 8), _listener(listener), _consistent(true),_sqldb(database), _persistent(persistant)
111  {
112  // if this is a persitant bundle-set
113  if (_persistent) {
114  // rebuild the bloom filter
115  rebuild_bloom_filter();
116 
117  // load the next expiration from the storage
118  try {
119  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_EXPIRE_NEXT_TIMESTAMP]);
120  sqlite3_bind_int64(*st, 1, _set_id);
121 
122  int err = st.step();
123 
124  if (err == SQLITE_ROW)
125  {
126  _next_expiration = sqlite3_column_int64(*st, 0);
127  }
128  } catch (const SQLiteDatabase::SQLiteQueryException&) {
129  // error
130  }
131  }
132  }
133 
135  {
136  // clear on deletion if this set is not persistent
137  if (!_persistent) destroy();
138  }
139 
140  void SQLiteBundleSet::destroy()
141  {
142  clear();
143 
144  try {
145  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_NAME_REMOVE]);
146  sqlite3_bind_int64(*st, 1, _set_id);
147 
148  st.step();
149  } catch (const SQLiteDatabase::SQLiteQueryException&) {
150  // error
151  }
152  }
153 
155  {
156  // create a new bundle-set
157  SQLiteBundleSet *set = new SQLiteBundleSet(Factory::create(_sqldb), false, NULL, _bf_size, _sqldb);
158 
159  // copy all entries
160  try {
161  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_COPY]);
162  sqlite3_bind_int64(*st, 1, set->_set_id); // destination
163  sqlite3_bind_int64(*st, 2, _set_id); // source
164 
165  st.step();
166  } catch (const SQLiteDatabase::SQLiteQueryException&) {
167  // error
168  }
169 
170  // rebuild the bloom-filter
171  set->rebuild_bloom_filter();
172 
174  }
175 
177  {
178  // clear all bundles first
179  clear();
180 
181  // cast the given set to a MemoryBundleSet
182  const SQLiteBundleSet *set = dynamic_cast<const SQLiteBundleSet*>(other.getPointer());
183 
184  // incompatible bundle-set implementation - abort here
185  if (set == NULL) return;
186 
187  // copy all entries
188  try {
189  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_COPY]);
190  sqlite3_bind_int64(*st, 1, _set_id); // destination
191  sqlite3_bind_int64(*st, 2, set->_set_id); // source
192 
193  st.step();
194  } catch (const SQLiteDatabase::SQLiteQueryException&) {
195  // error
196  }
197 
198  // rebuild the bloom-filter
199  rebuild_bloom_filter();
200  }
201 
202  void SQLiteBundleSet::add(const dtn::data::MetaBundle &bundle) throw ()
203  {
204  try {
205  // insert bundle id into database
206  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_ADD]);
207 
208  sqlite3_bind_int64(*st, 1, _set_id);
209  sqlite3_bind_text(*st, 2, bundle.source.getString().c_str(), static_cast<int>(bundle.source.getString().length()), SQLITE_TRANSIENT);
210  sqlite3_bind_int64(*st, 3, bundle.timestamp.get<uint64_t>());
211  sqlite3_bind_int64(*st, 4, bundle.sequencenumber.get<uint64_t>());
212 
213  if (bundle.isFragment()) {
214  sqlite3_bind_int64(*st, 5, bundle.fragmentoffset.get<uint64_t>());
215  sqlite3_bind_int64(*st, 6, bundle.getPayloadLength());
216  } else {
217  sqlite3_bind_int64(*st, 5, -1);
218  sqlite3_bind_int64(*st, 6, -1);
219  }
220 
221  sqlite3_bind_int64(*st, 7, bundle.expiretime.get<uint64_t>());
222 
223  st.step();
224 
225  // update expiretime, if necessary
226  new_expire_time(bundle.expiretime);
227 
228  // add bundle to the bloomfilter
229  bundle.addTo(_bf);
230  } catch (const SQLiteDatabase::SQLiteQueryException&) {
231  // error
232  }
233  }
234 
235  void SQLiteBundleSet::clear() throw ()
236  {
237  try {
238  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_CLEAR]);
239  sqlite3_bind_int64(*st, 1, _set_id);
240 
241  st.step();
242  } catch (const SQLiteDatabase::SQLiteQueryException&) {
243  // error
244  }
245 
246  // clear the bloom-filter
247  _bf.clear();
248  }
249 
250  bool SQLiteBundleSet::has(const dtn::data::BundleID &id) const throw ()
251  {
252  // check bloom-filter first
253  if (!id.isIn(_bf)) return false;
254 
255  // Return true if the bloom-filter is not consistent with
256  // the bundles set. This happen if the MemoryBundleSet gets deserialized.
257  if (!_consistent) return true;
258 
259  try {
260  SQLiteDatabase::Statement st( const_cast<sqlite3*>(_sqldb._database), SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET]);
261 
262  sqlite3_bind_int64(*st, 1, _set_id);
263  sqlite3_bind_text(*st, 2, id.source.getString().c_str(), static_cast<int>(id.source.getString().length()), SQLITE_TRANSIENT);
264  sqlite3_bind_int64(*st, 3, id.timestamp.get<uint64_t>());
265  sqlite3_bind_int64(*st, 4, id.sequencenumber.get<uint64_t>());
266 
267  if (id.isFragment()) {
268  sqlite3_bind_int64(*st, 5, id.fragmentoffset.get<uint64_t>());
269  sqlite3_bind_int64(*st, 6, id.getPayloadLength());
270  } else {
271  sqlite3_bind_int64(*st, 5, -1);
272  sqlite3_bind_int64(*st, 6, -1);
273  }
274 
275  if (st.step() == SQLITE_ROW)
276  return true;
277  } catch (const SQLiteDatabase::SQLiteQueryException&) {
278  // error
279  }
280 
281  return false;
282  }
283 
284  void SQLiteBundleSet::expire(const dtn::data::Timestamp timestamp) throw ()
285  {
286  // we can not expire bundles if we have no idea of time
287  if (timestamp == 0) return;
288 
289  // do not expire if its not the time
290  if (_next_expiration > timestamp) return;
291 
292  // look for expired bundles and announce them in the listener
293  if (_listener != NULL) {
294  try {
295  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET_EXPIRED]);
296  sqlite3_bind_int64(*st, 1, _set_id);
297 
298  // set expiration timestamp
299  sqlite3_bind_int64(*st, 2, timestamp.get<uint64_t>());
300 
301  while (st.step() == SQLITE_ROW)
302  {
304  get_bundleid(st, id);
305 
307 
308  // raise bundle expired event
309  _listener->eventBundleExpired(bundle);
310  }
311  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
312  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
313  }
314  }
315 
316  // delete expired bundles
317  try {
318  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_EXPIRE]);
319  sqlite3_bind_int64(*st, 1, _set_id);
320 
321  // set expiration timestamp
322  sqlite3_bind_int64(*st, 2, timestamp.get<uint64_t>());
323 
324  st.step();
325  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
326  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
327  }
328 
329  // rebuild the bloom filter
330  rebuild_bloom_filter();
331  }
332 
334  {
335  int rows = 0;
336 
337  try {
338  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_COUNT]);
339  sqlite3_bind_int64(*st, 1, _set_id);
340 
341  if (st.step() == SQLITE_ROW)
342  {
343  rows = sqlite3_column_int(*st, 0);
344  }
345  } catch (const SQLiteDatabase::SQLiteQueryException&) {
346  // error
347  }
348 
349  return rows;
350  }
351 
353  {
354  return dtn::data::Number(_bf.size()).getLength() + _bf.size();
355  }
356 
358  {
359  return _bf;
360  }
361 
362  std::set<dtn::data::MetaBundle> SQLiteBundleSet::getNotIn(const ibrcommon::BloomFilter &filter) const throw ()
363  {
364  std::set<dtn::data::MetaBundle> ret;
365 
366  try {
367  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET_ALL]);
368  sqlite3_bind_int64(*st, 1, _set_id);
369 
370  std::set<dtn::data::MetaBundle> ret;
372 
373  while (st.step() == SQLITE_ROW)
374  {
375  // get the bundle id
376  get_bundleid(st, id);
377 
378  if ( ! id.isIn(filter) )
379  {
381  ret.insert( bundle );
382  }
383  }
384  } catch (const SQLiteDatabase::SQLiteQueryException&) {
385  // error
386  }
387 
388  return ret;
389  }
390 
391  std::ostream& SQLiteBundleSet::serialize(std::ostream &stream) const
392  {
393  dtn::data::Number size(_bf.size());
394  stream << size;
395 
396  const char *data = reinterpret_cast<const char*>(_bf.table());
397  stream.write(data, _bf.size());
398 
399  return stream;
400  }
401 
402  std::istream& SQLiteBundleSet::deserialize(std::istream &stream)
403  {
404  dtn::data::Number count;
405  stream >> count;
406 
407  std::vector<char> buffer(count.get<size_t>());
408 
409  stream.read(&buffer[0], buffer.size());
410 
412  _bf.load((unsigned char*)&buffer[0], buffer.size());
413 
414  // set the set to in-consistent mode
415  _consistent = false;
416 
417  return stream;
418  }
419 
420  void SQLiteBundleSet::new_expire_time(const dtn::data::Timestamp &ttl) throw ()
421  {
422  if (_next_expiration == 0 || ttl < _next_expiration)
423  {
424  _next_expiration = ttl;
425  }
426  }
427 
428  void SQLiteBundleSet::rebuild_bloom_filter()
429  {
430  // rebuild the bloom-filter
431  _bf.clear();
432 
433  try {
434  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET_ALL]);
435  sqlite3_bind_int64(*st, 1, _set_id);
436 
437  std::set<dtn::data::MetaBundle> ret;
439 
440  while (st.step() == SQLITE_ROW)
441  {
442  // get the bundle id
443  get_bundleid(st, id);
444  id.addTo(_bf);
445  }
446 
447  _consistent = true;
448  } catch (const SQLiteDatabase::SQLiteQueryException&) {
449  // error
450  }
451  }
452 
453  void SQLiteBundleSet::get_bundleid(SQLiteDatabase::Statement &st, dtn::data::BundleID &id, int offset) const throw (SQLiteDatabase::SQLiteQueryException)
454  {
455  id.source = dtn::data::EID((const char*)sqlite3_column_text(*st, offset + 0));
456  id.timestamp = sqlite3_column_int64(*st, offset + 1);
457  id.sequencenumber = sqlite3_column_int64(*st, offset + 2);
458  dtn::data::Number fragmentoffset = 0;
459  id.setFragment(sqlite3_column_int64(*st, offset + 2) >= 0);
460 
461  if (id.isFragment()) {
462  id.fragmentoffset = sqlite3_column_int64(*st, offset + 3);
463  id.setPayloadLength(sqlite3_column_int64(*st, offset + 4));
464  } else {
465  id.fragmentoffset = 0;
466  id.setPayloadLength(0);
467  }
468  }
469  } /* namespace data */
470 } /* namespace dtn */