IBR-DTNSuite
0.8
|
00001 /* 00002 * OrderedStreamHandler.cpp 00003 * 00004 * Created on: 17.11.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "api/OrderedStreamHandler.h" 00009 #include "core/BundleCore.h" 00010 #include "net/BundleReceivedEvent.h" 00011 00012 #include <ibrdtn/data/PrimaryBlock.h> 00013 #include <ibrdtn/utils/Utils.h> 00014 #include <ibrcommon/Logger.h> 00015 00016 #ifdef WITH_COMPRESSION 00017 #include <ibrdtn/data/CompressedPayloadBlock.h> 00018 #endif 00019 00020 #ifdef WITH_BUNDLE_SECURITY 00021 #include "security/SecurityManager.h" 00022 #endif 00023 00024 namespace dtn 00025 { 00026 namespace api 00027 { 00028 OrderedStreamHandler::OrderedStreamHandler(ClientHandler &client, ibrcommon::tcpstream &stream) 00029 : ProtocolHandler(client, stream), _sender(*this), _streambuf(*this), _bundlestream(&_streambuf), _group(true), _lifetime(3600) 00030 { 00031 _endpoint = client.getRegistration().getDefaultEID(); 00032 } 00033 00034 OrderedStreamHandler::~OrderedStreamHandler() 00035 { 00036 _sender.stop(); 00037 _sender.join(); 00038 } 00039 00040 void OrderedStreamHandler::delivered(const dtn::data::MetaBundle &m) 00041 { 00042 _client.getRegistration().delivered(m); 00043 } 00044 00045 void OrderedStreamHandler::put(dtn::data::Bundle &b) 00046 { 00047 IBRCOMMON_LOGGER_DEBUG(20) << "OrderedStreamHandler: put()" << IBRCOMMON_LOGGER_ENDL; 00048 00049 // set destination EID 00050 b._destination = _peer; 00051 00052 // set source 00053 b._source = _endpoint; 00054 00055 // set lifetime 00056 b._lifetime = _lifetime; 00057 00058 // set flag if the bundles are addresses to a group 00059 if (_group) 00060 { 00061 b.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, false); 00062 } 00063 else 00064 { 00065 b.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00066 } 00067 00068 // raise default bundle received event 00069 dtn::net::BundleReceivedEvent::raise(_client.getRegistration().getDefaultEID(), b, true, true); 00070 } 00071 00072 dtn::data::MetaBundle OrderedStreamHandler::get(size_t timeout) 00073 { 00074 Registration ® = _client.getRegistration(); 00075 IBRCOMMON_LOGGER_DEBUG(20) << "OrderedStreamHandler: get()" << IBRCOMMON_LOGGER_ENDL; 00076 00077 while (true) 00078 { 00079 try { 00080 dtn::data::MetaBundle bundle = reg.receiveMetaBundle(); 00081 00082 // discard bundle if they are not from the specified peer 00083 if ((!_group) && (bundle.source != _peer)) 00084 { 00085 IBRCOMMON_LOGGER_DEBUG(30) << "OrderedStreamHandler: get(): bundle source " << bundle.source.getString() << " not expected - discard" << IBRCOMMON_LOGGER_ENDL; 00086 continue; 00087 } 00088 00089 return bundle; 00090 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { 00091 IBRCOMMON_LOGGER_DEBUG(30) << "OrderedStreamHandler: get(): no bundle found wait for notify" << IBRCOMMON_LOGGER_ENDL; 00092 reg.wait_for_bundle(timeout); 00093 } 00094 } 00095 } 00096 00097 void OrderedStreamHandler::__cancellation() 00098 { 00099 // close the stream 00100 try { 00101 _stream.close(); 00102 } catch (const ibrcommon::ConnectionClosedException&) { }; 00103 } 00104 00105 void OrderedStreamHandler::finally() 00106 { 00107 IBRCOMMON_LOGGER_DEBUG(60) << "OrderedStreamHandler down" << IBRCOMMON_LOGGER_ENDL; 00108 00109 _client.getRegistration().abort(); 00110 00111 // close the stream 00112 try { 00113 _stream.close(); 00114 } catch (const ibrcommon::ConnectionClosedException&) { }; 00115 00116 try { 00117 // shutdown the sender thread 00118 _sender.stop(); 00119 } catch (const std::exception&) { }; 00120 } 00121 00122 void OrderedStreamHandler::run() 00123 { 00124 std::string buffer; 00125 _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO ORDERED STREAM PROTOCOL" << std::endl; 00126 00127 while (_stream.good()) 00128 { 00129 getline(_stream, buffer); 00130 00131 std::string::reverse_iterator iter = buffer.rbegin(); 00132 if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1); 00133 00134 std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer); 00135 if (cmd.size() == 0) continue; 00136 00137 try { 00138 if (cmd[0] == "connect") 00139 { 00140 _stream << ClientHandler::API_STATUS_CONTINUE << " CONNECTION ESTABLISHED" << std::endl; 00141 00142 // start sender to transfer received payload to the client 00143 _sender.start(); 00144 00145 // forward data to stream buffer 00146 _bundlestream << _stream.rdbuf() << std::flush; 00147 } 00148 else if (cmd[0] == "set") 00149 { 00150 if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters"); 00151 00152 if (cmd[1] == "endpoint") 00153 { 00154 if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters"); 00155 00156 _endpoint = dtn::core::BundleCore::local + "/" + cmd[2]; 00157 00158 // error checking 00159 if (_endpoint == dtn::data::EID()) 00160 { 00161 _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl; 00162 _endpoint = dtn::core::BundleCore::local; 00163 } 00164 else 00165 { 00166 _client.getRegistration().subscribe(_endpoint); 00167 _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl; 00168 } 00169 } 00170 else if (cmd[1] == "destination") 00171 { 00172 _peer = cmd[2]; 00173 _group = false; 00174 _stream << ClientHandler::API_STATUS_OK << " DESTINATION CHANGED" << std::endl; 00175 } 00176 else if (cmd[1] == "group") 00177 { 00178 _peer = cmd[2]; 00179 _group = true; 00180 _stream << ClientHandler::API_STATUS_OK << " DESTINATION GROUP CHANGED" << std::endl; 00181 } 00182 else if (cmd[1] == "lifetime") 00183 { 00184 std::stringstream ss(cmd[2]); 00185 ss >> _lifetime; 00186 _stream << ClientHandler::API_STATUS_OK << " LIFETIME CHANGED" << std::endl; 00187 } 00188 else if (cmd[1] == "chunksize") 00189 { 00190 size_t size = 0; 00191 std::stringstream ss(cmd[2]); 00192 ss >> size; 00193 _streambuf.setChunkSize(size); 00194 _stream << ClientHandler::API_STATUS_OK << " CHUNKSIZE CHANGED" << std::endl; 00195 } 00196 else if (cmd[1] == "timeout") 00197 { 00198 size_t timeout = 0; 00199 std::stringstream ss(cmd[2]); 00200 ss >> timeout; 00201 _streambuf.setTimeout(timeout); 00202 _stream << ClientHandler::API_STATUS_OK << " TIMEOUT CHANGED" << std::endl; 00203 } 00204 else 00205 { 00206 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl; 00207 } 00208 } 00209 else 00210 { 00211 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl; 00212 } 00213 } catch (const std::exception&) { 00214 _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl; 00215 } 00216 } 00217 } 00218 00219 OrderedStreamHandler::Sender::Sender(OrderedStreamHandler &conn) 00220 : _handler(conn) 00221 { 00222 } 00223 00224 OrderedStreamHandler::Sender::~Sender() 00225 { 00226 ibrcommon::JoinableThread::join(); 00227 } 00228 00229 void OrderedStreamHandler::Sender::__cancellation() 00230 { 00231 // cancel the main thread in here 00232 _handler._client.getRegistration().abort(); 00233 } 00234 00235 void OrderedStreamHandler::Sender::finally() 00236 { 00237 _handler._client.getRegistration().abort(); 00238 } 00239 00240 void OrderedStreamHandler::Sender::run() 00241 { 00242 try { 00243 _handler._stream << _handler._bundlestream.rdbuf() << std::flush; 00244 } catch (const std::exception &ex) { 00245 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00246 } 00247 } 00248 } /* namespace api */ 00249 } /* namespace dtn */