ThreadPool.cpp

#include "ThreadPool.h"
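
/*
	ThreadPool keeps a single list of worker ThreadIDs spanning the MT and STA
	thread types plus their "reserved" variants, tracks per-type availability in
	num_threads_available[], and runs a watchdog thread that spawns an extra
	worker whenever a type's max_load_event reports that no thread of that type
	is free.
*/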

ThreadPool::ThreadPool()
{
	for ( int i = 0; i < THREAD_TYPES; i++ )
	{
		num_threads_available[ i ] = 0;
		max_load_event[ i ] = CreateEvent( NULL, TRUE, FALSE, NULL );
	}
	killswitch = CreateEvent( NULL, TRUE, FALSE, NULL );

	// one thread of each non-reserved type (TYPE_MT, TYPE_STA) to start
	for ( int i = 0; i < 2; i++ )
		CreateNewThread_Internal( i );

	watchdog_thread_handle = CreateThread( 0, 0, WatchDogThreadProcedure_stub, this, 0, 0 );
}

void ThreadPool::Kill()
{
	SetEvent( killswitch );
	WaitForSingleObject( watchdog_thread_handle, INFINITE );
	CloseHandle( watchdog_thread_handle );
	for ( ThreadID *l_thread : threads )
	{
		l_thread->Kill();
		delete l_thread;
	}
	CloseHandle( killswitch );
	for ( int i = 0; i < THREAD_TYPES; i++ )
		CloseHandle( max_load_event[ i ] );
}

DWORD ThreadPool::WatchDogThreadProcedure_stub( LPVOID param )
{
	ThreadPool *_this = (ThreadPool *)param;
	return _this->WatchDogThreadProcedure();
}

/*
	The watchdog gets woken up when the number of available threads of a type hits zero.
	It creates a new thread, sleeps for a bit to let things "settle", and then resets the event.
	It needs a copy of all "any-thread" handles to build the new thread, and has to manage them
	in a thread-safe way (so a new thread doesn't "miss" a handle that is added in the interim).
*/
DWORD CALLBACK ThreadPool::WatchDogThreadProcedure()
{
	// we ignore the max load event for reserved threads
	HANDLE events[ 3 ] = { killswitch, max_load_event[ TYPE_MT ], max_load_event[ TYPE_STA ] };
	while ( 1 )
	{
		DWORD ret = WaitForMultipleObjects( 3, events, FALSE, INFINITE );
		if ( ret == WAIT_OBJECT_0 )
		{
			break; // killswitch signalled, shut the watchdog down
		}
		else if ( ret == WAIT_OBJECT_0 + 1 || ret == WAIT_OBJECT_0 + 2 )
		{
			int thread_type = ret - ( WAIT_OBJECT_0 + 1 );
			// this signal means "full thread load reached"
			// let's make sure we're actually at max capacity
			Sleep( 10 ); // sleep a bit
			if ( num_threads_available[ thread_type ] != 0 ) // see if we're still fully loaded
				continue;
			guard.Lock();
			CreateNewThread_Internal( thread_type );
			guard.Unlock();
			Sleep( 250 ); // give the system time to 'settle down' so we don't spawn a ton of threads in a row
			ResetEvent( max_load_event[ thread_type ] );
		}
	}
	return 0;
}

ThreadID *ThreadPool::ReserveThread( int flags )
{
	// first, check to see if there are any released threads we can grab
	Nullsoft::Utility::AutoLock threadlock( guard );
	for ( ThreadID *t : threads )
	{
		if ( t->IsReserved() && t->IsReleased() && t->CanRunCOM( flags ) )
		{
			t->Reserve();
			return t;
		}
	}

	// TODO: if there are enough free threads available, mark one as reserved.
	// this will involve signalling the thread to switch to 'reserved' mode,
	// swapping out the 'function list' semaphore with a local one,
	// and removing all 'busy handles' from the queue.
	// can probably use the 'wake' handle to synchronize this
	/*
	int thread_type = GetThreadType(flags);
	if (num_threads_available[thread_type] > 2)
	{
		for (size_t i=0;i!=threads.size();i++)
		{
			if (threads[i]->IsReserved() == false && threads[i]->CanRunCOM(flags))
			{
			}
		}
	}
	*/
	ThreadID *new_thread = CreateNewThread_Internal( GetThreadType( flags, 1 ) );
	return new_thread;
}

void ThreadPool::ReleaseThread( ThreadID *thread_id )
{
	if ( thread_id )
	{
		// lock so there's no race condition between ReserveThread() and ReleaseThread()
		Nullsoft::Utility::AutoLock threadlock( guard );
		thread_id->Release();
	}
}

int ThreadPool::AddHandle( ThreadID *thread_id, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags )
{
	// TODO: need to ensure that handles are not duplicated
	thread_functions.Add( handle, func, user_data, id );
	if ( thread_id )
	{
		if ( thread_id->CanRunCOM( flags ) )
			thread_id->AddHandle( handle );
		else
			return 1;
		return 0;
	}
	else
	{
		/* increment thread counts temporarily - because the massive wake-up
		causes thread counts to go to 0 */
		for ( int i = 0; i < THREAD_TYPES; i++ )
			InterlockedIncrement( &num_threads_available[ i ] );
		guard.Lock();
		AddHandle_Internal( 0, handle, flags );
		bool thread_types[ THREAD_TYPES ];
		GetThreadTypes( flags, thread_types );
		for ( int i = 0; i < THREAD_TYPES; i++ )
		{
			if ( thread_types[ i ] )
				any_thread_handles[ i ].push_back( handle );
		}
		guard.Unlock();
		for ( int i = 0; i < THREAD_TYPES; i++ )
			InterlockedDecrement( &num_threads_available[ i ] );
	}
	return 0;
}
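
/*
	Usage sketch for the handle-based path above - a minimal illustration, not taken
	from this codebase. pool, MyWakeCallback and my_context are hypothetical; the
	callback just has to match the api_threadpool::ThreadPoolFunc signature declared
	in the headers.

	HANDLE my_event = CreateEvent( NULL, FALSE, FALSE, NULL ); // auto-reset wake handle
	pool->AddHandle( NULL, my_event, MyWakeCallback, my_context, 0, 0 ); // NULL thread_id: any compatible pool thread may service it
	SetEvent( my_event );                 // a producer signals the handle; some pool thread runs MyWakeCallback
	...
	pool->RemoveHandle( NULL, my_event ); // detach from every pool thread before closing
	CloseHandle( my_event );
*/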

/* helper functions for adding/removing handles:
	we keep going through the thread list as long as we can add/remove immediately.
	once we have to block, we recurse, starting at the next thread;
	when the recursive call returns, we wait on the one we skipped.
	this lets us do some work rather than sit and wait for each thread's lock */
void ThreadPool::RemoveHandle_Internal(size_t start, HANDLE handle)
{
	for (;start!=threads.size();start++)
	{
		ThreadID *t = threads[start];
		if (!t->TryRemoveHandle(handle)) // try to remove without blocking
		{
			// have to wait
			RemoveHandle_Internal(start+1, handle); // recurse, starting with the next thread
			t->WaitRemoveHandle(handle); // finish the job
			return;
		}
	}
}

void ThreadPool::AddHandle_Internal(size_t start, HANDLE handle, int flags)
{
	for (;start<threads.size();start++)
	{
		ThreadID *t = threads[start];
		if ((flags & api_threadpool::FLAG_LONG_EXECUTION) && t->IsReserved())
			continue;
		if (!t->CanRunCOM(flags))
			continue;
		if (!t->TryAddHandle(handle)) // try to add without blocking
		{
			// have to wait
			AddHandle_Internal(start+1, handle, flags); // recurse, starting with the next thread
			t->WaitAddHandle(handle); // finish the job
			return;
		}
	}
}

void ThreadPool::RemoveHandle(ThreadID *thread_id, HANDLE handle)
{
	if (thread_id)
	{
		thread_id->RemoveHandle(handle);
	}
	else
	{
		/* increment thread counts temporarily - because the massive wake-up
		causes thread counts to go to 0 */
		for (int i=0;i<THREAD_TYPES;i++)
			InterlockedIncrement(&num_threads_available[i]);
		guard.Lock();
		RemoveHandle_Internal(0, handle);
		for (int j=0;j<THREAD_TYPES;j++)
		{
			// erase every copy of the handle from this type's "any-thread" list
			ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin();
			while (itr != any_thread_handles[j].end())
			{
				if (*itr == handle)
				{
					itr = any_thread_handles[j].erase(itr);
				}
				else
				{
					itr++;
				}
			}
		}
		guard.Unlock();
		for (int i=0;i<THREAD_TYPES;i++)
			InterlockedDecrement(&num_threads_available[i]);
	}
}

int ThreadPool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags)
{
	if (threadid)
		threadid->QueueFunction(func, user_data, id); // queue on the specified thread
	else
		thread_functions.QueueFunction(func, user_data, id); // queue for any pool thread
	return 0;
}
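
/*
	Usage sketch for the reserve/run/release path - illustrative only, not from this
	codebase. pool, MyLongTask and my_data are hypothetical; MyLongTask just has to
	match the api_threadpool::ThreadPoolFunc signature from the headers.

	ThreadID *worker = pool->ReserveThread( api_threadpool::FLAG_REQUIRE_COM_STA ); // grab a dedicated STA thread
	pool->RunFunction( worker, MyLongTask, my_data, 0, 0 );                         // queued to that thread only
	...
	pool->ReleaseThread( worker ); // return the thread so a later ReserveThread() can reuse it
*/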

ThreadID *ThreadPool::CreateNewThread_Internal(int thread_type)
{
	int reserved=0;
	int com_type = api_threadpool::FLAG_REQUIRE_COM_MT; // default
	switch(thread_type)
	{
	case TYPE_STA_RESERVED:
		reserved=1;
		// fall through
	case TYPE_STA:
		com_type = api_threadpool::FLAG_REQUIRE_COM_STA;
		break;
	case TYPE_MT_RESERVED:
		reserved=1;
		// fall through
	case TYPE_MT:
		com_type = api_threadpool::FLAG_REQUIRE_COM_MT;
		break;
	}
	Nullsoft::Utility::AutoLock threadlock(guard); // lock here (rather than after new ThreadID) to protect any_thread_handles
	ThreadID *new_thread = new ThreadID(&thread_functions, killswitch, thread_functions.functions_semaphore,
	                                    any_thread_handles[thread_type],
	                                    &num_threads_available[thread_type], max_load_event[thread_type],
	                                    reserved, com_type);
	threads.push_back(new_thread);
	return new_thread;
}

size_t ThreadPool::GetNumberOfThreads()
{
	Nullsoft::Utility::AutoLock threadlock(guard);
	return threads.size();
}

size_t ThreadPool::GetNumberOfActiveThreads()
{
	size_t numThreads = GetNumberOfThreads();
	for (int i=0;i<THREAD_TYPES;i++)
		numThreads -= num_threads_available[i];
	return numThreads;
}

int ThreadPool::GetThreadType(int flags, int reserved)
{
	flags &= api_threadpool::MASK_COM_FLAGS;
	int thread_type=TYPE_MT;
	switch(flags)
	{
	case api_threadpool::FLAG_REQUIRE_COM_STA:
		thread_type = reserved?TYPE_STA_RESERVED:TYPE_STA;
		break;
	case 0: // default
	case api_threadpool::FLAG_REQUIRE_COM_MT:
		thread_type = reserved?TYPE_MT_RESERVED:TYPE_MT;
		break;
	}
	return thread_type;
}

void ThreadPool::GetThreadTypes(int flags, bool types[THREAD_TYPES])
{
	for (int i=0;i<THREAD_TYPES;i++)
	{
		types[i]=true;
	}
	if (flags & api_threadpool::FLAG_REQUIRE_COM_STA)
	{
		// STA-only work can't run on multithreaded-apartment threads
		types[TYPE_MT] = false;
		types[TYPE_MT_RESERVED] = false;
	}
	if (flags & api_threadpool::FLAG_REQUIRE_COM_MT)
	{
		// MT-only work can't run on single-threaded-apartment threads
		types[TYPE_STA] = false;
		types[TYPE_STA_RESERVED] = false;
	}
	if (flags & api_threadpool::FLAG_LONG_EXECUTION)
	{
		// long-running work must not tie up a reserved thread
		types[TYPE_STA_RESERVED] = false;
		types[TYPE_MT_RESERVED] = false;
	}
}
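
/*
	For example, flags = FLAG_REQUIRE_COM_STA | FLAG_LONG_EXECUTION leaves only
	types[TYPE_STA] set, so AddHandle() adds the handle to the TYPE_STA
	"any-thread" list only.
*/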

// dispatch table: exposes this class through the api_threadpool service interface
#define CBCLASS ThreadPool
START_DISPATCH;
CB(RESERVETHREAD, ReserveThread)
VCB(RELEASETHREAD, ReleaseThread)
CB(ADDHANDLE, AddHandle)
VCB(REMOVEHANDLE, RemoveHandle)
CB(RUNFUNCTION, RunFunction)
CB(GETNUMBEROFTHREADS, GetNumberOfThreads)
CB(GETNUMBEROFACTIVETHREADS, GetNumberOfActiveThreads)
END_DISPATCH;
#undef CBCLASS