// IBR-DTNSuite 0.8
00001 /* 00002 * BundleStreamBuf.cpp 00003 * 00004 * Created on: 17.11.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "BundleStreamBuf.h" 00009 #include "core/BundleCore.h" 00010 #include <ibrdtn/data/StreamBlock.h> 00011 #include <ibrcommon/Logger.h> 00012 #include <ibrcommon/TimeMeasurement.h> 00013 00014 namespace dtn 00015 { 00016 namespace api 00017 { 00018 BundleStreamBuf::BundleStreamBuf(BundleStreamBufCallback &callback, size_t chunk_size, bool wait_seq_zero) 00019 : _callback(callback), _in_buf(new char[BUFF_SIZE]), _out_buf(new char[BUFF_SIZE]), 00020 _chunk_size(chunk_size), _chunk_payload(ibrcommon::BLOB::create()), _chunk_offset(0), _in_seq(0), 00021 _out_seq(0), _streaming(wait_seq_zero), _first_chunk(true), _last_chunk_received(false), _timeout_receive(0) 00022 { 00023 // Initialize get pointer. This should be zero so that underflow is called upon first read. 00024 setg(0, 0, 0); 00025 setp(_in_buf, _in_buf + BUFF_SIZE - 1); 00026 }; 00027 00028 BundleStreamBuf::~BundleStreamBuf() 00029 { 00030 delete[] _in_buf; 00031 delete[] _out_buf; 00032 }; 00033 00034 int BundleStreamBuf::sync() 00035 { 00036 int ret = std::char_traits<char>::eq_int_type(this->overflow( 00037 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? 
-1 00038 : 0; 00039 00040 // send the current chunk and clear it 00041 flushPayload(true); 00042 00043 return ret; 00044 } 00045 00046 std::char_traits<char>::int_type BundleStreamBuf::overflow(std::char_traits<char>::int_type c) 00047 { 00048 char *ibegin = _in_buf; 00049 char *iend = pptr(); 00050 00051 // mark the buffer as free 00052 setp(_in_buf, _in_buf + BUFF_SIZE - 1); 00053 00054 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof())) 00055 { 00056 *iend++ = std::char_traits<char>::to_char_type(c); 00057 } 00058 00059 // if there is nothing to send, just return 00060 if ((iend - ibegin) == 0) 00061 { 00062 return std::char_traits<char>::not_eof(c); 00063 } 00064 00065 // copy data into the bundles payload 00066 BundleStreamBuf::append(_chunk_payload, _in_buf, iend - ibegin); 00067 00068 // if size exceeds chunk limit, send it 00069 if (_chunk_payload.iostream().size() > _chunk_size) 00070 { 00071 flushPayload(); 00072 } 00073 00074 return std::char_traits<char>::not_eof(c); 00075 } 00076 00077 void BundleStreamBuf::flushPayload(bool final) 00078 { 00079 // do not send a bundle if there are no bytes buffered 00080 // and no bundle has been sent before 00081 if ((_first_chunk) && (_chunk_payload.iostream().size() == 0)) 00082 { 00083 return; 00084 } 00085 00086 // create an empty bundle 00087 dtn::data::Bundle b; 00088 00089 dtn::data::StreamBlock &block = b.push_front<dtn::data::StreamBlock>(); 00090 block.setSequenceNumber(_out_seq); 00091 if (final) block.set(dtn::data::StreamBlock::STREAM_END, true); 00092 block.set(dtn::data::StreamBlock::STREAM_BEGIN, _first_chunk); 00093 if (_first_chunk) _first_chunk = false; 00094 00095 // add tmp payload to the bundle 00096 b.push_back(_chunk_payload); 00097 00098 // send the current chunk 00099 _callback.put(b); 00100 00101 // and clear the payload 00102 _chunk_payload = ibrcommon::BLOB::create(); 00103 00104 // increment the sequence number 00105 _out_seq++; 00106 } 00107 00108 void 
BundleStreamBuf::setChunkSize(size_t size) 00109 { 00110 _chunk_size = size; 00111 } 00112 00113 void BundleStreamBuf::setTimeout(size_t timeout) 00114 { 00115 _timeout_receive = timeout; 00116 } 00117 00118 void BundleStreamBuf::append(ibrcommon::BLOB::Reference &ref, const char* data, size_t length) 00119 { 00120 ibrcommon::BLOB::iostream stream = ref.iostream(); 00121 (*stream).seekp(0, ios::end); 00122 (*stream).write(data, length); 00123 } 00124 00125 std::char_traits<char>::int_type BundleStreamBuf::underflow() 00126 { 00127 // return with EOF if the last chunk was received 00128 if (_last_chunk_received) 00129 { 00130 return std::char_traits<char>::eof(); 00131 } 00132 00133 // receive chunks until the next sequence number is received 00134 while (_chunks.empty()) 00135 { 00136 // request the next bundle 00137 dtn::data::MetaBundle b = _callback.get(); 00138 00139 IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): bundle received" << IBRCOMMON_LOGGER_ENDL; 00140 00141 // create a chunk object 00142 Chunk c(b); 00143 00144 if (c._seq >= _in_seq) 00145 { 00146 IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): bundle accepted, seq. no. " << c._seq << IBRCOMMON_LOGGER_ENDL; 00147 _chunks.insert(c); 00148 } 00149 } 00150 00151 ibrcommon::TimeMeasurement tm; 00152 tm.start(); 00153 00154 // while not the right sequence number received -> wait 00155 while ((_in_seq != (*_chunks.begin())._seq)) 00156 { 00157 try { 00158 // request the next bundle 00159 dtn::data::MetaBundle b = _callback.get(_timeout_receive); 00160 IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): bundle received" << IBRCOMMON_LOGGER_ENDL; 00161 00162 // create a chunk object 00163 Chunk c(b); 00164 00165 if (c._seq >= _in_seq) 00166 { 00167 IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): bundle accepted, seq. no. 
" << c._seq << IBRCOMMON_LOGGER_ENDL; 00168 _chunks.insert(c); 00169 } 00170 } catch (std::exception&) { 00171 // timed out 00172 } 00173 00174 tm.stop(); 00175 if (((_timeout_receive > 0) && (tm.getSeconds() > _timeout_receive)) || !_streaming) 00176 { 00177 // skip the missing bundles and proceed with the last received one 00178 _in_seq = (*_chunks.begin())._seq; 00179 00180 // set streaming to active 00181 _streaming = true; 00182 } 00183 } 00184 00185 IBRCOMMON_LOGGER_DEBUG(40) << "BundleStreamBuf::underflow(): read the payload" << IBRCOMMON_LOGGER_ENDL; 00186 00187 // get the first chunk in the buffer 00188 const Chunk &c = (*_chunks.begin()); 00189 00190 if (c._meta != _current_bundle) 00191 { 00192 // load the bundle from storage 00193 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00194 _current_bundle = storage.get(c._meta); 00195 00196 // process the bundle block (security, compression, ...) 00197 dtn::core::BundleCore::processBlocks(_current_bundle); 00198 } 00199 00200 const dtn::data::PayloadBlock &payload = _current_bundle.getBlock<dtn::data::PayloadBlock>(); 00201 ibrcommon::BLOB::Reference r = payload.getBLOB(); 00202 00203 bool end_of_stream = false; 00204 size_t bytes = 0; 00205 00206 // lock the stream while reading from it 00207 { 00208 // get stream lock 00209 ibrcommon::BLOB::iostream stream = r.iostream(); 00210 00211 // jump to the offset position 00212 (*stream).seekg(_chunk_offset, ios::beg); 00213 00214 // copy the data of the last received bundle into the buffer 00215 (*stream).read(_out_buf, BUFF_SIZE); 00216 00217 // get the read bytes 00218 bytes = (*stream).gcount(); 00219 00220 // check for end of stream 00221 end_of_stream = (*stream).eof(); 00222 } 00223 00224 if (end_of_stream) 00225 { 00226 // bundle consumed 00227 // std::cerr << std::endl << "# " << c._seq << std::endl << std::flush; 00228 00229 // check if this was the last chunk 00230 if (c._last) 00231 { 00232 _last_chunk_received 
= true; 00233 } 00234 00235 // set bundle as delivered 00236 _callback.delivered(c._meta); 00237 00238 // delete the last chunk 00239 _chunks.erase(c); 00240 00241 // reset the chunk offset 00242 _chunk_offset = 0; 00243 00244 // increment sequence number 00245 _in_seq++; 00246 00247 // if no more bytes are read, get the next bundle -> call underflow() recursive 00248 if (bytes == 0) 00249 { 00250 return underflow(); 00251 } 00252 } 00253 else 00254 { 00255 // increment the chunk offset 00256 _chunk_offset += bytes; 00257 } 00258 00259 // Since the input buffer content is now valid (or is new) 00260 // the get pointer should be initialized (or reset). 00261 setg(_out_buf, _out_buf, _out_buf + bytes); 00262 00263 return std::char_traits<char>::not_eof((unsigned char) _out_buf[0]); 00264 } 00265 00266 BundleStreamBuf::Chunk::Chunk(const dtn::data::MetaBundle &m) 00267 : _meta(m), _seq(0), _first(false), _last(false) 00268 { 00269 dtn::storage::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00270 dtn::data::Bundle bundle = storage.get(_meta); 00271 00272 try { 00273 const dtn::data::StreamBlock &block = bundle.getBlock<dtn::data::StreamBlock>(); 00274 _seq = block.getSequenceNumber(); 00275 _first = block.get(dtn::data::StreamBlock::STREAM_BEGIN); 00276 _last = block.get(dtn::data::StreamBlock::STREAM_END); 00277 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { } 00278 } 00279 00280 BundleStreamBuf::Chunk::~Chunk() 00281 { 00282 } 00283 00284 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const 00285 { 00286 return (_seq == other._seq); 00287 } 00288 00289 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const 00290 { 00291 return (_seq < other._seq); 00292 } 00293 } /* namespace data */ 00294 } /* namespace dtn */