IBR-DTNSuite
0.8
|
00001 /* 00002 * DatagramConnection.cpp 00003 * 00004 * Created on: 21.11.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "net/DatagramConnection.h" 00009 #include "net/BundleReceivedEvent.h" 00010 #include "core/BundleEvent.h" 00011 #include "net/TransferCompletedEvent.h" 00012 #include "net/TransferAbortedEvent.h" 00013 #include "routing/RequeueBundleEvent.h" 00014 #include "core/BundleCore.h" 00015 00016 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00017 #include <ibrdtn/utils/Utils.h> 00018 #include <ibrdtn/data/Serializer.h> 00019 00020 #include <ibrcommon/Logger.h> 00021 #include <string.h> 00022 00023 namespace dtn 00024 { 00025 namespace net 00026 { 00027 DatagramConnection::DatagramConnection(const std::string &identifier, const DatagramConnectionParameter ¶ms, DatagramConnectionCallback &callback) 00028 : _callback(callback), _identifier(identifier), _stream(*this, params.max_msg_length, params.max_seq_numbers), _sender(*this, _stream), _last_ack(-1), _wait_ack(-1), _params(params) 00029 { 00030 } 00031 00032 DatagramConnection::~DatagramConnection() 00033 { 00034 // do not destroy this instance as long as 00035 // the sender thread is running 00036 _sender.join(); 00037 } 00038 00039 void DatagramConnection::shutdown() 00040 { 00041 try { 00042 // abort the connection thread 00043 ibrcommon::DetachedThread::stop(); 00044 } catch (const ibrcommon::ThreadException &ex) { 00045 IBRCOMMON_LOGGER_DEBUG(50) << "DatagramConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00046 } 00047 } 00048 00049 void DatagramConnection::__cancellation() 00050 { 00051 // close the stream 00052 try { 00053 _stream.close(); 00054 } catch (const ibrcommon::Exception&) { }; 00055 } 00056 00057 void DatagramConnection::run() 00058 { 00059 try { 00060 while(_stream.good()) 00061 { 00062 dtn::data::DefaultDeserializer deserializer(_stream); 00063 dtn::data::Bundle bundle; 00064 deserializer >> bundle; 00065 00066 // validate the bundle 00067 dtn::core::BundleCore::getInstance().validate(bundle); 00068 00069 00070 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::run"<< IBRCOMMON_LOGGER_ENDL; 00071 00072 // determine sender 00073 EID sender; 00074 00075 // increment value in the scope control hop limit block 00076 try { 00077 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>(); 00078 schl.increment(); 00079 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00080 00081 // raise default bundle received event 00082 dtn::net::BundleReceivedEvent::raise(sender, bundle, false, true); 00083 } 00084 } catch (const dtn::InvalidDataException &ex) { 00085 IBRCOMMON_LOGGER_DEBUG(10) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00086 } catch (std::exception &ex) { 00087 IBRCOMMON_LOGGER_DEBUG(10) << "Thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00088 } 00089 } 00090 00091 void DatagramConnection::setup() 00092 { 00093 _callback.connectionUp(this); 00094 _sender.start(); 00095 } 00096 00097 void DatagramConnection::finally() 00098 { 00099 IBRCOMMON_LOGGER_DEBUG(60) << "DatagramConnection down" << IBRCOMMON_LOGGER_ENDL; 00100 00101 try { 00102 ibrcommon::MutexLock l(_ack_cond); 00103 _ack_cond.abort(); 00104 } catch (const std::exception&) { }; 00105 00106 try { 00107 // shutdown the sender thread 00108 _sender.stop(); 00109 00110 // wait until all operations are stopped 00111 _sender.join(); 00112 } catch (const std::exception&) { }; 00113 00114 try { 00115 // remove this connection from the connection list 00116 _callback.connectionDown(this); 00117 } catch (const ibrcommon::MutexException&) { }; 00118 00119 // clear the queue 00120 _sender.clearQueue(); 00121 } 00122 00123 const std::string& DatagramConnection::getIdentifier() const 00124 { 00125 return _identifier; 00126 } 00127 00132 void DatagramConnection::queue(const ConvergenceLayer::Job &job) 00133 { 00134 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::queue"<< IBRCOMMON_LOGGER_ENDL; 00135 _sender.queue.push(job); 00136 } 00137 00143 void DatagramConnection::queue(const char &flags, const unsigned int &seqno, const char *buf, int len) 00144 { 00145 _stream.queue(flags, seqno, buf, len); 00146 00147 if (_params.flowcontrol == DatagramConnectionParameter::FLOW_STOPNWAIT) 00148 { 00149 // send ack for this message 00150 _callback.callback_ack(*this, seqno, getIdentifier()); 00151 } 00152 } 00153 00154 void DatagramConnection::ack(const unsigned int &seqno) 00155 { 00156 ibrcommon::MutexLock l(_ack_cond); 00157 if (_wait_ack == seqno) 00158 { 00159 _last_ack = seqno; 00160 _ack_cond.signal(true); 00161 IBRCOMMON_LOGGER_DEBUG(20) << "DatagramConnection: ack received " << _last_ack << IBRCOMMON_LOGGER_ENDL; 00162 } 00163 } 00164 00165 void DatagramConnection::stream_send(const char &flags, const unsigned int &seqno, const char *buf, int len) throw (DatagramException) 00166 { 00167 _wait_ack = seqno; 00168 00169 // max. 5 retries 00170 for (int i = 0; i < 5; i++) 00171 { 00172 // send the datagram 00173 _callback.callback_send(*this, flags, seqno, getIdentifier(), buf, len); 00174 00175 if (_params.flowcontrol == DatagramConnectionParameter::FLOW_STOPNWAIT) 00176 { 00177 // set timeout to 200 ms 00178 struct timespec ts; 00179 ibrcommon::Conditional::gettimeout(200, &ts); 00180 00181 try { 00182 // wait here for an ACK 00183 ibrcommon::MutexLock l(_ack_cond); 00184 while (_last_ack != _wait_ack) 00185 { 00186 _ack_cond.wait(&ts); 00187 } 00188 00189 // success! 00190 return; 00191 } 00192 catch (const ibrcommon::Conditional::ConditionalAbortException &e) { }; 00193 } 00194 else 00195 { 00196 // success by default 00197 return; 00198 } 00199 } 00200 00201 // transmission failed - abort the stream 00202 IBRCOMMON_LOGGER_DEBUG(20) << "DatagramConnection::stream_send: transmission failed - abort the stream" << IBRCOMMON_LOGGER_ENDL; 00203 throw DatagramException("transmission failed - abort the stream"); 00204 } 00205 00206 DatagramConnection::Stream::Stream(DatagramConnection &conn, const size_t maxmsglen, const unsigned int maxseqno) 00207 : std::iostream(this), _buf_size(maxmsglen), _maxseqno(maxseqno), _in_state(SEGMENT_FIRST), _out_state(SEGMENT_FIRST), 00208 _queue_buf(new char[_buf_size]), _queue_buf_len(0), _out_buf(new char[_buf_size]), _in_buf(new char[_buf_size]), 00209 in_seq_num_(0), out_seq_num_(0), _abort(false), _callback(conn) 00210 { 00211 // Initialize get pointer. This should be zero so that underflow 00212 // is called upon first read. 00213 setg(0, 0, 0); 00214 00215 // mark the buffer for outgoing data as free 00216 // the +1 sparse the first byte in the buffer and leave room 00217 // for the processing flags of the segment 00218 setp(_out_buf, _out_buf + _buf_size - 1); 00219 } 00220 00221 DatagramConnection::Stream::~Stream() 00222 { 00223 delete[] _queue_buf; 00224 delete[] _out_buf; 00225 delete[] _in_buf; 00226 } 00227 00228 void DatagramConnection::Stream::queue(const char &flags, const unsigned int &seqno, const char *buf, int len) 00229 { 00230 ibrcommon::MutexLock l(_queue_buf_cond); 00231 if (_abort) throw DatagramException("stream aborted"); 00232 00233 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::queue(): Received frame sequence number: " << seqno << IBRCOMMON_LOGGER_ENDL; 00234 00235 // Check if the sequence number is what we expect 00236 if (in_seq_num_ != seqno) 00237 { 00238 IBRCOMMON_LOGGER(error) << "Received frame with out of bound sequence number (" << seqno << " expected " << in_seq_num_ << ")"<< IBRCOMMON_LOGGER_ENDL; 00239 _abort = true; 00240 _queue_buf_cond.signal(); 00241 00242 if (flags & SEGMENT_FIRST) 00243 { 00244 throw DatagramException("out of bound exception - re-initiate the connection"); 00245 } 00246 return; 00247 } 00248 00249 // check if this is the first segment since we expect a first segment 00250 if ((_in_state & SEGMENT_FIRST) && (!(flags & SEGMENT_FIRST))) 00251 { 00252 IBRCOMMON_LOGGER(error) << "Received frame with wrong segment mark"<< IBRCOMMON_LOGGER_ENDL; 00253 _abort = true; 00254 _queue_buf_cond.signal(); 00255 return; 00256 } 00257 // check if this is a second first segment without any previous last segment 00258 else if ((_in_state == SEGMENT_MIDDLE) && (flags & SEGMENT_FIRST)) 00259 { 00260 IBRCOMMON_LOGGER(error) << "Received frame with wrong segment mark"<< IBRCOMMON_LOGGER_ENDL; 00261 _abort = true; 00262 _queue_buf_cond.signal(); 00263 return; 00264 } 00265 00266 if (flags & SEGMENT_FIRST) 00267 { 00268 IBRCOMMON_LOGGER_DEBUG(45) << "DatagramConnection: first segment received" << IBRCOMMON_LOGGER_ENDL; 00269 } 00270 00271 // if this is the last segment then... 00272 if (flags & SEGMENT_LAST) 00273 { 00274 IBRCOMMON_LOGGER_DEBUG(45) << "DatagramConnection: last segment received" << IBRCOMMON_LOGGER_ENDL; 00275 00276 // ... expect a first segment as next 00277 _in_state = SEGMENT_FIRST; 00278 } 00279 else 00280 { 00281 // if this is not the last segment we expect everything 00282 // but a first segment 00283 _in_state = SEGMENT_MIDDLE; 00284 } 00285 00286 // increment the sequence number 00287 in_seq_num_ = (in_seq_num_ + 1) % _maxseqno; 00288 00289 // wait until the buffer is free 00290 while (_queue_buf_len > 0) 00291 { 00292 _queue_buf_cond.wait(); 00293 } 00294 00295 // copy the new data into the buffer, but leave out the first byte (header) 00296 ::memcpy(_queue_buf, buf, len); 00297 00298 // store the buffer length 00299 _queue_buf_len = len; 00300 00301 // notify waiting threads 00302 _queue_buf_cond.signal(); 00303 } 00304 00305 void DatagramConnection::Stream::close() 00306 { 00307 ibrcommon::MutexLock l(_queue_buf_cond); 00308 00309 while (_queue_buf_len > 0) 00310 { 00311 _queue_buf_cond.wait(); 00312 } 00313 00314 _abort = true; 00315 _queue_buf_cond.abort(); 00316 } 00317 00318 int DatagramConnection::Stream::sync() 00319 { 00320 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::sync" << IBRCOMMON_LOGGER_ENDL; 00321 00322 // Here we know we get the last segment. Mark it so. 00323 _out_state |= SEGMENT_LAST; 00324 00325 int ret = std::char_traits<char>::eq_int_type(this->overflow( 00326 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1 00327 : 0; 00328 00329 // initialize the first byte with SEGMENT_FIRST flag 00330 _out_state = SEGMENT_FIRST; 00331 00332 return ret; 00333 } 00334 00335 std::char_traits<char>::int_type DatagramConnection::Stream::overflow(std::char_traits<char>::int_type c) 00336 { 00337 if (_abort) throw DatagramException("stream aborted"); 00338 00339 char *ibegin = _out_buf; 00340 char *iend = pptr(); 00341 00342 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::overflow" << IBRCOMMON_LOGGER_ENDL; 00343 00344 // mark the buffer for outgoing data as free 00345 // the +1 sparse the first byte in the buffer and leave room 00346 // for the processing flags of the segment 00347 setp(_out_buf, _out_buf + _buf_size - 1); 00348 00349 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof())) 00350 { 00351 *iend++ = std::char_traits<char>::to_char_type(c); 00352 } 00353 00354 // bytes to send 00355 size_t bytes = (iend - ibegin); 00356 00357 // if there is nothing to send, just return 00358 if (bytes == 0) 00359 { 00360 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL; 00361 return std::char_traits<char>::not_eof(c); 00362 } 00363 00364 // Send segment to CL, use callback interface 00365 _callback.stream_send(_out_state, out_seq_num_, _out_buf, bytes); 00366 00367 // increment the sequence number for outgoing segments 00368 out_seq_num_ = (out_seq_num_ + 1) % _maxseqno; 00369 00370 // set the segment state to middle 00371 _out_state = SEGMENT_MIDDLE; 00372 00373 return std::char_traits<char>::not_eof(c); 00374 } 00375 00376 std::char_traits<char>::int_type DatagramConnection::Stream::underflow() 00377 { 00378 ibrcommon::MutexLock l(_queue_buf_cond); 00379 00380 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::underflow"<< IBRCOMMON_LOGGER_ENDL; 00381 00382 while (_queue_buf_len == 0) 00383 { 00384 if (_abort) throw ibrcommon::Exception("stream aborted"); 00385 _queue_buf_cond.wait(); 00386 } 00387 00388 // copy the queue buffer to an internal buffer 00389 ::memcpy(_in_buf,_queue_buf, _queue_buf_len); 00390 00391 // Since the input buffer content is now valid (or is new) 00392 // the get pointer should be initialized (or reset). 00393 setg(_in_buf, _in_buf, _in_buf + _queue_buf_len); 00394 00395 // mark the queue buffer as free 00396 _queue_buf_len = 0; 00397 _queue_buf_cond.signal(); 00398 00399 return std::char_traits<char>::not_eof((unsigned char) _in_buf[0]); 00400 } 00401 00402 DatagramConnection::Sender::Sender(DatagramConnection &conn, Stream &stream) 00403 : _stream(stream), _connection(conn) 00404 { 00405 } 00406 00407 DatagramConnection::Sender::~Sender() 00408 { 00409 } 00410 00411 void DatagramConnection::Sender::run() 00412 { 00413 try { 00414 while(_stream.good()) 00415 { 00416 _current_job = queue.getnpop(true); 00417 dtn::data::DefaultSerializer serializer(_stream); 00418 00419 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Sender::run"<< IBRCOMMON_LOGGER_ENDL; 00420 00421 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00422 00423 // read the bundle out of the storage 00424 const dtn::data::Bundle bundle = storage.get(_current_job._bundle); 00425 00426 // Put bundle into stringstream 00427 serializer << bundle; _stream.flush(); 00428 // raise bundle event 00429 dtn::net::TransferCompletedEvent::raise(_current_job._destination, bundle); 00430 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00431 _current_job.clear(); 00432 } 00433 00434 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Sender::run stream destroyed"<< IBRCOMMON_LOGGER_ENDL; 00435 } catch (const ibrcommon::QueueUnblockedException &ex) { 00436 IBRCOMMON_LOGGER_DEBUG(50) << "DatagramConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL; 00437 return; 00438 } catch (std::exception &ex) { 00439 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00440 } 00441 00442 _connection.stop(); 00443 } 00444 00445 void DatagramConnection::Sender::clearQueue() 00446 { 00447 // requeue all bundles still queued 00448 try { 00449 while (true) 00450 { 00451 const ConvergenceLayer::Job job = queue.getnpop(); 00452 00453 // raise transfer abort event for all bundles without an ACK 00454 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle); 00455 } 00456 } catch (const ibrcommon::QueueUnblockedException&) { 00457 // queue empty 00458 } 00459 } 00460 00461 void DatagramConnection::Sender::finally() 00462 { 00463 // notify the aborted transfer of the last bundle 00464 if (_current_job._bundle != dtn::data::BundleID()) 00465 { 00466 // put-back job on the queue 00467 queue.push(_current_job); 00468 } 00469 00470 // abort all blocking operations on the stream 00471 _stream.close(); 00472 } 00473 00474 void DatagramConnection::Sender::__cancellation() 00475 { 00476 _stream.close(); 00477 queue.abort(); 00478 } 00479 } /* namespace data */ 00480 } /* namespace dtn */