IBR-DTNSuite
0.8
|
00001 /* 00002 * ApiServer.cpp 00003 * 00004 * Created on: 24.06.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "Configuration.h" 00010 #include "api/ApiServer.h" 00011 #include "core/BundleCore.h" 00012 #include "routing/QueueBundleEvent.h" 00013 #include "net/BundleReceivedEvent.h" 00014 #include "core/NodeEvent.h" 00015 #include <ibrdtn/data/AgeBlock.h> 00016 #include <ibrcommon/Logger.h> 00017 #include <typeinfo> 00018 #include <algorithm> 00019 #include <unistd.h> 00020 00021 #ifdef WITH_COMPRESSION 00022 #include <ibrdtn/data/CompressedPayloadBlock.h> 00023 #endif 00024 00025 #ifdef WITH_BUNDLE_SECURITY 00026 #include "security/SecurityManager.h" 00027 #endif 00028 00029 namespace dtn 00030 { 00031 namespace api 00032 { 00033 ApiServer::ApiServer(const ibrcommon::File &socket) 00034 : _srv(socket), _shutdown(false), _garbage_collector(*this) 00035 { 00036 } 00037 00038 ApiServer::ApiServer(const ibrcommon::vinterface &net, int port) 00039 : _srv(), _shutdown(false), _garbage_collector(*this) 00040 { 00041 _srv.bind(net, port); 00042 } 00043 00044 ApiServer::~ApiServer() 00045 { 00046 _garbage_collector.stop(); 00047 _garbage_collector.join(); 00048 join(); 00049 } 00050 00051 void ApiServer::__cancellation() 00052 { 00053 _srv.shutdown(); 00054 } 00055 00056 void ApiServer::componentUp() 00057 { 00058 _srv.listen(5); 00059 bindEvent(dtn::routing::QueueBundleEvent::className); 00060 bindEvent(dtn::core::NodeEvent::className); 00061 startGarbageCollector(); 00062 } 00063 00064 void ApiServer::componentRun() 00065 { 00066 try { 00067 while (!_shutdown) 00068 { 00069 // accept the next client 00070 ibrcommon::tcpstream *conn = _srv.accept(); 00071 00072 if (_shutdown) 00073 { 00074 conn->close(); 00075 delete conn; 00076 return; 00077 } 00078 00079 // generate some output 00080 IBRCOMMON_LOGGER_DEBUG(5) << "new connected client at the extended API server" << IBRCOMMON_LOGGER_ENDL; 00081 00082 // send welcome banner 00083 (*conn) << "IBR-DTN " << dtn::daemon::Configuration::getInstance().version() << " API 1.0" << std::endl; 00084 00085 00086 ClientHandler *obj; 00087 { 00088 ibrcommon::MutexLock l1(_registration_lock); 00089 // create a new registration 00090 Registration reg; _registrations.push_back(reg); 00091 IBRCOMMON_LOGGER_DEBUG(5) << "new registration " << reg.getHandle() << IBRCOMMON_LOGGER_ENDL; 00092 00093 obj = new ClientHandler(*this, _registrations.back(), conn); 00094 } 00095 00096 ibrcommon::MutexLock l2(_connection_lock); 00097 _connections.push_back(obj); 00098 00099 // start the client handler 00100 obj->start(); 00101 00102 // breakpoint 00103 ibrcommon::Thread::yield(); 00104 } 00105 } catch (const std::exception&) { 00106 // ignore all errors 00107 return; 00108 } 00109 } 00110 00111 void ApiServer::componentDown() 00112 { 00113 unbindEvent(dtn::routing::QueueBundleEvent::className); 00114 unbindEvent(dtn::core::NodeEvent::className); 00115 00116 // close the listen API socket 00117 _srv.close(); 00118 _shutdown = true; 00119 00120 _garbage_collector.pause(); 00121 00122 { 00123 ibrcommon::MutexLock l(_connection_lock); 00124 00125 // shutdown all clients 00126 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00127 { 00128 (*iter)->stop(); 00129 } 00130 } 00131 00132 // wait until all clients are down 00133 while (_connections.size() > 0) ::sleep(1); 00134 } 00135 00136 void ApiServer::processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle) 00137 { 00138 // check address fields for "api:me", this has to be replaced 00139 static const dtn::data::EID clienteid("api:me"); 00140 00141 // set the source address to the sending EID 00142 bundle._source = source; 00143 00144 if (bundle._destination == clienteid) bundle._destination = source; 00145 if (bundle._reportto == clienteid) bundle._reportto = source; 00146 if (bundle._custodian == clienteid) bundle._custodian = source; 00147 00148 // if the timestamp is not set, add a ageblock 00149 if (bundle._timestamp == 0) 00150 { 00151 // check for ageblock 00152 try { 00153 bundle.getBlock<dtn::data::AgeBlock>(); 00154 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { 00155 // add a new ageblock 00156 bundle.push_front<dtn::data::AgeBlock>(); 00157 } 00158 } 00159 00160 #ifdef WITH_COMPRESSION 00161 // if the compression bit is set, then compress the bundle 00162 if (bundle.get(dtn::data::PrimaryBlock::IBRDTN_REQUEST_COMPRESSION)) 00163 { 00164 try { 00165 dtn::data::CompressedPayloadBlock::compress(bundle, dtn::data::CompressedPayloadBlock::COMPRESSION_ZLIB); 00166 } catch (const ibrcommon::Exception &ex) { 00167 IBRCOMMON_LOGGER(warning) << "compression of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00168 }; 00169 } 00170 #endif 00171 00172 #ifdef WITH_BUNDLE_SECURITY 00173 // if the encrypt bit is set, then try to encrypt the bundle 00174 if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT)) 00175 { 00176 try { 00177 dtn::security::SecurityManager::getInstance().encrypt(bundle); 00178 00179 bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT, false); 00180 } catch (const dtn::security::SecurityManager::KeyMissingException&) { 00181 // sign requested, but no key is available 00182 IBRCOMMON_LOGGER(warning) << "No key available for encrypt process." << IBRCOMMON_LOGGER_ENDL; 00183 } catch (const dtn::security::SecurityManager::EncryptException&) { 00184 IBRCOMMON_LOGGER(warning) << "Encryption of bundle failed." << IBRCOMMON_LOGGER_ENDL; 00185 } 00186 } 00187 00188 // if the sign bit is set, then try to sign the bundle 00189 if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN)) 00190 { 00191 try { 00192 dtn::security::SecurityManager::getInstance().sign(bundle); 00193 00194 bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN, false); 00195 } catch (const dtn::security::SecurityManager::KeyMissingException&) { 00196 // sign requested, but no key is available 00197 IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL; 00198 } 00199 } 00200 #endif 00201 00202 // raise default bundle received event 00203 dtn::net::BundleReceivedEvent::raise(source, bundle, true, true); 00204 } 00205 00206 Registration& ApiServer::getRegistration(const std::string &handle) 00207 { 00208 ibrcommon::MutexLock l(_registration_lock); 00209 for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end(); iter++) 00210 { 00211 if (*iter == handle) 00212 { 00213 if (iter->isPersistent()){ 00214 iter->attach(); 00215 return *iter; 00216 } 00217 break; 00218 } 00219 } 00220 00221 throw Registration::NotFoundException("Registration not found"); 00222 } 00223 00224 size_t ApiServer::timeout(ibrcommon::Timer*) 00225 { 00226 { 00227 ibrcommon::MutexLock l(_registration_lock); 00228 00229 // remove non-persistent and detached registrations 00230 for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end();) 00231 { 00232 try 00233 { 00234 iter->attach(); 00235 if(!iter->isPersistent()){ 00236 IBRCOMMON_LOGGER_DEBUG(5) << "release registration " << iter->getHandle() << IBRCOMMON_LOGGER_ENDL; 00237 iter = _registrations.erase(iter); 00238 } 00239 else 00240 { 00241 iter->detach(); 00242 iter++; 00243 } 00244 } 00245 catch(const Registration::AlreadyAttachedException &ex) 00246 { 00247 iter++; 00248 } 00249 } 00250 } 00251 00252 return nextRegistrationExpiry(); 00253 } 00254 00255 const std::string ApiServer::getName() const 00256 { 00257 return "ApiServer"; 00258 } 00259 00260 void ApiServer::connectionUp(ClientHandler *obj) 00261 { 00262 // generate some output 00263 IBRCOMMON_LOGGER_DEBUG(5) << "api connection up" << IBRCOMMON_LOGGER_ENDL; 00264 } 00265 00266 void ApiServer::connectionDown(ClientHandler *obj) 00267 { 00268 // generate some output 00269 IBRCOMMON_LOGGER_DEBUG(5) << "api connection down" << IBRCOMMON_LOGGER_ENDL; 00270 00271 ibrcommon::MutexLock l(_connection_lock); 00272 00273 // remove this object out of the list 00274 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00275 { 00276 if (obj == (*iter)) 00277 { 00278 _connections.erase(iter); 00279 break; 00280 } 00281 } 00282 } 00283 00284 void ApiServer::freeRegistration(Registration ®) 00285 { 00286 if(reg.isPersistent()) 00287 { 00288 reg.detach(); 00289 startGarbageCollector(); 00290 } 00291 else 00292 { 00293 ibrcommon::MutexLock l(_registration_lock); 00294 // remove the registration 00295 for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end(); iter++) 00296 { 00297 if (reg == (*iter)) 00298 { 00299 IBRCOMMON_LOGGER_DEBUG(5) << "release registration " << reg.getHandle() << IBRCOMMON_LOGGER_ENDL; 00300 _registrations.erase(iter); 00301 break; 00302 } 00303 } 00304 } 00305 } 00306 00307 void ApiServer::raiseEvent(const dtn::core::Event *evt) 00308 { 00309 try { 00310 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt); 00311 00312 // ignore fragments - we can not deliver them directly to the client 00313 if (queued.bundle.fragment) return; 00314 00315 ibrcommon::MutexLock l(_connection_lock); 00316 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00317 { 00318 ClientHandler &conn = **iter; 00319 if (conn.getRegistration().hasSubscribed(queued.bundle.destination)) 00320 { 00321 conn.getRegistration().notify(Registration::NOTIFY_BUNDLE_AVAILABLE); 00322 } 00323 } 00324 } catch (const std::bad_cast&) { }; 00325 00326 try { 00327 const dtn::core::NodeEvent &ne = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00328 00329 ibrcommon::MutexLock l(_connection_lock); 00330 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00331 { 00332 ClientHandler &conn = **iter; 00333 00334 if (ne.getAction() == NODE_AVAILABLE) 00335 { 00336 conn.eventNodeAvailable(ne.getNode()); 00337 } 00338 else if (ne.getAction() == NODE_UNAVAILABLE) 00339 { 00340 conn.eventNodeUnavailable(ne.getNode()); 00341 } 00342 } 00343 } catch (const std::bad_cast&) { }; 00344 } 00345 00346 void ApiServer::startGarbageCollector() 00347 { 00348 try 00349 { 00350 /* set the timeout for the GarbageCollector */ 00351 size_t timeout = nextRegistrationExpiry(); 00352 _garbage_collector.set(timeout); 00353 00354 /* start it, if it is not running yet */ 00355 if(!_garbage_collector.isRunning()) 00356 _garbage_collector.start(); 00357 } 00358 catch(const ibrcommon::Timer::StopTimerException &ex) 00359 { 00360 } 00361 } 00362 00363 size_t ApiServer::nextRegistrationExpiry() 00364 { 00365 ibrcommon::MutexLock l(_registration_lock); 00366 bool persistentFound = false; 00367 size_t new_timeout = 0; 00368 size_t current_time = ibrcommon::Timer::get_current_time(); 00369 00370 // find the registration that expires next 00371 for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end(); iter++) 00372 { 00373 try 00374 { 00375 iter->attach(); 00376 if(!iter->isPersistent()){ 00377 /* found an expired registration, trigger the timer */ 00378 iter->detach(); 00379 new_timeout = 0; 00380 persistentFound = true; 00381 break; 00382 } 00383 else 00384 { 00385 size_t expire_time = iter->getExpireTime(); 00386 /* we dont have to check if the expire time is smaller then the current_time 00387 since isPersistent() would have returned false */ 00388 size_t expire_timeout = expire_time - current_time; 00389 iter->detach(); 00390 00391 /* if persistentFound is false, no persistent registration was found yet */ 00392 if(!persistentFound) 00393 { 00394 persistentFound = true; 00395 new_timeout = expire_timeout; 00396 } 00397 else if(expire_timeout < new_timeout) 00398 { 00399 new_timeout = expire_timeout; 00400 } 00401 } 00402 } 00403 catch(const Registration::AlreadyAttachedException &ex) 00404 { 00405 } 00406 } 00407 00408 if(!persistentFound) throw ibrcommon::Timer::StopTimerException(); 00409 00410 return new_timeout; 00411 } 00412 } 00413 }