IBR-DTNSuite  0.10
ApiServer.cpp
Go to the documentation of this file.
1 /*
2  * ApiServer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "api/ApiServer.h"
25 #include "core/EventDispatcher.h"
26 #include "core/BundleCore.h"
27 #include "core/NodeEvent.h"
29 
30 #include <ibrcommon/net/vaddress.h>
31 #include <ibrcommon/Logger.h>
32 #include <ibrcommon/net/vsocket.h>
34 
35 #include <typeinfo>
36 #include <algorithm>
37 #include <sstream>
38 #include <unistd.h>
39 #include <list>
40 
41 namespace dtn
42 {
43  namespace api
44  {
// Static string constant of ApiServer holding the text "ApiServer".
// NOTE(review): presumably meant as the logger tag; the log statements visible in
// this listing pass the literal "ApiServer" instead - confirm.
45  const std::string ApiServer::TAG = "ApiServer";
46 
48  : _shutdown(false), _garbage_collector(*this)
49  {
// NOTE(review): the signature line (embedded number 47) is not part of this listing;
// `socketfile` is presumably the constructor parameter - confirm against the header.
// The server starts outside of shutdown mode and hands itself to the garbage collector.
// A single file based server socket created from `socketfile` is registered.
50  _sockets.add(new ibrcommon::fileserversocket(socketfile));
51  }
52 
54  : _shutdown(false), _garbage_collector(*this)
55  {
// NOTE(review): the signature line (embedded number 53) and the declarations of
// `addr6`, `addr4` and `addr` (58, 62, 66) are not part of this listing; `net` and
// `port` are presumably the constructor parameters - confirm against the header.
// The second tcpserversocket argument (5) is presumably the listen backlog - confirm.
//
// Three cases are distinguished by the interface `net`:
//  - loopback: one socket for `addr4`, plus one for `addr6` if AF_INET6 is supported
//  - any:      one socket for `addr`
//  - other:    one socket per AF_INET/AF_INET6 address assigned to the interface
56  if (net.isLoopback()) {
57  if (ibrcommon::basesocket::hasSupport(AF_INET6)) {
59  _sockets.add(new ibrcommon::tcpserversocket(addr6, 5));
60  }
61 
63  _sockets.add(new ibrcommon::tcpserversocket(addr4, 5));
64  }
65  else if (net.isAny()) {
67  _sockets.add(new ibrcommon::tcpserversocket(addr, 5));
68  }
69  else {
70  // add a socket for each address on the interface
71  std::list<ibrcommon::vaddress> addrs = net.getAddresses();
72 
73  // convert the port into a string
74  std::stringstream ss; ss << port;
75 
76  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
77  ibrcommon::vaddress &addr = (*iter);
78 
79  try {
80  // handle the addresses according to their family
81  switch (addr.family()) {
82  case AF_INET:
83  case AF_INET6:
// the port string becomes the service of the address; the socket is added
// together with the interface `net`
84  addr.setService(ss.str());
85  _sockets.add(new ibrcommon::tcpserversocket(addr, 5), net);
86  break;
87 
88  default:
// addresses of any other family get no socket
89  break;
90  }
91  } catch (const ibrcommon::vaddress::address_exception &ex) {
// an address_exception only skips the current address; the loop continues
// NOTE(review): embedded line 92 is missing here, the handler body is not fully visible
93  }
94  }
95  }
96  }
97 
99  {
// NOTE(review): the signature line (embedded number 98) is not part of this listing;
// this is presumably the destructor - confirm.
// Stop the garbage collector and wait for it, then wait for this object's own
// thread (join()) before the sockets are destroyed.
100  _garbage_collector.stop();
101  _garbage_collector.join();
102  join();
103 
104  _sockets.destroy();
105  }
106 
108  {
// NOTE(review): the signature line (embedded number 107) is not part of this listing;
// presumably this is the thread cancellation hook - confirm.
109  // shut-down all server sockets
110  _sockets.down();
111  }
112 
// Bring up all server sockets and start the garbage collector.
// Declared throw(): a socket_exception raised by _sockets.up() is caught here.
113  void ApiServer::componentUp() throw ()
114  {
115  // routine checked for throw() on 15.02.2013
116  try {
117  // bring up all server sockets
118  _sockets.up();
119  } catch (const ibrcommon::socket_exception &ex) {
// NOTE(review): embedded line 120 is missing from this listing, so the handler
// body is not fully visible here - confirm against the original file
121  }
122 
// NOTE(review): embedded line 123 is missing from this listing as well
124  startGarbageCollector();
125  }
126 
// Accept loop of the API server: waits on the server sockets, accepts every
// pending client, sends the welcome banner, creates a Registration plus a
// ClientHandler for it and starts the handler. The loop runs until _shutdown is
// set; any std::exception ends the function silently.
127  void ApiServer::componentRun() throw ()
128  {
129  try {
130  while (!_shutdown)
131  {
132  // create a socket set to do select on all sockets
// NOTE(review): the declaration of `fds` (embedded line 133) is missing from this listing
134 
135  // do select on all socket and find all readable ones
136  _sockets.select(&fds, NULL, NULL, NULL);
137 
138  // iterate through all readable sockets
139  for (ibrcommon::socketset::iterator iter = fds.begin(); iter != fds.end(); ++iter)
140  {
141  // we assume all the sockets in _sockets are server sockets
142  // so cast this one to the right class
// (a failing reference cast throws std::bad_cast, which ends up in the handler below)
143  ibrcommon::serversocket &sock = dynamic_cast<ibrcommon::serversocket&>(**iter);
144 
145  // variable to store the peer address on the next accept call
146  ibrcommon::vaddress peeraddr;
147 
148  // accept the next client
149  ibrcommon::clientsocket *peersock = sock.accept(peeraddr);
150 
151  // set the no delay option for the new socket if configured
152  if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
153  {
154  // check if the socket is a tcpsocket
155  if ( dynamic_cast<ibrcommon::tcpsocket*>(peersock) != NULL )
156  {
157  peersock->set(ibrcommon::clientsocket::NO_DELAY, true);
158  }
159  }
160 
161  // create a new socket stream using the new client socket
162  // now the socket stream is responsible for the client socket
163  // and will destroy the instance on its own destruction
164  ibrcommon::socketstream *conn = new ibrcommon::socketstream(peersock);
165 
166  // if we are already in shutdown state
167  if (_shutdown)
168  {
169  // close the new socket
170  conn->close();
171 
172  // and free the object
173  delete conn;
// leaves the whole function, not only the loop over the readable sockets
174  return;
175  }
176 
177  // generate some output
178  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "new connected client at the extended API server" << IBRCOMMON_LOGGER_ENDL;
179 
180  // send welcome banner
181  (*conn) << "IBR-DTN " << dtn::daemon::Configuration::getInstance().version() << " API 1.0" << std::endl;
182 
183  // the new client object will be held here
184  ClientHandler *obj = NULL;
185 
186  // in locked state we create a new registration for the new connection
187  {
188  ibrcommon::MutexLock l1(_registration_lock);
189 
190  // create a new registration
// the list stores a copy of `reg`; the handler below is bound to that list element
191  Registration reg;
192  _registrations.push_back(reg);
193  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "new registration " << reg.getHandle() << IBRCOMMON_LOGGER_ENDL;
194 
195  // create a new clienthandler for the new registration
196  obj = new ClientHandler(*this, _registrations.back(), conn);
197  }
198 
199  // once again in locked state we push the new connection in the connection list
200  {
201  ibrcommon::MutexLock l2(_connection_lock);
202  _connections.push_back(obj);
203  }
204 
205  // start the client handler
206  obj->start();
207  }
208 
209  // breakpoint
// NOTE(review): embedded line 210 (the statement belonging to this comment) is missing from this listing
211  }
212  } catch (const std::exception&) {
213  // ignore all errors
214  return;
215  }
216  }
217 
// Shut the server down: set the shutdown flag, take the server sockets down,
// pause the garbage collector, stop every client handler and then block until
// the connection list is empty.
218  void ApiServer::componentDown() throw ()
219  {
// NOTE(review): embedded line 220 is missing from this listing
221 
222  // put the server into shutdown mode
223  _shutdown = true;
224 
225  // close the listen API socket
226  _sockets.down();
227 
228  // pause the garbage collection
229  _garbage_collector.pause();
230 
231  // stop/close all connections in locked state
232  {
233  ibrcommon::MutexLock l(_connection_lock);
234 
235  // shutdown all clients
236  for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
237  {
238  (*iter)->stop();
239  }
240  }
241 
242  // wait until all clients are down
// polls once per second
// NOTE(review): _connections.size() is read here without holding _connection_lock,
// while other code in this file modifies the list under that lock - confirm this is intended
243  while (_connections.size() > 0) ::sleep(1);
244  }
245 
246  Registration& ApiServer::getRegistration(const std::string &handle)
247  {
248  ibrcommon::MutexLock l(_registration_lock);
249  for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end(); ++iter)
250  {
251  if (*iter == handle)
252  {
253  if (iter->isPersistent()){
254  iter->attach();
255  return *iter;
256  }
257  break;
258  }
259  }
260 
261  throw Registration::NotFoundException("Registration not found");
262  }
263 
265  {
// NOTE(review): the signature line (embedded number 264) is not part of this listing;
// presumably this is the timer callback of the garbage collector - confirm.
// Removes every registration that can be attached and is not persistent, then
// returns the value of nextRegistrationExpiry().
// The inner scope releases _registration_lock before nextRegistrationExpiry()
// is called, which acquires the same lock itself.
266  {
267  ibrcommon::MutexLock l(_registration_lock);
268 
269  // remove non-persistent and detached registrations
270  for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end();)
271  {
272  try
273  {
274  iter->attach();
275  if(!iter->isPersistent()){
276  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "release registration " << iter->getHandle() << IBRCOMMON_LOGGER_ENDL;
// erase() yields the iterator of the following element, so no increment here
277  iter = _registrations.erase(iter);
278  }
279  else
280  {
// persistent registrations are kept: release them again and move on
281  iter->detach();
282  ++iter;
283  }
284  }
// NOTE(review): the catch line (embedded number 285) is missing from this listing;
// presumably it handles a failed attach() - confirm. The handler skips the element.
286  {
287  ++iter;
288  }
289  }
290  }
291 
292  return nextRegistrationExpiry();
293  }
294 
295  const std::string ApiServer::getName() const
296  {
297  return "ApiServer";
298  }
299 
301  {
// NOTE(review): the signature line (embedded number 300) is not part of this listing.
// Only writes a debug log entry.
302  // generate some output
303  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "api connection up" << IBRCOMMON_LOGGER_ENDL;
304  }
305 
307  {
// NOTE(review): the signature line (embedded number 306) is not part of this listing;
// `obj` is presumably the ClientHandler pointer passed in - confirm.
// Writes a debug log entry and removes the first list entry equal to `obj`
// from the connection list while holding the connection lock.
308  // generate some output
309  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "api connection down" << IBRCOMMON_LOGGER_ENDL;
310 
311  ibrcommon::MutexLock l(_connection_lock);
312 
313  // remove this object out of the list
314  for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
315  {
316  if (obj == (*iter))
317  {
// stop right after erasing: the erased iterator must not be used again
318  _connections.erase(iter);
319  break;
320  }
321  }
322  }
323 
325  {
// NOTE(review): the signature line (embedded number 324) is not part of this listing;
// `reg` is presumably a Registration reference passed in - confirm.
// A persistent registration is only detached and the garbage collector is
// (re)started; a non-persistent one is removed from the registration list.
326  if(reg.isPersistent())
327  {
328  reg.detach();
329  startGarbageCollector();
330  }
331  else
332  {
333  ibrcommon::MutexLock l(_registration_lock);
334  // remove the registration
335  for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end(); ++iter)
336  {
337  if (reg == (*iter))
338  {
// the handle is logged before the list element is erased
339  IBRCOMMON_LOGGER_DEBUG_TAG("ApiServer", 5) << "release registration " << reg.getHandle() << IBRCOMMON_LOGGER_ENDL;
340  _registrations.erase(iter);
341  break;
342  }
343  }
344  }
345  }
346 
// Event callback. Only QueueBundleEvent is handled; for every connection whose
// registration has subscribed to the destination of the queued bundle an action
// is taken (not visible in this listing, see the note below).
// Events of any other type are ignored via the std::bad_cast handler.
347  void ApiServer::raiseEvent(const dtn::core::Event *evt) throw ()
348  {
349  try {
// the reference cast throws std::bad_cast if the event has another type
350  const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
351 
352  // ignore fragments - we can not deliver them directly to the client
353  if (queued.bundle.fragment) return;
354 
355  ibrcommon::MutexLock l(_connection_lock);
356  for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
357  {
358  ClientHandler &conn = **iter;
359  if (conn.getRegistration().hasSubscribed(queued.bundle.destination))
360  {
// NOTE(review): embedded line 361 (the statement executed for subscribed
// connections) is missing from this listing - confirm against the original file
362  }
363  }
364  } catch (const std::bad_cast&) { };
365  }
366 
367  void ApiServer::startGarbageCollector()
368  {
369  try
370  {
371  /* set the timeout for the GarbageCollector */
372  size_t timeout = nextRegistrationExpiry();
373  _garbage_collector.set(timeout);
374 
375  /* start it, if it is not running yet */
376  if(!_garbage_collector.isRunning())
377  _garbage_collector.start();
378  }
379  catch(const ibrcommon::Timer::StopTimerException &ex)
380  {
381  }
382  }
383 
384  size_t ApiServer::nextRegistrationExpiry()
385  {
386  ibrcommon::MutexLock l(_registration_lock);
387  bool persistentFound = false;
388  size_t new_timeout = 0;
389  size_t current_time = ibrcommon::Timer::get_current_time();
390 
391  // find the registration that expires next
392  for (std::list<dtn::api::Registration>::iterator iter = _registrations.begin(); iter != _registrations.end(); ++iter)
393  {
394  try
395  {
396  iter->attach();
397  if(!iter->isPersistent()){
398  /* found an expired registration, trigger the timer */
399  iter->detach();
400  new_timeout = 0;
401  persistentFound = true;
402  break;
403  }
404  else
405  {
406  size_t expire_time = iter->getExpireTime();
407  /* we dont have to check if the expire time is smaller then the current_time
408  since isPersistent() would have returned false */
409  size_t expire_timeout = expire_time - current_time;
410  iter->detach();
411 
412  /* if persistentFound is false, no persistent registration was found yet */
413  if(!persistentFound)
414  {
415  persistentFound = true;
416  new_timeout = expire_timeout;
417  }
418  else if(expire_timeout < new_timeout)
419  {
420  new_timeout = expire_timeout;
421  }
422  }
423  }
424  catch(const Registration::AlreadyAttachedException &ex)
425  {
426  }
427  }
428 
429  if(!persistentFound) throw ibrcommon::Timer::StopTimerException();
430 
431  return new_timeout;
432  }
433  }
434 }