IBR-DTNSuite  0.8
ibrdtn/ibrdtn/api/Client.cpp
Go to the documentation of this file.
00001 /*
00002  * Client.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 
00009 
00010 
00011 #include "ibrdtn/api/Client.h"
00012 #include "ibrdtn/api/Bundle.h"
00013 #include "ibrdtn/data/SDNV.h"
00014 #include "ibrdtn/data/Exceptions.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include "ibrdtn/streams/StreamContactHeader.h"
00017 
00018 #include <ibrcommon/net/tcpstream.h>
00019 #include <ibrcommon/Logger.h>
00020 
00021 #include <iostream>
00022 #include <string>
00023 
00024 namespace dtn
00025 {
00026         namespace api
00027         {
00028                 Client::AsyncReceiver::AsyncReceiver(Client &client)
00029                  : _client(client), _running(true)
00030                 {
00031                 }
00032 
00033                 Client::AsyncReceiver::~AsyncReceiver()
00034                 {
00035                 }
00036 
00037                 void Client::AsyncReceiver::__cancellation()
00038                 {
00039                         _running = false;
00040                 }
00041 
00042                 void Client::AsyncReceiver::run()
00043                 {
00044                         try {
00045                                 while (!_client.eof() && _running)
00046                                 {
00047                                         dtn::api::Bundle b;
00048                                         _client >> b;
00049                                         _client.received(b);
00050                                         yield();
00051                                 }
00052                         } catch (const dtn::api::ConnectionException &ex) {
00053                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - ConnectionException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00054                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00055                         } catch (const dtn::streams::StreamConnection::StreamErrorException &ex) {
00056                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - StreamErrorException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00057                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00058                         } catch (const ibrcommon::IOException &ex) {
00059                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00060                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00061                         } catch (const dtn::InvalidDataException &ex) {
00062                                 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - InvalidDataException: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00063                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00064                         } catch (const std::exception&) {
00065                                 IBRCOMMON_LOGGER(error) << "error" << IBRCOMMON_LOGGER_ENDL;
00066                                 _client.shutdown(CONNECTION_SHUTDOWN_ERROR);
00067                         }
00068                 }
00069 
00070                 Client::Client(const std::string &app, const dtn::data::EID &group, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode)
00071                   : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _group(group), _receiver(*this)
00072                 {
00073                 }
00074 
00075                 Client::Client(const std::string &app, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode)
00076                   : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _receiver(*this)
00077                 {
00078                 }
00079 
00080                 Client::~Client()
00081                 {
00082                         try {
00083                                 // stop the receiver
00084                                 _receiver.stop();
00085                         } catch (const ibrcommon::ThreadException &ex) {
00086                                 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client destructor: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00087                         }
00088                         
00089                         // Close the stream. This releases all reading or writing threads.
00090                         _stream.close();
00091 
00092                         // wait until the async thread has been finished
00093                         _receiver.join();
00094                 }
00095 
00096                 void Client::connect()
00097                 {
00098                         // do a handshake
00099                         EID localeid;
00100                         if (_app.length() > 0) localeid = EID("api:" + _app);
00101 
00102                         // connection flags
00103                         char flags = 0;
00104 
00105                         // request acknowledgements
00106                         flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00107 
00108                         // set comm. mode
00109                         if (_mode == MODE_SENDONLY) flags |= HANDSHAKE_SENDONLY;
00110 
00111                         // receive API banner
00112                         std::string buffer;
00113                         std::getline(_stream, buffer);
00114 
00115                         // if requested...
00116                         if (!_group.isNone())
00117                         {
00118                                 // join the group
00119                                 _stream << "registration add " << _group.getString() << std::endl;
00120 
00121                                 // read the reply
00122                                 std::getline(_stream, buffer);
00123                         }
00124 
00125                         // switch to API tcpcl mode
00126                         _stream << "protocol tcpcl" << std::endl;
00127 
00128                         // do the handshake (no timeout, no keepalive)
00129                         handshake(localeid, 0, flags);
00130 
00131                         try {
00132                                 // run the receiver
00133                                 _receiver.start();
00134                         } catch (const ibrcommon::ThreadException &ex) {
00135                                 IBRCOMMON_LOGGER(error) << "failed to start Client::Receiver\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00136                         }
00137                 }
00138 
00139                 void Client::close()
00140                 {
00141                         // shutdown the bundle stream connection
00142                         shutdown(StreamConnection::CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00143                 }
00144 
00145                 void Client::abort()
00146                 {
00147                         _inqueue.abort();
00148 
00149                         // shutdown the bundle stream connection
00150                         shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00151                 }
00152 
00153                 void Client::eventConnectionDown()
00154                 {
00155                         _inqueue.abort();
00156 
00157                         try {
00158                                 _receiver.stop();
00159                         } catch (const ibrcommon::ThreadException &ex) {
00160                                 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client::eventConnectionDown: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00161                         }
00162                 }
00163 
00164                 void Client::eventBundleAck(size_t ack)
00165                 {
00166                         lastack = ack;
00167                 }
00168 
00169                 void Client::received(const dtn::api::Bundle &b)
00170                 {
00171                         // if we are in send only mode...
00172                         if (_mode != dtn::api::Client::MODE_SENDONLY)
00173                         {
00174                                 _inqueue.push(b);
00175                         }
00176 
00177                         // ... then discard the received bundle
00178                 }
00179 
00180                 dtn::api::Bundle Client::getBundle(size_t timeout) throw (ConnectionException)
00181                 {
00182                         try {
00183                                 return _inqueue.getnpop(true, timeout * 1000);
00184                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00185                                 if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT)
00186                                 {
00187                                         throw ConnectionTimeoutException();
00188                                 }
00189                                 else if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT)
00190                                 {
00191                                         throw ConnectionAbortedException(ex.what());
00192                                 }
00193 
00194                                 throw ConnectionException(ex.what());
00195                         } catch (const std::exception &ex) {
00196                                 throw ConnectionException(ex.what());
00197                         }
00198 
00199                         throw ConnectionException();
00200                 }
00201         }
00202 }