IBR-DTNSuite  0.10
ExtendedApiHandler.cpp
Go to the documentation of this file.
1 /*
2  * ExtendedApiHandler.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  * Written-by: Stephen Roettger <roettger@ibr.cs.tu-bs.de>
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  */
22 
23 #include "config.h"
24 #include "Configuration.h"
25 #include "api/ExtendedApiHandler.h"
26 #include "core/BundleEvent.h"
27 #include <ibrcommon/Logger.h>
30 #include <ibrdtn/data/AgeBlock.h>
31 #include <ibrdtn/utils/Utils.h>
34 #include "core/BundleCore.h"
35 #include <ibrdtn/utils/Random.h>
36 
37 #ifdef WITH_COMPRESSION
39 #endif
40 
41 #ifdef WITH_BUNDLE_SECURITY
43 #endif
44 
45 #include <algorithm>
46 
47 namespace dtn
48 {
49  namespace api
50  {
52  : ProtocolHandler(client, stream), _sender(new Sender(*this)),
53  _endpoint(_client.getRegistration().getDefaultEID()), _encoding(dtn::api::PlainSerializer::BASE64)
54  {
55  _client.getRegistration().subscribe(_endpoint);
56  }
57 
59  {
61  _sender->join();
62  delete _sender;
63  }
64 
66  return _stream.good();
67  }
68 
70  {
71  // close the stream
72  _stream.close();
73  }
74 
76  {
77  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 60) << "ExtendedApiConnection down" << IBRCOMMON_LOGGER_ENDL;
78 
80 
81  // close the stream
82  _stream.close();
83 
84  try {
85  // shutdown the sender thread
86  _sender->stop();
87  } catch (const std::exception&) { };
88  }
89 
91  {
92  _sender->start();
93 
94  std::string buffer;
95  _stream << ClientHandler::API_STATUS_OK << " SWITCHED TO EXTENDED" << std::endl;
96 
97  while (_stream.good())
98  {
99  getline(_stream, buffer);
100 
101  std::string::reverse_iterator iter = buffer.rbegin();
102  if ( (*iter) == '\r' ) buffer = buffer.substr(0, buffer.length() - 1);
103 
104  std::vector<std::string> cmd = dtn::utils::Utils::tokenize(" ", buffer);
105  if (cmd.empty()) continue;
106 
107  try {
108  if (cmd[0] == "set")
109  {
110  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
111 
112  if (cmd[1] == "endpoint")
113  {
114  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
115 
116  ibrcommon::MutexLock l(_write_lock);
117  dtn::data::EID new_endpoint = dtn::core::BundleCore::local.add(dtn::core::BundleCore::local.getDelimiter() + cmd[2]);
118 
119  // error checking
120  if (new_endpoint == dtn::data::EID())
121  {
122  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
123  }
124  else
125  {
126  /* unsubscribe from the old endpoint and subscribe to the new one */
128  reg.unsubscribe(_endpoint);
129  reg.subscribe(new_endpoint);
130  _endpoint = new_endpoint;
131  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
132  }
133  }
134  else if (cmd[1] == "encoding")
135  {
136  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
137 
138  // parse encoding
140 
142  // set the new encoding as default
143  _encoding = enc;
144 
145  ibrcommon::MutexLock l(_write_lock);
146  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
147  } else {
148  ibrcommon::MutexLock l(_write_lock);
149  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENCODING" << std::endl;
150  }
151  }
152  else
153  {
154  ibrcommon::MutexLock l(_write_lock);
155  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
156  }
157  }
158  else if (cmd[0] == "endpoint")
159  {
160  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
161 
162  if (cmd[1] == "add")
163  {
164  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
165 
166  ibrcommon::MutexLock l(_write_lock);
167  dtn::data::EID new_endpoint = dtn::core::BundleCore::local.add(BundleCore::local.getDelimiter() + cmd[2]);
168 
169  // error checking
170  if (new_endpoint == dtn::data::EID())
171  {
172  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
173  }
174  else
175  {
176  _client.getRegistration().subscribe(new_endpoint);
177  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
178  }
179  }
180  else if (cmd[1] == "del")
181  {
182  dtn::data::EID del_endpoint = _endpoint;
183  if (cmd.size() >= 3)
184  {
185  del_endpoint = dtn::core::BundleCore::local.add( BundleCore::local.getDelimiter() + cmd[2] );
186  }
187 
188  ibrcommon::MutexLock l(_write_lock);
189 
190  // error checking
191  if (del_endpoint == dtn::data::EID())
192  {
193  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID ENDPOINT" << std::endl;
194  }
195  else
196  {
197  _client.getRegistration().unsubscribe(del_endpoint);
198 
199  if(_endpoint == del_endpoint)
200  {
201  _endpoint = _client.getRegistration().getDefaultEID();
202  _client.getRegistration().subscribe(_endpoint);
203  }
204 
205  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
206  }
207  }
208  else if (cmd[1] == "get")
209  {
210  ibrcommon::MutexLock l(_write_lock);
211  _stream << ClientHandler::API_STATUS_OK << " ENDPOINT GET " << _endpoint.getString() << std::endl;
212  }
213  else
214  {
215  ibrcommon::MutexLock l(_write_lock);
216  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
217  }
218  }
219  else if (cmd[0] == "registration")
220  {
221  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
222 
223  if (cmd[1] == "add")
224  {
225  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
226 
227  ibrcommon::MutexLock l(_write_lock);
228  dtn::data::EID endpoint(cmd[2]);
229 
230  // error checking
231  if (endpoint == dtn::data::EID())
232  {
233  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl;
234  }
235  else
236  {
237  _client.getRegistration().subscribe(endpoint);
238  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
239  }
240  }
241  else if (cmd[1] == "del")
242  {
243  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
244 
245  ibrcommon::MutexLock l(_write_lock);
246  dtn::data::EID endpoint(cmd[2]);
247 
248  // error checking
249  if (endpoint == dtn::data::EID())
250  {
251  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " INVALID EID" << std::endl;
252  }
253  else
254  {
255  _client.getRegistration().unsubscribe(endpoint);
256  if(_endpoint == endpoint)
257  {
258  _endpoint = _client.getRegistration().getDefaultEID();
259  _client.getRegistration().subscribe(_endpoint);
260  }
261 
262  _stream << ClientHandler::API_STATUS_OK << " OK" << std::endl;
263  }
264  }
265  else if (cmd[1] == "list")
266  {
267  ibrcommon::MutexLock l(_write_lock);
268  const std::set<dtn::data::EID> list = _client.getRegistration().getSubscriptions();
269 
270  _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LIST" << std::endl;
271  for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); ++iter)
272  {
273  _stream << (*iter).getString() << std::endl;
274  }
275  _stream << std::endl;
276  }
277  else if (cmd[1] == "save")
278  {
279  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
280 
281  ibrcommon::Timer::time_t lifetime = 0;
282  std::stringstream ss(cmd[2]);
283 
284  ss >> lifetime;
285  if(ss.fail()) throw ibrcommon::Exception("malformed command");
286 
287  /* make the registration persistent for a given lifetime */
289 
290  ibrcommon::MutexLock l(_write_lock);
291  _stream << ClientHandler::API_STATUS_OK << " REGISTRATION SAVE " << _client.getRegistration().getHandle() << std::endl;
292  }
293  else if (cmd[1] == "load")
294  {
295  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
296 
297  const std::string handle = cmd[2];
298 
299  try
300  {
302 
303  /* stop the sender */
305  _sender->join();
306 
307  /* switch the registration */
309 
310  /* and switch the sender */
311  Sender *old_sender = _sender;
312  try{
313  _sender = new Sender(*this);
314  }
315  catch (const std::bad_alloc &ex)
316  {
317  _sender = old_sender;
318  throw ex;
319  }
320  delete old_sender;
321  _sender->start();
322 
323  ibrcommon::MutexLock l(_write_lock);
324  _stream << ClientHandler::API_STATUS_OK << " REGISTRATION LOAD" << std::endl;
325  }
327  {
328  ibrcommon::MutexLock l(_write_lock);
329  _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION BUSY" << std::endl;
330  }
331  catch (const Registration::NotFoundException& ex)
332  {
333  ibrcommon::MutexLock l(_write_lock);
334  _stream << ClientHandler::API_STATUS_SERVICE_UNAVAILABLE << " REGISTRATION NOT FOUND" << std::endl;
335  }
336  }
337  else
338  {
339  ibrcommon::MutexLock l(_write_lock);
340  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
341  }
342  }
343  else if (cmd[0] == "neighbor")
344  {
345  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
346 
347  if (cmd[1] == "list")
348  {
349  ibrcommon::MutexLock l(_write_lock);
350  const std::set<dtn::core::Node> nlist = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
351 
352  _stream << ClientHandler::API_STATUS_OK << " NEIGHBOR LIST" << std::endl;
353  for (std::set<dtn::core::Node>::const_iterator iter = nlist.begin(); iter != nlist.end(); ++iter)
354  {
355  _stream << (*iter).getEID().getString() << std::endl;
356  }
357  _stream << std::endl;
358  }
359  else
360  {
361  ibrcommon::MutexLock l(_write_lock);
362  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
363  }
364  }
365  else if (cmd[0] == "bundle")
366  {
367  if (cmd.size() < 2) throw ibrcommon::Exception("not enough parameters");
368 
369  if (cmd[1] == "get")
370  {
371  // transfer bundle data
372  ibrcommon::MutexLock l(_write_lock);
373 
374  if (cmd.size() == 2)
375  {
376  _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
377  PlainSerializer(_stream, _encoding) << _bundle_reg;
378  }
379  else if (cmd[2] == "binary")
380  {
381  _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET BINARY "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
382  dtn::data::DefaultSerializer(_stream) << _bundle_reg; _stream << std::flush;
383  }
384  else if (cmd[2] == "plain")
385  {
386  _stream << ClientHandler::API_STATUS_OK << " BUNDLE GET PLAIN "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
387  PlainSerializer(_stream, _encoding) << _bundle_reg;
388  }
389  else if (cmd[2] == "xml")
390  {
391  _stream << ClientHandler::API_STATUS_NOT_IMPLEMENTED << " FORMAT NOT IMPLEMENTED" << std::endl;
392  }
393  else
394  {
395  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN FORMAT" << std::endl;
396  }
397  }
398  else if (cmd[1] == "put")
399  {
400  // lock the stream during reception of bundle data
401  ibrcommon::MutexLock l(_write_lock);
402 
403  if (cmd.size() < 3)
404  {
405  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl;
406  }
407  else if (cmd[2] == "plain")
408  {
409  _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE PLAIN" << std::endl;
410 
411  try {
412  PlainDeserializer(_stream) >> _bundle_reg;
413  _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl;
414  } catch (const std::exception &ex) {
415  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 20) << "put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
416  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl;
417 
418  }
419  }
420  else if (cmd[2] == "binary")
421  {
422  _stream << ClientHandler::API_STATUS_CONTINUE << " PUT BUNDLE BINARY" << std::endl;
423 
424  try {
425  dtn::data::DefaultDeserializer(_stream) >> _bundle_reg;
426  _stream << ClientHandler::API_STATUS_OK << " BUNDLE IN REGISTER" << std::endl;
427  } catch (const std::exception &ex) {
428  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 20) << "put failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
429  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PUT FAILED" << std::endl;
430  }
431  }
432  else
433  {
434  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " PLEASE DEFINE THE FORMAT" << std::endl;
435  }
436  }
437  else if (cmd[1] == "load")
438  {
439  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
440 
442 
443  if (cmd[2] == "queue")
444  {
445  id = _bundle_queue.getnpop();
446  }
447  else
448  {
449  // construct bundle id
450  id = readBundleID(cmd, 2);
451  }
452 
453  // load the bundle
454  try {
455  _bundle_reg = dtn::core::BundleCore::getInstance().getStorage().get(id);
456 
457  // process the bundle block (security, compression, ...)
459 
460  ibrcommon::MutexLock l(_write_lock);
461  _stream << ClientHandler::API_STATUS_OK << " BUNDLE LOADED "; sayBundleID(_stream, id); _stream << std::endl;
462  } catch (const ibrcommon::Exception&) {
463  ibrcommon::MutexLock l(_write_lock);
464  _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
465  }
466  }
467  else if (cmd[1] == "clear")
468  {
469  _bundle_reg = dtn::data::Bundle();
470 
471  ibrcommon::MutexLock l(_write_lock);
472  _stream << ClientHandler::API_STATUS_OK << " BUNDLE CLEARED" << std::endl;
473  }
474  else if (cmd[1] == "free")
475  {
476  try {
478  _bundle_reg = dtn::data::Bundle();
479  ibrcommon::MutexLock l(_write_lock);
480  _stream << ClientHandler::API_STATUS_OK << " BUNDLE FREE SUCCESSFUL" << std::endl;
481  } catch (const ibrcommon::Exception&) {
482  ibrcommon::MutexLock l(_write_lock);
483  _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
484  }
485  }
486  else if (cmd[1] == "delivered")
487  {
488  if (cmd.size() < 5) throw ibrcommon::Exception("not enough parameters");
489 
490  try {
491  // construct bundle id
492  dtn::data::BundleID id = readBundleID(cmd, 2);
493 
494  // announce this bundle as delivered
497 
498  ibrcommon::MutexLock l(_write_lock);
499  _stream << ClientHandler::API_STATUS_OK << " BUNDLE DELIVERED ACCEPTED" << std::endl;
500  } catch (const ibrcommon::Exception&) {
501  ibrcommon::MutexLock l(_write_lock);
502  _stream << ClientHandler::API_STATUS_NOT_FOUND << " BUNDLE NOT FOUND" << std::endl;
503  }
504  }
505  else if (cmd[1] == "store")
506  {
507  // store the bundle in the storage
508  try {
510  ibrcommon::MutexLock l(_write_lock);
511  _stream << ClientHandler::API_STATUS_OK << " BUNDLE STORE SUCCESSFUL" << std::endl;
512  } catch (const ibrcommon::Exception&) {
513  ibrcommon::MutexLock l(_write_lock);
514  _stream << ClientHandler::API_STATUS_INTERNAL_ERROR << " BUNDLE STORE FAILED" << std::endl;
515  }
516  }
517  else if (cmd[1] == "send")
518  {
519  // create a new sequence number
520  _bundle_reg.relabel();
521 
522  // forward the bundle to the storage processing
523  dtn::api::Registration::processIncomingBundle(_endpoint, _bundle_reg);
524 
525  ibrcommon::MutexLock l(_write_lock);
526  _stream << ClientHandler::API_STATUS_OK << " BUNDLE SENT" << std::endl;
527  }
528  else if (cmd[1] == "info")
529  {
530  // transfer bundle data
531  ibrcommon::MutexLock l(_write_lock);
532 
533  _stream << ClientHandler::API_STATUS_OK << " BUNDLE INFO "; sayBundleID(_stream, _bundle_reg); _stream << std::endl;
535  }
536  else if (cmd[1] == "block")
537  {
538  if (cmd.size() < 3) throw ibrcommon::Exception("not enough parameters");
539 
540  if (cmd[2] == "add")
541  {
543 
544  /* parse an optional offset, where to insert the block */
545  if (cmd.size() > 3)
546  {
547  int offset;
548  istringstream ss(cmd[3]);
549 
550  ss >> offset;
551  if (ss.fail()) throw ibrcommon::Exception("malformed command");
552 
553  if (static_cast<dtn::data::Size>(offset) >= _bundle_reg.size())
554  {
556  }
557  else if(offset == 0)
558  {
560  }
561  else
562  {
563  inserter = dtn::data::BundleBuilder(_bundle_reg, dtn::data::BundleBuilder::MIDDLE, offset);
564  }
565  }
566 
567  ibrcommon::MutexLock l(_write_lock);
568  _stream << ClientHandler::API_STATUS_CONTINUE << " BUNDLE BLOCK ADD" << std::endl;
569 
570  try
571  {
574  {
575  block.set(dtn::data::Block::LAST_BLOCK, true);
576  }
577  else
578  {
579  block.set(dtn::data::Block::LAST_BLOCK, false);
580  }
581  _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK ADD SUCCESSFUL" << std::endl;
582  }
583  catch (const BundleBuilder::DiscardBlockException &ex) {
584  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " BUNDLE BLOCK DISCARDED" << std::endl;
585  }
587  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " BUNDLE BLOCK ADD FAILED" << std::endl;
588  }
589  }
590  else if (cmd[2] == "del")
591  {
592  if (cmd.size() < 4) throw ibrcommon::Exception("not enough parameters");
593 
594  int offset;
595  istringstream ss(cmd[3]);
596 
597  ss >> offset;
598  if (ss.fail()) throw ibrcommon::Exception("malformed command");
599 
600  dtn::data::Bundle::iterator it = _bundle_reg.begin();
601  std::advance(it, offset);
602  _bundle_reg.erase(it);
603 
604  ibrcommon::MutexLock l(_write_lock);
605  _stream << ClientHandler::API_STATUS_OK << " BUNDLE BLOCK DEL SUCCESSFUL" << std::endl;
606  }
607  else
608  {
609  ibrcommon::MutexLock l(_write_lock);
610  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
611  }
612  }
613  else
614  {
615  ibrcommon::MutexLock l(_write_lock);
616  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
617  }
618  }
619  else if (cmd[0] == "payload")
620  {
621  // check if there are more commands/parameters
622  if (cmd.size() < 1) throw ibrcommon::Exception("not enough parameters");
623 
624  // check if the command is valid
625  // [block-offset] get [[data-offset] [length]]
626 
627  dtn::data::Bundle::iterator block_it = _bundle_reg.begin();
628 
629  size_t cmd_index = 1;
630 
631  // check if a block offset is present
632  std::stringstream ss(cmd[1]);
633  size_t block_offset;
634  ss >> block_offset;
635 
636  if (!ss.fail()) {
637  // block offset present
638  // move forward to the selected block
639  std::advance(block_it, block_offset);
640 
641  // increment command index
642  ++cmd_index;
643  } else {
644  // search for the payload block
645  block_it = _bundle_reg.find(dtn::data::PayloadBlock::BLOCK_TYPE);
646  }
647 
648  // check if a valid block was selected
649  if (block_it == _bundle_reg.end()) {
650  throw ibrcommon::Exception("invalid offset or no payload block found");
651  }
652 
653  // get the selected block
654  dtn::data::Block &block = dynamic_cast<dtn::data::Block&>(**block_it);
655 
656  size_t cmd_remaining = cmd.size() - (cmd_index + 1);
657  if (cmd[cmd_index] == "get")
658  {
659  // lock the API stream
660  ibrcommon::MutexLock l(_write_lock);
661 
662  try {
663  dtn::api::PlainSerializer ps(_stream, _encoding);
664 
665  if (cmd_remaining > 0)
666  {
667  size_t payload_offset = 0;
668  size_t length = 0;
669 
670  /* read the payload offset */
671  ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
672 
673  if (cmd_remaining > 1)
674  {
675  ss.clear(); ss.str(cmd[cmd_index+2]); ss >> length;
676  }
677 
678  // abort here if the stream is no payload block
679  try {
680  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
681 
682  // open the payload BLOB
684  ibrcommon::BLOB::iostream stream = ref.iostream();
685 
686  if (static_cast<std::streamsize>(payload_offset) >= stream.size())
687  throw ibrcommon::Exception("offset out of range");
688 
689  size_t remaining = stream.size() - payload_offset;
690 
691  if ((length > 0) && (remaining > length)) {
692  remaining = length;
693  }
694 
695  // ignore all bytes leading the offset
696  (*stream).ignore(payload_offset);
697 
698  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD GET" << std::endl;
699 
700  // write the payload
701  ps.writeData((*stream), remaining);
702 
703  // final line break (mark the end)
704  _stream << std::endl;
705  } catch (const std::bad_cast&) {
706  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD GET FAILED INVALID BLOCK TYPE" << std::endl;
707  }
708  }
709  else
710  {
711  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD GET" << std::endl;
712 
713  // write the payload
714  ps.writeData(block);
715 
716  // final line break (mark the end)
717  _stream << std::endl;
718  }
719  } catch (const std::exception &ex) {
720  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD GET FAILED " << ex.what() << std::endl;
721  }
722  }
723  else if (cmd[cmd_index] == "put")
724  {
725  ibrcommon::MutexLock l(_write_lock);
726 
727  // abort there if the stream is no payload block
728  try {
729  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
730 
731  // write continue request to API
732  _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD PUT" << std::endl;
733 
734  size_t payload_offset = 0;
735  if (cmd_remaining > 0)
736  {
737  /* read the payload offset */
738  ss.clear(); ss.str(cmd[cmd_index+1]); ss >> payload_offset;
739  }
740 
741  // open the payload BLOB
743  ibrcommon::BLOB::iostream stream = ref.iostream();
744 
745  // if the offset is valid
746  if (static_cast<std::streamsize>(payload_offset) < stream.size()) {
747  // move the streams put pointer to the given offset
748  (*stream).seekp(payload_offset, ios_base::beg);
749  } else if (payload_offset > 0) {
750  // move put-pointer to the end
751  (*stream).seekp(0, ios_base::end);
752  }
753 
754  /* write the new data into the blob */
756 
757  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD PUT SUCCESSFUL" << std::endl;
758  } catch (std::bad_cast&) {
759  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD PUT FAILED INVALID BLOCK TYPE" << std::endl;
760  } catch (const std::exception&) {
761  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD PUT FAILED" << std::endl;
762  }
763  }
764  else if (cmd[cmd_index] == "append")
765  {
766  ibrcommon::MutexLock l(_write_lock);
767 
768  // abort there if the stream is no payload block
769  try {
770  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
771 
772  // write continue request to API
773  _stream << ClientHandler::API_STATUS_CONTINUE << " PAYLOAD APPEND" << std::endl;
774 
775  // open the payload BLOB
777  ibrcommon::BLOB::iostream stream = ref.iostream();
778 
779  // move put-pointer to the end
780  (*stream).seekp(0, ios_base::end);
781 
782  /* write the new data into the blob */
784 
785  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD APPEND SUCCESSFUL" << std::endl;
786  } catch (std::bad_cast&) {
787  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD APPEND FAILED INVALID BLOCK TYPE" << std::endl;
788  } catch (const std::exception&) {
789  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD APPEND FAILED" << std::endl;
790  }
791  }
792  else if (cmd[cmd_index] == "clear")
793  {
794  ibrcommon::MutexLock l(_write_lock);
795  // abort there if the stream is no payload block
796  try {
797  dtn::data::PayloadBlock &pb = dynamic_cast<dtn::data::PayloadBlock&>(block);
798 
799  // open the payload BLOB
801  ibrcommon::BLOB::iostream stream = ref.iostream();
802 
803  // clear the payload
804  stream.clear();
805 
806  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD CLEAR SUCCESSFUL" << std::endl;
807  } catch (std::bad_cast&) {
808  _stream << ClientHandler::API_STATUS_NOT_ACCEPTABLE << " PAYLOAD CLEAR FAILED INVALID BLOCK TYPE" << std::endl;
809  }
810  }
811  else if (cmd[cmd_index] == "length")
812  {
813  ibrcommon::MutexLock l(_write_lock);
814  _stream << ClientHandler::API_STATUS_OK << " PAYLOAD LENGTH" << std::endl;
815  _stream << "Length: " << block.getLength() << std::endl;
816  }
817  }
818  else if (cmd[0] == "nodename")
819  {
820  ibrcommon::MutexLock l(_write_lock);
821  _stream << ClientHandler::API_STATUS_OK << " NODENAME " << dtn::core::BundleCore::local.getString() << std::endl;
822  }
823  else
824  {
825  ibrcommon::MutexLock l(_write_lock);
826  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " UNKNOWN COMMAND" << std::endl;
827  }
828  } catch (const std::exception&) {
829  ibrcommon::MutexLock l(_write_lock);
830  _stream << ClientHandler::API_STATUS_BAD_REQUEST << " ERROR" << std::endl;
831  }
832  }
833  }
834 
835  ExtendedApiHandler::Sender::Sender(ExtendedApiHandler &conn)
836  : _handler(conn)
837  {
838  }
839 
840  ExtendedApiHandler::Sender::~Sender()
841  {
843  }
844 
845  void ExtendedApiHandler::Sender::__cancellation() throw ()
846  {
847  // abort all blocking calls on the registration object
848  _handler._client.getRegistration().abort();
849  }
850 
851  void ExtendedApiHandler::Sender::finally() throw ()
852  {
853  }
854 
855  void ExtendedApiHandler::Sender::run() throw ()
856  {
857  Registration &reg = _handler._client.getRegistration();
858  try{
859  while(_handler.good()){
860  try{
861  dtn::data::MetaBundle id = reg.receiveMetaBundle();
862 
864  // transform custody signals & status reports into notifies
865  _handler.notifyAdministrativeRecord(id);
866 
867  // announce the delivery of this bundle
868  _handler._client.getRegistration().delivered(id);
869  } else {
870  // notify the client about the new bundle
871  _handler.notifyBundle(id);
872  }
873  } catch (const dtn::storage::NoBundleFoundException&) {
874  reg.wait_for_bundle();
875  }
876 
877  yield();
878  }
879  } catch (const ibrcommon::QueueUnblockedException &ex) {
880  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 40) << ex.what() << IBRCOMMON_LOGGER_ENDL;
881  return;
882  } catch (const ibrcommon::IOException &ex) {
883  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
884  } catch (const dtn::InvalidDataException &ex) {
885  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
886  } catch (const std::exception &ex) {
887  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 10) << ex.what() << IBRCOMMON_LOGGER_ENDL;
888  }
889 
890  try {
891  //FIXME
892 // _handler.stop();
893  } catch (const ibrcommon::ThreadException &ex) {
894  IBRCOMMON_LOGGER_DEBUG_TAG("ExtendedApiHandler", 50) << ex.what() << IBRCOMMON_LOGGER_ENDL;
895  }
896  }
897 
898  void ExtendedApiHandler::notifyBundle(dtn::data::MetaBundle &bundle)
899  {
900  // put the bundle into the API queue
901  _bundle_queue.push(bundle);
902 
903  // lock the API channel
904  ibrcommon::MutexLock l(_write_lock);
905 
906  // write notification header to API channel
907  _stream << API_STATUS_NOTIFY_BUNDLE << " NOTIFY BUNDLE ";
908 
909  // format the bundle ID and write it to the stream
910  sayBundleID(_stream, bundle);
911 
912  // finalize statement with a line-break
913  _stream << std::endl;
914  }
915 
916  void ExtendedApiHandler::notifyAdministrativeRecord(dtn::data::MetaBundle &bundle)
917  {
918  // load the whole bundle
920 
921  // get the payload block of the bundle
923 
924  try {
925  // try to decode as status report
927  report.read(payload);
928 
929  // lock the API channel
930  ibrcommon::MutexLock l(_write_lock);
931 
932  // write notification header to API channel
933  _stream << API_STATUS_NOTIFY_REPORT << " NOTIFY REPORT ";
934 
935  // write sender EID
936  _stream << b.source.getString() << " ";
937 
938  // format the bundle ID and write it to the stream
939  _stream << report.bundleid.timestamp.toString() << "." << report.bundleid.sequencenumber.toString();
940 
941  if (report.refsFragment()) {
942  _stream << "." << report.bundleid.offset.toString() << ":" << report.fragment_length.toString() << " ";
943  } else {
944  _stream << " ";
945  }
946 
947  // origin source
948  _stream << report.bundleid.source.getString() << " ";
949 
950  // reason code
951  _stream << (int)report.reasoncode << " ";
952 
954  _stream << "RECEIPT[" << report.timeof_receipt.getTimestamp().toString() << "."
955  << report.timeof_receipt.getNanoseconds().toString() << "] ";
956 
958  _stream << "CUSTODY-ACCEPTANCE[" << report.timeof_custodyaccept.getTimestamp().toString() << "."
959  << report.timeof_custodyaccept.getNanoseconds().toString() << "] ";
960 
962  _stream << "FORWARDING[" << report.timeof_forwarding.getTimestamp().toString() << "."
963  << report.timeof_forwarding.getNanoseconds().toString() << "] ";
964 
966  _stream << "DELIVERY[" << report.timeof_delivery.getTimestamp().toString() << "."
967  << report.timeof_delivery.getNanoseconds().toString() << "] ";
968 
970  _stream << "DELETION[" << report.timeof_deletion.getTimestamp().toString() << "."
971  << report.timeof_deletion.getNanoseconds().toString() << "] ";
972 
973  // finalize statement with a line-break
974  _stream << std::endl;
975 
976  return;
978  // this is not a status report
979  }
980 
981  try {
982  // try to decode as custody signal
984  custody.read(payload);
985 
986  // lock the API channel
987  ibrcommon::MutexLock l(_write_lock);
988 
989  // write notification header to API channel
990  _stream << API_STATUS_NOTIFY_CUSTODY << " NOTIFY CUSTODY ";
991 
992  // write sender EID
993  _stream << b.source.getString() << " ";
994 
995  // format the bundle ID and write it to the stream
996  _stream << custody.bundleid.timestamp.toString() << "." << custody.bundleid.sequencenumber.toString();
997 
998  if (custody.refsFragment()) {
999  _stream << "." << custody.bundleid.offset.toString() << ":" << custody.fragment_length.toString() << " ";
1000  } else {
1001  _stream << " ";
1002  }
1003 
1004  // origin source
1005  _stream << custody.bundleid.source.getString() << " ";
1006 
1007  if (custody.custody_accepted) {
1008  _stream << "ACCEPTED ";
1009  } else {
1010  _stream << "REJECTED(" << (int)custody.reason << ") ";
1011  }
1012 
1013  // add time of signal to the message
1014  _stream << custody.timeofsignal.getTimestamp().toString() << "." << custody.timeofsignal.getNanoseconds().toString();
1015 
1016  // finalize statement with a line-break
1017  _stream << std::endl;
1018 
1019  return;
1021  // this is not a custody report
1022  }
1023  }
1024 
1025  void ExtendedApiHandler::sayBundleID(ostream &stream, const dtn::data::BundleID &id)
1026  {
1027  stream << id.timestamp.toString() << " " << id.sequencenumber.toString() << " ";
1028 
1029  if (id.fragment)
1030  {
1031  stream << id.offset.toString() << " ";
1032  }
1033 
1034  stream << id.source.getString();
1035  }
1036 
1037  dtn::data::BundleID ExtendedApiHandler::readBundleID(const std::vector<std::string> &data, const size_t start)
1038  {
1039  // load bundle id
1040  std::stringstream ss;
1041  dtn::data::Timestamp timestamp = 0;
1042  dtn::data::Number sequencenumber = 0;
1043  bool fragment = false;
1044  dtn::data::Number offset = 0;
1045 
1046  if ((data.size() - start) < 3)
1047  {
1048  throw ibrcommon::Exception("not enough parameters");
1049  }
1050 
1051  // read timestamp
1052  ss.clear(); ss.str(data[start]);
1053  timestamp.read(ss);
1054 
1055  if(ss.fail())
1056  {
1057  throw ibrcommon::Exception("malformed parameters");
1058  }
1059 
1060  // read sequence number
1061  ss.clear(); ss.str(data[start+1]);
1062  sequencenumber.read(ss);
1063 
1064  if(ss.fail())
1065  {
1066  throw ibrcommon::Exception("malformed parameters");
1067  }
1068 
1069  // read fragment offset
1070  if ((data.size() - start) > 3)
1071  {
1072  fragment = true;
1073 
1074  // read sequence number
1075  ss.clear(); ss.str(data[start+2]);
1076  offset.read(ss);
1077 
1078  if(ss.fail())
1079  {
1080  throw ibrcommon::Exception("malformed parameters");
1081  }
1082  }
1083 
1084  // read EID
1085  ss.clear(); dtn::data::EID eid(data[data.size() - 1]);
1086 
1087  // construct bundle id
1088  return dtn::data::BundleID(eid, timestamp, sequencenumber, fragment, offset);
1089  }
1090  }
1091 }