IBR-DTNSuite  0.10
vsocket.cpp
Go to the documentation of this file.
1 /*
2  * vsocket.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "ibrcommon/config.h"
23 #include "ibrcommon/net/vsocket.h"
25 #include "ibrcommon/Logger.h"
26 
27 #ifndef HAVE_FEATURES_H
29 #endif
30 
31 #include <algorithm>
32 #include <netdb.h>
33 #include <sys/socket.h>
34 #include <sys/types.h>
35 #include <netinet/tcp.h>
36 #include <sys/un.h>
37 #include <errno.h>
38 #include <sstream>
39 #include <string.h>
40 #include <fcntl.h>
41 #include <signal.h>
42 #include <arpa/inet.h>
43 #include <unistd.h>
44 
45 namespace ibrcommon
46 {
47 #ifdef HAVE_FEATURES_H
48 #define __compat_select ::select
49 #else
50  int __compat_select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
51  {
52  if (timeout == NULL)
53  {
54  return ::select(nfds, readfds, writefds, exceptfds, NULL);
55  }
56 
57  TimeMeasurement tm;
58 
59  struct timeval to_copy;
60  ::memcpy(&to_copy, timeout, sizeof to_copy);
61 
62  tm.start();
63  int ret = ::select(nfds, readfds, writefds, exceptfds, &to_copy);
64  tm.stop();
65 
66  size_t us = tm.getMicroseconds();
67 
68  while ((us > 1000000) && (timeout->tv_sec > 0))
69  {
70  us -= 1000000;
71  timeout->tv_sec--;
72  }
73 
74  if (us >= static_cast<size_t>(timeout->tv_usec))
75  {
76  timeout->tv_usec = 0;
77  }
78  else
79  {
80  timeout->tv_usec -= us;
81  }
82 
83  return ret;
84  }
85 #endif
86 
87  vsocket::pipesocket::pipesocket()
88  : _output_fd(-1)
89  { }
90 
91  vsocket::pipesocket::~pipesocket()
92  {
93  }
94 
95  int vsocket::pipesocket::getOutput() const throw (socket_exception)
96  {
97  if (_state == SOCKET_DOWN) throw socket_exception("output fd not available");
98  return _output_fd;
99  }
100 
101  void vsocket::pipesocket::up() throw (socket_exception)
102  {
103  if (_state != SOCKET_DOWN)
104  throw socket_exception("socket is already up");
105 
106  int pipe_fds[2];
107 
108  // create a pipe for interruption
109  if (::pipe(pipe_fds) < 0)
110  {
111  IBRCOMMON_LOGGER_TAG("pipesocket", error) << "Error " << errno << " creating pipe" << IBRCOMMON_LOGGER_ENDL;
112  throw socket_exception("failed to create pipe");
113  }
114 
115  _fd = pipe_fds[0];
116  _output_fd = pipe_fds[1];
117 
118  this->set_blocking_mode(false);
119  this->set_blocking_mode(false, _output_fd);
120 
121  _state = SOCKET_UP;
122  }
123 
124  void vsocket::pipesocket::down() throw (socket_exception)
125  {
126  if (_state != SOCKET_UP)
127  throw socket_exception("socket is not up");
128 
129  this->close();
130  ::close(_output_fd);
131  _state = SOCKET_DOWN;
132  }
133 
134  void vsocket::pipesocket::read(char *buf, size_t len) throw (socket_exception)
135  {
136  ssize_t ret = ::read(this->fd(), buf, len);
137  if (ret == -1)
138  throw socket_exception("read error");
139  if (ret == 0)
140  throw socket_exception("end of file");
141  }
142 
143  void vsocket::pipesocket::write(const char *buf, size_t len) throw (socket_exception)
144  {
145  ssize_t ret = ::write(_output_fd, buf, len);
146  if (ret == -1)
147  throw socket_exception("write error");
148  }
149 
150  vsocket::SocketState::SocketState(STATE initial)
151  : _state(initial)
152  {
153  }
154 
155  vsocket::SocketState::~SocketState()
156  {
157  }
158 
159  vsocket::SocketState::STATE vsocket::SocketState::get() const
160  {
161  return _state;
162  }
163 
164  void vsocket::SocketState::set(STATE s) throw (state_exception)
165  {
166  try {
167  // try to get into state "s" and wait until this is possible
168  switch (s) {
169  case NONE:
170  throw state_exception("illegal state requested");
171  case SAFE_DOWN:
172  // throw exception if not (DOWN or PENDING DOWN)
173  if ((_state != DOWN) && (_state != PENDING_UP))
174  throw state_exception("state is not DOWN");
175  __change(s);
176  break;
177 
178  case DOWN:
179  // throw exception if not (PENDING_DOWN or SAFE_DOWN)
180  if ((_state != PENDING_DOWN) && (_state != SAFE_DOWN) && (_state != PENDING_UP))
181  throw state_exception("state is not PENDING_DOWN, PENDING_UP or SAFE_DOWN");
182  __change(s);
183  break;
184 
185  case PENDING_UP:
186  // throw exception if not (DOWN)
187  if (_state != DOWN)
188  throw state_exception("state is not DOWN");
189  __change(s);
190  break;
191 
192  case PENDING_DOWN:
193  // throw exception if not (IDLE, DOWN_REQUEST)
194  if ((_state != IDLE) && (_state != DOWN_REQUEST))
195  throw state_exception("state is not IDLE or DOWN_REQUEST");
196  __change(s);
197  break;
198 
199  case IDLE:
200  // throw exception if not (PENDING_UP, SAFE, SELECT)
201  if ((_state != PENDING_UP) && (_state != SAFE) && (_state != SELECT))
202  throw state_exception("state is not PENDING_UP, SAFE or SELECT");
203  __change(s);
204  break;
205 
206  case SELECT:
207  // throw exception if not (IDLE, SELECT)
208  if ((_state != IDLE) && (_state != SELECT))
209  throw state_exception("state is not IDLE or SELECT");
210  __change(s);
211  break;
212 
213  case DOWN_REQUEST:
214  case SAFE_REQUEST:
215  // throw exception if not (SELECT, DOWN_REQUEST)
216  if (_state != SELECT)
217  throw state_exception("state is not SELECT or DOWN_REQUEST");
218  __change(s);
219  break;
220 
221  case SAFE:
222  // throw exception if not (SAFE_REQUEST, IDLE)
223  if ((_state != SAFE_REQUEST) && (_state != IDLE))
224  throw state_exception("state is not SAFE_REQUEST or IDLE");
225  __change(s);
226  break;
227  }
228  } catch (const Conditional::ConditionalAbortException &e) {
229  throw state_exception(e.what());
230  }
231  }
232 
233  void vsocket::SocketState::setwait(STATE s, STATE abortstate) throw (state_exception)
234  {
235  try {
236  // try to get into state "s" and wait until this is possible
237  switch (s) {
238  case NONE:
239  throw state_exception("illegal state requested");
240  case SAFE_DOWN:
241  // throw exception if not (DOWN or PENDING_UP)
242  while ((_state != DOWN) && (_state != PENDING_UP)) {
243  if (_state == abortstate)
244  throw state_exception("abort state " + __getname(abortstate) + " reached");
246  }
247  __change(s);
248  break;
249 
250  case DOWN:
251  // throw exception if not (PENDING_DOWN or SAFE_DOWN)
252  while ((_state != PENDING_DOWN) && (_state != SAFE_DOWN)) {
253  if (_state == abortstate)
254  throw state_exception("abort state " + __getname(abortstate) + " reached");
256  }
257  __change(s);
258  break;
259 
260  case PENDING_UP:
261  // throw exception if not (DOWN)
262  while (_state != DOWN) {
263  if (_state == abortstate)
264  throw state_exception("abort state " + __getname(abortstate) + " reached");
266  }
267  __change(s);
268  break;
269 
270  case PENDING_DOWN:
271  // throw exception if not (IDLE, DOWN_REQUEST)
272  while ((_state != IDLE) && (_state != DOWN_REQUEST)) {
273  if (_state == abortstate)
274  throw state_exception("abort state " + __getname(abortstate) + " reached");
276  }
277  __change(s);
278  break;
279 
280  case IDLE:
281  // throw exception if not (PENDING_UP, SAFE, SELECT)
282  while ((_state != PENDING_UP) && (_state != SAFE) && (_state != SELECT)) {
283  if (_state == abortstate)
284  throw state_exception("abort state " + __getname(abortstate) + " reached");
286  }
287  __change(s);
288  break;
289 
290  case SELECT:
291  // throw exception if not (IDLE, SELECT)
292  while ((_state != IDLE) && (_state != SELECT)) {
293  if (_state == abortstate)
294  throw state_exception("abort state " + __getname(abortstate) + " reached");
296  }
297  __change(s);
298  break;
299 
300  case DOWN_REQUEST:
301  case SAFE_REQUEST:
302  // throw exception if not (SELECT)
303  while (_state != SELECT) {
304  if (_state == abortstate)
305  throw state_exception("abort state " + __getname(abortstate) + " reached");
307  }
308  __change(s);
309  break;
310 
311  case SAFE:
312  // throw exception if not (SAFE_REQUEST, IDLE)
313  while ((_state != SAFE_REQUEST) && (_state != IDLE)) {
314  if (_state == abortstate)
315  throw state_exception("abort state " + __getname(abortstate) + " reached");
317  }
318  __change(s);
319  break;
320  }
321  } catch (const Conditional::ConditionalAbortException &e) {
322  throw state_exception(e.what());
323  }
324  }
325 
326  void vsocket::SocketState::wait(STATE s, STATE abortstate) throw (state_exception)
327  {
328  try {
329  while (_state != s) {
330  if (_state == abortstate)
331  throw state_exception("abort state " + __getname(abortstate) + " reached");
333  }
334  } catch (const Conditional::ConditionalAbortException &e) {
335  throw state_exception(e.what());
336  }
337  }
338 
339  void vsocket::SocketState::__change(STATE s)
340  {
341  IBRCOMMON_LOGGER_DEBUG_TAG("SocketState", 90) << "SocketState transition: " << __getname(_state) << " -> " << __getname(s) << IBRCOMMON_LOGGER_ENDL;
342  _state = s;
344  }
345 
346  std::string vsocket::SocketState::__getname(STATE s) const
347  {
348  switch (s) {
349  case NONE:
350  return "NONE";
351  case SAFE_DOWN:
352  return "SAFE_DOWN";
353  case DOWN:
354  return "DOWN";
355  case PENDING_UP:
356  return "PENDING_UP";
357  case PENDING_DOWN:
358  return "PENDING_DOWN";
359  case IDLE:
360  return "IDLE";
361  case SELECT:
362  return "SELECT";
363  case SAFE_REQUEST:
364  return "SAFE_REQUEST";
365  case DOWN_REQUEST:
366  return "DOWN_REQUEST";
367  case SAFE:
368  return "SAFE";
369  }
370 
371  return "<unkown>";
372  }
373 
374  vsocket::SafeLock::SafeLock(SocketState &state, vsocket &sock)
375  : _state(state)
376  {
377  // request safe-state
378  ibrcommon::MutexLock l(_state);
379 
380  while ( (_state.get() != SocketState::DOWN) && (_state.get() != SocketState::IDLE) && (_state.get() != SocketState::SELECT) ) {
381  ((ibrcommon::Conditional&)_state).wait();
382  }
383 
384  if (_state.get() == SocketState::SELECT) {
385  _state.set(SocketState::SAFE_REQUEST);
386  // send interrupt
387  sock.interrupt();
388  _state.wait(SocketState::SAFE);
389  } else if (_state.get() == SocketState::DOWN) {
390  _state.set(SocketState::SAFE_DOWN);
391  } else {
392  _state.set(SocketState::SAFE);
393  }
394  }
395 
396  vsocket::SafeLock::~SafeLock()
397  {
398  // release safe-state
399  ibrcommon::MutexLock l(_state);
400  if (_state.get() == SocketState::SAFE)
401  {
402  _state.set(SocketState::IDLE);
403  }
404  else if (_state.get() == SocketState::SAFE_DOWN)
405  {
406  _state.set(SocketState::DOWN);
407  }
408  else
409  {
410  throw SocketState::state_exception("socket not in safe state");
411  }
412  }
413 
414  vsocket::SelectGuard::SelectGuard(SocketState &state, int &counter)
415  : _state(state), _counter(counter)
416  {
417  // set the current state to SELECT
418  try {
419  ibrcommon::MutexLock l(_state);
420  _state.setwait(SocketState::SELECT, SocketState::DOWN);
421  _counter++;
422  IBRCOMMON_LOGGER_DEBUG_TAG("SelectGuard", 90) << "SelectGuard counter set to " << _counter << IBRCOMMON_LOGGER_ENDL;
423  } catch (const SocketState::state_exception&) {
424  throw vsocket_interrupt("select interrupted while waiting for IDLE socket");
425  }
426  }
427 
428  vsocket::SelectGuard::~SelectGuard()
429  {
430  // set the current state to SELECT
431  try {
432  ibrcommon::MutexLock l(_state);
433  _counter--;
434  IBRCOMMON_LOGGER_DEBUG_TAG("SelectGuard", 90) << "SelectGuard counter set to " << _counter << IBRCOMMON_LOGGER_ENDL;
435 
436  if (_counter == 0) {
437  if (_state.get() == SocketState::SAFE_REQUEST) {
438  _state.set(SocketState::SAFE);
439  } else if (_state.get() == SocketState::DOWN_REQUEST) {
440  _state.set(SocketState::PENDING_DOWN);
441  } else {
442  _state.set(SocketState::IDLE);
443  }
444  }
446  throw vsocket_interrupt("select interrupted while checking SAFE_REQUEST state");
447  }
448  }
449 
450  vsocket::vsocket()
451  : _state(SocketState::DOWN), _select_count(0)
452  {
453  _pipe.up();
454  }
455 
457  {
458  _pipe.down();
459  }
460 
461  void vsocket::add(basesocket *socket)
462  {
463  SafeLock l(_state, *this);
464  _sockets.insert(socket);
465  }
466 
467  void vsocket::add(basesocket *socket, const vinterface &iface)
468  {
469  SafeLock l(_state, *this);
470  _sockets.insert(socket);
471  _socket_map[iface].insert(socket);
472  }
473 
475  {
476  SafeLock l(_state, *this);
477  _sockets.erase(socket);
478 
479  // search for the same socket in the map
480  for (std::map<vinterface, socketset>::iterator iter = _socket_map.begin(); iter != _socket_map.end(); ++iter)
481  {
482  socketset &set = (*iter).second;
483  set.erase(socket);
484  }
485  }
486 
487  size_t vsocket::size() const
488  {
489  return _sockets.size();
490  }
491 
493  {
494  SafeLock l(_state, *this);
495  _sockets.clear();
496  _socket_map.clear();
497  }
498 
500  {
501  down();
502 
503  for (socketset::iterator iter = _sockets.begin(); iter != _sockets.end(); ++iter)
504  {
505  basesocket *sock = (*iter);
506  delete sock;
507  }
508  _sockets.clear();
509  _socket_map.clear();
510  }
511 
513  {
514  return _sockets;
515  }
516 
517  socketset vsocket::get(const vinterface &iface) const
518  {
519  std::map<vinterface, socketset>::const_iterator iter = _socket_map.find(iface);
520 
521  if (iter == _socket_map.end()) {
522  socketset empty;
523  return empty;
524  }
525 
526  return (*iter).second;
527  }
528 
529  void vsocket::up() throw (socket_exception)
530  {
531  {
532  ibrcommon::MutexLock l(_state);
533  // wait until we can get into PENDING_UP state
534  _state.setwait(SocketState::PENDING_UP, SocketState::IDLE);
535  }
536 
537  for (socketset::iterator iter = _sockets.begin(); iter != _sockets.end(); ++iter) {
538  try {
539  if (!(*iter)->ready()) (*iter)->up();
540  } catch (const socket_exception&) {
541  // rewind all previously up'ped sockets
542  for (socketset::iterator riter = _sockets.begin(); riter != iter; ++riter) {
543  (*riter)->down();
544  }
545 
546  ibrcommon::MutexLock l(_state);
547  _state.set(SocketState::DOWN);
548  throw;
549  }
550  }
551 
552  // set state to IDLE
553  ibrcommon::MutexLock l(_state);
554  _state.set(SocketState::IDLE);
555  }
556 
557  void vsocket::down() throw ()
558  {
559  try {
560  ibrcommon::MutexLock l(_state);
561  if (_state.get() == SocketState::SELECT) {
562  _state.setwait(SocketState::DOWN_REQUEST, SocketState::DOWN);
563  interrupt();
564  // wait for PENDING_DOWN state
565  _state.wait(SocketState::PENDING_DOWN);
566  } else {
567  // enter PENDING_DOWN state
568  _state.setwait(SocketState::PENDING_DOWN, SocketState::DOWN);
569  }
570  } catch (SocketState::state_exception&) {
571  return;
572  }
573 
574  // shut-down all the sockets
575  ibrcommon::MutexLock l(_socket_lock);
576  for (socketset::iterator iter = _sockets.begin(); iter != _sockets.end(); ++iter) {
577  try {
578  if ((*iter)->ready()) (*iter)->down();
579  } catch (const socket_exception&) { }
580  }
581 
582  ibrcommon::MutexLock sl(_state);
583  _state.set(SocketState::DOWN);
584  }
585 
586  void vsocket::interrupt()
587  {
588  _pipe.write("i", 1);
589  }
590 
591  void vsocket::select(socketset *readset, socketset *writeset, socketset *errorset, struct timeval *tv) throw (socket_exception)
592  {
593  fd_set fds_read;
594  fd_set fds_write;
595  fd_set fds_error;
596 
597  int high_fd = 0;
598 
599  while (true)
600  {
601  SelectGuard guard(_state, _select_count);
602 
603  FD_ZERO(&fds_read);
604  FD_ZERO(&fds_write);
605  FD_ZERO(&fds_error);
606 
607  // add the self-pipe-trick interrupt fd
608  FD_SET(_pipe.fd(), &fds_read);
609  high_fd = _pipe.fd();
610 
611  {
612  ibrcommon::MutexLock l(_socket_lock);
613  for (socketset::iterator iter = _sockets.begin();
614  iter != _sockets.end(); ++iter)
615  {
616  basesocket &sock = (**iter);
617  if (!sock.ready()) continue;
618 
619  if (readset != NULL) {
620  FD_SET(sock.fd(), &fds_read);
621  if (high_fd < sock.fd()) high_fd = sock.fd();
622  }
623 
624  if (writeset != NULL) {
625  FD_SET(sock.fd(), &fds_write);
626  if (high_fd < sock.fd()) high_fd = sock.fd();
627  }
628 
629  if (errorset != NULL) {
630  FD_SET(sock.fd(), &fds_error);
631  if (high_fd < sock.fd()) high_fd = sock.fd();
632  }
633  }
634  }
635 
636  // call the linux-like select with given timeout
637  int res = __compat_select(high_fd + 1, &fds_read, &fds_write, &fds_error, tv);
638 
639  if (res < 0) {
640  if (errno == EINTR) {
641  // signal has been caught - handle it as interruption
642  continue;
643  }
644  else if (errno == 0) {
645  throw vsocket_interrupt("select call has been interrupted");
646  }
647  throw socket_raw_error(errno, "unknown select error");
648  }
649 
650 
651  if (res == 0)
652  throw vsocket_timeout("select timeout");
653 
654  if (FD_ISSET(_pipe.fd(), &fds_read))
655  {
656  IBRCOMMON_LOGGER_DEBUG_TAG("vsocket::select", 90) << "unblocked by self-pipe-trick" << IBRCOMMON_LOGGER_ENDL;
657 
658  // this was an interrupt with the self-pipe-trick
659  ibrcommon::MutexLock l(_socket_lock);
660  char buf[2];
661  _pipe.read(buf, 2);
662 
663  // start over with the select call
664  continue;
665  }
666 
667  ibrcommon::MutexLock l(_socket_lock);
668  for (socketset::iterator iter = _sockets.begin();
669  iter != _sockets.end(); ++iter)
670  {
671  basesocket *sock = (*iter);
672 
673  if (readset != NULL) {
674  if (FD_ISSET(sock->fd(), &fds_read))
675  {
676  readset->insert(sock);
677  }
678  }
679 
680  if (writeset != NULL) {
681  if (FD_ISSET(sock->fd(), &fds_write))
682  {
683  writeset->insert(sock);
684  }
685  }
686 
687  if (errorset != NULL) {
688  if (FD_ISSET(sock->fd(), &fds_error))
689  {
690  errorset->insert(sock);
691  }
692  }
693  }
694 
695  break;
696  }
697  }
698 }