IBR-DTNSuite  0.8
ibrcommon/ibrcommon/thread/Queue.h
Go to the documentation of this file.
00001 /*
00002  * Queue.h
00003  *
00004  *  Created on: 10.06.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef IBRCOMMON_QUEUE_H_
00009 #define IBRCOMMON_QUEUE_H_
00010 
00011 #include "ibrcommon/thread/MutexLock.h"
00012 #include "ibrcommon/thread/Conditional.h"
00013 #include "ibrcommon/Exceptions.h"
00014 #include "ibrcommon/thread/Semaphore.h"
00015 #include "ibrcommon/thread/Thread.h"
00016 #include <queue>
00017 #include <iostream>
00018 
00019 namespace ibrcommon
00020 {
00021         class QueueUnblockedException : public ibrcommon::Exception
00022         {
00023         public:
00024                 enum type_t
00025                 {
00026                         QUEUE_ABORT = 0,
00027                         QUEUE_ERROR = 1,
00028                         QUEUE_TIMEOUT = 2
00029                 };
00030 
00031                 QueueUnblockedException(const type_t r, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what), reason(r)
00032                 {
00033                 };
00034 
00035                 QueueUnblockedException(const ibrcommon::Conditional::ConditionalAbortException &ex, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what)
00036                 {
00037                         switch (ex.reason)
00038                         {
00039                         case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT:
00040                                 reason = QUEUE_ABORT;
00041                                 _what = "queue function aborted in " + what;
00042                                 break;
00043 
00044                         case ibrcommon::Conditional::ConditionalAbortException::COND_ERROR:
00045                                 reason = QUEUE_ERROR;
00046                                 _what = "queue function error in " + what;
00047                                 break;
00048 
00049                         case ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT:
00050                                 reason = QUEUE_TIMEOUT;
00051                                 _what = "queue function timeout in " + what;
00052                                 break;
00053                         }
00054                 };
00055 
00056                 type_t reason;
00057         };
00058 
00059         template <class T>
00060         class Queue
00061         {
00062                 ibrcommon::Conditional _cond;
00063                 std::queue<T> _queue;
00064                 ibrcommon::Semaphore _sem;
00065                 bool _limit;
00066 
00067         public:
00068                 Queue(size_t max = 0) : _sem(max), _limit(max > 0)
00069                 {};
00070 
00071                 virtual ~Queue()
00072                 {
00073                         abort();
00074                 };
00075 
00076                 /* Test whether container is empty (public member function) */
00077                 bool empty ( )
00078                 {
00079                         ibrcommon::MutexLock l(_cond);
00080                         return _queue.empty();
00081                 }
00082 
00083                 /* Return size (public member function) */
00084                 size_t size ( ) const
00085                 {
00086                         return _queue.size();
00087                 }
00088 
00089                 /* Access next element (public member function) */
00090                 T& front ( )
00091                 {
00092                         ibrcommon::MutexLock l(_cond);
00093                         return _queue.front();
00094                 }
00095 
00096                 const T& front ( ) const
00097                 {
00098                         ibrcommon::MutexLock l(_cond);
00099                         return _queue.front();
00100                 }
00101 
00102                 /* Access last element (public member function) */
00103                 T& back ( )
00104                 {
00105                         ibrcommon::MutexLock l(_cond);
00106                         return _queue.back();
00107                 }
00108 
00109                 const T& back ( ) const
00110                 {
00111                         ibrcommon::MutexLock l(_cond);
00112                         return _queue.back();
00113                 }
00114 
00115                 /* Insert element (public member function) */
00116                 void push ( const T& x )
00117                 {
00118                         if (_limit) _sem.wait();
00119 
00120                         ibrcommon::MutexLock l(_cond);
00121                         _queue.push(x);
00122                         _cond.signal(true);
00123                 }
00124 
00125                 /* Delete next element (public member function) */
00126                 void pop ()
00127                 {
00128                         ibrcommon::MutexLock l(_cond);
00129                         __pop();
00130                 }
00131 
00132                 T getnpop(bool blocking = false, size_t timeout = 0) throw (QueueUnblockedException)
00133                 {
00134                         try {
00135                                 ibrcommon::MutexLock l(_cond);
00136                                 if (_queue.empty())
00137                                 {
00138                                         if (blocking)
00139                                         {
00140                                                 if (timeout == 0)
00141                                                 {
00142                                                         __wait(QUEUE_NOT_EMPTY);
00143                                                 }
00144                                                 else
00145                                                 {
00146                                                         __wait(QUEUE_NOT_EMPTY, timeout);
00147                                                 }
00148                                         }
00149                                         else
00150                                         {
00151                                                 throw QueueUnblockedException(QueueUnblockedException::QUEUE_ABORT, "getnpop(): queue is empty!");
00152                                         }
00153                                 }
00154 
00155                                 T ret = _queue.front();
00156                                 __pop();
00157                                 return ret;
00158                         } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00159                                 throw QueueUnblockedException(ex, "getnpop()");
00160                         }
00161                 }
00162 
00163                 void abort() throw ()
00164                 {
00165                         ibrcommon::MutexLock l(_cond);
00166                         _cond.abort();
00167                 }
00168 
00169                 void reset()
00170                 {
00171                         _cond.reset();
00172                 }
00173 
00174                 enum WAIT_MODES
00175                 {
00176                         QUEUE_NOT_EMPTY = 0,
00177                         QUEUE_EMPTY = 1
00178                 };
00179 
00180                 void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException)
00181                 {
00182                         ibrcommon::MutexLock l(_cond);
00183                         if (timeout == 0)
00184                         {
00185                                 __wait(mode);
00186                         }
00187                         else
00188                         {
00189                                 __wait(mode, timeout);
00190                         }
00191                 }
00192 
00193                 class Locked
00194                 {
00195                 public:
00196                         Locked(Queue<T> &queue)
00197                          : _queue(queue), _lock(queue._cond), _changed(false)
00198                         {
00199                         };
00200 
00201                         virtual ~Locked()
00202                         {
00203                                 if (_changed) _queue._cond.signal(true);
00204                         };
00205 
00206                         void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException)
00207                         {
00208                                 if (timeout == 0)
00209                                 {
00210                                         _queue.__wait(mode);
00211                                 }
00212                                 else
00213                                 {
00214                                         _queue.__wait(mode, timeout);
00215                                 }
00216                         }
00217 
00218                         void pop()
00219                         {
00220                                 _queue.__pop();
00221                         }
00222 
00223                         const T& front() const
00224                         {
00225                                 return _queue._queue.front();
00226                         }
00227 
00228                         T& front()
00229                         {
00230                                 return _queue._queue.front();
00231                         }
00232 
00233                         bool empty()
00234                         {
00235                                 return _queue._queue.empty();
00236                         }
00237 
00238                         size_t size()
00239                         {
00240                                 return _queue._queue.size();
00241                         }
00242 
00243                         void push(const T &p)
00244                         {
00245                                 _queue._queue.push(p);
00246                                 _changed = true;
00247                         }
00248 
00249                 private:
00250                         Queue<T> &_queue;
00251                         ibrcommon::MutexLock _lock;
00252                         bool _changed;
00253                 };
00254 
00255                 typename Queue<T>::Locked exclusive()
00256                 {
00257                         return typename Queue<T>::Locked(*this);
00258                 }
00259 
00260         protected:
00261                 void __push( const T& x )
00262                 {
00263                         _queue.push(x);
00264                         _cond.signal(true);
00265                 }
00266 
00267                 void __pop()
00268                 {
00269                         if (!_queue.empty())
00270                         {
00271                                 _queue.pop();
00272                                 if (_limit) _sem.post();
00273                                 _cond.signal(true);
00274                         }
00275                 }
00276 
00277                 void __wait(const WAIT_MODES mode) throw (QueueUnblockedException)
00278                 {
00279                         try {
00280                                 switch (mode)
00281                                 {
00282                                         case QUEUE_NOT_EMPTY:
00283                                         {
00284                                                 while (_queue.empty())
00285                                                 {
00286                                                         _cond.wait();
00287                                                 }
00288                                                 break;
00289                                         }
00290 
00291                                         case QUEUE_EMPTY:
00292                                         {
00293                                                 while (!_queue.empty())
00294                                                 {
00295                                                         _cond.wait();
00296                                                 }
00297                                                 break;
00298                                         }
00299                                 }
00300                         } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00301                                 switch (ex.reason)
00302                                 {
00303                                 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT:
00304                                         _cond.reset();
00305                                         break;
00306 
00307                                 default:
00308                                         break;
00309                                 }
00310 
00311                                 throw QueueUnblockedException(ex, "__wait()");
00312                         }
00313                 }
00314 
00315                 void __wait(const WAIT_MODES mode, const size_t timeout) throw (QueueUnblockedException)
00316                 {
00317                         try {
00318                                 struct timespec ts;
00319                                 Conditional::gettimeout(timeout, &ts);
00320 
00321                                 switch (mode)
00322                                 {
00323                                         case QUEUE_NOT_EMPTY:
00324                                         {
00325                                                 while (_queue.empty())
00326                                                 {
00327                                                         _cond.wait(&ts);
00328                                                 }
00329                                                 break;
00330                                         }
00331 
00332                                         case QUEUE_EMPTY:
00333                                         {
00334                                                 while (!_queue.empty())
00335                                                 {
00336                                                         _cond.wait(&ts);
00337                                                 }
00338                                                 break;
00339                                         }
00340                                 }
00341                         } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) {
00342                                 switch (ex.reason)
00343                                 {
00344                                 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT:
00345                                         _cond.reset();
00346                                         break;
00347 
00348                                 default:
00349                                         break;
00350                                 }
00351 
00352                                 throw QueueUnblockedException(ex, "__wait()");
00353                         }
00354                 }
00355         };
00356 }
00357 
00358 #endif /* IBRCOMMON_QUEUE_H_ */