IBR-DTNSuite  0.12
FileConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * FileConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
26 #include "core/EventDispatcher.h"
27 #include "core/BundleEvent.h"
28 #include "core/BundleCore.h"
29 #include "core/NodeEvent.h"
30 #include "core/TimeEvent.h"
31 #include "routing/BaseRouter.h"
32 #include "routing/NodeHandshake.h"
33 #include <ibrdtn/data/BundleSet.h>
35 #include <ibrdtn/utils/Clock.h>
36 #include <ibrcommon/data/File.h>
37 #include <ibrcommon/Logger.h>
39 
40 namespace dtn
41 {
42  namespace net
43  {
44  FileConvergenceLayer::Task::Task(FileConvergenceLayer::Task::Action a, const dtn::core::Node &n)
45  : action(a), node(n)
46  {
47  }
48 
49  FileConvergenceLayer::Task::~Task()
50  {
51  }
52 
53  FileConvergenceLayer::StoreBundleTask::StoreBundleTask(const dtn::core::Node &n, const dtn::net::BundleTransfer &j)
54  : FileConvergenceLayer::Task(TASK_STORE, n), job(j)
55  {
56  }
57 
58  FileConvergenceLayer::StoreBundleTask::~StoreBundleTask()
59  {
60  }
61 
63  {
64  }
65 
67  {
68  }
69 
71  {
72  // routine checked for throw() on 15.02.2013
75  }
76 
78  {
79  // routine checked for throw() on 15.02.2013
82  }
83 
85  {
86  _tasks.abort();
87  }
88 
90  {
91  try {
92  while (true)
93  {
94  Task *t = _tasks.getnpop(true);
95 
96  try {
97  switch (t->action)
98  {
99  case Task::TASK_LOAD:
100  {
101  // load bundles (receive)
102  load(t->node);
103  break;
104  }
105 
106  case Task::TASK_STORE:
107  {
108  try {
109  StoreBundleTask &sbt = dynamic_cast<StoreBundleTask&>(*t);
111 
112  // get the file path of the node
113  ibrcommon::File path = getPath(sbt.node);
114 
115  // scan for bundles
116  std::list<dtn::data::MetaBundle> bundles = scan(path);
117 
118  try {
119  // check if bundle is a routing bundle
120  const dtn::data::EID &source = sbt.job.getBundle().source;
121 
122  if (source.isApplication("routing"))
123  {
124  // read the bundle out of the storage
125  const dtn::data::Bundle bundle = storage.get(sbt.job.getBundle());
127 
128  if (bundle.destination.isApplication("routing"))
129  {
130  // add this bundle to the blacklist
131  {
132  ibrcommon::MutexLock l(_blacklist_mutex);
133  if (_blacklist.find(meta) != _blacklist.end())
134  {
135  // send transfer aborted event
137  continue;
138  }
139  _blacklist.add(meta);
140  }
141 
142  // create ECM reply
143  replyHandshake(bundle, bundles);
144 
145  // raise bundle event
146  sbt.job.complete();
147  continue;
148  }
149  }
150 
151  // check if bundle is already in the path
152  for (std::list<dtn::data::MetaBundle>::const_iterator iter = bundles.begin(); iter != bundles.end(); ++iter)
153  {
154  if ((*iter) == sbt.job.getBundle())
155  {
156  // send transfer aborted event
158  continue;
159  }
160  }
161 
162  ibrcommon::TemporaryFile filename(path, "bundle");
163 
164  try {
165  // read the bundle out of the storage
166  const dtn::data::Bundle bundle = storage.get(sbt.job.getBundle());
167 
168  std::fstream fs(filename.getPath().c_str(), std::fstream::out);
169 
170  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", info) << "write bundle " << sbt.job.getBundle().toString() << " to file " << filename.getPath() << IBRCOMMON_LOGGER_ENDL;
171 
173 
174  // serialize the bundle
175  s << bundle;
176 
177  // raise bundle event
178  sbt.job.complete();
179  } catch (const ibrcommon::Exception&) {
180  filename.remove();
181  throw;
182  }
183  } catch (const dtn::storage::NoBundleFoundException&) {
184  // send transfer aborted event
186  } catch (const ibrcommon::Exception&) {
187  // something went wrong - requeue transfer for later
188  }
189 
190  } catch (const std::bad_cast&) { }
191  break;
192  }
193  }
194  } catch (const std::exception &ex) {
195  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", error) << "error while processing file convergencelayer task: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
196  }
197  delete t;
198  }
199  } catch (const ibrcommon::QueueUnblockedException &ex) { };
200  }
201 
203  {
204  try {
205  const dtn::core::NodeEvent &node = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
206 
207  if (node.getAction() == dtn::core::NODE_AVAILABLE)
208  {
209  const dtn::core::Node &n = node.getNode();
211  {
212  _tasks.push(new Task(Task::TASK_LOAD, n));
213  }
214  }
215  } catch (const std::bad_cast&) { };
216 
217  try {
218  const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
219 
221  {
222  ibrcommon::MutexLock l(_blacklist_mutex);
223  _blacklist.expire(time.getTimestamp());
224  }
225  } catch (const std::bad_cast&) { };
226  }
227 
228  const std::string FileConvergenceLayer::getName() const
229  {
230  return "FileConvergenceLayer";
231  }
232 
234  {
236  }
237 
239  {
240  }
241 
242  void FileConvergenceLayer::load(const dtn::core::Node &n)
243  {
244  std::list<dtn::data::MetaBundle> ret;
245  std::list<ibrcommon::File> files;
246 
247  // list all files in the folder
248  getPath(n).getFiles(files);
249 
250  // get a reference to the router
252 
253  for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
254  {
255  const ibrcommon::File &f = (*iter);
256 
257  // skip system files
258  if (f.isSystem()) continue;
259 
260  try {
261  // open the file
262  std::fstream fs(f.getPath().c_str(), std::fstream::in);
263 
264  // get a deserializer
266 
267  dtn::data::MetaBundle bundle;
268 
269  // load meta data
270  d >> bundle;
271 
272  // check the bundle
273  if ( ( bundle.destination == EID() ) || ( bundle.source == EID() ) )
274  {
275  // invalid bundle!
276  throw dtn::data::Validator::RejectedException("destination or source EID is null");
277  }
278 
279  // ask if the bundle is already known
280  if ( router.isKnown(bundle) ) continue;
281  } catch (const std::exception&) {
282  // bundle could not be read
283  continue;
284  }
285 
286  try {
287  // open the file
288  std::fstream fs(f.getPath().c_str(), std::fstream::in);
289 
290  // get a deserializer
292 
293  dtn::data::Bundle bundle;
294 
295  // load meta data
296  d >> bundle;
297 
298  // raise default bundle received event
299  dtn::net::BundleReceivedEvent::raise(n.getEID(), bundle, false);
300  }
302  {
303  // display the rejection
304  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
305  }
306  catch (const dtn::InvalidDataException &ex) {
307  // display the rejection
308  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
309  }
310  }
311  }
312 
313  ibrcommon::File FileConvergenceLayer::getPath(const dtn::core::Node &n)
314  {
315  std::list<dtn::core::Node::URI> uris = n.get(dtn::core::Node::CONN_FILE);
316 
317  // abort the transfer, if no URI exists
318  if (uris.empty()) throw ibrcommon::Exception("path not defined");
319 
320  // get the URI of the file path
321  const std::string &uri = uris.front().value;
322 
323  if (uri.substr(0, 7) != "file://") throw ibrcommon::Exception("path invalid");
324 
325  return ibrcommon::File(uri.substr(7, uri.length() - 7));
326  }
327 
328  std::list<dtn::data::MetaBundle> FileConvergenceLayer::scan(const ibrcommon::File &path)
329  {
330  std::list<dtn::data::MetaBundle> ret;
331  std::list<ibrcommon::File> files;
332 
333  // list all files in the folder
334  path.getFiles(files);
335 
336  for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
337  {
338  const ibrcommon::File &f = (*iter);
339 
340  // skip system files
341  if (f.isSystem()) continue;
342 
343  try {
344  // open the file
345  std::fstream fs(f.getPath().c_str(), std::fstream::in);
346 
347  // get a deserializer
349 
351 
352  // load meta data
353  d >> meta;
354 
355  if (meta.expiretime < dtn::utils::Clock::getTime())
356  {
358  throw ibrcommon::Exception("bundle is expired");
359  }
360 
361  // put the meta bundle in the list
362  ret.push_back(meta);
363  } catch (const std::exception&) {
364  IBRCOMMON_LOGGER_DEBUG_TAG("FileConvergenceLayer", 34) << "bundle in file " << f.getPath() << " invalid or expired" << IBRCOMMON_LOGGER_ENDL;
365 
366  // delete the file
367  ibrcommon::File(f).remove();
368  }
369  }
370 
371  return ret;
372  }
373 
375  {
376  _tasks.push(new StoreBundleTask(n, job));
377  }
378 
379  void FileConvergenceLayer::replyHandshake(const dtn::data::Bundle &bundle, std::list<dtn::data::MetaBundle> &bl)
380  {
381  // read the ecm
385 
386  // locked within this region
387  {
389  (*s) >> request;
390  }
391 
392  // if this is a request answer with an summary vector
394  {
395  // create a new request for the summary vector of the neighbor
397 
399  {
400  // add own summary vector to the message
402 
403  // add bundles in the path
404  for (std::list<dtn::data::MetaBundle>::const_iterator iter = bl.begin(); iter != bl.end(); ++iter)
405  {
406  vec.add(*iter);
407  }
408 
409  // add bundles from the blacklist
410  {
411  ibrcommon::MutexLock l(_blacklist_mutex);
412  for (std::set<dtn::data::MetaBundle>::const_iterator iter = _blacklist.begin(); iter != _blacklist.end(); ++iter)
413  {
414  vec.add(*iter);
415  }
416  }
417 
418  // create an item
420 
421  // add it to the handshake
422  response.addItem(item);
423  }
424 
425  // create a new bundle
426  dtn::data::Bundle answer;
427 
428  // set the source of the bundle
429  answer.source = bundle.destination;
430 
431  // set the destination of the bundle
433  answer.destination = bundle.source;
434 
435  // limit the lifetime to 60 seconds
436  answer.lifetime = 60;
437 
438  // set high priority
441 
444 
445  // serialize the request into the payload
446  {
448  (*ios) << response;
449  }
450 
451  // add a schl block
453  schl.setLimit(1);
454 
455  // raise default bundle received event
457  }
458  }
459  } /* namespace net */
460 } /* namespace dtn */