IBR-DTNSuite  0.8
ibrcommon/ibrcommon/thread/Thread.cpp
Go to the documentation of this file.
00001 /*
00002  * Thread.cpp
00003  *
00004  *  Created on: 30.07.2009
00005  *      Author: morgenro
00006  */
00007 
#include "ibrcommon/config.h"
#include "ibrcommon/thread/Thread.h"
#include "ibrcommon/thread/MutexLock.h"
#include "ibrcommon/Logger.h"

#include <stdexcept>

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <sys/times.h>
#include <time.h>
#include <unistd.h>
00018 
00019 #ifdef __DEVELOPMENT_ASSERTIONS__
00020 #include <cassert>
00021 #endif
00022 
00023 namespace ibrcommon
00024 {
00025         void Thread::finalize_thread(void *obj)
00026         {
00027 #ifdef __DEVELOPMENT_ASSERTIONS__
00028                 // the given object should never be null
00029                 assert(obj != NULL);
00030 #endif
00031                 // cast the thread object
00032                 Thread *th = static_cast<Thread *>(obj);
00033 
00034                 // set the state to finalizing, blocking all threads until this is done
00035                 th->_state = THREAD_FINALIZING;
00036 
00037                 // call the finally method
00038                 th->finally();
00039 
00040                 // set the state to DONE
00041                 th->_state = THREAD_FINALIZED;
00042 
00043                 // delete the thread-object is requested
00044                 if (th->__delete_on_exit)
00045                 {
00046                         delete th;
00047                 }
00048         }
00049 
00050         void* Thread::exec_thread(void *obj)
00051         {
00052 #ifdef __DEVELOPMENT_ASSERTIONS__
00053                 // the given object should never be null
00054                 assert(obj != NULL);
00055 #endif
00056                 // cast the thread object
00057                 Thread *th = static_cast<Thread *>(obj);
00058 
00059                 // add cleanup-handler to finalize threads
00060                 pthread_cleanup_push(Thread::finalize_thread, (void *)th);
00061 
00062                 // set the state to RUNNING
00063                 try {
00064                         // enter the setup stage, if this thread is still in preparation state
00065                         {
00066                                 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = th->_state.lock();
00067                                 if (ls != THREAD_PREPARE)
00068                                 {
00069                                         throw ibrcommon::Exception("Thread has been canceled.");
00070                                 }
00071 
00072                                 ls = THREAD_SETUP;
00073                         }
00074 
00075                         // call the setup method
00076                         th->setup();
00077                 } catch (const std::exception &ex) {
00078                         IBRCOMMON_LOGGER_DEBUG(40) << "Thread setup aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00079 
00080                         // exit this thread
00081                         Thread::exit();
00082                 }
00083 
00084                 try {
00085                         // set the state to running
00086                         th->_state = THREAD_RUNNING;
00087 
00088                         // run threads run routine
00089                         th->run();
00090                 } catch (const std::exception &ex) {
00091                         // an exception occured in run() or ready(), but this is allowed
00092                         IBRCOMMON_LOGGER_DEBUG(40) << "Thread execution aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00093                         Thread::exit();
00094                 }
00095 
00096                 // remove cleanup-handler and call it
00097                 pthread_cleanup_pop(1);
00098 
00099                 // exit the thread
00100                 Thread::exit();
00101 
00102                 return NULL;
00103         }
00104 
00105         Thread::Thread(size_t size, bool delete_on_exit)
00106          : _state(THREAD_INITIALIZED, THREAD_JOINED), tid(0), stack(size), priority(0), __delete_on_exit(delete_on_exit)
00107         {
00108                 pthread_attr_init(&attr);
00109         }
00110 
00111         void Thread::yield(void)
00112         {
00113 #if defined(HAVE_PTHREAD_YIELD_NP)
00114                 pthread_yield_np();
00115 #elif defined(HAVE_PTHREAD_YIELD)
00116                 pthread_yield();
00117 #else
00118                 sched_yield();
00119 #endif
00120         }
00121 
00122         void Thread::concurrency(int level)
00123         {
00124         #if defined(HAVE_PTHREAD_SETCONCURRENCY)
00125                 pthread_setconcurrency(level);
00126         #endif
00127         }
00128 
00129         void Thread::sleep(size_t timeout)
00130         {
00131                 timespec ts;
00132                 ts.tv_sec = timeout / 1000l;
00133                 ts.tv_nsec = (timeout % 1000l) * 1000000l;
00134         #if defined(HAVE_PTHREAD_DELAY)
00135                 pthread_delay(&ts);
00136         #elif defined(HAVE_PTHREAD_DELAY_NP)
00137                 pthread_delay_np(&ts);
00138         #else
00139                 usleep(timeout * 1000);
00140         #endif
00141         }
00142 
00143         Thread::~Thread()
00144         {
00145                 pthread_attr_destroy(&attr);
00146         }
00147 
00148         void Thread::detach(void)
00149         {
00150                 pthread_detach(tid);
00151         }
00152 
00153         void Thread::exit(void)
00154         {
00155                 pthread_exit(NULL);
00156         }
00157 
00158         int Thread::kill(int sig)
00159         {
00160                 if (tid == 0) return -1;
00161                 return pthread_kill(tid, sig);
00162         }
00163 
00164         void Thread::cancel() throw (ThreadException)
00165         {
00166                 // block multiple cancel calls
00167                 {
00168                         ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00169 
00170                         // is this thread running?
00171                         if (ls == THREAD_INITIALIZED)
00172                         {
00173                                 // deny any futher startup of this thread
00174                                 ls = THREAD_JOINED;
00175                                 return;
00176                         }
00177 
00178                         if ( (ls == THREAD_PREPARE) || (ls == THREAD_SETUP) )
00179                         {
00180                                 // wait until RUNNING, ERROR, JOINED or FINALIZED is reached
00181                                 ls.wait(THREAD_RUNNING | THREAD_JOINED | THREAD_ERROR | THREAD_FINALIZED | THREAD_CANCELLED);
00182                         }
00183 
00184                         // exit if the thread is not running
00185                         if (ls != THREAD_RUNNING) return;
00186 
00187                         // set state to cancelled
00188                         ls = THREAD_CANCELLED;
00189                 }
00190 
00191                 // run custom cancellation
00192                 __cancellation();
00193         }
00194 
00195         bool Thread::equal(pthread_t t1, pthread_t t2)
00196         {
00197                 return (pthread_equal(t1, t2) != 0);
00198         }
00199 
00200         JoinableThread::JoinableThread(size_t size)
00201          : Thread(size, false)
00202         {
00203         }
00204 
00205         JoinableThread::~JoinableThread()
00206         {
00207 #ifdef __DEVELOPMENT_ASSERTIONS__
00208                 // every thread should be joined, when the destructor is called.
00209                 if ( (_state != THREAD_JOINED) && (_state != THREAD_ERROR) )
00210                 {
00211                         std::cerr << "FAILURE: Thread not joined! Current state:" << _state.get() << std::endl;
00212                         assert( (_state == THREAD_JOINED) || (_state == THREAD_ERROR) );
00213                 }
00214 #endif
00215                 join();
00216         }
00217 
00218         void JoinableThread::start(int adj) throw (ThreadException)
00219         {
00220                 int ret;
00221 
00222                 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00223                 if (ls != THREAD_INITIALIZED) return;
00224 
00225                 // set the thread state to PREPARE
00226                 ls = THREAD_PREPARE;
00227 
00228                 // set priority
00229                 priority = adj;
00230 
00231 #ifndef __PTH__
00232                 // modify the threads attributes - set as joinable thread
00233                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
00234 
00235                 // ignore scheduling policy and use the same as the parent
00236                 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00237 #endif
00238                 // we typically use "stack 1" for min stack...
00239 #ifdef  PTHREAD_STACK_MIN
00240                 // if the given stack size is too small...
00241                 if(stack && stack < PTHREAD_STACK_MIN)
00242                 {
00243                         // set it to the min stack size
00244                         stack = PTHREAD_STACK_MIN;
00245                 }
00246 #else
00247                 // if stack size if larger than zero and smaller than two...
00248                 if (stack && stack < 2)
00249                 {
00250                         // set it to zero (we will not set the stack size)
00251                         stack = 0;
00252                 }
00253 #endif
00254 
00255 #ifdef  __PTH__
00256                 // spawn a new thread
00257                 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this);
00258 #else
00259                 // if the stack size is specified (> 0)
00260                 if (stack)
00261                 {
00262                         // set the stack size attribute
00263                         pthread_attr_setstacksize(&attr, stack);
00264                 }
00265 
00266                 // spawn a new thread
00267                 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00268 
00269                 
00270                 switch (ret)
00271                 {
00272                 case EAGAIN:
00273                         ls = THREAD_ERROR;
00274                         throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00275                 case EINVAL:
00276                         ls = THREAD_ERROR;
00277                         throw ThreadException(ret, "The value specified by attr is invalid.");
00278                 case EPERM:
00279                         ls = THREAD_ERROR;
00280                         throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00281                 }
00282 #endif
00283         }
00284 
00285         void JoinableThread::stop() throw (ThreadException)
00286         {
00287                 // Cancel throws the exception of the terminated thread
00288                 // so we have to catch any possible exception here
00289                 try {
00290                         Thread::cancel();
00291                 } catch (const ThreadException&) {
00292                         throw;
00293                 } catch (const std::exception&) {
00294                 }
00295         }
00296 
00297         void JoinableThread::join(void)
00298         {
00299                 // first to some state checking
00300                 {
00301                         ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00302 
00303                         // if the thread never has been started: exit and deny any further startup
00304                         if (ls == THREAD_INITIALIZED)
00305                         {
00306                                 ls = THREAD_JOINED;
00307                                 return;
00308                         }
00309 
00310                         // wait until the finalized state is reached
00311                         ls.wait(THREAD_FINALIZED | THREAD_JOINED | THREAD_ERROR);
00312 
00313                         // do not join if an error occured
00314                         if (ls == THREAD_ERROR) return;
00315 
00316                         // if the thread has been joined already: exit
00317                         if (ls == THREAD_JOINED) return;
00318 
00319                         // get the thread-id of the calling thread
00320                         pthread_t self = pthread_self();
00321 
00322                         // if we try to join our own thread, just call Thread::exit()
00323                         if (equal(tid, self))
00324                         {
00325                                 Thread::exit();
00326                                 return;
00327                         }
00328                 }
00329 
00330                 // if the thread has been started, do join
00331                 if (pthread_join(tid, NULL) == 0)
00332                 {
00333                         // set the state to joined
00334                         _state = THREAD_JOINED;
00335                 }
00336                 else
00337                 {
00338                         IBRCOMMON_LOGGER(error) << "Join on a thread failed[" << tid << "]" << IBRCOMMON_LOGGER_ENDL;
00339                         _state = THREAD_ERROR;
00340                 }
00341         }
00342 
00343         DetachedThread::DetachedThread(size_t size) : Thread(size, true)
00344         {
00345         }
00346 
00347         DetachedThread::~DetachedThread()
00348         {
00349         }
00350 
00351         void DetachedThread::start(int adj) throw (ThreadException)
00352         {
00353                 int ret = 0;
00354 
00355                 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock();
00356                 if (ls != THREAD_INITIALIZED) return;
00357 
00358                 // set the thread state to PREPARE
00359                 ls = THREAD_PREPARE;
00360 
00361                 // set the priority
00362                 priority = adj;
00363 
00364 #ifndef __PTH__
00365                 // modify the threads attributes - set as detached thread
00366                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
00367 
00368                 // ignore scheduling policy and use the same as the parent
00369                 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED);
00370 #endif
00371                 // we typically use "stack 1" for min stack...
00372 #ifdef  PTHREAD_STACK_MIN
00373                 // if the given stack size is too small...
00374                 if(stack && stack < PTHREAD_STACK_MIN)
00375                 {
00376                         // set it to the min stack size
00377                         stack = PTHREAD_STACK_MIN;
00378                 }
00379 #else
00380                 // if stack size if larger than zero and smaller than two...
00381                 if (stack && stack < 2)
00382                 {
00383                         // set it to zero (we will not set the stack size)
00384                         stack = 0;
00385                 }
00386 #endif
00387 
00388 #ifdef  __PTH__
00389                 // spawn a new thread
00390                 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this);
00391 #else
00392                 // if the stack size is specified (> 0)
00393                 if (stack)
00394                 {
00395                         // set the stack size attribute
00396                         pthread_attr_setstacksize(&attr, stack);
00397                 }
00398 
00399                 // spawn a new thread
00400                 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this);
00401 
00402                 // check for errors
00403                 switch (ret)
00404                 {
00405                 case EAGAIN:
00406                         ls = THREAD_ERROR;
00407                         throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded.");
00408                 case EINVAL:
00409                         ls = THREAD_ERROR;
00410                         throw ThreadException(ret, "The value specified by attr is invalid.");
00411                 case EPERM:
00412                         ls = THREAD_ERROR;
00413                         throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy.");
00414                 }
00415 #endif
00416         }
00417 
00418         void DetachedThread::stop() throw (ThreadException)
00419         {
00420                 // Cancel throws the exception of the terminated thread
00421                 // so we have to catch any possible exception here
00422                 try {
00423                         Thread::cancel();
00424                 } catch (const ThreadException&) {
00425                         throw;
00426                 } catch (const std::exception&) {
00427                 }
00428         }
00429 }