IBR-DTNSuite  0.12
vsocket.cpp
Go to the documentation of this file.
1 /*
2  * vsocket.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "ibrcommon/config.h"
23 #include "ibrcommon/net/vsocket.h"
25 #include "ibrcommon/Logger.h"
26 
27 #ifndef HAVE_FEATURES_H
29 #endif
30 
31 #ifdef __WIN32__
32 #include <winsock2.h>
33 #include <windows.h>
34 #else
35 #include <sys/socket.h>
36 #include <netdb.h>
37 #include <netinet/tcp.h>
38 #include <sys/un.h>
39 #include <arpa/inet.h>
40 #endif
41 
42 #include <algorithm>
43 #include <sys/types.h>
44 #include <errno.h>
45 #include <sstream>
46 #include <string.h>
47 #include <fcntl.h>
48 #include <signal.h>
49 #include <unistd.h>
50 
51 namespace ibrcommon
52 {
53 #ifdef __WIN32__
54  static int win32_pipe( int handles[2] )
55  {
56  SOCKET s;
57  struct sockaddr_in serv_addr;
58  int len = sizeof( serv_addr );
59 
60  handles[0] = handles[1] = INVALID_SOCKET;
61 
62  if ( ( s = socket( AF_INET, SOCK_STREAM, 0 ) ) == INVALID_SOCKET )
63  {
64  /* ereport(LOG, (errmsg_internal("pgpipe failed to create socket: %ui", WSAGetLastError()))); */
65  return -1;
66  }
67 
68  memset( &serv_addr, 0, sizeof( serv_addr ) );
69  serv_addr.sin_family = AF_INET;
70  serv_addr.sin_port = htons(0);
71  serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
72  if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
73  {
74  /* ereport(LOG, (errmsg_internal("pgpipe failed to bind: %ui", WSAGetLastError()))); */
75  closesocket(s);
76  return -1;
77  }
78  if (listen(s, 1) == SOCKET_ERROR)
79  {
80  /* ereport(LOG, (errmsg_internal("pgpipe failed to listen: %ui", WSAGetLastError()))); */
81  closesocket(s);
82  return -1;
83  }
84  if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
85  {
86  /* ereport(LOG, (errmsg_internal("pgpipe failed to getsockname: %ui", WSAGetLastError()))); */
87  closesocket(s);
88  return -1;
89  }
90  if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
91  {
92  /* ereport(LOG, (errmsg_internal("pgpipe failed to create socket 2: %ui", WSAGetLastError()))); */
93  closesocket(s);
94  return -1;
95  }
96 
97  if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
98  {
99  /* ereport(LOG, (errmsg_internal("pgpipe failed to connect socket: %ui", WSAGetLastError()))); */
100  closesocket(s);
101  return -1;
102  }
103  if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
104  {
105  /* ereport(LOG, (errmsg_internal("pgpipe failed to accept socket: %ui", WSAGetLastError()))); */
106  closesocket(handles[1]);
107  handles[1] = INVALID_SOCKET;
108  closesocket(s);
109  return -1;
110  }
111  closesocket(s);
112  return 0;
113  }
114 
115  static int piperead( int s, char *buf, int len )
116  {
117  int ret = recv(s, buf, len, 0);
118 
119  if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
120  /* EOF on the pipe! (win32 socket based implementation) */
121  ret = 0;
122  return ret;
123  }
124 
// Windows: the pipe is emulated by win32_pipe(), so writing is a send() on a socket
// (reading uses the piperead() function defined above).
125 #define __compat_pipe(a) win32_pipe(a)
126 #define pipewrite(a,b,c) send(a,b,c,0)
127 
128 #else
// Other platforms: map the helpers directly onto pipe(), read() and write().
129 #define __compat_pipe(a) ::pipe(a)
130 #define piperead(a,b,c) ::read(a,b,c)
131 #define pipewrite(a,b,c) ::write(a,b,c)
132 #endif
133 
134 #ifdef HAVE_FEATURES_H
// With HAVE_FEATURES_H defined, select() is used as is; otherwise the wrapper
// function in the #else branch computes the remaining timeout itself.
135 #define __compat_select ::select
136 #else
137  int __compat_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
138  {
139  if (timeout == NULL)
140  {
141  return ::select(nfds, readfds, writefds, exceptfds, NULL);
142  }
143 
144  TimeMeasurement tm;
145 
146  struct timeval to_copy;
147  ::memcpy(&to_copy, timeout, sizeof to_copy);
148 
149  tm.start();
150  int ret = ::select(nfds, readfds, writefds, exceptfds, &to_copy);
151 
152  // on timeout set the timeout value to zero
153  if (ret == 0)
154  {
155  timeout->tv_sec = 0;
156  timeout->tv_usec = 0;
157  }
158  else
159  {
160  tm.stop();
161 
162  struct timespec time_spend;
163  tm.getTime(time_spend);
164 
165  do {
166  timeout->tv_sec = timeout->tv_sec - time_spend.tv_sec;
167  timeout->tv_usec = timeout->tv_usec - (time_spend.tv_nsec / 1000);
168  if (timeout->tv_usec < 0) {
169  --timeout->tv_sec;
170  timeout->tv_usec += 1000000L;
171  }
172  } while (0);
173 
174  // adjust timeout value if that falls below zero
175  if (timeout->tv_sec < 0)
176  {
177  timeout->tv_sec = 0;
178  timeout->tv_usec = 0;
179  }
180  }
181 
182  return ret;
183  }
184 #endif
185 
186  vsocket::pipesocket::pipesocket()
187  : _output_fd(-1)
188  { }
189 
190  vsocket::pipesocket::~pipesocket()
191  {
192  }
193 
194  int vsocket::pipesocket::getOutput() const throw (socket_exception)
195  {
196  if (_state == SOCKET_DOWN) throw socket_exception("output fd not available");
197  return _output_fd;
198  }
199 
200  void vsocket::pipesocket::up() throw (socket_exception)
201  {
202  if (_state != SOCKET_DOWN)
203  throw socket_exception("socket is already up");
204 
205  int pipe_fds[2];
206 
207  // create a pipe for interruption
208  if (__compat_pipe(pipe_fds) < 0)
209  {
210  IBRCOMMON_LOGGER_TAG("pipesocket", error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL;
211  throw socket_exception("failed to create pipe");
212  }
213 
214  _fd = pipe_fds[0];
215  _output_fd = pipe_fds[1];
216 
217  this->set_blocking_mode(false);
218  this->set_blocking_mode(false, _output_fd);
219 
220  _state = SOCKET_UP;
221  }
222 
223  void vsocket::pipesocket::down() throw (socket_exception)
224  {
225  if (_state != SOCKET_UP)
226  throw socket_exception("socket is not up");
227 
228  this->close();
229  ::close(_output_fd);
230 
231  _state = SOCKET_DOWN;
232  }
233 
234  void vsocket::pipesocket::read(char *buf, size_t len) throw (socket_exception)
235  {
236  ssize_t ret = piperead(this->fd(), buf, len);
237  if (ret == -1)
238  throw socket_exception("read error");
239  if (ret == 0)
240  throw socket_exception("end of file");
241  }
242 
243  void vsocket::pipesocket::write(const char *buf, size_t len) throw (socket_exception)
244  {
245  ssize_t ret = pipewrite(_output_fd, buf, len);
246  if (ret == -1)
247  throw socket_exception("write error");
248  }
249 
250  vsocket::SocketState::SocketState(STATE initial)
251  : _state(initial)
252  {
253  }
254 
255  vsocket::SocketState::~SocketState()
256  {
257  }
258 
259  vsocket::SocketState::STATE vsocket::SocketState::get() const
260  {
261  return _state;
262  }
263 
264  void vsocket::SocketState::set(STATE s) throw (state_exception)
265  {
266  try {
267  // try to get into state "s" and wait until this is possible
268  switch (s) {
269  case NONE:
270  throw state_exception("illegal state requested");
271  case SAFE_DOWN:
272  // throw exception if not (DOWN or PENDING DOWN)
273  if ((_state != DOWN) && (_state != PENDING_UP))
274  throw state_exception("can not change to " + __getname(s) + ", state is " + __getname(_state) + " instead of DOWN");
275  __change(s);
276  break;
277 
278  case DOWN:
279  // throw exception if not (PENDING_DOWN or SAFE_DOWN)
280  if ((_state != PENDING_DOWN) && (_state != SAFE_DOWN) && (_state != PENDING_UP))
281  throw state_exception("can not change to " + __getname(s) + ", state is " + __getname(_state) + " instead of PENDING_DOWN, PENDING_UP or SAFE_DOWN");
282  __change(s);
283  break;
284 
285  case PENDING_UP:
286  // throw exception if not (DOWN)
287  if (_state != DOWN)
288  throw state_exception("can not change to " + __getname(s) + ", state is " + __getname(_state) + " instead of DOWN");
289  __change(s);
290  break;
291 
292  case PENDING_DOWN:
293  // throw exception if not (IDLE, DOWN_REQUEST)
294  if ((_state != IDLE) && (_state != DOWN_REQUEST))
295  throw state_exception("can not change to " + __getname(s) + ", state is " + __getname(_state) + " instead of IDLE or DOWN_REQUEST");
296  __change(s);
297  break;
298 
299  case IDLE:
300  // throw exception if not (PENDING_UP, SAFE, SELECT)
301  if ((_state != PENDING_UP) && (_state != SAFE) && (_state != SELECT))
302  throw state_exception("can not change to " + __getname(s) + ", state is " + __getname(_state) + " instead of PENDING_UP, SAFE or SELECT");
303  __change(s);
304  break;
305 
306  case SELECT:
307  // throw exception if not (IDLE, SELECT)
308  if ((_state != IDLE) && (_state != SELECT))
309  throw state_exception("can not change to " + __getname(s) + ", state is " + __getname(_state) + " instead of IDLE or SELECT");
310  __change(s);
311  break;
312 
313  case DOWN_REQUEST:
314  case SAFE_REQUEST:
315  // throw exception if not (SELECT, DOWN_REQUEST)
316  if (_state != SELECT)
317  throw state_exception("can not change to " + __getname(s) + ", state is " + __getname(_state) + " instead of SELECT or DOWN_REQUEST");
318  __change(s);
319  break;
320 
321  case SAFE:
322  // throw exception if not (SAFE_REQUEST, IDLE)
323  if ((_state != SAFE_REQUEST) && (_state != IDLE))
324  throw state_exception("can not change to " + __getname(s) + ", state is " + __getname(_state) + " instead of SAFE_REQUEST or IDLE");
325  __change(s);
326  break;
327  }
328  } catch (const Conditional::ConditionalAbortException &e) {
329  throw state_exception(e.what());
330  }
331  }
332 
/**
 * Changes to state s once the current state is one of the predecessors
 * accepted for s; while it is not, the loop for s keeps checking the state.
 *
 * Throws state_exception if s is NONE, if the current state equals abortstate
 * while still waiting, or if a Conditional::ConditionalAbortException is caught.
 *
 * NOTE(review): the embedded listing numbers skip exactly one line inside
 * every wait loop (345, 355, 365, ...). As shown here the loops contain no
 * blocking call; presumably a wait on the condition belongs there. Verify
 * against the original file.
 */
333  void vsocket::SocketState::setwait(STATE s, STATE abortstate) throw (state_exception)
334  {
335  try {
336  // try to get into state "s" and wait until this is possible
337  switch (s) {
338  case NONE:
339  throw state_exception("illegal state requested");
340  case SAFE_DOWN:
341  // loop until the state is DOWN or PENDING_UP
342  while ((_state != DOWN) && (_state != PENDING_UP)) {
343  if (_state == abortstate)
344  throw state_exception("abort state " + __getname(abortstate) + " reached");
346  }
347  __change(s);
348  break;
349 
350  case DOWN:
351  // loop until the state is PENDING_DOWN, SAFE_DOWN or already DOWN
352  while ((_state != PENDING_DOWN) && (_state != SAFE_DOWN) && (_state != DOWN)) {
353  if (_state == abortstate)
354  throw state_exception("abort state " + __getname(abortstate) + " reached");
356  }
357  __change(s);
358  break;
359 
360  case PENDING_UP:
361  // loop until the state is DOWN
362  while (_state != DOWN) {
363  if (_state == abortstate)
364  throw state_exception("abort state " + __getname(abortstate) + " reached");
366  }
367  __change(s);
368  break;
369 
370  case PENDING_DOWN:
371  // loop until the state is IDLE or DOWN_REQUEST
372  while ((_state != IDLE) && (_state != DOWN_REQUEST)) {
373  if (_state == abortstate)
374  throw state_exception("abort state " + __getname(abortstate) + " reached");
376  }
377  __change(s);
378  break;
379 
380  case IDLE:
381  // loop until the state is PENDING_UP, SAFE or SELECT
382  while ((_state != PENDING_UP) && (_state != SAFE) && (_state != SELECT)) {
383  if (_state == abortstate)
384  throw state_exception("abort state " + __getname(abortstate) + " reached");
386  }
387  __change(s);
388  break;
389 
390  case SELECT:
391  // loop until the state is IDLE or SELECT
392  while ((_state != IDLE) && (_state != SELECT)) {
393  if (_state == abortstate)
394  throw state_exception("abort state " + __getname(abortstate) + " reached");
396  }
397  __change(s);
398  break;
399 
400  case DOWN_REQUEST:
401  case SAFE_REQUEST:
402  // loop until the state is SELECT
403  while (_state != SELECT) {
404  if (_state == abortstate)
405  throw state_exception("abort state " + __getname(abortstate) + " reached");
407  }
408  __change(s);
409  break;
410 
411  case SAFE:
412  // loop until the state is SAFE_REQUEST or IDLE
413  while ((_state != SAFE_REQUEST) && (_state != IDLE)) {
414  if (_state == abortstate)
415  throw state_exception("abort state " + __getname(abortstate) + " reached");
417  }
418  __change(s);
419  break;
420  }
421  } catch (const Conditional::ConditionalAbortException &e) {
  // report an aborted wait as a state error
422  throw state_exception(e.what());
423  }
424  }
425 
/**
 * Loops until the current state equals s, without changing the state itself.
 *
 * Throws state_exception if the current state equals abortstate while
 * looping, or if a Conditional::ConditionalAbortException is caught.
 *
 * NOTE(review): listing line 432 (inside the loop) is absent from this
 * extract; presumably it holds the blocking wait on the condition. Verify
 * against the original file.
 */
426  void vsocket::SocketState::wait(STATE s, STATE abortstate) throw (state_exception)
427  {
428  try {
429  while (_state != s) {
430  if (_state == abortstate)
431  throw state_exception("abort state " + __getname(abortstate) + " reached");
433  }
434  } catch (const Conditional::ConditionalAbortException &e) {
435  throw state_exception(e.what());
436  }
437  }
438 
/**
 * Logs the transition at debug level 90 and stores s as the new state.
 * No validity check is done here.
 *
 * NOTE(review): listing line 443 (after the assignment) is absent from this
 * extract; presumably it notifies threads waiting for a state change. Verify
 * against the original file.
 */
439  void vsocket::SocketState::__change(STATE s)
440  {
441  IBRCOMMON_LOGGER_DEBUG_TAG("SocketState", 90) << "SocketState transition: " << __getname(_state) << " -> " << __getname(s) << IBRCOMMON_LOGGER_ENDL;
442  _state = s;
444  }
445 
446  std::string vsocket::SocketState::__getname(STATE s) const
447  {
448  switch (s) {
449  case NONE:
450  return "NONE";
451  case SAFE_DOWN:
452  return "SAFE_DOWN";
453  case DOWN:
454  return "DOWN";
455  case PENDING_UP:
456  return "PENDING_UP";
457  case PENDING_DOWN:
458  return "PENDING_DOWN";
459  case IDLE:
460  return "IDLE";
461  case SELECT:
462  return "SELECT";
463  case SAFE_REQUEST:
464  return "SAFE_REQUEST";
465  case DOWN_REQUEST:
466  return "DOWN_REQUEST";
467  case SAFE:
468  return "SAFE";
469  }
470 
471  return "<unkown>";
472  }
473 
474  vsocket::SafeLock::SafeLock(SocketState &state, vsocket &sock)
475  : _state(state)
476  {
477  // request safe-state
478  ibrcommon::MutexLock l(_state);
479 
480  while ( (_state.get() != SocketState::DOWN) && (_state.get() != SocketState::IDLE) && (_state.get() != SocketState::SELECT) ) {
481  ((ibrcommon::Conditional&)_state).wait();
482  }
483 
484  if (_state.get() == SocketState::SELECT) {
485  _state.set(SocketState::SAFE_REQUEST);
486  // send interrupt
487  sock.interrupt();
488  _state.wait(SocketState::SAFE);
489  } else if (_state.get() == SocketState::DOWN) {
490  _state.set(SocketState::SAFE_DOWN);
491  } else {
492  _state.set(SocketState::SAFE);
493  }
494  }
495 
496  vsocket::SafeLock::~SafeLock()
497  {
498  // release safe-state
499  ibrcommon::MutexLock l(_state);
500  if (_state.get() == SocketState::SAFE)
501  {
502  _state.set(SocketState::IDLE);
503  }
504  else if (_state.get() == SocketState::SAFE_DOWN)
505  {
506  _state.set(SocketState::DOWN);
507  }
508  else
509  {
510  throw SocketState::state_exception("socket not in safe state");
511  }
512  }
513 
514  vsocket::SelectGuard::SelectGuard(SocketState &state, int &counter, ibrcommon::vsocket &sock)
515  : _state(state), _counter(counter), _sock(sock)
516  {
517  // set the current state to SELECT
518  try {
519  ibrcommon::MutexLock l(_state);
520  _state.setwait(SocketState::SELECT, SocketState::DOWN);
521  _counter++;
522  IBRCOMMON_LOGGER_DEBUG_TAG("SelectGuard", 90) << "SelectGuard counter set to " << _counter << IBRCOMMON_LOGGER_ENDL;
523  } catch (const SocketState::state_exception&) {
524  throw vsocket_interrupt("select interrupted while waiting for IDLE socket");
525  }
526  }
527 
/**
 * Decrements the counter of active guards. When it reaches zero the state is
 * moved on: SAFE_REQUEST -> SAFE, DOWN_REQUEST -> PENDING_DOWN, anything else
 * -> IDLE. While other guards are still active and a SAFE_REQUEST or
 * DOWN_REQUEST is pending, the socket is interrupted once more.
 *
 * NOTE(review): listing line 554 is absent from this extract; it presumably
 * holds the catch clause that closes the try block and encloses the throw
 * below. Which exception type it catches cannot be told from here.
 */
528  vsocket::SelectGuard::~SelectGuard()
529  {
530  // leave the SELECT state if this was the last active guard
531  try {
532  ibrcommon::MutexLock l(_state);
533  _counter--;
534  IBRCOMMON_LOGGER_DEBUG_TAG("SelectGuard", 90) << "SelectGuard counter set to " << _counter << IBRCOMMON_LOGGER_ENDL;
535 
536  if (_counter == 0) {
537  if (_state.get() == SocketState::SAFE_REQUEST) {
538  _state.set(SocketState::SAFE);
539  } else if (_state.get() == SocketState::DOWN_REQUEST) {
540  _state.set(SocketState::PENDING_DOWN);
541  } else {
542  _state.set(SocketState::IDLE);
543  }
544  }
545  else
546  {
547  // call interrupt once more if necessary
548  if (_state.get() == SocketState::SAFE_REQUEST) {
549  _sock.interrupt();
550  } else if (_state.get() == SocketState::DOWN_REQUEST) {
551  _sock.interrupt();
552  }
553  }
555  throw vsocket_interrupt("select interrupted while checking SAFE_REQUEST state");
556  }
557  }
558 
559  vsocket::vsocket()
560  : _state(SocketState::DOWN), _select_count(0)
561  {
562  _pipe.up();
563  }
564 
// NOTE(review): the signature line of this definition (listing line 565) is
// absent from this extract; by its position after the constructor this is
// presumably the vsocket destructor. Verify against the original file.
// Shuts down the internal pipe; a socket_exception from that is ignored.
566  {
567  try {
568  _pipe.down();
569  } catch (const socket_exception &ex) {
  // deliberately ignored
570  }
571  }
572 
573  void vsocket::add(basesocket *socket)
574  {
575  SafeLock l(_state, *this);
576  _sockets.insert(socket);
577  }
578 
579  void vsocket::add(basesocket *socket, const vinterface &iface)
580  {
581  SafeLock l(_state, *this);
582  _sockets.insert(socket);
583  _socket_map[iface].insert(socket);
584  }
585 
// NOTE(review): the signature line of this definition (listing line 586) is
// absent from this extract; the body uses a variable named "socket", so this
// is presumably the removal counterpart of add(). Verify against the original file.
// Removes the socket from the managed set and from every per-interface set,
// while the state is held safe.
587  {
588  SafeLock l(_state, *this);
589  _sockets.erase(socket);
590 
591  // search for the same socket in the map
592  for (std::map<vinterface, socketset>::iterator iter = _socket_map.begin(); iter != _socket_map.end(); ++iter)
593  {
594  socketset &set = (*iter).second;
595  set.erase(socket);
596  }
597  }
598 
599  size_t vsocket::size() const
600  {
601  return _sockets.size();
602  }
603 
// NOTE(review): the signature line of this definition (listing line 604) is
// absent from this extract. Verify against the original file.
// Empties the socket set and the per-interface map while the state is held
// safe; the socket objects themselves are not deleted here.
605  {
606  SafeLock l(_state, *this);
607  _sockets.clear();
608  _socket_map.clear();
609  }
610 
// NOTE(review): the signature line of this definition (listing line 611) is
// absent from this extract. Verify against the original file.
// Calls down(), deletes every managed socket object and then empties the
// socket set and the per-interface map.
612  {
613  down();
614 
615  for (socketset::iterator iter = _sockets.begin(); iter != _sockets.end(); ++iter)
616  {
617  basesocket *sock = (*iter);
  // the set holds raw pointers, each one is deleted here
618  delete sock;
619  }
620  _sockets.clear();
621  _socket_map.clear();
622  }
623 
// NOTE(review): the signature line of this definition (listing line 624) is
// absent from this extract, so the return type (copy or reference) cannot be
// told from here. Verify against the original file.
// Returns the set of all managed sockets.
625  {
626  return _sockets;
627  }
628 
629  socketset vsocket::get(const vinterface &iface) const
630  {
631  std::map<vinterface, socketset>::const_iterator iter = _socket_map.find(iface);
632 
633  if (iter == _socket_map.end()) {
634  socketset empty;
635  return empty;
636  }
637 
638  return (*iter).second;
639  }
640 
641  void vsocket::up() throw (socket_exception)
642  {
643  {
644  ibrcommon::MutexLock l(_state);
645  // wait until we can get into PENDING_UP state
646  _state.setwait(SocketState::PENDING_UP, SocketState::IDLE);
647  }
648 
649  for (socketset::iterator iter = _sockets.begin(); iter != _sockets.end(); ++iter) {
650  try {
651  if (!(*iter)->ready()) (*iter)->up();
652  } catch (const socket_exception&) {
653  // rewind all previously up'ped sockets
654  for (socketset::iterator riter = _sockets.begin(); riter != iter; ++riter) {
655  (*riter)->down();
656  }
657 
658  ibrcommon::MutexLock l(_state);
659  _state.set(SocketState::DOWN);
660  throw;
661  }
662  }
663 
664  // set state to IDLE
665  ibrcommon::MutexLock l(_state);
666  _state.set(SocketState::IDLE);
667  }
668 
669  void vsocket::down() throw ()
670  {
671  try {
672  ibrcommon::MutexLock l(_state);
673  if (_state.get() == SocketState::DOWN) {
674  return;
675  }
676  else if (_state.get() == SocketState::PENDING_DOWN || _state.get() == SocketState::DOWN_REQUEST) {
677  // prevent double down
678  _state.wait(SocketState::DOWN);
679  return;
680  }
681  else if (_state.get() == SocketState::SELECT) {
682  _state.setwait(SocketState::DOWN_REQUEST, SocketState::DOWN);
683  interrupt();
684  } else {
685  // enter PENDING_DOWN state
686  _state.setwait(SocketState::PENDING_DOWN, SocketState::DOWN);
687  }
688  } catch (SocketState::state_exception&) {
689  return;
690  }
691 
692  {
693  // shut-down all the sockets
694  ibrcommon::MutexLock l(_socket_lock);
695  for (socketset::iterator iter = _sockets.begin(); iter != _sockets.end(); ++iter) {
696  try {
697  if ((*iter)->ready()) (*iter)->down();
698  } catch (const socket_exception&) { }
699  }
700  }
701 
702  ibrcommon::MutexLock sl(_state);
703  _state.setwait(SocketState::DOWN);
704  }
705 
706  void vsocket::interrupt()
707  {
708  _pipe.write("i", 1);
709  }
710 
711  void vsocket::select(socketset *readset, socketset *writeset, socketset *errorset, struct timeval *tv) throw (socket_exception)
712  {
713  fd_set fds_read;
714  fd_set fds_write;
715  fd_set fds_error;
716 
717  int high_fd = 0;
718 
719  while (true)
720  {
721  SelectGuard guard(_state, _select_count, *this);
722 
723  FD_ZERO(&fds_read);
724  FD_ZERO(&fds_write);
725  FD_ZERO(&fds_error);
726 
727  // add the self-pipe-trick interrupt fd
728  FD_SET(_pipe.fd(), &fds_read);
729  high_fd = _pipe.fd();
730 
731  {
732  ibrcommon::MutexLock l(_socket_lock);
733  for (socketset::iterator iter = _sockets.begin();
734  iter != _sockets.end(); ++iter)
735  {
736  basesocket &sock = (**iter);
737  if (!sock.ready()) continue;
738 
739  if (readset != NULL) {
740  FD_SET(sock.fd(), &fds_read);
741  if (high_fd < sock.fd()) high_fd = sock.fd();
742  }
743 
744  if (writeset != NULL) {
745  FD_SET(sock.fd(), &fds_write);
746  if (high_fd < sock.fd()) high_fd = sock.fd();
747  }
748 
749  if (errorset != NULL) {
750  FD_SET(sock.fd(), &fds_error);
751  if (high_fd < sock.fd()) high_fd = sock.fd();
752  }
753  }
754  }
755 
756  // call the linux-like select with given timeout
757  int res = __compat_select(high_fd + 1, &fds_read, &fds_write, &fds_error, tv);
758 
759 #ifdef __WIN32__
760  int errcode = WSAGetLastError();
761 #else
762  int errcode = errno;
763 #endif
764 
765  if (res < 0) {
766  if (errcode == EINTR) {
767  // signal has been caught - handle it as interruption
768  continue;
769  }
770  else if (errcode == 0) {
771  throw vsocket_interrupt("select call has been interrupted");
772  }
773  else if (errcode == EBADF) {
774  throw socket_error(ERROR_CLOSED, "socket was closed");
775  }
776  throw socket_raw_error(errcode, "unknown select error");
777  }
778 
779 
780  if (res == 0)
781  throw vsocket_timeout("select timeout");
782 
783  if (FD_ISSET(_pipe.fd(), &fds_read))
784  {
785  IBRCOMMON_LOGGER_DEBUG_TAG("vsocket::select", 90) << "unblocked by self-pipe-trick" << IBRCOMMON_LOGGER_ENDL;
786 
787  // this was an interrupt with the self-pipe-trick
788  ibrcommon::MutexLock l(_socket_lock);
789  char buf[2];
790  _pipe.read(buf, 2);
791 
792  // start over with the select call
793  continue;
794  }
795 
796  ibrcommon::MutexLock l(_socket_lock);
797  for (socketset::iterator iter = _sockets.begin();
798  iter != _sockets.end(); ++iter)
799  {
800  basesocket *sock = (*iter);
801 
802  if (readset != NULL) {
803  if (FD_ISSET(sock->fd(), &fds_read))
804  {
805  readset->insert(sock);
806  }
807  }
808 
809  if (writeset != NULL) {
810  if (FD_ISSET(sock->fd(), &fds_write))
811  {
812  writeset->insert(sock);
813  }
814  }
815 
816  if (errorset != NULL) {
817  if (FD_ISSET(sock->fd(), &fds_error))
818  {
819  errorset->insert(sock);
820  }
821  }
822  }
823 
824  break;
825  }
826  }
827 }