IBR-DTNSuite  0.12
AbstractWorker.cpp
Go to the documentation of this file.
1 /*
2  * AbstractWorker.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "core/EventDispatcher.h"
24 #include "core/AbstractWorker.h"
25 #include "core/BundleCore.h"
28 #include "core/BundleEvent.h"
29 #include "core/BundlePurgeEvent.h"
31 #include <ibrcommon/Logger.h>
32 #include <typeinfo>
33 
34 namespace dtn
35 {
36  namespace core
37  {
38  AbstractWorker::AbstractWorkerAsync::AbstractWorkerAsync(AbstractWorker &worker)
39  : _worker(worker), _running(true)
40  {
42  }
43 
44  AbstractWorker::AbstractWorkerAsync::~AbstractWorkerAsync()
45  {
47  shutdown();
48  }
49 
50  void AbstractWorker::AbstractWorkerAsync::raiseEvent(const dtn::core::Event *evt) throw ()
51  {
52  try {
53  const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
54 
55  // ignore fragments - we can not deliver them directly to the client
56  if (queued.bundle.isFragment()) return;
57 
58  // check for bundle destination
59  if (queued.bundle.destination == _worker._eid)
60  {
61  _receive_bundles.push(queued.bundle);
62  return;
63  }
64 
65  // if the bundle is a singleton, stop here
67 
68  // check for subscribed groups
69  if (_worker._groups.find(queued.bundle.destination) != _worker._groups.end())
70  {
71  _receive_bundles.push(queued.bundle);
72  return;
73  }
74  } catch (const std::bad_cast&) { }
75  }
76 
77  void AbstractWorker::AbstractWorkerAsync::initialize()
78  {
79  // reset thread if necessary
80  if (JoinableThread::isFinalized())
81  {
82  JoinableThread::reset();
83  _running = true;
84  _receive_bundles.reset();
85  }
86 
87  JoinableThread::start();
88  }
89 
90  void AbstractWorker::AbstractWorkerAsync::shutdown()
91  {
92  _running = false;
93  _receive_bundles.abort();
94 
95  join();
96  }
97 
98  void AbstractWorker::AbstractWorkerAsync::run() throw ()
99  {
101 
102  try {
103  while (_running)
104  {
105  dtn::data::BundleID id = _receive_bundles.getnpop(true);
106 
107  try {
108  dtn::data::Bundle b = storage.get( id );
109  prepareBundle(b);
110  _worker.callbackBundleReceived( b );
111 
112  // create meta bundle for futher processing
114 
115  // raise bundle event
117 
119  {
120  // remove the bundle from the storage
122  }
123  } catch (const ibrcommon::Exception &ex) {
124  IBRCOMMON_LOGGER_DEBUG_TAG("AbstractWorker", 15) << ex.what() << IBRCOMMON_LOGGER_ENDL;
125  };
126 
127  yield();
128  }
129  } catch (const ibrcommon::QueueUnblockedException&) {
130  // queue was aborted by another call
131  }
132  }
133 
134  void AbstractWorker::AbstractWorkerAsync::__cancellation() throw ()
135  {
136  // cancel the main thread in here
137  _receive_bundles.abort();
138  }
139 
140  void AbstractWorker::AbstractWorkerAsync::prepareBundle(dtn::data::Bundle &bundle) const
141  {
142  // process the bundle block (security, compression, ...)
144  }
145 
146  AbstractWorker::AbstractWorker() : _eid(BundleCore::local), _thread(*this)
147  {
148  }
149 
151  {
152  _groups.insert(endpoint);
153  }
154 
156  {
157  _groups.erase(endpoint);
158  }
159 
160  void AbstractWorker::initialize(const std::string &uri)
161  {
162  _eid.setApplication(uri);
163 
164  try {
165  _thread.initialize();
166  } catch (const ibrcommon::ThreadException &ex) {
167  IBRCOMMON_LOGGER_TAG("AbstractWorker", error) << "initialize failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
168  }
169  }
170 
172  {
173  shutdown();
174  }
175 
177  {
178  // wait for the async thread
179  _thread.shutdown();
180  }
181 
183  {
184  return _eid;
185  }
186 
187  void AbstractWorker::transmit(const Bundle &bundle)
188  {
190  }
191  }
192 }