IBR-DTNSuite
0.8
|
00001 /* 00002 * Queue.h 00003 * 00004 * Created on: 10.06.2010 00005 * Author: morgenro 00006 */ 00007 00008 #ifndef IBRCOMMON_QUEUE_H_ 00009 #define IBRCOMMON_QUEUE_H_ 00010 00011 #include "ibrcommon/thread/MutexLock.h" 00012 #include "ibrcommon/thread/Conditional.h" 00013 #include "ibrcommon/Exceptions.h" 00014 #include "ibrcommon/thread/Semaphore.h" 00015 #include "ibrcommon/thread/Thread.h" 00016 #include <queue> 00017 #include <iostream> 00018 00019 namespace ibrcommon 00020 { 00021 class QueueUnblockedException : public ibrcommon::Exception 00022 { 00023 public: 00024 enum type_t 00025 { 00026 QUEUE_ABORT = 0, 00027 QUEUE_ERROR = 1, 00028 QUEUE_TIMEOUT = 2 00029 }; 00030 00031 QueueUnblockedException(const type_t r, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what), reason(r) 00032 { 00033 }; 00034 00035 QueueUnblockedException(const ibrcommon::Conditional::ConditionalAbortException &ex, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what) 00036 { 00037 switch (ex.reason) 00038 { 00039 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT: 00040 reason = QUEUE_ABORT; 00041 _what = "queue function aborted in " + what; 00042 break; 00043 00044 case ibrcommon::Conditional::ConditionalAbortException::COND_ERROR: 00045 reason = QUEUE_ERROR; 00046 _what = "queue function error in " + what; 00047 break; 00048 00049 case ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT: 00050 reason = QUEUE_TIMEOUT; 00051 _what = "queue function timeout in " + what; 00052 break; 00053 } 00054 }; 00055 00056 type_t reason; 00057 }; 00058 00059 template <class T> 00060 class Queue 00061 { 00062 ibrcommon::Conditional _cond; 00063 std::queue<T> _queue; 00064 ibrcommon::Semaphore _sem; 00065 bool _limit; 00066 00067 public: 00068 Queue(size_t max = 0) : _sem(max), _limit(max > 0) 00069 {}; 00070 00071 virtual ~Queue() 00072 { 00073 abort(); 00074 }; 00075 00076 /* Test whether container is empty (public member function) */ 00077 bool empty ( ) 00078 { 00079 ibrcommon::MutexLock l(_cond); 00080 return _queue.empty(); 00081 } 00082 00083 /* Return size (public member function) */ 00084 size_t size ( ) const 00085 { 00086 return _queue.size(); 00087 } 00088 00089 /* Access next element (public member function) */ 00090 T& front ( ) 00091 { 00092 ibrcommon::MutexLock l(_cond); 00093 return _queue.front(); 00094 } 00095 00096 const T& front ( ) const 00097 { 00098 ibrcommon::MutexLock l(_cond); 00099 return _queue.front(); 00100 } 00101 00102 /* Access last element (public member function) */ 00103 T& back ( ) 00104 { 00105 ibrcommon::MutexLock l(_cond); 00106 return _queue.back(); 00107 } 00108 00109 const T& back ( ) const 00110 { 00111 ibrcommon::MutexLock l(_cond); 00112 return _queue.back(); 00113 } 00114 00115 /* Insert element (public member function) */ 00116 void push ( const T& x ) 00117 { 00118 if (_limit) _sem.wait(); 00119 00120 ibrcommon::MutexLock l(_cond); 00121 _queue.push(x); 00122 _cond.signal(true); 00123 } 00124 00125 /* Delete next element (public member function) */ 00126 void pop () 00127 { 00128 ibrcommon::MutexLock l(_cond); 00129 __pop(); 00130 } 00131 00132 T getnpop(bool blocking = false, size_t timeout = 0) throw (QueueUnblockedException) 00133 { 00134 try { 00135 ibrcommon::MutexLock l(_cond); 00136 if (_queue.empty()) 00137 { 00138 if (blocking) 00139 { 00140 if (timeout == 0) 00141 { 00142 __wait(QUEUE_NOT_EMPTY); 00143 } 00144 else 00145 { 00146 __wait(QUEUE_NOT_EMPTY, timeout); 00147 } 00148 } 00149 else 00150 { 00151 throw QueueUnblockedException(QueueUnblockedException::QUEUE_ABORT, "getnpop(): queue is empty!"); 00152 } 00153 } 00154 00155 T ret = _queue.front(); 00156 __pop(); 00157 return ret; 00158 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) { 00159 throw QueueUnblockedException(ex, "getnpop()"); 00160 } 00161 } 00162 00163 void abort() throw () 00164 { 00165 ibrcommon::MutexLock l(_cond); 00166 _cond.abort(); 00167 } 00168 00169 void reset() 00170 { 00171 _cond.reset(); 00172 } 00173 00174 enum WAIT_MODES 00175 { 00176 QUEUE_NOT_EMPTY = 0, 00177 QUEUE_EMPTY = 1 00178 }; 00179 00180 void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException) 00181 { 00182 ibrcommon::MutexLock l(_cond); 00183 if (timeout == 0) 00184 { 00185 __wait(mode); 00186 } 00187 else 00188 { 00189 __wait(mode, timeout); 00190 } 00191 } 00192 00193 class Locked 00194 { 00195 public: 00196 Locked(Queue<T> &queue) 00197 : _queue(queue), _lock(queue._cond), _changed(false) 00198 { 00199 }; 00200 00201 virtual ~Locked() 00202 { 00203 if (_changed) _queue._cond.signal(true); 00204 }; 00205 00206 void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException) 00207 { 00208 if (timeout == 0) 00209 { 00210 _queue.__wait(mode); 00211 } 00212 else 00213 { 00214 _queue.__wait(mode, timeout); 00215 } 00216 } 00217 00218 void pop() 00219 { 00220 _queue.__pop(); 00221 } 00222 00223 const T& front() const 00224 { 00225 return _queue._queue.front(); 00226 } 00227 00228 T& front() 00229 { 00230 return _queue._queue.front(); 00231 } 00232 00233 bool empty() 00234 { 00235 return _queue._queue.empty(); 00236 } 00237 00238 size_t size() 00239 { 00240 return _queue._queue.size(); 00241 } 00242 00243 void push(const T &p) 00244 { 00245 _queue._queue.push(p); 00246 _changed = true; 00247 } 00248 00249 private: 00250 Queue<T> &_queue; 00251 ibrcommon::MutexLock _lock; 00252 bool _changed; 00253 }; 00254 00255 typename Queue<T>::Locked exclusive() 00256 { 00257 return typename Queue<T>::Locked(*this); 00258 } 00259 00260 protected: 00261 void __push( const T& x ) 00262 { 00263 _queue.push(x); 00264 _cond.signal(true); 00265 } 00266 00267 void __pop() 00268 { 00269 if (!_queue.empty()) 00270 { 00271 _queue.pop(); 00272 if (_limit) _sem.post(); 00273 _cond.signal(true); 00274 } 00275 } 00276 00277 void __wait(const WAIT_MODES mode) throw (QueueUnblockedException) 00278 { 00279 try { 00280 switch (mode) 00281 { 00282 case QUEUE_NOT_EMPTY: 00283 { 00284 while (_queue.empty()) 00285 { 00286 _cond.wait(); 00287 } 00288 break; 00289 } 00290 00291 case QUEUE_EMPTY: 00292 { 00293 while (!_queue.empty()) 00294 { 00295 _cond.wait(); 00296 } 00297 break; 00298 } 00299 } 00300 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) { 00301 switch (ex.reason) 00302 { 00303 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT: 00304 _cond.reset(); 00305 break; 00306 00307 default: 00308 break; 00309 } 00310 00311 throw QueueUnblockedException(ex, "__wait()"); 00312 } 00313 } 00314 00315 void __wait(const WAIT_MODES mode, const size_t timeout) throw (QueueUnblockedException) 00316 { 00317 try { 00318 struct timespec ts; 00319 Conditional::gettimeout(timeout, &ts); 00320 00321 switch (mode) 00322 { 00323 case QUEUE_NOT_EMPTY: 00324 { 00325 while (_queue.empty()) 00326 { 00327 _cond.wait(&ts); 00328 } 00329 break; 00330 } 00331 00332 case QUEUE_EMPTY: 00333 { 00334 while (!_queue.empty()) 00335 { 00336 _cond.wait(&ts); 00337 } 00338 break; 00339 } 00340 } 00341 } catch (const ibrcommon::Conditional::ConditionalAbortException &ex) { 00342 switch (ex.reason) 00343 { 00344 case ibrcommon::Conditional::ConditionalAbortException::COND_ABORT: 00345 _cond.reset(); 00346 break; 00347 00348 default: 00349 break; 00350 } 00351 00352 throw QueueUnblockedException(ex, "__wait()"); 00353 } 00354 } 00355 }; 00356 } 00357 00358 #endif /* IBRCOMMON_QUEUE_H_ */