IBR-DTNSuite
0.8
|
00001 /* 00002 * StreamConnection.cpp 00003 * 00004 * Created on: 02.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrdtn/streams/StreamConnection.h" 00009 #include "ibrdtn/data/Exceptions.h" 00010 #include "ibrdtn/streams/StreamContactHeader.h" 00011 #include "ibrdtn/streams/StreamDataSegment.h" 00012 #include "ibrdtn/data/Exceptions.h" 00013 #include <ibrcommon/TimeMeasurement.h> 00014 #include <ibrcommon/Logger.h> 00015 00016 using namespace dtn::data; 00017 00018 namespace dtn 00019 { 00020 namespace streams 00021 { 00022 StreamConnection::StreamConnection(StreamConnection::Callback &cb, iostream &stream, const size_t buffer_size) 00023 : std::iostream(&_buf), _callback(cb), _buf(*this, stream, buffer_size), _shutdown_reason(CONNECTION_SHUTDOWN_NOTSET) 00024 { 00025 } 00026 00027 StreamConnection::~StreamConnection() 00028 { 00029 } 00030 00031 void StreamConnection::handshake(const dtn::data::EID &eid, const size_t timeout, const char flags) 00032 { 00033 // create a new header 00034 dtn::streams::StreamContactHeader header(eid); 00035 00036 // set timeout 00037 header._keepalive = timeout; 00038 00039 // set flags 00040 header._flags = flags; 00041 00042 // do the handshake 00043 _peer = _buf.handshake(header); 00044 00045 // signal the complete handshake 00046 _callback.eventConnectionUp(_peer); 00047 } 00048 00049 void StreamConnection::reject() 00050 { 00051 _buf.reject(); 00052 } 00053 00054 void StreamConnection::keepalive() 00055 { 00056 _buf.keepalive(); 00057 } 00058 00059 void StreamConnection::shutdown(ConnectionShutdownCases csc) 00060 { 00061 if (csc == CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN) 00062 { 00063 // wait for the last ACKs 00064 _buf.wait(); 00065 } 00066 00067 // skip if another shutdown is in progress 00068 { 00069 ibrcommon::MutexLock l(_shutdown_reason_lock); 00070 if (_shutdown_reason != CONNECTION_SHUTDOWN_NOTSET) 00071 { 00072 _buf.abort(); 00073 return; 00074 } 00075 _shutdown_reason = csc; 00076 } 00077 00078 try { 00079 switch (csc) 00080 { 00081 case CONNECTION_SHUTDOWN_IDLE: 00082 _buf.shutdown(StreamDataSegment::MSG_SHUTDOWN_IDLE_TIMEOUT); 00083 _buf.abort(); 00084 _callback.eventTimeout(); 00085 break; 00086 case CONNECTION_SHUTDOWN_ERROR: 00087 _buf.abort(); 00088 _callback.eventError(); 00089 break; 00090 case CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN: 00091 _buf.shutdown(StreamDataSegment::MSG_SHUTDOWN_NONE); 00092 _callback.eventShutdown(csc); 00093 break; 00094 case CONNECTION_SHUTDOWN_NODE_TIMEOUT: 00095 _buf.abort(); 00096 _callback.eventTimeout(); 00097 break; 00098 case CONNECTION_SHUTDOWN_PEER_SHUTDOWN: 00099 _buf.shutdown(StreamDataSegment::MSG_SHUTDOWN_NONE); 00100 case CONNECTION_SHUTDOWN_NOTSET: 00101 _buf.abort(); 00102 _callback.eventShutdown(csc); 00103 break; 00104 } 00105 } catch (const StreamConnection::StreamErrorException&) { 00106 _callback.eventError(); 00107 } 00108 00109 _buf.close(); 00110 _callback.eventConnectionDown(); 00111 } 00112 00113 void StreamConnection::eventShutdown(StreamConnection::ConnectionShutdownCases csc) 00114 { 00115 _callback.eventShutdown(csc); 00116 } 00117 00118 void StreamConnection::eventBundleAck(size_t ack) 00119 { 00120 _callback.eventBundleAck(ack); 00121 } 00122 00123 void StreamConnection::eventBundleRefused() 00124 { 00125 IBRCOMMON_LOGGER_DEBUG(20) << "bundle has been refused" << IBRCOMMON_LOGGER_ENDL; 00126 _callback.eventBundleRefused(); 00127 } 00128 00129 void StreamConnection::eventBundleForwarded() 00130 { 00131 IBRCOMMON_LOGGER_DEBUG(20) << "bundle has been forwarded" << IBRCOMMON_LOGGER_ENDL; 00132 _callback.eventBundleForwarded(); 00133 } 00134 00135 void StreamConnection::connectionTimeout() 00136 { 00137 // call superclass 00138 _callback.eventTimeout(); 00139 } 00140 00141 void StreamConnection::enableIdleTimeout(size_t seconds) 00142 { 00143 _buf.enableIdleTimeout(seconds); 00144 } 00145 } 00146 }