123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- #include "rar.hpp"
- #ifdef RAR_SMP
- #include "threadmisc.cpp"
- #ifdef _WIN_ALL
- int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
- #endif
- ThreadPool::ThreadPool(uint MaxThreads)
- {
- MaxAllowedThreads = MaxThreads;
- if (MaxAllowedThreads>MaxPoolThreads)
- MaxAllowedThreads=MaxPoolThreads;
- if (MaxAllowedThreads==0)
- MaxAllowedThreads=1;
- ThreadsCreatedCount=0;
- // If we have more threads than queue size, we'll hang on pool destroying,
- // not releasing all waiting threads.
- if (MaxAllowedThreads>ASIZE(TaskQueue))
- MaxAllowedThreads=ASIZE(TaskQueue);
- Closing=false;
- bool Success = CriticalSectionCreate(&CritSection);
- #ifdef _WIN_ALL
- QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
- NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
- Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL;
- #elif defined(_UNIX)
- AnyActive = false;
- QueuedTasksCnt = 0;
- Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 &&
- pthread_mutex_init(&AnyActiveMutex,NULL)==0 &&
- pthread_cond_init(&QueuedTasksCntCond,NULL)==0 &&
- pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
- #endif
- if (!Success)
- {
- ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
- ErrHandler.Exit(RARX_FATAL);
- }
- QueueTop = 0;
- QueueBottom = 0;
- ActiveThreads = 0;
- }
- ThreadPool::~ThreadPool()
- {
- WaitDone();
- Closing=true;
- #ifdef _WIN_ALL
- ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
- #elif defined(_UNIX)
- // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
- // so lock is required. We would occassionally hang without it.
- pthread_mutex_lock(&QueuedTasksCntMutex);
- QueuedTasksCnt+=ASIZE(TaskQueue);
- pthread_mutex_unlock(&QueuedTasksCntMutex);
- pthread_cond_broadcast(&QueuedTasksCntCond);
- #endif
- for(uint I=0;I<ThreadsCreatedCount;I++)
- {
- #ifdef _WIN_ALL
- // Waiting until the thread terminates.
- CWaitForSingleObject(ThreadHandles[I]);
- #endif
- // Close the thread handle. In Unix it results in pthread_join call,
- // which also waits for thread termination.
- ThreadClose(ThreadHandles[I]);
- }
- CriticalSectionDelete(&CritSection);
- #ifdef _WIN_ALL
- CloseHandle(QueuedTasksCnt);
- CloseHandle(NoneActive);
- #elif defined(_UNIX)
- pthread_cond_destroy(&AnyActiveCond);
- pthread_mutex_destroy(&AnyActiveMutex);
- pthread_cond_destroy(&QueuedTasksCntCond);
- pthread_mutex_destroy(&QueuedTasksCntMutex);
- #endif
- }
- void ThreadPool::CreateThreads()
- {
- for(uint I=0;I<MaxAllowedThreads;I++)
- {
- ThreadHandles[I] = ThreadCreate(PoolThread, this);
- ThreadsCreatedCount++;
- #ifdef _WIN_ALL
- if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
- SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
- #endif
- }
- }
- NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
- {
- ((ThreadPool*)Param)->PoolThreadLoop();
- return 0;
- }
- void ThreadPool::PoolThreadLoop()
- {
- QueueEntry Task;
- while (GetQueuedTask(&Task))
- {
- Task.Proc(Task.Param);
-
- CriticalSectionStart(&CritSection);
- if (--ActiveThreads == 0)
- {
- #ifdef _WIN_ALL
- SetEvent(NoneActive);
- #elif defined(_UNIX)
- pthread_mutex_lock(&AnyActiveMutex);
- AnyActive=false;
- pthread_cond_signal(&AnyActiveCond);
- pthread_mutex_unlock(&AnyActiveMutex);
- #endif
- }
- CriticalSectionEnd(&CritSection);
- }
- }
- bool ThreadPool::GetQueuedTask(QueueEntry *Task)
- {
- #ifdef _WIN_ALL
- CWaitForSingleObject(QueuedTasksCnt);
- #elif defined(_UNIX)
- pthread_mutex_lock(&QueuedTasksCntMutex);
- while (QueuedTasksCnt==0)
- cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
- QueuedTasksCnt--;
- pthread_mutex_unlock(&QueuedTasksCntMutex);
- #endif
- if (Closing)
- return false;
- CriticalSectionStart(&CritSection);
- *Task = TaskQueue[QueueBottom];
- QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);
- CriticalSectionEnd(&CritSection);
- return true;
- }
- // Add task to queue. We assume that it is always called from main thread,
- // it allows to avoid any locks here. We process collected tasks only
- // when WaitDone is called.
- void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
- {
- if (ThreadsCreatedCount == 0)
- CreateThreads();
-
- // If queue is full, wait until it is empty.
- if (ActiveThreads>=ASIZE(TaskQueue))
- WaitDone();
- TaskQueue[QueueTop].Proc = Proc;
- TaskQueue[QueueTop].Param = Data;
- QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
- ActiveThreads++;
- }
- // Start queued tasks and wait until all threads are inactive.
- // We assume that it is always called from main thread, when pool threads
- // are sleeping yet.
- void ThreadPool::WaitDone()
- {
- if (ActiveThreads==0)
- return;
- #ifdef _WIN_ALL
- ResetEvent(NoneActive);
- ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
- CWaitForSingleObject(NoneActive);
- #elif defined(_UNIX)
- AnyActive=true;
- // Threads reset AnyActive before accessing QueuedTasksCnt and even
- // preceding WaitDone() call does not guarantee that some slow thread
- // is not accessing QueuedTasksCnt now. So lock is necessary.
- pthread_mutex_lock(&QueuedTasksCntMutex);
- QueuedTasksCnt+=ActiveThreads;
- pthread_mutex_unlock(&QueuedTasksCntMutex);
- pthread_cond_broadcast(&QueuedTasksCntCond);
- pthread_mutex_lock(&AnyActiveMutex);
- while (AnyActive)
- cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
- pthread_mutex_unlock(&AnyActiveMutex);
- #endif
- }
- #endif // RAR_SMP
|