IBR-DTNSuite  0.8
daemon/src/api/BundleStreamBuf.cpp
Go to the documentation of this file.
00001 /*
00002  * BundleStreamBuf.cpp
00003  *
00004  *  Created on: 17.11.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "BundleStreamBuf.h"
00009 #include "core/BundleCore.h"
00010 #include <ibrdtn/data/StreamBlock.h>
00011 #include <ibrcommon/Logger.h>
00012 #include <ibrcommon/TimeMeasurement.h>
00013 
00014 namespace dtn
00015 {
00016         namespace api
00017         {
00018                 BundleStreamBuf::BundleStreamBuf(BundleStreamBufCallback &callback, size_t chunk_size, bool wait_seq_zero)
00019                  : _callback(callback), _in_buf(new char[BUFF_SIZE]), _out_buf(new char[BUFF_SIZE]),
00020                    _chunk_size(chunk_size), _chunk_payload(ibrcommon::BLOB::create()), _chunk_offset(0), _in_seq(0),
00021                    _out_seq(0), _streaming(wait_seq_zero), _first_chunk(true), _last_chunk_received(false), _timeout_receive(0)
00022                 {
00023                         // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00024                         setg(0, 0, 0);
00025                         setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00026                 };
00027 
00028                 BundleStreamBuf::~BundleStreamBuf()
00029                 {
00030                         delete[] _in_buf;
00031                         delete[] _out_buf;
00032                 };
00033 
00034                 int BundleStreamBuf::sync()
00035                 {
00036                         int ret = std::char_traits<char>::eq_int_type(this->overflow(
00037                                         std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00038                                         : 0;
00039 
00040                         // send the current chunk and clear it
00041                         flushPayload(true);
00042 
00043                         return ret;
00044                 }
00045 
00046                 std::char_traits<char>::int_type BundleStreamBuf::overflow(std::char_traits<char>::int_type c)
00047                 {
00048                         char *ibegin = _in_buf;
00049                         char *iend = pptr();
00050 
00051                         // mark the buffer as free
00052                         setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00053 
00054                         if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00055                         {
00056                                 *iend++ = std::char_traits<char>::to_char_type(c);
00057                         }
00058 
00059                         // if there is nothing to send, just return
00060                         if ((iend - ibegin) == 0)
00061                         {
00062                                 return std::char_traits<char>::not_eof(c);
00063                         }
00064 
00065                         // copy data into the bundles payload
00066                         BundleStreamBuf::append(_chunk_payload, _in_buf, iend - ibegin);
00067 
00068                         // if size exceeds chunk limit, send it
00069                         if (_chunk_payload.iostream().size() > _chunk_size)
00070                         {
00071                                 flushPayload();
00072                         }
00073 
00074                         return std::char_traits<char>::not_eof(c);
00075                 }
00076 
00077                 void BundleStreamBuf::flushPayload(bool final)
00078                 {
00079                         // do not send a bundle if there are no bytes buffered
00080                         // and no bundle has been sent before
00081                         if ((_first_chunk) && (_chunk_payload.iostream().size() == 0))
00082                         {
00083                                 return;
00084                         }
00085 
00086                         // create an empty bundle
00087                         dtn::data::Bundle b;
00088 
00089                         dtn::data::StreamBlock &block = b.push_front<dtn::data::StreamBlock>();
00090                         block.setSequenceNumber(_out_seq);
00091                         if (final) block.set(dtn::data::StreamBlock::STREAM_END, true);
00092                         block.set(dtn::data::StreamBlock::STREAM_BEGIN, _first_chunk);
00093                         if (_first_chunk) _first_chunk = false;
00094 
00095                         // add tmp payload to the bundle
00096                         b.push_back(_chunk_payload);
00097 
00098                         // send the current chunk
00099                         _callback.put(b);
00100 
00101                         // and clear the payload
00102                         _chunk_payload = ibrcommon::BLOB::create();
00103 
00104                         // increment the sequence number
00105                         _out_seq++;
00106                 }
00107 
00108                 void BundleStreamBuf::setChunkSize(size_t size)
00109                 {
00110                         _chunk_size = size;
00111                 }
00112 
00113                 void BundleStreamBuf::setTimeout(size_t timeout)
00114                 {
00115                         _timeout_receive = timeout;
00116                 }
00117 
00118                 void BundleStreamBuf::append(ibrcommon::BLOB::Reference &ref, const char* data, size_t length)
00119                 {
00120                         ibrcommon::BLOB::iostream stream = ref.iostream();
00121                         (*stream).seekp(0, ios::end);
00122                         (*stream).write(data, length);
00123                 }
00124 
00125                 std::char_traits<char>::int_type BundleStreamBuf::underflow()
00126                 {
00127                         // return with EOF if the last chunk was received
00128                         if (_last_chunk_received)
00129                         {
00130                                 return std::char_traits<char>::eof();
00131                         }
00132 
00133                         // receive chunks until the next sequence number is received
00134                         while (_chunks.empty())
00135                         {
00136                                 // request the next bundle
00137                                 dtn::data::MetaBundle b = _callback.get();
00138 
00139                                 IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): bundle received" << IBRCOMMON_LOGGER_ENDL;
00140 
00141                                 // create a chunk object
00142                                 Chunk c(b);
00143 
00144                                 if (c._seq >= _in_seq)
00145                                 {
00146                                         IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): bundle accepted, seq. no. " << c._seq << IBRCOMMON_LOGGER_ENDL;
00147                                         _chunks.insert(c);
00148                                 }
00149                         }
00150 
00151                         ibrcommon::TimeMeasurement tm;
00152                         tm.start();
00153 
00154                         // while not the right sequence number received -> wait
00155                         while ((_in_seq != (*_chunks.begin())._seq))
00156                         {
00157                                 try {
00158                                         // request the next bundle
00159                                         dtn::data::MetaBundle b = _callback.get(_timeout_receive);
00160                                         IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): bundle received" << IBRCOMMON_LOGGER_ENDL;
00161 
00162                                         // create a chunk object
00163                                         Chunk c(b);
00164 
00165                                         if (c._seq >= _in_seq)
00166                                         {
00167                                                 IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): bundle accepted, seq. no. " << c._seq << IBRCOMMON_LOGGER_ENDL;
00168                                                 _chunks.insert(c);
00169                                         }
00170                                 } catch (std::exception&) {
00171                                         // timed out
00172                                 }
00173 
00174                                 tm.stop();
00175                                 if (((_timeout_receive > 0) && (tm.getSeconds() > _timeout_receive)) || !_streaming)
00176                                 {
00177                                         // skip the missing bundles and proceed with the last received one
00178                                         _in_seq = (*_chunks.begin())._seq;
00179 
00180                                         // set streaming to active
00181                                         _streaming = true;
00182                                 }
00183                         }
00184 
00185                         IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): read the payload" << IBRCOMMON_LOGGER_ENDL;
00186 
00187                         // get the first chunk in the buffer
00188                         const Chunk &c = (*_chunks.begin());
00189 
00190                         if (c._meta != _current_bundle)
00191                         {
00192                                 // load the bundle from storage
00193                                 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00194                                 _current_bundle = storage.get(c._meta);
00195 
00196                                 // process the bundle block (security, compression, ...)
00197                                 dtn::core::BundleCore::processBlocks(_current_bundle);
00198                         }
00199 
00200                         const dtn::data::PayloadBlock &payload = _current_bundle.getBlock<dtn::data::PayloadBlock>();
00201                         ibrcommon::BLOB::Reference r = payload.getBLOB();
00202 
00203                         bool end_of_stream = false;
00204                         size_t bytes = 0;
00205 
00206                         // lock the stream while reading from it
00207                         {
00208                                 // get stream lock
00209                                 ibrcommon::BLOB::iostream stream = r.iostream();
00210 
00211                                 // jump to the offset position
00212                                 (*stream).seekg(_chunk_offset, ios::beg);
00213 
00214                                 // copy the data of the last received bundle into the buffer
00215                                 (*stream).read(_out_buf, BUFF_SIZE);
00216 
00217                                 // get the read bytes
00218                                 bytes = (*stream).gcount();
00219 
00220                                 // check for end of stream
00221                                 end_of_stream = (*stream).eof();
00222                         }
00223 
00224                         if (end_of_stream)
00225                         {
00226                                 // bundle consumed
00227                 //              std::cerr << std::endl << "# " << c._seq << std::endl << std::flush;
00228 
00229                                 // check if this was the last chunk
00230                                 if (c._last)
00231                                 {
00232                                         _last_chunk_received = true;
00233                                 }
00234 
00235                                 // set bundle as delivered
00236                                 _callback.delivered(c._meta);
00237 
00238                                 // delete the last chunk
00239                                 _chunks.erase(c);
00240 
00241                                 // reset the chunk offset
00242                                 _chunk_offset = 0;
00243 
00244                                 // increment sequence number
00245                                 _in_seq++;
00246 
00247                                 // if no more bytes are read, get the next bundle -> call underflow() recursive
00248                                 if (bytes == 0)
00249                                 {
00250                                         return underflow();
00251                                 }
00252                         }
00253                         else
00254                         {
00255                                 // increment the chunk offset
00256                                 _chunk_offset += bytes;
00257                         }
00258 
00259                         // Since the input buffer content is now valid (or is new)
00260                         // the get pointer should be initialized (or reset).
00261                         setg(_out_buf, _out_buf, _out_buf + bytes);
00262 
00263                         return std::char_traits<char>::not_eof((unsigned char) _out_buf[0]);
00264                 }
00265 
00266                 BundleStreamBuf::Chunk::Chunk(const dtn::data::MetaBundle &m)
00267                  : _meta(m), _seq(0), _first(false), _last(false)
00268                 {
00269                         dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00270                         dtn::data::Bundle bundle = storage.get(_meta);
00271 
00272                         try {
00273                                 const dtn::data::StreamBlock &block = bundle.getBlock<dtn::data::StreamBlock>();
00274                                 _seq = block.getSequenceNumber();
00275                                 _first = block.get(dtn::data::StreamBlock::STREAM_BEGIN);
00276                                 _last = block.get(dtn::data::StreamBlock::STREAM_END);
00277                         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }
00278                 }
00279 
00280                 BundleStreamBuf::Chunk::~Chunk()
00281                 {
00282                 }
00283 
00284                 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const
00285                 {
00286                         return (_seq == other._seq);
00287                 }
00288 
00289                 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const
00290                 {
00291                         return (_seq < other._seq);
00292                 }
00293         } /* namespace api */
00294 } /* namespace dtn */