IBR-DTNSuite  0.10
FileConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * FileConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
26 #include "core/EventDispatcher.h"
27 #include "core/BundleEvent.h"
28 #include "core/BundleCore.h"
29 #include "core/NodeEvent.h"
30 #include "core/TimeEvent.h"
31 #include "routing/BaseRouter.h"
32 #include "routing/NodeHandshake.h"
33 #include <ibrdtn/data/BundleSet.h>
35 #include <ibrdtn/utils/Clock.h>
36 #include <ibrcommon/data/File.h>
37 #include <ibrcommon/Logger.h>
39 
40 namespace dtn
41 {
42  namespace net
43  {
44  FileConvergenceLayer::Task::Task(FileConvergenceLayer::Task::Action a, const dtn::core::Node &n)
45  : action(a), node(n)
46  {
47  }
48 
49  FileConvergenceLayer::Task::~Task()
50  {
51  }
52 
53  FileConvergenceLayer::StoreBundleTask::StoreBundleTask(const dtn::core::Node &n, const dtn::net::BundleTransfer &j)
54  : FileConvergenceLayer::Task(TASK_STORE, n), job(j)
55  {
56  }
57 
58  FileConvergenceLayer::StoreBundleTask::~StoreBundleTask()
59  {
60  }
61 
63  {
64  }
65 
67  {
68  }
69 
71  {
72  // routine checked for throw() on 15.02.2013
75  }
76 
78  {
79  // routine checked for throw() on 15.02.2013
82  }
83 
85  {
    // Abort the task queue. The worker loop in this file catches
    // ibrcommon::QueueUnblockedException around its pop and then ends;
    // presumably this abort is what triggers that exception — TODO confirm.
86  _tasks.abort();
87  }
88 
90  {
    // Worker loop: pops Task pointers from _tasks, dispatches on t->action and
    // deletes the task afterwards. The loop ends when
    // ibrcommon::QueueUnblockedException reaches the outer catch at the bottom.
    //
    // NOTE(review): this listing omits several source lines (the embedded
    // numbering jumps, e.g. 109 -> 111, 132 -> 134, 153 -> 155, 168 -> 170,
    // 181 -> 183). As a result `storage` and `s` are used below without a
    // visible declaration, and the statements announced by the
    // "send transfer aborted event" comments are not shown.
91  try {
92  while (true)
93  {
    // the argument `true` presumably requests a blocking pop — TODO confirm
94  Task *t = _tasks.getnpop(true);
95 
96  try {
97  switch (t->action)
98  {
99  case Task::TASK_LOAD:
100  {
101  // load bundles (receive)
102  load(t->node);
103  break;
104  }
105 
106  case Task::TASK_STORE:
107  {
108  try {
    // throws std::bad_cast (caught and ignored further down) when t is
    // not a StoreBundleTask
109  StoreBundleTask &sbt = dynamic_cast<StoreBundleTask&>(*t);
111 
112  // get the file path of the node
113  ibrcommon::File path = getPath(sbt.node);
114 
115  // scan for bundles
116  std::list<dtn::data::MetaBundle> bundles = scan(path);
117 
118  try {
119  // check if bundle is a routing bundle
120  if (sbt.job.getBundle().source == dtn::core::BundleCore::local.add("/routing"))
121  {
122  // read the bundle out of the storage
123  const dtn::data::Bundle bundle = storage.get(sbt.job.getBundle());
124 
125  if (bundle.destination == sbt.node.getEID().add("/routing"))
126  {
127  // add this bundle to the blacklist
128  {
129  ibrcommon::MutexLock l(_blacklist_mutex);
130  if (_blacklist.find(bundle) != _blacklist.end())
131  {
132  // send transfer aborted event
    // NOTE(review): `continue` targets the enclosing while(true) loop,
    // so the `delete t` at the end of the iteration is skipped and the
    // task object is leaked.
134  continue;
135  }
136  _blacklist.add(bundle);
137  }
138 
139  // create ECM reply
140  replyHandshake(bundle, bundles);
141 
142  // raise bundle event
143  sbt.job.complete();
    // NOTE(review): same as above — this `continue` skips `delete t`.
144  continue;
145  }
146  }
147 
148  // check if bundle is already in the path
149  for (std::list<dtn::data::MetaBundle>::const_iterator iter = bundles.begin(); iter != bundles.end(); ++iter)
150  {
151  if ((*iter) == sbt.job.getBundle())
152  {
153  // send transfer aborted event
    // NOTE(review): this `continue` only advances the surrounding for
    // loop. In the visible lines nothing prevents the code below from
    // writing the bundle and completing the job even though it was
    // found in the path — verify against the omitted line.
155  continue;
156  }
157  }
158 
159  ibrcommon::TemporaryFile filename(path, "bundle");
160 
161  try {
162  // read the bundle out of the storage
163  const dtn::data::Bundle bundle = storage.get(sbt.job.getBundle());
164 
165  std::fstream fs(filename.getPath().c_str(), std::fstream::out);
166 
167  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", info) << "write bundle " << sbt.job.getBundle().toString() << " to file " << filename.getPath() << IBRCOMMON_LOGGER_ENDL;
168 
170 
171  // serialize the bundle
172  s << bundle;
173 
174  // raise bundle event
175  sbt.job.complete();
176  } catch (const ibrcommon::Exception&) {
    // remove the file again before passing the error on
177  filename.remove();
178  throw;
179  }
180  } catch (const dtn::storage::NoBundleFoundException&) {
181  // send transfer aborted event
183  } catch (const ibrcommon::Exception&) {
    // NOTE(review): the handler body holds only the comment below; no
    // requeue statement is visible, so the error is swallowed here.
184  // something went wrong - requeue transfer for later
185  }
186 
187  } catch (const std::bad_cast&) { }
188  break;
189  }
190  }
191  } catch (const std::exception &ex) {
192  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", error) << "error while processing file convergencelayer task: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
193  }
    // release the task; not reached on the `continue` paths flagged above
194  delete t;
195  }
196  } catch (const ibrcommon::QueueUnblockedException &ex) { };
197  }
198 
200  {
    // Event handler: probes *evt with dynamic_cast for two event types. A
    // failed reference cast throws std::bad_cast, which each try block
    // catches and ignores, so a block only acts on its own event type.
    //
    // NOTE(review): the lines carrying the conditions in front of the two
    // inner brace blocks (listing lines 207 and 217) are missing from this
    // listing; check the original source for the exact guards.
201  try {
202  const dtn::core::NodeEvent &node = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
203 
204  if (node.getAction() == dtn::core::NODE_AVAILABLE)
205  {
206  const dtn::core::Node &n = node.getNode();
208  {
    // queue a load task for the node that became available
209  _tasks.push(new Task(Task::TASK_LOAD, n));
210  }
211  }
212  } catch (const std::bad_cast&) { };
213 
214  try {
215  const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
216 
218  {
    // expire blacklist entries using the event's timestamp, under the
    // blacklist mutex
219  ibrcommon::MutexLock l(_blacklist_mutex);
220  _blacklist.expire(time.getTimestamp());
221  }
222  } catch (const std::bad_cast&) { };
223  }
224 
225  const std::string FileConvergenceLayer::getName() const
226  {
227  return "FileConvergenceLayer";
228  }
229 
231  {
233  }
234 
236  {
237  }
238 
/**
 * Read the files in the directory of node `n` (resolved via getPath()) and
 * raise a BundleReceivedEvent for every file that holds a bundle the router
 * does not know yet.
 *
 * Each candidate file is opened twice: the first pass reads only a
 * MetaBundle to validate it and to ask router.isKnown(); the second pass
 * reads the full Bundle that is handed to the event.
 *
 * NOTE(review): this listing omits several source lines (listing lines 248,
 * 262, 288 and 298 are missing), so `router` and `d` are used without a
 * visible declaration and the header of the first catch after the second
 * try block is not shown. The local `ret` is never used in the visible code.
 *
 * @param n node whose file path is scanned; its EID is passed to the event
 */
239  void FileConvergenceLayer::load(const dtn::core::Node &n)
240  {
241  std::list<dtn::data::MetaBundle> ret;
242  std::list<ibrcommon::File> files;
243 
244  // list all files in the folder
245  getPath(n).getFiles(files);
246 
247  // get a reference to the router
249 
250  for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
251  {
252  const ibrcommon::File &f = (*iter);
253 
254  // skip system files
255  if (f.isSystem()) continue;
256 
    // first pass: read the meta data only and decide whether to go on
257  try {
258  // open the file
259  std::fstream fs(f.getPath().c_str(), std::fstream::in);
260 
261  // get a deserializer
263 
264  dtn::data::MetaBundle bundle;
265 
266  // load meta data
267  d >> bundle;
268 
269  // check the bundle
270  if ( ( bundle.destination == EID() ) || ( bundle.source == EID() ) )
271  {
272  // invalid bundle!
    // NOTE(review): if RejectedException derives from std::exception,
    // the catch right below swallows it and the file is skipped without
    // a log message — confirm
273  throw dtn::data::Validator::RejectedException("destination or source EID is null");
274  }
275 
276  // ask if the bundle is already known
277  if ( router.isKnown(bundle) ) continue;
278  } catch (const std::exception&) {
279  // bundle could not be read
280  continue;
281  }
282 
    // second pass: re-open the file and read the complete bundle
283  try {
284  // open the file
285  std::fstream fs(f.getPath().c_str(), std::fstream::in);
286 
287  // get a deserializer
289 
290  dtn::data::Bundle bundle;
291 
292  // load meta data
293  d >> bundle;
294 
295  // raise default bundle received event
296  dtn::net::BundleReceivedEvent::raise(n.getEID(), bundle, false);
297  }
299  {
300  // display the rejection
301  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
302  }
303  catch (const dtn::InvalidDataException &ex) {
304  // display the rejection
305  IBRCOMMON_LOGGER_TAG("FileConvergenceLayer", warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
306  }
307  }
308  }
309 
310  ibrcommon::File FileConvergenceLayer::getPath(const dtn::core::Node &n)
311  {
312  std::list<dtn::core::Node::URI> uris = n.get(dtn::core::Node::CONN_FILE);
313 
314  // abort the transfer, if no URI exists
315  if (uris.empty()) throw ibrcommon::Exception("path not defined");
316 
317  // get the URI of the file path
318  const std::string &uri = uris.front().value;
319 
320  if (uri.substr(0, 7) != "file://") throw ibrcommon::Exception("path invalid");
321 
322  return ibrcommon::File(uri.substr(7, uri.length() - 7));
323  }
324 
/**
 * Collect the meta data of all bundle files found in `path`.
 *
 * Files flagged by isSystem() are skipped. Every other file is read as a
 * MetaBundle; if reading throws a std::exception or the bundle's expiretime
 * is below dtn::utils::Clock::getTime(), the file is DELETED from disk and
 * left out of the result.
 *
 * NOTE(review): this listing omits source lines (listing lines 345, 347 and
 * 354 are missing), so `d` and `meta` are used without a visible declaration.
 *
 * @param path directory to scan
 * @return the MetaBundle of each file that could be read and is not expired
 */
325  std::list<dtn::data::MetaBundle> FileConvergenceLayer::scan(const ibrcommon::File &path)
326  {
327  std::list<dtn::data::MetaBundle> ret;
328  std::list<ibrcommon::File> files;
329 
330  // list all files in the folder
331  path.getFiles(files);
332 
333  for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
334  {
335  const ibrcommon::File &f = (*iter);
336 
337  // skip system files
338  if (f.isSystem()) continue;
339 
340  try {
341  // open the file
342  std::fstream fs(f.getPath().c_str(), std::fstream::in);
343 
344  // get a deserializer
346 
348 
349  // load meta data
350  d >> meta;
351 
352  if (meta.expiretime < dtn::utils::Clock::getTime())
353  {
    // thrown so that the catch below removes the expired file
355  throw ibrcommon::Exception("bundle is expired");
356  }
357 
358  // put the meta bundle in the list
359  ret.push_back(meta);
360  } catch (const std::exception&) {
361  IBRCOMMON_LOGGER_DEBUG_TAG("FileConvergenceLayer", 34) << "bundle in file " << f.getPath() << " invalid or expired" << IBRCOMMON_LOGGER_ENDL;
362 
363  // delete the file
    // f is a const reference, so remove() is called on a temporary copy
364  ibrcommon::File(f).remove();
365  }
366  }
367 
368  return ret;
369  }
370 
372  {
    // Queue a heap-allocated StoreBundleTask for node `n` and transfer `job`.
    // The worker loop in this file deletes tasks after handling them.
373  _tasks.push(new StoreBundleTask(n, job));
374  }
375 
/**
 * Build the reply to a routing handshake bundle.
 *
 * From the visible lines: a request is read from a stream `s`; a summary
 * vector `vec` is filled with the bundles in `bl` plus every entry of the
 * blacklist, wrapped in `item` and added to `response`. The answer bundle
 * swaps source and destination of `bundle`, gets a lifetime of 60 and
 * carries the serialized `response`; `schl.setLimit(1)` is applied to a
 * block added to it.
 *
 * NOTE(review): this listing omits most declarations (`request`,
 * `response`, `vec`, `item`, `s`, `ios`, `schl`), both if conditions and
 * the final statement that hands the answer over (only its comment
 * remains). Verify every detail against the original source file.
 *
 * @param bundle received handshake bundle that is being answered
 * @param bl     bundles found in the node's path; each one is added to the
 *               summary vector
 */
376  void FileConvergenceLayer::replyHandshake(const dtn::data::Bundle &bundle, std::list<dtn::data::MetaBundle> &bl)
377  {
378  // read the ecm
382 
383  // locked within this region
384  {
386  (*s) >> request;
387  }
388 
389  // if this is a request answer with an summary vector
391  {
392  // create a new request for the summary vector of the neighbor
394 
396  {
397  // add own summary vector to the message
399 
400  // add bundles in the path
401  for (std::list<dtn::data::MetaBundle>::const_iterator iter = bl.begin(); iter != bl.end(); ++iter)
402  {
403  vec.add(*iter);
404  }
405 
406  // add bundles from the blacklist
407  {
    // the blacklist is only iterated while its mutex is held
408  ibrcommon::MutexLock l(_blacklist_mutex);
409  for (std::set<dtn::data::MetaBundle>::const_iterator iter = _blacklist.begin(); iter != _blacklist.end(); ++iter)
410  {
411  vec.add(*iter);
412  }
413  }
414 
415  // create an item
417 
418  // add it to the handshake
419  response.addItem(item);
420  }
421 
422  // create a new bundle
423  dtn::data::Bundle answer;
424 
425  // set the source of the bundle
426  answer.source = bundle.destination;
427 
428  // set the destination of the bundle
430  answer.destination = bundle.source;
431 
432  // limit the lifetime to 60 seconds
433  answer.lifetime = 60;
434 
435  // set high priority
438 
441 
442  // serialize the request into the payload
443  {
445  (*ios) << response;
446  }
447 
448  // add a schl block
450  schl.setLimit(1);
451 
452  // raise default bundle received event
454  }
455  }
456  } /* namespace net */
457 } /* namespace dtn */