IBR-DTNSuite  0.12
NodeHandshakeExtension.cpp
Go to the documentation of this file.
1 /*
2  * NodeHandshakeExtension.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
25 
26 #include "core/NodeEvent.h"
27 #include "net/ConnectionEvent.h"
28 #include "core/BundleCore.h"
29 #include "core/BundleEvent.h"
30 #include "core/EventDispatcher.h"
31 
32 #include <ibrdtn/data/AgeBlock.h>
34 #include <ibrdtn/utils/Clock.h>
35 
36 #ifdef WITH_COMPRESSION
38 #endif
39 
42 #include <ibrcommon/Logger.h>
43 
44 namespace dtn
45 {
46  namespace routing
47  {
// Tag handed to the IBRCOMMON_LOGGER_*_TAG macros by every log statement in this file.
48  const std::string NodeHandshakeExtension::TAG = "NodeHandshakeExtension";
49 
51  : _endpoint(*this)
52  {
53  }
54 
56  {
57  }
58 
60  {
62  }
63 
// Adds up to two items to the handshake `answer`: one built from the locally
// known bundles and one built from the locally purged bundles.
//
// NOTE(review): this listing is incomplete. The function signature (listing
// line 64), the statements that open the two inner blocks (66, 78) and the
// declarations of `item` (72, 84) are missing, so the conditions under which
// each item is added cannot be confirmed from here.
65  {
67  {
68  // add own summary vector to the message
69  const dtn::data::BundleSet vec = (**this).getKnownBundles();
70 
71  // create an item
73 
74  // add it to the handshake
75  answer.addItem(item);
76  }
77 
79  {
80  // add own purge vector to the message
81  const dtn::data::BundleSet vec = (**this).getPurgedBundles();
82 
83  // create an item
85 
86  // add it to the handshake
87  answer.addItem(item);
88  }
89  }
90 
// Consumes the vectors carried by the handshake `answer` received from `source`.
//
//  1. Summary vector: its bloom filter and the answer's lifetime are stored in
//     the neighbor database entry of source.getNode().
//  2. Purge vector: stored bundles selected by the local BundleFilter are removed
//     from the storage, reported as BUNDLE_DELETED and recorded as purged.
//
// Both steps run in their own try block and any std::exception is swallowed, so
// a failure in one step does not prevent the other from running.
// NOTE(review): presumably answer.get<T>() throws when the item is not part of
// the handshake, which would make the empty catch the "item absent" path - confirm.
//
// NOTE(review): this listing is incomplete. The signature (listing line 91), the
// declaration of `bfsv` (94), lines 101-105, both `if` conditions inside
// shouldAdd() (138, 142) and the declaration of `list` (152) are missing.
92  {
93  try {
95 
96  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) << "summary vector received from " << source.getString() << IBRCOMMON_LOGGER_ENDL;
97 
98  // get the summary vector (bloomfilter) of this ECM
99  const ibrcommon::BloomFilter &filter = bfsv.getVector().getBloomFilter();
100 
// the database stays locked until the end of this try block
106  NeighborDatabase &db = (**this).getNeighborDB();
107  ibrcommon::MutexLock l(db);
108  db.get(source.getNode()).update(filter, answer.getLifetime());
109  } catch (std::exception&) { };
110 
111  try {
// local copy of the purge vector; `purge` and the filter object below refer into it
112  const BloomFilterPurgeVector bfpv = answer.get<BloomFilterPurgeVector>();
113 
114  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) << "purge vector received from " << source.getString() << IBRCOMMON_LOGGER_ENDL;
115 
116  // get the purge vector (bloomfilter) of this ECM
117  const ibrcommon::BloomFilter &purge = bfpv.getVector().getBloomFilter();
118 
119  // get a reference to the storage
120  dtn::storage::BundleStorage &storage = (**this).getStorage();
121 
122  // create a bundle filter which selects bundles contained in the received
123  // purge vector but not addressed locally
124  class BundleFilter : public dtn::storage::BundleSelector
125  {
126  public:
127  BundleFilter(const ibrcommon::BloomFilter &filter)
128  : _filter(filter)
129  {};
130 
131  virtual ~BundleFilter() {};
132 
// NOTE(review): presumably caps the number of results per storage.get() call - confirm
133  virtual dtn::data::Size limit() const throw () { return 100; };
134 
135  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
136  {
137  // do not select locally addressed bundles
// (the condition guarding this return is missing from the listing)
139  return false;
140 
141  // do not purge non-singleton bundles
// (the condition guarding this return is missing from the listing)
143  return false;
144 
145  // select the bundle if it is in the filter
146  return meta.isIn(_filter);
147  };
148 
// held by reference only; the referenced filter must outlive this object
149  const ibrcommon::BloomFilter &_filter;
150  } bundle_filter(purge);
151 
153 
154  // while we are getting more results from the storage
// every round removes the bundles it found, so the next query returns new ones
155  do {
156  // delete all previous results
157  list.clear();
158 
159  // query for more bundles
160  storage.get(bundle_filter, list);
161 
162  for (dtn::storage::BundleResultList::const_iterator iter = list.begin(); iter != list.end(); ++iter)
163  {
164  const dtn::data::MetaBundle &meta = (*iter);
165 
166  // delete bundle from storage
167  storage.remove(meta);
168 
169  // log the purged bundle
170  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
171 
172  // generate a report
173  dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, StatusReportBlock::NO_ADDITIONAL_INFORMATION);
174 
175  // add this bundle to the own purge vector
176  (**this).setPurged(meta);
177  }
178  } while (!list.empty());
179  } catch (std::exception&) { };
180  }
181 
183  {
184  _endpoint.query(eid);
185  }
186 
188  {
190  }
191 
193  {
195  }
196 
// Event handler. NOTE(review): the signature (listing line 197) is missing from
// this listing; `evt` is dereferenced and cast, so it is pointer-like.
//
// Only NodeEvent instances are handled; for any other event type the
// reference dynamic_cast throws std::bad_cast, which is caught and ignored.
198  {
199  // The only action handled here is NODE_UNAVAILABLE: the node is removed from
200  // the endpoint's blacklist, which clears the rate limit that query() keeps for it
201  try {
202  const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
203  const dtn::core::Node &n = nodeevent.getNode();
204 
205  if (nodeevent.getAction() == NODE_UNAVAILABLE)
206  {
207  // remove the item from the blacklist
208  _endpoint.removeFromBlacklist(n.getEID());
209  }
210 
211  return;
212  } catch (const std::bad_cast&) { };
213  }
214 
215  NodeHandshakeExtension::HandshakeEndpoint::HandshakeEndpoint(NodeHandshakeExtension &callback)
216  : _callback(callback)
217  {
218  AbstractWorker::initialize("routing");
219  }
220 
221  NodeHandshakeExtension::HandshakeEndpoint::~HandshakeEndpoint()
222  {
223  }
224 
225  void NodeHandshakeExtension::HandshakeEndpoint::callbackBundleReceived(const Bundle &b)
226  {
227  _callback.processHandshake(b);
228  }
229 
230  void NodeHandshakeExtension::HandshakeEndpoint::send(const dtn::data::Bundle &b)
231  {
232  transmit(b);
233  }
234 
235  void NodeHandshakeExtension::HandshakeEndpoint::removeFromBlacklist(const dtn::data::EID &eid)
236  {
237  ibrcommon::MutexLock l(_blacklist_lock);
238  _blacklist.erase(eid);
239  }
240 
/**
 * Sends a handshake request bundle to the given node, rate limited per EID.
 *
 * The request is of type HANDSHAKE_REQUEST, is completed by
 * (*_callback).requestHandshake(), and is sent in a bundle addressed to the
 * "routing" application of `origin` with a lifetime of 60.
 *
 * NOTE(review): this listing is incomplete. Listing lines 255, 270, 280-281,
 * 284, 288 and 293 are missing, including the declarations of `ios` and `schl`.
 *
 * @param origin EID of the node to query.
 */
241  void NodeHandshakeExtension::HandshakeEndpoint::query(const dtn::data::EID &origin)
242  {
// the lock is held only for the blacklist check and update
243  {
244  ibrcommon::MutexLock l(_blacklist_lock);
245  // only query once each 60 seconds
// the blacklist maps an EID to the monotonic timestamp before which it is not queried again
246  if (_blacklist[origin] > dtn::utils::Clock::getMonotonicTimestamp()) return;
247  _blacklist[origin] = dtn::utils::Clock::getMonotonicTimestamp() + 60;
248  }
249 
250  // create a new request for the summary vector of the neighbor
251  NodeHandshake request(NodeHandshake::HANDSHAKE_REQUEST);
252 
253 #ifdef WITH_COMPRESSION
254  // request compressed answer
// (the statement belonging to this comment is missing from the listing)
256 #endif
257 
258  // walk through all extensions to generate a request
259  (*_callback).requestHandshake(origin, request);
260 
// note: the message reads "query from" although `origin` is the node being queried
261  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake query from " << origin.getString() << ": " << request.toString() << IBRCOMMON_LOGGER_ENDL;
262 
263  // create a new bundle with a zero timestamp (+age block)
264  dtn::data::Bundle req(true);
265 
266  // set the source of the bundle
267  req.source = getWorkerURI();
268 
269  // set the destination of the bundle
271  req.destination = origin;
272 
273  // set destination application
274  req.destination.setApplication("routing");
275 
276  // limit the lifetime to 60 seconds
277  req.lifetime = 60;
278 
279  // set high priority
// (the statements belonging to this comment are missing from the listing)
282 
283  dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>();
285 
286  // serialize the request into the payload
287  {
289  (*ios) << request;
290  }
291 
292  // add a schl block
// NOTE(review): presumably a hop-limit block restricting the bundle to one hop - confirm
294  schl.setLimit(1);
295 
296  // send the bundle
297  transmit(req);
298  }
299 
// Handles a received handshake bundle.
//
// The handshake message is read from `bundle` and then dispatched on its type:
//  - HANDSHAKE_REQUEST:  a response is assembled through responseHandshake() and
//                        sent back to bundle.source in a new bundle (lifetime 60).
//  - HANDSHAKE_RESPONSE: the content is handed to processHandshake(source, handshake).
// Any other type is ignored.
//
// NOTE(review): this listing is incomplete. The signature (listing line 300) and
// lines 303-304, 309, 319, 333, 340-341, 343-344, 348, 353, 358, 361, 372 and 380
// are missing, including the declarations of `s`, `response`, `ios` and `schl`.
301  {
302  // read the ecm
305  NodeHandshake handshake;
306 
307  // locked within this region
308  {
310  (*s) >> handshake;
311  }
312 
313  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake received from " << bundle.source.getString() << ": " << handshake.toString() << IBRCOMMON_LOGGER_ENDL;
314 
315  // if this is a request, answer with a summary vector
316  if (handshake.getType() == NodeHandshake::HANDSHAKE_REQUEST)
317  {
318  // create the response to the neighbor's request
// (the declaration of `response` is missing from the listing)
320 
321  // lock the extension list during the processing
// NOTE(review): no lock is taken in the visible code - confirm where the locking happens
322  (**this).responseHandshake(bundle.source, handshake, response);
323 
324  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake reply to " << bundle.source.getString() << ": " << response.toString() << IBRCOMMON_LOGGER_ENDL;
325 
326  // create a new bundle with a zero timestamp (+age block)
327  dtn::data::Bundle answer(true);
328 
329  // set the source of the bundle
330  answer.source = _endpoint.getWorkerURI();
331 
332  // set the destination of the bundle
334  answer.destination = bundle.source;
335 
336  // limit the lifetime to 60 seconds
337  answer.lifetime = 60;
338 
339  // set high priority
// (the statements belonging to this comment are missing from the listing)
342 
345 
346  // serialize the response into the payload
347  {
349  (*ios) << response;
350  }
351 
352  // add a schl block
// NOTE(review): presumably a hop-limit block restricting the bundle to one hop - confirm
354  schl.setLimit(1);
355 
356 #ifdef WITH_COMPRESSION
357  // compress bundle if requested
359  {
// a failed compression is only logged; the bundle is still sent below
360  try {
362  } catch (const ibrcommon::Exception &ex) {
363  IBRCOMMON_LOGGER_TAG(TAG, warning) << "compression of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
364  };
365  }
366 #endif
367 
368  // transfer the bundle to the neighbor
369  _endpoint.send(answer);
370 
371  // call handshake completed event
// (the statement belonging to this comment is missing from the listing)
373  }
374  else if (handshake.getType() == NodeHandshake::HANDSHAKE_RESPONSE)
375  {
376  // walk through all extensions to process the contents of the response
377  (**this).processHandshake(bundle.source, handshake);
378 
379  // call handshake completed event
// (the statement belonging to this comment is missing from the listing)
381  }
382  }
383  } /* namespace routing */
384 } /* namespace dtn */