IBR-DTNSuite
0.8
|
00001 /* 00002 * tcpstream.cpp 00003 * 00004 * Created on: 29.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrcommon/config.h" 00009 #include "ibrcommon/Logger.h" 00010 #include "ibrcommon/net/tcpstream.h" 00011 #include "ibrcommon/thread/MutexLock.h" 00012 #include <netinet/in.h> 00013 #include <sys/types.h> 00014 #include <sys/socket.h> 00015 #include <arpa/inet.h> 00016 #include <netinet/tcp.h> 00017 #include <errno.h> 00018 #include <signal.h> 00019 #include <fcntl.h> 00020 #include <string.h> 00021 #include <unistd.h> 00022 00023 namespace ibrcommon 00024 { 00025 tcpstream::tcpstream(int socket) : 00026 std::iostream(this), errmsg(ERROR_NONE), _socket(socket), in_buf_(new char[BUFF_SIZE]), out_buf_(new char[BUFF_SIZE]), _nonblocking(false), _timeout(0) 00027 { 00028 // create a pipe for interruption 00029 if (pipe(_interrupt_pipe_read) < 0) 00030 { 00031 IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL; 00032 throw ibrcommon::Exception("failed to create pipe"); 00033 } 00034 00035 // create a pipe for interruption 00036 if (pipe(_interrupt_pipe_write) < 0) 00037 { 00038 ::close(_interrupt_pipe_read[0]); 00039 ::close(_interrupt_pipe_read[1]); 00040 IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL; 00041 throw ibrcommon::Exception("failed to create pipe"); 00042 } 00043 00044 // prevent SIGPIPE from being thrown 00045 int set = 1; 00046 00047 #ifdef HAVE_FEATURES_H 00048 ::setsockopt(_socket, SOL_SOCKET, MSG_NOSIGNAL, (void *)&set, sizeof(int)); 00049 #else 00050 ::setsockopt(_socket, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int)); 00051 #endif 00052 00053 // set the pipe to non-blocking 00054 vsocket::set_non_blocking(_interrupt_pipe_read[0]); 00055 vsocket::set_non_blocking(_interrupt_pipe_read[1]); 00056 vsocket::set_non_blocking(_interrupt_pipe_write[0]); 00057 vsocket::set_non_blocking(_interrupt_pipe_write[1]); 00058 00059 // Initialize get pointer. This should be zero so that underflow is called upon first read. 00060 setg(0, 0, 0); 00061 setp(out_buf_, out_buf_ + BUFF_SIZE - 1); 00062 } 00063 00064 tcpstream::~tcpstream() 00065 { 00066 delete[] in_buf_; 00067 delete[] out_buf_; 00068 00069 // finally, close the socket 00070 close(); 00071 00072 // close all used pipes 00073 ::close(_interrupt_pipe_read[0]); 00074 ::close(_interrupt_pipe_read[1]); 00075 ::close(_interrupt_pipe_write[0]); 00076 ::close(_interrupt_pipe_write[1]); 00077 } 00078 00079 string tcpstream::getAddress() const 00080 { 00081 struct ::sockaddr_in sa; 00082 int iLen = sizeof(sa); 00083 00084 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen); 00085 return inet_ntoa(sa.sin_addr); 00086 } 00087 00088 int tcpstream::getPort() const 00089 { 00090 struct ::sockaddr_in sa; 00091 int iLen = sizeof(sa); 00092 00093 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen); 00094 return ntohs(sa.sin_port); 00095 } 00096 00097 void tcpstream::interrupt() 00098 { 00099 ::write(_interrupt_pipe_read[1], "i", 1); 00100 ::write(_interrupt_pipe_write[1], "i", 1); 00101 } 00102 00103 void tcpstream::close(bool errorcheck) 00104 { 00105 static ibrcommon::Mutex close_lock; 00106 ibrcommon::MutexLock l(close_lock); 00107 00108 if (_socket == -1) 00109 { 00110 if (errorcheck) throw ConnectionClosedException(); 00111 return; 00112 } 00113 00114 int sock = _socket; 00115 _socket = -1; 00116 00117 // unblock all socket operations 00118 interrupt(); 00119 00120 if ((::close(sock) == -1) && errorcheck) 00121 { 00122 throw ConnectionClosedException(); 00123 } 00124 } 00125 00126 int tcpstream::sync() 00127 { 00128 int ret = std::char_traits<char>::eq_int_type(this->overflow( 00129 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1 00130 : 0; 00131 00132 return ret; 00133 } 00134 00135 int tcpstream::select(int int_pipe, bool &read, bool &write, bool &error, int timeout) throw (select_exception) 00136 { 00137 // return if the stream was closed 00138 if (_socket == -1) 00139 { 00140 throw select_exception(select_exception::SELECT_CLOSED); 00141 } 00142 00143 if (!_nonblocking) 00144 { 00145 // set the tcp socket to non-blocking 00146 vsocket::set_non_blocking(_socket); 00147 00148 // do not set this mode again 00149 _nonblocking = true; 00150 } 00151 00152 int high_fd = _socket; 00153 00154 fd_set fds_read, fds_write, fds_error; 00155 fd_set *fdsp_write = NULL, *fdsp_error = NULL; 00156 00157 FD_ZERO(&fds_read); 00158 FD_ZERO(&fds_write); 00159 FD_ZERO(&fds_error); 00160 00161 if (read) 00162 { 00163 FD_SET(_socket, &fds_read); 00164 } 00165 00166 if (write) 00167 { 00168 FD_SET(_socket, &fds_write); 00169 fdsp_write = &fds_write; 00170 } 00171 00172 if (error) 00173 { 00174 FD_SET(_socket, &fds_error); 00175 fdsp_error = &fds_error; 00176 } 00177 00178 read = false; 00179 write = false; 00180 error = false; 00181 00182 // socket for the self-pipe trick 00183 FD_SET(int_pipe, &fds_read); 00184 if (high_fd < int_pipe) high_fd = int_pipe; 00185 00186 int res = 0; 00187 00188 // set timeout 00189 struct timeval tv; 00190 tv.tv_sec = timeout; 00191 tv.tv_usec = 0; 00192 00193 bool _continue = true; 00194 while (_continue) 00195 { 00196 if (timeout > 0) 00197 { 00198 #ifdef HAVE_FEATURES_H 00199 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv); 00200 #else 00201 res = __nonlinux_select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv); 00202 #endif 00203 } 00204 else 00205 { 00206 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, NULL); 00207 } 00208 00209 // check for select error 00210 if (res < 0) 00211 { 00212 throw select_exception(select_exception::SELECT_ERROR); 00213 } 00214 00215 // check for timeout 00216 if (res == 0) 00217 { 00218 throw select_exception(select_exception::SELECT_TIMEOUT); 00219 } 00220 00221 if (FD_ISSET(int_pipe, &fds_read)) 00222 { 00223 IBRCOMMON_LOGGER_DEBUG(25) << "unblocked by self-pipe-trick" << IBRCOMMON_LOGGER_ENDL; 00224 00225 // this was an interrupt with the self-pipe-trick 00226 char buf[2]; 00227 ::read(int_pipe, buf, 2); 00228 } 00229 00230 // return if the stream was closed 00231 if (_socket == -1) 00232 { 00233 throw select_exception(select_exception::SELECT_CLOSED); 00234 } 00235 00236 if (FD_ISSET(_socket, &fds_read)) 00237 { 00238 read = true; 00239 _continue = false; 00240 } 00241 00242 if (FD_ISSET(_socket, &fds_write)) 00243 { 00244 write = true; 00245 _continue = false; 00246 } 00247 00248 if (FD_ISSET(_socket, &fds_error)) 00249 { 00250 error = true; 00251 _continue = false; 00252 } 00253 } 00254 00255 return res; 00256 } 00257 00258 std::char_traits<char>::int_type tcpstream::overflow(std::char_traits<char>::int_type c) 00259 { 00260 char *ibegin = out_buf_; 00261 char *iend = pptr(); 00262 00263 // mark the buffer as free 00264 setp(out_buf_, out_buf_ + BUFF_SIZE - 1); 00265 00266 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof())) 00267 { 00268 *iend++ = std::char_traits<char>::to_char_type(c); 00269 } 00270 00271 // if there is nothing to send, just return 00272 if ((iend - ibegin) == 0) 00273 { 00274 IBRCOMMON_LOGGER_DEBUG(90) << "tcpstream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL; 00275 return std::char_traits<char>::not_eof(c); 00276 } 00277 00278 try { 00279 bool read = false, write = true, error = false; 00280 select(_interrupt_pipe_write[0], read, write, error, _timeout); 00281 00282 // bytes to send 00283 size_t bytes = (iend - ibegin); 00284 00285 // send the data 00286 ssize_t ret = ::send(_socket, out_buf_, (iend - ibegin), 0); 00287 00288 if (ret < 0) 00289 { 00290 switch (errno) 00291 { 00292 case EPIPE: 00293 // connection has been reset 00294 errmsg = ERROR_EPIPE; 00295 break; 00296 00297 case ECONNRESET: 00298 // Connection reset by peer 00299 errmsg = ERROR_RESET; 00300 break; 00301 00302 case EAGAIN: 00303 // sent failed but we should retry again 00304 return overflow(c); 00305 00306 default: 00307 errmsg = ERROR_WRITE; 00308 break; 00309 } 00310 00311 // failure 00312 close(); 00313 std::stringstream ss; ss << "<tcpstream> send() in tcpstream failed: " << errno; 00314 throw ConnectionClosedException(ss.str()); 00315 } 00316 else 00317 { 00318 // check how many bytes are sent 00319 if ((size_t)ret < bytes) 00320 { 00321 // we did not sent all bytes 00322 char *resched_begin = ibegin + ret; 00323 char *resched_end = iend; 00324 00325 // bytes left to send 00326 size_t bytes_left = resched_end - resched_begin; 00327 00328 // move the data to the begin of the buffer 00329 ::memcpy(ibegin, resched_begin, bytes_left); 00330 00331 // new free buffer 00332 char *buffer_begin = ibegin + bytes_left; 00333 00334 // mark the buffer as free 00335 setp(buffer_begin, out_buf_ + BUFF_SIZE - 1); 00336 } 00337 } 00338 } catch (const select_exception &ex) { 00339 // send timeout 00340 errmsg = ERROR_WRITE; 00341 close(); 00342 throw ConnectionClosedException("<tcpstream> send() timed out"); 00343 } 00344 00345 return std::char_traits<char>::not_eof(c); 00346 } 00347 00348 std::char_traits<char>::int_type tcpstream::underflow() 00349 { 00350 try { 00351 bool read = true; 00352 bool write = false; 00353 bool error = false; 00354 00355 select(_interrupt_pipe_read[0], read, write, error, _timeout); 00356 00357 // read some bytes 00358 int bytes = ::recv(_socket, in_buf_, BUFF_SIZE, 0); 00359 00360 // end of stream 00361 if (bytes == 0) 00362 { 00363 errmsg = ERROR_CLOSED; 00364 close(); 00365 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() returned zero: " << errno << IBRCOMMON_LOGGER_ENDL; 00366 return std::char_traits<char>::eof(); 00367 } 00368 else if (bytes < 0) 00369 { 00370 switch (errno) 00371 { 00372 case EPIPE: 00373 // connection has been reset 00374 errmsg = ERROR_EPIPE; 00375 break; 00376 00377 default: 00378 errmsg = ERROR_READ; 00379 break; 00380 } 00381 00382 close(); 00383 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() failed: " << errno << IBRCOMMON_LOGGER_ENDL; 00384 return std::char_traits<char>::eof(); 00385 } 00386 00387 // Since the input buffer content is now valid (or is new) 00388 // the get pointer should be initialized (or reset). 00389 setg(in_buf_, in_buf_, in_buf_ + bytes); 00390 00391 return std::char_traits<char>::not_eof((unsigned char) in_buf_[0]); 00392 } catch (const select_exception &ex) { 00393 return std::char_traits<char>::eof(); 00394 } 00395 } 00396 00397 void tcpstream::setTimeout(unsigned int value) 00398 { 00399 _timeout = value; 00400 } 00401 00402 void tcpstream::enableKeepalive() 00403 { 00404 /* Set the option active */ 00405 int optval = 1; 00406 if(setsockopt(_socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) { 00407 throw ibrcommon::vsocket_exception("<tcpstream> can not activate keepalives"); 00408 } 00409 } 00410 00411 void tcpstream::enableLinger(int l) 00412 { 00413 // set linger option to the socket 00414 struct linger linger; 00415 00416 linger.l_onoff = 1; 00417 linger.l_linger = l; 00418 ::setsockopt(_socket, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger)); 00419 } 00420 00421 void tcpstream::enableNoDelay() 00422 { 00423 int set = 1; 00424 ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&set, sizeof(set)); 00425 } 00426 00427 }