IBR-DTNSuite  0.8
daemon/src/api/Registration.cpp
Go to the documentation of this file.
00001 /*
00002  * Registration.cpp
00003  *
00004  *  Created on: 15.06.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "api/Registration.h"
00010 #include "storage/BundleStorage.h"
00011 #include "core/BundleCore.h"
00012 #include "core/BundleEvent.h"
00013 #include "core/BundlePurgeEvent.h"
00014 
00015 #ifdef HAVE_SQLITE
00016 #include "storage/SQLiteBundleStorage.h"
00017 #endif
00018 
00019 #include <ibrdtn/utils/Clock.h>
00020 #include <ibrdtn/utils/Random.h>
00021 #include <ibrcommon/Logger.h>
00022 
00023 #include <limits.h>
00024 #include <stdint.h>
00025 
00026 namespace dtn
00027 {
00028         namespace api
00029         {
00030                 std::set<std::string> Registration::_handles;
00031 
00032                 const std::string Registration::alloc_handle()
00033                 {
00034                         static dtn::utils::Random rand;
00035 
00036                         std::string new_handle = rand.gen_chars(16);
00037 
00038                         // if the local host is configured with an IPN address
00039                         if (dtn::core::BundleCore::local.isCompressable())
00040                         {
00041                                 // .. then use 32-bit numbers only
00042                                 uint32_t *int_handle = (uint32_t*)new_handle.c_str();
00043                                 std::stringstream ss;
00044                                 ss << *int_handle;
00045                                 new_handle = ss.str();
00046                         }
00047 
00048                         while (_handles.find(new_handle) != _handles.end())
00049                         {
00050                                 new_handle = rand.gen_chars(16);
00051 
00052                                 // if the local host is configured with an IPN address
00053                                 if (dtn::core::BundleCore::local.isCompressable())
00054                                 {
00055                                         // .. then use 32-bit numbers only
00056                                         uint32_t *int_handle = (uint32_t*)new_handle.c_str();
00057                                         std::stringstream ss;
00058                                         ss << *int_handle;
00059                                         new_handle = ss.str();
00060                                 }
00061                         }
00062 
00063                         Registration::_handles.insert(new_handle);
00064 
00065                         return new_handle;
00066                 }
00067 
00068                 void Registration::free_handle(const std::string &handle)
00069                 {
00070                         Registration::_handles.erase(handle);
00071                 }
00072 
00073                 Registration::Registration()
00074                  : _handle(alloc_handle()),
00075                    _default_eid(core::BundleCore::local + dtn::core::BundleCore::local.getDelimiter() + _handle),
00076                    _persistent(false), _detached(false)
00077                 {
00078                 }
00079 
00080                 Registration::~Registration()
00081                 {
00082                         free_handle(_handle);
00083                 }
00084 
00085                 void Registration::notify(const NOTIFY_CALL call)
00086                 {
00087                         ibrcommon::MutexLock l(_wait_for_cond);
00088                         if (call == NOTIFY_BUNDLE_AVAILABLE)
00089                         {
00090                                 _no_more_bundles = false;
00091                                 _wait_for_cond.signal(true);
00092                         }
00093                         else
00094                         {
00095                                 _notify_queue.push(call);
00096                         }
00097                 }
00098 
00099                 void Registration::wait_for_bundle(size_t timeout)
00100                 {
00101                         ibrcommon::MutexLock l(_wait_for_cond);
00102 
00103                         while (_no_more_bundles)
00104                         {
00105                                 if (timeout > 0)
00106                                 {
00107                                         _wait_for_cond.wait(timeout);
00108                                 }
00109                                 else
00110                                 {
00111                                         _wait_for_cond.wait();
00112                                 }
00113                         }
00114                 }
00115 
00116                 Registration::NOTIFY_CALL Registration::wait()
00117                 {
00118                         return _notify_queue.getnpop(true);
00119                 }
00120 
00121                 bool Registration::hasSubscribed(const dtn::data::EID &endpoint)
00122                 {
00123                         ibrcommon::MutexLock l(_endpoints_lock);
00124                         return (_endpoints.find(endpoint) != _endpoints.end());
00125                 }
00126 
00127                 const std::set<dtn::data::EID> Registration::getSubscriptions()
00128                 {
00129                         ibrcommon::MutexLock l(_endpoints_lock);
00130                         return _endpoints;
00131                 }
00132 
00133                 void Registration::delivered(const dtn::data::MetaBundle &m)
00134                 {
00135                         // raise bundle event
00136                         dtn::core::BundleEvent::raise(m, dtn::core::BUNDLE_DELIVERED);
00137 
00138                         if (m.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON))
00139                         {
00140                                 dtn::core::BundlePurgeEvent::raise(m);
00141                         }
00142                 }
00143 
00144                 dtn::data::Bundle Registration::receive() throw (dtn::storage::BundleStorage::NoBundleFoundException)
00145                 {
00146                         ibrcommon::MutexLock l(_receive_lock);
00147 
00148                         // get the global storage
00149                         dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00150 
00151                         while (true)
00152                         {
00153                                 try {
00154                                         // get the first bundle in the queue
00155                                         dtn::data::MetaBundle b = _queue.getnpop(false);
00156 
00157                                         // load the bundle
00158                                         return storage.get(b);
00159                                 } catch (const ibrcommon::QueueUnblockedException &e) {
00160                                         if (e.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT)
00161                                         {
00162                                                 // query for new bundles
00163                                                 underflow();
00164                                         }
00165                                 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { }
00166                         }
00167 
00168                         throw dtn::storage::BundleStorage::NoBundleFoundException();
00169                 }
00170 
00171                 dtn::data::MetaBundle Registration::receiveMetaBundle() throw (dtn::storage::BundleStorage::NoBundleFoundException)
00172                 {
00173                         ibrcommon::MutexLock l(_receive_lock);
00174                         while(true)
00175                         {
00176                                 try {
00177                                         // get the first bundle in the queue
00178                                         dtn::data::MetaBundle b = _queue.getnpop(false);
00179                                         return b;
00180                                 }
00181                                 catch(const ibrcommon::QueueUnblockedException & e){
00182                                         if(e.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT){
00183                                                 // query for new bundles
00184                                                 underflow();
00185                                         }
00186                                 }
00187                                 catch(const dtn::storage::BundleStorage::NoBundleFoundException & ){
00188                                 }
00189                         }
00190 
00191                         throw dtn::storage::BundleStorage::NoBundleFoundException();
00192                 }
00193 
00194                 void Registration::underflow()
00195                 {
00196                         // expire outdated bundles in the list
00197                         _received_bundles.expire(dtn::utils::Clock::getTime());
00198 
00202 #ifdef HAVE_SQLITE
00203                         class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback, public dtn::storage::SQLiteDatabase::SQLBundleQuery
00204 #else
00205                         class BundleFilter : public dtn::storage::BundleStorage::BundleFilterCallback
00206 #endif
00207                         {
00208                         public:
00209                                 BundleFilter(const std::set<dtn::data::EID> endpoints, const dtn::data::BundleList &blist, bool loopback)
00210                                  : _endpoints(endpoints), _blist(blist), _loopback(loopback)
00211                                 {};
00212 
00213                                 virtual ~BundleFilter() {};
00214 
00215                                 virtual size_t limit() const { return 10; };
00216 
00217                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00218                                 {
00219                                         if (_endpoints.find(meta.destination) == _endpoints.end())
00220                                         {
00221                                                 return false;
00222                                         }
00223 
00224                                         // filter own bundles
00225                                         if (!_loopback)
00226                                         {
00227                                                 if (_endpoints.find(meta.source) != _endpoints.end())
00228                                                 {
00229                                                         return false;
00230                                                 }
00231                                         }
00232 
00233                                         IBRCOMMON_LOGGER_DEBUG(10) << "search bundle in the list of delivered bundles: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00234 
00235                                         if (_blist.contains(meta))
00236                                         {
00237                                                 return false;
00238                                         }
00239 
00240                                         return true;
00241                                 };
00242 
00243 #ifdef HAVE_SQLITE
00244                                 const std::string getWhere() const
00245                                 {
00246                                         if (_endpoints.size() > 1)
00247                                         {
00248                                                 std::string where = "(";
00249 
00250                                                 for (size_t i = _endpoints.size() - 1; i > 0; i--)
00251                                                 {
00252                                                         where += "destination = ? OR ";
00253                                                 }
00254 
00255                                                 return where + "destination = ?)";
00256                                         }
00257                                         else if (_endpoints.size() == 1)
00258                                         {
00259                                                 return "destination = ?";
00260                                         }
00261                                         else
00262                                         {
00263                                                 return "destination = null";
00264                                         }
00265                                 };
00266 
00267                                 size_t bind(sqlite3_stmt *st, size_t offset) const
00268                                 {
00269                                         size_t o = offset;
00270 
00271                                         for (std::set<dtn::data::EID>::const_iterator iter = _endpoints.begin(); iter != _endpoints.end(); iter++)
00272                                         {
00273                                                 const std::string data = (*iter).getString();
00274 
00275                                                 sqlite3_bind_text(st, o, data.c_str(), data.size(), SQLITE_TRANSIENT);
00276                                                 o++;
00277                                         }
00278 
00279                                         return o;
00280                                 }
00281 #endif
00282 
00283                         private:
00284                                 const std::set<dtn::data::EID> _endpoints;
00285                                 const dtn::data::BundleList &_blist;
00286                                 const bool _loopback;
00287                         } filter(_endpoints, _received_bundles, false);
00288 
00289                         // get the global storage
00290                         dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00291 
00292                         // query the database for more bundles
00293                         ibrcommon::MutexLock l(_endpoints_lock);
00294                         const std::list<dtn::data::MetaBundle> list = storage.get( filter );
00295 
00296                         if (list.size() == 0)
00297                         {
00298                                 ibrcommon::MutexLock l(_wait_for_cond);
00299                                 _no_more_bundles = true;
00300                                 throw dtn::storage::BundleStorage::NoBundleFoundException();
00301                         }
00302 
00303                         try {
00304                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00305                                 {
00306                                         _queue.push(*iter);
00307 
00308                                         IBRCOMMON_LOGGER_DEBUG(10) << "add bundle to list of delivered bundles: " << (*iter).toString() << IBRCOMMON_LOGGER_ENDL;
00309                                         _received_bundles.add(*iter);
00310                                 }
00311                         } catch (const ibrcommon::Exception&) { }
00312                 }
00313 
00314                 void Registration::subscribe(const dtn::data::EID &endpoint)
00315                 {
00316                         {
00317                                 ibrcommon::MutexLock l(_endpoints_lock);
00318 
00319                                 // add endpoint to the local set
00320                                 _endpoints.insert(endpoint);
00321                         }
00322 
00323                         // trigger the search for new bundles
00324                         notify(NOTIFY_BUNDLE_AVAILABLE);
00325                 }
00326 
00327                 void Registration::unsubscribe(const dtn::data::EID &endpoint)
00328                 {
00329                         ibrcommon::MutexLock l(_endpoints_lock);
00330                         _endpoints.erase(endpoint);
00331                 }
00332 
00336                 bool Registration::operator==(const std::string &other) const
00337                 {
00338                         return (_handle == other);
00339                 }
00340 
00344                 bool Registration::operator==(const Registration &other) const
00345                 {
00346                         return (_handle == other._handle);
00347                 }
00348 
00352                 bool Registration::operator<(const Registration &other) const
00353                 {
00354                         return (_handle < other._handle);
00355                 }
00356 
00357                 void Registration::abort()
00358                 {
00359                         _queue.abort();
00360                         _notify_queue.abort();
00361 
00362                         ibrcommon::MutexLock l(_wait_for_cond);
00363                         _wait_for_cond.abort();
00364                 }
00365 
00366                 const dtn::data::EID& Registration::getDefaultEID() const
00367                 {
00368                         return _default_eid;
00369                 }
00370 
00371                 const std::string& Registration::getHandle() const
00372                 {
00373                         return _handle;
00374                 }
00375 
00376                 void Registration::setPersistent(ibrcommon::Timer::time_t lifetime)
00377                 {
00378                         _expiry = lifetime + ibrcommon::Timer::get_current_time();
00379                         _persistent = true;
00380                 }
00381 
00382                 void Registration::unsetPersistent()
00383                 {
00384                         _persistent = false;
00385                 }
00386 
00387                 bool Registration::isPersistent()
00388                 {
00389                         if(_expiry <= ibrcommon::Timer::get_current_time())
00390                         {
00391                                 _persistent = false;
00392                         }
00393 
00394                         return _persistent;
00395                 }
00396 
00397                 bool Registration::isPersistent() const
00398                 {
00399                         if(_expiry <= ibrcommon::Timer::get_current_time())
00400                         {
00401                                 return false;
00402                         }
00403 
00404                         return _persistent;
00405                 }
00406 
00407                 ibrcommon::Timer::time_t Registration::getExpireTime() const
00408                 {
00409                         if(!isPersistent()) throw NotPersistentException("Registration is not persistent.");
00410 
00411                         return _expiry;
00412 
00413                 }
00414 
00415                 void Registration::attach()
00416                 {
00417                         ibrcommon::MutexLock l(_attach_lock);
00418                         if(!_detached) throw AlreadyAttachedException("Registration is already attached to a client.");
00419 
00420                         _detached = false;
00421                 }
00422 
00423                 void Registration::detach()
00424                 {
00425                         ibrcommon::MutexLock l1(_wait_for_cond);
00426                         ibrcommon::MutexLock l2(_attach_lock);
00427 
00428                         _detached = true;
00429 
00430                         _queue.reset();
00431                         _notify_queue.reset();
00432 
00433                         _wait_for_cond.reset();
00434                 }
00435         }
00436 }