IBR-DTNSuite  0.10
NativeSession.cpp
Go to the documentation of this file.
1 /*
2  * NativeSession.cpp
3  *
4  * Copyright (C) 2013 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "api/NativeSession.h"
23 #include "api/NativeSerializer.h"
24 #include "core/BundleCore.h"
25 #include "core/EventDispatcher.h"
27 
29 #include <ibrcommon/data/BLOB.h>
30 #include <ibrcommon/Logger.h>
33 
34 namespace dtn
35 {
36  namespace api
37  {
39  {
    // intentionally empty: this body performs no work
40  }
41 
// Tag string handed to the IBRCOMMON_LOGGER_DEBUG_TAG calls of this class.
42  const std::string NativeSession::TAG = "NativeSession";
43 
45  : _receiver(*this), _session_cb(session_cb), _serializer_cb(serializer_cb)
46  {
    // the receiver is constructed with this session; both callbacks are stored as passed in
47  // set the local endpoint to the default
48  _endpoint = _registration.getDefaultEID();
49 
50  // listen to QueueBundleEvents
52 
54  }
55 
57  {
58  // invalidate the callback pointer
    // (the fireNotification* methods and the serializer paths return early once these are NULL)
59  {
61  _session_cb = NULL;
62  _serializer_cb = NULL;
63  }
64 
66  }
67 
/**
 * Shut down this session's activity by aborting its registration.
 * NOTE(review): presumably abort() unblocks a pending wait on the
 * registration - confirm against the Registration class.
 */
68  void NativeSession::destroy() throw ()
69  {
70  // un-listen from QueueBundleEvents
72 
73  _registration.abort();
74  }
75 
/**
 * @return a const reference to an EID; going by the function name this is
 *         the EID of the node (NOTE(review): confirm which EID is returned)
 */
76  const dtn::data::EID& NativeSession::getNodeEID() const throw ()
77  {
79  }
80 
/**
 * Pass a bundle notification on to the session callback.
 * Does nothing when no session callback is set.
 * @param id bundle identifier handed to the callback's notifyBundle()
 */
81  void NativeSession::fireNotificationBundle(const dtn::data::BundleID &id) throw ()
82  {
84  if (_session_cb == NULL) return;
85  _session_cb->notifyBundle(id);
86  }
87 
/**
 * Pass a status report on to the session callback.
 * Does nothing when no session callback is set.
 * @param source EID handed to the callback together with the report
 * @param report status report block handed to notifyStatusReport()
 */
88  void NativeSession::fireNotificationStatusReport(const dtn::data::EID &source, const dtn::data::StatusReportBlock &report) throw ()
89  {
91  if (_session_cb == NULL) return;
92  _session_cb->notifyStatusReport(source, report);
93  }
94 
/**
 * Pass a custody signal on to the session callback.
 * Does nothing when no session callback is set.
 * @param source  EID handed to the callback together with the signal
 * @param custody custody signal block handed to notifyCustodySignal()
 */
95  void NativeSession::fireNotificationCustodySignal(const dtn::data::EID &source, const dtn::data::CustodySignalBlock &custody) throw ()
96  {
98  if (_session_cb == NULL) return;
99  _session_cb->notifyCustodySignal(source, custody);
100  }
101 
102  void NativeSession::setEndpoint(const std::string &suffix) throw (NativeSessionException)
103  {
104  const dtn::data::EID new_endpoint = dtn::core::BundleCore::local.add( dtn::core::BundleCore::local.getDelimiter() + suffix );
105 
106  // error checking
107  if (new_endpoint == dtn::data::EID())
108  {
109  throw NativeSessionException("given endpoint is not acceptable");
110  }
111  else
112  {
113  /* unsubscribe from the old endpoint and subscribe to the new one */
114  _registration.unsubscribe(_endpoint);
115  _registration.subscribe(new_endpoint);
116  _endpoint = new_endpoint;
117  }
118 
119  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint set to " << _endpoint.getString() << IBRCOMMON_LOGGER_ENDL;
120  }
121 
123  {
    // switch back to the registration's default EID: unsubscribe the current
    // endpoint, replace it with the default and subscribe to that default
124  _registration.unsubscribe(_endpoint);
125  _endpoint = _registration.getDefaultEID();
126  _registration.subscribe(_endpoint);
127 
128  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint set to " << _endpoint.getString() << IBRCOMMON_LOGGER_ENDL;
129  }
130 
131  void NativeSession::addEndpoint(const std::string &suffix) throw (NativeSessionException)
132  {
133  const dtn::data::EID new_endpoint = dtn::core::BundleCore::local.add( dtn::core::BundleCore::local.getDelimiter() + suffix );
134 
135  // error checking
136  if (new_endpoint == dtn::data::EID())
137  {
138  throw NativeSessionException("given endpoint is not acceptable");
139  }
140  else
141  {
142  _registration.subscribe(new_endpoint);
143  }
144 
145  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint " << suffix << " added" << IBRCOMMON_LOGGER_ENDL;
146  }
147 
148  void NativeSession::removeEndpoint(const std::string &suffix) throw (NativeSessionException)
149  {
150  const dtn::data::EID old_endpoint = dtn::core::BundleCore::local.add( dtn::core::BundleCore::local.getDelimiter() + suffix );
151 
152  // error checking
153  if (old_endpoint == dtn::data::EID())
154  {
155  throw NativeSessionException("given endpoint is not acceptable");
156  }
157  else
158  {
159  _registration.unsubscribe(old_endpoint);
160  }
161 
162  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint " << suffix << " removed" << IBRCOMMON_LOGGER_ENDL;
163  }
164 
166  {
    // subscribes to the given EID as-is (no suffix expansion as in addEndpoint)
167  // error checking
    // a default-constructed EID is rejected
168  if (eid == dtn::data::EID())
169  {
170  throw NativeSessionException("given endpoint is not acceptable");
171  }
172  else
173  {
174  _registration.subscribe(eid);
175  }
176 
177  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Registration " << eid.getString() << " added" << IBRCOMMON_LOGGER_ENDL;
178  }
179 
181  {
    // unsubscribes from the given EID as-is (no suffix expansion as in removeEndpoint)
182  // error checking
    // a default-constructed EID is rejected
183  if (eid == dtn::data::EID())
184  {
185  throw NativeSessionException("given endpoint is not acceptable");
186  }
187  else
188  {
189  _registration.unsubscribe(eid);
190  }
191 
192  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Registration " << eid.getString() << " removed" << IBRCOMMON_LOGGER_ENDL;
193  }
194 
196  {
    // fall back to the default endpoint first ...
197  resetEndpoint();
198 
    // ... then drop every subscription except that endpoint; the loop walks
    // a local set (subs), not the registration's own container
199  const std::set<dtn::data::EID> subs = _registration.getSubscriptions();
200  for (std::set<dtn::data::EID>::const_iterator it = subs.begin(); it != subs.end(); ++it) {
201  const dtn::data::EID &e = (*it);
202  if (e != _endpoint) {
203  _registration.unsubscribe(e);
204  }
205  }
206 
207  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Registrations cleared" << IBRCOMMON_LOGGER_ENDL;
208  }
209 
210  std::vector<std::string> NativeSession::getSubscriptions() throw ()
211  {
212  const std::set<dtn::data::EID> subs = _registration.getSubscriptions();
213  std::vector<std::string> ret;
214  for (std::set<dtn::data::EID>::const_iterator it = subs.begin(); it != subs.end(); ++it) {
215  ret.push_back((*it).getString());
216  }
217  return ret;
218  }
219 
221  {
    // take the next bundle id from the queue and load that bundle into register ri;
    // a QueueUnblockedException from the queue is reported as BundleNotFoundException
222  try {
223  const dtn::data::BundleID id = _bundle_queue.getnpop();
224 
225  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Next bundle in queue is " << id.toString() << IBRCOMMON_LOGGER_ENDL;
226 
227  load(ri, id);
228  } catch (const ibrcommon::QueueUnblockedException &ex) {
229  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "No next bundle available" << IBRCOMMON_LOGGER_ENDL;
230  throw BundleNotFoundException();
231  }
232  }
233 
235  {
236  // load the bundle
    // the bundle is fetched from the BundleCore storage by id and stored in register ri;
    // any ibrcommon::Exception is logged and reported as BundleNotFoundException
237  try {
238  _bundle[ri] = dtn::core::BundleCore::getInstance().getStorage().get(id);
239 
240  // process the bundle block (security, compression, ...)
242 
243  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Bundle " << id.toString() << " loaded" << IBRCOMMON_LOGGER_ENDL;
244  } catch (const ibrcommon::Exception &ex) {
245  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "Failed to load bundle " << id.toString() << ", Exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
246  throw BundleNotFoundException();
247  }
248  }
249 
251  {
    // nothing to do without a serializer callback
253  if (_serializer_cb == NULL) return;
254 
    // stream the bundle in register ri through the callback in BUNDLE_FULL mode;
    // an ibrcommon::Exception raised while serializing does not leave this function
255  NativeSerializer serializer(*_serializer_cb, NativeSerializer::BUNDLE_FULL);
256  try {
257  serializer << _bundle[ri];
258  } catch (const ibrcommon::Exception &ex) {
260  }
261  }
262 
264  {
    // nothing to do without a serializer callback
266  if (_serializer_cb == NULL) return;
267 
    // stream the bundle in register ri through the callback in BUNDLE_INFO mode;
    // an ibrcommon::Exception raised while serializing does not leave this function
268  NativeSerializer serializer(*_serializer_cb, NativeSerializer::BUNDLE_INFO);
269  try {
270  serializer << _bundle[ri];
271  } catch (const ibrcommon::Exception &ex) {
273  }
274  }
275 
277  {
    // on success register ri is reset to an empty bundle;
    // an ibrcommon::Exception is reported as BundleNotFoundException
278  try {
280  _bundle[ri] = dtn::data::Bundle();
281  } catch (const ibrcommon::Exception&) {
282  throw BundleNotFoundException();
283  }
284  }
285 
287  {
    // reset register ri to a default-constructed bundle
288  _bundle[ri] = dtn::data::Bundle();
289  }
290 
292  {
    // an ibrcommon::Exception raised in here is reported as BundleNotFoundException
293  try {
294  // announce this bundle as delivered
296  _registration.delivered(meta);
297 
298  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Bundle " << id.toString() << " marked as delivered" << IBRCOMMON_LOGGER_ENDL;
299  } catch (const ibrcommon::Exception&) {
300  throw BundleNotFoundException();
301  }
302  }
303 
305  {
306  // create a new sequence number
307  _bundle[ri].relabel();
308 
309  // forward the bundle to the storage processing
    // (the session's current endpoint is passed along with the bundle)
310  dtn::api::Registration::processIncomingBundle(_endpoint, _bundle[ri]);
311 
312  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Bundle " << _bundle[ri].toString() << " sent" << IBRCOMMON_LOGGER_ENDL;
313 
    // the bundle stays in the register; it is returned after relabeling
314  return _bundle[ri];
315  }
316 
318  {
319  // Copy the given bundle into the local register
    // (replaces whatever register ri held before)
320  _bundle[ri] = b;
321  }
322 
324  {
325  // clear all blocks in the register
326  _bundle[ri].clear();
327 
328  // Copy the given primary block into the local register
    // (the cast restricts the assignment to the PrimaryBlock part of the stored bundle)
329  ((dtn::data::PrimaryBlock&)_bundle[ri]) = p;
330  }
331 
/**
 * Write len bytes from buf into the payload BLOB of the bundle in register ri.
 *
 * The code has two near-identical paths: the first works on a PayloadBlock
 * found in the bundle, the second appends a new PayloadBlock first and then
 * writes into it.
 *
 * @param ri     register holding the bundle to modify
 * @param buf    source buffer
 * @param len    number of bytes taken from buf
 * @param offset requested write position (see the review note in the body)
 */
332  void NativeSession::write(RegisterIndex ri, const char *buf, const size_t len, const size_t offset) throw ()
333  {
334  try {
335  dtn::data::PayloadBlock &payload = _bundle[ri].find<dtn::data::PayloadBlock>();
336 
337  ibrcommon::BLOB::Reference ref = payload.getBLOB();
338  ibrcommon::BLOB::iostream stream = ref.iostream();
339 
340  std::streamsize stream_size = stream.size();
341 
    // NOTE(review): as written, every offset > 0 seeks to the end (append);
    // only offset == 0 reaches seekp(offset). Confirm that this is intended.
342  if ((offset > 0) || (stream_size < static_cast<std::streamsize>(offset))) {
343  (*stream).seekp(0, std::ios_base::end);
344  } else {
345  (*stream).seekp(offset);
346  }
347 
348  (*stream).write(buf, len);
349  (*stream) << std::flush;
    // second path: same procedure on a freshly appended payload block
351  dtn::data::PayloadBlock &payload = _bundle[ri].push_back<dtn::data::PayloadBlock>();
352 
353  ibrcommon::BLOB::Reference ref = payload.getBLOB();
354  ibrcommon::BLOB::iostream stream = ref.iostream();
355 
356  std::streamsize stream_size = stream.size();
357 
358  if ((offset > 0) || (stream_size < static_cast<std::streamsize>(offset))) {
359  (*stream).seekp(0, std::ios_base::end);
360  } else {
361  (*stream).seekp(offset);
362  }
363  (*stream).write(buf, len);
364  (*stream) << std::flush;
365  }
366 
367  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << len << " bytes added to the payload" << IBRCOMMON_LOGGER_ENDL;
368  }
369 
/**
 * Read bytes from the payload BLOB of the bundle in register ri.
 *
 * @param ri     register holding the bundle to read from
 * @param buf    destination buffer
 * @param len    in: number of bytes requested; out: number of bytes actually
 *               read (gcount), or 0 when the "no payload block" path is taken
 * @param offset position in the payload stream at which reading starts
 */
370  void NativeSession::read(RegisterIndex ri, char *buf, size_t &len, const size_t offset) throw ()
371  {
372  try {
373  dtn::data::PayloadBlock &payload = _bundle[ri].find<dtn::data::PayloadBlock>();
374 
375  ibrcommon::BLOB::Reference ref = payload.getBLOB();
376  ibrcommon::BLOB::iostream stream = ref.iostream();
377 
378  (*stream).seekg(offset);
379  (*stream).read(buf, len);
380 
    // report how many bytes the read call delivered
381  len = (*stream).gcount();
383  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << "no payload block available" << IBRCOMMON_LOGGER_ENDL;
384  len = 0;
385  }
386  }
387 
388  NativeSession::BundleReceiver::BundleReceiver(NativeSession &session)
389  : _session(session)
390  {
391  }
392 
393  NativeSession::BundleReceiver::~BundleReceiver()
394  {
395  }
396 
397  void NativeSession::BundleReceiver::raiseEvent(const dtn::core::Event *evt) throw ()
398  {
399  try {
400  const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
401 
402  // ignore fragments - we can not deliver them directly to the client
403  if (queued.bundle.fragment) return;
404 
405  if (_session._registration.hasSubscribed(queued.bundle.destination))
406  {
407  _session._registration.notify(Registration::NOTIFY_BUNDLE_AVAILABLE);
408  }
409  } catch (const std::bad_cast&) { };
410  }
411 
413  {
    // one receive step: fetch the next meta bundle from the registration and
    // either turn it into a notification or queue it for the client
414  Registration &reg = _registration;
415  try {
416  try {
417  const dtn::data::MetaBundle id = reg.receiveMetaBundle();
418 
420  // transform custody signals & status reports into notifies
421  fireNotificationAdministrativeRecord(id);
422 
423  // announce the delivery of this bundle
424  reg.delivered(id);
425  } else {
426  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "fire notification for new bundle " << id.toString() << IBRCOMMON_LOGGER_ENDL;
427 
428  // put the bundle into the API queue
429  _bundle_queue.push(id);
430 
431  // notify the client about the new bundle
432  fireNotificationBundle(id);
433  }
434  } catch (const dtn::storage::NoBundleFoundException&) {
435  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << "no more bundles found - wait until we are notified" << IBRCOMMON_LOGGER_ENDL;
436  reg.wait_for_bundle();
437  }
    // both handlers below convert the failure into a NativeSessionException
    // whose message starts with "loop aborted - "
438  } catch (const ibrcommon::QueueUnblockedException &ex) {
439  throw NativeSessionException(std::string("loop aborted - ") + ex.what());
440  } catch (const std::exception &ex) {
441  throw NativeSessionException(std::string("loop aborted - ") + ex.what());
442  }
443  }
444 
/**
 * Turn an administrative record into a client notification.
 *
 * The payload is decoded as a status report first and as a custody signal
 * second. Each successful decode fires the matching notification with the
 * bundle's source.
 *
 * @param bundle meta data of the bundle that carries the record
 */
445  void NativeSession::fireNotificationAdministrativeRecord(const dtn::data::MetaBundle &bundle)
446  {
447  // load the whole bundle
449 
450  // get the payload block of the bundle
452 
453  try {
454  // try to decode as status report
456  report.read(payload);
457 
458  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "fire notification for status report" << IBRCOMMON_LOGGER_ENDL;
459 
460  // fire the status report notification
461  fireNotificationStatusReport(b.source, report);
463  // this is not a status report
464  }
465 
    // the custody signal attempt runs regardless of the outcome above
466  try {
467  // try to decode as custody signal
469  custody.read(payload);
470 
471  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "fire notification for custody signal" << IBRCOMMON_LOGGER_ENDL;
472 
473  // fire the custody signal notification
474  fireNotificationCustodySignal(b.source, custody);
476  // this is not a custody report
477  }
478  }
479  } /* namespace api */
480 } /* namespace dtn */