threadpool.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. #include "rar.hpp"
  2. #ifdef RAR_SMP
  3. #include "threadmisc.cpp"
  4. #ifdef _WIN_ALL
  5. int ThreadPool::ThreadPriority=THREAD_PRIORITY_NORMAL;
  6. #endif
  7. ThreadPool::ThreadPool(uint MaxThreads)
  8. {
  9. MaxAllowedThreads = MaxThreads;
  10. if (MaxAllowedThreads>MaxPoolThreads)
  11. MaxAllowedThreads=MaxPoolThreads;
  12. if (MaxAllowedThreads==0)
  13. MaxAllowedThreads=1;
  14. ThreadsCreatedCount=0;
  15. // If we have more threads than queue size, we'll hang on pool destroying,
  16. // not releasing all waiting threads.
  17. if (MaxAllowedThreads>ASIZE(TaskQueue))
  18. MaxAllowedThreads=ASIZE(TaskQueue);
  19. Closing=false;
  20. bool Success = CriticalSectionCreate(&CritSection);
  21. #ifdef _WIN_ALL
  22. QueuedTasksCnt=CreateSemaphore(NULL,0,ASIZE(TaskQueue),NULL);
  23. NoneActive=CreateEvent(NULL,TRUE,TRUE,NULL);
  24. Success=Success && QueuedTasksCnt!=NULL && NoneActive!=NULL;
  25. #elif defined(_UNIX)
  26. AnyActive = false;
  27. QueuedTasksCnt = 0;
  28. Success=Success && pthread_cond_init(&AnyActiveCond,NULL)==0 &&
  29. pthread_mutex_init(&AnyActiveMutex,NULL)==0 &&
  30. pthread_cond_init(&QueuedTasksCntCond,NULL)==0 &&
  31. pthread_mutex_init(&QueuedTasksCntMutex,NULL)==0;
  32. #endif
  33. if (!Success)
  34. {
  35. ErrHandler.GeneralErrMsg(L"\nThread pool initialization failed.");
  36. ErrHandler.Exit(RARX_FATAL);
  37. }
  38. QueueTop = 0;
  39. QueueBottom = 0;
  40. ActiveThreads = 0;
  41. }
  42. ThreadPool::~ThreadPool()
  43. {
  44. WaitDone();
  45. Closing=true;
  46. #ifdef _WIN_ALL
  47. ReleaseSemaphore(QueuedTasksCnt,ASIZE(TaskQueue),NULL);
  48. #elif defined(_UNIX)
  49. // Threads still can access QueuedTasksCnt for a short time after WaitDone(),
  50. // so lock is required. We would occassionally hang without it.
  51. pthread_mutex_lock(&QueuedTasksCntMutex);
  52. QueuedTasksCnt+=ASIZE(TaskQueue);
  53. pthread_mutex_unlock(&QueuedTasksCntMutex);
  54. pthread_cond_broadcast(&QueuedTasksCntCond);
  55. #endif
  56. for(uint I=0;I<ThreadsCreatedCount;I++)
  57. {
  58. #ifdef _WIN_ALL
  59. // Waiting until the thread terminates.
  60. CWaitForSingleObject(ThreadHandles[I]);
  61. #endif
  62. // Close the thread handle. In Unix it results in pthread_join call,
  63. // which also waits for thread termination.
  64. ThreadClose(ThreadHandles[I]);
  65. }
  66. CriticalSectionDelete(&CritSection);
  67. #ifdef _WIN_ALL
  68. CloseHandle(QueuedTasksCnt);
  69. CloseHandle(NoneActive);
  70. #elif defined(_UNIX)
  71. pthread_cond_destroy(&AnyActiveCond);
  72. pthread_mutex_destroy(&AnyActiveMutex);
  73. pthread_cond_destroy(&QueuedTasksCntCond);
  74. pthread_mutex_destroy(&QueuedTasksCntMutex);
  75. #endif
  76. }
  77. void ThreadPool::CreateThreads()
  78. {
  79. for(uint I=0;I<MaxAllowedThreads;I++)
  80. {
  81. ThreadHandles[I] = ThreadCreate(PoolThread, this);
  82. ThreadsCreatedCount++;
  83. #ifdef _WIN_ALL
  84. if (ThreadPool::ThreadPriority!=THREAD_PRIORITY_NORMAL)
  85. SetThreadPriority(ThreadHandles[I],ThreadPool::ThreadPriority);
  86. #endif
  87. }
  88. }
  89. NATIVE_THREAD_TYPE ThreadPool::PoolThread(void *Param)
  90. {
  91. ((ThreadPool*)Param)->PoolThreadLoop();
  92. return 0;
  93. }
  94. void ThreadPool::PoolThreadLoop()
  95. {
  96. QueueEntry Task;
  97. while (GetQueuedTask(&Task))
  98. {
  99. Task.Proc(Task.Param);
  100. CriticalSectionStart(&CritSection);
  101. if (--ActiveThreads == 0)
  102. {
  103. #ifdef _WIN_ALL
  104. SetEvent(NoneActive);
  105. #elif defined(_UNIX)
  106. pthread_mutex_lock(&AnyActiveMutex);
  107. AnyActive=false;
  108. pthread_cond_signal(&AnyActiveCond);
  109. pthread_mutex_unlock(&AnyActiveMutex);
  110. #endif
  111. }
  112. CriticalSectionEnd(&CritSection);
  113. }
  114. }
  115. bool ThreadPool::GetQueuedTask(QueueEntry *Task)
  116. {
  117. #ifdef _WIN_ALL
  118. CWaitForSingleObject(QueuedTasksCnt);
  119. #elif defined(_UNIX)
  120. pthread_mutex_lock(&QueuedTasksCntMutex);
  121. while (QueuedTasksCnt==0)
  122. cpthread_cond_wait(&QueuedTasksCntCond,&QueuedTasksCntMutex);
  123. QueuedTasksCnt--;
  124. pthread_mutex_unlock(&QueuedTasksCntMutex);
  125. #endif
  126. if (Closing)
  127. return false;
  128. CriticalSectionStart(&CritSection);
  129. *Task = TaskQueue[QueueBottom];
  130. QueueBottom = (QueueBottom + 1) % ASIZE(TaskQueue);
  131. CriticalSectionEnd(&CritSection);
  132. return true;
  133. }
  134. // Add task to queue. We assume that it is always called from main thread,
  135. // it allows to avoid any locks here. We process collected tasks only
  136. // when WaitDone is called.
  137. void ThreadPool::AddTask(PTHREAD_PROC Proc,void *Data)
  138. {
  139. if (ThreadsCreatedCount == 0)
  140. CreateThreads();
  141. // If queue is full, wait until it is empty.
  142. if (ActiveThreads>=ASIZE(TaskQueue))
  143. WaitDone();
  144. TaskQueue[QueueTop].Proc = Proc;
  145. TaskQueue[QueueTop].Param = Data;
  146. QueueTop = (QueueTop + 1) % ASIZE(TaskQueue);
  147. ActiveThreads++;
  148. }
  149. // Start queued tasks and wait until all threads are inactive.
  150. // We assume that it is always called from main thread, when pool threads
  151. // are sleeping yet.
  152. void ThreadPool::WaitDone()
  153. {
  154. if (ActiveThreads==0)
  155. return;
  156. #ifdef _WIN_ALL
  157. ResetEvent(NoneActive);
  158. ReleaseSemaphore(QueuedTasksCnt,ActiveThreads,NULL);
  159. CWaitForSingleObject(NoneActive);
  160. #elif defined(_UNIX)
  161. AnyActive=true;
  162. // Threads reset AnyActive before accessing QueuedTasksCnt and even
  163. // preceding WaitDone() call does not guarantee that some slow thread
  164. // is not accessing QueuedTasksCnt now. So lock is necessary.
  165. pthread_mutex_lock(&QueuedTasksCntMutex);
  166. QueuedTasksCnt+=ActiveThreads;
  167. pthread_mutex_unlock(&QueuedTasksCntMutex);
  168. pthread_cond_broadcast(&QueuedTasksCntCond);
  169. pthread_mutex_lock(&AnyActiveMutex);
  170. while (AnyActive)
  171. cpthread_cond_wait(&AnyActiveCond,&AnyActiveMutex);
  172. pthread_mutex_unlock(&AnyActiveMutex);
  173. #endif
  174. }
  175. #endif // RAR_SMP