IBR-DTNSuite  0.12
dtnoutbox.cpp
Go to the documentation of this file.
1 /*
2  * dtnoutbox.cpp
3  *
4  * Copyright (C) 2013 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  * David Goltzsche <goltzsch@ibr.cs.tu-bs.de>
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  */
22 
23 #include "config.h"
24 #include <ibrdtn/api/Client.h>
25 #include <ibrcommon/net/socket.h>
26 #include <ibrcommon/thread/Mutex.h>
30 #include <ibrcommon/data/BLOB.h>
31 #include <ibrcommon/data/File.h>
32 #include <ibrcommon/appstreambuf.h>
33 
34 #include "io/TarUtils.h"
35 #include "io/ObservedFile.h"
36 
37 #ifdef HAVE_LIBTFFS
38 #include "io/FatImageReader.h"
39 #endif
40 
41 #include <stdlib.h>
42 #include <iostream>
43 #include <map>
44 #include <vector>
45 #include <sys/types.h>
46 #include <unistd.h>
47 #include <regex.h>
48 #include <getopt.h>
49 
// Container aliases used by main() to keep track of the outbox content:
//  - filelist: files currently under observation (updated every scan round)
//  - fileset:  ordered sets of files, used for set_difference between rounds
//  - hashlist: hashes of files that were already handed over for sending
50 typedef std::list<io::ObservedFile> filelist;
51 typedef std::set<io::ObservedFile> fileset;
52 typedef std::set<io::FileHash> hashlist;
53 
54 // set this variable to false to stop the app
// (cleared by sighandler_func() on SIGINT/SIGTERM and by print_help();
//  both loops in main() run only while it is true)
55 bool _running = true;
56 
57 //global wait conditional
// NOTE(review): the declaration of `_wait_cond`, which sighandler_func() and
// main() lock and signal, is not part of this listing (numbered line 58 is
// absent) - restore it from the original file.
//
// _wait_abort is set by sighandler_func() on SIGUSR1 to end the current wait
// in main() early; main() resets it to false after each wait.
59 bool _wait_abort = false;
60 
/**
 * Runtime configuration of dtnoutbox.
 *
 * The constructor sets the defaults; read_configuration() overwrites them
 * with the values given on the command line.
 */
class config {
public:
	config()
	 : interval(5000), rounds(3), path("/"), regex_str("^\\."), regex(),
	   bundle_group(false), invert(false), quiet(false), fat(false), enabled(true)
	{}

	// mandatory values
	std::string name;        // application name
	std::string outbox;      // location of the outgoing files
	std::string destination; // destination EID for all outgoing files

	// optional parameters
	std::string workdir;     // temporary work directory (empty: not set)
	std::size_t interval;    // scan interval, default 5000
	std::size_t rounds;      // unchanged rounds before a file is sent, default 3
	std::string path;        // path of the outbox inside an image, default "/"
	std::string regex_str;   // textual form of the ignore pattern, default "^\\."
	regex_t regex;           // compiled pattern; zero-initialised until regcomp() fills it

	// boolean switches, stored as int
	int bundle_group;
	int invert;
	int quiet;
	int fat;
	int enabled;
};
typedef struct config config_t;
88 
/*
 * Long command line options understood by getopt_long().
 *
 * Each entry maps a long name to the short option character that
 * read_configuration() dispatches on. The list is terminated by an all-zero
 * entry, as getopt_long() requires.
 */
struct option long_options[] =
{
	{"destination", required_argument, 0, 'd'},
	{"help", no_argument, 0, 'h'},
	{"group", no_argument, 0, 'g'},
	{"workdir", required_argument, 0, 'w'},
	{"interval", required_argument, 0, 'i'},
	{"rounds", required_argument, 0, 'r'},
	{"path", required_argument, 0, 'p'},
	{"regex", required_argument, 0, 'R'},
	// the help text advertises "-I|--invert"; without this entry only the
	// short form was accepted
	{"invert", no_argument, 0, 'I'},
	{"quiet", no_argument, 0, 'q'},
	{0, 0, 0, 0}
};
103 {
104  std::cout << "-- dtnoutbox (IBR-DTN) --" << std::endl;
105  std::cout << "Syntax: dtnoutbox [options] <name> <outbox> <destination>" << std::endl;
106  std::cout << " <name> The application name" << std::endl;
107 #ifdef HAVE_LIBTFFS
108  std::cout << " <outbox> Location of outgoing files, directory or vfat-image" << std::endl;
109 #else
110  std::cout << " <outbox> Directory of outgoing files" << std::endl;
111 #endif
112  std::cout << " <destination> The destination EID for all outgoing files" << std::endl << std::endl;
113  std::cout << "* optional parameters *" << std::endl;
114  std::cout << " -h|--help Display this text" << std::endl;
115  std::cout << " -g|--group Receiver is a destination group" << std::endl;
116  std::cout << " -w|--workdir <dir>" << std::endl;
117  std::cout << " Temporary work directory" << std::endl;
118  std::cout << " -i|--interval <milliseconds>" << std::endl;
119  std::cout << " Interval in milliseconds, in which <outbox> is scanned" << std::endl;
120  std::cout << " for new/changed files. default: 5000" << std::endl;
121  std::cout << " -r|--rounds <n> Number of rounds of intervals, after which a unchanged" << std::endl;
122  std::cout << " file is considered as written. default: 3" << std::endl;
123 #ifdef HAVE_LIBTFFS
124  std::cout << " -p|--path <path> Path of outbox within vfat-image. default: /" << std::endl;
125 #endif
126  std::cout << " -R|--regex <regex>" << std::endl;
127  std::cout << " All files in <outbox> matching this regular expression" << std::endl;
128  std::cout << " will be ignored. default: ^\\." << std::endl;
129  std::cout << " -I|--invert Invert the regular expression defined with -R"<< std::endl;
130  std::cout << " -q|--quiet Only print error messages" << std::endl;
131 
132  _running = false; //stop this app, after printing help
133 
134 }
135 
136 void read_configuration(int argc, char** argv, config_t &conf)
137 {
138  // print help if not enough parameters are set
139  if (argc < 3)
140  {
141  print_help();
142  exit(EXIT_FAILURE);
143  }
144 
145  while(1)
146  {
147  /* getopt_long stores the option index here. */
148  int option_index = 0;
149  int c = getopt_long (argc, argv, "hw:i:r:p:R:qIg",
150  long_options, &option_index);
151  /* Detect the end of the options. */
152  if (c == -1)
153  break;
154 
155  switch (c)
156  {
157  case 0:
158  /* If this option set a flag, do nothing else now. */
159  if (long_options[option_index].flag != 0)
160  break;
161  printf ("option %s", long_options[option_index].name);
162  if (optarg)
163  printf (" with arg %s", optarg);
164  printf ("\n");
165  break;
166 
167  case 'h':
168  print_help();
169  exit(EXIT_SUCCESS);
170  break;
171  case 'g':
172  conf.bundle_group = true;
173  break;
174  case 'w':
175  conf.workdir = std::string(optarg);
176  break;
177  case 'i':
178  conf.interval = atoi(optarg);
179  break;
180  case 'r':
181  conf.rounds = atoi(optarg);
182  break;
183  case 'p':
184  conf.path = std::string(optarg);
185  break;
186  case 'R':
187  conf.regex_str = std::string(optarg);
188  break;
189  case 'q':
190  conf.quiet = true;
191  break;
192  case 'I':
193  conf.invert = true;
194  break;
195  case '?':
196  break;
197  default:
198  abort();
199  break;
200  }
201  }
202 
203  conf.name = std::string(argv[optind]);
204  conf.destination = std::string(argv[optind+2]);
205  conf.outbox = std::string(argv[optind+1]);
206 
207  //compile regex, if set
208  if (conf.regex_str.length() > 0 && regcomp(&conf.regex,conf.regex_str.c_str(),0))
209  {
210  std::cout << "ERROR: invalid regex: " << optarg << std::endl;
211  exit(-1);
212  }
213 
214 
215  //check outbox path for trailing slash
216  if (conf.outbox.at(conf.outbox.length()-1) == '/')
217  conf.outbox = conf.outbox.substr(0,conf.outbox.length() -1);
218 }
219 
220 void sighandler_func(int signal)
221 {
222  switch (signal)
223  {
224  case SIGTERM:
225  case SIGINT:
226  {
227  //stop waiting and stop running, on SIGINT or SIGTERM
228  ibrcommon::MutexLock l(_wait_cond);
229  _running = false;
230  _wait_cond.signal(true);
231  break;
232  }
233 #ifndef __WIN32__
234  case SIGUSR1:
235  {
236  //stop waiting on SIGUSR1 -> "quickscan"
237  ibrcommon::MutexLock l(_wait_cond);
238  _wait_abort = true;
239  _wait_cond.signal(true);
240  break;
241  }
242 #endif
243  default:
244  break;
245  }
246 }
247 
248 /*
249  * main application method
250  */
// Entry point of dtnoutbox.
//
// Visible flow:
//  1. register SIGINT/SIGTERM (and SIGUSR1 unless __WIN32__) and parse the
//     command line into `conf`
//  2. select the observed root: a FAT image (only with HAVE_LIBTFFS) when
//     <outbox> exists and is not a directory, otherwise the directory itself
//  3. while _running: connect to localhost:4550 as a send-only client and
//     scan the outbox in rounds; files whose stable counter exceeds
//     conf.rounds and whose hash was not sent before are written with
//     io::TarUtils::write into one payload and sent as a single bundle
//  4. on socket/IO errors: wait `backoff` seconds, double the backoff while
//     it is below 600, and reconnect
//
// Returns EXIT_SUCCESS after the loop ends, or -1 if an image file is given
// but the tool was built without libtffs support.
//
// NOTE(review): several statements of this function are absent from this
// listing (gaps in the embedded line numbers); each gap is marked below.
251 int main( int argc, char** argv )
252 {
253  // catch process signals
// NOTE(review): `sighandler` is used below but its declaration is not visible
// (numbered line 254 is absent).
255  sighandler.handle(SIGINT);
256  sighandler.handle(SIGTERM);
257 #ifndef __WIN32__
258  sighandler.handle(SIGUSR1);
259 #endif
260 
261  // configuration object
262  config_t conf;
263 
264  // read the configuration
// (may call exit() on --help or on invalid arguments)
265  read_configuration(argc, argv, conf);
266 
267  // initialize sighandler after possible exit call
268  sighandler.initialize();
269 
270  // init working directory
271  if (conf.workdir.length() > 0)
272  {
273  ibrcommon::File blob_path(conf.workdir);
274 
275  if (blob_path.exists())
276  {
// NOTE(review): the body of this branch is not visible (numbered line 277 is
// absent); as listed, nothing happens with the work directory.
278  }
279  }
280 
281  // backoff for reconnect
// (in seconds: it is printed as seconds and multiplied by 1000 for sleep())
282  unsigned int backoff = 2;
283 
284  ibrcommon::File outbox_file(conf.outbox);
285 
286  // create new file lists
// NOTE(review): `new_files` and `deleted_files` declared here are shadowed by
// the locals of the same name inside the scan loop and are never used.
287  fileset new_files, prev_files, deleted_files, files_to_send;
288  filelist observed_files;
289  hashlist sent_hashes;
290 
291  // observed root file
// NOTE(review): the declaration of `root` is not visible (numbered line 292
// is absent); it is assigned an io::ObservedFile below.
293 
294 #ifdef HAVE_LIBTFFS
295  io::FatImageReader *imagereader = NULL;
296 #endif
297 
// an existing non-directory outbox is treated as a FAT image
298  if (outbox_file.exists() && !outbox_file.isDirectory())
299  {
300 #ifdef HAVE_LIBTFFS
301  conf.fat = true;
// owned raw pointer; released at the end of main()
302  imagereader = new io::FatImageReader(conf.outbox);
303  const io::FATFile fat_root(*imagereader, conf.path);
304  root = io::ObservedFile(fat_root);
305 #else
306  std::cout << "ERROR: image-file provided, but this tool has been compiled without libtffs support!" << std::endl;
307  return -1;
308 #endif
309  }
310  else
311  {
312  if (!outbox_file.exists()) {
// NOTE(review): the handling of a missing outbox is not visible (numbered
// line 313 is absent).
314  }
315  root = io::ObservedFile(outbox_file);
316  }
317 
318  if (!conf.quiet) std::cout << "-- dtnoutbox --" << std::endl;
319 
320  // loop, if no stop if requested
// outer loop: one iteration per connection attempt to the daemon
321  while (_running)
322  {
323  try
324  {
325  // Create a stream to the server using TCP.
326  ibrcommon::vaddress addr("localhost", 4550);
// NOTE(review): the declaration of the stream `conn` is not visible
// (numbered line 327 is absent).
328 
329  // Initiate a client for synchronous receiving
330  dtn::api::Client client(conf.name, conn, dtn::api::Client::MODE_SENDONLY);
331 
332  // Connect to the server. Actually, this function initiate the
333  // stream protocol by starting the thread and sending the contact header.
334  client.connect();
335 
336  // reset backoff if connected
337  backoff = 2;
338 
339  // check the connection
// inner loop: one iteration per scan round
340  while (_running)
341  {
342  // get all files
343  fileset current_files;
344  root.findFiles(current_files);
345 
346  // determine deleted files
// (present in the previous round but not in this one)
347  fileset deleted_files;
348  std::set_difference(prev_files.begin(), prev_files.end(), current_files.begin(), current_files.end(), std::inserter(deleted_files, deleted_files.begin()));
349 
350  // remove deleted files from observation
351  for (fileset::const_iterator iter = deleted_files.begin(); iter != deleted_files.end(); ++iter)
352  {
353  const io::ObservedFile &deletedFile = (*iter);
354 
355  // remove references in the sent_hashes
// erase(hash_it++) advances the iterator before the element is removed, so
// the loop iterator stays valid
356  for (hashlist::iterator hash_it = sent_hashes.begin(); hash_it != sent_hashes.end(); /* blank */) {
357  if ((*hash_it).getPath() == deletedFile.getFile().getPath()) {
358  sent_hashes.erase(hash_it++);
359  } else {
360  ++hash_it;
361  }
362  }
363 
364  // remove from observed files
365  observed_files.remove(deletedFile);
366 
367  // output
368  if (!conf.quiet) std::cout << "file removed: " << deletedFile.getFile().getBasename() << std::endl;
369  }
370 
371  // determine new files
// (present in this round but not in the previous one)
372  fileset new_files;
373  std::set_difference(current_files.begin(), current_files.end(), prev_files.begin(), prev_files.end(), std::inserter(new_files, new_files.begin()));
374 
375  // add new files to observation
376  for (fileset::const_iterator iter = new_files.begin(); iter != new_files.end(); ++iter)
377  {
378  const io::ObservedFile &of = (*iter);
379 
// regexec() returns 0 on a match: matching names are skipped, or - with
// conf.invert - non-matching names are skipped
380  int reg_ret = regexec(&conf.regex, of.getFile().getBasename().c_str(), 0, NULL, 0);
381  if (!reg_ret && !conf.invert)
382  continue;
383  if (reg_ret && conf.invert)
384  continue;
385 
386  // print error message, if regex error occurs
// NOTE(review): with conf.invert set, a regexec() error has already taken the
// `continue` above, so this message is only reachable without invert.
387  if (reg_ret && reg_ret != REG_NOMATCH)
388  {
389  char msgbuf[100];
390  regerror(reg_ret,&conf.regex,msgbuf,sizeof(msgbuf));
391  std::cerr << "ERROR: regex match failed : " << std::string(msgbuf) << std::endl;
392  }
393 
394  // add new file to the observed set
395  observed_files.push_back(of);
396 
397  // log output
398  if (!conf.quiet) std::cout << "file found: " << of.getFile().getBasename() << std::endl;
399  }
400 
401  // store current files for the next round
402  prev_files.clear();
403  prev_files.insert(current_files.begin(), current_files.end());
404 
405  // find files to send, create std::list
406  files_to_send.clear();
407 
408  for (filelist::iterator iter = observed_files.begin(); iter != observed_files.end(); ++iter)
409  {
410  io::ObservedFile &of = (*iter);
411 
412  // tick and update all files
413  of.update();
414 
// a file is sent once its stable counter exceeds conf.rounds and its current
// hash has not been recorded yet
415  if (of.getStableCounter() > conf.rounds)
416  {
417  if (sent_hashes.find(of.getHash()) == sent_hashes.end())
418  {
// NOTE(review): the hash is recorded before the bundle is actually sent; if
// the send below fails with an IOException the file is not retried.
419  sent_hashes.insert(of.getHash());
420  files_to_send.insert(*iter);
421  }
422  }
423  }
424 
425  if (!files_to_send.empty())
426  {
427  if (!conf.quiet)
428  {
429  std::cout << "send files: ";
430  for (fileset::const_iterator it = files_to_send.begin(); it != files_to_send.end(); ++it) {
431  std::cout << (*it).getFile().getBasename() << " ";
432  }
433  std::cout << std::endl;
434  }
435 
436  try {
437  // create a blob
// NOTE(review): the declaration of `blob` is not visible (numbered line 438
// is absent).
439 
440  // write files into BLOB while it is locked
// the extra scope ends the lifetime of `stream` before the blob is attached
// to the bundle
441  {
442  ibrcommon::BLOB::iostream stream = blob.iostream();
443  io::TarUtils::write(*stream, root, files_to_send);
444  }
445 
446  // create the destination EID
447  dtn::data::EID destination = EID(conf.destination);
448 
449  // create a new bundle
// NOTE(review): the declaration of the bundle `b` is not visible (numbered
// line 450 is absent).
451 
452  // set destination
453  b.destination = destination;
454 
455  // add payload block using the blob
456  b.push_back(blob);
457 
458  // set destination address to non-singleton, if configured
459  if (conf.bundle_group)
// NOTE(review): the statement controlled by this `if` is not visible
// (numbered line 460 is absent); as listed, the `if` would govern the send
// statement below.
461 
462  // send the bundle
463  client << b;
464  client.flush();
465  } catch (const ibrcommon::IOException &e) {
// a failed send is only logged; the scan loop continues
466  std::cerr << "send failed: " << e.what() << std::endl;
467  }
468  }
469 
470  // wait defined seconds
// NOTE(review): the help text documents conf.interval in milliseconds. As
// written, the loop ends only when _wait_abort is set or _running is cleared;
// how a timeout of wait() ends the wait is not visible here - confirm against
// the _wait_cond API.
471  ibrcommon::MutexLock l(_wait_cond);
472  while (!_wait_abort && _running) {
473  _wait_cond.wait(conf.interval);
474  }
475  _wait_abort = false;
476  }
477 
478  // clean up regex
// NOTE(review): only reached when the scan loop ends without an exception;
// the catch paths below skip it.
479  regfree(&conf.regex);
480 
481  // close the client connection
482  client.close();
483 
484  // close the connection
485  conn.close();
486 
487  }
488  catch (const ibrcommon::socket_exception&)
489  {
490  if (_running)
491  {
492  std::cout << "Connection to bundle daemon failed. Retry in " << backoff << " seconds." << std::endl;
493  ibrcommon::Thread::sleep(backoff * 1000);
494 
495  // if backoff < 10 minutes
496  if (backoff < 600)
497  {
498  // set a new backoff
499  backoff = backoff * 2;
500  }
501  }
502  }
// same handling as for socket_exception above
503  catch (const ibrcommon::IOException&)
504  {
505  if (_running)
506  {
507  std::cout << "Connection to bundle daemon failed. Retry in " << backoff << " seconds." << std::endl;
508  ibrcommon::Thread::sleep(backoff * 1000);
509 
510  // if backoff < 10 minutes
511  if (backoff < 600)
512  {
513  // set a new backoff
514  backoff = backoff * 2;
515  }
516  }
517  }
// NOTE(review): every other std::exception is swallowed silently and the
// outer loop reconnects immediately, without message or backoff.
518  catch (const std::exception&) { };
519  }
520 
521  // clear observed files
522  observed_files.clear();
523 
524 #ifdef HAVE_LIBTFFS
525  // clean-up
526  if (imagereader != NULL) delete imagereader;
527 #endif
528 
529  return (EXIT_SUCCESS);
530 }