IBR-DTNSuite  0.8
daemon/src/api/BinaryStreamClient.cpp
Go to the documentation of this file.
00001 /*
00002  * BinaryStreamClient.cpp
00003  *
00004  *  Created on: 19.07.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "Configuration.h"
00010 #include "api/BinaryStreamClient.h"
00011 #include "core/GlobalEvent.h"
00012 #include "core/BundleCore.h"
00013 #include "net/BundleReceivedEvent.h"
00014 #include "core/BundleEvent.h"
00015 #include <ibrdtn/streams/StreamContactHeader.h>
00016 #include <ibrdtn/data/Serializer.h>
00017 #include <iostream>
00018 #include <ibrcommon/Logger.h>
00019 
00020 namespace dtn
00021 {
00022         namespace api
00023         {
00024                 BinaryStreamClient::BinaryStreamClient(ClientHandler &client, ibrcommon::tcpstream &stream)
00025                  : ProtocolHandler(client, stream), _sender(*this), _connection(*this, _stream)
00026                 {
00027                 }
00028 
00029                 BinaryStreamClient::~BinaryStreamClient()
00030                 {
00031                         _client.getRegistration().abort();
00032                         _sender.join();
00033                 }
00034 
00035                 const dtn::data::EID& BinaryStreamClient::getPeer() const
00036                 {
00037                         return _eid;
00038                 }
00039 
00040                 void BinaryStreamClient::eventShutdown(dtn::streams::StreamConnection::ConnectionShutdownCases)
00041                 {
00042                 }
00043 
00044                 void BinaryStreamClient::eventTimeout()
00045                 {
00046                 }
00047 
00048                 void BinaryStreamClient::eventError()
00049                 {
00050                 }
00051 
00052                 void BinaryStreamClient::eventConnectionUp(const dtn::streams::StreamContactHeader &header)
00053                 {
00054                         Registration &reg = _client.getRegistration();
00055 
00056                         if (header._localeid.isNone())
00057                         {
00058                                 // create an EID based on the registration handle
00059                                 _eid = reg.getDefaultEID();
00060                         }
00061                         else
00062                         {
00063                                 // contact received event
00064                                 _eid = BundleCore::local + BundleCore::local.getDelimiter() + header._localeid.getSSP();
00065                         }
00066 
00067                         IBRCOMMON_LOGGER_DEBUG(20) << "new client connected, handle: " << reg.getHandle() << "; eid: "<< _eid.getString() << IBRCOMMON_LOGGER_ENDL;
00068 
00069                         reg.subscribe(_eid);
00070                 }
00071 
00072                 void BinaryStreamClient::eventConnectionDown()
00073                 {
00074                         IBRCOMMON_LOGGER_DEBUG(40) << "BinaryStreamClient::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00075 
00076                         _client.getRegistration().unsubscribe(_eid);
00077 
00078                         try {
00079                                 // stop the sender
00080                                 _sender.stop();
00081                         } catch (const ibrcommon::ThreadException &ex) {
00082                                 IBRCOMMON_LOGGER_DEBUG(50) << "BinaryStreamClient::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00083                         }
00084                 }
00085 
00086                 void BinaryStreamClient::eventBundleRefused()
00087                 {
00088                         try {
00089                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00090 
00091                                 // set ACK to zero
00092                                 _lastack = 0;
00093 
00094                         } catch (const ibrcommon::QueueUnblockedException&) {
00095                                 // pop on empty queue!
00096                         }
00097                 }
00098 
00099                 void BinaryStreamClient::eventBundleForwarded()
00100                 {
00101                         try {
00102                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00103 
00104                                 // notify bundle as delivered
00105                                 _client.getRegistration().delivered(bundle);
00106 
00107                                 // set ACK to zero
00108                                 _lastack = 0;
00109                         } catch (const ibrcommon::QueueUnblockedException&) {
00110                                 // pop on empty queue!
00111                         }
00112                 }
00113 
00114                 void BinaryStreamClient::eventBundleAck(size_t ack)
00115                 {
00116                         _lastack = ack;
00117                 }
00118 
00119                 void BinaryStreamClient::__cancellation()
00120                 {
00121                         // shutdown
00122                         _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00123 
00124                         // close the stream
00125                         try {
00126                                 _stream.close();
00127                         } catch (const ibrcommon::ConnectionClosedException&) { };
00128                 }
00129 
00130                 void BinaryStreamClient::finally()
00131                 {
00132                         IBRCOMMON_LOGGER_DEBUG(60) << "BinaryStreamClient down" << IBRCOMMON_LOGGER_ENDL;
00133 
00134                         // abort blocking registrations
00135                         _client.getRegistration().abort();
00136 
00137                         // close the stream
00138                         try {
00139                                 _stream.close();
00140                         } catch (const ibrcommon::ConnectionClosedException&) { };
00141 
00142                         try {
00143                                 // shutdown the sender thread
00144                                 _sender.stop();
00145                         } catch (const std::exception&) { };
00146                 }
00147 
00148                 void BinaryStreamClient::run()
00149                 {
00150                         try {
00151                                 char flags = 0;
00152 
00153                                 // request acknowledgements
00154                                 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00155 
00156                                 // do the handshake
00157                                 _connection.handshake(dtn::core::BundleCore::local, 10, flags);
00158 
00159                                 // start the sender thread
00160                                 _sender.start();
00161 
00162                                 while (_connection.good())
00163                                 {
00164                                         dtn::data::Bundle bundle;
00165                                         dtn::data::DefaultDeserializer(_connection) >> bundle;
00166 
00167                                         // create a new sequence number
00168                                         bundle.relabel();
00169 
00170                                         // process the new bundle
00171                                         _client.getAPIServer().processIncomingBundle(_eid, bundle);
00172                                 }
00173                         } catch (const ibrcommon::ThreadException &ex) {
00174                                 IBRCOMMON_LOGGER(error) << "failed to start thread in BinaryStreamClient\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00175                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00176                         } catch (const dtn::SerializationFailedException &ex) {
00177                                 IBRCOMMON_LOGGER(error) << "BinaryStreamClient::run(): SerializationFailedException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00178                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00179                         } catch (const ibrcommon::IOException &ex) {
00180                                 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00181                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00182                         } catch (const dtn::InvalidDataException &ex) {
00183                                 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00184                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00185                         } catch (const std::exception &ex) {
00186                                 IBRCOMMON_LOGGER_DEBUG(10) << "BinaryStreamClient::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00187                                 _connection.shutdown(dtn::streams::StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00188                         }
00189                 }
00190 
00191                 bool BinaryStreamClient::good() const
00192                 {
00193                         return _stream.good();
00194                 }
00195 
00196                 BinaryStreamClient::Sender::Sender(BinaryStreamClient &client)
00197                  : _client(client)
00198                 {
00199                 }
00200 
00201                 BinaryStreamClient::Sender::~Sender()
00202                 {
00203                         ibrcommon::JoinableThread::join();
00204                 }
00205 
00206                 void BinaryStreamClient::Sender::__cancellation()
00207                 {
00208                         // cancel the main thread in here
00209                         this->abort();
00210 
00211                         // abort all blocking calls on the registration object
00212                         _client._client.getRegistration().abort();
00213                 }
00214 
00215                 void BinaryStreamClient::Sender::run()
00216                 {
00217                         Registration &reg = _client._client.getRegistration();
00218 
00219                         try {
00220                                 while (_client.good())
00221                                 {
00222                                         try {
00223                                                 dtn::data::Bundle bundle = reg.receive();
00224 
00225                                                 // process the bundle block (security, compression, ...)
00226                                                 dtn::core::BundleCore::processBlocks(bundle);
00227 
00228                                                 // add bundle to the queue
00229                                                 _client._sentqueue.push(bundle);
00230 
00231                                                 // transmit the bundle
00232                                                 dtn::data::DefaultSerializer(_client._connection) << bundle;
00233 
00234                                                 // mark the end of the bundle
00235                                                 _client._connection << std::flush;
00236                                         } catch (const dtn::storage::BundleStorage::NoBundleFoundException&) {
00237                                                 reg.wait_for_bundle();
00238                                         }
00239 
00240                                         // idle a little bit
00241                                         yield();
00242                                 }
00243                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00244                                 IBRCOMMON_LOGGER_DEBUG(40) << "BinaryStreamClient::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00245                                 return;
00246                         } catch (const ibrcommon::IOException &ex) {
00247                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00248                         } catch (const dtn::InvalidDataException &ex) {
00249                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00250                         } catch (const std::exception &ex) {
00251                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00252                         }
00253                 }
00254 
00255                 void BinaryStreamClient::queue(const dtn::data::Bundle &bundle)
00256                 {
00257                         _sender.push(bundle);
00258                 }
00259         }
00260 }