IBR-DTNSuite  0.12
NativeSession.cpp
Go to the documentation of this file.
1 /*
2  * NativeSession.cpp
3  *
4  * Copyright (C) 2013 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "api/NativeSession.h"
23 #include "api/NativeSerializer.h"
24 #include "core/BundleCore.h"
25 #include "core/EventDispatcher.h"
27 
29 #include <ibrcommon/data/BLOB.h>
30 #include <ibrcommon/Logger.h>
33 
34 namespace dtn
35 {
36  namespace api
37  {
39  {
40  }
41 
42  const std::string NativeSession::TAG = "NativeSession";
43 
45  : _receiver(*this), _session_cb(session_cb), _serializer_cb(serializer_cb)
46  {
47  // set the local endpoint to the default
48  _endpoint = _registration.getDefaultEID();
49 
50  // listen to QueueBundleEvents
52 
53  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "Session created" << IBRCOMMON_LOGGER_ENDL;
54  }
55 
/**
 * Creates a session whose registration is constructed from the given handle.
 *
 * @param session_cb    pointer stored in _session_cb
 * @param serializer_cb pointer stored in _serializer_cb
 * @param handle        forwarded to the constructor of _registration
 */
56  NativeSession::NativeSession(NativeSessionCallback *session_cb, NativeSerializerCallback *serializer_cb, const std::string &handle)
57  : _registration(handle), _receiver(*this), _session_cb(session_cb), _serializer_cb(serializer_cb)
58  {
59  // set the local endpoint to the default
60  _endpoint = _registration.getDefaultEID();
61 
62  // listen to QueueBundleEvents
// NOTE(review): the statement that followed this comment (listing line 63) is not present in this listing - restore it from the original source
64 
65  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "Session created" << IBRCOMMON_LOGGER_ENDL;
66  }
67 
69  {
70  // invalidate the callback pointer
71  {
73  _session_cb = NULL;
74  _serializer_cb = NULL;
75  }
76 
77  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "Session destroyed" << IBRCOMMON_LOGGER_ENDL;
78  }
79 
/**
 * Tears the session down; the visible part aborts the registration.
 */
80  void NativeSession::destroy() throw ()
81  {
82  // un-listen from QueueBundleEvents
// NOTE(review): the statement that followed this comment (listing line 83) is not present in this listing - restore it from the original source
84 
// abort() is called on the registration; presumably this unblocks pending waits on it - confirm
85  _registration.abort();
86  }
87 
/**
 * Accessor returning a constant reference to an EID.
 *
 * NOTE(review): the body (listing line 90, the return statement) is not
 * present in this listing; presumably it returns the EID of the local
 * node - confirm against the original source.
 */
88  const dtn::data::EID& NativeSession::getNodeEID() const throw ()
89  {
91  }
92 
/**
 * Forwards a bundle notification to the session callback.
 *
 * Returns without doing anything when _session_cb is NULL.
 *
 * @param id passed unchanged to notifyBundle() of the callback
 */
93  void NativeSession::fireNotificationBundle(const dtn::data::BundleID &id) throw ()
94  {
// NOTE(review): listing line 95 is not present in this listing; presumably it acquires a lock guarding _session_cb - confirm against the original source
96  if (_session_cb == NULL) return;
97  _session_cb->notifyBundle(id);
98  }
99 
/**
 * Forwards a status report notification to the session callback.
 *
 * Returns without doing anything when _session_cb is NULL.
 *
 * @param source passed unchanged to notifyStatusReport() of the callback
 * @param report passed unchanged to notifyStatusReport() of the callback
 */
100  void NativeSession::fireNotificationStatusReport(const dtn::data::EID &source, const dtn::data::StatusReportBlock &report) throw ()
101  {
// NOTE(review): listing line 102 is not present in this listing; presumably it acquires a lock guarding _session_cb - confirm against the original source
103  if (_session_cb == NULL) return;
104  _session_cb->notifyStatusReport(source, report);
105  }
106 
/**
 * Forwards a custody signal notification to the session callback.
 *
 * Returns without doing anything when _session_cb is NULL.
 *
 * @param source  passed unchanged to notifyCustodySignal() of the callback
 * @param custody passed unchanged to notifyCustodySignal() of the callback
 */
107  void NativeSession::fireNotificationCustodySignal(const dtn::data::EID &source, const dtn::data::CustodySignalBlock &custody) throw ()
108  {
// NOTE(review): listing line 109 is not present in this listing; presumably it acquires a lock guarding _session_cb - confirm against the original source
110  if (_session_cb == NULL) return;
111  _session_cb->notifyCustodySignal(source, custody);
112  }
113 
114  void NativeSession::setEndpoint(const std::string &suffix) throw (NativeSessionException)
115  {
116  // error checking
117  if (suffix.length() <= 0)
118  {
119  throw NativeSessionException("given endpoint is not acceptable");
120  }
121  else
122  {
123  /* unsubscribe from the old endpoint and subscribe to the new one */
124  _registration.unsubscribe(_endpoint);
125 
126  // set new application part
127  _endpoint.setApplication(suffix);
128 
129  // subscribe to new endpoint
130  _registration.subscribe(_endpoint);
131  }
132 
133  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint set to " << _endpoint.getString() << IBRCOMMON_LOGGER_ENDL;
134  }
135 
137  {
138  _registration.unsubscribe(_endpoint);
139  _endpoint = _registration.getDefaultEID();
140  _registration.subscribe(_endpoint);
141 
142  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint set to " << _endpoint.getString() << IBRCOMMON_LOGGER_ENDL;
143  }
144 
/**
 * Subscribes the registration to an additional endpoint built from the
 * given application suffix.
 *
 * @param suffix application part of the endpoint to add; must not be empty
 * @throws NativeSessionException if suffix is empty
 */
145  void NativeSession::addEndpoint(const std::string &suffix) throw (NativeSessionException)
146  {
147  // error checking
148  if (suffix.length() <= 0)
149  {
150  throw NativeSessionException("given endpoint is not acceptable");
151  }
152  else
153  {
// NOTE(review): the declaration of new_endpoint (listing line 154) is not present in this listing; its initial value cannot be determined from here - restore it from the original source
155  new_endpoint.setApplication( suffix );
156  _registration.subscribe(new_endpoint);
157  }
158 
159  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint " << suffix << " added" << IBRCOMMON_LOGGER_ENDL;
160  }
161 
/**
 * Unsubscribes the registration from the endpoint built from the given
 * application suffix.
 *
 * @param suffix application part of the endpoint to remove; must not be empty
 * @throws NativeSessionException if suffix is empty
 */
162  void NativeSession::removeEndpoint(const std::string &suffix) throw (NativeSessionException)
163  {
164  // error checking
165  if (suffix.length() <= 0)
166  {
167  throw NativeSessionException("given endpoint is not acceptable");
168  }
169  else
170  {
// NOTE(review): the declaration of old_endpoint (listing line 171) is not present in this listing; its initial value cannot be determined from here - restore it from the original source
172  old_endpoint.setApplication( suffix );
173  _registration.unsubscribe(old_endpoint);
174  }
175 
176  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Endpoint " << suffix << " removed" << IBRCOMMON_LOGGER_ENDL;
177  }
178 
180  {
181  // error checking
182  if (eid == dtn::data::EID())
183  {
184  throw NativeSessionException("given endpoint is not acceptable");
185  }
186  else
187  {
188  _registration.subscribe(eid);
189  }
190 
191  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Registration " << eid.getString() << " added" << IBRCOMMON_LOGGER_ENDL;
192  }
193 
195  {
196  // error checking
197  if (eid == dtn::data::EID())
198  {
199  throw NativeSessionException("given endpoint is not acceptable");
200  }
201  else
202  {
203  _registration.unsubscribe(eid);
204  }
205 
206  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Registration " << eid.getString() << " removed" << IBRCOMMON_LOGGER_ENDL;
207  }
208 
210  {
211  resetEndpoint();
212 
213  const std::set<dtn::data::EID> subs = _registration.getSubscriptions();
214  for (std::set<dtn::data::EID>::const_iterator it = subs.begin(); it != subs.end(); ++it) {
215  const dtn::data::EID &e = (*it);
216  if (e != _endpoint) {
217  _registration.unsubscribe(e);
218  }
219  }
220 
221  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Registrations cleared" << IBRCOMMON_LOGGER_ENDL;
222  }
223 
224  std::vector<std::string> NativeSession::getSubscriptions() throw ()
225  {
226  const std::set<dtn::data::EID> subs = _registration.getSubscriptions();
227  std::vector<std::string> ret;
228  for (std::set<dtn::data::EID>::const_iterator it = subs.begin(); it != subs.end(); ++it) {
229  ret.push_back((*it).getString());
230  }
231  return ret;
232  }
233 
235  {
236  try {
237  const dtn::data::BundleID id = _bundle_queue.getnpop();
238 
239  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Next bundle in queue is " << id.toString() << IBRCOMMON_LOGGER_ENDL;
240 
241  load(ri, id);
242  } catch (const ibrcommon::QueueUnblockedException &ex) {
243  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "No next bundle available" << IBRCOMMON_LOGGER_ENDL;
244  throw BundleNotFoundException();
245  }
246  }
247 
249  {
250  // load the bundle
251  try {
252  _bundle[ri] = dtn::core::BundleCore::getInstance().getStorage().get(id);
253 
254  // process the bundle block (security, compression, ...)
256 
257  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Bundle " << id.toString() << " loaded" << IBRCOMMON_LOGGER_ENDL;
258  } catch (const ibrcommon::Exception &ex) {
259  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 15) << "Failed to load bundle " << id.toString() << ", Exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
260  throw BundleNotFoundException();
261  }
262  }
263 
265  {
267  if (_serializer_cb == NULL) return;
268 
269  NativeSerializer serializer(*_serializer_cb, NativeSerializer::BUNDLE_FULL);
270  try {
271  serializer << _bundle[ri];
272  } catch (const ibrcommon::Exception &ex) {
273  IBRCOMMON_LOGGER_TAG(NativeSession::TAG, error) << "Get failed " << ex.what() << IBRCOMMON_LOGGER_ENDL;
274  }
275  }
276 
278  {
280  if (_serializer_cb == NULL) return;
281 
282  NativeSerializer serializer(*_serializer_cb, NativeSerializer::BUNDLE_INFO);
283  try {
284  serializer << _bundle[ri];
285  } catch (const ibrcommon::Exception &ex) {
286  IBRCOMMON_LOGGER_TAG(NativeSession::TAG, error) << "Get failed " << ex.what() << IBRCOMMON_LOGGER_ENDL;
287  }
288  }
289 
291  {
292  try {
294  _bundle[ri] = dtn::data::Bundle();
295  } catch (const ibrcommon::Exception&) {
296  throw BundleNotFoundException();
297  }
298  }
299 
301  {
302  _bundle[ri] = dtn::data::Bundle();
303  }
304 
306  {
307  try {
308  // announce this bundle as delivered
310  _registration.delivered(meta);
311 
312  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Bundle " << id.toString() << " marked as delivered" << IBRCOMMON_LOGGER_ENDL;
313  } catch (const ibrcommon::Exception&) {
314  throw BundleNotFoundException();
315  }
316  }
317 
319  {
320  // forward the bundle to the storage processing
321  dtn::api::Registration::processIncomingBundle(_endpoint, _bundle[ri]);
322 
323  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "Bundle " << _bundle[ri].toString() << " sent" << IBRCOMMON_LOGGER_ENDL;
324 
325  return _bundle[ri];
326  }
327 
329  {
330  // Copy the given bundle into the local register
331  _bundle[ri] = b;
332  }
333 
335  {
336  // clear all blocks in the register
337  _bundle[ri].clear();
338 
339  // Copy the given primary block into the local register
340  ((dtn::data::PrimaryBlock&)_bundle[ri]) = p;
341  }
342 
/**
 * Writes a buffer into the payload block of the bundle held in a register.
 *
 * The first half operates on the payload block found in the bundle; the
 * second half appends a new payload block with push_back and writes into
 * that one instead.
 *
 * @param ri     index of the bundle register to modify
 * @param buf    data to write
 * @param len    number of bytes taken from buf
 * @param offset requested write position inside the payload (see the review note below)
 */
343  void NativeSession::write(RegisterIndex ri, const char *buf, const size_t len, const size_t offset) throw ()
344  {
345  try {
346  dtn::data::PayloadBlock &payload = _bundle[ri].find<dtn::data::PayloadBlock>();
347 
348  ibrcommon::BLOB::Reference ref = payload.getBLOB();
349  ibrcommon::BLOB::iostream stream = ref.iostream();
350 
351  std::streamsize stream_size = stream.size();
352 
// NOTE(review): offset is unsigned, so the `offset > 0` term makes every non-zero offset take the append branch; the else branch is only reached with offset == 0 - confirm this is intended
353  if ((offset > 0) || (stream_size < static_cast<std::streamsize>(offset))) {
354  (*stream).seekp(0, std::ios_base::end);
355  } else {
356  (*stream).seekp(offset);
357  }
358 
359  (*stream).write(buf, len);
360  (*stream) << std::flush;
// NOTE(review): listing line 361 is not present in this listing; presumably it is the catch clause for a missing payload block that closes the try above - restore it from the original source
362  dtn::data::PayloadBlock &payload = _bundle[ri].push_back<dtn::data::PayloadBlock>();
363 
364  ibrcommon::BLOB::Reference ref = payload.getBLOB();
365  ibrcommon::BLOB::iostream stream = ref.iostream();
366 
367  std::streamsize stream_size = stream.size();
368 
// same positioning logic as in the first half, applied to the newly added payload block
369  if ((offset > 0) || (stream_size < static_cast<std::streamsize>(offset))) {
370  (*stream).seekp(0, std::ios_base::end);
371  } else {
372  (*stream).seekp(offset);
373  }
374  (*stream).write(buf, len);
375  (*stream) << std::flush;
376  }
377 
378  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << len << " bytes added to the payload" << IBRCOMMON_LOGGER_ENDL;
379  }
380 
/**
 * Reads payload data of the bundle held in a register into a buffer.
 *
 * @param ri     index of the bundle register to read from
 * @param buf    destination buffer
 * @param len    in: number of bytes requested; out: number of bytes
 *               actually read (gcount()), or 0 in the fallback path
 * @param offset position passed to seekg() before reading
 */
381  void NativeSession::read(RegisterIndex ri, char *buf, size_t &len, const size_t offset) throw ()
382  {
383  try {
384  dtn::data::PayloadBlock &payload = _bundle[ri].find<dtn::data::PayloadBlock>();
385 
386  ibrcommon::BLOB::Reference ref = payload.getBLOB();
387  ibrcommon::BLOB::iostream stream = ref.iostream();
388 
389  (*stream).seekg(offset);
390  (*stream).read(buf, len);
391 
// report how many bytes the read above delivered
392  len = (*stream).gcount();
// NOTE(review): listing line 393 is not present in this listing; presumably it is the catch clause for a missing payload block - restore it from the original source
394  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << "no payload block available" << IBRCOMMON_LOGGER_ENDL;
395  len = 0;
396  }
397  }
398 
399  NativeSession::BundleReceiver::BundleReceiver(NativeSession &session)
400  : _session(session)
401  {
402  }
403 
404  NativeSession::BundleReceiver::~BundleReceiver()
405  {
406  }
407 
408  void NativeSession::BundleReceiver::raiseEvent(const dtn::core::Event *evt) throw ()
409  {
410  try {
411  const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
412 
413  // ignore fragments - we can not deliver them directly to the client
414  if (queued.bundle.isFragment()) return;
415 
416  if (_session._registration.hasSubscribed(queued.bundle.destination))
417  {
418  _session._registration.notify(Registration::NOTIFY_BUNDLE_AVAILABLE);
419  }
420  } catch (const std::bad_cast&) { };
421  }
422 
424  {
425  Registration &reg = _registration;
426  try {
427  try {
428  const dtn::data::MetaBundle id = reg.receiveMetaBundle();
429 
431  // transform custody signals & status reports into notifies
432  fireNotificationAdministrativeRecord(id);
433 
434  // announce the delivery of this bundle
435  reg.delivered(id);
436  } else {
437  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "fire notification for new bundle " << id.toString() << IBRCOMMON_LOGGER_ENDL;
438 
439  // put the bundle into the API queue
440  _bundle_queue.push(id);
441 
442  // notify the client about the new bundle
443  fireNotificationBundle(id);
444  }
445  } catch (const dtn::storage::NoBundleFoundException&) {
446  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 25) << "no more bundles found - wait until we are notified" << IBRCOMMON_LOGGER_ENDL;
447  reg.wait_for_bundle();
448  }
449  } catch (const ibrcommon::QueueUnblockedException &ex) {
450  throw NativeSessionException(std::string("loop aborted - ") + ex.what());
451  } catch (const std::exception &ex) {
452  throw NativeSessionException(std::string("loop aborted - ") + ex.what());
453  }
454  }
455 
/**
 * Tries to interpret the payload of a bundle as an administrative record
 * and fires the matching notification.
 *
 * Two attempts are made one after the other: decoding as a status report
 * (fireNotificationStatusReport) and decoding as a custody signal
 * (fireNotificationCustodySignal).
 *
 * NOTE(review): several statements (listing lines 459, 462, 466, 473, 479
 * and 486 - the declarations of b, payload, report and custody and both
 * catch clauses) are not present in this listing; restore them from the
 * original source.
 *
 * @param bundle meta data of the bundle to inspect
 */
456  void NativeSession::fireNotificationAdministrativeRecord(const dtn::data::MetaBundle &bundle)
457  {
458  // load the whole bundle
460 
461  // get the payload block of the bundle
463 
464  try {
465  // try to decode as status report
467  report.read(payload);
468 
469  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "fire notification for status report" << IBRCOMMON_LOGGER_ENDL;
470 
471  // fire the status report notification
472  fireNotificationStatusReport(b.source, report);
474  // this is not a status report
475  }
476 
477  try {
478  // try to decode as custody signal
480  custody.read(payload);
481 
482  IBRCOMMON_LOGGER_DEBUG_TAG(NativeSession::TAG, 20) << "fire notification for custody signal" << IBRCOMMON_LOGGER_ENDL;
483 
484  // fire the custody signal notification
485  fireNotificationCustodySignal(b.source, custody);
487  // this is not a custody signal
488  }
489  }
490 
491  const std::string& NativeSession::getHandle() const
492  {
493  return _registration.getHandle();
494  }
495  } /* namespace api */
496 } /* namespace dtn */