// IBR-DTNSuite 0.8
00001 /* 00002 * EventSwitch.cpp 00003 * 00004 * Created on: 05.03.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "core/EventSwitch.h" 00009 00010 #include <ibrcommon/thread/MutexLock.h> 00011 #include "core/GlobalEvent.h" 00012 #include <ibrcommon/Logger.h> 00013 #include <stdexcept> 00014 #include <iostream> 00015 #include <typeinfo> 00016 00017 namespace dtn 00018 { 00019 namespace core 00020 { 00021 EventSwitch::EventSwitch() 00022 : _running(true), _active_worker(0), _pause(false) 00023 { 00024 } 00025 00026 EventSwitch::~EventSwitch() 00027 { 00028 componentDown(); 00029 } 00030 00031 void EventSwitch::componentUp() 00032 { 00033 } 00034 00035 void EventSwitch::componentDown() 00036 { 00037 try { 00038 ibrcommon::MutexLock l(_queue_cond); 00039 _running = false; 00040 00041 // wait until the queue is empty 00042 while (!_queue.empty()) 00043 { 00044 _queue_cond.wait(); 00045 } 00046 00047 _queue_cond.abort(); 00048 } catch (const ibrcommon::Conditional::ConditionalAbortException&) {}; 00049 } 00050 00051 void EventSwitch::process() 00052 { 00053 EventSwitch::Task *t = NULL; 00054 00055 // just look for an event to process 00056 { 00057 ibrcommon::MutexLock l(_queue_cond); 00058 while (_queue.empty() && _prio_queue.empty() && _low_queue.empty()) 00059 { 00060 _queue_cond.wait(); 00061 } 00062 00063 if (!_prio_queue.empty()) 00064 { 00065 // IBRCOMMON_LOGGER_DEBUG(1) << "process element of priority queue" << IBRCOMMON_LOGGER_ENDL; 00066 t = _prio_queue.front(); 00067 _prio_queue.pop_front(); 00068 } 00069 else if (!_queue.empty()) 00070 { 00071 t = _queue.front(); 00072 _queue.pop_front(); 00073 } 00074 else 00075 { 00076 t = _low_queue.front(); 00077 _low_queue.pop_front(); 00078 } 00079 00080 _queue_cond.signal(true); 00081 00082 ibrcommon::MutexLock la(_active_cond); 00083 _active_worker++; 00084 _active_cond.signal(true); 00085 } 00086 00087 try { 00088 // execute the event 00089 t->receiver->raiseEvent(t->event); 00090 } catch (const 
std::exception&) {}; 00091 00092 // delete the Task 00093 delete t; 00094 00095 ibrcommon::MutexLock l(_active_cond); 00096 _active_worker--; 00097 _active_cond.signal(true); 00098 00099 // wait while pause is enabled 00100 while (_pause) _active_cond.wait(); 00101 } 00102 00103 void EventSwitch::loop(size_t threads) 00104 { 00105 // create worker threads 00106 std::list<Worker*> wlist; 00107 00108 for (size_t i = 0; i < threads; i++) 00109 { 00110 Worker *w = new Worker(*this); 00111 w->start(); 00112 wlist.push_back(w); 00113 } 00114 00115 try { 00116 while (_running || (!_queue.empty())) 00117 { 00118 process(); 00119 } 00120 } catch (const ibrcommon::Conditional::ConditionalAbortException&) { }; 00121 00122 for (std::list<Worker*>::iterator iter = wlist.begin(); iter != wlist.end(); iter++) 00123 { 00124 Worker *w = (*iter); 00125 w->join(); 00126 delete w; 00127 } 00128 } 00129 00130 void EventSwitch::pause() 00131 { 00132 // wait until all workers are done 00133 ibrcommon::MutexLock la(_active_cond); 00134 _pause = true; 00135 while (_active_worker != 0) { _active_cond.wait(); }; 00136 } 00137 00138 void EventSwitch::unpause() 00139 { 00140 ibrcommon::MutexLock la(_active_cond); 00141 _pause = false; 00142 _active_cond.signal(); 00143 } 00144 00145 const list<EventReceiver*>& EventSwitch::getReceivers(string eventName) const 00146 { 00147 map<string,list<EventReceiver*> >::const_iterator iter = _list.find(eventName); 00148 00149 if (iter == _list.end()) 00150 { 00151 throw NoReceiverFoundException(); 00152 } 00153 00154 return iter->second; 00155 } 00156 00157 void EventSwitch::registerEventReceiver(string eventName, EventReceiver *receiver) 00158 { 00159 // get the list for this event 00160 EventSwitch &s = EventSwitch::getInstance(); 00161 ibrcommon::MutexLock l(s._receiverlock); 00162 s._list[eventName].push_back(receiver); 00163 } 00164 00165 void EventSwitch::unregisterEventReceiver(string eventName, EventReceiver *receiver) 00166 { 00167 // unregister 
the receiver 00168 EventSwitch::getInstance().unregister(eventName, receiver); 00169 } 00170 00171 void EventSwitch::unregister(std::string eventName, EventReceiver *receiver) 00172 { 00173 { 00174 // remove the receiver from the list 00175 ibrcommon::MutexLock lr(_receiverlock); 00176 std::list<EventReceiver*> &rlist = _list[eventName]; 00177 for (std::list<EventReceiver*>::iterator iter = rlist.begin(); iter != rlist.end(); iter++) 00178 { 00179 if ((*iter) == receiver) 00180 { 00181 rlist.erase(iter); 00182 break; 00183 } 00184 } 00185 } 00186 00187 // set the event switch into pause mode and wait 00188 // until all threads are on hold 00189 pause(); 00190 00191 { 00192 // freeze the queue 00193 ibrcommon::MutexLock lq(_queue_cond); 00194 00195 // remove all elements with this receiver from the queue 00196 for (std::list<Task*>::iterator iter = _queue.begin(); iter != _queue.end();) 00197 { 00198 EventSwitch::Task &t = (**iter); 00199 std::list<Task*>::iterator current = iter++; 00200 if (t.receiver == receiver) 00201 { 00202 _queue.erase(current); 00203 } 00204 } 00205 } 00206 00207 // resume all threads 00208 unpause(); 00209 } 00210 00211 void EventSwitch::raiseEvent(Event *evt) 00212 { 00213 EventSwitch &s = EventSwitch::getInstance(); 00214 00215 // do not process any event if the system is going down 00216 { 00217 ibrcommon::MutexLock l(s._queue_cond); 00218 if (!s._running) 00219 { 00220 if (evt->decrement_ref_count()) 00221 { 00222 delete evt; 00223 } 00224 00225 return; 00226 } 00227 } 00228 00229 // forward to debugger 00230 s._debugger.raiseEvent(evt); 00231 00232 try { 00233 dtn::core::GlobalEvent &global = dynamic_cast<dtn::core::GlobalEvent&>(*evt); 00234 00235 if (global.getAction() == dtn::core::GlobalEvent::GLOBAL_SHUTDOWN) 00236 { 00237 // stop receiving events 00238 try { 00239 ibrcommon::MutexLock l(s._queue_cond); 00240 s._running = false; 00241 s._queue_cond.abort(); 00242 } catch (const ibrcommon::Conditional::ConditionalAbortException&) 
{}; 00243 } 00244 } catch (const std::bad_cast&) { } 00245 00246 try { 00247 ibrcommon::MutexLock reglock(s._receiverlock); 00248 00249 // get the list for this event 00250 const std::list<EventReceiver*> receivers = s.getReceivers(evt->getName()); 00251 evt->set_ref_count(receivers.size()); 00252 00253 for (list<EventReceiver*>::const_iterator iter = receivers.begin(); iter != receivers.end(); iter++) 00254 { 00255 Task *t = new Task(*iter, evt); 00256 ibrcommon::MutexLock l(s._queue_cond); 00257 00258 if (evt->prio > 0) 00259 { 00260 s._prio_queue.push_back(t); 00261 } 00262 else if (evt->prio < 0) 00263 { 00264 s._low_queue.push_back(t); 00265 } 00266 else 00267 { 00268 s._queue.push_back(t); 00269 } 00270 s._queue_cond.signal(); 00271 } 00272 } catch (const NoReceiverFoundException&) { 00273 // No receiver available! 00274 } 00275 } 00276 00277 EventSwitch& EventSwitch::getInstance() 00278 { 00279 static EventSwitch instance; 00280 return instance; 00281 } 00282 00283 const std::string EventSwitch::getName() const 00284 { 00285 return "EventSwitch"; 00286 } 00287 00288 void EventSwitch::clear() 00289 { 00290 ibrcommon::MutexLock l(_receiverlock); 00291 _list.clear(); 00292 } 00293 00294 EventSwitch::Task::Task() 00295 : receiver(NULL), event(NULL) 00296 { 00297 } 00298 00299 EventSwitch::Task::Task(EventReceiver *er, dtn::core::Event *evt) 00300 : receiver(er), event(evt) 00301 { 00302 } 00303 00304 EventSwitch::Task::~Task() 00305 { 00306 if (event != NULL) 00307 { 00308 if (event->decrement_ref_count()) 00309 { 00310 delete event; 00311 } 00312 } 00313 } 00314 00315 EventSwitch::Worker::Worker(EventSwitch &sw) 00316 : _switch(sw), _running(true) 00317 {} 00318 00319 EventSwitch::Worker::~Worker() 00320 {} 00321 00322 void EventSwitch::Worker::run() 00323 { 00324 try { 00325 while (_running) 00326 _switch.process(); 00327 } catch (const ibrcommon::Conditional::ConditionalAbortException&) { }; 00328 } 00329 00330 void EventSwitch::Worker::__cancellation() 00331 
{ 00332 _running = false; 00333 } 00334 } 00335 } 00336