IBR-DTNSuite
0.8
|
00001 /* 00002 * dtnstream.cpp 00003 * 00004 * The application dtnstream can transfer a data stream to another instance of dtnstream. 00005 * It uses an extension block to mark the sequence of the stream. 00006 * 00007 * Created on: 23.03.2011 00008 * Author: morgenro 00009 */ 00010 00011 #include "config.h" 00012 #include "BundleStream.h" 00013 #include <ibrdtn/api/Client.h> 00014 #include <ibrdtn/data/EID.h> 00015 #include <ibrdtn/data/PayloadBlock.h> 00016 #include <ibrcommon/net/tcpclient.h> 00017 #include <ibrcommon/data/File.h> 00018 #include <ibrcommon/TimeMeasurement.h> 00019 #include <iostream> 00020 #include <unistd.h> 00021 00022 unsigned int __timeout_receive__ = 0; 00023 00024 StreamBundle::StreamBundle() 00025 : _ref(ibrcommon::BLOB::create()) 00026 { 00027 StreamBlock &block = _b.push_front<StreamBlock>(); 00028 block.setSequenceNumber(0); 00029 00030 _b.push_back(_ref); 00031 } 00032 00033 StreamBundle::StreamBundle(const dtn::api::Bundle &b) 00034 : dtn::api::Bundle(b), _ref(getData()) 00035 { 00036 } 00037 00038 StreamBundle::~StreamBundle() 00039 { 00040 } 00041 00042 void StreamBundle::append(const char* data, size_t length) 00043 { 00044 ibrcommon::BLOB::iostream stream = _ref.iostream(); 00045 (*stream).seekp(0, ios::end); 00046 (*stream).write(data, length); 00047 } 00048 00049 void StreamBundle::clear() 00050 { 00051 ibrcommon::BLOB::iostream stream = _ref.iostream(); 00052 stream.clear(); 00053 00054 // increment the sequence number 00055 try { 00056 StreamBlock &block = _b.getBlock<StreamBlock>(); 00057 block.setSequenceNumber(block.getSequenceNumber() + 1); 00058 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00059 } 00060 00061 size_t StreamBundle::size() 00062 { 00063 ibrcommon::BLOB::iostream stream = _ref.iostream(); 00064 return stream.size(); 00065 } 00066 00067 size_t StreamBundle::getSequenceNumber(const StreamBundle &b) 00068 { 00069 try { 00070 const StreamBlock &block = b._b.getBlock<StreamBlock>(); 00071 return block.getSequenceNumber(); 00072 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { } 00073 00074 return 0; 00075 } 00076 00077 BundleStreamBuf::BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer, bool wait_seq_zero) 00078 : _in_buf(new char[BUFF_SIZE]), _out_buf(new char[BUFF_SIZE]), _client(client), _chunk(chunk), 00079 _buffer(buffer), _chunk_offset(0), _in_seq(0), _streaming(wait_seq_zero) 00080 { 00081 // Initialize get pointer. This should be zero so that underflow is called upon first read. 00082 setg(0, 0, 0); 00083 setp(_in_buf, _in_buf + BUFF_SIZE - 1); 00084 }; 00085 00086 BundleStreamBuf::~BundleStreamBuf() 00087 { 00088 delete[] _in_buf; 00089 delete[] _out_buf; 00090 }; 00091 00092 int BundleStreamBuf::sync() 00093 { 00094 int ret = std::char_traits<char>::eq_int_type(this->overflow( 00095 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1 00096 : 0; 00097 00098 // send the current chunk and clear it 00099 _client << _chunk; _client.flush(); 00100 _chunk.clear(); 00101 00102 return ret; 00103 } 00104 00105 std::char_traits<char>::int_type BundleStreamBuf::overflow(std::char_traits<char>::int_type c) 00106 { 00107 char *ibegin = _in_buf; 00108 char *iend = pptr(); 00109 00110 // mark the buffer as free 00111 setp(_in_buf, _in_buf + BUFF_SIZE - 1); 00112 00113 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof())) 00114 { 00115 *iend++ = std::char_traits<char>::to_char_type(c); 00116 } 00117 00118 // if there is nothing to send, just return 00119 if ((iend - ibegin) == 0) 00120 { 00121 return std::char_traits<char>::not_eof(c); 00122 } 00123 00124 // copy data into the bundles payload 00125 _chunk.append(_in_buf, iend - ibegin); 00126 00127 // if size exceeds chunk limit, send it 00128 if (_chunk.size() > _buffer) 00129 { 00130 _client << _chunk; _client.flush(); 00131 _chunk.clear(); 00132 } 00133 00134 return std::char_traits<char>::not_eof(c); 00135 } 00136 00137 void BundleStreamBuf::received(const dtn::api::Bundle &b) 00138 { 00139 ibrcommon::MutexLock l(_chunks_cond); 00140 00141 if (StreamBundle::getSequenceNumber(b) < _in_seq) return; 00142 00143 _chunks.insert(Chunk(b)); 00144 _chunks_cond.signal(true); 00145 00146 // bundle received 00147 // std::cerr << ". " << StreamBundle::getSequenceNumber(b) << std::flush; 00148 } 00149 00150 std::char_traits<char>::int_type BundleStreamBuf::underflow() 00151 { 00152 ibrcommon::MutexLock l(_chunks_cond); 00153 00154 return __underflow(); 00155 } 00156 00157 std::char_traits<char>::int_type BundleStreamBuf::__underflow() 00158 { 00159 // receive chunks until the next sequence number is received 00160 while (_chunks.empty()) 00161 { 00162 // wait for the next bundle 00163 _chunks_cond.wait(); 00164 } 00165 00166 ibrcommon::TimeMeasurement tm; 00167 tm.start(); 00168 00169 // while not the right sequence number received -> wait 00170 while ((_in_seq != (*_chunks.begin())._seq)) 00171 { 00172 try { 00173 // wait for the next bundle 00174 _chunks_cond.wait(1000); 00175 } catch (const ibrcommon::Conditional::ConditionalAbortException&) { }; 00176 00177 tm.stop(); 00178 if (((__timeout_receive__ > 0) && (tm.getSeconds() > __timeout_receive__)) || !_streaming) 00179 { 00180 // skip the missing bundles and proceed with the last received one 00181 _in_seq = (*_chunks.begin())._seq; 00182 00183 // set streaming to active 00184 _streaming = true; 00185 } 00186 } 00187 00188 // get the first chunk in the buffer 00189 const Chunk &c = (*_chunks.begin()); 00190 00191 dtn::api::Bundle b = c._bundle; 00192 ibrcommon::BLOB::Reference r = b.getData(); 00193 00194 // get stream lock 00195 ibrcommon::BLOB::iostream stream = r.iostream(); 00196 00197 // jump to the offset position 00198 (*stream).seekg(_chunk_offset, ios::beg); 00199 00200 // copy the data of the last received bundle into the buffer 00201 (*stream).read(_out_buf, BUFF_SIZE); 00202 00203 // get the read bytes 00204 size_t bytes = (*stream).gcount(); 00205 00206 if ((*stream).eof()) 00207 { 00208 // bundle consumed 00209 // std::cerr << std::endl << "# " << c._seq << std::endl << std::flush; 00210 00211 // delete the last chunk 00212 _chunks.erase(c); 00213 00214 // reset the chunk offset 00215 _chunk_offset = 0; 00216 00217 // increment sequence number 00218 _in_seq++; 00219 00220 // if no more bytes are read, get the next bundle -> call underflow() recursive 00221 if (bytes == 0) 00222 { 00223 return __underflow(); 00224 } 00225 } 00226 else 00227 { 00228 // increment the chunk offset 00229 _chunk_offset += bytes; 00230 } 00231 00232 // Since the input buffer content is now valid (or is new) 00233 // the get pointer should be initialized (or reset). 00234 setg(_out_buf, _out_buf, _out_buf + bytes); 00235 00236 return std::char_traits<char>::not_eof((unsigned char) _out_buf[0]); 00237 } 00238 00239 BundleStreamBuf::Chunk::Chunk(const dtn::api::Bundle &b) 00240 : _bundle(b), _seq(StreamBundle::getSequenceNumber(b)) 00241 { 00242 } 00243 00244 BundleStreamBuf::Chunk::~Chunk() 00245 { 00246 } 00247 00248 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const 00249 { 00250 return (_seq == other._seq); 00251 } 00252 00253 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const 00254 { 00255 return (_seq < other._seq); 00256 } 00257 00258 BundleStream::BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app, const dtn::data::EID &group, bool wait_seq_zero) 00259 : dtn::api::Client(app, group, stream), _stream(stream), _buf(*this, _chunk, chunk_size, wait_seq_zero) 00260 {}; 00261 00262 BundleStream::~BundleStream() {}; 00263 00264 BundleStreamBuf& BundleStream::rdbuf() 00265 { 00266 return _buf; 00267 } 00268 00269 dtn::api::Bundle& BundleStream::base() 00270 { 00271 return _chunk; 00272 } 00273 00274 void BundleStream::received(const dtn::api::Bundle &b) 00275 { 00276 _buf.received(b); 00277 } 00278 00279 void print_help() 00280 { 00281 std::cout << "-- dtnstream (IBR-DTN) --" << std::endl; 00282 std::cout << "Syntax: dtnstream [options]" << std::endl; 00283 std::cout << "" << std::endl; 00284 std::cout << "* common options *" << std::endl; 00285 std::cout << " -h display this text" << std::endl; 00286 std::cout << " -U <socket> use UNIX domain sockets" << std::endl; 00287 std::cout << " -s <identifier> set the source identifier (e.g. stream)" << std::endl; 00288 std::cout << "" << std::endl; 00289 std::cout << "* send options *" << std::endl; 00290 std::cout << " -d <destination> set the destination eid (e.g. dtn://node/stream)" << std::endl; 00291 std::cout << " -G destination is a group" << std::endl; 00292 std::cout << " -c <bytes> set the chunk size (max. size of each bundle)" << std::endl; 00293 std::cout << " -l <seconds> set the lifetime of stream chunks default: 30" << std::endl; 00294 std::cout << " -E request encryption on the bundle layer" << std::endl; 00295 std::cout << " -S request signature on the bundle layer" << std::endl; 00296 std::cout << "" << std::endl; 00297 std::cout << "* receive options *" << std::endl; 00298 std::cout << " -g <group> join a destination group" << std::endl; 00299 std::cout << " -t <seconds> set the timeout of the buffer" << std::endl; 00300 std::cout << " -w wait for the bundle with seq zero" << std::endl; 00301 std::cout << "" << std::endl; 00302 } 00303 00304 int main(int argc, char *argv[]) 00305 { 00306 int opt = 0; 00307 dtn::data::EID _destination; 00308 std::string _source = "stream"; 00309 unsigned int _lifetime = 30; 00310 size_t _chunk_size = 4096; 00311 dtn::data::EID _group; 00312 bool _bundle_encryption = false; 00313 bool _bundle_signed = false; 00314 bool _bundle_group = false; 00315 bool _wait_seq_zero = false; 00316 ibrcommon::File _unixdomain; 00317 00318 while((opt = getopt(argc, argv, "hg:Gd:t:s:c:l:ESU:w")) != -1) 00319 { 00320 switch (opt) 00321 { 00322 case 'h': 00323 print_help(); 00324 return 0; 00325 00326 case 'd': 00327 _destination = std::string(optarg); 00328 break; 00329 00330 case 'g': 00331 _group = std::string(optarg); 00332 break; 00333 00334 case 'G': 00335 _bundle_group = true; 00336 break; 00337 00338 case 's': 00339 _source = optarg; 00340 break; 00341 00342 case 'c': 00343 _chunk_size = atoi(optarg); 00344 break; 00345 00346 case 't': 00347 __timeout_receive__ = atoi(optarg); 00348 break; 00349 00350 case 'l': 00351 _lifetime = atoi(optarg); 00352 break; 00353 00354 case 'E': 00355 _bundle_encryption = true; 00356 break; 00357 00358 case 'S': 00359 _bundle_signed = true; 00360 break; 00361 00362 case 'U': 00363 _unixdomain = ibrcommon::File(optarg); 00364 break; 00365 00366 case 'w': 00367 _wait_seq_zero = true; 00368 break; 00369 00370 default: 00371 std::cout << "unknown command" << std::endl; 00372 return -1; 00373 } 00374 } 00375 00376 try { 00377 // Create a stream to the server using TCP. 00378 ibrcommon::tcpclient conn; 00379 00380 // check if the unixdomain socket exists 00381 if (_unixdomain.exists()) 00382 { 00383 // connect to the unix domain socket 00384 conn.open(_unixdomain); 00385 } 00386 else 00387 { 00388 // connect to the standard local api port 00389 conn.open("127.0.0.1", 4550); 00390 00391 // enable nodelay option 00392 conn.enableNoDelay(); 00393 } 00394 00395 // Initiate a derivated client 00396 BundleStream bs(conn, _chunk_size, _source, _group, _wait_seq_zero); 00397 00398 // Connect to the server. Actually, this function initiate the 00399 // stream protocol by starting the thread and sending the contact header. 00400 bs.connect(); 00401 00402 // transmitter mode 00403 if (_destination != dtn::data::EID()) 00404 { 00405 bs.base().setDestination(_destination); 00406 bs.base().setLifetime(_lifetime); 00407 if (_bundle_encryption) bs.base().requestEncryption(); 00408 if (_bundle_signed) bs.base().requestSigned(); 00409 if (_bundle_group) bs.base().setSingleton(false); 00410 std::ostream stream(&bs.rdbuf()); 00411 stream << std::cin.rdbuf() << std::flush; 00412 } 00413 // receiver mode 00414 else 00415 { 00416 std::istream stream(&bs.rdbuf()); 00417 std::cout << stream.rdbuf() << std::flush; 00418 } 00419 00420 // Shutdown the client connection. 00421 bs.close(); 00422 conn.close(); 00423 } catch (const ibrcommon::tcpclient::SocketException&) { 00424 std::cerr << "Can not connect to the daemon. Does it run?" << std::endl; 00425 return -1; 00426 } catch (const std::exception&) { 00427 00428 } 00429 00430 }