IBR-DTNSuite
0.8
|
00001 /* 00002 * Thread.cpp 00003 * 00004 * Created on: 30.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrcommon/config.h" 00009 #include "ibrcommon/thread/Thread.h" 00010 #include "ibrcommon/thread/MutexLock.h" 00011 #include "ibrcommon/Logger.h" 00012 #include <stdexcept> 00013 #include <pthread.h> 00014 #include <sys/times.h> 00015 #include <stdio.h> 00016 #include <unistd.h> 00017 #include <signal.h> 00018 00019 #ifdef __DEVELOPMENT_ASSERTIONS__ 00020 #include <cassert> 00021 #endif 00022 00023 namespace ibrcommon 00024 { 00025 void Thread::finalize_thread(void *obj) 00026 { 00027 #ifdef __DEVELOPMENT_ASSERTIONS__ 00028 // the given object should never be null 00029 assert(obj != NULL); 00030 #endif 00031 // cast the thread object 00032 Thread *th = static_cast<Thread *>(obj); 00033 00034 // set the state to finalizing, blocking all threads until this is done 00035 th->_state = THREAD_FINALIZING; 00036 00037 // call the finally method 00038 th->finally(); 00039 00040 // set the state to DONE 00041 th->_state = THREAD_FINALIZED; 00042 00043 // delete the thread-object is requested 00044 if (th->__delete_on_exit) 00045 { 00046 delete th; 00047 } 00048 } 00049 00050 void* Thread::exec_thread(void *obj) 00051 { 00052 #ifdef __DEVELOPMENT_ASSERTIONS__ 00053 // the given object should never be null 00054 assert(obj != NULL); 00055 #endif 00056 // cast the thread object 00057 Thread *th = static_cast<Thread *>(obj); 00058 00059 // add cleanup-handler to finalize threads 00060 pthread_cleanup_push(Thread::finalize_thread, (void *)th); 00061 00062 // set the state to RUNNING 00063 try { 00064 // enter the setup stage, if this thread is still in preparation state 00065 { 00066 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = th->_state.lock(); 00067 if (ls != THREAD_PREPARE) 00068 { 00069 throw ibrcommon::Exception("Thread has been canceled."); 00070 } 00071 00072 ls = THREAD_SETUP; 00073 } 00074 00075 // call the setup method 00076 th->setup(); 00077 } catch (const std::exception &ex) { 00078 IBRCOMMON_LOGGER_DEBUG(40) << "Thread setup aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00079 00080 // exit this thread 00081 Thread::exit(); 00082 } 00083 00084 try { 00085 // set the state to running 00086 th->_state = THREAD_RUNNING; 00087 00088 // run threads run routine 00089 th->run(); 00090 } catch (const std::exception &ex) { 00091 // an exception occured in run() or ready(), but this is allowed 00092 IBRCOMMON_LOGGER_DEBUG(40) << "Thread execution aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00093 Thread::exit(); 00094 } 00095 00096 // remove cleanup-handler and call it 00097 pthread_cleanup_pop(1); 00098 00099 // exit the thread 00100 Thread::exit(); 00101 00102 return NULL; 00103 } 00104 00105 Thread::Thread(size_t size, bool delete_on_exit) 00106 : _state(THREAD_INITIALIZED, THREAD_JOINED), tid(0), stack(size), priority(0), __delete_on_exit(delete_on_exit) 00107 { 00108 pthread_attr_init(&attr); 00109 } 00110 00111 void Thread::yield(void) 00112 { 00113 #if defined(HAVE_PTHREAD_YIELD_NP) 00114 pthread_yield_np(); 00115 #elif defined(HAVE_PTHREAD_YIELD) 00116 pthread_yield(); 00117 #else 00118 sched_yield(); 00119 #endif 00120 } 00121 00122 void Thread::concurrency(int level) 00123 { 00124 #if defined(HAVE_PTHREAD_SETCONCURRENCY) 00125 pthread_setconcurrency(level); 00126 #endif 00127 } 00128 00129 void Thread::sleep(size_t timeout) 00130 { 00131 timespec ts; 00132 ts.tv_sec = timeout / 1000l; 00133 ts.tv_nsec = (timeout % 1000l) * 1000000l; 00134 #if defined(HAVE_PTHREAD_DELAY) 00135 pthread_delay(&ts); 00136 #elif defined(HAVE_PTHREAD_DELAY_NP) 00137 pthread_delay_np(&ts); 00138 #else 00139 usleep(timeout * 1000); 00140 #endif 00141 } 00142 00143 Thread::~Thread() 00144 { 00145 pthread_attr_destroy(&attr); 00146 } 00147 00148 void Thread::detach(void) 00149 { 00150 pthread_detach(tid); 00151 } 00152 00153 void Thread::exit(void) 00154 { 00155 pthread_exit(NULL); 00156 } 00157 00158 int Thread::kill(int sig) 00159 { 00160 if (tid == 0) return -1; 00161 return pthread_kill(tid, sig); 00162 } 00163 00164 void Thread::cancel() throw (ThreadException) 00165 { 00166 // block multiple cancel calls 00167 { 00168 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00169 00170 // is this thread running? 00171 if (ls == THREAD_INITIALIZED) 00172 { 00173 // deny any futher startup of this thread 00174 ls = THREAD_JOINED; 00175 return; 00176 } 00177 00178 if ( (ls == THREAD_PREPARE) || (ls == THREAD_SETUP) ) 00179 { 00180 // wait until RUNNING, ERROR, JOINED or FINALIZED is reached 00181 ls.wait(THREAD_RUNNING | THREAD_JOINED | THREAD_ERROR | THREAD_FINALIZED | THREAD_CANCELLED); 00182 } 00183 00184 // exit if the thread is not running 00185 if (ls != THREAD_RUNNING) return; 00186 00187 // set state to cancelled 00188 ls = THREAD_CANCELLED; 00189 } 00190 00191 // run custom cancellation 00192 __cancellation(); 00193 } 00194 00195 bool Thread::equal(pthread_t t1, pthread_t t2) 00196 { 00197 return (pthread_equal(t1, t2) != 0); 00198 } 00199 00200 JoinableThread::JoinableThread(size_t size) 00201 : Thread(size, false) 00202 { 00203 } 00204 00205 JoinableThread::~JoinableThread() 00206 { 00207 #ifdef __DEVELOPMENT_ASSERTIONS__ 00208 // every thread should be joined, when the destructor is called. 00209 if ( (_state != THREAD_JOINED) && (_state != THREAD_ERROR) ) 00210 { 00211 std::cerr << "FAILURE: Thread not joined! Current state:" << _state.get() << std::endl; 00212 assert( (_state == THREAD_JOINED) || (_state == THREAD_ERROR) ); 00213 } 00214 #endif 00215 join(); 00216 } 00217 00218 void JoinableThread::start(int adj) throw (ThreadException) 00219 { 00220 int ret; 00221 00222 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00223 if (ls != THREAD_INITIALIZED) return; 00224 00225 // set the thread state to PREPARE 00226 ls = THREAD_PREPARE; 00227 00228 // set priority 00229 priority = adj; 00230 00231 #ifndef __PTH__ 00232 // modify the threads attributes - set as joinable thread 00233 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 00234 00235 // ignore scheduling policy and use the same as the parent 00236 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED); 00237 #endif 00238 // we typically use "stack 1" for min stack... 00239 #ifdef PTHREAD_STACK_MIN 00240 // if the given stack size is too small... 00241 if(stack && stack < PTHREAD_STACK_MIN) 00242 { 00243 // set it to the min stack size 00244 stack = PTHREAD_STACK_MIN; 00245 } 00246 #else 00247 // if stack size if larger than zero and smaller than two... 00248 if (stack && stack < 2) 00249 { 00250 // set it to zero (we will not set the stack size) 00251 stack = 0; 00252 } 00253 #endif 00254 00255 #ifdef __PTH__ 00256 // spawn a new thread 00257 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this); 00258 #else 00259 // if the stack size is specified (> 0) 00260 if (stack) 00261 { 00262 // set the stack size attribute 00263 pthread_attr_setstacksize(&attr, stack); 00264 } 00265 00266 // spawn a new thread 00267 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this); 00268 00269 00270 switch (ret) 00271 { 00272 case EAGAIN: 00273 ls = THREAD_ERROR; 00274 throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded."); 00275 case EINVAL: 00276 ls = THREAD_ERROR; 00277 throw ThreadException(ret, "The value specified by attr is invalid."); 00278 case EPERM: 00279 ls = THREAD_ERROR; 00280 throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy."); 00281 } 00282 #endif 00283 } 00284 00285 void JoinableThread::stop() throw (ThreadException) 00286 { 00287 // Cancel throws the exception of the terminated thread 00288 // so we have to catch any possible exception here 00289 try { 00290 Thread::cancel(); 00291 } catch (const ThreadException&) { 00292 throw; 00293 } catch (const std::exception&) { 00294 } 00295 } 00296 00297 void JoinableThread::join(void) 00298 { 00299 // first to some state checking 00300 { 00301 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00302 00303 // if the thread never has been started: exit and deny any further startup 00304 if (ls == THREAD_INITIALIZED) 00305 { 00306 ls = THREAD_JOINED; 00307 return; 00308 } 00309 00310 // wait until the finalized state is reached 00311 ls.wait(THREAD_FINALIZED | THREAD_JOINED | THREAD_ERROR); 00312 00313 // do not join if an error occured 00314 if (ls == THREAD_ERROR) return; 00315 00316 // if the thread has been joined already: exit 00317 if (ls == THREAD_JOINED) return; 00318 00319 // get the thread-id of the calling thread 00320 pthread_t self = pthread_self(); 00321 00322 // if we try to join our own thread, just call Thread::exit() 00323 if (equal(tid, self)) 00324 { 00325 Thread::exit(); 00326 return; 00327 } 00328 } 00329 00330 // if the thread has been started, do join 00331 if (pthread_join(tid, NULL) == 0) 00332 { 00333 // set the state to joined 00334 _state = THREAD_JOINED; 00335 } 00336 else 00337 { 00338 IBRCOMMON_LOGGER(error) << "Join on a thread failed[" << tid << "]" << IBRCOMMON_LOGGER_ENDL; 00339 _state = THREAD_ERROR; 00340 } 00341 } 00342 00343 DetachedThread::DetachedThread(size_t size) : Thread(size, true) 00344 { 00345 } 00346 00347 DetachedThread::~DetachedThread() 00348 { 00349 } 00350 00351 void DetachedThread::start(int adj) throw (ThreadException) 00352 { 00353 int ret = 0; 00354 00355 ibrcommon::ThreadsafeState<THREAD_STATE>::Locked ls = _state.lock(); 00356 if (ls != THREAD_INITIALIZED) return; 00357 00358 // set the thread state to PREPARE 00359 ls = THREAD_PREPARE; 00360 00361 // set the priority 00362 priority = adj; 00363 00364 #ifndef __PTH__ 00365 // modify the threads attributes - set as detached thread 00366 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 00367 00368 // ignore scheduling policy and use the same as the parent 00369 pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED); 00370 #endif 00371 // we typically use "stack 1" for min stack... 00372 #ifdef PTHREAD_STACK_MIN 00373 // if the given stack size is too small... 00374 if(stack && stack < PTHREAD_STACK_MIN) 00375 { 00376 // set it to the min stack size 00377 stack = PTHREAD_STACK_MIN; 00378 } 00379 #else 00380 // if stack size if larger than zero and smaller than two... 00381 if (stack && stack < 2) 00382 { 00383 // set it to zero (we will not set the stack size) 00384 stack = 0; 00385 } 00386 #endif 00387 00388 #ifdef __PTH__ 00389 // spawn a new thread 00390 tid = pth_spawn(PTH_ATTR_DEFAULT, &Thread::exec_thread, this); 00391 #else 00392 // if the stack size is specified (> 0) 00393 if (stack) 00394 { 00395 // set the stack size attribute 00396 pthread_attr_setstacksize(&attr, stack); 00397 } 00398 00399 // spawn a new thread 00400 ret = pthread_create(&tid, &attr, &Thread::exec_thread, this); 00401 00402 // check for errors 00403 switch (ret) 00404 { 00405 case EAGAIN: 00406 ls = THREAD_ERROR; 00407 throw ThreadException(ret, "The system lacked the necessary resources to create another thread, or the system-imposed limit on the total number of threads in a process PTHREAD_THREADS_MAX would be exceeded."); 00408 case EINVAL: 00409 ls = THREAD_ERROR; 00410 throw ThreadException(ret, "The value specified by attr is invalid."); 00411 case EPERM: 00412 ls = THREAD_ERROR; 00413 throw ThreadException(ret, "The caller does not have appropriate permission to set the required scheduling parameters or scheduling policy."); 00414 } 00415 #endif 00416 } 00417 00418 void DetachedThread::stop() throw (ThreadException) 00419 { 00420 // Cancel throws the exception of the terminated thread 00421 // so we have to catch any possible exception here 00422 try { 00423 Thread::cancel(); 00424 } catch (const ThreadException&) { 00425 throw; 00426 } catch (const std::exception&) { 00427 } 00428 } 00429 }