IBR-DTNSuite  0.8
daemon/src/api/ApiServer.cpp
Go to the documentation of this file.
00001 /*
00002  * ApiServer.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "Configuration.h"
00010 #include "api/ApiServer.h"
00011 #include "core/BundleCore.h"
00012 #include "routing/QueueBundleEvent.h"
00013 #include "net/BundleReceivedEvent.h"
00014 #include "core/NodeEvent.h"
00015 #include <ibrdtn/data/AgeBlock.h>
00016 #include <ibrcommon/Logger.h>
00017 #include <typeinfo>
00018 #include <algorithm>
00019 #include <unistd.h>
00020 
00021 #ifdef WITH_COMPRESSION
00022 #include <ibrdtn/data/CompressedPayloadBlock.h>
00023 #endif
00024 
00025 #ifdef WITH_BUNDLE_SECURITY
00026 #include "security/SecurityManager.h"
00027 #endif
00028 
00029 namespace dtn
00030 {
00031         namespace api
00032         {
00033                 ApiServer::ApiServer(const ibrcommon::File &socket)
00034                  : _srv(socket), _shutdown(false), _garbage_collector(*this)
00035                 {
00036                 }
00037 
00038                 ApiServer::ApiServer(const ibrcommon::vinterface &net, int port)
00039                  : _srv(), _shutdown(false), _garbage_collector(*this)
00040                 {
00041                         _srv.bind(net, port);
00042                 }
00043 
00044                 ApiServer::~ApiServer()
00045                 {
00046                         _garbage_collector.stop();
00047                         _garbage_collector.join();
00048                         join();
00049                 }
00050 
00051                 void ApiServer::__cancellation()
00052                 {
00053                         _srv.shutdown();
00054                 }
00055 
00056                 void ApiServer::componentUp()
00057                 {
00058                         _srv.listen(5);
00059                         bindEvent(dtn::routing::QueueBundleEvent::className);
00060                         bindEvent(dtn::core::NodeEvent::className);
00061                         startGarbageCollector();
00062                 }
00063 
00064                 void ApiServer::componentRun()
00065                 {
00066                         try {
00067                                 while (!_shutdown)
00068                                 {
00069                                         // accept the next client
00070                                         ibrcommon::tcpstream *conn = _srv.accept();
00071 
00072                                         if (_shutdown)
00073                                         {
00074                                                 conn->close();
00075                                                 delete conn;
00076                                                 return;
00077                                         }
00078 
00079                                         // generate some output
00080                                         IBRCOMMON_LOGGER_DEBUG(5) << "new connected client at the extended API server" << IBRCOMMON_LOGGER_ENDL;
00081 
00082                                         // send welcome banner
00083                                         (*conn) << "IBR-DTN " << dtn::daemon::Configuration::getInstance().version() << " API 1.0" << std::endl;
00084 
00085 
00086                                         ClientHandler *obj;
00087                                         {
00088                                                 ibrcommon::MutexLock l1(_registration_lock);
00089                                                 // create a new registration
00090                                                 Registration reg; _registrations.push_back(reg);
00091                                                 IBRCOMMON_LOGGER_DEBUG(5) << "new registration " << reg.getHandle() << IBRCOMMON_LOGGER_ENDL;
00092 
00093                                                 obj  = new ClientHandler(*this, _registrations.back(), conn);
00094                                         }
00095 
00096                                         ibrcommon::MutexLock l2(_connection_lock);
00097                                         _connections.push_back(obj);
00098 
00099                                         // start the client handler
00100                                         obj->start();
00101 
00102                                         // breakpoint
00103                                         ibrcommon::Thread::yield();
00104                                 }
00105                         } catch (const std::exception&) {
00106                                 // ignore all errors
00107                                 return;
00108                         }
00109                 }
00110 
00111                 void ApiServer::componentDown()
00112                 {
00113                         unbindEvent(dtn::routing::QueueBundleEvent::className);
00114                         unbindEvent(dtn::core::NodeEvent::className);
00115 
00116                         // close the listen API socket
00117                         _srv.close();
00118                         _shutdown = true;
00119 
00120                         _garbage_collector.pause();
00121 
00122                         {
00123                                 ibrcommon::MutexLock l(_connection_lock);
00124 
00125                                 // shutdown all clients
00126                                 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00127                                 {
00128                                         (*iter)->stop();
00129                                 }
00130                         }
00131 
00132                         // wait until all clients are down
00133                         while (_connections.size() > 0) ::sleep(1);
00134                 }
00135 
00136                 void ApiServer::processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle)
00137                 {
00138                         // check address fields for "api:me", this has to be replaced
00139                         static const dtn::data::EID clienteid("api:me");
00140 
00141                         // set the source address to the sending EID
00142                         bundle._source = source;
00143 
00144                         if (bundle._destination == clienteid) bundle._destination = source;
00145                         if (bundle._reportto == clienteid) bundle._reportto = source;
00146                         if (bundle._custodian == clienteid) bundle._custodian = source;
00147 
00148                         // if the timestamp is not set, add a ageblock
00149                         if (bundle._timestamp == 0)
00150                         {
00151                                 // check for ageblock
00152                                 try {
00153                                         bundle.getBlock<dtn::data::AgeBlock>();
00154                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) {
00155                                         // add a new ageblock
00156                                         bundle.push_front<dtn::data::AgeBlock>();
00157                                 }
00158                         }
00159 
00160 #ifdef WITH_COMPRESSION
00161                         // if the compression bit is set, then compress the bundle
00162                         if (bundle.get(dtn::data::PrimaryBlock::IBRDTN_REQUEST_COMPRESSION))
00163                         {
00164                                 try {
00165                                         dtn::data::CompressedPayloadBlock::compress(bundle, dtn::data::CompressedPayloadBlock::COMPRESSION_ZLIB);
00166                                 } catch (const ibrcommon::Exception &ex) {
00167                                         IBRCOMMON_LOGGER(warning) << "compression of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00168                                 };
00169                         }
00170 #endif
00171 
00172 #ifdef WITH_BUNDLE_SECURITY
00173                         // if the encrypt bit is set, then try to encrypt the bundle
00174                         if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT))
00175                         {
00176                                 try {
00177                                         dtn::security::SecurityManager::getInstance().encrypt(bundle);
00178 
00179                                         bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT, false);
00180                                 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00181                                         // sign requested, but no key is available
00182                                         IBRCOMMON_LOGGER(warning) << "No key available for encrypt process." << IBRCOMMON_LOGGER_ENDL;
00183                                 } catch (const dtn::security::SecurityManager::EncryptException&) {
00184                                         IBRCOMMON_LOGGER(warning) << "Encryption of bundle failed." << IBRCOMMON_LOGGER_ENDL;
00185                                 }
00186                         }
00187 
00188                         // if the sign bit is set, then try to sign the bundle
00189                         if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN))
00190                         {
00191                                 try {
00192                                         dtn::security::SecurityManager::getInstance().sign(bundle);
00193 
00194                                         bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN, false);
00195                                 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00196                                         // sign requested, but no key is available
00197                                         IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
00198                                 }
00199                         }
00200 #endif
00201 
00202                         // raise default bundle received event
00203                         dtn::net::BundleReceivedEvent::raise(source, bundle, true, true);
00204                 }
00205 
00206                 Registration& ApiServer::getRegistration(const std::string &handle)
00207                 {
00208                         ibrcommon::MutexLock l(_registration_lock);
00209                         for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end(); iter++)
00210                         {
00211                                 if (*iter == handle)
00212                                 {
00213                                         if (iter->isPersistent()){
00214                                                 iter->attach();
00215                                                 return *iter;
00216                                         }
00217                                         break;
00218                                 }
00219                         }
00220 
00221                         throw Registration::NotFoundException("Registration not found");
00222                 }
00223 
00224                 size_t ApiServer::timeout(ibrcommon::Timer*)
00225                 {
00226                         {
00227                                 ibrcommon::MutexLock l(_registration_lock);
00228 
00229                                 // remove non-persistent and detached registrations
00230                                 for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end();)
00231                                 {
00232                                         try
00233                                         {
00234                                                 iter->attach();
00235                                                 if(!iter->isPersistent()){
00236                                                         IBRCOMMON_LOGGER_DEBUG(5) << "release registration " << iter->getHandle() << IBRCOMMON_LOGGER_ENDL;
00237                                                         iter = _registrations.erase(iter);
00238                                                 }
00239                                                 else
00240                                                 {
00241                                                         iter->detach();
00242                                                         iter++;
00243                                                 }
00244                                         }
00245                                         catch(const Registration::AlreadyAttachedException &ex)
00246                                         {
00247                                                 iter++;
00248                                         }
00249                                 }
00250                         }
00251 
00252                         return nextRegistrationExpiry();
00253                 }
00254 
00255                 const std::string ApiServer::getName() const
00256                 {
00257                         return "ApiServer";
00258                 }
00259 
00260                 void ApiServer::connectionUp(ClientHandler *obj)
00261                 {
00262                         // generate some output
00263                         IBRCOMMON_LOGGER_DEBUG(5) << "api connection up" << IBRCOMMON_LOGGER_ENDL;
00264                 }
00265 
00266                 void ApiServer::connectionDown(ClientHandler *obj)
00267                 {
00268                         // generate some output
00269                         IBRCOMMON_LOGGER_DEBUG(5) << "api connection down" << IBRCOMMON_LOGGER_ENDL;
00270 
00271                         ibrcommon::MutexLock l(_connection_lock);
00272 
00273                         // remove this object out of the list
00274                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00275                         {
00276                                 if (obj == (*iter))
00277                                 {
00278                                         _connections.erase(iter);
00279                                         break;
00280                                 }
00281                         }
00282                 }
00283 
00284                 void ApiServer::freeRegistration(Registration &reg)
00285                 {
00286                         if(reg.isPersistent())
00287                         {
00288                                 reg.detach();
00289                                 startGarbageCollector();
00290                         }
00291                         else
00292                         {
00293                                 ibrcommon::MutexLock l(_registration_lock);
00294                                 // remove the registration
00295                                 for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end(); iter++)
00296                                 {
00297                                         if (reg == (*iter))
00298                                         {
00299                                                 IBRCOMMON_LOGGER_DEBUG(5) << "release registration " << reg.getHandle() << IBRCOMMON_LOGGER_ENDL;
00300                                                 _registrations.erase(iter);
00301                                                 break;
00302                                         }
00303                                 }
00304                         }
00305                 }
00306 
00307                 void ApiServer::raiseEvent(const dtn::core::Event *evt)
00308                 {
00309                         try {
00310                                 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00311 
00312                                 // ignore fragments - we can not deliver them directly to the client
00313                                 if (queued.bundle.fragment) return;
00314 
00315                                 ibrcommon::MutexLock l(_connection_lock);
00316                                 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00317                                 {
00318                                         ClientHandler &conn = **iter;
00319                                         if (conn.getRegistration().hasSubscribed(queued.bundle.destination))
00320                                         {
00321                                                 conn.getRegistration().notify(Registration::NOTIFY_BUNDLE_AVAILABLE);
00322                                         }
00323                                 }
00324                         } catch (const std::bad_cast&) { };
00325 
00326                         try {
00327                                 const dtn::core::NodeEvent &ne = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00328 
00329                                 ibrcommon::MutexLock l(_connection_lock);
00330                                 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00331                                 {
00332                                         ClientHandler &conn = **iter;
00333 
00334                                         if (ne.getAction() == NODE_AVAILABLE)
00335                                         {
00336                                                 conn.eventNodeAvailable(ne.getNode());
00337                                         }
00338                                         else if (ne.getAction() == NODE_UNAVAILABLE)
00339                                         {
00340                                                 conn.eventNodeUnavailable(ne.getNode());
00341                                         }
00342                                 }
00343                         } catch (const std::bad_cast&) { };
00344                 }
00345 
00346                 void ApiServer::startGarbageCollector()
00347                 {
00348                         try
00349                         {
00350                                 /* set the timeout for the GarbageCollector */
00351                                 size_t timeout = nextRegistrationExpiry();
00352                                 _garbage_collector.set(timeout);
00353 
00354                                 /* start it, if it is not running yet */
00355                                 if(!_garbage_collector.isRunning())
00356                                         _garbage_collector.start();
00357                         }
00358                         catch(const ibrcommon::Timer::StopTimerException &ex)
00359                         {
00360                         }
00361                 }
00362 
00363                 size_t ApiServer::nextRegistrationExpiry()
00364                 {
00365                         ibrcommon::MutexLock l(_registration_lock);
00366                         bool persistentFound = false;
00367                         size_t new_timeout = 0;
00368                         size_t current_time = ibrcommon::Timer::get_current_time();
00369 
00370                         // find the registration that expires next
00371                         for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end(); iter++)
00372                         {
00373                                 try
00374                                 {
00375                                         iter->attach();
00376                                         if(!iter->isPersistent()){
00377                                                 /* found an expired registration, trigger the timer */
00378                                                 iter->detach();
00379                                                 new_timeout = 0;
00380                                                 persistentFound = true;
00381                                                 break;
00382                                         }
00383                                         else
00384                                         {
00385                                                 size_t expire_time = iter->getExpireTime();
00386                                                 /* we dont have to check if the expire time is smaller then the current_time
00387                                                         since isPersistent() would have returned false */
00388                                                 size_t expire_timeout = expire_time - current_time;
00389                                                 iter->detach();
00390 
00391                                                 /* if persistentFound is false, no persistent registration was found yet */
00392                                                 if(!persistentFound)
00393                                                 {
00394                                                         persistentFound = true;
00395                                                         new_timeout = expire_timeout;
00396                                                 }
00397                                                 else if(expire_timeout < new_timeout)
00398                                                 {
00399                                                         new_timeout = expire_timeout;
00400                                                 }
00401                                         }
00402                                 }
00403                                 catch(const Registration::AlreadyAttachedException &ex)
00404                                 {
00405                                 }
00406                         }
00407 
00408                         if(!persistentFound) throw ibrcommon::Timer::StopTimerException();
00409 
00410                         return new_timeout;
00411                 }
00412         }
00413 }