IBR-DTNSuite  0.8
tools/src/dtnstream.cpp
Go to the documentation of this file.
00001 /*
00002  * dtnstream.cpp
00003  *
00004  * The application dtnstream can transfer a data stream to another instance of dtnstream.
00005  * It uses an extension block to mark the sequence of the stream.
00006  *
00007  *  Created on: 23.03.2011
00008  *      Author: morgenro
00009  */
00010 
00011 #include "config.h"
00012 #include "BundleStream.h"
00013 #include <ibrdtn/api/Client.h>
00014 #include <ibrdtn/data/EID.h>
00015 #include <ibrdtn/data/PayloadBlock.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 #include <ibrcommon/data/File.h>
00018 #include <ibrcommon/TimeMeasurement.h>
00019 #include <iostream>
00020 #include <unistd.h>
00021 
// Receive timeout in seconds. main() stores the value of the -t option here
// and BundleStreamBuf::__underflow() compares the elapsed waiting time against
// it; the default of 0 switches the timeout check off.
unsigned int __timeout_receive__(0);
00023 
00024 StreamBundle::StreamBundle()
00025  : _ref(ibrcommon::BLOB::create())
00026 {
00027         StreamBlock &block = _b.push_front<StreamBlock>();
00028         block.setSequenceNumber(0);
00029 
00030         _b.push_back(_ref);
00031 }
00032 
00033 StreamBundle::StreamBundle(const dtn::api::Bundle &b)
00034  : dtn::api::Bundle(b), _ref(getData())
00035 {
00036 }
00037 
00038 StreamBundle::~StreamBundle()
00039 {
00040 }
00041 
00042 void StreamBundle::append(const char* data, size_t length)
00043 {
00044         ibrcommon::BLOB::iostream stream = _ref.iostream();
00045         (*stream).seekp(0, ios::end);
00046         (*stream).write(data, length);
00047 }
00048 
00049 void StreamBundle::clear()
00050 {
00051         ibrcommon::BLOB::iostream stream = _ref.iostream();
00052         stream.clear();
00053 
00054         // increment the sequence number
00055         try {
00056                 StreamBlock &block = _b.getBlock<StreamBlock>();
00057                 block.setSequenceNumber(block.getSequenceNumber() + 1);
00058         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00059 }
00060 
00061 size_t StreamBundle::size()
00062 {
00063         ibrcommon::BLOB::iostream stream = _ref.iostream();
00064         return stream.size();
00065 }
00066 
00067 size_t StreamBundle::getSequenceNumber(const StreamBundle &b)
00068 {
00069         try {
00070                 const StreamBlock &block = b._b.getBlock<StreamBlock>();
00071                 return block.getSequenceNumber();
00072         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }
00073 
00074         return 0;
00075 }
00076 
00077 BundleStreamBuf::BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer, bool wait_seq_zero)
00078  : _in_buf(new char[BUFF_SIZE]), _out_buf(new char[BUFF_SIZE]), _client(client), _chunk(chunk),
00079    _buffer(buffer), _chunk_offset(0), _in_seq(0), _streaming(wait_seq_zero)
00080 {
00081         // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00082         setg(0, 0, 0);
00083         setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00084 };
00085 
00086 BundleStreamBuf::~BundleStreamBuf()
00087 {
00088         delete[] _in_buf;
00089         delete[] _out_buf;
00090 };
00091 
00092 int BundleStreamBuf::sync()
00093 {
00094         int ret = std::char_traits<char>::eq_int_type(this->overflow(
00095                         std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00096                         : 0;
00097 
00098         // send the current chunk and clear it
00099         _client << _chunk; _client.flush();
00100         _chunk.clear();
00101 
00102         return ret;
00103 }
00104 
00105 std::char_traits<char>::int_type BundleStreamBuf::overflow(std::char_traits<char>::int_type c)
00106 {
00107         char *ibegin = _in_buf;
00108         char *iend = pptr();
00109 
00110         // mark the buffer as free
00111         setp(_in_buf, _in_buf + BUFF_SIZE - 1);
00112 
00113         if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00114         {
00115                 *iend++ = std::char_traits<char>::to_char_type(c);
00116         }
00117 
00118         // if there is nothing to send, just return
00119         if ((iend - ibegin) == 0)
00120         {
00121                 return std::char_traits<char>::not_eof(c);
00122         }
00123 
00124         // copy data into the bundles payload
00125         _chunk.append(_in_buf, iend - ibegin);
00126 
00127         // if size exceeds chunk limit, send it
00128         if (_chunk.size() > _buffer)
00129         {
00130                 _client << _chunk; _client.flush();
00131                 _chunk.clear();
00132         }
00133 
00134         return std::char_traits<char>::not_eof(c);
00135 }
00136 
00137 void BundleStreamBuf::received(const dtn::api::Bundle &b)
00138 {
00139         ibrcommon::MutexLock l(_chunks_cond);
00140 
00141         if (StreamBundle::getSequenceNumber(b) < _in_seq) return;
00142 
00143         _chunks.insert(Chunk(b));
00144         _chunks_cond.signal(true);
00145 
00146         // bundle received
00147 //      std::cerr << ". " << StreamBundle::getSequenceNumber(b) << std::flush;
00148 }
00149 
00150 std::char_traits<char>::int_type BundleStreamBuf::underflow()
00151 {
00152         ibrcommon::MutexLock l(_chunks_cond);
00153 
00154         return __underflow();
00155 }
00156 
00157 std::char_traits<char>::int_type BundleStreamBuf::__underflow()
00158 {
00159         // receive chunks until the next sequence number is received
00160         while (_chunks.empty())
00161         {
00162                 // wait for the next bundle
00163                 _chunks_cond.wait();
00164         }
00165 
00166         ibrcommon::TimeMeasurement tm;
00167         tm.start();
00168 
00169         // while not the right sequence number received -> wait
00170         while ((_in_seq != (*_chunks.begin())._seq))
00171         {
00172                 try {
00173                         // wait for the next bundle
00174                         _chunks_cond.wait(1000);
00175                 } catch (const ibrcommon::Conditional::ConditionalAbortException&) { };
00176 
00177                 tm.stop();
00178                 if (((__timeout_receive__ > 0) && (tm.getSeconds() > __timeout_receive__)) || !_streaming)
00179                 {
00180                         // skip the missing bundles and proceed with the last received one
00181                         _in_seq = (*_chunks.begin())._seq;
00182 
00183                         // set streaming to active
00184                         _streaming = true;
00185                 }
00186         }
00187 
00188         // get the first chunk in the buffer
00189         const Chunk &c = (*_chunks.begin());
00190 
00191         dtn::api::Bundle b = c._bundle;
00192         ibrcommon::BLOB::Reference r = b.getData();
00193 
00194         // get stream lock
00195         ibrcommon::BLOB::iostream stream = r.iostream();
00196 
00197         // jump to the offset position
00198         (*stream).seekg(_chunk_offset, ios::beg);
00199 
00200         // copy the data of the last received bundle into the buffer
00201         (*stream).read(_out_buf, BUFF_SIZE);
00202 
00203         // get the read bytes
00204         size_t bytes = (*stream).gcount();
00205 
00206         if ((*stream).eof())
00207         {
00208                 // bundle consumed
00209 //              std::cerr << std::endl << "# " << c._seq << std::endl << std::flush;
00210 
00211                 // delete the last chunk
00212                 _chunks.erase(c);
00213 
00214                 // reset the chunk offset
00215                 _chunk_offset = 0;
00216 
00217                 // increment sequence number
00218                 _in_seq++;
00219 
00220                 // if no more bytes are read, get the next bundle -> call underflow() recursive
00221                 if (bytes == 0)
00222                 {
00223                         return __underflow();
00224                 }
00225         }
00226         else
00227         {
00228                 // increment the chunk offset
00229                 _chunk_offset += bytes;
00230         }
00231         
00232         // Since the input buffer content is now valid (or is new)
00233         // the get pointer should be initialized (or reset).
00234         setg(_out_buf, _out_buf, _out_buf + bytes);
00235 
00236         return std::char_traits<char>::not_eof((unsigned char) _out_buf[0]);
00237 }
00238 
00239 BundleStreamBuf::Chunk::Chunk(const dtn::api::Bundle &b)
00240  : _bundle(b), _seq(StreamBundle::getSequenceNumber(b))
00241 {
00242 }
00243 
00244 BundleStreamBuf::Chunk::~Chunk()
00245 {
00246 }
00247 
00248 bool BundleStreamBuf::Chunk::operator==(const Chunk& other) const
00249 {
00250         return (_seq == other._seq);
00251 }
00252 
00253 bool BundleStreamBuf::Chunk::operator<(const Chunk& other) const
00254 {
00255         return (_seq < other._seq);
00256 }
00257 
00258 BundleStream::BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app, const dtn::data::EID &group, bool wait_seq_zero)
00259  : dtn::api::Client(app, group, stream), _stream(stream), _buf(*this, _chunk, chunk_size, wait_seq_zero)
00260 {};
00261 
00262 BundleStream::~BundleStream() {};
00263 
00264 BundleStreamBuf& BundleStream::rdbuf()
00265 {
00266         return _buf;
00267 }
00268 
00269 dtn::api::Bundle& BundleStream::base()
00270 {
00271         return _chunk;
00272 }
00273 
00274 void BundleStream::received(const dtn::api::Bundle &b)
00275 {
00276         _buf.received(b);
00277 }
00278 
/**
 * Writes the usage text of dtnstream to standard output, one line at a time.
 */
void print_help()
{
        // the help text, in output order; empty entries produce blank lines
        static const char *const help_lines[] = {
                "-- dtnstream (IBR-DTN) --",
                "Syntax: dtnstream [options]",
                "",
                "* common options *",
                " -h               display this text",
                " -U <socket>      use UNIX domain sockets",
                " -s <identifier>  set the source identifier (e.g. stream)",
                "",
                "* send options *",
                " -d <destination> set the destination eid (e.g. dtn://node/stream)",
                " -G               destination is a group",
                " -c <bytes>       set the chunk size (max. size of each bundle)",
                " -l <seconds>     set the lifetime of stream chunks default: 30",
                " -E               request encryption on the bundle layer",
                " -S               request signature on the bundle layer",
                "",
                "* receive options *",
                " -g <group>       join a destination group",
                " -t <seconds>     set the timeout of the buffer",
                " -w               wait for the bundle with seq zero",
                ""
        };

        const size_t line_count = sizeof(help_lines) / sizeof(help_lines[0]);

        for (size_t i = 0; i < line_count; ++i)
        {
                std::cout << help_lines[i] << std::endl;
        }
}
00303 
00304 int main(int argc, char *argv[])
00305 {
00306         int opt = 0;
00307         dtn::data::EID _destination;
00308         std::string _source = "stream";
00309         unsigned int _lifetime = 30;
00310         size_t _chunk_size = 4096;
00311         dtn::data::EID _group;
00312         bool _bundle_encryption = false;
00313         bool _bundle_signed = false;
00314         bool _bundle_group = false;
00315         bool _wait_seq_zero = false;
00316         ibrcommon::File _unixdomain;
00317 
00318         while((opt = getopt(argc, argv, "hg:Gd:t:s:c:l:ESU:w")) != -1)
00319         {
00320                 switch (opt)
00321                 {
00322                 case 'h':
00323                         print_help();
00324                         return 0;
00325 
00326                 case 'd':
00327                         _destination = std::string(optarg);
00328                         break;
00329 
00330                 case 'g':
00331                         _group = std::string(optarg);
00332                         break;
00333 
00334                 case 'G':
00335                         _bundle_group = true;
00336                         break;
00337 
00338                 case 's':
00339                         _source = optarg;
00340                         break;
00341 
00342                 case 'c':
00343                         _chunk_size = atoi(optarg);
00344                         break;
00345 
00346                 case 't':
00347                         __timeout_receive__ = atoi(optarg);
00348                         break;
00349 
00350                 case 'l':
00351                         _lifetime = atoi(optarg);
00352                         break;
00353 
00354                 case 'E':
00355                         _bundle_encryption = true;
00356                         break;
00357 
00358                 case 'S':
00359                         _bundle_signed = true;
00360                         break;
00361 
00362                 case 'U':
00363                         _unixdomain = ibrcommon::File(optarg);
00364                         break;
00365 
00366                 case 'w':
00367                         _wait_seq_zero = true;
00368                         break;
00369 
00370                 default:
00371                         std::cout << "unknown command" << std::endl;
00372                         return -1;
00373                 }
00374         }
00375 
00376         try {
00377                 // Create a stream to the server using TCP.
00378                 ibrcommon::tcpclient conn;
00379 
00380                 // check if the unixdomain socket exists
00381                 if (_unixdomain.exists())
00382                 {
00383                         // connect to the unix domain socket
00384                         conn.open(_unixdomain);
00385                 }
00386                 else
00387                 {
00388                         // connect to the standard local api port
00389                         conn.open("127.0.0.1", 4550);
00390 
00391                         // enable nodelay option
00392                         conn.enableNoDelay();
00393                 }
00394 
00395                 // Initiate a derivated client
00396                 BundleStream bs(conn, _chunk_size, _source, _group, _wait_seq_zero);
00397 
00398                 // Connect to the server. Actually, this function initiate the
00399                 // stream protocol by starting the thread and sending the contact header.
00400                 bs.connect();
00401 
00402                 // transmitter mode
00403                 if (_destination != dtn::data::EID())
00404                 {
00405                         bs.base().setDestination(_destination);
00406                         bs.base().setLifetime(_lifetime);
00407                         if (_bundle_encryption) bs.base().requestEncryption();
00408                         if (_bundle_signed) bs.base().requestSigned();
00409                         if (_bundle_group) bs.base().setSingleton(false);
00410                         std::ostream stream(&bs.rdbuf());
00411                         stream << std::cin.rdbuf() << std::flush;
00412                 }
00413                 // receiver mode
00414                 else
00415                 {
00416                         std::istream stream(&bs.rdbuf());
00417                         std::cout << stream.rdbuf() << std::flush;
00418                 }
00419 
00420                 // Shutdown the client connection.
00421                 bs.close();
00422                 conn.close();
00423         } catch (const ibrcommon::tcpclient::SocketException&) {
00424                 std::cerr << "Can not connect to the daemon. Does it run?" << std::endl;
00425                 return -1;
00426         } catch (const std::exception&) {
00427 
00428         }
00429 
00430 }