IBR-DTNSuite  0.8
ibrcommon/ibrcommon/net/tcpstream.cpp
Go to the documentation of this file.
00001 /*
00002  * tcpstream.cpp
00003  *
00004  *  Created on: 29.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrcommon/config.h"
00009 #include "ibrcommon/Logger.h"
00010 #include "ibrcommon/net/tcpstream.h"
00011 #include "ibrcommon/thread/MutexLock.h"
00012 #include <netinet/in.h>
00013 #include <sys/types.h>
00014 #include <sys/socket.h>
00015 #include <arpa/inet.h>
00016 #include <netinet/tcp.h>
00017 #include <errno.h>
00018 #include <signal.h>
00019 #include <fcntl.h>
00020 #include <string.h>
00021 #include <unistd.h>
00022 
00023 namespace ibrcommon
00024 {
00025         tcpstream::tcpstream(int socket) :
00026                 std::iostream(this), errmsg(ERROR_NONE), _socket(socket), in_buf_(new char[BUFF_SIZE]), out_buf_(new char[BUFF_SIZE]), _nonblocking(false), _timeout(0)
00027         {
00028                 // create a pipe for interruption
00029                 if (pipe(_interrupt_pipe_read) < 0)
00030                 {
00031                         IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL;
00032                         throw ibrcommon::Exception("failed to create pipe");
00033                 }
00034 
00035                 // create a pipe for interruption
00036                 if (pipe(_interrupt_pipe_write) < 0)
00037                 {
00038                         ::close(_interrupt_pipe_read[0]);
00039                         ::close(_interrupt_pipe_read[1]);
00040                         IBRCOMMON_LOGGER(error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL;
00041                         throw ibrcommon::Exception("failed to create pipe");
00042                 }
00043 
00044                 // prevent SIGPIPE from being thrown
00045                 int set = 1;
00046 
00047 #ifdef HAVE_FEATURES_H
00048                 ::setsockopt(_socket, SOL_SOCKET, MSG_NOSIGNAL, (void *)&set, sizeof(int));
00049 #else
00050                 ::setsockopt(_socket, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int));
00051 #endif
00052 
00053                 // set the pipe to non-blocking
00054                 vsocket::set_non_blocking(_interrupt_pipe_read[0]);
00055                 vsocket::set_non_blocking(_interrupt_pipe_read[1]);
00056                 vsocket::set_non_blocking(_interrupt_pipe_write[0]);
00057                 vsocket::set_non_blocking(_interrupt_pipe_write[1]);
00058 
00059                 // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00060                 setg(0, 0, 0);
00061                 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00062         }
00063 
00064         tcpstream::~tcpstream()
00065         {
00066                 delete[] in_buf_;
00067                 delete[] out_buf_;
00068 
00069                 // finally, close the socket
00070                 close();
00071 
00072                 // close all used pipes
00073                 ::close(_interrupt_pipe_read[0]);
00074                 ::close(_interrupt_pipe_read[1]);
00075                 ::close(_interrupt_pipe_write[0]);
00076                 ::close(_interrupt_pipe_write[1]);
00077         }
00078 
00079         string tcpstream::getAddress() const
00080         {
00081                 struct ::sockaddr_in sa;
00082                 int iLen = sizeof(sa);
00083 
00084                 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00085                 return inet_ntoa(sa.sin_addr);
00086         }
00087 
00088         int tcpstream::getPort() const
00089         {
00090                 struct ::sockaddr_in sa;
00091                 int iLen = sizeof(sa);
00092 
00093                 getpeername(_socket, (sockaddr*) &sa, (socklen_t*) &iLen);
00094                 return ntohs(sa.sin_port);
00095         }
00096 
00097         void tcpstream::interrupt()
00098         {
00099                 ::write(_interrupt_pipe_read[1], "i", 1);
00100                 ::write(_interrupt_pipe_write[1], "i", 1);
00101         }
00102 
00103         void tcpstream::close(bool errorcheck)
00104         {
00105                 static ibrcommon::Mutex close_lock;
00106                 ibrcommon::MutexLock l(close_lock);
00107 
00108                 if (_socket == -1)
00109                 {
00110                         if (errorcheck) throw ConnectionClosedException();
00111                         return;
00112                 }
00113 
00114                 int sock = _socket;
00115                 _socket = -1;
00116 
00117                 // unblock all socket operations
00118                 interrupt();
00119 
00120                 if ((::close(sock) == -1) && errorcheck)
00121                 {
00122                         throw ConnectionClosedException();
00123                 }
00124         }
00125 
00126         int tcpstream::sync()
00127         {
00128                 int ret = std::char_traits<char>::eq_int_type(this->overflow(
00129                                 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
00130                                 : 0;
00131 
00132                 return ret;
00133         }
00134 
00135         int tcpstream::select(int int_pipe, bool &read, bool &write, bool &error, int timeout) throw (select_exception)
00136         {
00137                 // return if the stream was closed
00138                 if (_socket == -1)
00139                 {
00140                         throw select_exception(select_exception::SELECT_CLOSED);
00141                 }
00142 
00143                 if (!_nonblocking)
00144                 {
00145                         // set the tcp socket to non-blocking
00146                         vsocket::set_non_blocking(_socket);
00147                         
00148                         // do not set this mode again
00149                         _nonblocking = true;
00150                 }
00151 
00152                 int high_fd = _socket;
00153 
00154                 fd_set fds_read, fds_write, fds_error;
00155                 fd_set *fdsp_write = NULL, *fdsp_error = NULL;
00156 
00157                 FD_ZERO(&fds_read);
00158                 FD_ZERO(&fds_write);
00159                 FD_ZERO(&fds_error);
00160 
00161                 if (read)
00162                 {
00163                         FD_SET(_socket, &fds_read);
00164                 }
00165 
00166                 if (write)
00167                 {
00168                         FD_SET(_socket, &fds_write);
00169                         fdsp_write = &fds_write;
00170                 }
00171 
00172                 if (error)
00173                 {
00174                         FD_SET(_socket, &fds_error);
00175                         fdsp_error = &fds_error;
00176                 }
00177 
00178                 read = false;
00179                 write = false;
00180                 error = false;
00181 
00182                 // socket for the self-pipe trick
00183                 FD_SET(int_pipe, &fds_read);
00184                 if (high_fd < int_pipe) high_fd = int_pipe;
00185 
00186                 int res = 0;
00187 
00188                 // set timeout
00189                 struct timeval tv;
00190                 tv.tv_sec = timeout;
00191                 tv.tv_usec = 0;
00192 
00193                 bool _continue = true;
00194                 while (_continue)
00195                 {
00196                         if (timeout > 0)
00197                         {
00198 #ifdef HAVE_FEATURES_H
00199                                 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv);
00200 #else
00201                                 res = __nonlinux_select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, &tv);
00202 #endif
00203                         }
00204                         else
00205                         {
00206                                 res = ::select(high_fd + 1, &fds_read, fdsp_write, fdsp_error, NULL);
00207                         }
00208 
00209                         // check for select error
00210                         if (res < 0)
00211                         {
00212                                 throw select_exception(select_exception::SELECT_ERROR);
00213                         }
00214 
00215                         // check for timeout
00216                         if (res == 0)
00217                         {
00218                                 throw select_exception(select_exception::SELECT_TIMEOUT);
00219                         }
00220 
00221                         if (FD_ISSET(int_pipe, &fds_read))
00222                         {
00223                                 IBRCOMMON_LOGGER_DEBUG(25) << "unblocked by self-pipe-trick" << IBRCOMMON_LOGGER_ENDL;
00224 
00225                                 // this was an interrupt with the self-pipe-trick
00226                                 char buf[2];
00227                                 ::read(int_pipe, buf, 2);
00228                         }
00229 
00230                         // return if the stream was closed
00231                         if (_socket == -1)
00232                         {
00233                                 throw select_exception(select_exception::SELECT_CLOSED);
00234                         }
00235 
00236                         if (FD_ISSET(_socket, &fds_read))
00237                         {
00238                                 read = true;
00239                                 _continue = false;
00240                         }
00241 
00242                         if (FD_ISSET(_socket, &fds_write))
00243                         {
00244                                 write = true;
00245                                 _continue = false;
00246                         }
00247 
00248                         if (FD_ISSET(_socket, &fds_error))
00249                         {
00250                                 error = true;
00251                                 _continue = false;
00252                         }
00253                 }
00254 
00255                 return res;
00256         }
00257 
00258         std::char_traits<char>::int_type tcpstream::overflow(std::char_traits<char>::int_type c)
00259         {
00260                 char *ibegin = out_buf_;
00261                 char *iend = pptr();
00262 
00263                 // mark the buffer as free
00264                 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00265 
00266                 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
00267                 {
00268                         *iend++ = std::char_traits<char>::to_char_type(c);
00269                 }
00270 
00271                 // if there is nothing to send, just return
00272                 if ((iend - ibegin) == 0)
00273                 {
00274                         IBRCOMMON_LOGGER_DEBUG(90) << "tcpstream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL;
00275                         return std::char_traits<char>::not_eof(c);
00276                 }
00277 
00278                 try {
00279                         bool read = false, write = true, error = false;
00280                         select(_interrupt_pipe_write[0], read, write, error, _timeout);
00281 
00282                         // bytes to send
00283                         size_t bytes = (iend - ibegin);
00284 
00285                         // send the data
00286                         ssize_t ret = ::send(_socket, out_buf_, (iend - ibegin), 0);
00287 
00288                         if (ret < 0)
00289                         {
00290                                 switch (errno)
00291                                 {
00292                                 case EPIPE:
00293                                         // connection has been reset
00294                                         errmsg = ERROR_EPIPE;
00295                                         break;
00296 
00297                                 case ECONNRESET:
00298                                         // Connection reset by peer
00299                                         errmsg = ERROR_RESET;
00300                                         break;
00301 
00302                                 case EAGAIN:
00303                                         // sent failed but we should retry again
00304                                         return overflow(c);
00305 
00306                                 default:
00307                                         errmsg = ERROR_WRITE;
00308                                         break;
00309                                 }
00310 
00311                                 // failure
00312                                 close();
00313                                 std::stringstream ss; ss << "<tcpstream> send() in tcpstream failed: " << errno;
00314                                 throw ConnectionClosedException(ss.str());
00315                         }
00316                         else
00317                         {
00318                                 // check how many bytes are sent
00319                                 if ((size_t)ret < bytes)
00320                                 {
00321                                         // we did not sent all bytes
00322                                         char *resched_begin = ibegin + ret;
00323                                         char *resched_end = iend;
00324 
00325                                         // bytes left to send
00326                                         size_t bytes_left = resched_end - resched_begin;
00327 
00328                                         // move the data to the begin of the buffer
00329                                         ::memcpy(ibegin, resched_begin, bytes_left);
00330 
00331                                         // new free buffer
00332                                         char *buffer_begin = ibegin + bytes_left;
00333 
00334                                         // mark the buffer as free
00335                                         setp(buffer_begin, out_buf_ + BUFF_SIZE - 1);
00336                                 }
00337                         }
00338                 } catch (const select_exception &ex) {
00339                         // send timeout
00340                         errmsg = ERROR_WRITE;
00341                         close();
00342                         throw ConnectionClosedException("<tcpstream> send() timed out");
00343                 }
00344 
00345                 return std::char_traits<char>::not_eof(c);
00346         }
00347 
00348         std::char_traits<char>::int_type tcpstream::underflow()
00349         {
00350                 try {
00351                         bool read = true;
00352                         bool write = false;
00353                         bool error = false;
00354 
00355                         select(_interrupt_pipe_read[0], read, write, error, _timeout);
00356 
00357                         // read some bytes
00358                         int bytes = ::recv(_socket, in_buf_, BUFF_SIZE, 0);
00359 
00360                         // end of stream
00361                         if (bytes == 0)
00362                         {
00363                                 errmsg = ERROR_CLOSED;
00364                                 close();
00365                                 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() returned zero: " << errno << IBRCOMMON_LOGGER_ENDL;
00366                                 return std::char_traits<char>::eof();
00367                         }
00368                         else if (bytes < 0)
00369                         {
00370                                 switch (errno)
00371                                 {
00372                                 case EPIPE:
00373                                         // connection has been reset
00374                                         errmsg = ERROR_EPIPE;
00375                                         break;
00376 
00377                                 default:
00378                                         errmsg = ERROR_READ;
00379                                         break;
00380                                 }
00381 
00382                                 close();
00383                                 IBRCOMMON_LOGGER_DEBUG(40) << "<tcpstream> recv() failed: " << errno << IBRCOMMON_LOGGER_ENDL;
00384                                 return std::char_traits<char>::eof();
00385                         }
00386 
00387                         // Since the input buffer content is now valid (or is new)
00388                         // the get pointer should be initialized (or reset).
00389                         setg(in_buf_, in_buf_, in_buf_ + bytes);
00390 
00391                         return std::char_traits<char>::not_eof((unsigned char) in_buf_[0]);
00392                 } catch (const select_exception &ex) {
00393                         return std::char_traits<char>::eof();
00394                 }
00395         }
00396 
00397         void tcpstream::setTimeout(unsigned int value)
00398         {
00399                 _timeout = value;
00400         }
00401 
00402         void tcpstream::enableKeepalive()
00403         {
00404                 /* Set the option active */
00405                 int optval = 1;
00406                 if(setsockopt(_socket, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof(optval)) < 0) {
00407                         throw ibrcommon::vsocket_exception("<tcpstream> can not activate keepalives");
00408                 }
00409         }
00410 
00411         void tcpstream::enableLinger(int l)
00412         {
00413                 // set linger option to the socket
00414                 struct linger linger;
00415 
00416                 linger.l_onoff = 1;
00417                 linger.l_linger = l;
00418                 ::setsockopt(_socket, SOL_SOCKET, SO_LINGER, &linger, sizeof(linger));
00419         }
00420 
00421         void tcpstream::enableNoDelay()
00422         {
00423                 int set = 1;
00424                 ::setsockopt(_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&set, sizeof(set));
00425         }
00426 
00427 }