IBR-DTNSuite
0.8
|
00001 /* 00002 * Registration.cpp 00003 * 00004 * Created on: 15.06.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "api/Registration.h" 00010 #include "storage/BundleStorage.h" 00011 #include "core/BundleCore.h" 00012 #include "core/BundleEvent.h" 00013 #include "core/BundlePurgeEvent.h" 00014 00015 #ifdef HAVE_SQLITE 00016 #include "storage/SQLiteBundleStorage.h" 00017 #endif 00018 00019 #include <ibrdtn/utils/Clock.h> 00020 #include <ibrdtn/utils/Random.h> 00021 #include <ibrcommon/Logger.h> 00022 00023 #include <limits.h> 00024 #include <stdint.h> 00025 00026 namespace dtn 00027 { 00028 namespace api 00029 { 00030 std::set<std::string> Registration::_handles; 00031 00032 const std::string Registration::alloc_handle() 00033 { 00034 static dtn::utils::Random rand; 00035 00036 std::string new_handle = rand.gen_chars(16); 00037 00038 // if the local host is configured with an IPN address 00039 if (dtn::core::BundleCore::local.isCompressable()) 00040 { 00041 // .. then use 32-bit numbers only 00042 uint32_t *int_handle = (uint32_t*)new_handle.c_str(); 00043 std::stringstream ss; 00044 ss << *int_handle; 00045 new_handle = ss.str(); 00046 } 00047 00048 while (_handles.find(new_handle) != _handles.end()) 00049 { 00050 new_handle = rand.gen_chars(16); 00051 00052 // if the local host is configured with an IPN address 00053 if (dtn::core::BundleCore::local.isCompressable()) 00054 { 00055 // .. then use 32-bit numbers only 00056 uint32_t *int_handle = (uint32_t*)new_handle.c_str(); 00057 std::stringstream ss; 00058 ss << *int_handle; 00059 new_handle = ss.str(); 00060 } 00061 } 00062 00063 Registration::_handles.insert(new_handle); 00064 00065 return new_handle; 00066 } 00067 00068 void Registration::free_handle(const std::string &handle) 00069 { 00070 Registration::_handles.erase(handle); 00071 } 00072 00073 Registration::Registration() 00074 : _handle(alloc_handle()), 00075 _default_eid(core::BundleCore::local + dtn::core::BundleCore::local.getDelimiter() + _handle), 00076 _persistent(false), _detached(false) 00077 { 00078 } 00079 00080 Registration::~Registration() 00081 { 00082 free_handle(_handle); 00083 } 00084 00085 void Registration::notify(const NOTIFY_CALL call) 00086 { 00087 ibrcommon::MutexLock l(_wait_for_cond); 00088 if (call == NOTIFY_BUNDLE_AVAILABLE) 00089 { 00090 _no_more_bundles = false; 00091 _wait_for_cond.signal(true); 00092 } 00093 else 00094 { 00095 _notify_queue.push(call); 00096 } 00097 } 00098 00099 void Registration::wait_for_bundle(size_t timeout) 00100 { 00101 ibrcommon::MutexLock l(_wait_for_cond); 00102 00103 while (_no_more_bundles) 00104 { 00105 if (timeout > 0) 00106 { 00107 _wait_for_cond.wait(timeout); 00108 } 00109 else 00110 { 00111 _wait_for_cond.wait(); 00112 } 00113 } 00114 } 00115 00116 Registration::NOTIFY_CALL Registration::wait() 00117 { 00118 return _notify_queue.getnpop(true); 00119 } 00120 00121 bool Registration::hasSubscribed(const dtn::data::EID &endpoint) 00122 { 00123 ibrcommon::MutexLock l(_endpoints_lock); 00124 return (_endpoints.find(endpoint) != _endpoints.end()); 00125 } 00126 00127 const std::set<dtn::data::EID> Registration::getSubscriptions() 00128 { 00129 ibrcommon::MutexLock l(_endpoints_lock); 00130 return _endpoints; 00131 } 00132 00133 void Registration::delivered(const dtn::data::MetaBundle &m) 00134 { 00135 // raise bundle event 00136 dtn::core::BundleEvent::raise(m, dtn::core::BUNDLE_DELIVERED); 00137 00138 if (m.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)) 00139 { 00140 dtn::core::BundlePurgeEvent::raise(m); 00141 } 00142 } 00143 00144 dtn::data::Bundle Registration::receive() throw (dtn::storage::BundleStorage::NoBundleFoundException) 00145 { 00146 ibrcommon::MutexLock l(_receive_lock); 00147 00148 // get the global storage 00149 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00150 00151 while (true) 00152 { 00153 try { 00154 // get the first bundle in the queue 00155 dtn::data::MetaBundle b = _queue.getnpop(false); 00156 00157 // load the bundle 00158 return storage.get(b); 00159 } catch (const ibrcommon::QueueUnblockedException &e) { 00160 if (e.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT) 00161 { 00162 // query for new bundles 00163 underflow(); 00164 } 00165 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { } 00166 } 00167 00168 throw dtn::storage::BundleStorage::NoBundleFoundException(); 00169 } 00170 00171 dtn::data::MetaBundle Registration::receiveMetaBundle() throw (dtn::storage::BundleStorage::NoBundleFoundException) 00172 { 00173 ibrcommon::MutexLock l(_receive_lock); 00174 while(true) 00175 { 00176 try { 00177 // get the first bundle in the queue 00178 dtn::data::MetaBundle b = _queue.getnpop(false); 00179 return b; 00180 } 00181 catch(const ibrcommon::QueueUnblockedException & e){ 00182 if(e.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT){ 00183 // query for new bundles 00184 underflow(); 00185 } 00186 } 00187 catch(const dtn::storage::BundleStorage::NoBundleFoundException & ){ 00188 } 00189 } 00190 00191 throw dtn::storage::BundleStorage::NoBundleFoundException(); 00192 } 00193 00194 void Registration::underflow() 00195 { 00196 // expire outdated bundles in the list 00197 _received_bundles.expire(dtn::utils::Clock::getTime()); 00198 00202 #ifdef HAVE_SQLITE 00203 class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback, public dtn::storage::SQLiteDatabase::SQLBundleQuery 00204 #else 00205 class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback 00206 #endif 00207 { 00208 public: 00209 BundleFilter(const std::set<dtn::data::EID> endpoints, const dtn::data::BundleList &blist, bool loopback) 00210 : _endpoints(endpoints), _blist(blist), _loopback(loopback) 00211 {}; 00212 00213 virtual ~BundleFilter() {}; 00214 00215 virtual size_t limit() const { return 10; }; 00216 00217 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00218 { 00219 if (_endpoints.find(meta.destination) == _endpoints.end()) 00220 { 00221 return false; 00222 } 00223 00224 // filter own bundles 00225 if (!_loopback) 00226 { 00227 if (_endpoints.find(meta.source) != _endpoints.end()) 00228 { 00229 return false; 00230 } 00231 } 00232 00233 IBRCOMMON_LOGGER_DEBUG(10) << "search bundle in the list of delivered bundles: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00234 00235 if (_blist.contains(meta)) 00236 { 00237 return false; 00238 } 00239 00240 return true; 00241 }; 00242 00243 #ifdef HAVE_SQLITE 00244 const std::string getWhere() const 00245 { 00246 if (_endpoints.size() > 1) 00247 { 00248 std::string where = "("; 00249 00250 for (size_t i = _endpoints.size() - 1; i > 0; i--) 00251 { 00252 where += "destination = ? OR "; 00253 } 00254 00255 return where + "destination = ?)"; 00256 } 00257 else if (_endpoints.size() == 1) 00258 { 00259 return "destination = ?"; 00260 } 00261 else 00262 { 00263 return "destination = null"; 00264 } 00265 }; 00266 00267 size_t bind(sqlite3_stmt *st, size_t offset) const 00268 { 00269 size_t o = offset; 00270 00271 for (std::set<dtn::data::EID>::const_iterator iter = _endpoints.begin(); iter != _endpoints.end(); iter++) 00272 { 00273 const std::string data = (*iter).getString(); 00274 00275 sqlite3_bind_text(st, o, data.c_str(), data.size(), SQLITE_TRANSIENT); 00276 o++; 00277 } 00278 00279 return o; 00280 } 00281 #endif 00282 00283 private: 00284 const std::set<dtn::data::EID> _endpoints; 00285 const dtn::data::BundleList &_blist; 00286 const bool _loopback; 00287 } filter(_endpoints, _received_bundles, false); 00288 00289 // get the global storage 00290 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00291 00292 // query the database for more bundles 00293 ibrcommon::MutexLock l(_endpoints_lock); 00294 const std::list<dtn::data::MetaBundle> list = storage.get( filter ); 00295 00296 if (list.size() == 0) 00297 { 00298 ibrcommon::MutexLock l(_wait_for_cond); 00299 _no_more_bundles = true; 00300 throw dtn::storage::BundleStorage::NoBundleFoundException(); 00301 } 00302 00303 try { 00304 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00305 { 00306 _queue.push(*iter); 00307 00308 IBRCOMMON_LOGGER_DEBUG(10) << "add bundle to list of delivered bundles: " << (*iter).toString() << IBRCOMMON_LOGGER_ENDL; 00309 _received_bundles.add(*iter); 00310 } 00311 } catch (const ibrcommon::Exception&) { } 00312 } 00313 00314 void Registration::subscribe(const dtn::data::EID &endpoint) 00315 { 00316 { 00317 ibrcommon::MutexLock l(_endpoints_lock); 00318 00319 // add endpoint to the local set 00320 _endpoints.insert(endpoint); 00321 } 00322 00323 // trigger the search for new bundles 00324 notify(NOTIFY_BUNDLE_AVAILABLE); 00325 } 00326 00327 void Registration::unsubscribe(const dtn::data::EID &endpoint) 00328 { 00329 ibrcommon::MutexLock l(_endpoints_lock); 00330 _endpoints.erase(endpoint); 00331 } 00332 00336 bool Registration::operator==(const std::string &other) const 00337 { 00338 return (_handle == other); 00339 } 00340 00344 bool Registration::operator==(const Registration &other) const 00345 { 00346 return (_handle == other._handle); 00347 } 00348 00352 bool Registration::operator<(const Registration &other) const 00353 { 00354 return (_handle < other._handle); 00355 } 00356 00357 void Registration::abort() 00358 { 00359 _queue.abort(); 00360 _notify_queue.abort(); 00361 00362 ibrcommon::MutexLock l(_wait_for_cond); 00363 _wait_for_cond.abort(); 00364 } 00365 00366 const dtn::data::EID& Registration::getDefaultEID() const 00367 { 00368 return _default_eid; 00369 } 00370 00371 const std::string& Registration::getHandle() const 00372 { 00373 return _handle; 00374 } 00375 00376 void Registration::setPersistent(ibrcommon::Timer::time_t lifetime) 00377 { 00378 _expiry = lifetime + ibrcommon::Timer::get_current_time(); 00379 _persistent = true; 00380 } 00381 00382 void Registration::unsetPersistent() 00383 { 00384 _persistent = false; 00385 } 00386 00387 bool Registration::isPersistent() 00388 { 00389 if(_expiry <= ibrcommon::Timer::get_current_time()) 00390 { 00391 _persistent = false; 00392 } 00393 00394 return _persistent; 00395 } 00396 00397 bool Registration::isPersistent() const 00398 { 00399 if(_expiry <= ibrcommon::Timer::get_current_time()) 00400 { 00401 return false; 00402 } 00403 00404 return _persistent; 00405 } 00406 00407 ibrcommon::Timer::time_t Registration::getExpireTime() const 00408 { 00409 if(!isPersistent()) throw NotPersistentException("Registration is not persistent."); 00410 00411 return _expiry; 00412 00413 } 00414 00415 void Registration::attach() 00416 { 00417 ibrcommon::MutexLock l(_attach_lock); 00418 if(!_detached) throw AlreadyAttachedException("Registration is already attached to a client."); 00419 00420 _detached = false; 00421 } 00422 00423 void Registration::detach() 00424 { 00425 ibrcommon::MutexLock l1(_wait_for_cond); 00426 ibrcommon::MutexLock l2(_attach_lock); 00427 00428 _detached = true; 00429 00430 _queue.reset(); 00431 _notify_queue.reset(); 00432 00433 _wait_for_cond.reset(); 00434 } 00435 } 00436 }