IBR-DTNSuite  0.10
EventSwitch.cpp
Go to the documentation of this file.
1 /*
2  * EventSwitch.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "core/EventSwitch.h"
23 #include "core/EventReceiver.h"
24 #include "core/EventDispatcher.h"
25 
27 #include "core/GlobalEvent.h"
28 #include <ibrcommon/Logger.h>
29 #include <stdexcept>
30 #include <iostream>
31 #include <typeinfo>
32 
33 namespace dtn
34 {
35  namespace core
36  {
37  EventSwitch::EventSwitch()
38  : _running(true), _shutdown(false)
39  {
40  }
41 
42  EventSwitch::~EventSwitch()
43  {
44  componentDown();
45  }
46 
47  void EventSwitch::componentUp() throw ()
48  {
49  // routine checked for throw() on 15.02.2013
50 
51  // clear all queues
52  _queue.clear();
53  _prio_queue.clear();
54  _low_queue.clear();
55 
56  // reset component state
57  _running = true;
58  _shutdown = false;
59 
60  // reset aborted conditional
61  _queue_cond.reset();
62  }
63 
// NOTE(review): the line carrying this function's signature (listing line 64)
// is missing from this extract. Presumably this is EventSwitch::componentDown(),
// the counterpart of componentUp() that the destructor calls - confirm against
// the original file.
//
// Visible behaviour: while holding the _queue_cond lock, flag the switch as
// shutting down, block until all three queues have been drained, then abort
// the conditional.
65  {
66  try {
67  ibrcommon::MutexLock l(_queue_cond);
68 
69  // stop receiving events
70  _shutdown = true;
71 
72  // wait until all queues are empty
73  while (!this->empty())
74  {
75  _queue_cond.wait();
76  }
77 
// every queue is empty now; abort the conditional
78  _queue_cond.abort();
// NOTE(review): listing line 79 is missing here; it presumably closes the
// try block opened above with its catch handler - TODO confirm.
80  }
81 
82  bool EventSwitch::empty() const
83  {
84  return (_low_queue.empty() && _queue.empty() && _prio_queue.empty());
85  }
86 
87  void EventSwitch::process()
88  {
89  EventSwitch::Task *t = NULL;
90 
91  // just look for an event to process
92  {
93  ibrcommon::MutexLock l(_queue_cond);
94  if (!_running) return;
95 
96  while (this->empty() && !_shutdown)
97  {
98  _queue_cond.wait();
99  }
100 
101  if (!_prio_queue.empty())
102  {
103  t = _prio_queue.front();
104  _prio_queue.pop_front();
105  }
106  else if (!_queue.empty())
107  {
108  t = _queue.front();
109  _queue.pop_front();
110  }
111  else if (!_low_queue.empty())
112  {
113  t = _low_queue.front();
114  _low_queue.pop_front();
115  }
116  else if (_shutdown)
117  {
118  // if all queues are empty and shutdown is requested
119  // set running mode to false
120  _running = false;
121 
122  // abort the conditional to release all blocking threads
123  _queue_cond.abort();
124 
125  return;
126  }
127 
128  _queue_cond.signal(true);
129  }
130 
131  if (t != NULL)
132  {
133  // execute the event
134  t->processor.process(t->event);
135 
136  // delete the Task
137  delete t;
138  }
139  }
140 
// Run the event loop on the calling thread, assisted by `threads` workers.
//
// `threads` Worker objects bound to this switch are created and started.
// The calling thread then keeps invoking process() for as long as _running
// is set. Afterwards every worker is stopped, joined and deleted.
141  void EventSwitch::loop(size_t threads)
142  {
143  // create worker threads
144  std::list<Worker*> wlist;
145 
146  for (size_t i = 0; i < threads; ++i)
147  {
148  Worker *w = new Worker(*this);
149  w->start();
150  wlist.push_back(w);
151  }
152 
// process events on this thread until the switch stops running
153  try {
154  while (_running)
155  {
156  process();
157  }
// NOTE(review): listing line 158 is missing here; it presumably closes the
// try block above with its catch handler - TODO confirm.
159 
// stop, join and free every worker created above
160  for (std::list<Worker*>::iterator iter = wlist.begin(); iter != wlist.end(); ++iter)
161  {
162  Worker *w = (*iter);
163  w->stop();
164  w->join();
165  delete w;
166  }
167  }
168 
// NOTE(review): two lines of this function are missing from this extract:
// the signature (listing line 169) and listing line 171, which presumably
// declares `s` as a reference to the EventSwitch instance - TODO confirm.
// From the body, the function receives a processor `proc` and an event
// pointer `evt`.
//
// Visible behaviour: under the switch's _queue_cond lock, `evt` is either
// discarded (shutdown requested) or wrapped together with `proc` in a new
// Task and appended to the queue selected by evt->prio. In both cases the
// event is handed over: it is deleted here directly or later by the Task.
170  {
172 
173  ibrcommon::MutexLock l(s._queue_cond);
174 
175  // do not process any event if the system is going down
176  if (s._shutdown)
177  {
178  delete evt;
179  return;
180  }
181 
182  EventSwitch::Task *t = new EventSwitch::Task(proc, evt);
183 
// prio > 0: high priority queue, prio < 0: low priority queue, 0: normal queue
184  if (evt->prio > 0)
185  {
186  s._prio_queue.push_back(t);
187  }
188  else if (evt->prio < 0)
189  {
190  s._low_queue.push_back(t);
191  }
192  else
193  {
194  s._queue.push_back(t);
195  }
// notify the conditional that a new task has been queued
196  s._queue_cond.signal();
197  }
198 
// NOTE(review): the signature line (listing line 199) is missing from this
// extract; presumably this is the routine that requests a shutdown of the
// switch - confirm the name against the original file.
//
// Visible behaviour: under the _queue_cond lock, set _shutdown and signal
// the conditional so that blocked threads re-evaluate their wait condition.
// Unlike the block that waits for empty queues, nothing is waited for here.
200  {
201  try {
202  ibrcommon::MutexLock l(_queue_cond);
203 
204  // stop receiving events
205  _shutdown = true;
206 
207  // signal all blocking thread to check _shutdown variable
208  _queue_cond.signal(true);
// NOTE(review): listing line 209 is missing here; it presumably closes the
// try block above with its catch handler - TODO confirm.
210  }
211 
// NOTE(review): the signature line (listing line 212) is missing from this
// extract; presumably this is the accessor for the process-wide EventSwitch
// object - confirm the name and return type against the original file.
//
// Visible behaviour: returns the function-local static EventSwitch object,
// which is constructed the first time control passes through its declaration.
213  {
214  static EventSwitch instance;
215  return instance;
216  }
217 
218  const std::string EventSwitch::getName() const
219  {
220  return "EventSwitch";
221  }
222 
223  EventSwitch::Task::Task(EventProcessor &proc, dtn::core::Event *evt)
224  : processor(proc), event(evt)
225  {
226  }
227 
228  EventSwitch::Task::~Task()
229  {
230  if (event != NULL)
231  {
232  delete event;
233  }
234  }
235 
236  EventSwitch::Worker::Worker(EventSwitch &sw)
237  : _switch(sw), _running(true)
238  {}
239 
240  EventSwitch::Worker::~Worker()
241  {}
242 
// Worker thread body: keep calling process() on the associated event switch
// for as long as this worker's _running flag is set.
243  void EventSwitch::Worker::run() throw ()
244  {
245  try {
246  while (_running)
247  _switch.process();
// NOTE(review): listing line 248 is missing here; it presumably closes the
// try block above with its catch handler - TODO confirm.
249  }
250 
251  void EventSwitch::Worker::__cancellation() throw ()
252  {
253  _running = false;
254  }
255  }
256 }
257