IBR-DTNSuite  0.8
daemon/src/net/DatagramConnection.cpp
Go to the documentation of this file.
00001 /*
00002  * DatagramConnection.cpp
00003  *
00004  *  Created on: 21.11.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "net/DatagramConnection.h"
00009 #include "net/BundleReceivedEvent.h"
00010 #include "core/BundleEvent.h"
00011 #include "net/TransferCompletedEvent.h"
00012 #include "net/TransferAbortedEvent.h"
00013 #include "routing/RequeueBundleEvent.h"
00014 #include "core/BundleCore.h"
00015 
00016 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00017 #include <ibrdtn/utils/Utils.h>
00018 #include <ibrdtn/data/Serializer.h>
00019 
00020 #include <ibrcommon/Logger.h>
00021 #include <string.h>
00022 
00023 namespace dtn
00024 {
00025         namespace net
00026         {
00027                 DatagramConnection::DatagramConnection(const std::string &identifier, const DatagramConnectionParameter &params, DatagramConnectionCallback &callback)
00028                  : _callback(callback), _identifier(identifier), _stream(*this, params.max_msg_length, params.max_seq_numbers), _sender(*this, _stream), _last_ack(-1), _wait_ack(-1), _params(params)
00029                 {
00030                 }
00031 
00032                 DatagramConnection::~DatagramConnection()
00033                 {
00034                         // do not destroy this instance as long as
00035                         // the sender thread is running
00036                         _sender.join();
00037                 }
00038 
00039                 void DatagramConnection::shutdown()
00040                 {
00041                         try {
00042                                 // abort the connection thread
00043                                 ibrcommon::DetachedThread::stop();
00044                         } catch (const ibrcommon::ThreadException &ex) {
00045                                 IBRCOMMON_LOGGER_DEBUG(50) << "DatagramConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00046                         }
00047                 }
00048 
00049                 void DatagramConnection::__cancellation()
00050                 {
00051                         // close the stream
00052                         try {
00053                                 _stream.close();
00054                         } catch (const ibrcommon::Exception&) { };
00055                 }
00056 
00057                 void DatagramConnection::run()
00058                 {
00059                         try {
00060                                 while(_stream.good())
00061                                 {
00062                                         dtn::data::DefaultDeserializer deserializer(_stream);
00063                                         dtn::data::Bundle bundle;
00064                                         deserializer >> bundle;
00065                                         
00066                                         // validate the bundle
00067                                         dtn::core::BundleCore::getInstance().validate(bundle);
00068 
00069 
00070                                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::run"<< IBRCOMMON_LOGGER_ENDL;
00071 
00072                                         // determine sender
00073                                         EID sender;
00074 
00075                                         // increment value in the scope control hop limit block
00076                                         try {
00077                                                 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00078                                                 schl.increment();
00079                                         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00080 
00081                                         // raise default bundle received event
00082                                         dtn::net::BundleReceivedEvent::raise(sender, bundle, false, true);
00083                                 }
00084                         } catch (const dtn::InvalidDataException &ex) {
00085                                 IBRCOMMON_LOGGER_DEBUG(10) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00086                         } catch (std::exception &ex) {
00087                                 IBRCOMMON_LOGGER_DEBUG(10) << "Thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00088                         }
00089                 }
00090 
00091                 void DatagramConnection::setup()
00092                 {
00093                         _callback.connectionUp(this);
00094                         _sender.start();
00095                 }
00096 
00097                 void DatagramConnection::finally()
00098                 {
00099                         IBRCOMMON_LOGGER_DEBUG(60) << "DatagramConnection down" << IBRCOMMON_LOGGER_ENDL;
00100 
00101                         try {
00102                                 ibrcommon::MutexLock l(_ack_cond);
00103                                 _ack_cond.abort();
00104                         } catch (const std::exception&) { };
00105 
00106                         try {
00107                                 // shutdown the sender thread
00108                                 _sender.stop();
00109 
00110                                 // wait until all operations are stopped
00111                                 _sender.join();
00112                         } catch (const std::exception&) { };
00113 
00114                         try {
00115                                 // remove this connection from the connection list
00116                                 _callback.connectionDown(this);
00117                         } catch (const ibrcommon::MutexException&) { };
00118 
00119                         // clear the queue
00120                         _sender.clearQueue();
00121                 }
00122 
00123                 const std::string& DatagramConnection::getIdentifier() const
00124                 {
00125                         return _identifier;
00126                 }
00127 
00132                 void DatagramConnection::queue(const ConvergenceLayer::Job &job)
00133                 {
00134                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::queue"<< IBRCOMMON_LOGGER_ENDL;
00135                         _sender.queue.push(job);
00136                 }
00137 
00143                 void DatagramConnection::queue(const char &flags, const unsigned int &seqno, const char *buf, int len)
00144                 {
00145                         _stream.queue(flags, seqno, buf, len);
00146 
00147                         if (_params.flowcontrol == DatagramConnectionParameter::FLOW_STOPNWAIT)
00148                         {
00149                                 // send ack for this message
00150                                 _callback.callback_ack(*this, seqno, getIdentifier());
00151                         }
00152                 }
00153 
00154                 void DatagramConnection::ack(const unsigned int &seqno)
00155                 {
00156                         ibrcommon::MutexLock l(_ack_cond);
00157                         if (_wait_ack == seqno)
00158                         {
00159                                 _last_ack = seqno;
00160                                 _ack_cond.signal(true);
00161                                 IBRCOMMON_LOGGER_DEBUG(20) << "DatagramConnection: ack received " << _last_ack << IBRCOMMON_LOGGER_ENDL;
00162                         }
00163                 }
00164 
00165                 void DatagramConnection::stream_send(const char &flags, const unsigned int &seqno, const char *buf, int len) throw (DatagramException)
00166                 {
00167                         _wait_ack = seqno;
00168 
00169                         // max. 5 retries
00170                         for (int i = 0; i < 5; i++)
00171                         {
00172                                 // send the datagram
00173                                 _callback.callback_send(*this, flags, seqno, getIdentifier(), buf, len);
00174 
00175                                 if (_params.flowcontrol == DatagramConnectionParameter::FLOW_STOPNWAIT)
00176                                 {
00177                                         // set timeout to 200 ms
00178                                         struct timespec ts;
00179                                         ibrcommon::Conditional::gettimeout(200, &ts);
00180 
00181                                         try {
00182                                                 // wait here for an ACK
00183                                                 ibrcommon::MutexLock l(_ack_cond);
00184                                                 while (_last_ack != _wait_ack)
00185                                                 {
00186                                                         _ack_cond.wait(&ts);
00187                                                 }
00188 
00189                                                 // success!
00190                                                 return;
00191                                         }
00192                                         catch (const ibrcommon::Conditional::ConditionalAbortException &e) { };
00193                                 }
00194                                 else
00195                                 {
00196                                         // success by default
00197                                         return;
00198                                 }
00199                         }
00200 
00201                         // transmission failed - abort the stream
00202                         IBRCOMMON_LOGGER_DEBUG(20) << "DatagramConnection::stream_send: transmission failed - abort the stream" << IBRCOMMON_LOGGER_ENDL;
00203                         throw DatagramException("transmission failed - abort the stream");
00204                 }
00205 
00206                 DatagramConnection::Stream::Stream(DatagramConnection &conn, const size_t maxmsglen, const unsigned int maxseqno)
00207                  : std::iostream(this), _buf_size(maxmsglen), _maxseqno(maxseqno), _in_state(SEGMENT_FIRST), _out_state(SEGMENT_FIRST),
00208                    _queue_buf(new char[_buf_size]), _queue_buf_len(0), _out_buf(new char[_buf_size]), _in_buf(new char[_buf_size]),
00209                    in_seq_num_(0), out_seq_num_(0), _abort(false), _callback(conn)
00210                 {
00211                         // Initialize get pointer. This should be zero so that underflow
00212                         // is called upon first read.
00213                         setg(0, 0, 0);
00214 
00215                         // mark the buffer for outgoing data as free
00216                         // the +1 sparse the first byte in the buffer and leave room
00217                         // for the processing flags of the segment
00218                         setp(_out_buf, _out_buf + _buf_size - 1);
00219                 }
00220 
00221                 DatagramConnection::Stream::~Stream()
00222                 {
00223                         delete[] _queue_buf;
00224                         delete[] _out_buf;
00225                         delete[] _in_buf;
00226                 }
00227 
00228                 void DatagramConnection::Stream::queue(const char &flags, const unsigned int &seqno, const char *buf, int len)
00229                 {
00230                         ibrcommon::MutexLock l(_queue_buf_cond);
00231                         if (_abort) throw DatagramException("stream aborted");
00232 
00233                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::queue(): Received frame sequence number: " << seqno << IBRCOMMON_LOGGER_ENDL;
00234 
00235                         // Check if the sequence number is what we expect
00236                         if (in_seq_num_ != seqno)
00237                         {
00238                                 IBRCOMMON_LOGGER(error) << "Received frame with out of bound sequence number (" << seqno << " expected " << in_seq_num_ << ")"<< IBRCOMMON_LOGGER_ENDL;
00239                                 _abort = true;
00240                                 _queue_buf_cond.signal();
00241 
00242                                 if (flags & SEGMENT_FIRST)
00243                                 {
00244                                         throw DatagramException("out of bound exception - re-initiate the connection");
00245                                 }
00246                                 return;
00247                         }
00248 
00249                         // check if this is the first segment since we expect a first segment
00250                         if ((_in_state & SEGMENT_FIRST) && (!(flags & SEGMENT_FIRST)))
00251                         {
00252                                 IBRCOMMON_LOGGER(error) << "Received frame with wrong segment mark"<< IBRCOMMON_LOGGER_ENDL;
00253                                 _abort = true;
00254                                 _queue_buf_cond.signal();
00255                                 return;
00256                         }
00257                         // check if this is a second first segment without any previous last segment
00258                         else if ((_in_state == SEGMENT_MIDDLE) && (flags & SEGMENT_FIRST))
00259                         {
00260                                 IBRCOMMON_LOGGER(error) << "Received frame with wrong segment mark"<< IBRCOMMON_LOGGER_ENDL;
00261                                 _abort = true;
00262                                 _queue_buf_cond.signal();
00263                                 return;
00264                         }
00265 
00266                         if (flags & SEGMENT_FIRST)
00267                         {
00268                                 IBRCOMMON_LOGGER_DEBUG(45) << "DatagramConnection: first segment received" << IBRCOMMON_LOGGER_ENDL;
00269                         }
00270 
00271                         // if this is the last segment then...
00272                         if (flags & SEGMENT_LAST)
00273                         {
00274                                 IBRCOMMON_LOGGER_DEBUG(45) << "DatagramConnection: last segment received" << IBRCOMMON_LOGGER_ENDL;
00275 
00276                                 // ... expect a first segment as next
00277                                 _in_state = SEGMENT_FIRST;
00278                         }
00279                         else
00280                         {
00281                                 // if this is not the last segment we expect everything
00282                                 // but a first segment
00283                                 _in_state = SEGMENT_MIDDLE;
00284                         }
00285 
00286                         // increment the sequence number
00287                         in_seq_num_ = (in_seq_num_ + 1) % _maxseqno;
00288 
00289                         // wait until the buffer is free
00290                         while (_queue_buf_len > 0)
00291                         {
00292                                 _queue_buf_cond.wait();
00293                         }
00294 
00295                         // copy the new data into the buffer, but leave out the first byte (header)
00296                         ::memcpy(_queue_buf, buf, len);
00297 
00298                         // store the buffer length
00299                         _queue_buf_len = len;
00300 
00301                         // notify waiting threads
00302                         _queue_buf_cond.signal();
00303                 }
00304 
00305                 void DatagramConnection::Stream::close()
00306                 {
00307                         ibrcommon::MutexLock l(_queue_buf_cond);
00308 
00309                         while (_queue_buf_len > 0)
00310                         {
00311                                 _queue_buf_cond.wait();
00312                         }
00313 
00314                         _abort = true;
00315                         _queue_buf_cond.abort();
00316                 }
00317 
00318                 int DatagramConnection::Stream::sync()
00319                 {
00320                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::sync" << IBRCOMMON_LOGGER_ENDL;
00321 
00322                         // Here we know we get the last segment. Mark it so.
00323                         _out_state |= SEGMENT_LAST;
00324 
00325                         int ret = std::char_traits<char>::eq_int_type(this->overflow(
00326                                         std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00327                                         : 0;
00328 
00329                         // initialize the first byte with SEGMENT_FIRST flag
00330                         _out_state = SEGMENT_FIRST;
00331 
00332                         return ret;
00333                 }
00334 
00335                 std::char_traits<char>::int_type DatagramConnection::Stream::overflow(std::char_traits<char>::int_type c)
00336                 {
00337                         if (_abort) throw DatagramException("stream aborted");
00338 
00339                         char *ibegin = _out_buf;
00340                         char *iend = pptr();
00341 
00342                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::overflow" << IBRCOMMON_LOGGER_ENDL;
00343 
00344                         // mark the buffer for outgoing data as free
00345                         // the +1 sparse the first byte in the buffer and leave room
00346                         // for the processing flags of the segment
00347                         setp(_out_buf, _out_buf + _buf_size - 1);
00348 
00349                         if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00350                         {
00351                                 *iend++ = std::char_traits<char>::to_char_type(c);
00352                         }
00353 
00354                         // bytes to send
00355                         size_t bytes = (iend - ibegin);
00356 
00357                         // if there is nothing to send, just return
00358                         if (bytes == 0)
00359                         {
00360                                 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL;
00361                                 return std::char_traits<char>::not_eof(c);
00362                         }
00363 
00364                         // Send segment to CL, use callback interface
00365                         _callback.stream_send(_out_state, out_seq_num_, _out_buf, bytes);
00366 
00367                         // increment the sequence number for outgoing segments
00368                         out_seq_num_ = (out_seq_num_ + 1) % _maxseqno;
00369 
00370                         // set the segment state to middle
00371                         _out_state = SEGMENT_MIDDLE;
00372 
00373                         return std::char_traits<char>::not_eof(c);
00374                 }
00375 
00376                 std::char_traits<char>::int_type DatagramConnection::Stream::underflow()
00377                 {
00378                         ibrcommon::MutexLock l(_queue_buf_cond);
00379 
00380                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Stream::underflow"<< IBRCOMMON_LOGGER_ENDL;
00381 
00382                         while (_queue_buf_len == 0)
00383                         {
00384                                 if (_abort) throw ibrcommon::Exception("stream aborted");
00385                                 _queue_buf_cond.wait();
00386                         }
00387 
00388                         // copy the queue buffer to an internal buffer
00389                         ::memcpy(_in_buf,_queue_buf, _queue_buf_len);
00390 
00391                         // Since the input buffer content is now valid (or is new)
00392                         // the get pointer should be initialized (or reset).
00393                         setg(_in_buf, _in_buf, _in_buf + _queue_buf_len);
00394 
00395                         // mark the queue buffer as free
00396                         _queue_buf_len = 0;
00397                         _queue_buf_cond.signal();
00398 
00399                         return std::char_traits<char>::not_eof((unsigned char) _in_buf[0]);
00400                 }
00401 
00402                 DatagramConnection::Sender::Sender(DatagramConnection &conn, Stream &stream)
00403                  : _stream(stream), _connection(conn)
00404                 {
00405                 }
00406 
00407                 DatagramConnection::Sender::~Sender()
00408                 {
00409                 }
00410 
00411                 void DatagramConnection::Sender::run()
00412                 {
00413                         try {
00414                                 while(_stream.good())
00415                                 {
00416                                         _current_job = queue.getnpop(true);
00417                                         dtn::data::DefaultSerializer serializer(_stream);
00418 
00419                                         IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Sender::run"<< IBRCOMMON_LOGGER_ENDL;
00420 
00421                                         dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00422 
00423                                         // read the bundle out of the storage
00424                                         const dtn::data::Bundle bundle = storage.get(_current_job._bundle);
00425 
00426                                         // Put bundle into stringstream
00427                                         serializer << bundle; _stream.flush();
00428                                         // raise bundle event
00429                                         dtn::net::TransferCompletedEvent::raise(_current_job._destination, bundle);
00430                                         dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00431                                         _current_job.clear();
00432                                 }
00433 
00434                                 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Sender::run stream destroyed"<< IBRCOMMON_LOGGER_ENDL;
00435                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00436                                 IBRCOMMON_LOGGER_DEBUG(50) << "DatagramConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00437                                 return;
00438                         } catch (std::exception &ex) {
00439                                 IBRCOMMON_LOGGER_DEBUG(10) << "DatagramConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00440                         }
00441 
00442                         _connection.stop();
00443                 }
00444 
00445                 void DatagramConnection::Sender::clearQueue()
00446                 {
00447                         // requeue all bundles still queued
00448                         try {
00449                                 while (true)
00450                                 {
00451                                         const ConvergenceLayer::Job job = queue.getnpop();
00452 
00453                                         // raise transfer abort event for all bundles without an ACK
00454                                         dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00455                                 }
00456                         } catch (const ibrcommon::QueueUnblockedException&) {
00457                                 // queue empty
00458                         }
00459                 }
00460 
00461                 void DatagramConnection::Sender::finally()
00462                 {
00463                         // notify the aborted transfer of the last bundle
00464                         if (_current_job._bundle != dtn::data::BundleID())
00465                         {
00466                                 // put-back job on the queue
00467                                 queue.push(_current_job);
00468                         }
00469 
00470                         // abort all blocking operations on the stream
00471                         _stream.close();
00472                 }
00473 
00474                 void DatagramConnection::Sender::__cancellation()
00475                 {
00476                         _stream.close();
00477                         queue.abort();
00478                 }
00479         } /* namespace net */
00480 } /* namespace dtn */