IBR-DTNSuite  0.8
daemon/src/api/OrderedStreamHandler.cpp
Go to the documentation of this file.
00001 /*
00002  * OrderedStreamHandler.cpp
00003  *
00004  *  Created on: 17.11.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "api/OrderedStreamHandler.h"
00009 #include "core/BundleCore.h"
00010 #include "net/BundleReceivedEvent.h"
00011 
00012 #include <ibrdtn/data/PrimaryBlock.h>
00013 #include <ibrdtn/utils/Utils.h>
00014 #include <ibrcommon/Logger.h>
00015 
00016 #ifdef WITH_COMPRESSION
00017 #include <ibrdtn/data/CompressedPayloadBlock.h>
00018 #endif
00019 
00020 #ifdef WITH_BUNDLE_SECURITY
00021 #include "security/SecurityManager.h"
00022 #endif
00023 
00024 namespace dtn
00025 {
00026         namespace api
00027         {
00028                 OrderedStreamHandler::OrderedStreamHandler(ClientHandler &client, ibrcommon::tcpstream &stream)
00029                  : ProtocolHandler(client, stream), _sender(*this), _streambuf(*this), _bundlestream(&_streambuf), _group(true), _lifetime(3600)
00030                 {
00031                         _endpoint = client.getRegistration().getDefaultEID();
00032                 }
00033 
00034                 OrderedStreamHandler::~OrderedStreamHandler()
00035                 {
00036                         _sender.stop();
00037                         _sender.join();
00038                 }
00039 
00040                 void OrderedStreamHandler::delivered(const dtn::data::MetaBundle &m)
00041                 {
00042                         _client.getRegistration().delivered(m);
00043                 }
00044 
00045                 void OrderedStreamHandler::put(dtn::data::Bundle &b)
00046                 {
00047                         IBRCOMMON_LOGGER_DEBUG(20) << "OrderedStreamHandler: put()" << IBRCOMMON_LOGGER_ENDL;
00048 
00049                         // set destination EID
00050                         b._destination = _peer;
00051 
00052                         // set source
00053                         b._source = _endpoint;
00054 
00055                         // set lifetime
00056                         b._lifetime = _lifetime;
00057 
00058                         // set flag if the bundles are addresses to a group
00059                         if (_group)
00060                         {
00061                                 b.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, false);
00062                         }
00063                         else
00064                         {
00065                                 b.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00066                         }
00067 
00068                         // raise default bundle received event
00069                         dtn::net::BundleReceivedEvent::raise(_client.getRegistration().getDefaultEID(), b, true, true);
00070                 }
00071 
00072                 dtn::data::MetaBundle OrderedStreamHandler::get(size_t timeout)
00073                 {
00074                         Registration &reg = _client.getRegistration();
00075                         IBRCOMMON_LOGGER_DEBUG(20) << "OrderedStreamHandler: get()" << IBRCOMMON_LOGGER_ENDL;
00076 
00077                         while (true)
00078                         {
00079                                 try {
00080                                         dtn::data::MetaBundle bundle = reg.receiveMetaBundle();
00081 
00082                                         // discard bundle if they are not from the specified peer
00083                                         if ((!_group) && (bundle.source != _peer))
00084                                         {
00085                                                 IBRCOMMON_LOGGER_DEBUG(30) << "OrderedStreamHandler: get(): bundle source " << bundle.source.getString() << " not expected - discard" << IBRCOMMON_LOGGER_ENDL;
00086                                                 continue;
00087                                         }
00088 
00089                                         return bundle;
00090                                 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) {
00091                                         IBRCOMMON_LOGGER_DEBUG(30) << "OrderedStreamHandler: get(): no bundle found wait for notify" << IBRCOMMON_LOGGER_ENDL;
00092                                         reg.wait_for_bundle(timeout);
00093                                 }
00094                         }
00095                 }
00096 
00097                 void OrderedStreamHandler::__cancellation()
00098                 {
00099                         // close the stream
00100                         try {
00101                                 _stream.close();
00102                         } catch (const ibrcommon::ConnectionClosedException&) { };
00103                 }
00104 
00105                 void OrderedStreamHandler::finally()
00106                 {
00107                         IBRCOMMON_LOGGER_DEBUG(60) << "OrderedStreamHandler down" << IBRCOMMON_LOGGER_ENDL;
00108 
00109                         _client.getRegistration().abort();
00110 
00111                         // close the stream
00112                         try {
00113                                 _stream.close();
00114                         } catch (const ibrcommon::ConnectionClosedException&) { };
00115 
00116                         try {
00117                                 // shutdown the sender thread
00118                                 _sender.stop();
00119                         } catch (const std::exception&) { };
00120                 }
00121 
00122                 void OrderedStreamHandler::run()
00123                 {
00124                         std::string buffer;
00125                         _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO ORDERED STREAM PROTOCOL" << std::endl;
00126 
00127                         while (_stream.good())
00128                         {
00129                                 getline(_stream, buffer);
00130 
00131                                 std::string::reverse_iterator iter = buffer.rbegin();
00132                                 if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1);
00133 
00134                                 std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer);
00135                                 if (cmd.size() == 0) continue;
00136 
00137                                 try {
00138                                         if (cmd[0] == "connect")
00139                                         {
00140                                                 _stream << ClientHandler::API_STATUS_CONTINUE << " CONNECTION ESTABLISHED" << std::endl;
00141 
00142                                                 // start sender to transfer received payload to the client
00143                                                 _sender.start();
00144 
00145                                                 // forward data to stream buffer
00146                                                 _bundlestream << _stream.rdbuf() << std::flush;
00147                                         }
00148                                         else if (cmd[0] == "set")
00149                                         {
00150                                                 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
00151 
00152                                                 if (cmd[1] == "endpoint")
00153                                                 {
00154                                                         if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
00155 
00156                                                         _endpoint = dtn::core::BundleCore::local + "/" + cmd[2];
00157 
00158                                                         // error checking
00159                                                         if (_endpoint == dtn::data::EID())
00160                                                         {
00161                                                                 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
00162                                                                 _endpoint = dtn::core::BundleCore::local;
00163                                                         }
00164                                                         else
00165                                                         {
00166                                                                 _client.getRegistration().subscribe(_endpoint);
00167                                                                 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
00168                                                         }
00169                                                 }
00170                                                 else if (cmd[1] == "destination")
00171                                                 {
00172                                                         _peer = cmd[2];
00173                                                         _group = false;
00174                                                         _stream << ClientHandler::API_STATUS_OK << " DESTINATION CHANGED" << std::endl;
00175                                                 }
00176                                                 else if (cmd[1] == "group")
00177                                                 {
00178                                                         _peer = cmd[2];
00179                                                         _group = true;
00180                                                         _stream << ClientHandler::API_STATUS_OK << " DESTINATION GROUP CHANGED" << std::endl;
00181                                                 }
00182                                                 else if (cmd[1] == "lifetime")
00183                                                 {
00184                                                         std::stringstream ss(cmd[2]);
00185                                                         ss >> _lifetime;
00186                                                         _stream << ClientHandler::API_STATUS_OK << " LIFETIME CHANGED" << std::endl;
00187                                                 }
00188                                                 else if (cmd[1] == "chunksize")
00189                                                 {
00190                                                         size_t size = 0;
00191                                                         std::stringstream ss(cmd[2]);
00192                                                         ss >> size;
00193                                                         _streambuf.setChunkSize(size);
00194                                                         _stream << ClientHandler::API_STATUS_OK << " CHUNKSIZE CHANGED" << std::endl;
00195                                                 }
00196                                                 else if (cmd[1] == "timeout")
00197                                                 {
00198                                                         size_t timeout = 0;
00199                                                         std::stringstream ss(cmd[2]);
00200                                                         ss >> timeout;
00201                                                         _streambuf.setTimeout(timeout);
00202                                                         _stream << ClientHandler::API_STATUS_OK << " TIMEOUT CHANGED" << std::endl;
00203                                                 }
00204                                                 else
00205                                                 {
00206                                                         _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
00207                                                 }
00208                                         }
00209                                         else
00210                                         {
00211                                                 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
00212                                         }
00213                                 } catch (const std::exception&) {
00214                                         _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl;
00215                                 }
00216                         }
00217                 }
00218 
00219                 OrderedStreamHandler::Sender::Sender(OrderedStreamHandler &conn)
00220                  : _handler(conn)
00221                 {
00222                 }
00223 
00224                 OrderedStreamHandler::Sender::~Sender()
00225                 {
00226                         ibrcommon::JoinableThread::join();
00227                 }
00228 
00229                 void OrderedStreamHandler::Sender::__cancellation()
00230                 {
00231                         // cancel the main thread in here
00232                         _handler._client.getRegistration().abort();
00233                 }
00234 
00235                 void OrderedStreamHandler::Sender::finally()
00236                 {
00237                         _handler._client.getRegistration().abort();
00238                 }
00239 
00240                 void OrderedStreamHandler::Sender::run()
00241                 {
00242                         try {
00243                                 _handler._stream << _handler._bundlestream.rdbuf() << std::flush;
00244                         } catch (const std::exception &ex) {
00245                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00246                         }
00247                 }
00248         } /* namespace api */
00249 } /* namespace dtn */