IBR-DTNSuite  0.10
NodeHandshakeExtension.cpp
Go to the documentation of this file.
1 /*
2  * NodeHandshakeExtension.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
24 
25 #include "core/NodeEvent.h"
26 #include "net/ConnectionEvent.h"
27 #include "core/BundleCore.h"
28 #include "core/BundleEvent.h"
29 
30 #include <ibrdtn/data/AgeBlock.h>
32 #include <ibrdtn/utils/Clock.h>
33 
36 #include <ibrcommon/Logger.h>
37 
38 namespace dtn
39 {
40  namespace routing
41  {
// Tag string passed as the first argument of the IBRCOMMON_LOGGER_DEBUG_TAG
// statements throughout this file.
42  const std::string NodeHandshakeExtension::TAG = "NodeHandshakeExtension";
43 
// NOTE(review): listing line 44 is missing from this extract; it presumably held the
// NodeHandshakeExtension constructor signature -- confirm against the original file.
// The visible initializer list hands *this to the _endpoint member; the body is empty.
45  : _endpoint(*this)
46  {
47  }
48 
// NOTE(review): listing line 49 (the signature) is missing from this extract;
// presumably the NodeHandshakeExtension destructor -- confirm. The visible body is empty.
50  {
51  }
52 
// NOTE(review): listing lines 53 (the signature) and 55 (the only body statement) are
// missing from this extract, so neither the name nor the behaviour of this function
// can be read from here -- restore both lines from the original file.
54  {
56  }
57 
// NOTE(review): listing line 58 (the signature) is missing from this extract. The body
// writes into a handshake object named `answer`, so this is presumably the function
// that fills in a handshake response -- confirm against the original file.
//
// Visible behaviour: two braced sections, each of which reads a BundleSet from the
// object returned by (**this) and adds an `item` to `answer`.
59  {
    // NOTE(review): listing line 60 is missing; presumably the condition guarding this section.
61  {
62  // read the set of known bundles (the own summary vector)
63  const dtn::data::BundleSet vec = (**this).getKnownBundles();
64 
65  // create an item
    // NOTE(review): listing line 66, which declares `item`, is missing from this extract.
67 
68  // add it to the handshake
69  answer.addItem(item);
70  }
71 
    // NOTE(review): listing line 72 is missing; presumably the condition guarding this section.
73  {
74  // read the set of purged bundles (the own purge vector)
75  const dtn::data::BundleSet vec = (**this).getPurgedBundles();
76 
77  // create an item
    // NOTE(review): listing line 78, which declares `item`, is missing from this extract.
79 
80  // add it to the handshake
81  answer.addItem(item);
82  }
83  }
84 
// NOTE(review): listing line 85 (the signature) is missing from this extract. The body
// uses the names `source` and `answer`; presumably this handles the contents of a
// received handshake response -- confirm against the original file.
//
// Visible behaviour, in two independent try blocks:
//  1. take the Bloom filter of a received summary vector and store it, together with
//     the lifetime of `answer`, in the neighbor database entry of the sending node;
//  2. take the Bloom filter of a received purge vector, remove the bundles selected by
//     the local BundleFilter class from the storage, raise a BUNDLE_DELETED event for
//     each and mark each as purged.
// A std::exception thrown in either part is caught and silently ignored.
86  {
87  try {
    // NOTE(review): listing line 88 is missing; `bfsv` used below is presumably obtained
    // there in the same way as `bfpv` on listing line 106 -- confirm.
89 
90  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) << "summary vector received from " << source.getString() << IBRCOMMON_LOGGER_ENDL;
91 
92  // get the summary vector (bloomfilter) of this ECM
93  const ibrcommon::BloomFilter &filter = bfsv.getVector().getBloomFilter();
94 
    // NOTE(review): listing lines 94-99 are not part of this extract.
    // take a MutexLock on the neighbor database, then create/fetch the entry of the
    // sending node and update it with the received filter and the handshake lifetime
100  NeighborDatabase &db = (**this).getNeighborDB();
101  ibrcommon::MutexLock l(db);
102  db.create(source.getNode()).update(filter, answer.getLifetime());
103  } catch (std::exception&) { };
104 
105  try {
106  const BloomFilterPurgeVector bfpv = answer.get<BloomFilterPurgeVector>();
107 
108  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) << "purge vector received from " << source.getString() << IBRCOMMON_LOGGER_ENDL;
109 
110  // get the purge vector (bloomfilter) of this ECM
111  const ibrcommon::BloomFilter &purge = bfpv.getVector().getBloomFilter();
112 
113  // get a reference to the storage
114  dtn::storage::BundleStorage &storage = (**this).getStorage();
115 
116  // create a bundle filter which selects bundles contained in the received
117  // purge vector but not addressed locally
118  class BundleFilter : public dtn::storage::BundleSelector
119  {
120  public:
    // keeps a reference to the given Bloom filter; the filter must outlive this object
121  BundleFilter(const ibrcommon::BloomFilter &filter)
122  : _filter(filter)
123  {};
124 
125  virtual ~BundleFilter() {};
126 
    // always returns 100; presumably the maximum number of bundles returned by one
    // storage.get() call -- confirm against the BundleSelector interface
127  virtual dtn::data::Size limit() const throw () { return 100; };
128 
129  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
130  {
131  // do not select locally addressed bundles
    // NOTE(review): listing line 132 (the condition for the return below) is missing
    // from this extract; as printed here the return would be unconditional.
133  return false;
134 
135  // do not purge non-singleton bundles
    // NOTE(review): listing line 136 (the condition for the return below) is missing
    // from this extract.
137  return false;
138 
139  // select the bundle if it is in the filter
140  return _filter.contains(meta.toString());
141  };
142 
143  const ibrcommon::BloomFilter &_filter;
144  } bundle_filter(purge);
145 
    // NOTE(review): listing line 146, which declares `list`, is missing from this extract.
147 
148  // keep querying until a query leaves the result list empty
149  do {
150  // delete all previous results
151  list.clear();
152 
153  // query for more bundles
154  storage.get(bundle_filter, list);
155 
156  for (dtn::storage::BundleResultList::const_iterator iter = list.begin(); iter != list.end(); ++iter)
157  {
158  const dtn::data::MetaBundle &meta = (*iter);
159 
160  // delete bundle from storage
161  storage.remove(meta);
162 
163  // log the purged bundle
164  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
165 
166  // raise a BUNDLE_DELETED event for the removed bundle
167  dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, StatusReportBlock::NO_ADDITIONAL_INFORMATION);
168 
169  // add this bundle to the own purge vector
170  (**this).setPurged(meta);
171  }
172  } while (!list.empty());
173  } catch (std::exception&) { };
174  }
175 
// NOTE(review): listing line 176 (the signature) is missing from this extract; the
// function name is therefore not visible here -- confirm against the original file.
// The visible body forwards `eid` to HandshakeEndpoint::query() on the _endpoint member.
177  {
178  _endpoint.query(eid);
179  }
180 
// NOTE(review): listing line 181 (the signature) is missing from this extract; the body
// dereferences a parameter named `evt`, so this is presumably the event callback of the
// extension -- confirm against the original file.
//
// Visible behaviour: if `*evt` is a dtn::core::NodeEvent whose action is
// NODE_UNAVAILABLE, the EID of that node is removed from the endpoint's blacklist.
// Events of any other type make the dynamic_cast throw std::bad_cast, which is caught
// and ignored.
182  {
183  // When a neighbor becomes unavailable, drop its entry from the endpoint's blacklist.
184  // NOTE(review): the previous comment also mentioned sending a request to new neighbors
    // and freeing the stored summary vector; neither is done by the code visible here.
185  try {
186  const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
187  const dtn::core::Node &n = nodeevent.getNode();
188 
189  if (nodeevent.getAction() == NODE_UNAVAILABLE)
190  {
191  // remove the item from the blacklist
192  _endpoint.removeFromBlacklist(n.getEID());
193  }
194 
195  return;
196  } catch (const std::bad_cast&) { };
197  }
198 
// Creates the endpoint and stores a reference to the owning extension in _callback.
// @param callback extension whose processHandshake() is called for received bundles
//                 (see callbackBundleReceived)
199  NodeHandshakeExtension::HandshakeEndpoint::HandshakeEndpoint(NodeHandshakeExtension &callback)
200  : _callback(callback)
201  {
    // "/routing" and 50 match the two destination suffixes ("routing" / "50") that
    // query() appends to a peer EID. NOTE(review): the meaning of the third argument
    // (true) is not visible in this file -- see AbstractWorker::initialize.
202  AbstractWorker::initialize("/routing", 50, true);
203  }
204 
// Destructor; the body is empty.
205  NodeHandshakeExtension::HandshakeEndpoint::~HandshakeEndpoint()
206  {
207  }
208 
// Hands a bundle received by this endpoint to the owning extension.
// @param b the received bundle; passed unchanged to _callback.processHandshake()
209  void NodeHandshakeExtension::HandshakeEndpoint::callbackBundleReceived(const Bundle &b)
210  {
211  _callback.processHandshake(b);
212  }
213 
// Sends a bundle through this endpoint by forwarding it to transmit().
// @param b the bundle to send
214  void NodeHandshakeExtension::HandshakeEndpoint::send(const dtn::data::Bundle &b)
215  {
216  transmit(b);
217  }
218 
// Erases the entry for `eid` from _blacklist, the map query() uses to limit how often
// a peer is queried.
// @param eid the EID whose blacklist entry is removed
219  void NodeHandshakeExtension::HandshakeEndpoint::removeFromBlacklist(const dtn::data::EID &eid)
220  {
    // _blacklist is accessed under _blacklist_lock here and in query()
221  ibrcommon::MutexLock l(_blacklist_lock);
222  _blacklist.erase(eid);
223  }
224 
// Builds a handshake request for the peer `origin` and transmits it as a bundle,
// unless a request to the same peer was issued within the last 60 seconds.
// @param origin EID of the peer to query
//
// NOTE(review): listing lines 249, 260-261, 263-264, 268, 273 and 277 are missing from
// this extract; the affected steps are marked below.
225  void NodeHandshakeExtension::HandshakeEndpoint::query(const dtn::data::EID &origin)
226  {
227  {
228  ibrcommon::MutexLock l(_blacklist_lock);
229  // only query once each 60 seconds: return while the stored timestamp is still in
    // the future, otherwise store "now + 60" for this peer
230  if (_blacklist[origin] > dtn::utils::Clock::getUnixTimestamp()) return;
231  _blacklist[origin] = dtn::utils::Clock::getUnixTimestamp() + 60;
232  }
233 
234  // create a new request for the summary vector of the neighbor
235  NodeHandshake request(NodeHandshake::HANDSHAKE_REQUEST);
236 
237  // let the object returned by (*_callback) fill in the request
    // (per the original comment: it walks through all extensions -- not visible here)
238  (*_callback).requestHandshake(origin, request);
239 
240  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake query from " << origin.getString() << ": " << request.toString() << IBRCOMMON_LOGGER_ENDL;
241 
242  // create a new bundle
243  dtn::data::Bundle req;
244 
245  // set the source of the bundle
246  req.source = getWorkerURI();
247 
248  // set the destination of the bundle
    // NOTE(review): listing line 249 is missing from this extract.
250 
    // the suffix appended to the peer EID is "50" for compressable EIDs and "routing"
    // otherwise, mirroring the values passed to AbstractWorker::initialize()
251  if (origin.isCompressable())
252  req.destination = origin.add(origin.getDelimiter() + "50");
253  else
254  req.destination = origin.add(origin.getDelimiter() + "routing");
255 
256  // limit the lifetime to 60 seconds
257  req.lifetime = 60;
258 
259  // set high priority
    // NOTE(review): listing lines 260-261 and 263-264 are missing from this extract.
262 
265 
266  // serialize the request into the payload
267  {
    // NOTE(review): listing line 268, which declares `ios`, is missing from this extract.
269  (*ios) << request;
270  }
271 
272  // add a schl block and set its limit to 1
    // NOTE(review): listing line 273, which declares `schl`, is missing from this extract.
274  schl.setLimit(1);
275 
276  // add an age block (to prevent expiring due to wrong clocks)
    // NOTE(review): listing line 277 is missing from this extract.
278 
279  // send the bundle
280  transmit(req);
281  }
282 
// NOTE(review): listing line 283 (the signature) is missing from this extract. The body
// reads a parameter named `bundle` and uses the _endpoint member, so this is presumably
// the processHandshake(bundle) function called from
// HandshakeEndpoint::callbackBundleReceived() -- confirm against the original file.
//
// Visible behaviour: deserializes a NodeHandshake from the bundle, then
//  - for a HANDSHAKE_REQUEST: obtains a response via (**this).responseHandshake() and
//    sends it back to bundle.source in a new bundle with a lifetime of 60;
//  - for a HANDSHAKE_RESPONSE: passes the handshake to (**this).processHandshake().
284  {
285  // read the ecm
    // NOTE(review): listing lines 286-287 are missing from this extract.
288  NodeHandshake handshake;
289 
290  // locked within this region
291  {
    // NOTE(review): listing line 292, which declares `s`, is missing from this extract.
293  (*s) >> handshake;
294  }
295 
296  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake received from " << bundle.source.getString() << ": " << handshake.toString() << IBRCOMMON_LOGGER_ENDL;
297 
298  // if this is a request, answer with a summary vector
299  if (handshake.getType() == NodeHandshake::HANDSHAKE_REQUEST)
300  {
301  // create the response object
    // NOTE(review): listing line 302, which declares `response`, is missing from this extract.
303 
304  // let the object returned by (**this) fill in the response
    // NOTE(review): the original comment spoke of locking the extension list; no lock
    // is taken in the code visible here.
305  (**this).responseHandshake(bundle.source, handshake, response);
306 
307  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake reply to " << bundle.source.getString() << ": " << response.toString() << IBRCOMMON_LOGGER_ENDL;
308 
309  // create a new bundle
310  dtn::data::Bundle answer;
311 
312  // set the source of the bundle
313  answer.source = _endpoint.getWorkerURI();
314 
315  // set the destination of the bundle: reply to the sender of the request
    // NOTE(review): listing line 316 is missing from this extract.
317  answer.destination = bundle.source;
318 
319  // limit the lifetime to 60 seconds
320  answer.lifetime = 60;
321 
322  // set high priority
    // NOTE(review): listing lines 323-324 and 326-327 are missing from this extract.
325 
328 
329  // serialize the response into the payload
330  {
    // NOTE(review): listing line 331, which declares `ios`, is missing from this extract.
332  (*ios) << response;
333  }
334 
335  // add a schl block and set its limit to 1
    // NOTE(review): listing line 336, which declares `schl`, is missing from this extract.
337  schl.setLimit(1);
338 
339  // add an age block (to prevent expiring due to wrong clocks)
    // NOTE(review): listing line 340 is missing from this extract.
341 
342  // transfer the bundle to the neighbor
343  _endpoint.send(answer);
344 
345  // call handshake completed event
    // NOTE(review): listing line 346 is missing from this extract.
347  }
348  else if (handshake.getType() == NodeHandshake::HANDSHAKE_RESPONSE)
349  {
350  // hand the response to the object returned by (**this) for processing
    // (per the original comment: it walks through all extensions -- not visible here)
351  (**this).processHandshake(bundle.source, handshake);
352 
353  // call handshake completed event
    // NOTE(review): listing line 354 is missing from this extract.
355  }
356  }
357  } /* namespace routing */
358 } /* namespace dtn */