IBR-DTNSuite  0.12
Queue.h
Go to the documentation of this file.
1 /*
2  * Queue.h
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #ifndef IBRCOMMON_QUEUE_H_
23 #define IBRCOMMON_QUEUE_H_
24 
27 #include "ibrcommon/Exceptions.h"
30 #include <queue>
31 #include <iostream>
32 
33 namespace ibrcommon
34 {
/*
 * Body of the QueueUnblockedException class: the exception type that the
 * Queue methods below list in their exception specifications.
 * NOTE(review): the class-head line (listing line 35) is not visible in this
 * listing; both constructors initialise ibrcommon::Exception, so that is
 * presumably the base class - confirm against the real header.
 */
36  {
37  public:
/*
 * Reason code stored in `reason`.
 * NOTE(review): the enumerators (listing lines 40-42) are not visible here.
 * QueueUnblockedException::QUEUE_ABORT is passed as a type_t elsewhere in
 * this file, so it is one of them; the others cannot be named from here.
 */
38  enum type_t
39  {
43  };
44 
/*
 * Construct from an explicit reason code.
 * r:    stored in `reason`.
 * what: message handed to ibrcommon::Exception; defaults to
 *       "Queue is unblocked.".
 */
45  QueueUnblockedException(const type_t r, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what), reason(r)
46  {
47  };
48 
/*
 * Construct from a ConditionalAbortException raised by the Conditional.
 * ex:   source exception; its `reason` field selects the branch below.
 * what: context string (callers in this file pass a function name); it is
 *       appended to a fixed prefix and the result is stored in `_what`.
 * NOTE(review): `reason` is not in the initialiser list, and the case labels
 * plus the first statement of each branch (listing lines 53-54, 58-59, 63-64)
 * are not visible. Presumably each branch assigns `reason`; there is no
 * visible `default`, so confirm `reason` cannot be left uninitialised.
 */
49  QueueUnblockedException(const ibrcommon::Conditional::ConditionalAbortException &ex, string what = "Queue is unblocked.") throw() : ibrcommon::Exception(what)
50  {
51  switch (ex.reason)
52  {
55  _what = "queue function aborted in " + what;
56  break;
57 
60  _what = "queue function error in " + what;
61  break;
62 
65  _what = "queue function timeout in " + what;
66  break;
67  }
68  };
69 
/* Why the queue operation was unblocked; public so handlers can inspect it. */
70  type_t reason;
71  };
72 
/*
 * Queue<T>: a wrapper around std::queue<T> whose member functions take an
 * ibrcommon::MutexLock on `_cond` around most accesses and can block until
 * the queue becomes empty or non-empty (see wait() / __wait()).
 */
73  template <class T>
74  class Queue
75  {
/*
 * NOTE(review): listing lines 76 and 78 are not visible here. The members
 * `_cond` and `_sem` used throughout the class are presumably declared on
 * those lines - confirm their types against the real header.
 */
/* Underlying FIFO container holding the elements. */
77  std::queue<T> _queue;
/* True when the queue was constructed with max > 0; gates all use of _sem. */
79  bool _limit;
80 
81  public:
/*
 * Construct a queue.
 * max: value used to initialise `_sem`. When max is 0 (the default) `_limit`
 *      is false and push() / __pop() never touch `_sem`, i.e. no limit is
 *      enforced by this class.
 */
82  Queue(unsigned int max = 0) : _sem(max), _limit(max > 0)
83  {};
84 
/*
 * Destructor: calls abort(), which invokes _cond.abort() under the lock.
 * Presumably this releases threads still blocked in a wait on this queue -
 * confirm against ibrcommon::Conditional.
 */
85  virtual ~Queue()
86  {
87  abort();
88  };
89 
90  /* Test whether the underlying std::queue is empty (public member function).
 * An ibrcommon::MutexLock on _cond is held for the duration of the check.
 * Returns true when the queue holds no elements. */
91  bool empty ( )
92  {
93  ibrcommon::MutexLock l(_cond);
94  return _queue.empty();
95  }
96 
97  /* Return the number of queued elements (public member function).
 * NOTE(review): unlike empty(), this reads _queue without taking a MutexLock
 * on _cond - confirm whether an unsynchronised read is acceptable for the
 * callers of size(). */
98  size_t size ( ) const
99  {
100  return _queue.size();
101  }
102 
103  /* Access the next (oldest) element (public member function).
 * A MutexLock on _cond is held only while std::queue::front() is called; the
 * lock object is a local, so the returned reference is used by the caller
 * after it has gone out of scope. No emptiness check is made before calling
 * std::queue::front(). */
104  T& front ( )
105  {
106  ibrcommon::MutexLock l(_cond);
107  return _queue.front();
108  }
109 
/* Const overload of front(); same locking and the same caveats.
 * NOTE(review): this constructs a MutexLock from _cond inside a const member
 * function - confirm _cond's declaration permits that. */
110  const T& front ( ) const
111  {
112  ibrcommon::MutexLock l(_cond);
113  return _queue.front();
114  }
115 
116  /* Access the last (most recently pushed) element (public member function).
 * A MutexLock on _cond is held only while std::queue::back() is called; the
 * lock object is a local, so the returned reference is used by the caller
 * after it has gone out of scope. No emptiness check is made before calling
 * std::queue::back(). */
117  T& back ( )
118  {
119  ibrcommon::MutexLock l(_cond);
120  return _queue.back();
121  }
122 
/* Const overload of back(); same locking and the same caveats.
 * NOTE(review): this constructs a MutexLock from _cond inside a const member
 * function - confirm _cond's declaration permits that. */
123  const T& back ( ) const
124  {
125  ibrcommon::MutexLock l(_cond);
126  return _queue.back();
127  }
128 
129  /* Insert an element at the back of the queue (public member function).
 * x: element to copy into the queue. */
130  void push ( const T& x )
131  {
/* With a limit configured, wait on _sem first. This happens before the
 * MutexLock is taken, so _cond is not locked while waiting on _sem. */
132  if (_limit) _sem.wait();
133 
134  ibrcommon::MutexLock l(_cond);
135  _queue.push(x);
/* Notify waiters that the queue content changed. The meaning of the `true`
 * argument is defined by Conditional::signal (presumably "wake all" -
 * confirm). */
136  _cond.signal(true);
137  }
138 
139  /* Delete the next element (public member function).
 * Takes a MutexLock on _cond and delegates to __pop(), which does nothing
 * when the queue is already empty. */
140  void pop ()
141  {
142  ibrcommon::MutexLock l(_cond);
143  __pop();
144  }
145 
/*
 * Return a copy of the front element without removing it.
 *
 * blocking: when false and the queue is empty, a QueueUnblockedException with
 *           reason QUEUE_ABORT is thrown immediately.
 * timeout:  when blocking and non-zero, forwarded to
 *           __wait(QUEUE_NOT_EMPTY, timeout).
 * Returns a copy of _queue.front(), taken while the MutexLock on _cond is held.
 * Throws QueueUnblockedException (see the exception specification).
 *
 * The exception messages name "get()" so that an error raised here is not
 * attributed to getnpop().
 *
 * NOTE(review): the statement of the `timeout == 0` branch (listing line 156)
 * and the catch clause that introduces `ex` (listing line 170) are not
 * visible in this listing; they are left exactly as found.
 */
146  T get(bool blocking = false, size_t timeout = 0) throw (QueueUnblockedException)
147  {
148  try {
149  ibrcommon::MutexLock l(_cond);
150  if (_queue.empty())
151  {
152  if (blocking)
153  {
154  if (timeout == 0)
155  {
157  }
158  else
159  {
160  __wait(QUEUE_NOT_EMPTY, timeout);
161  }
162  }
163  else
164  {
/* Non-blocking call on an empty queue: report it to the caller. */
165  throw QueueUnblockedException(QueueUnblockedException::QUEUE_ABORT, "get(): queue is empty!");
166  }
167  }
168 
169  return _queue.front();
/* Re-throw the caught exception `ex` as a QueueUnblockedException tagged
 * with this function's name. */
171  throw QueueUnblockedException(ex, "get()");
172  }
173  }
174 
/*
 * Return a copy of the front element and remove it from the queue.
 *
 * blocking: when false and the queue is empty, a QueueUnblockedException with
 *           reason QUEUE_ABORT is thrown immediately.
 * timeout:  when blocking and non-zero, forwarded to
 *           __wait(QUEUE_NOT_EMPTY, timeout).
 * Returns the copied element. The copy and the removal both happen while the
 * same MutexLock on _cond is held.
 * Throws QueueUnblockedException (see the exception specification).
 *
 * NOTE(review): the statement of the `timeout == 0` branch (listing line 185)
 * and the catch clause that introduces `ex` (listing line 201) are not
 * visible in this listing.
 */
175  T getnpop(bool blocking = false, size_t timeout = 0) throw (QueueUnblockedException)
176  {
177  try {
178  ibrcommon::MutexLock l(_cond);
179  if (_queue.empty())
180  {
181  if (blocking)
182  {
183  if (timeout == 0)
184  {
186  }
187  else
188  {
189  __wait(QUEUE_NOT_EMPTY, timeout);
190  }
191  }
192  else
193  {
/* Non-blocking call on an empty queue: report it to the caller. */
194  throw QueueUnblockedException(QueueUnblockedException::QUEUE_ABORT, "getnpop(): queue is empty!");
195  }
196  }
197 
/* Copy first, then remove: __pop() destroys the element in the container. */
198  T ret = _queue.front();
199  __pop();
200  return ret;
/* Re-throw the caught exception `ex` as a QueueUnblockedException tagged
 * with this function's name. */
202  throw QueueUnblockedException(ex, "getnpop()");
203  }
204  }
205 
/*
 * Take a MutexLock on _cond and call _cond.abort().
 * Presumably this makes pending and future waits on _cond fail with a
 * ConditionalAbortException until reset() is called - confirm against
 * ibrcommon::Conditional. Declared not to throw.
 */
206  void abort() throw ()
207  {
208  ibrcommon::MutexLock l(_cond);
209  _cond.abort();
210  }
211 
/*
 * Call _cond.reset(); presumably this clears the state set by abort() -
 * confirm against ibrcommon::Conditional. Declared not to throw.
 * NOTE(review): unlike abort(), no MutexLock on _cond is taken here - confirm
 * whether Conditional::reset() needs the lock to be held.
 */
212  void reset() throw ()
213  {
214  _cond.reset();
215  }
216 
/*
 * Body of a declaration whose head (listing line 217) and members (listing
 * lines 219-220) are not visible in this listing.
 * NOTE(review): presumably this is `enum WAIT_MODES` with the enumerators
 * QUEUE_NOT_EMPTY and QUEUE_EMPTY, which the wait functions of this class
 * use - confirm against the real header.
 */
218  {
221  };
222 
/*
 * Block until the queue reaches the state selected by `mode`.
 *
 * mode:    QUEUE_NOT_EMPTY or QUEUE_EMPTY (the cases handled by __wait()).
 * timeout: 0 (the default) selects the untimed __wait(mode); any other value
 *          is forwarded to __wait(mode, timeout). Its unit is whatever
 *          Conditional::gettimeout() expects - not visible in this file.
 * Throws QueueUnblockedException (see the exception specification).
 * A MutexLock on _cond is held for the whole call, as __wait() itself takes
 * no lock.
 */
223  void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException)
224  {
225  ibrcommon::MutexLock l(_cond);
226  if (timeout == 0)
227  {
228  __wait(mode);
229  }
230  else
231  {
232  __wait(mode, timeout);
233  }
234  }
235 
/*
 * Locked: handle that holds a MutexLock on the queue's _cond for its whole
 * lifetime (the `_lock` member) and exposes queue operations that do not take
 * the lock again. Several operations can thus be done under one lock.
 */
236  class Locked
237  {
238  public:
/* Bind to `queue` and construct the MutexLock member from queue._cond. */
239  Locked(Queue<T> &queue)
240  : _queue(queue), _lock(queue._cond), _changed(false)
241  {
242  };
243 
/* If push() was called through this handle, signal the queue's condition
 * once, here, instead of once per push. */
244  virtual ~Locked()
245  {
246  if (_changed) _queue._cond.signal(true);
247  };
248 
/*
 * Wait for the queue state selected by `mode`, using the lock already held.
 * timeout: 0 selects the untimed __wait(mode); otherwise it is forwarded to
 *          __wait(mode, timeout).
 * Throws QueueUnblockedException (see the exception specification).
 */
249  void wait(WAIT_MODES mode, const size_t timeout = 0) throw (QueueUnblockedException)
250  {
251  if (timeout == 0)
252  {
253  _queue.__wait(mode);
254  }
255  else
256  {
257  _queue.__wait(mode, timeout);
258  }
259  }
260 
/* Remove the front element via Queue::__pop(): no-op on an empty queue,
 * posts _sem when a limit is configured, and signals _cond. */
261  void pop()
262  {
263  _queue.__pop();
264  }
265 
/* Front element of the underlying std::queue; no emptiness check is made. */
266  const T& front() const
267  {
268  return _queue._queue.front();
269  }
270 
/* Mutable access to the front element; no emptiness check is made. */
271  T& front()
272  {
273  return _queue._queue.front();
274  }
275 
/* True when the underlying std::queue holds no elements. */
276  bool empty()
277  {
278  return _queue._queue.empty();
279  }
280 
/* Number of elements in the underlying std::queue. */
281  size_t size()
282  {
283  return _queue._queue.size();
284  }
285 
/*
 * Append `p` directly to the underlying std::queue and remember that a
 * signal is due at destruction.
 * NOTE(review): unlike Queue::push(), this never waits on _sem, so a
 * configured limit is not enforced here, while pop() above still posts _sem
 * through __pop() - confirm this asymmetry is intended.
 */
286  void push(const T &p)
287  {
288  _queue._queue.push(p);
289  _changed = true;
290  }
291 
292  private:
/* The queue this handle operates on. */
293  Queue<T> &_queue;
/* Lock on the queue's _cond, held for the lifetime of this object. */
294  ibrcommon::MutexLock _lock;
/* Set by push(); makes the destructor signal the condition. */
295  bool _changed;
296  };
297 
/*
 * Body of a member function that returns a Locked handle constructed from
 * *this, giving the caller lock-held access to this queue.
 * NOTE(review): the signature line (listing line 298) is not visible in this
 * listing, so the function's name and exact return type cannot be confirmed
 * from here.
 */
299  {
300  return typename Queue<T>::Locked(*this);
301  }
302 
303  protected:
/*
 * Append `x` to the underlying queue and signal _cond.
 * Takes no lock itself and, unlike push(), does not wait on _sem.
 * Presumably the caller must already hold the lock on _cond - confirm; no
 * caller of __push() is visible in this file.
 */
304  void __push( const T& x )
305  {
306  _queue.push(x);
307  _cond.signal(true);
308  }
309 
/*
 * Remove the front element if there is one; does nothing on an empty queue.
 * Takes no lock itself: the callers visible in this file (pop(), getnpop(),
 * Locked::pop()) hold a MutexLock on _cond when they call it.
 */
310  void __pop()
311  {
312  if (!_queue.empty())
313  {
314  _queue.pop();
/* Counterpart of the _sem.wait() in push(): one element left the queue. */
315  if (_limit) _sem.post();
/* Notify waiters (e.g. a QUEUE_EMPTY wait) that the content changed. */
316  _cond.signal(true);
317  }
318  }
319 
/*
 * Untimed wait: block on _cond until the queue state selected by `mode`
 * holds. Takes no lock itself; the callers visible in this file hold a
 * MutexLock on _cond.
 *
 * mode: QUEUE_NOT_EMPTY waits while the queue is empty; QUEUE_EMPTY waits
 *       while it is not empty. Any other value falls through the switch
 *       and returns immediately.
 * Throws QueueUnblockedException built from the caught exception `ex`.
 *
 * NOTE(review): the catch clause (listing line 343) and the case label of the
 * handler's switch (listing line 346) are not visible in this listing.
 */
320  void __wait(const WAIT_MODES mode) throw (QueueUnblockedException)
321  {
322  try {
323  switch (mode)
324  {
325  case QUEUE_NOT_EMPTY:
326  {
/* Loop: re-check the predicate after every wake-up. */
327  while (_queue.empty())
328  {
329  _cond.wait();
330  }
331  break;
332  }
333 
334  case QUEUE_EMPTY:
335  {
/* Loop: re-check the predicate after every wake-up. */
336  while (!_queue.empty())
337  {
338  _cond.wait();
339  }
340  break;
341  }
342  }
/* Handler: for one specific ex.reason (label not visible) the conditional is
 * reset before the exception is translated; all other reasons are translated
 * unchanged. */
344  switch (ex.reason)
345  {
347  _cond.reset();
348  break;
349 
350  default:
351  break;
352  }
353 
354  throw QueueUnblockedException(ex, "__wait()");
355  }
356  }
357 
/*
 * Timed wait: like the untimed __wait(mode), but every _cond.wait() call is
 * given a timespec derived from `timeout`. Takes no lock itself; the callers
 * visible in this file hold a MutexLock on _cond.
 *
 * mode:    QUEUE_NOT_EMPTY waits while the queue is empty; QUEUE_EMPTY waits
 *          while it is not empty.
 * timeout: passed to Conditional::gettimeout(); its unit is defined there and
 *          is not visible in this file.
 * Throws QueueUnblockedException built from the caught exception `ex`.
 *
 * NOTE(review): the catch clause (listing line 384) and the case label of the
 * handler's switch (listing line 387) are not visible in this listing.
 */
358  void __wait(const WAIT_MODES mode, const size_t timeout) throw (QueueUnblockedException)
359  {
360  try {
/* `ts` is computed once, before the loops, and the same value is reused for
 * every wait call. Presumably it is an absolute deadline, so spurious
 * wake-ups do not extend the total wait - confirm in Conditional. */
361  struct timespec ts;
362  Conditional::gettimeout(timeout, &ts);
363 
364  switch (mode)
365  {
366  case QUEUE_NOT_EMPTY:
367  {
368  while (_queue.empty())
369  {
370  _cond.wait(&ts);
371  }
372  break;
373  }
374 
375  case QUEUE_EMPTY:
376  {
377  while (!_queue.empty())
378  {
379  _cond.wait(&ts);
380  }
381  break;
382  }
383  }
/* Handler: for one specific ex.reason (label not visible) the conditional is
 * reset before the exception is translated; all other reasons are translated
 * unchanged. */
385  switch (ex.reason)
386  {
388  _cond.reset();
389  break;
390 
391  default:
392  break;
393  }
394 
395  throw QueueUnblockedException(ex, "__wait()");
396  }
397  }
398  };
399 }
400 
401 #endif /* IBRCOMMON_QUEUE_H_ */