34 _chunk_size(chunk_size), _chunk_payload(ibrcommon::BLOB::create()), _chunk_offset(0), _in_seq(0),
35 _out_seq(0), _streaming(wait_seq_zero), _first_chunk(true), _last_chunk_received(false), _timeout_receive(0)
39 setp(&_in_buf[0], &_in_buf[
BUFF_SIZE - 1]);
48 int ret = std::char_traits<char>::eq_int_type(this->
overflow(
49 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
60 char *ibegin = &_in_buf[0];
64 setp(&_in_buf[0], &_in_buf[0] +
BUFF_SIZE - 1);
66 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
68 *iend++ = std::char_traits<char>::to_char_type(c);
72 if ((iend - ibegin) == 0)
74 return std::char_traits<char>::not_eof(c);
78 BundleStreamBuf::append(_chunk_payload, &_in_buf[0], iend - ibegin);
81 if (_chunk_payload.
size() >
static_cast<std::streamsize
>(_chunk_size))
86 return std::char_traits<char>::not_eof(c);
89 void BundleStreamBuf::flushPayload(
bool final)
93 if ((_first_chunk) && (_chunk_payload.
size() == 0))
105 if (_first_chunk) _first_chunk =
false;
127 _timeout_receive = timeout;
133 (*stream).seekp(0, ios::end);
134 (*stream).write(data, length);
140 if (_last_chunk_received)
142 return std::char_traits<char>::eof();
146 while (_chunks.empty())
156 if (c._seq >= _in_seq)
167 while (_in_seq != (*_chunks.begin())._seq)
177 if (c._seq >= _in_seq)
182 }
catch (std::exception&) {
187 if (((_timeout_receive > 0) && (tm.
getSeconds() > _timeout_receive)) || !_streaming)
190 _in_seq = (*_chunks.begin())._seq;
200 const Chunk &c = (*_chunks.begin());
202 if (c._meta != _current_bundle)
206 _current_bundle = storage.
get(c._meta);
215 bool end_of_stream =
false;
216 std::streamsize bytes = 0;
224 (*stream).seekg(_chunk_offset, ios::beg);
230 bytes = (*stream).gcount();
233 end_of_stream = (*stream).eof();
244 _last_chunk_received =
true;
268 _chunk_offset += bytes;
273 setg(&_out_buf[0], &_out_buf[0], &_out_buf[0] + bytes);
275 return std::char_traits<char>::not_eof(_out_buf[0]);
279 : _meta(m), _seq(0), _first(false), _last(false)
292 BundleStreamBuf::Chunk::~Chunk()
296 bool BundleStreamBuf::Chunk::operator==(
const Chunk& other)
const
298 return (_seq == other._seq);
301 bool BundleStreamBuf::Chunk::operator<(
const Chunk& other)
const
303 return (_seq < other._seq);