IBR-DTNSuite
0.8
|
00001 /* 00002 * BinaryStreamClient.cpp 00003 * 00004 * Created on: 19.07.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "Configuration.h" 00010 #include "api/BinaryStreamClient.h" 00011 #include "core/GlobalEvent.h" 00012 #include "core/BundleCore.h" 00013 #include "net/BundleReceivedEvent.h" 00014 #include "core/BundleEvent.h" 00015 #include <ibrdtn/streams/StreamContactHeader.h> 00016 #include <ibrdtn/data/Serializer.h> 00017 #include <iostream> 00018 #include <ibrcommon/Logger.h> 00019 00020 namespace dtn 00021 { 00022 namespace api 00023 { 00024 BinaryStreamClient::BinaryStreamClient(ClientHandler &client, ibrcommon::tcpstream &stream) 00025 : ProtocolHandler(client, stream), _sender(*this), _connection(*this, _stream) 00026 { 00027 } 00028 00029 BinaryStreamClient::~BinaryStreamClient() 00030 { 00031 _client.getRegistration().abort(); 00032 _sender.join(); 00033 } 00034 00035 const dtn::data::EID& BinaryStreamClient::getPeer() const 00036 { 00037 return _eid; 00038 } 00039 00040 void BinaryStreamClient::eventShutdown(dtn::streams::StreamConnection::ConnectionShutdownCases) 00041 { 00042 } 00043 00044 void BinaryStreamClient::eventTimeout() 00045 { 00046 } 00047 00048 void BinaryStreamClient::eventError() 00049 { 00050 } 00051 00052 void BinaryStreamClient::eventConnectionUp(const dtn::streams::StreamContactHeader &header) 00053 { 00054 Registration ® = _client.getRegistration(); 00055 00056 if (header._localeid.isNone()) 00057 { 00058 // create an EID based on the registration handle 00059 _eid = reg.getDefaultEID(); 00060 } 00061 else 00062 { 00063 // contact received event 00064 _eid = BundleCore::local + BundleCore::local.getDelimiter() + header._localeid.getSSP(); 00065 } 00066 00067 IBRCOMMON_LOGGER_DEBUG(20) << "new client connected, handle: " << reg.getHandle() << "; eid: "<< _eid.getString() << IBRCOMMON_LOGGER_ENDL; 00068 00069 reg.subscribe(_eid); 00070 } 00071 00072 void BinaryStreamClient::eventConnectionDown() 00073 { 00074 IBRCOMMON_LOGGER_DEBUG(40) << "BinaryStreamClient::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL; 00075 00076 _client.getRegistration().unsubscribe(_eid); 00077 00078 try { 00079 // stop the sender 00080 _sender.stop(); 00081 } catch (const ibrcommon::ThreadException &ex) { 00082 IBRCOMMON_LOGGER_DEBUG(50) << "BinaryStreamClient::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00083 } 00084 } 00085 00086 void BinaryStreamClient::eventBundleRefused() 00087 { 00088 try { 00089 const dtn::data::Bundle bundle = _sentqueue.getnpop(); 00090 00091 // set ACK to zero 00092 _lastack = 0; 00093 00094 } catch (const ibrcommon::QueueUnblockedException&) { 00095 // pop on empty queue! 00096 } 00097 } 00098 00099 void BinaryStreamClient::eventBundleForwarded() 00100 { 00101 try { 00102 const dtn::data::Bundle bundle = _sentqueue.getnpop(); 00103 00104 // notify bundle as delivered 00105 _client.getRegistration().delivered(bundle); 00106 00107 // set ACK to zero 00108 _lastack = 0; 00109 } catch (const ibrcommon::QueueUnblockedException&) { 00110 // pop on empty queue! 00111 } 00112 } 00113 00114 void BinaryStreamClient::eventBundleAck(size_t ack) 00115 { 00116 _lastack = ack; 00117 } 00118 00119 void BinaryStreamClient::__cancellation() 00120 { 00121 // shutdown 00122 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00123 00124 // close the stream 00125 try { 00126 _stream.close(); 00127 } catch (const ibrcommon::ConnectionClosedException&) { }; 00128 } 00129 00130 void BinaryStreamClient::finally() 00131 { 00132 IBRCOMMON_LOGGER_DEBUG(60) << "BinaryStreamClient down" << IBRCOMMON_LOGGER_ENDL; 00133 00134 // abort blocking registrations 00135 _client.getRegistration().abort(); 00136 00137 // close the stream 00138 try { 00139 _stream.close(); 00140 } catch (const ibrcommon::ConnectionClosedException&) { }; 00141 00142 try { 00143 // shutdown the sender thread 00144 _sender.stop(); 00145 } catch (const std::exception&) { }; 00146 } 00147 00148 void BinaryStreamClient::run() 00149 { 00150 try { 00151 char flags = 0; 00152 00153 // request acknowledgements 00154 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS; 00155 00156 // do the handshake 00157 _connection.handshake(dtn::core::BundleCore::local, 10, flags); 00158 00159 // start the sender thread 00160 _sender.start(); 00161 00162 while (_connection.good()) 00163 { 00164 dtn::data::Bundle bundle; 00165 dtn::data::DefaultDeserializer(_connection) >> bundle; 00166 00167 // create a new sequence number 00168 bundle.relabel(); 00169 00170 // process the new bundle 00171 _client.getAPIServer().processIncomingBundle(_eid, bundle); 00172 } 00173 } catch (const ibrcommon::ThreadException &ex) { 00174 IBRCOMMON_LOGGER(error) << "failed to start thread in BinaryStreamClient\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00175 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00176 } catch (const dtn::SerializationFailedException &ex) { 00177 IBRCOMMON_LOGGER(error) << "BinaryStreamClient::run(): SerializationFailedException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00178 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00179 } catch (const ibrcommon::IOException &ex) { 00180 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00181 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00182 } catch (const dtn::InvalidDataException &ex) { 00183 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00184 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00185 } catch (const std::exception &ex) { 00186 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00187 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00188 } 00189 } 00190 00191 bool BinaryStreamClient::good() const 00192 { 00193 return _stream.good(); 00194 } 00195 00196 BinaryStreamClient::Sender::Sender(BinaryStreamClient &client) 00197 : _client(client) 00198 { 00199 } 00200 00201 BinaryStreamClient::Sender::~Sender() 00202 { 00203 ibrcommon::JoinableThread::join(); 00204 } 00205 00206 void BinaryStreamClient::Sender::__cancellation() 00207 { 00208 // cancel the main thread in here 00209 this->abort(); 00210 00211 // abort all blocking calls on the registration object 00212 _client._client.getRegistration().abort(); 00213 } 00214 00215 void BinaryStreamClient::Sender::run() 00216 { 00217 Registration ® = _client._client.getRegistration(); 00218 00219 try { 00220 while (_client.good()) 00221 { 00222 try { 00223 dtn::data::Bundle bundle = reg.receive(); 00224 00225 // process the bundle block (security, compression, ...) 00226 dtn::core::BundleCore::processBlocks(bundle); 00227 00228 // add bundle to the queue 00229 _client._sentqueue.push(bundle); 00230 00231 // transmit the bundle 00232 dtn::data::DefaultSerializer(_client._connection) << bundle; 00233 00234 // mark the end of the bundle 00235 _client._connection << std::flush; 00236 } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) { 00237 reg.wait_for_bundle(); 00238 } 00239 00240 // idle a little bit 00241 yield(); 00242 } 00243 } catch (const ibrcommon::QueueUnblockedException &ex) { 00244 IBRCOMMON_LOGGER_DEBUG(40) << "BinaryStreamClient::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL; 00245 return; 00246 } catch (const ibrcommon::IOException &ex) { 00247 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00248 } catch (const dtn::InvalidDataException &ex) { 00249 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00250 } catch (const std::exception &ex) { 00251 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00252 } 00253 } 00254 00255 void BinaryStreamClient::queue(const dtn::data::Bundle &bundle) 00256 { 00257 _sender.push(bundle); 00258 } 00259 } 00260 }