IBR-DTNSuite  0.8
daemon/src/api/ExtendedApiHandler.cpp
Go to the documentation of this file.
00001 /*
00002  * ExtendedApiHandler.cpp
00003  *
00004  *  Created on: 10.10.2011
00005  *      Author: morgenro,roettger
00006  */
00007 
00008 #include "config.h"
00009 #include "Configuration.h"
00010 #include "api/ExtendedApiHandler.h"
00011 #include "net/BundleReceivedEvent.h"
00012 #include "core/BundleEvent.h"
00013 #include <ibrcommon/Logger.h>
00014 #include <ibrdtn/api/PlainSerializer.h>
00015 #include <ibrdtn/data/AgeBlock.h>
00016 #include <ibrdtn/utils/Utils.h>
00017 #include <ibrcommon/data/Base64Reader.h>
00018 #include "core/BundleCore.h"
00019 #include <ibrdtn/utils/Random.h>
00020 
00021 #ifdef WITH_COMPRESSION
00022 #include <ibrdtn/data/CompressedPayloadBlock.h>
00023 #endif
00024 
00025 #ifdef WITH_BUNDLE_SECURITY
00026 #include "security/SecurityManager.h"
00027 #endif
00028 
00029 namespace dtn
00030 {
00031         namespace api
00032         {
00033                 ExtendedApiHandler::ExtendedApiHandler(ClientHandler &client, ibrcommon::tcpstream &stream)
00034                  : ProtocolHandler(client, stream), _sender(new Sender(*this)),
00035                    _endpoint(_client.getRegistration().getDefaultEID())
00036                 {
00037                         _client.getRegistration().subscribe(_endpoint);
00038                 }
00039 
00040                 ExtendedApiHandler::~ExtendedApiHandler()
00041                 {
00042                         _client.getRegistration().abort();
00043                         _sender->join();
00044                 }
00045 
00046                 bool ExtendedApiHandler::good() const{
00047                         return _stream.good();
00048                 }
00049 
00050                 void ExtendedApiHandler::__cancellation()
00051                 {
00052                         // close the stream
00053                         try {
00054                                 _stream.close();
00055                         } catch (const ibrcommon::ConnectionClosedException&) { };
00056                 }
00057 
00058                 void ExtendedApiHandler::finally()
00059                 {
00060                         IBRCOMMON_LOGGER_DEBUG(60) << "ExtendedApiConnection down" << IBRCOMMON_LOGGER_ENDL;
00061 
00062                         _client.getRegistration().abort();
00063 
00064                         // close the stream
00065                         try {
00066                                 _stream.close();
00067                         } catch (const ibrcommon::ConnectionClosedException&) { };
00068 
00069                         try {
00070                                 // shutdown the sender thread
00071                                 _sender->stop();
00072                         } catch (const std::exception&) { };
00073                 }
00074 
00075                 void ExtendedApiHandler::run()
00076                 {
00077                         _sender->start();
00078 
00079                         std::string buffer;
00080                         _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO EXTENDED" << std::endl;
00081 
00082                         while (_stream.good())
00083                         {
00084                                 getline(_stream, buffer);
00085 
00086                                 std::string::reverse_iterator iter = buffer.rbegin();
00087                                 if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1);
00088 
00089                                 std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer);
00090                                 if (cmd.size() == 0) continue;
00091 
00092                                 try {
00093                                         if (cmd[0] == "set")
00094                                         {
00095                                                 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
00096 
00097                                                 if (cmd[1] == "endpoint")
00098                                                 {
00099                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00100 
00101                                                         ibrcommon::MutexLock l(_write_lock);
00102                                                         dtn::data::EID new_endpoint = dtn::core::BundleCore::local + "/" + cmd[2];
00103 
00104                                                         // error checking
00105                                                         if (new_endpoint == dtn::data::EID())
00106                                                         {
00107                                                                 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
00108                                                         }
00109                                                         else
00110                                                         {
00111                                                                 /* unsubscribe from the old endpoint and subscribe to the new one */
00112                                                                 Registration& reg = _client.getRegistration();
00113                                                                 reg.unsubscribe(_endpoint);
00114                                                                 reg.subscribe(new_endpoint);
00115                                                                 _endpoint = new_endpoint;
00116                                                                 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
00117                                                         }
00118                                                 }
00119                                                 else
00120                                                 {
00121                                                         ibrcommon::MutexLock l(_write_lock);
00122                                                         _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
00123                                                 }
00124                                         }
00125                                         else if (cmd[0] == "endpoint")
00126                                         {
00127                                                 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
00128 
00129                                                 if (cmd[1] == "add")
00130                                                 {
00131                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00132 
00133                                                         ibrcommon::MutexLock l(_write_lock);
00134                                                         dtn::data::EID new_endpoint = dtn::core::BundleCore::local + "/" + cmd[2];
00135 
00136                                                         // error checking
00137                                                         if (new_endpoint == dtn::data::EID())
00138                                                         {
00139                                                                 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
00140                                                         }
00141                                                         else
00142                                                         {
00143                                                                 _client.getRegistration().subscribe(new_endpoint);
00144                                                                 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
00145                                                         }
00146                                                 }
00147                                                 else if (cmd[1] == "del")
00148                                                 {
00149                                                         dtn::data::EID del_endpoint = _endpoint;
00150                                                         if (cmd.size() >= 3)
00151                                                         {
00152                                                                 del_endpoint = dtn::core::BundleCore::local + "/" + cmd[2];
00153                                                         }
00154 
00155                                                         ibrcommon::MutexLock l(_write_lock);
00156 
00157                                                         // error checking
00158                                                         if (del_endpoint == dtn::data::EID())
00159                                                         {
00160                                                                 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
00161                                                         }
00162                                                         else
00163                                                         {
00164                                                                 _client.getRegistration().unsubscribe(del_endpoint);
00165 
00166                                                                 if(_endpoint == del_endpoint)
00167                                                                 {
00168                                                                         _endpoint = _client.getRegistration().getDefaultEID();
00169                                                                         _client.getRegistration().subscribe(_endpoint);
00170                                                                 }
00171 
00172                                                                 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
00173                                                         }
00174                                                 }
00175                                                 else if (cmd[1] == "get")
00176                                                 {
00177                                                         _stream << ClientHandler::API_STATUS_OK << " ENDPOINT GET " << _endpoint.getString() << std::endl;
00178                                                 }
00179                                                 else
00180                                                 {
00181                                                         ibrcommon::MutexLock l(_write_lock);
00182                                                         _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
00183                                                 }
00184                                         }
00185                                         else if (cmd[0] == "registration")
00186                                         {
00187                                                 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
00188 
00189                                                 if (cmd[1] == "add")
00190                                                 {
00191                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00192 
00193                                                         ibrcommon::MutexLock l(_write_lock);
00194                                                         dtn::data::EID endpoint(cmd[2]);
00195 
00196                                                         // error checking
00197                                                         if (endpoint == dtn::data::EID())
00198                                                         {
00199                                                                 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl;
00200                                                         }
00201                                                         else
00202                                                         {
00203                                                                 _client.getRegistration().subscribe(endpoint);
00204                                                                 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
00205                                                         }
00206                                                 }
00207                                                 else if (cmd[1] == "del")
00208                                                 {
00209                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00210 
00211                                                         ibrcommon::MutexLock l(_write_lock);
00212                                                         dtn::data::EID endpoint(cmd[2]);
00213 
00214                                                         // error checking
00215                                                         if (endpoint == dtn::data::EID())
00216                                                         {
00217                                                                 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl;
00218                                                         }
00219                                                         else
00220                                                         {
00221                                                                 _client.getRegistration().unsubscribe(endpoint);
00222                                                                 if(_endpoint == endpoint)
00223                                                                 {
00224                                                                         _endpoint = _client.getRegistration().getDefaultEID();
00225                                                                         _client.getRegistration().subscribe(_endpoint);
00226                                                                 }
00227 
00228                                                                 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
00229                                                         }
00230                                                 }
00231                                                 else if (cmd[1] == "list")
00232                                                 {
00233                                                         ibrcommon::MutexLock l(_write_lock);
00234                                                         const std::set<dtn::data::EID> list = _client.getRegistration().getSubscriptions();
00235 
00236                                                         _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LIST" << std::endl;
00237                                                         for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00238                                                         {
00239                                                                 _stream << (*iter).getString() << std::endl;
00240                                                         }
00241                                                         _stream << std::endl;
00242                                                 }
00243                                                 else if (cmd[1] == "save")
00244                                                 {
00245                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00246 
00247                                                         size_t lifetime = 0;
00248                                                         std::stringstream ss(cmd[2]);
00249 
00250                                                         ss >> lifetime;
00251                                                         if(ss.fail()) throw ibrcommon::Exception("malformed command");
00252 
00253                                                         /* make the registration persistent for a given lifetime */
00254                                                         _client.getRegistration().setPersistent(lifetime);
00255 
00256                                                         ibrcommon::MutexLock l(_write_lock);
00257                                                         _stream << ClientHandler::API_STATUS_OK << " REGISTRATION SAVE " << _client.getRegistration().getHandle() << std::endl;
00258                                                 }
00259                                                 else if (cmd[1] == "load")
00260                                                 {
00261                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00262 
00263                                                         const std::string handle = cmd[2];
00264 
00265                                                         try
00266                                                         {
00267                                                                 Registration& reg = _client.getAPIServer().getRegistration(handle);
00268 
00269                                                                 /* stop the sender */
00270                                                                 _client.getRegistration().abort();
00271                                                                 _sender->join();
00272 
00273                                                                 /* switch the registration */
00274                                                                 _client.switchRegistration(reg);
00275 
00276                                                                 /* and switch the sender */
00277                                                                 Sender *old_sender = _sender;
00278                                                                 try{
00279                                                                         _sender = new Sender(*this);
00280                                                                 }
00281                                                                 catch (const std::bad_alloc &ex)
00282                                                                 {
00283                                                                         _sender = old_sender;
00284                                                                         throw ex;
00285                                                                 }
00286                                                                 delete old_sender;
00287                                                                 _sender->start();
00288 
00289                                                                 ibrcommon::MutexLock l(_write_lock);
00290                                                                 _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LOAD" << std::endl;
00291                                                         }
00292                                                         catch (const Registration::AlreadyAttachedException& ex)
00293                                                         {
00294                                                                 ibrcommon::MutexLock l(_write_lock);
00295                                                                 _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION BUSY" << std::endl;
00296                                                         }
00297                                                         catch (const Registration::NotFoundException& ex)
00298                                                         {
00299                                                                 ibrcommon::MutexLock l(_write_lock);
00300                                                                 _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION NOT FOUND" << std::endl;
00301                                                         }
00302                                                 }
00303                                                 else
00304                                                 {
00305                                                         ibrcommon::MutexLock l(_write_lock);
00306                                                         _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
00307                                                 }
00308                                         }
00309                                         else if (cmd[0] == "neighbor")
00310                                         {
00311                                                 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
00312 
00313                                                 if (cmd[1] == "list")
00314                                                 {
00315                                                         ibrcommon::MutexLock l(_write_lock);
00316                                                         const std::set<dtn::core::Node> nlist = dtn::core::BundleCore::getInstance().getNeighbors();
00317 
00318                                                         _stream << ClientHandler::API_STATUS_OK << " NEIGHBOR LIST" << std::endl;
00319                                                         for (std::set<dtn::core::Node>::const_iterator iter = nlist.begin(); iter != nlist.end(); iter++)
00320                                                         {
00321                                                                 _stream << (*iter).getEID().getString() << std::endl;
00322                                                         }
00323                                                         _stream << std::endl;
00324                                                 }
00325                                                 else
00326                                                 {
00327                                                         ibrcommon::MutexLock l(_write_lock);
00328                                                         _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
00329                                                 }
00330                                         }
00331                                         else if (cmd[0] == "bundle")
00332                                         {
00333                                                 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
00334 
00335                                                 if (cmd[1] == "get")
00336                                                 {
00337                                                         // transfer bundle data
00338                                                         ibrcommon::MutexLock l(_write_lock);
00339 
00340                                                         if (cmd.size() == 2)
00341                                                         {
00342                                                                 _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
00343                                                                 PlainSerializer(_stream) << _bundle_reg;
00344                                                         }
00345                                                         else if (cmd[2] == "binary")
00346                                                         {
00347                                                                 _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET BINARY "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
00348                                                                 dtn::data::DefaultSerializer(_stream) << _bundle_reg; _stream << std::flush;
00349                                                         }
00350                                                         else if (cmd[2] == "plain")
00351                                                         {
00352                                                                 _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET PLAIN "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
00353                                                                 PlainSerializer(_stream) << _bundle_reg;
00354                                                         }
00355                                                         else if (cmd[2] == "xml")
00356                                                         {
00357                                                                 _stream << ClientHandler::API_STATUS_NOT_IMPLEMENTED << " FORMAT NOT IMPLEMENTED" << std::endl;
00358                                                         }
00359                                                         else
00360                                                         {
00361                                                                 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN FORMAT" << std::endl;
00362                                                         }
00363                                                 }
00364                                                 else if (cmd[1] == "put")
00365                                                 {
00366                                                         // lock the stream during reception of bundle data
00367                                                         ibrcommon::MutexLock l(_write_lock);
00368 
00369                                                         if (cmd.size() < 3)
00370                                                         {
00371                                                                 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl;
00372                                                         }
00373                                                         else if (cmd[2] == "plain")
00374                                                         {
00375                                                                 _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE PLAIN" << std::endl;
00376 
00377                                                                 try {
00378                                                                         PlainDeserializer(_stream) >> _bundle_reg;
00379                                                                         _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl;
00380                                                                 } catch (const std::exception &ex) {
00381                                                                         IBRCOMMON_LOGGER_DEBUG(20) << "API put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00382                                                                         _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl;
00383 
00384                                                                 }
00385                                                         }
00386                                                         else if (cmd[2] == "binary")
00387                                                         {
00388                                                                 _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE BINARY" << std::endl;
00389 
00390                                                                 try {
00391                                                                         dtn::data::DefaultDeserializer(_stream) >> _bundle_reg;
00392                                                                         _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl;
00393                                                                 } catch (const std::exception &ex) {
00394                                                                         IBRCOMMON_LOGGER_DEBUG(20) << "API put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00395                                                                         _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl;
00396                                                                 }
00397                                                         }
00398                                                         else
00399                                                         {
00400                                                                 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl;
00401                                                         }
00402                                                 }
00403                                                 else if (cmd[1] == "load")
00404                                                 {
00405                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00406 
00407                                                         dtn::data::BundleID id;
00408 
00409                                                         if (cmd[2] == "queue")
00410                                                         {
00411                                                                 id = _bundle_queue.getnpop();
00412                                                         }
00413                                                         else
00414                                                         {
00415                                                                 // construct bundle id
00416                                                                 id = readBundleID(cmd, 2);
00417                                                         }
00418 
00419                                                         // load the bundle
00420                                                         try {
00421                                                                 _bundle_reg = dtn::core::BundleCore::getInstance().getStorage().get(id);
00422 
00423                                                                 // process the bundle block (security, compression, ...)
00424                                                                 dtn::core::BundleCore::processBlocks(_bundle_reg);
00425 
00426                                                                 ibrcommon::MutexLock l(_write_lock);
00427                                                                 _stream << ClientHandler::API_STATUS_OK << " BUNDLE LOADED "; sayBundleID(_stream, id); _stream << std::endl;
00428                                                         } catch (const ibrcommon::Exception&) {
00429                                                                 ibrcommon::MutexLock l(_write_lock);
00430                                                                 _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
00431                                                         }
00432                                                 }
00433                                                 else if (cmd[1] == "clear")
00434                                                 {
00435                                                         _bundle_reg = dtn::data::Bundle();
00436 
00437                                                         ibrcommon::MutexLock l(_write_lock);
00438                                                         _stream << ClientHandler::API_STATUS_OK << " BUNDLE CLEARED" << std::endl;
00439                                                 }
00440                                                 else if (cmd[1] == "free")
00441                                                 {
00442                                                         try {
00443                                                                 dtn::core::BundleCore::getInstance().getStorage().remove(_bundle_reg);
00444                                                                 _bundle_reg = dtn::data::Bundle();
00445                                                                 ibrcommon::MutexLock l(_write_lock);
00446                                                                 _stream << ClientHandler::API_STATUS_OK << " BUNDLE FREE SUCCESSFUL" << std::endl;
00447                                                         } catch (const ibrcommon::Exception&) {
00448                                                                 ibrcommon::MutexLock l(_write_lock);
00449                                                                 _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
00450                                                         }
00451                                                 }
00452                                                 else if (cmd[1] == "delivered")
00453                                                 {
00454                                                         if (cmd.size() < 5) throw ibrcommon::Exception("not enough parameters");
00455 
00456                                                         try {
00457                                                                 // construct bundle id
00458                                                                 dtn::data::BundleID id = readBundleID(cmd, 2);
00459 
00460                                                                 // announce this bundle as delivered
00461                                                                 dtn::data::MetaBundle meta = dtn::core::BundleCore::getInstance().getStorage().get(id);
00462                                                                 _client.getRegistration().delivered(meta);
00463 
00464                                                                 ibrcommon::MutexLock l(_write_lock);
00465                                                                 _stream << ClientHandler::API_STATUS_OK << " BUNDLE DELIVERED ACCEPTED" << std::endl;
00466                                                         } catch (const ibrcommon::Exception&) {
00467                                                                 ibrcommon::MutexLock l(_write_lock);
00468                                                                 _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
00469                                                         }
00470                                                 }
00471                                                 else if (cmd[1] == "store")
00472                                                 {
00473                                                         // store the bundle in the storage
00474                                                         try {
00475                                                                 dtn::core::BundleCore::getInstance().getStorage().store(_bundle_reg);
00476                                                                 ibrcommon::MutexLock l(_write_lock);
00477                                                                 _stream << ClientHandler::API_STATUS_OK << " BUNDLE STORE SUCCESSFUL" << std::endl;
00478                                                         } catch (const ibrcommon::Exception&) {
00479                                                                 ibrcommon::MutexLock l(_write_lock);
00480                                                                 _stream << ClientHandler::API_STATUS_INTERNAL_ERROR << " BUNDLE STORE FAILED" << std::endl;
00481                                                         }
00482                                                 }
00483                                                 else if (cmd[1] == "send")
00484                                                 {
00485                                                         // create a new sequence number
00486                                                         _bundle_reg.relabel();
00487 
00488                                                         // forward the bundle to the storage processing
00489                                                         _client.getAPIServer().processIncomingBundle(_endpoint, _bundle_reg);
00490 
00491                                                         ibrcommon::MutexLock l(_write_lock);
00492                                                         _stream << ClientHandler::API_STATUS_OK << " BUNDLE SENT" << std::endl;
00493                                                 }
00494                                                 else if (cmd[1] == "info")
00495                                                 {
00496                                                         // transfer bundle data
00497                                                         ibrcommon::MutexLock l(_write_lock);
00498 
00499                                                         _stream << ClientHandler::API_STATUS_OK << " BUNDLE INFO "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
00500                                                         PlainSerializer(_stream, true) << _bundle_reg;
00501                                                 }
00502                                                 else if (cmd[1] == "block")
00503                                                 {
00504                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00505 
00506                                                         if (cmd[2] == "add")
00507                                                         {
00508                                                                 PlainDeserializer::BlockInserter inserter(_bundle_reg, PlainDeserializer::BlockInserter::END);
00509 
00510                                                                 /* parse an optional offset, where to insert the block */
00511                                                                 if (cmd.size() > 3)
00512                                                                 {
00513                                                                         int offset;
00514                                                                         istringstream ss(cmd[3]);
00515 
00516                                                                         ss >> offset;
00517                                                                         if (ss.fail()) throw ibrcommon::Exception("malformed command");
00518 
00519                                                                         if (offset >= _bundle_reg.blockCount())
00520                                                                         {
00521                                                                                 inserter = PlainDeserializer::BlockInserter(_bundle_reg, PlainDeserializer::BlockInserter::END);
00522                                                                         }
00523                                                                         else if(offset <= 0)
00524                                                                         {
00525                                                                                 inserter = PlainDeserializer::BlockInserter(_bundle_reg, PlainDeserializer::BlockInserter::FRONT);
00526                                                                         }
00527                                                                         else
00528                                                                         {
00529                                                                                 inserter = PlainDeserializer::BlockInserter(_bundle_reg, PlainDeserializer::BlockInserter::MIDDLE, offset);
00530                                                                         }
00531                                                                 }
00532 
00533                                                                 ibrcommon::MutexLock l(_write_lock);
00534                                                                 _stream << ClientHandler::API_STATUS_CONTINUE << " BUNDLE BLOCK ADD" << std::endl;
00535 
00536                                                                 try
00537                                                                 {
00538                                                                         dtn::data::Block &block = PlainDeserializer(_stream).readBlock(inserter, _bundle_reg.get(dtn::data::Bundle::APPDATA_IS_ADMRECORD));
00539                                                                         if (inserter.getAlignment() == PlainDeserializer::BlockInserter::END)
00540                                                                         {
00541                                                                                 block.set(dtn::data::Block::LAST_BLOCK, true);
00542                                                                         }
00543                                                                         else
00544                                                                         {
00545                                                                                 block.set(dtn::data::Block::LAST_BLOCK, false);
00546                                                                         }
00547                                                                         _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK ADD SUCCESSFUL" << std::endl;
00548                                                                 }
00549                                                                 catch (const PlainDeserializer::PlainDeserializerException &ex){
00550                                                                         _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " BUNDLE BLOCK ADD FAILED" << std::endl;
00551                                                                 }
00552                                                         }
00553                                                         else if (cmd[2] == "del")
00554                                                         {
00555                                                                 if (cmd.size() < 4) throw ibrcommon::Exception("not enough parameters");
00556 
00557                                                                 int offset;
00558                                                                 istringstream ss(cmd[3]);
00559 
00560                                                                 ss >> offset;
00561                                                                 if (ss.fail()) throw ibrcommon::Exception("malformed command");
00562 
00563                                                                 _bundle_reg.remove(_bundle_reg.getBlock(offset));
00564 
00565                                                                 ibrcommon::MutexLock l(_write_lock);
00566                                                                 _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK DEL SUCCESSFUL" << std::endl;
00567                                                         }
00568                                                         else
00569                                                         {
00570                                                                 ibrcommon::MutexLock l(_write_lock);
00571                                                                 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
00572                                                         }
00573                                                 }
00574                                                 else
00575                                                 {
00576                                                         ibrcommon::MutexLock l(_write_lock);
00577                                                         _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
00578                                                 }
00579                                         }
00580                                         else if (cmd[0] == "payload")
00581                                         {
00582                                                 int block_offset;
00583                                                 int cmd_index = 1;
00584                                                 dtn::data::Block& b = _bundle_reg.getBlock<dtn::data::PayloadBlock>();
00585 
00586                                                 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
00587 
00588                                                 istringstream ss(cmd[1]);
00589                                                 ss >> block_offset;
00590                                                 if(!ss.fail())
00591                                                 {
00592                                                         cmd_index++;
00593                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00594 
00595                                                         if(block_offset < 0 || block_offset >= _bundle_reg.getBlocks().size()){
00596                                                                 throw ibrcommon::Exception("invalid offset");
00597                                                         }
00598 
00599                                                         b = _bundle_reg.getBlock(block_offset);
00600                                                 }
00601 
00602                                                 int cmd_remaining = cmd.size() - (cmd_index + 1);
00603                                                 if (cmd[cmd_index] == "get")
00604                                                 {
00605                                                         ibrcommon::MutexLock l(_write_lock);
00606                                                         _stream << ClientHandler::API_STATUS_OK << " PAYLOAD GET" << std::endl;
00607 
00608                                                         int payload_offset = 0;
00609                                                         int length = 0;
00610 
00611                                                         if(cmd_remaining > 0)
00612                                                         {
00613 
00614                                                                 /* read the payload offset */
00615                                                                 ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
00616                                                                 if(payload_offset < 0)
00617                                                                 {
00618                                                                         payload_offset = 0;
00619                                                                 }
00620 
00621                                                                 if(cmd_remaining > 1)
00622                                                                 {
00623                                                                         ss.clear(); ss.str(cmd[cmd_index+2]); ss >> length;
00624                                                                 }
00625                                                         }
00626 
00627                                                         try{
00628                                                                 size_t slength = 0;
00629                                                                 ibrcommon::BLOB::Reference blob = ibrcommon::BLOB::create();
00630 
00631                                                                 /* acquire the iostream to lock the blob */
00632                                                                 ibrcommon::BLOB::iostream blob_stream = blob.iostream();
00633                                                                 /* serialize the payload of the bundle into the blob */
00634                                                                 b.serialize(*blob_stream, slength);
00635 
00636                                                                 if(length <= 0 || (length + payload_offset) > slength)
00637                                                                 {
00638                                                                         length = slength - payload_offset;
00639                                                                 }
00640 
00641                                                                 blob_stream->ignore(payload_offset);
00642 
00643                                                                 PlainSerializer(_stream, false).serialize(blob_stream, length);
00644 
00645                                                                 _stream << std::endl;
00646 
00647                                                         } catch (const std::exception&) {
00648                                                                 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD GET FAILED" << std::endl;
00649                                                         }
00650                                                 }
00651                                                 else if (cmd[cmd_index] == "put")
00652                                                 {
00653                                                         ibrcommon::MutexLock l(_write_lock);
00654                                                         _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD PUT" << std::endl;
00655 
00656                                                         int payload_offset = 0;
00657                                                         if(cmd_remaining > 0)
00658                                                         {
00659 
00660                                                                 /* read the payload offset */
00661                                                                 ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
00662                                                                 if(payload_offset < 0)
00663                                                                 {
00664                                                                         payload_offset = 0;
00665                                                                 }
00666                                                         }
00667 
00668                                                         try
00669                                                         {
00670                                                                 size_t slength = 0;
00671                                                                 ibrcommon::BLOB::Reference blob = ibrcommon::BLOB::create();
00672 
00673                                                                 /* acquire the iostream to lock the blob */
00674                                                                 ibrcommon::BLOB::iostream blob_stream = blob.iostream();
00675                                                                 /* serialize the payload of the bundle into the blob */
00676                                                                 b.serialize(*blob_stream, slength);
00677 
00678                                                                 /* move the streams put pointer to the given offset */
00679                                                                 if(payload_offset < slength){
00680                                                                         blob_stream->seekp(payload_offset, ios_base::beg);
00681                                                                 }
00682 
00683                                                                 /* write the new data into the blob */
00684                                                                 PlainDeserializer(_stream) >> blob_stream;
00685 
00686                                                                 /* write the result into the block */
00687                                                                 b.deserialize(*blob_stream, blob_stream.size());
00688 
00689                                                                 _stream << ClientHandler::API_STATUS_OK << " PAYLOAD PUT SUCCESSFUL" << std::endl;
00690 
00691                                                         } catch (const std::exception&) {
00692                                                                 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD PUT FAILED" << std::endl;
00693                                                         }
00694                                                 }
00695                                                 else if (cmd[cmd_index] == "append"){
00696                                                         ibrcommon::MutexLock l(_write_lock);
00697                                                         _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD APPEND" << std::endl;
00698 
00699                                                         try {
00700                                                                 size_t slength = 0;
00701                                                                 ibrcommon::BLOB::Reference blob = ibrcommon::BLOB::create();
00702 
00703                                                                 /* acquire the iostream to lock the blob */
00704                                                                 ibrcommon::BLOB::iostream blob_stream = blob.iostream();
00705                                                                 /* serialize old and new data into the blob */
00706                                                                 b.serialize(*blob_stream, slength);
00707                                                                 PlainDeserializer(_stream) >> blob_stream;
00708 
00709                                                                 /* write the result into the payload of the block */
00710                                                                 b.deserialize(*blob_stream, blob_stream.size());
00711 
00712                                                                 _stream << ClientHandler::API_STATUS_OK << " PAYLOAD APPEND SUCCESSFUL" << std::endl;
00713 
00714                                                         } catch (const std::exception&) {
00715                                                                 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD APPEND FAILED" << std::endl;
00716                                                         }
00717 
00718                                                 }
00719                                                 else if (cmd[cmd_index] == "clear"){
00720 
00721                                                         std::stringstream stream;
00722                                                         b.deserialize(stream, 0);
00723 
00724                                                         ibrcommon::MutexLock l(_write_lock);
00725                                                         _stream << ClientHandler::API_STATUS_OK << " PAYLOAD CLEAR SUCCESSFUL" << std::endl;
00726                                                 }
00727                                                 else if (cmd[cmd_index] == "length"){
00728                                                         ibrcommon::MutexLock l(_write_lock);
00729                                                         _stream << ClientHandler::API_STATUS_OK << " PAYLOAD LENGTH" << std::endl;
00730                                                         _stream << "Length: " << b.getLength() << std::endl;
00731                                                 }
00732                                         }
00733                                         else if (cmd[0] == "nodename")
00734                                         {
00735                                                 _stream << ClientHandler::API_STATUS_OK << " NODENAME " << dtn::core::BundleCore::local.getString() << std::endl;
00736                                         }
00737                                         else
00738                                         {
00739                                                 ibrcommon::MutexLock l(_write_lock);
00740                                                 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
00741                                         }
00742                                 } catch (const std::exception&) {
00743                                         ibrcommon::MutexLock l(_write_lock);
00744                                         _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl;
00745                                 }
00746                         }
00747                 }
00748 
00749 //              void ExtendedApiConnection::eventNodeAvailable(const dtn::core::Node &node)
00750 //              {
00751 //                      ibrcommon::MutexLock l(_write_lock);
00752 //                      _stream << API_STATUS_NOTIFY_NEIGHBOR << " NOTIFY NODE AVAILABLE " << node.getEID().getString() << std::endl;
00753 //              }
00754 
00755 //              void ExtendedApiConnection::eventNodeUnavailable(const dtn::core::Node &node)
00756 //              {
00757 //                      ibrcommon::MutexLock l(_write_lock);
00758 //                      _stream << API_STATUS_NOTIFY_NEIGHBOR << " NOTIFY NODE UNAVAILABLE " << node.getEID().getString() << std::endl;
00759 //              }
00760 
00761                 /* Remembers the handler this sender works for; run() reaches the client registration and stream through it. */
00762                 ExtendedApiHandler::Sender::Sender(ExtendedApiHandler &conn) : _handler(conn)
00763                 {
00764                 }
00765 
00766                 ExtendedApiHandler::Sender::~Sender()
00767                 {
00768                         this->ibrcommon::JoinableThread::join(); // join the thread as part of destroying the sender
00769                 }
00770 
00771                 void ExtendedApiHandler::Sender::__cancellation()
00772                 {
00773                         Registration &registration = _handler._client.getRegistration();
00774                         registration.abort(); // abort all blocking calls on the registration object
00775                 }
00776 
00777                 void ExtendedApiHandler::Sender::finally() // currently does nothing: the only statement in the body is commented out
00778                 {
00779 //                      _handler._server.freeRegistration(_handler.getRegistration()); // NOTE(review): disabled registration cleanup - confirm where the registration is released now
00780                 }
00781 
00782                 void ExtendedApiHandler::Sender::run()
00783                 {
00784                         Registration &reg = _handler._client.getRegistration();
00785                         try{
00786                                 while(_handler.good()){
00787                                         try{
00788                                                 dtn::data::MetaBundle id = reg.receiveMetaBundle();
00789                                                 _handler._bundle_queue.push(id);
00790                                                 ibrcommon::MutexLock l(_handler._write_lock);
00791                                                 _handler._stream << API_STATUS_NOTIFY_BUNDLE << " NOTIFY BUNDLE ";
00792                                                 sayBundleID(_handler._stream, id);
00793                                                 _handler._stream << std::endl;
00794 
00795                                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) {
00796                                                 reg.wait_for_bundle();
00797                                         }
00798 
00799                                         yield();
00800                                 }
00801                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00802                                 IBRCOMMON_LOGGER_DEBUG(40) << "ExtendedApiHandler::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00803                                 return;
00804                         } catch (const ibrcommon::IOException &ex) {
00805                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00806                         } catch (const dtn::InvalidDataException &ex) {
00807                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00808                         } catch (const std::exception &ex) {
00809                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00810                         }
00811 
00812                         try {
00813                                 //FIXME
00814 //                              _handler.stop();
00815                         } catch (const ibrcommon::ThreadException &ex) {
00816                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::Sender::run(): ThreadException (" << ex.what() << ") on termination" << IBRCOMMON_LOGGER_ENDL;
00817                         }
00818                 }
00819 
00820                 void ExtendedApiHandler::sayBundleID(ostream &stream, const dtn::data::BundleID &id)
00821                 {
00822                         // Output format: "<timestamp> <sequencenumber> [<offset> ]<source EID>"
00823                         stream << id.timestamp << " ";
00824                         stream << id.sequencenumber << " ";
00825 
00826                         // the offset is only written when the ID is flagged as a fragment
00827                         if (id.fragment) stream << id.offset << " ";
00828 
00829                         stream << id.source.getString();
00830                 }
00831 
00832                 dtn::data::BundleID ExtendedApiHandler::readBundleID(const std::vector<std::string> &data, const size_t start)
00833                 {
00834                         // Parses a bundle ID of the form "<timestamp> <sequencenumber> [<offset>] <EID>" starting at data[start].
00835                         // An offset is read (and the ID marked as fragment) only if more than three elements follow start;
00836                         // the EID is always the last element of data. Throws ibrcommon::Exception on missing or malformed fields.
00837                         std::stringstream ss;
00838                         size_t timestamp = 0;
00839                         size_t sequencenumber = 0;
00840                         bool fragment = false;
00841                         size_t offset = 0;
00842 
00843                         // Test start on its own first: data.size() - start is unsigned and wraps around when
00844                         // start > data.size(), which would pass the length check and index data out of bounds.
00845                         if ((start > data.size()) || ((data.size() - start) < 3))
00846                         {
00847                                 throw ibrcommon::Exception("not enough parameters");
00848                         }
00849 
00850                         // read timestamp
00851                         ss.clear(); ss.str(data[start]); ss >> timestamp;
00852                         if (ss.fail())
00853                         {
00854                                 throw ibrcommon::Exception("malformed parameters");
00855                         }
00856 
00857                         // read sequence number
00858                         ss.clear(); ss.str(data[start+1]); ss >> sequencenumber;
00859                         if (ss.fail())
00860                         {
00861                                 throw ibrcommon::Exception("malformed parameters");
00862                         }
00863 
00864                         // read fragment offset
00865                         if ((data.size() - start) > 3)
00866                         {
00867                                 fragment = true;
00868                                 ss.clear(); ss.str(data[start+2]); ss >> offset;
00869                                 if (ss.fail())
00870                                 {
00871                                         throw ibrcommon::Exception("malformed parameters");
00872                                 }
00873                         }
00874 
00875                         // read EID (always the last element)
00876                         dtn::data::EID eid(data[data.size() - 1]);
00877                         return dtn::data::BundleID(eid, timestamp, sequencenumber, fragment, offset);
00878                 }
00879         }
00880 }