IBR-DTNSuite
0.8
|
00001 /* 00002 * ExtendedApiHandler.cpp 00003 * 00004 * Created on: 10.10.2011 00005 * Author: morgenro,roettger 00006 */ 00007 00008 #include "config.h" 00009 #include "Configuration.h" 00010 #include "api/ExtendedApiHandler.h" 00011 #include "net/BundleReceivedEvent.h" 00012 #include "core/BundleEvent.h" 00013 #include <ibrcommon/Logger.h> 00014 #include <ibrdtn/api/PlainSerializer.h> 00015 #include <ibrdtn/data/AgeBlock.h> 00016 #include <ibrdtn/utils/Utils.h> 00017 #include <ibrcommon/data/Base64Reader.h> 00018 #include "core/BundleCore.h" 00019 #include <ibrdtn/utils/Random.h> 00020 00021 #ifdef WITH_COMPRESSION 00022 #include <ibrdtn/data/CompressedPayloadBlock.h> 00023 #endif 00024 00025 #ifdef WITH_BUNDLE_SECURITY 00026 #include "security/SecurityManager.h" 00027 #endif 00028 00029 namespace dtn 00030 { 00031 namespace api 00032 { 00033 ExtendedApiHandler::ExtendedApiHandler(ClientHandler &client, ibrcommon::tcpstream &stream) 00034 : ProtocolHandler(client, stream), _sender(new Sender(*this)), 00035 _endpoint(_client.getRegistration().getDefaultEID()) 00036 { 00037 _client.getRegistration().subscribe(_endpoint); 00038 } 00039 00040 ExtendedApiHandler::~ExtendedApiHandler() 00041 { 00042 _client.getRegistration().abort(); 00043 _sender->join(); 00044 } 00045 00046 bool ExtendedApiHandler::good() const{ 00047 return _stream.good(); 00048 } 00049 00050 void ExtendedApiHandler::__cancellation() 00051 { 00052 // close the stream 00053 try { 00054 _stream.close(); 00055 } catch (const ibrcommon::ConnectionClosedException&) { }; 00056 } 00057 00058 void ExtendedApiHandler::finally() 00059 { 00060 IBRCOMMON_LOGGER_DEBUG(60) << "ExtendedApiConnection down" << IBRCOMMON_LOGGER_ENDL; 00061 00062 _client.getRegistration().abort(); 00063 00064 // close the stream 00065 try { 00066 _stream.close(); 00067 } catch (const ibrcommon::ConnectionClosedException&) { }; 00068 00069 try { 00070 // shutdown the sender thread 00071 _sender->stop(); 00072 } catch (const std::exception&) { }; 00073 } 00074 00075 void ExtendedApiHandler::run() 00076 { 00077 _sender->start(); 00078 00079 std::string buffer; 00080 _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO EXTENDED" << std::endl; 00081 00082 while (_stream.good()) 00083 { 00084 getline(_stream, buffer); 00085 00086 std::string::reverse_iterator iter = buffer.rbegin(); 00087 if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1); 00088 00089 std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer); 00090 if (cmd.size() == 0) continue; 00091 00092 try { 00093 if (cmd[0] == "set") 00094 { 00095 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters"); 00096 00097 if (cmd[1] == "endpoint") 00098 { 00099 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00100 00101 ibrcommon::MutexLock l(_write_lock); 00102 dtn::data::EID new_endpoint = dtn::core::BundleCore::local + "/" + cmd[2]; 00103 00104 // error checking 00105 if (new_endpoint == dtn::data::EID()) 00106 { 00107 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl; 00108 } 00109 else 00110 { 00111 /* unsubscribe from the old endpoint and subscribe to the new one */ 00112 Registration& reg = _client.getRegistration(); 00113 reg.unsubscribe(_endpoint); 00114 reg.subscribe(new_endpoint); 00115 _endpoint = new_endpoint; 00116 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl; 00117 } 00118 } 00119 else 00120 { 00121 ibrcommon::MutexLock l(_write_lock); 00122 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl; 00123 } 00124 } 00125 else if (cmd[0] == "endpoint") 00126 { 00127 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters"); 00128 00129 if (cmd[1] == "add") 00130 { 00131 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00132 00133 ibrcommon::MutexLock l(_write_lock); 00134 dtn::data::EID new_endpoint = dtn::core::BundleCore::local + "/" + cmd[2]; 00135 00136 // error checking 00137 if (new_endpoint == dtn::data::EID()) 00138 { 00139 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl; 00140 } 00141 else 00142 { 00143 _client.getRegistration().subscribe(new_endpoint); 00144 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl; 00145 } 00146 } 00147 else if (cmd[1] == "del") 00148 { 00149 dtn::data::EID del_endpoint = _endpoint; 00150 if (cmd.size() >= 3) 00151 { 00152 del_endpoint = dtn::core::BundleCore::local + "/" + cmd[2]; 00153 } 00154 00155 ibrcommon::MutexLock l(_write_lock); 00156 00157 // error checking 00158 if (del_endpoint == dtn::data::EID()) 00159 { 00160 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl; 00161 } 00162 else 00163 { 00164 _client.getRegistration().unsubscribe(del_endpoint); 00165 00166 if(_endpoint == del_endpoint) 00167 { 00168 _endpoint = _client.getRegistration().getDefaultEID(); 00169 _client.getRegistration().subscribe(_endpoint); 00170 } 00171 00172 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl; 00173 } 00174 } 00175 else if (cmd[1] == "get") 00176 { 00177 _stream << ClientHandler::API_STATUS_OK << " ENDPOINT GET " << _endpoint.getString() << std::endl; 00178 } 00179 else 00180 { 00181 ibrcommon::MutexLock l(_write_lock); 00182 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl; 00183 } 00184 } 00185 else if (cmd[0] == "registration") 00186 { 00187 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters"); 00188 00189 if (cmd[1] == "add") 00190 { 00191 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00192 00193 ibrcommon::MutexLock l(_write_lock); 00194 dtn::data::EID endpoint(cmd[2]); 00195 00196 // error checking 00197 if (endpoint == dtn::data::EID()) 00198 { 00199 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl; 00200 } 00201 else 00202 { 00203 _client.getRegistration().subscribe(endpoint); 00204 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl; 00205 } 00206 } 00207 else if (cmd[1] == "del") 00208 { 00209 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00210 00211 ibrcommon::MutexLock l(_write_lock); 00212 dtn::data::EID endpoint(cmd[2]); 00213 00214 // error checking 00215 if (endpoint == dtn::data::EID()) 00216 { 00217 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl; 00218 } 00219 else 00220 { 00221 _client.getRegistration().unsubscribe(endpoint); 00222 if(_endpoint == endpoint) 00223 { 00224 _endpoint = _client.getRegistration().getDefaultEID(); 00225 _client.getRegistration().subscribe(_endpoint); 00226 } 00227 00228 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl; 00229 } 00230 } 00231 else if (cmd[1] == "list") 00232 { 00233 ibrcommon::MutexLock l(_write_lock); 00234 const std::set<dtn::data::EID> list = _client.getRegistration().getSubscriptions(); 00235 00236 _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LIST" << std::endl; 00237 for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00238 { 00239 _stream << (*iter).getString() << std::endl; 00240 } 00241 _stream << std::endl; 00242 } 00243 else if (cmd[1] == "save") 00244 { 00245 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00246 00247 size_t lifetime = 0; 00248 std::stringstream ss(cmd[2]); 00249 00250 ss >> lifetime; 00251 if(ss.fail()) throw ibrcommon::Exception("malformed command"); 00252 00253 /* make the registration persistent for a given lifetime */ 00254 _client.getRegistration().setPersistent(lifetime); 00255 00256 ibrcommon::MutexLock l(_write_lock); 00257 _stream << ClientHandler::API_STATUS_OK << " REGISTRATION SAVE " << _client.getRegistration().getHandle() << std::endl; 00258 } 00259 else if (cmd[1] == "load") 00260 { 00261 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00262 00263 const std::string handle = cmd[2]; 00264 00265 try 00266 { 00267 Registration& reg = _client.getAPIServer().getRegistration(handle); 00268 00269 /* stop the sender */ 00270 _client.getRegistration().abort(); 00271 _sender->join(); 00272 00273 /* switch the registration */ 00274 _client.switchRegistration(reg); 00275 00276 /* and switch the sender */ 00277 Sender *old_sender = _sender; 00278 try{ 00279 _sender = new Sender(*this); 00280 } 00281 catch (const std::bad_alloc &ex) 00282 { 00283 _sender = old_sender; 00284 throw ex; 00285 } 00286 delete old_sender; 00287 _sender->start(); 00288 00289 ibrcommon::MutexLock l(_write_lock); 00290 _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LOAD" << std::endl; 00291 } 00292 catch (const Registration::AlreadyAttachedException& ex) 00293 { 00294 ibrcommon::MutexLock l(_write_lock); 00295 _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION BUSY" << std::endl; 00296 } 00297 catch (const Registration::NotFoundException& ex) 00298 { 00299 ibrcommon::MutexLock l(_write_lock); 00300 _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION NOT FOUND" << std::endl; 00301 } 00302 } 00303 else 00304 { 00305 ibrcommon::MutexLock l(_write_lock); 00306 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl; 00307 } 00308 } 00309 else if (cmd[0] == "neighbor") 00310 { 00311 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters"); 00312 00313 if (cmd[1] == "list") 00314 { 00315 ibrcommon::MutexLock l(_write_lock); 00316 const std::set<dtn::core::Node> nlist = dtn::core::BundleCore::getInstance().getNeighbors(); 00317 00318 _stream << ClientHandler::API_STATUS_OK << " NEIGHBOR LIST" << std::endl; 00319 for (std::set<dtn::core::Node>::const_iterator iter = nlist.begin(); iter != nlist.end(); iter++) 00320 { 00321 _stream << (*iter).getEID().getString() << std::endl; 00322 } 00323 _stream << std::endl; 00324 } 00325 else 00326 { 00327 ibrcommon::MutexLock l(_write_lock); 00328 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl; 00329 } 00330 } 00331 else if (cmd[0] == "bundle") 00332 { 00333 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters"); 00334 00335 if (cmd[1] == "get") 00336 { 00337 // transfer bundle data 00338 ibrcommon::MutexLock l(_write_lock); 00339 00340 if (cmd.size() == 2) 00341 { 00342 _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET "; sayBundleID(_stream, _bundle_reg); _stream << std::endl; 00343 PlainSerializer(_stream) << _bundle_reg; 00344 } 00345 else if (cmd[2] == "binary") 00346 { 00347 _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET BINARY "; sayBundleID(_stream, _bundle_reg); _stream << std::endl; 00348 dtn::data::DefaultSerializer(_stream) << _bundle_reg; _stream << std::flush; 00349 } 00350 else if (cmd[2] == "plain") 00351 { 00352 _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET PLAIN "; sayBundleID(_stream, _bundle_reg); _stream << std::endl; 00353 PlainSerializer(_stream) << _bundle_reg; 00354 } 00355 else if (cmd[2] == "xml") 00356 { 00357 _stream << ClientHandler::API_STATUS_NOT_IMPLEMENTED << " FORMAT NOT IMPLEMENTED" << std::endl; 00358 } 00359 else 00360 { 00361 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN FORMAT" << std::endl; 00362 } 00363 } 00364 else if (cmd[1] == "put") 00365 { 00366 // lock the stream during reception of bundle data 00367 ibrcommon::MutexLock l(_write_lock); 00368 00369 if (cmd.size() < 3) 00370 { 00371 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl; 00372 } 00373 else if (cmd[2] == "plain") 00374 { 00375 _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE PLAIN" << std::endl; 00376 00377 try { 00378 PlainDeserializer(_stream) >> _bundle_reg; 00379 _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl; 00380 } catch (const std::exception &ex) { 00381 IBRCOMMON_LOGGER_DEBUG(20) << "API put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00382 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl; 00383 00384 } 00385 } 00386 else if (cmd[2] == "binary") 00387 { 00388 _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE BINARY" << std::endl; 00389 00390 try { 00391 dtn::data::DefaultDeserializer(_stream) >> _bundle_reg; 00392 _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl; 00393 } catch (const std::exception &ex) { 00394 IBRCOMMON_LOGGER_DEBUG(20) << "API put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00395 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl; 00396 } 00397 } 00398 else 00399 { 00400 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl; 00401 } 00402 } 00403 else if (cmd[1] == "load") 00404 { 00405 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00406 00407 dtn::data::BundleID id; 00408 00409 if (cmd[2] == "queue") 00410 { 00411 id = _bundle_queue.getnpop(); 00412 } 00413 else 00414 { 00415 // construct bundle id 00416 id = readBundleID(cmd, 2); 00417 } 00418 00419 // load the bundle 00420 try { 00421 _bundle_reg = dtn::core::BundleCore::getInstance().getStorage().get(id); 00422 00423 // process the bundle block (security, compression, ...) 00424 dtn::core::BundleCore::processBlocks(_bundle_reg); 00425 00426 ibrcommon::MutexLock l(_write_lock); 00427 _stream << ClientHandler::API_STATUS_OK << " BUNDLE LOADED "; sayBundleID(_stream, id); _stream << std::endl; 00428 } catch (const ibrcommon::Exception&) { 00429 ibrcommon::MutexLock l(_write_lock); 00430 _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl; 00431 } 00432 } 00433 else if (cmd[1] == "clear") 00434 { 00435 _bundle_reg = dtn::data::Bundle(); 00436 00437 ibrcommon::MutexLock l(_write_lock); 00438 _stream << ClientHandler::API_STATUS_OK << " BUNDLE CLEARED" << std::endl; 00439 } 00440 else if (cmd[1] == "free") 00441 { 00442 try { 00443 dtn::core::BundleCore::getInstance().getStorage().remove(_bundle_reg); 00444 _bundle_reg = dtn::data::Bundle(); 00445 ibrcommon::MutexLock l(_write_lock); 00446 _stream << ClientHandler::API_STATUS_OK << " BUNDLE FREE SUCCESSFUL" << std::endl; 00447 } catch (const ibrcommon::Exception&) { 00448 ibrcommon::MutexLock l(_write_lock); 00449 _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl; 00450 } 00451 } 00452 else if (cmd[1] == "delivered") 00453 { 00454 if (cmd.size() < 5) throw ibrcommon::Exception("not enough parameters"); 00455 00456 try { 00457 // construct bundle id 00458 dtn::data::BundleID id = readBundleID(cmd, 2); 00459 00460 // announce this bundle as delivered 00461 dtn::data::MetaBundle meta = dtn::core::BundleCore::getInstance().getStorage().get(id); 00462 _client.getRegistration().delivered(meta); 00463 00464 ibrcommon::MutexLock l(_write_lock); 00465 _stream << ClientHandler::API_STATUS_OK << " BUNDLE DELIVERED ACCEPTED" << std::endl; 00466 } catch (const ibrcommon::Exception&) { 00467 ibrcommon::MutexLock l(_write_lock); 00468 _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl; 00469 } 00470 } 00471 else if (cmd[1] == "store") 00472 { 00473 // store the bundle in the storage 00474 try { 00475 dtn::core::BundleCore::getInstance().getStorage().store(_bundle_reg); 00476 ibrcommon::MutexLock l(_write_lock); 00477 _stream << ClientHandler::API_STATUS_OK << " BUNDLE STORE SUCCESSFUL" << std::endl; 00478 } catch (const ibrcommon::Exception&) { 00479 ibrcommon::MutexLock l(_write_lock); 00480 _stream << ClientHandler::API_STATUS_INTERNAL_ERROR << " BUNDLE STORE FAILED" << std::endl; 00481 } 00482 } 00483 else if (cmd[1] == "send") 00484 { 00485 // create a new sequence number 00486 _bundle_reg.relabel(); 00487 00488 // forward the bundle to the storage processing 00489 _client.getAPIServer().processIncomingBundle(_endpoint, _bundle_reg); 00490 00491 ibrcommon::MutexLock l(_write_lock); 00492 _stream << ClientHandler::API_STATUS_OK << " BUNDLE SENT" << std::endl; 00493 } 00494 else if (cmd[1] == "info") 00495 { 00496 // transfer bundle data 00497 ibrcommon::MutexLock l(_write_lock); 00498 00499 _stream << ClientHandler::API_STATUS_OK << " BUNDLE INFO "; sayBundleID(_stream, _bundle_reg); _stream << std::endl; 00500 PlainSerializer(_stream, true) << _bundle_reg; 00501 } 00502 else if (cmd[1] == "block") 00503 { 00504 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00505 00506 if (cmd[2] == "add") 00507 { 00508 PlainDeserializer::BlockInserter inserter(_bundle_reg, PlainDeserializer::BlockInserter::END); 00509 00510 /* parse an optional offset, where to insert the block */ 00511 if (cmd.size() > 3) 00512 { 00513 int offset; 00514 istringstream ss(cmd[3]); 00515 00516 ss >> offset; 00517 if (ss.fail()) throw ibrcommon::Exception("malformed command"); 00518 00519 if (offset >= _bundle_reg.blockCount()) 00520 { 00521 inserter = PlainDeserializer::BlockInserter(_bundle_reg, PlainDeserializer::BlockInserter::END); 00522 } 00523 else if(offset <= 0) 00524 { 00525 inserter = PlainDeserializer::BlockInserter(_bundle_reg, PlainDeserializer::BlockInserter::FRONT); 00526 } 00527 else 00528 { 00529 inserter = PlainDeserializer::BlockInserter(_bundle_reg, PlainDeserializer::BlockInserter::MIDDLE, offset); 00530 } 00531 } 00532 00533 ibrcommon::MutexLock l(_write_lock); 00534 _stream << ClientHandler::API_STATUS_CONTINUE << " BUNDLE BLOCK ADD" << std::endl; 00535 00536 try 00537 { 00538 dtn::data::Block &block = PlainDeserializer(_stream).readBlock(inserter, _bundle_reg.get(dtn::data::Bundle::APPDATA_IS_ADMRECORD)); 00539 if (inserter.getAlignment() == PlainDeserializer::BlockInserter::END) 00540 { 00541 block.set(dtn::data::Block::LAST_BLOCK, true); 00542 } 00543 else 00544 { 00545 block.set(dtn::data::Block::LAST_BLOCK, false); 00546 } 00547 _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK ADD SUCCESSFUL" << std::endl; 00548 } 00549 catch (const PlainDeserializer::PlainDeserializerException &ex){ 00550 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " BUNDLE BLOCK ADD FAILED" << std::endl; 00551 } 00552 } 00553 else if (cmd[2] == "del") 00554 { 00555 if (cmd.size() < 4) throw ibrcommon::Exception("not enough parameters"); 00556 00557 int offset; 00558 istringstream ss(cmd[3]); 00559 00560 ss >> offset; 00561 if (ss.fail()) throw ibrcommon::Exception("malformed command"); 00562 00563 _bundle_reg.remove(_bundle_reg.getBlock(offset)); 00564 00565 ibrcommon::MutexLock l(_write_lock); 00566 _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK DEL SUCCESSFUL" << std::endl; 00567 } 00568 else 00569 { 00570 ibrcommon::MutexLock l(_write_lock); 00571 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl; 00572 } 00573 } 00574 else 00575 { 00576 ibrcommon::MutexLock l(_write_lock); 00577 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl; 00578 } 00579 } 00580 else if (cmd[0] == "payload") 00581 { 00582 int block_offset; 00583 int cmd_index = 1; 00584 dtn::data::Block& b = _bundle_reg.getBlock<dtn::data::PayloadBlock>(); 00585 00586 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters"); 00587 00588 istringstream ss(cmd[1]); 00589 ss >> block_offset; 00590 if(!ss.fail()) 00591 { 00592 cmd_index++; 00593 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00594 00595 if(block_offset < 0 || block_offset >= _bundle_reg.getBlocks().size()){ 00596 throw ibrcommon::Exception("invalid offset"); 00597 } 00598 00599 b = _bundle_reg.getBlock(block_offset); 00600 } 00601 00602 int cmd_remaining = cmd.size() - (cmd_index + 1); 00603 if (cmd[cmd_index] == "get") 00604 { 00605 ibrcommon::MutexLock l(_write_lock); 00606 _stream << ClientHandler::API_STATUS_OK << " PAYLOAD GET" << std::endl; 00607 00608 int payload_offset = 0; 00609 int length = 0; 00610 00611 if(cmd_remaining > 0) 00612 { 00613 00614 /* read the payload offset */ 00615 ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset; 00616 if(payload_offset < 0) 00617 { 00618 payload_offset = 0; 00619 } 00620 00621 if(cmd_remaining > 1) 00622 { 00623 ss.clear(); ss.str(cmd[cmd_index+2]); ss >> length; 00624 } 00625 } 00626 00627 try{ 00628 size_t slength = 0; 00629 ibrcommon::BLOB::Reference blob = ibrcommon::BLOB::create(); 00630 00631 /* acquire the iostream to lock the blob */ 00632 ibrcommon::BLOB::iostream blob_stream = blob.iostream(); 00633 /* serialize the payload of the bundle into the blob */ 00634 b.serialize(*blob_stream, slength); 00635 00636 if(length <= 0 || (length + payload_offset) > slength) 00637 { 00638 length = slength - payload_offset; 00639 } 00640 00641 blob_stream->ignore(payload_offset); 00642 00643 PlainSerializer(_stream, false).serialize(blob_stream, length); 00644 00645 _stream << std::endl; 00646 00647 } catch (const std::exception&) { 00648 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD GET FAILED" << std::endl; 00649 } 00650 } 00651 else if (cmd[cmd_index] == "put") 00652 { 00653 ibrcommon::MutexLock l(_write_lock); 00654 _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD PUT" << std::endl; 00655 00656 int payload_offset = 0; 00657 if(cmd_remaining > 0) 00658 { 00659 00660 /* read the payload offset */ 00661 ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset; 00662 if(payload_offset < 0) 00663 { 00664 payload_offset = 0; 00665 } 00666 } 00667 00668 try 00669 { 00670 size_t slength = 0; 00671 ibrcommon::BLOB::Reference blob = ibrcommon::BLOB::create(); 00672 00673 /* acquire the iostream to lock the blob */ 00674 ibrcommon::BLOB::iostream blob_stream = blob.iostream(); 00675 /* serialize the payload of the bundle into the blob */ 00676 b.serialize(*blob_stream, slength); 00677 00678 /* move the streams put pointer to the given offset */ 00679 if(payload_offset < slength){ 00680 blob_stream->seekp(payload_offset, ios_base::beg); 00681 } 00682 00683 /* write the new data into the blob */ 00684 PlainDeserializer(_stream) >> blob_stream; 00685 00686 /* write the result into the block */ 00687 b.deserialize(*blob_stream, blob_stream.size()); 00688 00689 _stream << ClientHandler::API_STATUS_OK << " PAYLOAD PUT SUCCESSFUL" << std::endl; 00690 00691 } catch (const std::exception&) { 00692 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD PUT FAILED" << std::endl; 00693 } 00694 } 00695 else if (cmd[cmd_index] == "append"){ 00696 ibrcommon::MutexLock l(_write_lock); 00697 _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD APPEND" << std::endl; 00698 00699 try { 00700 size_t slength = 0; 00701 ibrcommon::BLOB::Reference blob = ibrcommon::BLOB::create(); 00702 00703 /* acquire the iostream to lock the blob */ 00704 ibrcommon::BLOB::iostream blob_stream = blob.iostream(); 00705 /* serialize old and new data into the blob */ 00706 b.serialize(*blob_stream, slength); 00707 PlainDeserializer(_stream) >> blob_stream; 00708 00709 /* write the result into the payload of the block */ 00710 b.deserialize(*blob_stream, blob_stream.size()); 00711 00712 _stream << ClientHandler::API_STATUS_OK << " PAYLOAD APPEND SUCCESSFUL" << std::endl; 00713 00714 } catch (const std::exception&) { 00715 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD APPEND FAILED" << std::endl; 00716 } 00717 00718 } 00719 else if (cmd[cmd_index] == "clear"){ 00720 00721 std::stringstream stream; 00722 b.deserialize(stream, 0); 00723 00724 ibrcommon::MutexLock l(_write_lock); 00725 _stream << ClientHandler::API_STATUS_OK << " PAYLOAD CLEAR SUCCESSFUL" << std::endl; 00726 } 00727 else if (cmd[cmd_index] == "length"){ 00728 ibrcommon::MutexLock l(_write_lock); 00729 _stream << ClientHandler::API_STATUS_OK << " PAYLOAD LENGTH" << std::endl; 00730 _stream << "Length: " << b.getLength() << std::endl; 00731 } 00732 } 00733 else if (cmd[0] == "nodename") 00734 { 00735 _stream << ClientHandler::API_STATUS_OK << " NODENAME " << dtn::core::BundleCore::local.getString() << std::endl; 00736 } 00737 else 00738 { 00739 ibrcommon::MutexLock l(_write_lock); 00740 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl; 00741 } 00742 } catch (const std::exception&) { 00743 ibrcommon::MutexLock l(_write_lock); 00744 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl; 00745 } 00746 } 00747 } 00748 00749 // void ExtendedApiConnection::eventNodeAvailable(const dtn::core::Node &node) 00750 // { 00751 // ibrcommon::MutexLock l(_write_lock); 00752 // _stream << API_STATUS_NOTIFY_NEIGHBOR << " NOTIFY NODE AVAILABLE " << node.getEID().getString() << std::endl; 00753 // } 00754 00755 // void ExtendedApiConnection::eventNodeUnavailable(const dtn::core::Node &node) 00756 // { 00757 // ibrcommon::MutexLock l(_write_lock); 00758 // _stream << API_STATUS_NOTIFY_NEIGHBOR << " NOTIFY NODE UNAVAILABLE " << node.getEID().getString() << std::endl; 00759 // } 00760 00761 ExtendedApiHandler::Sender::Sender(ExtendedApiHandler &conn) 00762 : _handler(conn) 00763 { 00764 } 00765 00766 ExtendedApiHandler::Sender::~Sender() 00767 { 00768 ibrcommon::JoinableThread::join(); 00769 } 00770 00771 void ExtendedApiHandler::Sender::__cancellation() 00772 { 00773 // abort all blocking calls on the registration object 00774 _handler._client.getRegistration().abort(); 00775 } 00776 00777 void ExtendedApiHandler::Sender::finally() 00778 { 00779 // _handler._server.freeRegistration(_handler.getRegistration()); 00780 } 00781 00782 void ExtendedApiHandler::Sender::run() 00783 { 00784 Registration ® = _handler._client.getRegistration(); 00785 try{ 00786 while(_handler.good()){ 00787 try{ 00788 dtn::data::MetaBundle id = reg.receiveMetaBundle(); 00789 _handler._bundle_queue.push(id); 00790 ibrcommon::MutexLock l(_handler._write_lock); 00791 _handler._stream << API_STATUS_NOTIFY_BUNDLE << " NOTIFY BUNDLE "; 00792 sayBundleID(_handler._stream, id); 00793 _handler._stream << std::endl; 00794 00795 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { 00796 reg.wait_for_bundle(); 00797 } 00798 00799 yield(); 00800 } 00801 } catch (const ibrcommon::QueueUnblockedException &ex) { 00802 IBRCOMMON_LOGGER_DEBUG(40) << "ExtendedApiHandler::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL; 00803 return; 00804 } catch (const ibrcommon::IOException &ex) { 00805 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00806 } catch (const dtn::InvalidDataException &ex) { 00807 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00808 } catch (const std::exception &ex) { 00809 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00810 } 00811 00812 try { 00813 //FIXME 00814 // _handler.stop(); 00815 } catch (const ibrcommon::ThreadException &ex) { 00816 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::Sender::run(): ThreadException (" << ex.what() << ") on termination" << IBRCOMMON_LOGGER_ENDL; 00817 } 00818 } 00819 00820 void ExtendedApiHandler::sayBundleID(ostream &stream, const dtn::data::BundleID &id) 00821 { 00822 stream << id.timestamp << " " << id.sequencenumber << " "; 00823 00824 if (id.fragment) 00825 { 00826 stream << id.offset << " "; 00827 } 00828 00829 stream << id.source.getString(); 00830 } 00831 00832 dtn::data::BundleID ExtendedApiHandler::readBundleID(const std::vector<std::string> &data, const size_t start) 00833 { 00834 // load bundle id 00835 std::stringstream ss; 00836 size_t timestamp = 0; 00837 size_t sequencenumber = 0; 00838 bool fragment = false; 00839 size_t offset = 0; 00840 00841 if ((data.size() - start) < 3) 00842 { 00843 throw ibrcommon::Exception("not enough parameters"); 00844 } 00845 00846 // read timestamp 00847 ss.clear(); ss.str(data[start]); ss >> timestamp; 00848 if(ss.fail()) 00849 { 00850 throw ibrcommon::Exception("malformed parameters"); 00851 } 00852 00853 // read sequence number 00854 ss.clear(); ss.str(data[start+1]); ss >> sequencenumber; 00855 if(ss.fail()) 00856 { 00857 throw ibrcommon::Exception("malformed parameters"); 00858 } 00859 00860 // read fragment offset 00861 if ((data.size() - start) > 3) 00862 { 00863 fragment = true; 00864 00865 // read sequence number 00866 ss.clear(); ss.str(data[start+2]); ss >> offset; 00867 if(ss.fail()) 00868 { 00869 throw ibrcommon::Exception("malformed parameters"); 00870 } 00871 } 00872 00873 // read EID 00874 ss.clear(); dtn::data::EID eid(data[data.size() - 1]); 00875 00876 // construct bundle id 00877 return dtn::data::BundleID(eid, timestamp, sequencenumber, fragment, offset); 00878 } 00879 } 00880 }