IBR-DTNSuite  0.10
UDPConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * UDPConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
25 #include "core/BundleEvent.h"
26 #include "core/BundleCore.h"
27 #include "Configuration.h"
28 
29 #include <ibrdtn/utils/Utils.h>
30 #include <ibrdtn/data/Serializer.h>
31 
32 #include <ibrcommon/net/socket.h>
33 #include <ibrcommon/net/vaddress.h>
35 #include <ibrcommon/data/BLOB.h>
36 #include <ibrcommon/Logger.h>
38 
39 #include <sys/types.h>
40 #include <unistd.h>
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #include <fcntl.h>
45 #include <limits.h>
46 
47 #include <iostream>
48 #include <list>
49 #include <vector>
50 
51 
52 using namespace dtn::data;
53 
54 namespace dtn
55 {
56  namespace net
57  {
58  const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
59 
60  UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net, int port, dtn::data::Length mtu)
61  : _net(net), _port(port), m_maxmsgsize(mtu), _running(false)
62  {
63  // initialize stats
64  addStats("out", 0.0);
65  addStats("in", 0.0);
66  }
67 
69  {
70  componentDown();
71  }
72 
74  {
76  }
77 
79  {
80  // announce port only if we are bound to any interface
81  if (_net.empty()) {
82  std::stringstream service;
83  // ... set the port only
84  service << "port=" << _port << ";";
85  announcement.addService( DiscoveryService("udpcl", service.str()));
86  return;
87  }
88 
89  // do not announce if this is not our interface
91 
92  // determine if we should enable crosslayer discovery by sending out our own address
94 
95  // this marker should set to true if we added an service description
96  bool announced = false;
97 
98  try {
99  // check if cross layer discovery is disabled
100  if (!crosslayer) throw ibrcommon::Exception("crosslayer discovery disabled!");
101 
102  // get all global addresses of the interface
103  std::list<ibrcommon::vaddress> list = _net.getAddresses();
104 
105  // if no address is returned... (goto catch block)
106  if (list.empty()) throw ibrcommon::Exception("no address found");
107 
108  for (std::list<ibrcommon::vaddress>::const_iterator addr_it = list.begin(); addr_it != list.end(); ++addr_it)
109  {
110  const ibrcommon::vaddress &addr = (*addr_it);
111 
112  // only announce global scope addresses
113  if (addr.scope() != ibrcommon::vaddress::SCOPE_GLOBAL) continue;
114 
115  try {
116  // do not announce non-IP addresses
117  sa_family_t f = addr.family();
118  if ((f != AF_INET) && (f != AF_INET6)) continue;
119 
120  std::stringstream service;
121  // fill in the ip address
122  service << "ip=" << addr.address() << ";port=" << _port << ";";
123  announcement.addService( DiscoveryService("udpcl", service.str()));
124 
125  // set the announce mark
126  announced = true;
127  } catch (const ibrcommon::vaddress::address_exception &ex) {
128  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 25) << ex.what() << IBRCOMMON_LOGGER_ENDL;
129  }
130  }
131  } catch (const ibrcommon::Exception&) {
132  // address collection process aborted
133  }
134 
135  // if we still not announced anything...
136  if (!announced) {
137  // announce at least our local port
138  std::stringstream service;
139  service << "port=" << _port << ";";
140  announcement.addService( DiscoveryService("udpcl", service.str()));
141  }
142  }
143 
145  {
146  const std::list<dtn::core::Node::URI> uri_list = node.get(dtn::core::Node::CONN_UDPIP);
147  if (uri_list.empty())
148  {
149  dtn::net::BundleTransfer local_job = job;
151  return;
152  }
153 
155 
156  const dtn::core::Node::URI &uri = uri_list.front();
157 
158  std::string address = "0.0.0.0";
159  unsigned int port = 0;
160 
161  // read values
162  uri.decode(address, port);
163 
164  // get the address of the node
165  ibrcommon::vaddress addr(address, port);
166 
167  try {
168  // read the bundle out of the storage
169  const dtn::data::Bundle bundle = storage.get(job.getBundle());
170 
171  // build the dictionary for EID lookup
172  const dtn::data::Dictionary dict(bundle);
173 
174  // create a default serializer
175  dtn::data::DefaultSerializer dummy(std::cout, dict);
176 
177  // get the encoded length of the primary block
178  size_t header = dummy.getLength((const PrimaryBlock&)bundle);
179  header += 20; // two times SDNV through fragmentation
180 
181  dtn::data::Length size = dummy.getLength(bundle);
182 
183  if (size > m_maxmsgsize)
184  {
185  // abort transmission if fragmentation is disabled
186  if (!dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()
188  {
190  }
191 
192  const size_t psize = bundle.find<dtn::data::PayloadBlock>().getLength();
193  const size_t fragment_size = m_maxmsgsize - header;
194  const size_t fragment_count = (psize / fragment_size) + (((psize % fragment_size) > 0) ? 1 : 0);
195 
196  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 30) << "MTU of " << m_maxmsgsize << " is too small to carry " << psize << " bytes of payload." << IBRCOMMON_LOGGER_ENDL;
197  IBRCOMMON_LOGGER_DEBUG_TAG("UDPConvergenceLayer", 30) << "create " << fragment_count << " fragments with " << fragment_size << " bytes each." << IBRCOMMON_LOGGER_ENDL;
198 
199  for (size_t i = 0; i < fragment_count; ++i)
200  {
201  dtn::data::BundleFragment fragment(bundle, i * fragment_size, fragment_size);
202 
203  std::stringstream ss;
204  dtn::data::DefaultSerializer serializer(ss);
205 
206  serializer << fragment;
207  std::string data = ss.str();
208 
209  // send out the bundle data
210  send(addr, data);
211  }
212  }
213  else
214  {
215  std::stringstream ss;
216  dtn::data::DefaultSerializer serializer(ss);
217 
218  serializer << bundle;
219  std::string data = ss.str();
220 
221  // send out the bundle data
222  send(addr, data);
223  }
224 
225  // success - raise bundle event
226  dtn::net::BundleTransfer local_job = job;
227  local_job.complete();
228  } catch (const dtn::storage::NoBundleFoundException&) {
229  // send transfer aborted event
230  dtn::net::BundleTransfer local_job = job;
232  } catch (const ibrcommon::socket_exception&) {
233  // CL is busy, requeue bundle
234  } catch (const NoAddressFoundException &ex) {
235  // no connection available
236  dtn::net::BundleTransfer local_job = job;
238  }
239  }
240 
241  void UDPConvergenceLayer::send(const ibrcommon::vaddress &addr, const std::string &data) throw (ibrcommon::socket_exception, NoAddressFoundException)
242  {
243  // set write lock
244  ibrcommon::MutexLock l(m_writelock);
245 
246  // get the first global scope socket
247  ibrcommon::socketset socks = _vsocket.getAll();
248  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
249  ibrcommon::udpsocket &sock = dynamic_cast<ibrcommon::udpsocket&>(**iter);
250 
251  // send converted line back to client.
252  sock.sendto(data.c_str(), data.length(), 0, addr);
253 
254  // add statistic data
255  addStats("out", data.length());
256 
257  // success
258  return;
259  }
260 
261  // failure
262  throw NoAddressFoundException("no valid address found");
263  }
264 
265  void UDPConvergenceLayer::receive(dtn::data::Bundle &bundle, dtn::data::EID &sender) throw (ibrcommon::socket_exception, dtn::InvalidDataException)
266  {
267  ibrcommon::MutexLock l(m_readlock);
268 
269  std::vector<char> data(m_maxmsgsize);
270 
271  // data waiting
272  ibrcommon::socketset readfds;
273 
274  // wait for incoming messages
275  _vsocket.select(&readfds, NULL, NULL, NULL);
276 
277  if (readfds.size() > 0) {
278  ibrcommon::datagramsocket *sock = static_cast<ibrcommon::datagramsocket*>(*readfds.begin());
279 
280  ibrcommon::vaddress fromaddr;
281  size_t len = sock->recvfrom(&data[0], m_maxmsgsize, 0, fromaddr);
282 
283  // add statistic data
284  addStats("int", len);
285 
286  std::stringstream ss; ss << "udp://" << fromaddr.toString();
287  sender = dtn::data::EID(ss.str());
288 
289  if (len > 0)
290  {
291  // read all data into a stream
292  stringstream ss;
293  ss.write(&data[0], len);
294 
295  // get the bundle
297  }
298  }
299  }
300 
302  {
303  if (evt.getInterface() != _net) return;
304 
305  switch (evt.getAction())
306  {
308  {
309  ibrcommon::vaddress bindaddr = evt.getAddress();
310  // convert the port into a string
311  std::stringstream ss; ss << _port;
312  bindaddr.setService(ss.str());
313  ibrcommon::udpsocket *sock = new ibrcommon::udpsocket(bindaddr);
314  try {
315  sock->up();
316  _vsocket.add(sock, evt.getInterface());
317  } catch (const ibrcommon::socket_exception&) {
318  delete sock;
319  }
320  break;
321  }
322 
324  {
325  ibrcommon::socketset socks = _vsocket.getAll();
326  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
327  ibrcommon::udpsocket *sock = dynamic_cast<ibrcommon::udpsocket*>(*iter);
328  if (sock->get_address().address() == evt.getAddress().address()) {
329  _vsocket.remove(sock);
330  sock->down();
331  delete sock;
332  break;
333  }
334  }
335  break;
336  }
337 
339  {
340  ibrcommon::socketset socks = _vsocket.get(evt.getInterface());
341  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
342  ibrcommon::udpsocket *sock = dynamic_cast<ibrcommon::udpsocket*>(*iter);
343  _vsocket.remove(sock);
344  sock->down();
345  delete sock;
346  }
347  break;
348  }
349 
350  default:
351  break;
352  }
353  }
354 
356  {
357  // routine checked for throw() on 15.02.2013
358  try {
359  // create sockets for all addresses on the interface
360  std::list<ibrcommon::vaddress> addrs = _net.getAddresses();
361 
362  // convert the port into a string
363  std::stringstream ss; ss << _port;
364 
365  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
366  ibrcommon::vaddress &addr = (*iter);
367 
368  try {
369  // handle the addresses according to their family
370  switch (addr.family()) {
371  case AF_INET:
372  case AF_INET6:
373  addr.setService(ss.str());
374  _vsocket.add(new ibrcommon::udpsocket(addr), _net);
375  break;
376  default:
377  break;
378  }
379  } catch (const ibrcommon::vaddress::address_exception &ex) {
380  IBRCOMMON_LOGGER_TAG("UDPConvergenceLayer", warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
381  }
382  }
383 
384  _vsocket.up();
385 
386  // subscribe to NetLink events on our interfaces
388  } catch (const ibrcommon::socket_exception &ex) {
389  IBRCOMMON_LOGGER_TAG("UDPConvergenceLayer", error) << "bind failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
390  }
391  }
392 
394  {
395  // unsubscribe to NetLink events
397 
398  _vsocket.destroy();
399  stop();
400  join();
401  }
402 
404  {
405  _running = true;
406 
407  while (_running)
408  {
409  try {
410  dtn::data::Bundle bundle;
411  EID sender;
412 
413  receive(bundle, sender);
414 
415  // raise default bundle received event
416  dtn::net::BundleReceivedEvent::raise(sender, bundle, false);
417 
418  } catch (const dtn::InvalidDataException &ex) {
419  IBRCOMMON_LOGGER_TAG("UDPConvergenceLayer", warning) << "Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
420  } catch (const std::exception &ex) {
421 
422  }
423  yield();
424  }
425  }
426 
428  {
429  _running = false;
430  _vsocket.down();
431  }
432 
433  const std::string UDPConvergenceLayer::getName() const
434  {
435  return "UDPConvergenceLayer";
436  }
437  }
438 }