IBR-DTNSuite  0.10
AbstractWorker.cpp
Go to the documentation of this file.
1 /*
2  * AbstractWorker.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "core/EventDispatcher.h"
24 #include "core/AbstractWorker.h"
25 #include "core/BundleCore.h"
28 #include "core/BundleEvent.h"
29 #include "core/BundlePurgeEvent.h"
30 #ifdef WITH_BUNDLE_SECURITY
32 #endif
33 #ifdef WITH_COMPRESSION
35 #endif
37 #include <ibrcommon/Logger.h>
38 #include <typeinfo>
39 
40 namespace dtn
41 {
42  namespace core
43  {
44  AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker)
45  : _worker(worker), _running(true)
46  {
48  }
49 
50  AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync()
51  {
53  shutdown();
54  }
55 
56  void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt) throw ()
57  {
58  try {
59  const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
60 
61  // ignore fragments - we can not deliver them directly to the client
62  if (queued.bundle.fragment) return;
63 
64  // check for bundle destination
65  if (queued.bundle.destination == _worker._eid)
66  {
67  _receive_bundles.push(queued.bundle);
68  return;
69  }
70 
71  // if the bundle is a singleton, stop here
73 
74  // check for subscribed groups
75  if (_worker._groups.find(queued.bundle.destination) != _worker._groups.end())
76  {
77  _receive_bundles.push(queued.bundle);
78  return;
79  }
80  } catch (const std::bad_cast&) { }
81  }
82 
83  void AbstractWorker::AbstractWorkerAsync::shutdown()
84  {
85  _running = false;
86  _receive_bundles.abort();
87 
88  join();
89  }
90 
91  void AbstractWorker::AbstractWorkerAsync::run() throw ()
92  {
94 
95  try {
96  while (_running)
97  {
98  dtn::data::BundleID id = _receive_bundles.getnpop(true);
99 
100  try {
101  dtn::data::Bundle b = storage.get( id );
102  prepareBundle(b);
103  _worker.callbackBundleReceived( b );
104 
105  // raise bundle event
107 
109  {
110  // remove the bundle from the storage
112  }
113  } catch (const ibrcommon::Exception &ex) {
114  IBRCOMMON_LOGGER_DEBUG_TAG("AbstractWorker", 15) << ex.what() << IBRCOMMON_LOGGER_ENDL;
115  };
116 
117  yield();
118  }
119  } catch (const ibrcommon::QueueUnblockedException&) {
120  // queue was aborted by another call
121  }
122  }
123 
124  void AbstractWorker::AbstractWorkerAsync::__cancellation() throw ()
125  {
126  // cancel the main thread in here
127  _receive_bundles.abort();
128  }
129 
130  void AbstractWorker::AbstractWorkerAsync::prepareBundle(dtn::data::Bundle &bundle) const
131  {
132  // process the bundle block (security, compression, ...)
134  }
135 
137  {
138  }
139 
141  {
142  _groups.insert(endpoint);
143  }
144 
146  {
147  _groups.erase(endpoint);
148  }
149 
150  void AbstractWorker::initialize(const std::string &uri, const dtn::data::Number &cbhe, bool async)
151  {
153  {
154  _eid = BundleCore::local.add(BundleCore::local.getDelimiter() + cbhe.toString());
155  }
156  else
157  {
158  _eid = BundleCore::local.add(uri);
159  }
160 
161  try {
162  if (async) _thread.start();
163  } catch (const ibrcommon::ThreadException &ex) {
164  IBRCOMMON_LOGGER_TAG("AbstractWorker", error) << "initialize failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
165  }
166  }
167 
169  {
170  shutdown();
171  }
172 
174  {
175  // wait for the async thread
176  _thread.shutdown();
177  }
178 
180  {
181  return _eid;
182  }
183 
184  void AbstractWorker::transmit(const Bundle &bundle)
185  {
187  }
188  }
189 }