IBR-DTNSuite  0.12
UDPConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * UDPConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
25 #include "core/BundleEvent.h"
26 #include "core/BundleCore.h"
27 #include "Configuration.h"
28 
29 #include <ibrdtn/utils/Utils.h>
30 #include <ibrdtn/data/Serializer.h>
31 
32 #include <ibrcommon/net/socket.h>
33 #include <ibrcommon/net/vaddress.h>
35 #include <ibrcommon/data/BLOB.h>
36 #include <ibrcommon/Logger.h>
38 
39 #include <sys/types.h>
40 #include <unistd.h>
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #include <fcntl.h>
45 #include <limits.h>
46 
47 #include <iostream>
48 #include <list>
49 #include <vector>
50 
51 
52 using namespace dtn::data;
53 
54 namespace dtn
55 {
56  namespace net
57  {
// Out-of-class definition of the static class constant DEFAULT_PORT (4556).
// NOTE(review): presumably the port used when none is configured - confirm against the header.
58  const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
59 
60  UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, dtn::data::Length mtu)
61  : _net(net), _port(port), m_maxmsgsize(mtu), _running(false), _stats_in(0), _stats_out(0)
62  {
63  }
64 
// NOTE(review): the signature line of this definition is missing from the
// listing (presumably the destructor - confirm against the original file).
// The body only calls componentDown().
66  {
67  componentDown();
68  }
69 
// NOTE(review): the signature line of this definition is missing from the
// listing (presumably a statistics reset routine - confirm).
// Sets both traffic counters back to zero.
71  {
72  _stats_in = 0;
73  _stats_out = 0;
74  }
75 
// NOTE(review): the signature line of this definition is missing from the
// listing; `data` is presumably a string-keyed map passed in by the caller
// (confirm against the header).
// Stores the current values of _stats_in and _stats_out as strings in `data`
// under the keys "<protocol>|in" and "<protocol>|out".
77  {
78  std::stringstream ss_format;
79 
// key names are built once, on the first call (function-local statics)
80  static const std::string IN_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|in";
81  static const std::string OUT_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|out";
82 
83  ss_format << _stats_in;
84  data[IN_TAG] = ss_format.str();
// clear the buffer so the stream can be reused for the second counter
85  ss_format.str("");
86 
87  ss_format << _stats_out;
88  data[OUT_TAG] = ss_format.str();
89  }
90 
// NOTE(review): both the signature line and the single body line (93) of this
// definition are missing from the listing. Presumably this is
// getDiscoveryProtocol(), which other blocks call - confirm against the
// original file before relying on this body.
92  {
94  }
95 
// NOTE(review): the signature line of this definition is missing from the
// listing; `announcement` is a parameter that receives service entries.
// Adds DiscoveryService entries for this layer to `announcement`:
//  - bound to "any" interface: a single "port=<port>;" entry;
//  - otherwise: one "ip=<addr>;port=<port>;" entry per global-scope IPv4/IPv6
//    address of _net, falling back to "port=<port>;" if nothing was announced.
97  {
98  // announce port only if we are bound to any interface
99  if (_net.isAny()) {
100  std::stringstream service;
101  // ... set the port only
102  service << "port=" << _port << ";";
103  announcement.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
104  return;
105  }
106 
107  // do not announce if this is not our interface
// NOTE(review): the statement belonging to the comment above (line 108) is missing from the listing.
109 
110  // determine if we should enable crosslayer discovery by sending out our own address
// NOTE(review): the statement defining `crosslayer` (line 111) is missing from the listing.
112 
113  // this marker is set to true once a service description has been added
114  bool announced = false;
115 
116  try {
117  // check if cross layer discovery is disabled
// (the exception is only used to jump to the outer catch block below)
118  if (!crosslayer) throw ibrcommon::Exception("crosslayer discovery disabled!");
119 
120  // get all addresses of the interface (filtered by scope below)
121  std::list<ibrcommon::vaddress> list = _net.getAddresses();
122 
123  // if no address is returned... (goto catch block)
124  if (list.empty()) throw ibrcommon::Exception("no address found");
125 
126  for (std::list<ibrcommon::vaddress>::const_iterator addr_it = list.begin(); addr_it != list.end(); ++addr_it)
127  {
128  const ibrcommon::vaddress &addr = (*addr_it);
129 
130  // only announce global scope addresses
131  if (addr.scope() != ibrcommon::vaddress::SCOPE_GLOBAL) continue;
132 
133  try {
134  // do not announce non-IP addresses
135  sa_family_t f = addr.family();
136  if ((f != AF_INET) && (f != AF_INET6)) continue;
137 
138  std::stringstream service;
139  // fill in the ip address
140  service << "ip=" << addr.address() << ";port=" << _port << ";";
141  announcement.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
142 
143  // set the announce mark
144  announced = true;
145  } catch (const ibrcommon::vaddress::address_exception &ex) {
// an address that cannot be evaluated is logged and skipped
146  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 25) << ex.what() << IBRCOMMON_LOGGER_ENDL;
147  }
148  }
149  } catch (const ibrcommon::Exception&) {
150  // address collection process aborted
151  }
152 
153  // if we have still not announced anything...
154  if (!announced) {
155  // announce at least our local port
156  std::stringstream service;
157  service << "port=" << _port << ";";
158  announcement.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
159  }
160  }
161 
// NOTE(review): the signature line and several statements of this definition
// are missing from the listing (lines 162, 168, 172, 205, 207, 249, 255);
// `node` and `job` are parameters, `storage` is defined on a missing line.
// Sends the bundle referenced by `job` to the first UDP/IP URI of `node`.
// A bundle whose serialized size exceeds m_maxmsgsize is split into
// fragments, each sent as its own datagram.
163  {
164  const std::list<dtn::core::Node::URI> uri_list = node.get(dtn::core::Node::CONN_UDPIP);
165  if (uri_list.empty())
166  {
// no UDP/IP address known for this node
167  dtn::net::BundleTransfer local_job = job;
// NOTE(review): the statement using local_job (line 168) is missing from the listing.
169  return;
170  }
171 
// NOTE(review): line 172 is missing from the listing (presumably defines `storage` - confirm).
173 
174  const dtn::core::Node::URI &uri = uri_list.front();
175 
// defaults, overwritten by uri.decode() below
176  std::string address = "0.0.0.0";
177  unsigned int port = 0;
178 
179  // read values
180  uri.decode(address, port);
181 
182  // get the address of the node
183  ibrcommon::vaddress addr(address, port);
184 
185  try {
186  // read the bundle out of the storage
187  const dtn::data::Bundle bundle = storage.get(job.getBundle());
188 
189  // build the dictionary for EID lookup
190  const dtn::data::Dictionary dict(bundle);
191 
192  // create a default serializer (only used for the getLength() calls below)
193  dtn::data::DefaultSerializer dummy(std::cout, dict);
194 
195  // get the encoded length of the primary block
196  size_t header = dummy.getLength((const PrimaryBlock&)bundle);
197  header += 20; // two times SDNV through fragmentation
198 
199  dtn::data::Length size = dummy.getLength(bundle);
200 
201  if (size > m_maxmsgsize)
202  {
203  // abort transmission if fragmentation is disabled
// NOTE(review): the rest of this condition (line 205) and the statement in its
// body (line 207) are missing from the listing.
204  if (!dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()
206  {
208  }
209 
210  const size_t psize = bundle.find<dtn::data::PayloadBlock>().getLength();
// NOTE(review): if header >= m_maxmsgsize this subtraction presumably wraps
// around (unsigned) or yields 0, and a fragment_size of 0 makes the division
// on the next line divide by zero - no guard is visible here.
211  const size_t fragment_size = m_maxmsgsize - header;
// ceiling division: number of fragments needed for psize payload bytes
212  const size_t fragment_count = (psize / fragment_size) + (((psize % fragment_size) > 0) ? 1 : 0);
213 
214  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 30) << "MTU of " << m_maxmsgsize << " is too small to carry " << psize << " bytes of payload." << IBRCOMMON_LOGGER_ENDL;
215  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 30) << "create " << fragment_count << " fragments with " << fragment_size << " bytes each." << IBRCOMMON_LOGGER_ENDL;
216 
217  for (size_t i = 0; i < fragment_count; ++i)
218  {
// fragment i starts at payload offset i * fragment_size
219  dtn::data::BundleFragment fragment(bundle, i * fragment_size, fragment_size);
220 
221  std::stringstream ss;
222  dtn::data::DefaultSerializer serializer(ss);
223 
224  serializer << fragment;
225  std::string data = ss.str();
226 
227  // send out the fragment data
228  send(addr, data);
229  }
230  }
231  else
232  {
// the bundle fits into a single datagram
233  std::stringstream ss;
234  dtn::data::DefaultSerializer serializer(ss);
235 
236  serializer << bundle;
237  std::string data = ss.str();
238 
239  // send out the bundle data
240  send(addr, data);
241  }
242 
243  // success - mark the transfer as complete
244  dtn::net::BundleTransfer local_job = job;
245  local_job.complete();
246  } catch (const dtn::storage::NoBundleFoundException&) {
247  // send transfer aborted event
248  dtn::net::BundleTransfer local_job = job;
// NOTE(review): the statement using local_job (line 249) is missing from the listing.
250  } catch (const ibrcommon::socket_exception&) {
251  // CL is busy, requeue bundle
// NOTE(review): no statement is visible here; the exception is swallowed.
252  } catch (const NoAddressFoundException &ex) {
253  // no connection available
254  dtn::net::BundleTransfer local_job = job;
// NOTE(review): the statement using local_job (line 255) is missing from the listing.
256  }
257  }
258 
259  void UDPConvergenceLayer::send(const ibrcommon::vaddress &addr, const std::string &data) throw (ibrcommon::socket_exception, NoAddressFoundException)
260  {
261  // set write lock
262  ibrcommon::MutexLock l(m_writelock);
263 
264  // get the first global scope socket
265  ibrcommon::socketset socks = _vsocket.getAll();
266  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
267  ibrcommon::udpsocket &sock = dynamic_cast<ibrcommon::udpsocket&>(**iter);
268 
269  // send converted line back to client.
270  sock.sendto(data.c_str(), data.length(), 0, addr);
271 
272  // add statistic data
273  _stats_out += data.length();
274 
275  // success
276  return;
277  }
278 
279  // failure
280  throw NoAddressFoundException("no valid address found");
281  }
282 
// Wait for one incoming datagram and report its sender.
//
// Holds the read lock for the whole call, waits in select() on the socket set
// and reads at most m_maxmsgsize bytes from the first readable socket. The
// number of received bytes is added to _stats_in and `sender` is set to
// "udp://<source address>".
//
// bundle: output parameter for the received bundle.
// sender: output parameter, set to the EID built from the source address.
// Exception specification: ibrcommon::socket_exception, dtn::InvalidDataException.
//
// NOTE(review): the statement that fills `bundle` (line 314) is missing from
// the listing, so no write to `bundle` is visible here.
283  void UDPConvergenceLayer::receive(dtn::data::Bundle &bundle, dtn::data::EID &sender) throw (ibrcommon::socket_exception, dtn::InvalidDataException)
284  {
285  ibrcommon::MutexLock l(m_readlock);
286 
// receive buffer sized to the maximum message size
287  std::vector<char> data(m_maxmsgsize);
288 
289  // data waiting
290  ibrcommon::socketset readfds;
291 
292  // wait for incoming messages
293  _vsocket.select(&readfds, NULL, NULL, NULL);
294 
295  if (readfds.size() > 0) {
// only the first readable socket is served per call
296  ibrcommon::datagramsocket *sock = static_cast<ibrcommon::datagramsocket*>(*readfds.begin());
297 
298  ibrcommon::vaddress fromaddr;
299  size_t len = sock->recvfrom(&data[0], m_maxmsgsize, 0, fromaddr);
300 
301  // add statistic data
302  _stats_in += len;
303 
304  std::stringstream ss; ss << "udp://" << fromaddr.toString();
305  sender = dtn::data::EID(ss.str());
306 
307  if (len > 0)
308  {
309  // read all data into a stream
// NOTE(review): this `ss` shadows the outer `ss` above, and `stringstream` is
// unqualified here (relies on a using-directive not visible in this listing).
310  stringstream ss;
311  ss.write(&data[0], len);
312 
313  // get the bundle
// NOTE(review): the deserialization statement (line 314) is missing from the listing.
315  }
316  }
317  }
318 
// NOTE(review): the signature line and all three `case` labels of this
// definition are missing from the listing (lines 319, 325, 341, 356); `evt`
// is a parameter. The branch descriptions below are inferred from the branch
// bodies only - confirm the labels against the original file.
// Reacts to an event on the interface _net by adding or removing sockets in
// _vsocket. Events for other interfaces are ignored.
320  {
321  if (evt.getInterface() != _net) return;
322 
323  switch (evt.getAction())
324  {
// first branch: create a UDP socket bound to the event's address and our port
326  {
327  ibrcommon::vaddress bindaddr = evt.getAddress();
328  // convert the port into a string
329  std::stringstream ss; ss << _port;
330  bindaddr.setService(ss.str());
331  ibrcommon::udpsocket *sock = new ibrcommon::udpsocket(bindaddr);
332  try {
333  sock->up();
334  _vsocket.add(sock, evt.getInterface());
335  } catch (const ibrcommon::socket_exception&) {
// the socket could not be brought up or added; free it again
336  delete sock;
337  }
338  break;
339  }
340 
// second branch: remove the first socket whose address matches the event's address
342  {
343  ibrcommon::socketset socks = _vsocket.getAll();
344  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
// NOTE(review): the result of dynamic_cast is dereferenced without a null check.
345  ibrcommon::udpsocket *sock = dynamic_cast<ibrcommon::udpsocket*>(*iter);
346  if (sock->get_address().address() == evt.getAddress().address()) {
347  _vsocket.remove(sock);
348  sock->down();
349  delete sock;
350  break;
351  }
352  }
353  break;
354  }
355 
// third branch: remove every socket registered for the event's interface
357  {
358  ibrcommon::socketset socks = _vsocket.get(evt.getInterface());
359  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
// NOTE(review): the result of dynamic_cast is dereferenced without a null check.
360  ibrcommon::udpsocket *sock = dynamic_cast<ibrcommon::udpsocket*>(*iter);
361  _vsocket.remove(sock);
362  sock->down();
363  delete sock;
364  }
365  break;
366  }
367 
368  default:
369  break;
370  }
371  }
372 
// NOTE(review): the signature line of this definition and two statements near
// its end (lines 405, 408) are missing from the listing.
// Creates one UDP socket per IPv4/IPv6 address of _net, bound to _port, adds
// them to _vsocket and brings the socket set up. A socket_exception is logged
// as "bind failed" and not propagated.
374  {
375  // routine checked for throw() on 15.02.2013
376  try {
377  // create sockets for all addresses on the interface
378  std::list<ibrcommon::vaddress> addrs = _net.getAddresses();
379 
380  // convert the port into a string
381  std::stringstream ss; ss << _port;
382 
383  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
384  ibrcommon::vaddress &addr = (*iter);
385 
386  try {
387  // handle the addresses according to their family
388  switch (addr.family()) {
389  case AF_INET:
390  case AF_INET6:
// IP addresses get the port assigned and a socket of their own
391  addr.setService(ss.str());
392  _vsocket.add(new ibrcommon::udpsocket(addr), _net);
393  break;
394  default:
// other address families are ignored
395  break;
396  }
397  } catch (const ibrcommon::vaddress::address_exception &ex) {
// an address that cannot be evaluated is logged and skipped
398  IBRCOMMON_LOGGER_TAG("UDPConvergenceLayer", warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
399  }
400  }
401 
402  _vsocket.up();
403 
404  // subscribe to NetLink events on our interfaces
// NOTE(review): the statement belonging to the comment above (line 405) is missing from the listing.
406 
407  // register as discovery handler for this interface
// NOTE(review): the statement belonging to the comment above (line 408) is missing from the listing.
409  } catch (const ibrcommon::socket_exception &ex) {
410  IBRCOMMON_LOGGER_TAG("UDPConvergenceLayer", error) << "bind failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
411  }
412  }
413 
// NOTE(review): the signature line of this definition and two statements
// (lines 417, 420) are missing from the listing.
// Destroys the socket set, then calls stop() and join().
415  {
416  // unsubscribe from NetLink events
// NOTE(review): the statement belonging to the comment above (line 417) is missing from the listing.
418 
419  // unregister as discovery handler for this interface
// NOTE(review): the statement belonging to the comment above (line 420) is missing from the listing.
421 
422  _vsocket.destroy();
423  stop();
424  join();
425  }
426 
428  {
429  _running = true;
430 
431  while (_running)
432  {
433  try {
434  dtn::data::Bundle bundle;
435  EID sender;
436 
437  receive(bundle, sender);
438 
439  // raise default bundle received event
440  dtn::net::BundleReceivedEvent::raise(sender, bundle, false);
441 
442  } catch (const dtn::InvalidDataException &ex) {
443  IBRCOMMON_LOGGER_TAG("UDPConvergenceLayer", warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
444  } catch (const std::exception&) {
445  return;
446  }
447  yield();
448  }
449  }
450 
// NOTE(review): the signature line of this definition is missing from the
// listing (presumably the thread cancellation hook - confirm).
// Clears the _running flag checked by the receive loop and shuts the socket
// set down.
452  {
453  _running = false;
454  _vsocket.down();
455  }
456 
457  const std::string UDPConvergenceLayer::getName() const
458  {
459  return "UDPConvergenceLayer";
460  }
461  }
462 }