IBR-DTNSuite
0.8
|
00001 /* 00002 * Client.cpp 00003 * 00004 * Created on: 24.06.2009 00005 * Author: morgenro 00006 */ 00007 00008 00009 00010 00011 #include "ibrdtn/api/Client.h" 00012 #include "ibrdtn/api/Bundle.h" 00013 #include "ibrdtn/data/SDNV.h" 00014 #include "ibrdtn/data/Exceptions.h" 00015 #include "ibrdtn/streams/StreamDataSegment.h" 00016 #include "ibrdtn/streams/StreamContactHeader.h" 00017 00018 #include <ibrcommon/net/tcpstream.h> 00019 #include <ibrcommon/Logger.h> 00020 00021 #include <iostream> 00022 #include <string> 00023 00024 namespace dtn 00025 { 00026 namespace api 00027 { 00028 Client::AsyncReceiver::AsyncReceiver(Client &client) 00029 : _client(client), _running(true) 00030 { 00031 } 00032 00033 Client::AsyncReceiver::~AsyncReceiver() 00034 { 00035 } 00036 00037 void Client::AsyncReceiver::__cancellation() 00038 { 00039 _running = false; 00040 } 00041 00042 void Client::AsyncReceiver::run() 00043 { 00044 try { 00045 while (!_client.eof() && _running) 00046 { 00047 dtn::api::Bundle b; 00048 _client >> b; 00049 _client.received(b); 00050 yield(); 00051 } 00052 } catch (const dtn::api::ConnectionException &ex) { 00053 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - ConnectionException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00054 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00055 } catch (const dtn::streams::StreamConnection::StreamErrorException &ex) { 00056 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - StreamErrorException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00057 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00058 } catch (const ibrcommon::IOException &ex) { 00059 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - IOException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00060 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00061 } catch (const dtn::InvalidDataException &ex) { 00062 IBRCOMMON_LOGGER(error) << "Client::AsyncReceiver - InvalidDataException: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00063 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00064 } catch (const std::exception&) { 00065 IBRCOMMON_LOGGER(error) << "error" << IBRCOMMON_LOGGER_ENDL; 00066 _client.shutdown(CONNECTION_SHUTDOWN_ERROR); 00067 } 00068 } 00069 00070 Client::Client(const std::string &app, const dtn::data::EID &group, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode) 00071 : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _group(group), _receiver(*this) 00072 { 00073 } 00074 00075 Client::Client(const std::string &app, ibrcommon::tcpstream &stream, const COMMUNICATION_MODE mode) 00076 : StreamConnection(*this, stream), _stream(stream), _mode(mode), _app(app), _receiver(*this) 00077 { 00078 } 00079 00080 Client::~Client() 00081 { 00082 try { 00083 // stop the receiver 00084 _receiver.stop(); 00085 } catch (const ibrcommon::ThreadException &ex) { 00086 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client destructor: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00087 } 00088 00089 // Close the stream. This releases all reading or writing threads. 00090 _stream.close(); 00091 00092 // wait until the async thread has been finished 00093 _receiver.join(); 00094 } 00095 00096 void Client::connect() 00097 { 00098 // do a handshake 00099 EID localeid; 00100 if (_app.length() > 0) localeid = EID("api:" + _app); 00101 00102 // connection flags 00103 char flags = 0; 00104 00105 // request acknowledgements 00106 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS; 00107 00108 // set comm. mode 00109 if (_mode == MODE_SENDONLY) flags |= HANDSHAKE_SENDONLY; 00110 00111 // receive API banner 00112 std::string buffer; 00113 std::getline(_stream, buffer); 00114 00115 // if requested... 00116 if (!_group.isNone()) 00117 { 00118 // join the group 00119 _stream << "registration add " << _group.getString() << std::endl; 00120 00121 // read the reply 00122 std::getline(_stream, buffer); 00123 } 00124 00125 // switch to API tcpcl mode 00126 _stream << "protocol tcpcl" << std::endl; 00127 00128 // do the handshake (no timeout, no keepalive) 00129 handshake(localeid, 0, flags); 00130 00131 try { 00132 // run the receiver 00133 _receiver.start(); 00134 } catch (const ibrcommon::ThreadException &ex) { 00135 IBRCOMMON_LOGGER(error) << "failed to start Client::Receiver\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00136 } 00137 } 00138 00139 void Client::close() 00140 { 00141 // shutdown the bundle stream connection 00142 shutdown(StreamConnection::CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN); 00143 } 00144 00145 void Client::abort() 00146 { 00147 _inqueue.abort(); 00148 00149 // shutdown the bundle stream connection 00150 shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00151 } 00152 00153 void Client::eventConnectionDown() 00154 { 00155 _inqueue.abort(); 00156 00157 try { 00158 _receiver.stop(); 00159 } catch (const ibrcommon::ThreadException &ex) { 00160 IBRCOMMON_LOGGER_DEBUG(20) << "ThreadException in Client::eventConnectionDown: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00161 } 00162 } 00163 00164 void Client::eventBundleAck(size_t ack) 00165 { 00166 lastack = ack; 00167 } 00168 00169 void Client::received(const dtn::api::Bundle &b) 00170 { 00171 // if we are in send only mode... 00172 if (_mode != dtn::api::Client::MODE_SENDONLY) 00173 { 00174 _inqueue.push(b); 00175 } 00176 00177 // ... then discard the received bundle 00178 } 00179 00180 dtn::api::Bundle Client::getBundle(size_t timeout) throw (ConnectionException) 00181 { 00182 try { 00183 return _inqueue.getnpop(true, timeout * 1000); 00184 } catch (const ibrcommon::QueueUnblockedException &ex) { 00185 if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT) 00186 { 00187 throw ConnectionTimeoutException(); 00188 } 00189 else if (ex.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT) 00190 { 00191 throw ConnectionAbortedException(ex.what()); 00192 } 00193 00194 throw ConnectionException(ex.what()); 00195 } catch (const std::exception &ex) { 00196 throw ConnectionException(ex.what()); 00197 } 00198 00199 throw ConnectionException(); 00200 } 00201 } 00202 }