27 #ifndef HAVE_FEATURES_H
35 #include <sys/socket.h>
37 #include <netinet/tcp.h>
39 #include <arpa/inet.h>
43 #include <sys/types.h>
// Emulates a pipe with two connected TCP sockets on the loopback interface:
// a listening socket `s` is bound, handles[1] connects to it and handles[0]
// is the peer returned by accept().
// handles: receives the two endpoints; both are preset to INVALID_SOCKET.
// NOTE(review): the declaration of `s`, the bodies of the error branches and
// the return statements are not part of this excerpt.
// NOTE(review): `handles` is an int array but stores socket()/accept() results
// and INVALID_SOCKET - confirm this cannot truncate on the target platform.
54 static int win32_pipe(
int handles[2] )
57 struct sockaddr_in serv_addr;
58 int len =
sizeof( serv_addr );
// mark both endpoints as invalid until they have been created
60 handles[0] = handles[1] = INVALID_SOCKET;
// socket that is bound and listened on below
62 if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
// bind to the loopback address with port 0
68 memset( &serv_addr, 0,
sizeof( serv_addr ) );
69 serv_addr.sin_family = AF_INET;
70 serv_addr.sin_port = htons(0);
71 serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
72 if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
78 if (listen(s, 1) == SOCKET_ERROR)
// read the bound address back into serv_addr; it is the connect() target below
84 if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
// handles[1] is the connecting endpoint
90 if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
97 if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
// handles[0] is the accepted endpoint
103 if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
// presumably the failure branch of accept(): release the connecting endpoint
106 closesocket(handles[1]);
107 handles[1] = INVALID_SOCKET;
// Reads from the socket `s` into `buf` (at most `len` bytes) using recv().
// A negative result combined with WSAECONNRESET is special-cased; what that
// branch does and what the function returns is not visible in this excerpt.
115 static int piperead(
int s,
char *buf,
int len )
117 int ret = recv(s, buf, len, 0);
// connection reset by the peer is handled separately from other errors
119 if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
// Two alternative sets of pipe primitives. The conditional directives that
// select between them are not visible in this excerpt.
// Variant 1: pipe creation via win32_pipe(), writing via send().
125 #define __compat_pipe(a) win32_pipe(a)
126 #define pipewrite(a,b,c) send(a,b,c,0)
// Variant 2: plain ::pipe(), ::read() and ::write().
129 #define __compat_pipe(a) ::pipe(a)
130 #define piperead(a,b,c) ::read(a,b,c)
131 #define pipewrite(a,b,c) ::write(a,b,c)
// With HAVE_FEATURES_H defined, __compat_select is simply an alias for ::select.
134 #ifdef HAVE_FEATURES_H
135 #define __compat_select ::select
// Otherwise __compat_select is a wrapper around ::select() that adjusts the
// caller's `timeout` itself: select() is run on a copy and the time spent is
// subtracted from `timeout` afterwards.
// NOTE(review): the conditions guarding the individual statements and the way
// `time_spend` is filled are not part of this excerpt.
137 int __compat_select(
int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
struct timeval *timeout)
// call without a timeout value (NULL is passed instead of `timeout`)
141 return ::select(nfds, readfds, writefds, exceptfds, NULL);
// work on a copy so that select() does not touch the caller's timeout
146 struct timeval to_copy;
147 ::memcpy(&to_copy, timeout,
sizeof to_copy);
150 int ret = ::select(nfds, readfds, writefds, exceptfds, &to_copy);
156 timeout->tv_usec = 0;
// time spent, in seconds and nanoseconds
162 struct timespec time_spend;
// subtract the time spent; tv_nsec is converted to microseconds
166 timeout->tv_sec = timeout->tv_sec - time_spend.tv_sec;
167 timeout->tv_usec = timeout->tv_usec - (time_spend.tv_nsec / 1000);
// negative microseconds: add one second worth of microseconds back
168 if (timeout->tv_usec < 0) {
170 timeout->tv_usec += 1000000L;
// the remaining time became negative
175 if (timeout->tv_sec < 0)
178 timeout->tv_usec = 0;
// Default constructor of vsocket::pipesocket; initializer list and body are
// not part of this excerpt.
186 vsocket::pipesocket::pipesocket()
// Destructor of vsocket::pipesocket; the body is not part of this excerpt.
190 vsocket::pipesocket::~pipesocket()
// Accessor for the output side of the pipe.
// Throws socket_exception("output fd not available") while the socket is in
// state SOCKET_DOWN. The return statement is not part of this excerpt.
194 int vsocket::pipesocket::getOutput()
const throw (socket_exception)
196 if (_state == SOCKET_DOWN)
throw socket_exception(
"output fd not available");
// Brings the pipe socket up.
// Throws socket_exception if the socket is not in state SOCKET_DOWN or if the
// pipe could not be created (the creating call is not part of this excerpt).
200 void vsocket::pipesocket::up() throw (socket_exception)
// only a socket that is down may be brought up
202 if (_state != SOCKET_DOWN)
203 throw socket_exception(
"socket is already up");
211 throw socket_exception(
"failed to create pipe");
// second pipe descriptor is kept as the output side
215 _output_fd = pipe_fds[1];
// blocking mode is switched off, once without and once with the output descriptor
217 this->set_blocking_mode(
false);
218 this->set_blocking_mode(
false, _output_fd);
// Takes the pipe socket down.
// Throws socket_exception("socket is not up") unless the state is SOCKET_UP;
// afterwards the state is SOCKET_DOWN. Any cleanup between the check and the
// state change is not part of this excerpt.
223 void vsocket::pipesocket::down() throw (socket_exception)
225 if (_state != SOCKET_UP)
226 throw socket_exception(
"socket is not up");
231 _state = SOCKET_DOWN;
// Reads up to `len` bytes from this socket's descriptor into `buf` via piperead().
// Throws socket_exception with "read error" or "end of file"; the conditions
// selecting between them are not part of this excerpt.
234 void vsocket::pipesocket::read(
char *buf,
size_t len)
throw (socket_exception)
236 ssize_t ret =
piperead(this->fd(), buf, len);
238 throw socket_exception(
"read error");
240 throw socket_exception(
"end of file");
// Writes `len` bytes from `buf` to the output descriptor via pipewrite().
// Throws socket_exception("write error"); the condition on `ret` that triggers
// it is not part of this excerpt.
243 void vsocket::pipesocket::write(
const char *buf,
size_t len)
throw (socket_exception)
245 ssize_t ret =
pipewrite(_output_fd, buf, len);
247 throw socket_exception(
"write error");
// Constructs the state object from an initial STATE value; initializer list
// and body are not part of this excerpt.
250 vsocket::SocketState::SocketState(STATE initial)
// Destructor of vsocket::SocketState; the body is not part of this excerpt.
255 vsocket::SocketState::~SocketState()
// Const accessor returning a STATE value; the body is not part of this excerpt.
259 vsocket::SocketState::STATE vsocket::SocketState::get()
const
// Requests a transition to state `s` without waiting.
// Each visible check compares the current _state against the states from which
// the transition is allowed and throws state_exception with a descriptive
// message otherwise. A Conditional::ConditionalAbortException is converted
// into a state_exception carrying the same message.
// NOTE(review): the dispatch on `s` (which check belongs to which requested
// state) and the actual state change are not part of this excerpt.
264 void vsocket::SocketState::set(STATE s)
throw (state_exception)
270 throw state_exception(
"illegal state requested");
// NOTE(review): the check accepts DOWN and PENDING_UP, but the message only
// mentions DOWN - confirm which one is intended.
273 if ((_state != DOWN) && (_state != PENDING_UP))
274 throw state_exception(
"can not change to " + __getname(s) +
", state is " + __getname(_state) +
" instead of DOWN");
// allowed from PENDING_DOWN, SAFE_DOWN or PENDING_UP
280 if ((_state != PENDING_DOWN) && (_state != SAFE_DOWN) && (_state != PENDING_UP))
281 throw state_exception(
"can not change to " + __getname(s) +
", state is " + __getname(_state) +
" instead of PENDING_DOWN, PENDING_UP or SAFE_DOWN");
// the condition guarding this throw is not part of this excerpt
288 throw state_exception(
"can not change to " + __getname(s) +
", state is " + __getname(_state) +
" instead of DOWN");
// allowed from IDLE or DOWN_REQUEST
294 if ((_state != IDLE) && (_state != DOWN_REQUEST))
295 throw state_exception(
"can not change to " + __getname(s) +
", state is " + __getname(_state) +
" instead of IDLE or DOWN_REQUEST");
// allowed from PENDING_UP, SAFE or SELECT
301 if ((_state != PENDING_UP) && (_state != SAFE) && (_state != SELECT))
302 throw state_exception(
"can not change to " + __getname(s) +
", state is " + __getname(_state) +
" instead of PENDING_UP, SAFE or SELECT");
// allowed from IDLE or SELECT
308 if ((_state != IDLE) && (_state != SELECT))
309 throw state_exception(
"can not change to " + __getname(s) +
", state is " + __getname(_state) +
" instead of IDLE or SELECT");
// NOTE(review): the check accepts SELECT only, but the message also mentions
// DOWN_REQUEST - confirm which one is intended.
316 if (_state != SELECT)
317 throw state_exception(
"can not change to " + __getname(s) +
", state is " + __getname(_state) +
" instead of SELECT or DOWN_REQUEST");
// allowed from SAFE_REQUEST or IDLE
323 if ((_state != SAFE_REQUEST) && (_state != IDLE))
324 throw state_exception(
"can not change to " + __getname(s) +
", state is " + __getname(_state) +
" instead of SAFE_REQUEST or IDLE");
328 }
catch (
const Conditional::ConditionalAbortException &e) {
// report an aborted conditional as a state_exception
329 throw state_exception(e.what());
// Like set(), but loops until the current _state is one from which the
// transition to `s` is allowed instead of failing immediately.
// Every loop throws state_exception("abort state ... reached") as soon as
// _state equals `abortstate`. A Conditional::ConditionalAbortException is
// converted into a state_exception carrying the same message.
// NOTE(review): the dispatch on `s`, the remainder of each loop body
// (presumably a wait on the conditional) and the actual state change are not
// part of this excerpt.
333 void vsocket::SocketState::setwait(STATE s, STATE abortstate)
throw (state_exception)
339 throw state_exception(
"illegal state requested");
// loop until _state is DOWN or PENDING_UP
342 while ((_state != DOWN) && (_state != PENDING_UP)) {
343 if (_state == abortstate)
344 throw state_exception(
"abort state " + __getname(abortstate) +
" reached");
// loop until _state is PENDING_DOWN, SAFE_DOWN or DOWN
// NOTE(review): the check at the matching position in set() accepts PENDING_UP
// where this one accepts DOWN - confirm that the difference is intended.
352 while ((_state != PENDING_DOWN) && (_state != SAFE_DOWN) && (_state != DOWN)) {
353 if (_state == abortstate)
354 throw state_exception(
"abort state " + __getname(abortstate) +
" reached");
// loop until _state is DOWN
362 while (_state != DOWN) {
363 if (_state == abortstate)
364 throw state_exception(
"abort state " + __getname(abortstate) +
" reached");
// loop until _state is IDLE or DOWN_REQUEST
372 while ((_state != IDLE) && (_state != DOWN_REQUEST)) {
373 if (_state == abortstate)
374 throw state_exception(
"abort state " + __getname(abortstate) +
" reached");
// loop until _state is PENDING_UP, SAFE or SELECT
382 while ((_state != PENDING_UP) && (_state != SAFE) && (_state != SELECT)) {
383 if (_state == abortstate)
384 throw state_exception(
"abort state " + __getname(abortstate) +
" reached");
// loop until _state is IDLE or SELECT
392 while ((_state != IDLE) && (_state != SELECT)) {
393 if (_state == abortstate)
394 throw state_exception(
"abort state " + __getname(abortstate) +
" reached");
// loop until _state is SELECT
403 while (_state != SELECT) {
404 if (_state == abortstate)
405 throw state_exception(
"abort state " + __getname(abortstate) +
" reached");
// loop until _state is SAFE_REQUEST or IDLE
413 while ((_state != SAFE_REQUEST) && (_state != IDLE)) {
414 if (_state == abortstate)
415 throw state_exception(
"abort state " + __getname(abortstate) +
" reached");
421 }
catch (
const Conditional::ConditionalAbortException &e) {
// report an aborted conditional as a state_exception
422 throw state_exception(e.what());
// Loops until _state equals `s` without requesting any transition itself.
// Throws state_exception if `abortstate` is reached while looping, or if a
// Conditional::ConditionalAbortException is raised (same message is reused).
// NOTE(review): the remainder of the loop body is not part of this excerpt.
426 void vsocket::SocketState::wait(STATE s, STATE abortstate)
throw (state_exception)
429 while (_state != s) {
430 if (_state == abortstate)
431 throw state_exception(
"abort state " + __getname(abortstate) +
" reached");
434 }
catch (
const Conditional::ConditionalAbortException &e) {
// report an aborted conditional as a state_exception
435 throw state_exception(e.what());
// Internal helper taking the new STATE `s`; the body is not part of this excerpt.
439 void vsocket::SocketState::__change(STATE s)
// Returns a name for the STATE value `s` as std::string; used to build the
// exception messages of this class. Only three of the return statements are
// part of this excerpt.
446 std::string vsocket::SocketState::__getname(STATE s)
const
458 return "PENDING_DOWN";
464 return "SAFE_REQUEST";
466 return "DOWN_REQUEST";
// Moves the socket state into SAFE (or SAFE_DOWN when the socket is down)
// for the lifetime of this object.
// NOTE(review): the initializer list, the body of the while loop and the use
// of `sock` are not part of this excerpt.
474 vsocket::SafeLock::SafeLock(SocketState &state, vsocket &sock)
// loop while the state is none of DOWN, IDLE or SELECT
480 while ( (_state.get() != SocketState::DOWN) && (_state.get() != SocketState::IDLE) && (_state.get() != SocketState::SELECT) ) {
// state is SELECT: request the safe state and wait until SAFE is reached
484 if (_state.get() == SocketState::SELECT) {
485 _state.set(SocketState::SAFE_REQUEST);
488 _state.wait(SocketState::SAFE);
489 }
else if (_state.get() == SocketState::DOWN) {
// socket is down: use the dedicated SAFE_DOWN state
490 _state.set(SocketState::SAFE_DOWN);
// presumably the remaining (IDLE) case: switch to SAFE directly
492 _state.set(SocketState::SAFE);
// Leaves the safe state again: SAFE goes back to IDLE, SAFE_DOWN goes back to DOWN.
// NOTE(review): a state_exception is thrown from this destructor (the guarding
// branch is not part of this excerpt) - confirm that an exception leaving a
// destructor is acceptable here.
496 vsocket::SafeLock::~SafeLock()
500 if (_state.get() == SocketState::SAFE)
502 _state.set(SocketState::IDLE);
504 else if (_state.get() == SocketState::SAFE_DOWN)
506 _state.set(SocketState::DOWN);
510 throw SocketState::state_exception(
"socket not in safe state");
// Enters the SELECT state for the lifetime of this object.
// state:   state object to switch.
// counter: kept by reference in _counter; how it is modified is not part of
//          this excerpt.
// sock:    kept by reference in _sock.
// Throws vsocket_interrupt if the state change fails with a state_exception.
514 vsocket::SelectGuard::SelectGuard(SocketState &state,
int &counter,
ibrcommon::vsocket &sock)
515 : _state(state), _counter(counter), _sock(sock)
// wait for the transition to SELECT; reaching DOWN aborts the wait
520 _state.setwait(SocketState::SELECT, SocketState::DOWN);
523 }
catch (
const SocketState::state_exception&) {
// translate the failed state change into an interrupt of the select call
524 throw vsocket_interrupt(
"select interrupted while waiting for IDLE socket");
// Leaves the SELECT state again.
// NOTE(review): the surrounding conditions, the bodies of the second group of
// branches and any try/catch structure are not part of this excerpt.
// NOTE(review): the throw at the end appears to be inside this destructor (no
// other definition header is visible before it) - confirm that an exception
// leaving a destructor is acceptable here.
528 vsocket::SelectGuard::~SelectGuard()
// first group: pending requests decide the follow-up state,
// SAFE_REQUEST -> SAFE, DOWN_REQUEST -> PENDING_DOWN, otherwise IDLE
537 if (_state.get() == SocketState::SAFE_REQUEST) {
538 _state.set(SocketState::SAFE);
539 }
else if (_state.get() == SocketState::DOWN_REQUEST) {
540 _state.set(SocketState::PENDING_DOWN);
542 _state.set(SocketState::IDLE);
// second group: the same two requests are checked again
548 if (_state.get() == SocketState::SAFE_REQUEST) {
550 }
else if (_state.get() == SocketState::DOWN_REQUEST) {
555 throw vsocket_interrupt(
"select interrupted while checking SAFE_REQUEST state");
560 : _state(SocketState::DOWN), _select_count(0)
569 }
catch (
const socket_exception &ex) {
575 SafeLock l(_state, *
this);
576 _sockets.insert(socket);
581 SafeLock l(_state, *
this);
582 _sockets.insert(socket);
583 _socket_map[iface].insert(socket);
588 SafeLock l(_state, *
this);
589 _sockets.erase(socket);
592 for (std::map<vinterface, socketset>::iterator iter = _socket_map.begin(); iter != _socket_map.end(); ++iter)
601 return _sockets.size();
606 SafeLock l(_state, *
this);
615 for (socketset::iterator iter = _sockets.begin(); iter != _sockets.end(); ++iter)
631 std::map<vinterface, socketset>::const_iterator iter = _socket_map.find(iface);
633 if (iter == _socket_map.end()) {
638 return (*iter).second;
646 _state.setwait(SocketState::PENDING_UP, SocketState::IDLE);
649 for (socketset::iterator iter = _sockets.begin(); iter != _sockets.end(); ++iter) {
651 if (!(*iter)->ready()) (*iter)->up();
652 }
catch (
const socket_exception&) {
654 for (socketset::iterator riter = _sockets.begin(); riter != iter; ++riter) {
659 _state.set(SocketState::DOWN);
666 _state.set(SocketState::IDLE);
673 if (_state.get() == SocketState::DOWN) {
676 else if (_state.get() == SocketState::PENDING_DOWN || _state.get() == SocketState::DOWN_REQUEST) {
678 _state.wait(SocketState::DOWN);
681 else if (_state.get() == SocketState::SELECT) {
682 _state.setwait(SocketState::DOWN_REQUEST, SocketState::DOWN);
686 _state.setwait(SocketState::PENDING_DOWN, SocketState::DOWN);
695 for (socketset::iterator iter = _sockets.begin(); iter != _sockets.end(); ++iter) {
697 if ((*iter)->ready()) (*iter)->down();
698 }
catch (
const socket_exception&) { }
703 _state.setwait(SocketState::DOWN);
// Takes no arguments and returns nothing; the body is not part of this excerpt.
// NOTE(review): the statements that follow reference readset/writeset/errorset/tv,
// which are not parameters here, so they presumably belong to another definition.
706 void vsocket::interrupt()
721 SelectGuard guard(_state, _select_count, *
this);
728 FD_SET(_pipe.fd(), &fds_read);
729 high_fd = _pipe.fd();
733 for (socketset::iterator iter = _sockets.begin();
734 iter != _sockets.end(); ++iter)
737 if (!sock.
ready())
continue;
739 if (readset != NULL) {
740 FD_SET(sock.
fd(), &fds_read);
741 if (high_fd < sock.
fd()) high_fd = sock.
fd();
744 if (writeset != NULL) {
745 FD_SET(sock.
fd(), &fds_write);
746 if (high_fd < sock.
fd()) high_fd = sock.
fd();
749 if (errorset != NULL) {
750 FD_SET(sock.
fd(), &fds_error);
751 if (high_fd < sock.
fd()) high_fd = sock.
fd();
757 int res =
__compat_select(high_fd + 1, &fds_read, &fds_write, &fds_error, tv);
760 int errcode = WSAGetLastError();
766 if (errcode == EINTR) {
770 else if (errcode == 0) {
773 else if (errcode == EBADF) {
783 if (FD_ISSET(_pipe.fd(), &fds_read))
797 for (socketset::iterator iter = _sockets.begin();
798 iter != _sockets.end(); ++iter)
802 if (readset != NULL) {
803 if (FD_ISSET(sock->
fd(), &fds_read))
805 readset->insert(sock);
809 if (writeset != NULL) {
810 if (FD_ISSET(sock->
fd(), &fds_write))
812 writeset->insert(sock);
816 if (errorset != NULL) {
817 if (FD_ISSET(sock->
fd(), &fds_error))
819 errorset->insert(sock);