IBR-DTNSuite  0.12
ExtendedApiHandler.cpp
Go to the documentation of this file.
1 /*
2  * ExtendedApiHandler.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  * Written-by: Stephen Roettger <roettger@ibr.cs.tu-bs.de>
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  */
22 
23 #include "config.h"
24 #include "Configuration.h"
25 #include "api/ExtendedApiHandler.h"
26 #include "core/BundleEvent.h"
27 #include <ibrcommon/Logger.h>
30 #include <ibrdtn/data/AgeBlock.h>
31 #include <ibrdtn/utils/Utils.h>
34 #include "core/BundleCore.h"
35 #include <ibrdtn/utils/Random.h>
36 
37 #ifdef WITH_BUNDLE_SECURITY
39 #endif
40 
41 #include <algorithm>
42 
43 namespace dtn
44 {
45  namespace api
46  {
48  : ProtocolHandler(client, stream), _sender(new Sender(*this)),
49  _endpoint(_client.getRegistration().getDefaultEID()), _encoding(dtn::api::PlainSerializer::BASE64)
50  {
51  _client.getRegistration().subscribe(_endpoint);
52  }
53 
55  {
57  _sender->join();
58  delete _sender;
59  }
60 
62  return _stream.good();
63  }
64 
66  {
67  // close the stream
68  _stream.close();
69  }
70 
72  {
73  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 60) << "ExtendedApiConnection down" << IBRCOMMON_LOGGER_ENDL;
74 
76 
77  // close the stream
78  _stream.close();
79 
80  try {
81  // shutdown the sender thread
82  _sender->stop();
83  } catch (const std::exception&) { };
84  }
85 
87  {
88  _sender->start();
89 
90  std::string buffer;
91  _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO EXTENDED" << std::endl;
92 
93  while (_stream.good())
94  {
95  getline(_stream, buffer);
96 
97  std::string::reverse_iterator iter = buffer.rbegin();
98  if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1);
99 
100  std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer);
101  if (cmd.empty()) continue;
102 
103  try {
104  if (cmd[0] == "set")
105  {
106  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
107 
108  if (cmd[1] == "endpoint")
109  {
110  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
111 
112  ibrcommon::MutexLock l(_write_lock);
113  if (cmd[2].length() <= 0) {
114  // send error notification
115  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
116  } else {
118 
119  // un-subscribe previous registration
120  reg.unsubscribe(_endpoint);
121 
122  // set new application endpoint
123  _endpoint.setApplication(cmd[2]);
124 
125  // subscribe to new endpoint
126  reg.subscribe(_endpoint);
127 
128  // send accepted notification
129  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
130  }
131  }
132  else if (cmd[1] == "encoding")
133  {
134  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
135 
136  // parse encoding
138 
140  // set the new encoding as default
141  _encoding = enc;
142 
143  ibrcommon::MutexLock l(_write_lock);
144  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
145  } else {
146  ibrcommon::MutexLock l(_write_lock);
147  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENCODING" << std::endl;
148  }
149  }
150  else
151  {
152  ibrcommon::MutexLock l(_write_lock);
153  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
154  }
155  }
156  else if (cmd[0] == "endpoint")
157  {
158  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
159 
160  if (cmd[1] == "add")
161  {
162  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
163 
164  ibrcommon::MutexLock l(_write_lock);
165 
166  // error checking
167  if (cmd[2].length() <= 0)
168  {
169  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
170  }
171  else
172  {
174  new_endpoint.setApplication(cmd[2]);
175 
176  _client.getRegistration().subscribe(new_endpoint);
177  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
178  }
179  }
180  else if (cmd[1] == "del")
181  {
182  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
183 
184  ibrcommon::MutexLock l(_write_lock);
185 
186  // error checking
187  if (cmd[2].length() <= 0)
188  {
189  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
190  }
191  else
192  {
194  del_endpoint.setApplication( cmd[2] );
195 
196  _client.getRegistration().unsubscribe(del_endpoint);
197 
198  // restore default endpoint if the standard endpoint has been removed
199  if(_endpoint == del_endpoint)
200  {
201  _endpoint = _client.getRegistration().getDefaultEID();
202  _client.getRegistration().subscribe(_endpoint);
203  }
204 
205  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
206  }
207  }
208  else if (cmd[1] == "get")
209  {
210  ibrcommon::MutexLock l(_write_lock);
211  _stream << ClientHandler::API_STATUS_OK << " ENDPOINT GET " << _endpoint.getString() << std::endl;
212  }
213  else
214  {
215  ibrcommon::MutexLock l(_write_lock);
216  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
217  }
218  }
219  else if (cmd[0] == "registration")
220  {
221  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
222 
223  if (cmd[1] == "add")
224  {
225  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
226 
227  ibrcommon::MutexLock l(_write_lock);
228  dtn::data::EID endpoint(cmd[2]);
229 
230  // error checking
231  if (endpoint == dtn::data::EID())
232  {
233  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl;
234  }
235  else
236  {
237  _client.getRegistration().subscribe(endpoint);
238  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
239  }
240  }
241  else if (cmd[1] == "del")
242  {
243  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
244 
245  ibrcommon::MutexLock l(_write_lock);
246  dtn::data::EID endpoint(cmd[2]);
247 
248  // error checking
249  if (endpoint == dtn::data::EID())
250  {
251  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl;
252  }
253  else
254  {
255  _client.getRegistration().unsubscribe(endpoint);
256  if(_endpoint == endpoint)
257  {
258  _endpoint = _client.getRegistration().getDefaultEID();
259  _client.getRegistration().subscribe(_endpoint);
260  }
261 
262  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
263  }
264  }
265  else if (cmd[1] == "list")
266  {
267  ibrcommon::MutexLock l(_write_lock);
268  const std::set<dtn::data::EID> list = _client.getRegistration().getSubscriptions();
269 
270  _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LIST" << std::endl;
271  for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
272  {
273  _stream << (*iter).getString() << std::endl;
274  }
275  _stream << std::endl;
276  }
277  else if (cmd[1] == "save")
278  {
279  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
280 
281  ibrcommon::Timer::time_t lifetime = 0;
282  std::stringstream ss(cmd[2]);
283 
284  ss >> lifetime;
285  if(ss.fail()) throw ibrcommon::Exception("malformed command");
286 
287  /* make the registration persistent for a given lifetime */
289 
290  ibrcommon::MutexLock l(_write_lock);
291  _stream << ClientHandler::API_STATUS_OK << " REGISTRATION SAVE " << _client.getRegistration().getHandle() << std::endl;
292  }
293  else if (cmd[1] == "load")
294  {
295  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
296 
297  const std::string handle = cmd[2];
298 
299  try
300  {
302 
303  /* stop the sender */
305  _sender->join();
306 
307  /* switch the registration */
309 
310  /* and switch the sender */
311  Sender *old_sender = _sender;
312  try{
313  _sender = new Sender(*this);
314  }
315  catch (const std::bad_alloc &ex)
316  {
317  _sender = old_sender;
318  throw ex;
319  }
320  delete old_sender;
321  _sender->start();
322 
323  ibrcommon::MutexLock l(_write_lock);
324  _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LOAD" << std::endl;
325  }
327  {
328  ibrcommon::MutexLock l(_write_lock);
329  _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION BUSY" << std::endl;
330  }
331  catch (const Registration::NotFoundException& ex)
332  {
333  ibrcommon::MutexLock l(_write_lock);
334  _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION NOT FOUND" << std::endl;
335  }
336  }
337  else
338  {
339  ibrcommon::MutexLock l(_write_lock);
340  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
341  }
342  }
343  else if (cmd[0] == "neighbor")
344  {
345  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
346 
347  if (cmd[1] == "list")
348  {
349  ibrcommon::MutexLock l(_write_lock);
350  const std::set<dtn::core::Node> nlist = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
351 
352  _stream << ClientHandler::API_STATUS_OK << " NEIGHBOR LIST" << std::endl;
353  for (std::set<dtn::core::Node>::const_iterator iter = nlist.begin(); iter != nlist.end(); ++iter)
354  {
355  _stream << (*iter).getEID().getString() << std::endl;
356  }
357  _stream << std::endl;
358  }
359  else
360  {
361  ibrcommon::MutexLock l(_write_lock);
362  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
363  }
364  }
365  else if (cmd[0] == "bundle")
366  {
367  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
368 
369  if (cmd[1] == "get")
370  {
371  // transfer bundle data
372  ibrcommon::MutexLock l(_write_lock);
373 
374  if (cmd.size() == 2)
375  {
376  _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
377  PlainSerializer(_stream, _encoding) << _bundle_reg;
378  }
379  else if (cmd[2] == "binary")
380  {
381  _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET BINARY "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
382  dtn::data::DefaultSerializer(_stream) << _bundle_reg; _stream << std::flush;
383  }
384  else if (cmd[2] == "plain")
385  {
386  _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET PLAIN "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
387  PlainSerializer(_stream, _encoding) << _bundle_reg;
388  }
389  else if (cmd[2] == "xml")
390  {
391  _stream << ClientHandler::API_STATUS_NOT_IMPLEMENTED << " FORMAT NOT IMPLEMENTED" << std::endl;
392  }
393  else
394  {
395  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN FORMAT" << std::endl;
396  }
397  }
398  else if (cmd[1] == "put")
399  {
400  // lock the stream during reception of bundle data
401  ibrcommon::MutexLock l(_write_lock);
402 
403  if (cmd.size() < 3)
404  {
405  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl;
406  }
407  else if (cmd[2] == "plain")
408  {
409  _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE PLAIN" << std::endl;
410 
411  try {
412  PlainDeserializer(_stream) >> _bundle_reg;
413  _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl;
414  } catch (const std::exception &ex) {
415  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 20) << "put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
416  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl;
417 
418  }
419  }
420  else if (cmd[2] == "binary")
421  {
422  _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE BINARY" << std::endl;
423 
424  try {
425  dtn::data::DefaultDeserializer(_stream) >> _bundle_reg;
426  _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl;
427  } catch (const std::exception &ex) {
428  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 20) << "put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
429  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl;
430  }
431  }
432  else
433  {
434  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl;
435  }
436  }
437  else if (cmd[1] == "load")
438  {
439  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
440 
442 
443  if (cmd[2] == "queue")
444  {
445  id = _bundle_queue.getnpop();
446  }
447  else
448  {
449  // construct bundle id
450  id = readBundleID(cmd, 2);
451  }
452 
453  // load the bundle
454  try {
455  _bundle_reg = dtn::core::BundleCore::getInstance().getStorage().get(id);
456 
457  // process the bundle block (security, compression, ...)
459 
460  ibrcommon::MutexLock l(_write_lock);
461  _stream << ClientHandler::API_STATUS_OK << " BUNDLE LOADED "; sayBundleID(_stream, id); _stream << std::endl;
462  } catch (const ibrcommon::Exception&) {
463  ibrcommon::MutexLock l(_write_lock);
464  _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
465  }
466  }
467  else if (cmd[1] == "clear")
468  {
469  _bundle_reg = dtn::data::Bundle();
470 
471  ibrcommon::MutexLock l(_write_lock);
472  _stream << ClientHandler::API_STATUS_OK << " BUNDLE CLEARED" << std::endl;
473  }
474  else if (cmd[1] == "free")
475  {
476  try {
478  _bundle_reg = dtn::data::Bundle();
479  ibrcommon::MutexLock l(_write_lock);
480  _stream << ClientHandler::API_STATUS_OK << " BUNDLE FREE SUCCESSFUL" << std::endl;
481  } catch (const ibrcommon::Exception&) {
482  ibrcommon::MutexLock l(_write_lock);
483  _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
484  }
485  }
486  else if (cmd[1] == "delivered")
487  {
488  if (cmd.size() < 5) throw ibrcommon::Exception("not enough parameters");
489 
490  try {
491  // construct bundle id
492  dtn::data::BundleID id = readBundleID(cmd, 2);
493 
494  // announce this bundle as delivered
497 
498  ibrcommon::MutexLock l(_write_lock);
499  _stream << ClientHandler::API_STATUS_OK << " BUNDLE DELIVERED ACCEPTED" << std::endl;
500  } catch (const ibrcommon::Exception&) {
501  ibrcommon::MutexLock l(_write_lock);
502  _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
503  }
504  }
505  else if (cmd[1] == "store")
506  {
507  // store the bundle in the storage
508  try {
510  ibrcommon::MutexLock l(_write_lock);
511  _stream << ClientHandler::API_STATUS_OK << " BUNDLE STORE SUCCESSFUL" << std::endl;
512  } catch (const ibrcommon::Exception&) {
513  ibrcommon::MutexLock l(_write_lock);
514  _stream << ClientHandler::API_STATUS_INTERNAL_ERROR << " BUNDLE STORE FAILED" << std::endl;
515  }
516  }
517  else if (cmd[1] == "send")
518  {
519  // forward the bundle to the storage processing
520  dtn::api::Registration::processIncomingBundle(_endpoint, _bundle_reg);
521 
522  ibrcommon::MutexLock l(_write_lock);
523  _stream << ClientHandler::API_STATUS_OK << " BUNDLE SENT" << std::endl;
524  }
525  else if (cmd[1] == "info")
526  {
527  // transfer bundle data
528  ibrcommon::MutexLock l(_write_lock);
529 
530  _stream << ClientHandler::API_STATUS_OK << " BUNDLE INFO "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
532  }
533  else if (cmd[1] == "block")
534  {
535  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
536 
537  if (cmd[2] == "add")
538  {
540 
541  /* parse an optional offset, where to insert the block */
542  if (cmd.size() > 3)
543  {
544  int offset;
545  istringstream ss(cmd[3]);
546 
547  ss >> offset;
548  if (ss.fail()) throw ibrcommon::Exception("malformed command");
549 
550  if (static_cast<dtn::data::Size>(offset) >= _bundle_reg.size())
551  {
553  }
554  else if(offset == 0)
555  {
557  }
558  else
559  {
560  inserter = dtn::data::BundleBuilder(_bundle_reg, dtn::data::BundleBuilder::MIDDLE, offset);
561  }
562  }
563 
564  ibrcommon::MutexLock l(_write_lock);
565  _stream << ClientHandler::API_STATUS_CONTINUE << " BUNDLE BLOCK ADD" << std::endl;
566 
567  try
568  {
571  {
572  block.set(dtn::data::Block::LAST_BLOCK, true);
573  }
574  else
575  {
576  block.set(dtn::data::Block::LAST_BLOCK, false);
577  }
578  _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK ADD SUCCESSFUL" << std::endl;
579  }
580  catch (const BundleBuilder::DiscardBlockException &ex) {
581  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " BUNDLE BLOCK DISCARDED" << std::endl;
582  }
584  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " BUNDLE BLOCK ADD FAILED" << std::endl;
585  }
586  }
587  else if (cmd[2] == "del")
588  {
589  if (cmd.size() < 4) throw ibrcommon::Exception("not enough parameters");
590 
591  int offset;
592  istringstream ss(cmd[3]);
593 
594  ss >> offset;
595  if (ss.fail()) throw ibrcommon::Exception("malformed command");
596 
597  dtn::data::Bundle::iterator it = _bundle_reg.begin();
598  std::advance(it, offset);
599  _bundle_reg.erase(it);
600 
601  ibrcommon::MutexLock l(_write_lock);
602  _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK DEL SUCCESSFUL" << std::endl;
603  }
604  else
605  {
606  ibrcommon::MutexLock l(_write_lock);
607  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
608  }
609  }
610  else
611  {
612  ibrcommon::MutexLock l(_write_lock);
613  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
614  }
615  }
616  else if (cmd[0] == "payload")
617  {
618  // check if there are more commands/parameters
619  if (cmd.size() < 1) throw ibrcommon::Exception("not enough parameters");
620 
621  // check if the command is valid
622  // [block-offset] get [[data-offset] [length]]
623 
624  dtn::data::Bundle::iterator block_it = _bundle_reg.begin();
625 
626  size_t cmd_index = 1;
627 
628  // check if a block offset is present
629  std::stringstream ss(cmd[1]);
630  size_t block_offset;
631  ss >> block_offset;
632 
633  if (!ss.fail()) {
634  // block offset present
635  // move forward to the selected block
636  std::advance(block_it, block_offset);
637 
638  // increment command index
639  ++cmd_index;
640  } else {
641  // search for the payload block
642  block_it = _bundle_reg.find(dtn::data::PayloadBlock::BLOCK_TYPE);
643  }
644 
645  // check if a valid block was selected
646  if (block_it == _bundle_reg.end()) {
647  throw ibrcommon::Exception("invalid offset or no payload block found");
648  }
649 
650  // get the selected block
651  dtn::data::Block &block = dynamic_cast<dtn::data::Block&>(**block_it);
652 
653  size_t cmd_remaining = cmd.size() - (cmd_index + 1);
654  if (cmd[cmd_index] == "get")
655  {
656  // lock the API stream
657  ibrcommon::MutexLock l(_write_lock);
658 
659  try {
660  dtn::api::PlainSerializer ps(_stream, _encoding);
661 
662  if (cmd_remaining > 0)
663  {
664  size_t payload_offset = 0;
665  size_t length = 0;
666 
667  /* read the payload offset */
668  ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
669 
670  if (cmd_remaining > 1)
671  {
672  ss.clear(); ss.str(cmd[cmd_index+2]); ss >> length;
673  }
674 
675  // abort here if the stream is no payload block
676  try {
677  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
678 
679  // open the payload BLOB
681  ibrcommon::BLOB::iostream stream = ref.iostream();
682 
683  if (static_cast<std::streamsize>(payload_offset) >= stream.size())
684  throw ibrcommon::Exception("offset out of range");
685 
686  size_t remaining = stream.size() - payload_offset;
687 
688  if ((length > 0) && (remaining > length)) {
689  remaining = length;
690  }
691 
692  // ignore all bytes leading the offset
693  (*stream).ignore(payload_offset);
694 
695  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD GET" << std::endl;
696 
697  // write the payload
698  ps.writeData((*stream), remaining);
699 
700  // final line break (mark the end)
701  _stream << std::endl;
702  } catch (const std::bad_cast&) {
703  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD GET FAILED INVALID BLOCK TYPE" << std::endl;
704  }
705  }
706  else
707  {
708  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD GET" << std::endl;
709 
710  // write the payload
711  ps.writeData(block);
712 
713  // final line break (mark the end)
714  _stream << std::endl;
715  }
716  } catch (const std::exception &ex) {
717  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD GET FAILED " << ex.what() << std::endl;
718  }
719  }
720  else if (cmd[cmd_index] == "put")
721  {
722  ibrcommon::MutexLock l(_write_lock);
723 
724  // abort there if the stream is no payload block
725  try {
726  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
727 
728  // write continue request to API
729  _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD PUT" << std::endl;
730 
731  size_t payload_offset = 0;
732  if (cmd_remaining > 0)
733  {
734  /* read the payload offset */
735  ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
736  }
737 
738  // open the payload BLOB
740  ibrcommon::BLOB::iostream stream = ref.iostream();
741 
742  // if the offset is valid
743  if (static_cast<std::streamsize>(payload_offset) < stream.size()) {
744  // move the streams put pointer to the given offset
745  (*stream).seekp(payload_offset, ios_base::beg);
746  } else if (payload_offset > 0) {
747  // move put-pointer to the end
748  (*stream).seekp(0, ios_base::end);
749  }
750 
751  /* write the new data into the blob */
753 
754  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD PUT SUCCESSFUL" << std::endl;
755  } catch (std::bad_cast&) {
756  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD PUT FAILED INVALID BLOCK TYPE" << std::endl;
757  } catch (const std::exception&) {
758  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD PUT FAILED" << std::endl;
759  }
760  }
761  else if (cmd[cmd_index] == "append")
762  {
763  ibrcommon::MutexLock l(_write_lock);
764 
765  // abort there if the stream is no payload block
766  try {
767  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
768 
769  // write continue request to API
770  _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD APPEND" << std::endl;
771 
772  // open the payload BLOB
774  ibrcommon::BLOB::iostream stream = ref.iostream();
775 
776  // move put-pointer to the end
777  (*stream).seekp(0, ios_base::end);
778 
779  /* write the new data into the blob */
781 
782  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD APPEND SUCCESSFUL" << std::endl;
783  } catch (std::bad_cast&) {
784  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD APPEND FAILED INVALID BLOCK TYPE" << std::endl;
785  } catch (const std::exception&) {
786  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD APPEND FAILED" << std::endl;
787  }
788  }
789  else if (cmd[cmd_index] == "clear")
790  {
791  ibrcommon::MutexLock l(_write_lock);
792  // abort there if the stream is no payload block
793  try {
794  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
795 
796  // open the payload BLOB
798  ibrcommon::BLOB::iostream stream = ref.iostream();
799 
800  // clear the payload
801  stream.clear();
802 
803  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD CLEAR SUCCESSFUL" << std::endl;
804  } catch (std::bad_cast&) {
805  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD CLEAR FAILED INVALID BLOCK TYPE" << std::endl;
806  }
807  }
808  else if (cmd[cmd_index] == "length")
809  {
810  ibrcommon::MutexLock l(_write_lock);
811  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD LENGTH" << std::endl;
812  _stream << "Length: " << block.getLength() << std::endl;
813  }
814  }
815  else if (cmd[0] == "nodename")
816  {
817  ibrcommon::MutexLock l(_write_lock);
818  _stream << ClientHandler::API_STATUS_OK << " NODENAME " << dtn::core::BundleCore::local.getString() << std::endl;
819  }
820  else
821  {
822  ibrcommon::MutexLock l(_write_lock);
823  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
824  }
825  } catch (const std::exception&) {
826  ibrcommon::MutexLock l(_write_lock);
827  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl;
828  }
829  }
830  }
831 
832  ExtendedApiHandler::Sender::Sender(ExtendedApiHandler &conn)
833  : _handler(conn)
834  {
835  }
836 
// Destructor of the sender helper.
// NOTE(review): listing line 839 (the only statement of this body) is missing
// from this extract, so the teardown performed here cannot be confirmed from
// the visible text - check the original file before changing this block.
837  ExtendedApiHandler::Sender::~Sender()
838  {
840  }
841 
842  void ExtendedApiHandler::Sender::__cancellation() throw ()
843  {
844  // abort all blocking calls on the registration object
845  _handler._client.getRegistration().abort();
846  }
847 
848  void ExtendedApiHandler::Sender::finally() throw ()
849  {
850  }
851 
// Main loop of the sender: as long as the handler reports good(), fetch the
// next meta bundle from the client's registration and hand it to the handler,
// either as an administrative record or as an ordinary bundle notification.
// Every exception is caught and logged inside this function, matching its
// empty throw() specification.
852  void ExtendedApiHandler::Sender::run() throw ()
853  {
854  Registration &reg = _handler._client.getRegistration();
855  try{
856  while(_handler.good()){
857  try{
858  dtn::data::MetaBundle id = reg.receiveMetaBundle();
859 
     // NOTE(review): listing line 860 is missing from this extract. It must
     // open the `if` whose `else` appears below; judging by the comment that
     // follows it presumably tests whether `id` is an administrative record
     // (custody signal / status report) - TODO confirm against the original.
861  // transform custody signals & status reports into notifies
862  _handler.notifyAdministrativeRecord(id);
863 
864  // announce the delivery of this bundle
865  _handler._client.getRegistration().delivered(id);
866  } else {
867  // notify the client about the new bundle
868  _handler.notifyBundle(id);
869  }
870  } catch (const dtn::storage::NoBundleFoundException&) {
     // nothing to deliver right now: wait on the registration before retrying
871  reg.wait_for_bundle();
872  }
873 
874  yield();
875  }
876  } catch (const ibrcommon::QueueUnblockedException &ex) {
     // this case leaves the function immediately, skipping the trailing block
877  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 40) << ex.what() << IBRCOMMON_LOGGER_ENDL;
878  return;
879  } catch (const ibrcommon::IOException &ex) {
880  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
881  } catch (const dtn::InvalidDataException &ex) {
882  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
883  } catch (const std::exception &ex) {
884  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
885  }
886 
     // The call inside this try block is commented out (see FIXME), so the
     // block currently executes nothing and the handler is not stopped here.
887  try {
888  //FIXME
889 // _handler.stop();
890  } catch (const ibrcommon::ThreadException &ex) {
891  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 50) << ex.what() << IBRCOMMON_LOGGER_ENDL;
892  }
893  }
894 
895  void ExtendedApiHandler::notifyBundle(dtn::data::MetaBundle &bundle)
896  {
897  // put the bundle into the API queue
898  _bundle_queue.push(bundle);
899 
900  // lock the API channel
901  ibrcommon::MutexLock l(_write_lock);
902 
903  // write notification header to API channel
904  _stream << API_STATUS_NOTIFY_BUNDLE << " NOTIFY BUNDLE ";
905 
906  // format the bundle ID and write it to the stream
907  sayBundleID(_stream, bundle);
908 
909  // finalize statement with a line-break
910  _stream << std::endl;
911  }
912 
// Turn an administrative record into an API notification line. The payload is
// first decoded as a status report ("NOTIFY REPORT"); a second try block
// decodes it as a custody signal ("NOTIFY CUSTODY"). Each successful path
// writes one line to the API stream under the write lock and returns.
//
// NOTE(review): this extract is missing several listing lines - the
// declarations of `b`, `payload`, `report` and `custody`, the headers of both
// catch clauses, and one line in front of each timestamp write. Comments below
// marked "presumably" describe those gaps and must be confirmed against the
// original file.
913  void ExtendedApiHandler::notifyAdministrativeRecord(dtn::data::MetaBundle &bundle)
914  {
915  // load the whole bundle
     // (listing line 916 missing: presumably defines `b`, used below)
917 
918  // get the payload block of the bundle
     // (listing line 919 missing: presumably defines `payload`, used below)
920 
921  try {
922  // try to decode as status report
     // (listing line 923 missing: presumably declares `report`)
924  report.read(payload);
925 
926  // lock the API channel
927  ibrcommon::MutexLock l(_write_lock);
928 
929  // write notification header to API channel
930  _stream << API_STATUS_NOTIFY_REPORT << " NOTIFY REPORT ";
931 
932  // write sender EID
933  _stream << b.source.getString() << " ";
934 
935  // format the bundle ID and write it to the stream
     // layout: <timestamp>.<sequence>[.<fragment offset>:<payload length>]
936  _stream << report.bundleid.timestamp.toString() << "." << report.bundleid.sequencenumber.toString();
937 
938  if (report.bundleid.isFragment()) {
939  _stream << "." << report.bundleid.fragmentoffset.toString() << ":" << report.bundleid.getPayloadLength() << " ";
940  } else {
941  _stream << " ";
942  }
943 
944  // origin source
945  _stream << report.bundleid.source.getString() << " ";
946 
947  // reason code
948  _stream << (int)report.reasoncode << " ";
949 
     // Each of the five writes below is preceded by a missing listing line
     // (950, 954, 958, 962, 966); presumably a guard that selects which
     // timestamps are reported - TODO confirm. Format: NAME[<timestamp>.<nanoseconds>]
951  _stream << "RECEIPT[" << report.timeof_receipt.getTimestamp().toString() << "."
952  << report.timeof_receipt.getNanoseconds().toString() << "] ";
953 
955  _stream << "CUSTODY-ACCEPTANCE[" << report.timeof_custodyaccept.getTimestamp().toString() << "."
956  << report.timeof_custodyaccept.getNanoseconds().toString() << "] ";
957 
959  _stream << "FORWARDING[" << report.timeof_forwarding.getTimestamp().toString() << "."
960  << report.timeof_forwarding.getNanoseconds().toString() << "] ";
961 
963  _stream << "DELIVERY[" << report.timeof_delivery.getTimestamp().toString() << "."
964  << report.timeof_delivery.getNanoseconds().toString() << "] ";
965 
967  _stream << "DELETION[" << report.timeof_deletion.getTimestamp().toString() << "."
968  << report.timeof_deletion.getNanoseconds().toString() << "] ";
969 
970  // finalize statement with a line-break
971  _stream << std::endl;
972 
     // a status report was written; do not try the custody decoding below
973  return;
     // (listing line 974 missing: presumably the catch clause closing the try)
975  // this is not a status report
976  }
977 
978  try {
979  // try to decode as custody signal
     // (listing line 980 missing: presumably declares `custody`)
981  custody.read(payload);
982 
983  // lock the API channel
984  ibrcommon::MutexLock l(_write_lock);
985 
986  // write notification header to API channel
987  _stream << API_STATUS_NOTIFY_CUSTODY << " NOTIFY CUSTODY ";
988 
989  // write sender EID
990  _stream << b.source.getString() << " ";
991 
992  // format the bundle ID and write it to the stream
     // same layout as in the status report branch above
993  _stream << custody.bundleid.timestamp.toString() << "." << custody.bundleid.sequencenumber.toString();
994 
995  if (custody.bundleid.isFragment()) {
996  _stream << "." << custody.bundleid.fragmentoffset.toString() << ":" << custody.bundleid.getPayloadLength() << " ";
997  } else {
998  _stream << " ";
999  }
1000 
1001  // origin source
1002  _stream << custody.bundleid.source.getString() << " ";
1003 
     // outcome of the custody transfer; a rejection carries its numeric reason
1004  if (custody.custody_accepted) {
1005  _stream << "ACCEPTED ";
1006  } else {
1007  _stream << "REJECTED(" << (int)custody.reason << ") ";
1008  }
1009 
1010  // add time of signal to the message
1011  _stream << custody.timeofsignal.getTimestamp().toString() << "." << custody.timeofsignal.getNanoseconds().toString();
1012 
1013  // finalize statement with a line-break
1014  _stream << std::endl;
1015 
1016  return;
     // (listing line 1017 missing: presumably the catch clause closing the try)
1018  // this is not a custody report
1019  }
1020  }
1021 
1022  void ExtendedApiHandler::sayBundleID(ostream &stream, const dtn::data::BundleID &id)
1023  {
1024  stream << id.timestamp.toString() << " " << id.sequencenumber.toString() << " ";
1025 
1026  if (id.isFragment())
1027  {
1028  stream << id.fragmentoffset.toString() << " ";
1029  stream << id.getPayloadLength() << " ";
1030  }
1031 
1032  stream << id.source.getString();
1033  }
1034 
// Parse a bundle ID from tokenized command parameters.
//
// Tokens are read from data[start] onwards as: timestamp, sequence number,
// and - only if more than three tokens remain - fragment offset and payload
// length. The source EID is always taken from the LAST element of `data`.
//
// Throws ibrcommon::Exception("not enough parameters") if fewer than three
// tokens remain, and ibrcommon::Exception("malformed parameters") if one of
// the checked numeric fields fails to parse.
//
// NOTE(review): `data.size() - start` is unsigned arithmetic; if `start`
// exceeds data.size() the difference wraps around and both size checks below
// pass - callers must guarantee start <= data.size().
// NOTE(review): with exactly four remaining tokens the fragment branch is
// taken and data[start+3] is read both as payload length and (being the last
// element) as source EID - verify whether five tokens should be required.
1035  dtn::data::BundleID ExtendedApiHandler::readBundleID(const std::vector<std::string> &data, const size_t start)
1036  {
1037  // load bundle id
1038  std::stringstream ss;
     // (listing line 1039 missing from this extract: presumably declares the
     // dtn::data::BundleID `id` that is filled below and returned)
1040 
1041  if ((data.size() - start) < 3)
1042  {
1043  throw ibrcommon::Exception("not enough parameters");
1044  }
1045 
1046  // read timestamp
1047  ss.clear(); ss.str(data[start]);
1048  id.timestamp.read(ss);
1049 
1050  if(ss.fail())
1051  {
1052  throw ibrcommon::Exception("malformed parameters");
1053  }
1054 
1055  // read sequence number
1056  ss.clear(); ss.str(data[start+1]);
1057  id.sequencenumber.read(ss);
1058 
1059  if(ss.fail())
1060  {
1061  throw ibrcommon::Exception("malformed parameters");
1062  }
1063 
1064  // read fragment offset
1065  if ((data.size() - start) > 3)
1066  {
1067  id.setFragment(true);
1068 
1069  // read fragment offset
     // NOTE(review): the stream state is cleared again below without a
     // fail() check, so a malformed fragment offset is not reported.
1070  ss.clear(); ss.str(data[start+2]);
1071  id.fragmentoffset.read(ss);
1072 
1073  // read payload length
1074  ss.clear(); ss.str(data[start+3]);
1075  dtn::data::Length len = 0;
1076  ss >> len;
     // the length is stored before the fail() check that validates it
1077  id.setPayloadLength(len);
1078 
1079  if(ss.fail())
1080  {
1081  throw ibrcommon::Exception("malformed parameters");
1082  }
1083  }
1084 
1085  // read EID
     // always the last token, independent of `start`
1086  id.source = dtn::data::EID(data[data.size() - 1]);
1087 
1088  // return bundle id
1089  return id;
1090  }
1091  }
1092 }