IBR-DTNSuite  0.10
EpidemicRoutingExtension.cpp
Go to the documentation of this file.
1 /*
2  * EpidemicRoutingExtension.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 
28 #include "core/NodeEvent.h"
29 #include "core/TimeEvent.h"
30 #include "core/Node.h"
31 #include "net/ConnectionEvent.h"
32 #include "net/ConnectionManager.h"
33 #include "Configuration.h"
34 #include "core/BundleCore.h"
35 #include "core/BundleEvent.h"
36 
37 #include <ibrdtn/data/MetaBundle.h>
39 #include <ibrcommon/Logger.h>
40 
41 #include <functional>
42 #include <list>
43 #include <algorithm>
44 
45 #include <iomanip>
46 #include <ios>
47 #include <iostream>
48 #include <set>
49 #include <memory>
50 
51 #include <stdlib.h>
52 #include <typeinfo>
53 
54 namespace dtn
55 {
56  namespace routing
57  {
// Tag string handed to the IBRCOMMON_LOGGER*_TAG macros by every log statement in this file.
58  const std::string EpidemicRoutingExtension::TAG = "EpidemicRoutingExtension";
59 
// NOTE(review): the signature line for this body is missing from this listing;
// presumably this is the EpidemicRoutingExtension constructor - confirm against the original file.
// The only visible effect is a single info-level log entry announcing the initialization.
61  {
62  // write something to the syslog
63  IBRCOMMON_LOGGER_TAG(EpidemicRoutingExtension::TAG, info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL;
64  }
65 
// NOTE(review): the signature line for this body is missing from this listing;
// presumably this is the EpidemicRoutingExtension destructor - confirm against the original file.
// The body only calls join(); presumably this waits for the worker thread to finish
// before the object goes away (join() is not defined in this file - verify).
67  {
68  join();
69  }
70 
// NOTE(review): both the signature line and the single body line (listing line 73)
// of this function are missing from this listing. Nothing about its purpose can be
// derived from what is visible here - consult the original file.
72  {
74  }
75 
// NOTE(review): the signature line of this function is missing from this listing.
// The body reads an event pointer named `evt`, so this is presumably the event
// notification callback of the extension - confirm against the original file.
//
// The body probes the dynamic type of *evt one candidate at a time. Each probe is a
// dynamic_cast to a reference inside its own try block: a mismatch raises
// std::bad_cast, which is swallowed on purpose so that the next candidate is tried.
// A matching probe always ends with `return`. The only action taken for recognized
// events is pushing heap-allocated SearchNextBundleTask objects to _taskqueue; the
// task loop further down in this file wraps each popped task in a std::auto_ptr,
// which deletes it.
77  {
78  // If a bundle has been queued, schedule a bundle search for every neighbor whose EID differs from queued.origin
79  try {
80  const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
81 
82  // new bundles trigger a recheck for all neighbors
     // (the neighbor set is taken by value, so the loop iterates over a local copy)
83  const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
84 
85  for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); ++iter)
86  {
87  const dtn::core::Node &n = (*iter);
88 
     // skip the neighbor recorded as the origin of the queued bundle
89  if (n.getEID() != queued.origin) {
90  // transfer the next bundle to this destination
91  _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
92  }
93  }
94  return;
95  } catch (const std::bad_cast&) { };
96 
97  // If a new neighbor becomes available, or data was added to a known node, search for bundles
98  try {
99  const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
100  const dtn::core::Node &n = nodeevent.getNode();
101 
      // both handled actions schedule the same task; any other action is ignored
102  if (nodeevent.getAction() == NODE_AVAILABLE)
103  {
104  _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
105  }
106  else if (nodeevent.getAction() == NODE_DATA_ADDED)
107  {
108  _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
109  }
110 
111  return;
112  } catch (const std::bad_cast&) { };
113 
      // Connection events
114  try {
115  const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt);
116 
      // NOTE(review): the condition guarding the following block (listing line 117)
      // is missing from this listing; which connection state triggers the search
      // cannot be told from here.
118  {
119  // send all (multi-hop) bundles in the storage to the neighbor
120  _taskqueue.push( new SearchNextBundleTask(ce.peer) );
121  }
122  return;
123  } catch (const std::bad_cast&) { };
124 
      // Node handshake events
125  try {
126  const NodeHandshakeEvent &handshake = dynamic_cast<const NodeHandshakeEvent&>(*evt);
127 
      // NOTE(review): the first condition of this if/else-if chain (listing line 128)
      // is missing from this listing; only the HANDSHAKE_COMPLETED branch is visible.
      // Both branches push the same task for handshake.peer.
129  {
130  // transfer the next bundle to this destination
131  _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
132  }
133  else if (handshake.state == NodeHandshakeEvent::HANDSHAKE_COMPLETED)
134  {
135  // transfer the next bundle to this destination
136  _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
137  }
138  return;
139  } catch (const std::bad_cast&) { };
140 
141  // The bundle transfer has been aborted
142  try {
143  const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
144 
145  // transfer the next bundle to this destination
146  _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
147 
148  return;
149  } catch (const std::bad_cast&) { };
150 
151  // A bundle transfer was successful
152  try {
153  const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
154 
155  // transfer the next bundle to this destination
156  _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) );
157  return;
158  } catch (const std::bad_cast&) { };
      // events of any other type fall through here and are ignored
159  }
160 
// NOTE(review): the signature line for this body is missing from this listing; the
// log message below suggests it is componentUp() - confirm against the original file.
// Resets the task queue and then calls start(). A ThreadException raised by start()
// is logged at error level and not propagated to the caller.
162  {
163  // reset the task queue
164  _taskqueue.reset();
165 
166  // routine checked for throw() on 15.02.2013
167  try {
168  // run the thread
169  start();
170  } catch (const ibrcommon::ThreadException &ex) {
171  IBRCOMMON_LOGGER_TAG(EpidemicRoutingExtension::TAG, error) << "componentUp failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
172  }
173  }
174 
// NOTE(review): the signature line for this body is missing from this listing; the
// log message below suggests it is componentDown() - confirm against the original file.
// Calls stop() followed by join(). A ThreadException raised by either call is logged
// at error level and not propagated; if stop() throws, join() is not reached.
176  {
177  try {
178  // stop the thread
179  stop();
180  join();
181  } catch (const ibrcommon::ThreadException &ex) {
182  IBRCOMMON_LOGGER_TAG(EpidemicRoutingExtension::TAG, error) << "componentDown failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
183  }
184  }
185 
// NOTE(review): the signature line for this body is missing from this listing;
// presumably this is the thread cancellation hook - confirm against the original file.
// Aborts the task queue. Presumably this makes the blocking _taskqueue.getnpop(true)
// call in the task loop throw, which that loop treats as its signal to return - verify
// against the queue implementation.
187  {
188  _taskqueue.abort();
189  }
190 
// NOTE(review): the signature line for this body is missing from this listing;
// presumably this is the thread main function (run) - confirm against the original file.
// Several interior lines are missing as well and are flagged where they occur.
//
// Structure of the body:
//  1. A local BundleSelector implementation (BundleFilter) that decides which stored
//     bundles may be offered to one particular neighbor.
//  2. An endless loop that pops tasks from _taskqueue. For a SearchNextBundleTask it
//     queries the storage through the filter and calls transferTo() for every result.
// The loop is left only through the outermost catch handler, i.e. when a
// std::exception reaches it (for instance out of getnpop()).
192  {
      // Selector bound to a single neighbor entry. It stores a reference, not a copy,
      // so the entry has to outlive the filter; below it is only created and used
      // while the neighbor database lock is held.
193  class BundleFilter : public dtn::storage::BundleSelector
194  {
195  public:
196  BundleFilter(const NeighborDatabase::NeighborEntry &entry)
197  : _entry(entry)
198  {};
199 
200  virtual ~BundleFilter() {};
201 
      // upper bound for the number of selected bundles: the free transfer slots of the neighbor
202  virtual dtn::data::Size limit() const throw () { return _entry.getFreeTransferSlots(); };
203 
      // Returns true if `meta` should be selected for this neighbor. Each check below
      // is a reason to reject the bundle; only a bundle passing all of them is accepted.
204  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
205  {
206  // check Scope Control Block - do not forward bundles with hop limit == 0
207  if (meta.hopcount == 0)
208  {
209  return false;
210  }
211 
212  // do not forward any routing control message
213  // this is done by the neighbor routing module
214  if (isRouting(meta.source))
215  {
216  return false;
217  }
218 
219  // do not forward local bundles
      // NOTE(review): the condition itself (listing lines 220-221) is missing from
      // this listing; only its closing parenthesis is visible.
222  )
223  {
224  return false;
225  }
226 
227  // check Scope Control Block - do not forward non-group bundles with hop limit <= 1
      // NOTE(review): the condition (listing line 228) is missing from this listing.
229  {
230  return false;
231  }
232 
233  // do not forward bundles addressed to this neighbor,
234  // because this is handled by neighbor routing extension
235  if (_entry.eid == meta.destination.getNode())
236  {
237  return false;
238  }
239 
240  // do not forward bundles already known by the destination
241  // throws BloomfilterNotAvailableException if no filter is available or it is expired
242  try {
243  if (_entry.has(meta, true))
244  {
245  return false;
246  }
      // NOTE(review): the catch clause belonging to this try (listing lines 247-248)
      // is missing from this listing. Presumably it converts the exception named
      // above into the BundleSelectorException this method declares - confirm.
249  }
250 
251  return true;
252  };
253 
254  private:
255  const NeighborDatabase::NeighborEntry &_entry;
256  };
257 
258  // list for bundles
      // NOTE(review): the declaration (listing line 259) is missing from this listing.
      // The iterator type used further down shows that `list` is a
      // std::list<dtn::data::MetaBundle>; it is reused across loop iterations.
260 
261  while (true)
262  {
263  try {
      // NOTE(review): the `true` argument presumably makes the call block until a
      // task is available - verify against the queue implementation
264  Task *t = _taskqueue.getnpop(true);
      // takes ownership of the popped task; it is deleted when `killer` goes out of scope
265  std::auto_ptr<Task> killer(t);
266 
267  IBRCOMMON_LOGGER_DEBUG_TAG(EpidemicRoutingExtension::TAG, 50) << "processing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
268 
      // failures of a single task (ibrcommon::Exception) are logged and do not end the loop
269  try {
      // NOTE(review): listing lines 270-274 are missing here (possibly a comment block).
      // A task that is not a SearchNextBundleTask raises std::bad_cast and is skipped.
275  try {
276  SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
277 
278  // lock the neighbor database while searching for bundles
279  try {
280  NeighborDatabase &db = (**this).getNeighborDB();
281  ibrcommon::MutexLock l(db);
282  NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
283 
284  // check if enough transfer slots available (threshold reached)
      // NOTE(review): the statement controlled by this `if` (listing line 286) is
      // missing from this listing; what happens when the threshold is not reached
      // cannot be told from here.
285  if (!entry.isTransferThresholdReached())
287 
288  // get the bundle filter of the neighbor
289  BundleFilter filter(entry);
290 
291  // some debug output
292  IBRCOMMON_LOGGER_DEBUG_TAG(EpidemicRoutingExtension::TAG, 40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
293 
294  // query some unknown bundle from the storage
      // (results of the previous task are discarded first)
295  list.clear();
296  (**this).getSeeker().get(filter, list);
297  } catch (const dtn::storage::BundleSelectorException&) {
298  // query a new summary vector from this neighbor
299  (**this).doHandshake(task.eid);
300  }
301 
302  // send the bundles as long as we have resources
303  for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
304  {
305  try {
306  // transfer the bundle to the neighbor
      // a bundle that is already in transit is skipped; the loop continues with the next one
307  transferTo(task.eid, *iter);
308  } catch (const NeighborDatabase::AlreadyInTransitException&) { };
309  }
      // NOTE(review): listing lines 310-311 are missing here; presumably at least one
      // further catch clause of this try - confirm against the original file.
312  } catch (const dtn::storage::NoBundleFoundException&) {
313  } catch (const std::bad_cast&) { };
314  } catch (const ibrcommon::Exception &ex) {
315  IBRCOMMON_LOGGER_DEBUG_TAG(EpidemicRoutingExtension::TAG, 20) << "task failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
316  }
317  } catch (const std::exception &ex) {
      // only exit of the loop: log the reason and leave the function
318  IBRCOMMON_LOGGER_DEBUG_TAG(EpidemicRoutingExtension::TAG, 15) << "terminated due to " << ex.what() << IBRCOMMON_LOGGER_ENDL;
319  return;
320  }
321 
      // executed after every processed task
322  yield();
323  }
324  }
325 
326  /****************************************/
327 
// Creates a task requesting a bundle search for the neighbor `e`.
// The constructor only initializes the member `eid` from `e`.
328  EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
329  : eid(e)
330  { }
331 
// Empty destructor body: no cleanup code is written here.
332  EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
333  { }
334 
// Returns a textual description of the task: the fixed prefix
// "SearchNextBundleTask: " followed by eid.getString().
// The task loop in this file writes this string to its "processing task" debug log line.
335  std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
336  {
337  return "SearchNextBundleTask: " + eid.getString();
338  }
339  }
340 }