IBR-DTNSuite  0.12
ApiServer.cpp
Go to the documentation of this file.
1 /*
2  * ApiServer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "api/ApiServer.h"
25 #include "core/EventDispatcher.h"
26 #include "core/BundleCore.h"
27 #include "core/NodeEvent.h"
29 
30 #include <ibrcommon/net/vaddress.h>
31 #include <ibrcommon/Logger.h>
32 #include <ibrcommon/net/vsocket.h>
34 
35 #include <typeinfo>
36 #include <algorithm>
37 #include <sstream>
38 #include <unistd.h>
39 #include <list>
40 
41 namespace dtn
42 {
43  namespace api
44  {
// Tag handed to the IBRCOMMON_LOGGER_TAG macros used in this file.
45  const std::string ApiServer::TAG = "ApiServer";
46 
48  : _shutdown(false), _garbage_collector(*this)
49  {
    // the only listening socket of this instance is a file-based server
    // socket created from `socketfile`; ownership of the new object is
    // handed to _sockets (destroyed via _sockets.destroy() in the destructor)
50  _sockets.add(new ibrcommon::fileserversocket(socketfile));
51  }
52 
54  : _shutdown(false), _garbage_collector(*this)
55  {
    // Three cases, selected by the interface `net`:
    //  - loopback: one socket on `addr4`, plus one on `addr6` if AF_INET6 is supported
    //  - any:      a single socket on `addr`
    //  - else:     one socket per AF_INET/AF_INET6 address of the interface
    // NOTE(review): the declarations of addr6, addr4 and addr are not visible
    // in this listing - confirm how they are built from `port`.
    // The second tcpserversocket argument (5) is presumably the listen
    // backlog - TODO confirm against ibrcommon.
56  if (net.isLoopback()) {
57  if (ibrcommon::basesocket::hasSupport(AF_INET6)) {
59  _sockets.add(new ibrcommon::tcpserversocket(addr6, 5));
60  }
61 
63  _sockets.add(new ibrcommon::tcpserversocket(addr4, 5));
64  }
65  else if (net.isAny()) {
67  _sockets.add(new ibrcommon::tcpserversocket(addr, 5));
68  }
69  else {
70  // add a socket for each address on the interface
71  std::list<ibrcommon::vaddress> addrs = net.getAddresses();
72 
73  // convert the port into a string
74  std::stringstream ss; ss << port;
75 
76  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
    // reference into the local copy `addrs`, so setService() below
    // modifies that copy only
77  ibrcommon::vaddress &addr = (*iter);
78 
79  try {
80  // handle the addresses according to their family
81  switch (addr.family()) {
82  case AF_INET:
83  case AF_INET6:
84  addr.setService(ss.str());
    // unlike the branches above, the socket is added together with `net`
85  _sockets.add(new ibrcommon::tcpserversocket(addr, 5), net);
86  break;
87 
88  default:
    // addresses of any other family are skipped silently
89  break;
90  }
91  } catch (const ibrcommon::vaddress::address_exception &ex) {
    // an address error is logged as a warning and the loop continues
    // with the next address
92  IBRCOMMON_LOGGER_TAG(ApiServer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
93  }
94  }
95  }
96  }
97 
99  {
    // stop the garbage collector and wait for it to finish first
100  _garbage_collector.stop();
101  _garbage_collector.join();
    // unqualified join(): presumably waits for this component's own
    // thread (inherited member) - TODO confirm
102  join();
103 
    // only after both joins are the socket objects added in the
    // constructors destroyed
104  _sockets.destroy();
105  }
106 
108  {
109  // shut-down all server sockets
    // presumably this is what releases a select() that is blocked in
    // componentRun() - verify against ibrcommon::vsocket
110  _sockets.down();
111  }
112 
// Brings the listening sockets up and (re)arms the registration garbage
// collector. Declared throw(): no exception may leave this function.
113  void ApiServer::componentUp() throw ()
114  {
115  // routine checked for throw() on 15.02.2013
116  try {
117  // bring up all server sockets
118  _sockets.up();
119  } catch (const ibrcommon::socket_exception &ex) {
    // a socket failure is only logged; execution continues below
120  IBRCOMMON_LOGGER_TAG(ApiServer::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
121  }
122 
    // reached even if _sockets.up() failed; startGarbageCollector()
    // handles StopTimerException internally
124  startGarbageCollector();
125  }
126 
// Accept loop of the API server: waits for readable server sockets, accepts
// each pending client, sends the welcome banner and hands the connection to a
// new ClientHandler bound to a freshly created Registration.
// The loop ends when _shutdown is set or when any std::exception is raised.
127  void ApiServer::componentRun() throw ()
128  {
129  try {
130  while (!_shutdown)
131  {
132  // create a socket set to do select on all sockets
    // NOTE(review): `fds` is used below, but its declaration does not
    // appear in this listing - confirm against the original file
134 
135  // do select on all socket and find all readable ones
136  _sockets.select(&fds, NULL, NULL, NULL);
137 
138  // iterate through all readable sockets
139  for (ibrcommon::socketset::iterator iter = fds.begin(); iter != fds.end(); ++iter)
140  {
141  // we assume all the sockets in _sockets are server sockets
142  // so cast this one to the right class
    // a failed reference cast throws std::bad_cast, which is caught by
    // the handler at the bottom and therefore ends the accept loop
143  ibrcommon::serversocket &sock = dynamic_cast<ibrcommon::serversocket&>(**iter);
144 
145  // variable to store the peer address on the next accept call
146  ibrcommon::vaddress peeraddr;
147 
148  // accept the next client
149  ibrcommon::clientsocket *peersock = sock.accept(peeraddr);
150 
151  // set the no delay option for the new socket if configured
152  if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
153  {
154  // check if the socket is a tcpsocket
155  if ( dynamic_cast<ibrcommon::tcpsocket*>(peersock) != NULL )
156  {
157  peersock->set(ibrcommon::clientsocket::NO_DELAY, true);
158  }
159  }
160 
161  // create a new socket stream using the new client socket
162  // now the socket stream is responsible for the client socket
163  // and will destroy the instance on its own destruction
164  ibrcommon::socketstream *conn = new ibrcommon::socketstream(peersock);
165 
166  // if we are already in shutdown state
167  if (_shutdown)
168  {
169  // close the new socket
170  conn->close();
171 
172  // and free the object
173  delete conn;
    // leaves componentRun() entirely; remaining readable sockets in
    // `fds` are not processed
174  return;
175  }
176 
177  // generate some output
178  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "new connected client at the extended API server" << IBRCOMMON_LOGGER_ENDL;
179 
180  // send welcome banner
181  (*conn) << "IBR-DTN " << dtn::daemon::Configuration::getInstance().version() << " API 1.0" << std::endl;
182 
183  // the new client object will be hold here
184  ClientHandler *obj = NULL;
185 
186  // in locked state we create a new registration for the new connection
187  {
188  ibrcommon::MutexLock l1(_registration_lock);
189 
190  // create a new registration
191  Registration *reg = new Registration();
192  _registrations.push_back(reg);
193  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "new registration " << reg->getHandle() << IBRCOMMON_LOGGER_ENDL;
194 
195  // create a new clienthandler for the new registration
    // _registrations.back() is the `reg` pushed above (the lock is
    // still held). NOTE(review): if this allocation/constructor throws,
    // `conn` is not freed on this path
196  obj = new ClientHandler(*this, *_registrations.back(), conn);
197  }
198 
199  // one again in locked state we push the new connection in the connection list
200  {
201  ibrcommon::MutexLock l2(_connection_lock);
202  _connections.push_back(obj);
203  }
204 
205  // start the client handler
    // started only after the handler is listed in _connections
206  obj->start();
207  }
208 
209  // breakpoint
211  }
212  } catch (const std::exception&) {
213  // ignore all errors
    // note: "ignore" here means the accept loop terminates silently;
    // nothing is logged
214  return;
215  }
216  }
217 
// Shuts the API server down: sets the shutdown flag, takes the listening
// sockets down, pauses the garbage collector, stops every client handler and
// then blocks until the connection list is empty.
218  void ApiServer::componentDown() throw ()
219  {
221 
222  // put the server into shutdown mode
223  _shutdown = true;
224 
225  // close the listen API socket
226  _sockets.down();
227 
228  // pause the garbage collection
229  _garbage_collector.pause();
230 
231  // stop/close all connections in locked state
232  {
233  ibrcommon::MutexLock l(_connection_lock);
234 
235  // shutdown all clients
236  for (client_list::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
237  {
238  (*iter)->stop();
239  }
240  }
241 
242  // wait until all clients are down
    // entries are removed from _connections by the connection-down
    // handler further below in this file.
    // NOTE(review): size() is polled here without holding
    // _connection_lock, while that handler erases under the lock
243  while (_connections.size() > 0) ibrcommon::Thread::sleep(1000);
244  }
245 
246  Registration& ApiServer::getRegistration(const std::string &handle)
247  {
248  ibrcommon::MutexLock l(_registration_lock);
249  for (registration_list::iterator iter = _registrations.begin(); iter != _registrations.end(); ++iter)
250  {
251  Registration &reg = (**iter);
252 
253  if (reg == handle)
254  {
255  if (reg.isPersistent()){
256  reg.attach();
257  return reg;
258  }
259  break;
260  }
261  }
262 
263  throw Registration::NotFoundException("Registration not found");
264  }
265 
267  {
    // Sweeps the registration list and frees every registration that is
    // no longer persistent, then returns the value computed by
    // nextRegistrationExpiry() (which throws
    // ibrcommon::Timer::StopTimerException when it finds no registration
    // that needs the timer).
268  {
    // the lock is confined to this inner scope because
    // nextRegistrationExpiry() below acquires _registration_lock itself
269  ibrcommon::MutexLock l(_registration_lock);
270 
271  // remove non-persistent and detached registrations
272  for (registration_list::iterator iter = _registrations.begin(); iter != _registrations.end();)
273  {
274  try
275  {
276  Registration *reg = (*iter);
277 
    // presumably attach() throws for a registration that is currently
    // in use (cf. AlreadyAttachedException in nextRegistrationExpiry()),
    // so such entries are skipped - TODO confirm
278  reg->attach();
279  if(!reg->isPersistent()){
280  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "release registration " << reg->getHandle() << IBRCOMMON_LOGGER_ENDL;
    // post-increment: the iterator is advanced before the element it
    // pointed to is erased
281  _registrations.erase(iter++);
282  delete reg;
283  }
284  else
285  {
    // still persistent: release it again and keep it in the list
286  reg->detach();
287  ++iter;
288  }
289  }
    // NOTE(review): the catch clause that introduces the following
    // block is not visible in this listing
291  {
292  ++iter;
293  }
294  }
295  }
296 
297  return nextRegistrationExpiry();
298  }
299 
300  const std::string ApiServer::getName() const
301  {
302  return "ApiServer";
303  }
304 
306  {
    // this handler only writes a debug log line; no server state is changed
307  // generate some output
308  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "api connection up" << IBRCOMMON_LOGGER_ENDL;
309  }
310 
312  {
313  // generate some output
314  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "api connection down" << IBRCOMMON_LOGGER_ENDL;
315 
    // the lock is held for the remainder of the function
316  ibrcommon::MutexLock l(_connection_lock);
317 
318  // remove this object out of the list
    // entries are compared by pointer identity; only the first match is
    // erased, and `obj` itself is not deleted here
319  for (client_list::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
320  {
321  if (obj == (*iter))
322  {
323  _connections.erase(iter);
    // leave immediately: `iter` must not be used after the erase
324  break;
325  }
326  }
327  }
328 
330  {
    // A persistent registration is kept: it is only detached and the
    // garbage collector is (re)armed. Any other registration is removed
    // from the list and deleted right away.
331  if(reg.isPersistent())
332  {
333  reg.detach();
334  startGarbageCollector();
335  }
336  else
337  {
    // lock is taken only on this branch; startGarbageCollector() above
    // ends up acquiring _registration_lock on its own via
    // nextRegistrationExpiry()
338  ibrcommon::MutexLock l(_registration_lock);
339  // remove the registration
340  for (registration_list::iterator iter = _registrations.begin(); iter != _registrations.end(); ++iter)
341  {
342  Registration *r = (*iter);
343 
344  if (reg == (*r))
345  {
346  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "release registration " << reg.getHandle() << IBRCOMMON_LOGGER_ENDL;
347  _registrations.erase(iter);
    // NOTE(review): if `r` is the object `reg` refers to, `reg` is
    // dangling after this delete - it is not used again below
348  delete r;
349  break;
350  }
351  }
352  }
353  }
354 
// Event callback. Only QueueBundleEvent instances are handled; for every
// other event type the reference dynamic_cast throws std::bad_cast, which is
// caught and ignored at the end of the function.
355  void ApiServer::raiseEvent(const dtn::core::Event *evt) throw ()
356  {
357  try {
358  const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
359 
360  // ignore fragments - we can not deliver them directly to the client
361  if (queued.bundle.isFragment()) return;
362 
    // the connection list is walked while holding _connection_lock
363  ibrcommon::MutexLock l(_connection_lock);
364  for (client_list::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
365  {
366  ClientHandler &conn = **iter;
367  if (conn.getRegistration().hasSubscribed(queued.bundle.destination))
368  {
    // NOTE(review): the statement executed for a subscribed client is
    // not visible in this listing
370  }
371  }
372  } catch (const std::bad_cast&) { };
373  }
374 
375  void ApiServer::startGarbageCollector()
376  {
377  try
378  {
379  /* set the timeout for the GarbageCollector */
380  size_t timeout = nextRegistrationExpiry();
381  _garbage_collector.set(timeout);
382 
383  /* start it, if it is not running yet */
384  if(!_garbage_collector.isRunning())
385  _garbage_collector.start();
386  }
387  catch(const ibrcommon::Timer::StopTimerException &ex)
388  {
389  }
390  }
391 
392  size_t ApiServer::nextRegistrationExpiry()
393  {
394  ibrcommon::MutexLock l(_registration_lock);
395  bool persistentFound = false;
396  size_t new_timeout = 0;
397  size_t current_time = ibrcommon::Timer::get_current_time();
398 
399  // find the registration that expires next
400  for (registration_list::iterator iter = _registrations.begin(); iter != _registrations.end(); ++iter)
401  {
402  try
403  {
404  Registration &reg = (**iter);
405 
406  reg.attach();
407  if(!reg.isPersistent()){
408  /* found an expired registration, trigger the timer */
409  reg.detach();
410  new_timeout = 0;
411  persistentFound = true;
412  break;
413  }
414  else
415  {
416  size_t expire_time = reg.getExpireTime();
417  /* we dont have to check if the expire time is smaller then the current_time
418  since isPersistent() would have returned false */
419  size_t expire_timeout = expire_time - current_time;
420  reg.detach();
421 
422  /* if persistentFound is false, no persistent registration was found yet */
423  if(!persistentFound)
424  {
425  persistentFound = true;
426  new_timeout = expire_timeout;
427  }
428  else if(expire_timeout < new_timeout)
429  {
430  new_timeout = expire_timeout;
431  }
432  }
433  }
434  catch(const Registration::AlreadyAttachedException &ex)
435  {
436  }
437  }
438 
439  if(!persistentFound) throw ibrcommon::Timer::StopTimerException();
440 
441  return new_timeout;
442  }
443  }
444 }