IBR-DTNSuite  0.8
tools/src/BundleStream.h
Go to the documentation of this file.
00001 /*
00002  * BundleStream.h
00003  *
00004  *  Created on: 23.03.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef BUNDLESTREAM_H_
00009 #define BUNDLESTREAM_H_
00010 
00011 #include <ibrdtn/api/Bundle.h>
00012 #include <ibrdtn/api/Client.h>
00013 #include <ibrdtn/data/EID.h>
00014 #include <ibrdtn/data/SDNV.h>
00015 #include <ibrdtn/data/StreamBlock.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 
00018 class StreamBundle : public dtn::api::Bundle
00019 {
00020 public:
00021         StreamBundle();
00022         StreamBundle(const dtn::api::Bundle &b);
00023         virtual ~StreamBundle();
00024 
00028         void append(const char* data, size_t length);
00029 
00033         void clear();
00034 
00039         size_t size();
00040 
00041         static size_t getSequenceNumber(const StreamBundle &b);
00042 
00043 private:
00044         // reference to the BLOB where all data is stored until transmission
00045         ibrcommon::BLOB::Reference _ref;
00046 };
00047 
00048 class BundleStreamBuf : public std::basic_streambuf<char, std::char_traits<char> >
00049 {
00050 public:
00051         // The size of the input and output buffers.
00052         static const size_t BUFF_SIZE = 5120;
00053 
00054         BundleStreamBuf(dtn::api::Client &client, StreamBundle &chunk, size_t buffer = 4096, bool wait_seq_zero = false);
00055         virtual ~BundleStreamBuf();
00056 
00057         virtual void received(const dtn::api::Bundle &b);
00058 
00059 protected:
00060         virtual int sync();
00061         virtual std::char_traits<char>::int_type overflow(std::char_traits<char>::int_type = std::char_traits<char>::eof());
00062         virtual std::char_traits<char>::int_type underflow();
00063         std::char_traits<char>::int_type __underflow();
00064 
00065 private:
00066         class Chunk
00067         {
00068         public:
00069                 Chunk(const dtn::api::Bundle &b);
00070                 virtual ~Chunk();
00071 
00072                 bool operator==(const Chunk& other) const;
00073                 bool operator<(const Chunk& other) const;
00074 
00075                 dtn::api::Bundle _bundle;
00076                 size_t _seq;
00077         };
00078 
00079         // Input buffer
00080         char *_in_buf;
00081         // Output buffer
00082         char *_out_buf;
00083 
00084         dtn::api::Client &_client;
00085         StreamBundle &_chunk;
00086         size_t _buffer;
00087 
00088         ibrcommon::Conditional _chunks_cond;
00089         std::set<Chunk> _chunks;
00090         size_t _chunk_offset;
00091 
00092         size_t _in_seq;
00093         bool _streaming;
00094 };
00095 
00096 class BundleStream : public dtn::api::Client
00097 {
00098 public:
00099         BundleStream(ibrcommon::tcpstream &stream, size_t chunk_size, const std::string &app = "stream", const dtn::data::EID &group = dtn::data::EID(), bool wait_seq_zero = false);
00100         virtual ~BundleStream();
00101 
00102         BundleStreamBuf& rdbuf();
00103         dtn::api::Bundle& base();
00104 
00105 protected:
00106         virtual void received(const dtn::api::Bundle &b);
00107 
00108 private:
00109         ibrcommon::tcpstream &_stream;
00110 
00111         BundleStreamBuf _buf;
00112         StreamBundle _chunk;
00113 };
00114 
00115 #endif /* BUNDLESTREAM_H_ */