IBR-DTNSuite  0.8
daemon/src/core/EventSwitch.cpp
Go to the documentation of this file.
00001 /*
00002  * EventSwitch.cpp
00003  *
00004  *  Created on: 05.03.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "core/EventSwitch.h"
00009 
00010 #include <ibrcommon/thread/MutexLock.h>
00011 #include "core/GlobalEvent.h"
00012 #include <ibrcommon/Logger.h>
00013 #include <stdexcept>
00014 #include <iostream>
00015 #include <typeinfo>
00016 
00017 namespace dtn
00018 {
00019         namespace core
00020         {
00021                 EventSwitch::EventSwitch()
00022                  : _running(true), _active_worker(0), _pause(false)
00023                 {
00024                 }
00025 
00026                 EventSwitch::~EventSwitch()
00027                 {
00028                         componentDown();
00029                 }
00030 
00031                 void EventSwitch::componentUp()
00032                 {
00033                 }
00034 
00035                 void EventSwitch::componentDown()
00036                 {
00037                         try {
00038                                 ibrcommon::MutexLock l(_queue_cond);
00039                                 _running = false;
00040 
00041                                 // wait until the queue is empty
00042                                 while (!_queue.empty())
00043                                 {
00044                                         _queue_cond.wait();
00045                                 }
00046 
00047                                 _queue_cond.abort();
00048                         } catch (const ibrcommon::Conditional::ConditionalAbortException&) {};
00049                 }
00050 
00051                 void EventSwitch::process()
00052                 {
00053                         EventSwitch::Task *t = NULL;
00054 
00055                         // just look for an event to process
00056                         {
00057                                 ibrcommon::MutexLock l(_queue_cond);
00058                                 while (_queue.empty() && _prio_queue.empty() && _low_queue.empty())
00059                                 {
00060                                         _queue_cond.wait();
00061                                 }
00062 
00063                                 if (!_prio_queue.empty())
00064                                 {
00065 //                                      IBRCOMMON_LOGGER_DEBUG(1) << "process element of priority queue" << IBRCOMMON_LOGGER_ENDL;
00066                                         t = _prio_queue.front();
00067                                         _prio_queue.pop_front();
00068                                 }
00069                                 else if (!_queue.empty())
00070                                 {
00071                                         t = _queue.front();
00072                                         _queue.pop_front();
00073                                 }
00074                                 else
00075                                 {
00076                                         t = _low_queue.front();
00077                                         _low_queue.pop_front();
00078                                 }
00079 
00080                                 _queue_cond.signal(true);
00081 
00082                                 ibrcommon::MutexLock la(_active_cond);
00083                                 _active_worker++;
00084                                 _active_cond.signal(true);
00085                         }
00086 
00087                         try {
00088                                 // execute the event
00089                                 t->receiver->raiseEvent(t->event);
00090                         } catch (const std::exception&) {};
00091 
00092                         // delete the Task
00093                         delete t;
00094 
00095                         ibrcommon::MutexLock l(_active_cond);
00096                         _active_worker--;
00097                         _active_cond.signal(true);
00098 
00099                         // wait while pause is enabled
00100                         while (_pause) _active_cond.wait();
00101                 }
00102 
00103                 void EventSwitch::loop(size_t threads)
00104                 {
00105                         // create worker threads
00106                         std::list<Worker*> wlist;
00107 
00108                         for (size_t i = 0; i < threads; i++)
00109                         {
00110                                 Worker *w = new Worker(*this);
00111                                 w->start();
00112                                 wlist.push_back(w);
00113                         }
00114 
00115                         try {
00116                                 while (_running || (!_queue.empty()))
00117                                 {
00118                                         process();
00119                                 }
00120                         } catch (const ibrcommon::Conditional::ConditionalAbortException&) { };
00121 
00122                         for (std::list<Worker*>::iterator iter = wlist.begin(); iter != wlist.end(); iter++)
00123                         {
00124                                 Worker *w = (*iter);
00125                                 w->join();
00126                                 delete w;
00127                         }
00128                 }
00129                 
00130                 void EventSwitch::pause()
00131                 {
00132                         // wait until all workers are done
00133                         ibrcommon::MutexLock la(_active_cond);
00134                         _pause = true;
00135                         while (_active_worker != 0) { _active_cond.wait(); };
00136                 }
00137 
00138                 void EventSwitch::unpause()
00139                 {
00140                         ibrcommon::MutexLock la(_active_cond);
00141                         _pause = false;
00142                         _active_cond.signal();
00143                 }
00144 
00145                 const list<EventReceiver*>& EventSwitch::getReceivers(string eventName) const
00146                 {
00147                         map<string,list<EventReceiver*> >::const_iterator iter = _list.find(eventName);
00148 
00149                         if (iter == _list.end())
00150                         {
00151                                 throw NoReceiverFoundException();
00152                         }
00153 
00154                         return iter->second;
00155                 }
00156 
00157                 void EventSwitch::registerEventReceiver(string eventName, EventReceiver *receiver)
00158                 {
00159                         // get the list for this event
00160                         EventSwitch &s = EventSwitch::getInstance();
00161                         ibrcommon::MutexLock l(s._receiverlock);
00162                         s._list[eventName].push_back(receiver);
00163                 }
00164 
00165                 void EventSwitch::unregisterEventReceiver(string eventName, EventReceiver *receiver)
00166                 {
00167                         // unregister the receiver
00168                         EventSwitch::getInstance().unregister(eventName, receiver);
00169                 }
00170 
00171                 void EventSwitch::unregister(std::string eventName, EventReceiver *receiver)
00172                 {
00173                         {
00174                                 // remove the receiver from the list
00175                                 ibrcommon::MutexLock lr(_receiverlock);
00176                                 std::list<EventReceiver*> &rlist = _list[eventName];
00177                                 for (std::list<EventReceiver*>::iterator iter = rlist.begin(); iter != rlist.end(); iter++)
00178                                 {
00179                                         if ((*iter) == receiver)
00180                                         {
00181                                                 rlist.erase(iter);
00182                                                 break;
00183                                         }
00184                                 }
00185                         }
00186 
00187                         // set the event switch into pause mode and wait
00188                         // until all threads are on hold
00189                         pause();
00190 
00191                         {
00192                                 // freeze the queue
00193                                 ibrcommon::MutexLock lq(_queue_cond);
00194 
00195                                 // remove all elements with this receiver from the queue
00196                                 for (std::list<Task*>::iterator iter = _queue.begin(); iter != _queue.end();)
00197                                 {
00198                                         EventSwitch::Task &t = (**iter);
00199                                         std::list<Task*>::iterator current = iter++;
00200                                         if (t.receiver == receiver)
00201                                         {
00202                                                 _queue.erase(current);
00203                                         }
00204                                 }
00205                         }
00206 
00207                         // resume all threads
00208                         unpause();
00209                 }
00210 
00211                 void EventSwitch::raiseEvent(Event *evt)
00212                 {
00213                         EventSwitch &s = EventSwitch::getInstance();
00214 
00215                         // do not process any event if the system is going down
00216                         {
00217                                 ibrcommon::MutexLock l(s._queue_cond);
00218                                 if (!s._running)
00219                                 {
00220                                         if (evt->decrement_ref_count())
00221                                         {
00222                                                 delete evt;
00223                                         }
00224 
00225                                         return;
00226                                 }
00227                         }
00228 
00229                         // forward to debugger
00230                         s._debugger.raiseEvent(evt);
00231 
00232                         try {
00233                                 dtn::core::GlobalEvent &global = dynamic_cast<dtn::core::GlobalEvent&>(*evt);
00234 
00235                                 if (global.getAction() == dtn::core::GlobalEvent::GLOBAL_SHUTDOWN)
00236                                 {
00237                                         // stop receiving events
00238                                         try {
00239                                                 ibrcommon::MutexLock l(s._queue_cond);
00240                                                 s._running = false;
00241                                                 s._queue_cond.abort();
00242                                         } catch (const ibrcommon::Conditional::ConditionalAbortException&) {};
00243                                 }
00244                         } catch (const std::bad_cast&) { }
00245 
00246                         try {
00247                                 ibrcommon::MutexLock reglock(s._receiverlock);
00248 
00249                                 // get the list for this event
00250                                 const std::list<EventReceiver*> receivers = s.getReceivers(evt->getName());
00251                                 evt->set_ref_count(receivers.size());
00252 
00253                                 for (list<EventReceiver*>::const_iterator iter = receivers.begin(); iter != receivers.end(); iter++)
00254                                 {
00255                                         Task *t = new Task(*iter, evt);
00256                                         ibrcommon::MutexLock l(s._queue_cond);
00257 
00258                                         if (evt->prio > 0)
00259                                         {
00260                                                 s._prio_queue.push_back(t);
00261                                         }
00262                                         else if (evt->prio < 0)
00263                                         {
00264                                                 s._low_queue.push_back(t);
00265                                         }
00266                                         else
00267                                         {
00268                                                 s._queue.push_back(t);
00269                                         }
00270                                         s._queue_cond.signal();
00271                                 }
00272                         } catch (const NoReceiverFoundException&) {
00273                                 // No receiver available!
00274                         }
00275                 }
00276 
00277                 EventSwitch& EventSwitch::getInstance()
00278                 {
00279                         static EventSwitch instance;
00280                         return instance;
00281                 }
00282 
00283                 const std::string EventSwitch::getName() const
00284                 {
00285                         return "EventSwitch";
00286                 }
00287 
00288                 void EventSwitch::clear()
00289                 {
00290                         ibrcommon::MutexLock l(_receiverlock);
00291                         _list.clear();
00292                 }
00293 
00294                 EventSwitch::Task::Task()
00295                  : receiver(NULL), event(NULL)
00296                 {
00297                 }
00298 
00299                 EventSwitch::Task::Task(EventReceiver *er, dtn::core::Event *evt)
00300                  : receiver(er), event(evt)
00301                 {
00302                 }
00303 
00304                 EventSwitch::Task::~Task()
00305                 {
00306                         if (event != NULL)
00307                         {
00308                                 if (event->decrement_ref_count())
00309                                 {
00310                                         delete event;
00311                                 }
00312                         }
00313                 }
00314 
00315                 EventSwitch::Worker::Worker(EventSwitch &sw)
00316                  : _switch(sw), _running(true)
00317                 {}
00318 
00319                 EventSwitch::Worker::~Worker()
00320                 {}
00321 
00322                 void EventSwitch::Worker::run()
00323                 {
00324                         try {
00325                                 while (_running)
00326                                         _switch.process();
00327                         } catch (const ibrcommon::Conditional::ConditionalAbortException&) { };
00328                 }
00329 
00330                 void EventSwitch::Worker::__cancellation()
00331                 {
00332                         _running = false;
00333                 }
00334         }
00335 }
00336