IBR-DTNSuite  0.10
DatagramConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * DatagramConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include "net/DatagramConnection.h"
24 
25 #include "core/BundleCore.h"
26 #include "core/EventDispatcher.h"
27 #include "core/TimeEvent.h"
28 #include "core/NodeEvent.h"
29 
30 #include <ibrcommon/Logger.h>
33 
34 #include <string.h>
35 #include <vector>
36 
37 namespace dtn
38 {
39  namespace net
40  {
// Log tag of this class; it is the first argument of every
// IBRCOMMON_LOGGER_TAG / IBRCOMMON_LOGGER_DEBUG_TAG call in this file.
41  const std::string DatagramConvergenceLayer::TAG = "DatagramConvergenceLayer";
42 
// Member-initializer list and body of a constructor.
// NOTE(review): the signature line (listing line 43) is missing from this
// listing; `ds` is presumably its only parameter -- TODO confirm.
// Stores `ds` in _service, marks the layer as not running and starts the
// discovery sequence number at zero.
44  : _service(ds), _running(false), _discovery_sn(0)
45  {
46  // initialize stats
// both counters are touched with a value of 0; presumably this creates
// the "out" and "in" entries -- TODO confirm against addStats()
47  addStats("out", 0);
48  addStats("in", 0);
49  }
50 
// Tear-down body.
// NOTE(review): the signature line (listing line 51) is missing from this
// listing; since the body deletes _service this is presumably the
// destructor -- TODO confirm.
52  {
53  // wait until all connections are down
54  {
55  ibrcommon::MutexLock l(_cond_connections);
// blocks on the condition until the connection list is empty
56  while (_connections.size() != 0) _cond_connections.wait();
57  }
58 
// join() is called only after the lock above has been released
59  join();
// the service object handed to the constructor is deleted here
60  delete _service;
61  }
62 
// Accessor body: returns whatever _service->getProtocol() returns.
// NOTE(review): the signature line (listing line 63) is missing from this
// listing, so the function name and return type cannot be confirmed here.
64  {
65  return _service->getProtocol();
66  }
67 
68  void DatagramConvergenceLayer::callback_send(DatagramConnection&, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, const dtn::data::Length &len) throw (DatagramException)
69  {
70  // only on sender at once
71  ibrcommon::MutexLock l(_send_lock);
72 
73  // forward the send request to DatagramService
74  _service->send(HEADER_SEGMENT, flags, seqno, destination, buf, len);
75 
76  // traffic monitoring
77  addStats("out", len);
78  }
79 
80  void DatagramConvergenceLayer::callback_ack(DatagramConnection&, const unsigned int &seqno, const std::string &destination) throw (DatagramException)
81  {
82  // only on sender at once
83  ibrcommon::MutexLock l(_send_lock);
84 
85  // forward the send request to DatagramService
86  _service->send(HEADER_ACK, 0, seqno, destination, NULL, 0);
87  }
88 
// Hands `job` to the connection that belongs to the first URI `node`
// offers for this service's protocol.
// NOTE(review): the signature line (listing line 89) is missing from this
// listing; `node` and `job` are presumably its parameters -- TODO confirm.
90  {
// URIs returned by node.get() for the protocol of this service
91  const std::list<dtn::core::Node::URI> uri_list = node.get(_service->getProtocol());
// without any URI the function returns without touching `job`
92  if (uri_list.empty()) return;
93 
94  // get the first element of the result
95  const dtn::core::Node::URI &uri = uri_list.front();
96 
97  // some debugging
// NOTE(review): listing line 98 (the statement this comment refers to) is
// missing from this listing
99 
100  // lock the connection list while working with it
// NOTE(review): listing line 101 is missing from this listing; no lock is
// visible here, and getConnection() takes _cond_connections itself
102 
103  // get a new or the existing connection for this address
104  DatagramConnection &conn = getConnection( uri.value );
105 
106  // queue the job to the connection
107  conn.queue(job);
108  }
109 
110  DatagramConnection& DatagramConvergenceLayer::getConnection(const std::string &identifier)
111  {
112  DatagramConnection *connection = NULL;
113 
114  {
115  // lock the list of connections while iterating or adding new connections
116  ibrcommon::MutexLock l(_cond_connections);
117 
118  // Test if connection for this address already exist
119  for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
120  {
121  if ((*i)->getIdentifier() == identifier)
122  return *(*i);
123  }
124 
125  // Connection does not exist, create one and put it into the list
126  connection = new DatagramConnection(identifier, _service->getParameter(), (*this));
127 
128  // add a new connection to the list of connections
129  _connections.push_back(connection);
130  _cond_connections.signal(true);
131  }
132 
133  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "Selected identifier: " << connection->getIdentifier() << IBRCOMMON_LOGGER_ENDL;
134  connection->start();
135  return *connection;
136  }
137 
// NOTE(review): the signature line (listing line 138) and the only body
// line (listing line 140) are missing from this listing, so nothing can be
// said about this function from here.
139  {
141  }
142 
// Removes `conn` from the connection list.
// NOTE(review): the signature line (listing line 143) and listing lines
// 145 and 152 are missing from this listing; `conn` is presumably a
// DatagramConnection pointer parameter -- TODO confirm.
144  {
// the list is searched and modified under its own lock
146  ibrcommon::MutexLock lc(_cond_connections);
147 
148  const connection_list::iterator i = std::find(_connections.begin(), _connections.end(), conn);
149 
150  if (i != _connections.end())
151  {
153 
// drop the entry and signal the condition; the teardown code in this file
// waits on the same condition until the list is empty
154  _connections.erase(i);
155  _cond_connections.signal(true);
156  }
157  else
158  {
// the connection was not in the list: only an error is logged
159  IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) << "Error in " << conn->getIdentifier() << " not found!" << IBRCOMMON_LOGGER_ENDL;
160  }
161  }
162 
// Binds the datagram service and marks the layer as running.
// NOTE(review): the signature line (listing line 163) and listing line 166
// are missing from this listing.
164  {
165  // routine checked for throw() on 15.02.2013
167  try {
168  _service->bind();
169  } catch (const std::exception &e) {
// a failed bind is only logged; execution continues below
170  IBRCOMMON_LOGGER_TAG(DatagramConvergenceLayer::TAG, error) << "bind to " << _service->getInterface().toString() << " failed (" << e.what() << ")" << IBRCOMMON_LOGGER_ENDL;
171  }
172 
173  // set the running variable
// NOTE(review): _running becomes true even when bind() threw above --
// confirm that this is intended
174  _running = true;
175  }
176 
// Calls shutdown() on every connection in the list.
// NOTE(review): the signature line (listing line 177) and listing lines
// 179 and 182 are missing from this listing.
178  {
180 
181  // shutdown all connections
// the list is iterated under its lock; entries are not removed here
183  ibrcommon::MutexLock l(_cond_connections);
184  for(connection_list::const_iterator i = _connections.begin(); i != _connections.end(); ++i)
185  {
186  (*i)->shutdown();
187  }
188  }
189 
// Serializes a discovery announcement and sends it as a HEADER_BROADCAST
// frame; send failures are ignored.
// NOTE(review): the signature line (listing line 190) and listing line 192
// (presumably the declaration of `announcement`) are missing from this
// listing -- TODO confirm.
191  {
193 
194  // set sequencenumber
// the counter is incremented after every announcement that is built
195  announcement.setSequencenumber(_discovery_sn);
196  _discovery_sn++;
197 
198  // clear all services
199  announcement.clearServices();
200 
201  // serialize announcement
202  stringstream ss;
203  ss << announcement;
204 
// length of the serialized announcement in bytes
205  std::streamsize len = ss.str().size();
206 
207  try {
208  // only on sender at once
209  ibrcommon::MutexLock l(_send_lock);
210 
211  // forward the send request to DatagramService
// unlike the segment/ack sends in this file, no destination is passed
212  _service->send(HEADER_BROADCAST, 0, 0, ss.str().c_str(), static_cast<dtn::data::Length>(len));
213  } catch (const DatagramException&) {
214  // ignore any send failure
215  };
216  }
217 
// Receive loop: reads frames from the datagram service while _running is
// set and dispatches them by frame type (broadcast, segment, ack).
// NOTE(review): the signature line (listing line 218) and several body
// lines (228, 276, 284, 293, 302, 308) are missing from this listing; the
// log messages below name the function "componentRun()".
219  {
// receive buffer sized to the maximum message length of the service
220  size_t maxlen = _service->getParameter().max_msg_length;
221  std::string address;
222  unsigned int seqno = 0;
223  char flags = 0;
224  char type = 0;
// NOTE(review): &data[0] is used below; assumes maxlen > 0 -- TODO confirm
225  std::vector<char> data(maxlen);
226  size_t len = 0;
227 
229 
230  while (_running)
231  {
232  try {
233  // Receive full frame from socket
// type, flags, seqno and address are output arguments of recvfrom()
234  len = _service->recvfrom(&data[0], maxlen, type, flags, seqno, address);
235 
236  // traffic monitoring
237  addStats("in", len);
238  } catch (const DatagramException&) {
// any receive error ends the loop
239  _running = false;
240  break;
241  }
242 
243  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "componentRun() Address: " << address << IBRCOMMON_LOGGER_ENDL;
244 
245  // Check for extended header and retrieve if available
246  if (type == HEADER_BROADCAST)
247  {
248  try {
249  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConvergenceLayer::TAG, 10) << "componentRun() Announcement received" << IBRCOMMON_LOGGER_ENDL;
// parse the received bytes as a discovery announcement
250  DiscoveryAnnouncement announce;
251  stringstream ss;
252  ss.write(&data[0], len);
253  ss >> announce;
254 
255  // ignore own beacons
256  if (announce.getEID() == dtn::core::BundleCore::local) continue;
257 
258  // convert the announcement into NodeEvents
259  Node n(announce.getEID());
260 
261  // timeout value
262  size_t to_value = 30;
263 
264  // add
// the sender address is attached as a URI for this service's protocol;
// NOTE(review): the meaning of the literal 20 is not visible here
// (presumably a priority) -- TODO confirm
265  n.add(Node::URI(Node::NODE_DISCOVERED, _service->getProtocol(), address, to_value, 20));
266 
// every announced service becomes an attribute of the node
267  const std::list<DiscoveryService> services = announce.getServices();
268  for (std::list<DiscoveryService>::const_iterator iter = services.begin(); iter != services.end(); ++iter)
269  {
270  const DiscoveryService &s = (*iter);
271  n.add(Node::Attribute(Node::NODE_DISCOVERED, s.getName(), s.getParameters(), to_value, 20));
272  }
273 
274  {
275  // lock the connection list while working with it
// NOTE(review): listing line 276 is missing from this listing
277 
278  // Connection instance for this address
279  DatagramConnection& connection = getConnection(address);
280  connection.setPeerEID(announce.getEID());
281  }
282 
283  // announce NodeInfo to ConnectionManager
// NOTE(review): listing line 284 (the statement this comment refers to)
// is missing from this listing
285  } catch (const ibrcommon::Exception&) {
286  // catch wrong formats
287  }
288 
// broadcast frames skip the yield() at the end of the loop body
289  continue;
290  }
291  else if ( type == HEADER_SEGMENT )
292  {
294 
295  // Connection instance for this address
296  DatagramConnection& connection = getConnection(address);
297 
298  try {
299  // Decide in which queue to write based on the src address
300  connection.queue(flags, seqno, &data[0], len);
301  } catch (const ibrcommon::Exception &ex) {
// a connection that fails to queue the segment is shut down
// NOTE(review): listing line 302 is missing from this listing
303  connection.shutdown();
304  };
305  }
306  else if ( type == HEADER_ACK )
307  {
309 
310  // Connection instance for this address
311  DatagramConnection& connection = getConnection(address);
312 
313  // Decide in which queue to write based on the src address
314  connection.ack(seqno);
315  }
316 
317  yield();
318  }
319  }
320 
321  void DatagramConvergenceLayer::raiseEvent(const Event *evt) throw ()
322  {
323  try {
324  const TimeEvent &time=dynamic_cast<const TimeEvent&>(*evt);
325  if (time.getAction() == TIME_SECOND_TICK)
326  if (time.getTimestamp().get<size_t>() % 5 == 0)
327  sendAnnoucement();
328  } catch (const std::bad_cast&)
329  {}
330  }
331 
// Stops the receive loop: clears _running and shuts the service down.
// NOTE(review): the signature line (listing line 332) is missing from this
// listing, so the function name cannot be confirmed here.
333  {
// the loop condition `while (_running)` in this file reads this flag
334  _running = false;
// presumably unblocks a pending recvfrom() -- TODO confirm against the service
335  _service->shutdown();
336  }
337 
338  const std::string DatagramConvergenceLayer::getName() const
339  {
341  }
342  } /* namespace net */
343 } /* namespace dtn */