IBR-DTNSuite  0.10
BundleStreamBuf.cpp
Go to the documentation of this file.
1 /*
2  * BundleStreamBuf.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
25 
26 BundleStreamBuf::BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t min_buffer, size_t max_buffer, bool wait_seq_zero)
27  : _in_buf(min_buffer), _out_buf(min_buffer), _client(client), _chunk(chunk),
28  _min_buf_size(min_buffer), _max_buf_size(max_buffer), _chunk_offset(0), _in_seq(0),
29  _streaming(wait_seq_zero), _request_ack(false), _flush_request(false), _receive_timeout(0)
30 {
31  // Initialize get pointer. This should be zero so that underflow is called upon first read.
32  setg(0, 0, 0);
33  setp(&_in_buf[0], &_in_buf[__get_next_buffer_size() - 1]);
34 }
35 
// NOTE(review): the signature line belonging to this body is missing from the
// listing. It directly follows the constructor, so it is presumably the
// BundleStreamBuf destructor - confirm against the original file.
// The body is empty: nothing is released explicitly here.
37 {
38 }
39 
// NOTE(review): the signature line is missing from the listing; this is
// presumably BundleStreamBuf::flush() (the sync body calls flush()) - confirm.
//
// Sends the pending chunk right away. If the chunk holds no data yet, only a
// flush request is recorded; overflow() calls __flush() after appending data
// whenever _flush_request is set.
44 {
45  // lock the data structures
46  ibrcommon::MutexLock l(_chunks_cond);
47 
// nothing buffered yet: remember the request instead of sending the chunk as it is
48  if (_chunk.size() == 0) {
49  _flush_request = true;
50  return;
51  }
52 
53  __flush();
54 }
55 
// NOTE(review): the signature line is missing from the listing; presumably a
// setter such as setReceiveTimeout(timeout) - confirm name and parameter type.
//
// Stores the receive timeout. __underflow() compares it against
// tm.getSeconds(), so the value is presumably in seconds; a value of 0
// disables the timeout check there.
57 {
58  _receive_timeout = timeout;
59 }
60 
// NOTE(review): the signature line is missing from the listing; presumably
// BundleStreamBuf::__flush(), the helper called by the flush body and by
// overflow() - confirm.
//
// Hands the current chunk to the client, flushes the client, clears the chunk
// and resets the pending flush request. The visible callers hold the
// _chunks_cond lock while calling it.
62 {
63  // request delivery acks
64  if (_request_ack) {
// NOTE(review): listing line 65 is missing here; presumably it sets the
// delivery-report request on the chunk - confirm against the original file.
66  _chunk.reportto = dtn::data::EID("api:me");
67  }
68 
69  // send the current chunk and clear it
70  _client << _chunk; _client.flush();
71  _chunk.clear();
72 
// the request (if any) has been served by the send above
73  _flush_request = false;
74 }
75 
// NOTE(review): the signature line is missing from the listing; presumably
// BundleStreamBuf::__get_next_buffer_size(), which the constructor and
// overflow() use to size the put area - confirm.
//
// Returns the number of bytes the next put area may hold: _min_buf_size as
// long as the chunk can take that much without exceeding _max_buf_size,
// otherwise the space left in the chunk.
//
// NOTE(review): the second branch returns 0 when _chunk.size() equals
// _max_buf_size, and the subtraction would wrap for unsigned operands if the
// chunk were larger than the limit. The visible callers subtract 1 from the
// result and use it as an index - verify that they never see 0.
77 {
78  if ((_chunk.size() + _min_buf_size) <= _max_buf_size)
79  {
80  return _min_buf_size;
81  }
82  else
83  {
84  return _max_buf_size - _chunk.size();
85  }
86 }
87 
// NOTE(review): the signature line is missing from the listing; presumably
// the std::streambuf sync() override (returns int) - confirm.
//
// Moves the bytes in the put area into the chunk by calling overflow() with
// eof, then flushes. Returns -1 if overflow() reported eof, 0 otherwise.
// The visible overflow() always returns not_eof(c), so the -1 branch is not
// reachable through the code shown here.
89 {
90  int ret = std::char_traits<char>::eq_int_type(this->overflow(
91  std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
92  : 0;
93 
// send the chunk, or record a flush request if the chunk is still empty
94  flush();
95 
96  return ret;
97 }
98 
// NOTE(review): the signature line is missing from the listing; presumably a
// setter such as setRequestAck(bool val) - confirm.
//
// Enables or disables the acknowledgement request evaluated in the __flush
// body. The same value is written to _flush_request, so passing false also
// drops a flush request that was recorded earlier.
100 {
101  _request_ack = val;
102  _flush_request = val;
103 }
104 
105 std::char_traits<char>::int_type BundleStreamBuf::overflow(std::char_traits<char>::int_type c)
106 {
107  char *ibegin = &_in_buf[0];
108  char *iend = pptr();
109 
110  if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
111  {
112  *iend++ = std::char_traits<char>::to_char_type(c);
113  }
114 
115  // if there is nothing to send, just return
116  if ((iend - ibegin) == 0)
117  {
118  return std::char_traits<char>::not_eof(c);
119  }
120 
121  // lock the data structures
122  ibrcommon::MutexLock l(_chunks_cond);
123 
124  // copy data into the bundles payload
125  _chunk.append(&_in_buf[0], iend - ibegin);
126 
127  // mark the buffer as free
128  setp(&_in_buf[0], &_in_buf[__get_next_buffer_size() - 1]);
129 
130  // if size exceeds chunk limit, send it
131  if ((_chunk.size() >= _max_buf_size) || _flush_request)
132  {
133  __flush();
134  }
135 
136  return std::char_traits<char>::not_eof(c);
137 }
138 
// NOTE(review): the signature line is missing from the listing; presumably a
// callback such as received(const dtn::data::Bundle &b) - confirm.
//
// Queues a received bundle as a Chunk (ordered by sequence number) and wakes
// up readers waiting on _chunks_cond.
140 {
141  try {
142  // get the stream block of the bundle - drop bundles without it
143  const StreamBlock &block = b.find<StreamBlock>();
144 
145  // lock the data structures
146  ibrcommon::MutexLock l(_chunks_cond);
147 
148  // check if the sequencenumber is already received
// NOTE(review): this returns when the bundle's sequence number is GREATER
// than _in_seq, which drops chunks that are ahead of the reader and keeps
// older ones. The comment above suggests the opposite comparison was
// intended - verify.
149  if (_in_seq < block.getSequenceNumber()) return;
150 
151  // insert the received chunk into the chunk set
152  _chunks.insert(Chunk(b));
153 
154  // unblock reading processes
155  _chunks_cond.signal(true);
// NOTE(review): listing line 156 is missing here; it presumably closes the
// try block with a catch clause for bundles that carry no StreamBlock - confirm.
157 }
158 
159 std::char_traits<char>::int_type BundleStreamBuf::underflow()
160 {
161  ibrcommon::MutexLock l(_chunks_cond);
162 
163  return __underflow();
164 }
165 
// Refills the get area from the received chunks.
//
// Waits until at least one chunk is queued and until the first queued chunk
// carries the expected sequence number (_in_seq), then reads up to
// _out_buf.size() payload bytes starting at _chunk_offset into _out_buf.
// A chunk that has been read to its end is erased and _in_seq advances.
// underflow() calls this with the _chunks_cond lock held.
//
// NOTE(review): three lines of this function are missing from the listing
// (the declaration of tm, the catch clause of the try block and the
// declaration of r); they are marked below.
166 std::char_traits<char>::int_type BundleStreamBuf::__underflow()
167 {
168  // receive chunks until the next sequence number is received
169  while (_chunks.empty())
170  {
171  // wait for the next bundle
172  _chunks_cond.wait();
173  }
174 
// NOTE(review): listing line 175 (declaration of tm) is missing; tm is used
// below as a stopwatch via start(), stop() and getSeconds().
176  tm.start();
177 
178  // while not the right sequence number received -> wait
179  while ((_in_seq != (*_chunks.begin())._seq))
180  {
181  try {
182  // wait for the next bundle
183  _chunks_cond.wait(1000);
// NOTE(review): listing line 184 is missing; it presumably closes the try
// block with a catch clause for the timed wait - confirm.
185 
186  tm.stop();
// give up waiting for the expected chunk if a receive timeout is configured
// and exceeded, or if streaming has not been activated yet
187  if (((_receive_timeout > 0) && (tm.getSeconds() > _receive_timeout)) || !_streaming)
188  {
189  // skip the missing bundles and proceed with the last received one
190  _in_seq = (*_chunks.begin())._seq;
191 
192  // set streaming to active
193  _streaming = true;
194  }
195  }
196 
197  // get the first chunk in the buffer
198  const Chunk &c = (*_chunks.begin());
199 
// work on a copy of the bundle stored in the chunk
200  dtn::data::Bundle b = c._bundle;
// NOTE(review): listing line 201 (declaration of r) is missing; r presumably
// refers to the payload BLOB of b - confirm.
202 
203  // get stream lock
204  ibrcommon::BLOB::iostream stream = r.iostream();
205 
206  // jump to the offset position
207  (*stream).seekg(_chunk_offset, ios::beg);
208 
209  // copy the data of the last received bundle into the buffer
210  (*stream).read(&_out_buf[0], _out_buf.size());
211 
212  // get the read bytes
213  size_t bytes = (*stream).gcount();
214 
// eof means this chunk has been read to its end
215  if ((*stream).eof())
216  {
217  // delete the last chunk
218  _chunks.erase(c);
219 
220  // reset the chunk offset
221  _chunk_offset = 0;
222 
223  // increment sequence number
224  _in_seq++;
225 
226  // if no more bytes are read, get the next bundle -> call underflow() recursive
227  if (bytes == 0)
228  {
229  return __underflow();
230  }
231  }
232  else
233  {
234  // increment the chunk offset
235  _chunk_offset += bytes;
236  }
237 
238  // Since the input buffer content is now valid (or is new)
239  // the get pointer should be initialized (or reset).
240  setg(&_out_buf[0], &_out_buf[0], &_out_buf[0] + bytes);
241 
// NOTE(review): the char is passed to not_eof() without to_int_type(), so on
// platforms where char is signed a byte >= 0x80 yields a negative value and
// 0xFF is treated as eof by not_eof() - verify whether
// traits::to_int_type(_out_buf[0]) was intended.
242  return std::char_traits<char>::not_eof(_out_buf[0]);
243 }
244 
245 BundleStreamBuf::Chunk::Chunk(const dtn::data::Bundle &b)
246  : _bundle(b), _seq(0)
247 {
248  // get the stream block of the bundle - drop bundles without it
249  const StreamBlock &block = b.find<StreamBlock>();
250  _seq = block.getSequenceNumber();
251 }
252 
253 BundleStreamBuf::Chunk::~Chunk()
254 {
255 }
256 
257 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const
258 {
259  return (_seq == other._seq);
260 }
261 
262 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const
263 {
264  return (_seq < other._seq);
265 }