// ThreadID.cpp
  1. #include "ThreadID.h"
  2. DWORD ThreadID::thread_func_stub(LPVOID param)
  3. {
  4. ThreadID *t = static_cast<ThreadID*>(param);
  5. if (t != NULL)
  6. {
  7. return t->ThreadFunction();
  8. }
  9. else return 0;
  10. }
  11. void ThreadID::Kill()
  12. {
  13. if (threadHandle && threadHandle != INVALID_HANDLE_VALUE)
  14. {
  15. //cut: WaitForSingleObject(threadHandle, INFINITE);
  16. while (WaitForMultipleObjectsEx(1, &threadHandle, FALSE, INFINITE, TRUE) != WAIT_OBJECT_0)
  17. {
  18. }
  19. }
  20. }
  21. ThreadID::ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore,
  22. ThreadPoolTypes::HandleList &inherited_handles,
  23. volatile LONG *thread_count, HANDLE _max_load_event,
  24. int _reserved, int _com_type) : ThreadFunctions(_reserved)
  25. {
  26. /* initialize values */
  27. released = false;
  28. InitializeCriticalSection(&handle_lock);
  29. /* grab values passed to us */
  30. reserved = _reserved;
  31. com_type = _com_type;
  32. max_load_event = _max_load_event;
  33. global_functions = t_f;
  34. num_threads_available = thread_count;
  35. /* wait_handles[0] is kill switch */
  36. wait_handles.push_back(killswitch);
  37. /* wait_handles[1] is wake switch */
  38. wakeHandle = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0);
  39. wait_handles.push_back(wakeHandle);
  40. if (reserved)
  41. {
  42. /* if thread is reserved,
  43. wait_handles[2] is a Funcion Call wake semaphore
  44. for this thread only. */
  45. wait_handles.push_back(functions_semaphore); // WAIT_OBJECT_0+1 == per-thread queued functions
  46. }
  47. else
  48. {
  49. /* if thread is not reserved,
  50. wait_handles[2] is a Function Call wake semaphore
  51. global to all threads */
  52. wait_handles.push_back(global_functions_semaphore); // WAIT_OBJECT_0+2 == any-thread queued functions
  53. }
  54. /* add inherited handles
  55. (handles added to thread pool before this thread was created) */
  56. for ( ThreadPoolTypes::HandleList::iterator itr = inherited_handles.begin(); itr != inherited_handles.end(); itr++ )
  57. {
  58. wait_handles.push_back( *itr );
  59. }
  60. /* start thread */
  61. threadHandle = CreateThread(0, 0, thread_func_stub, this, 0, 0);
  62. }
  63. ThreadID::~ThreadID()
  64. {
  65. CloseHandle(threadHandle);
  66. CloseHandle(wakeHandle);
  67. DeleteCriticalSection(&handle_lock);
  68. }
  69. bool ThreadID::TryAddHandle(HANDLE new_handle)
  70. {
  71. // let's see if we get lucky and can access the handle list directly
  72. if (TryEnterCriticalSection(&handle_lock))
  73. {
  74. // made it
  75. wait_handles.push_back(new_handle);
  76. LeaveCriticalSection(&handle_lock);
  77. return true;
  78. }
  79. else
  80. {
  81. ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
  82. return false;
  83. }
  84. }
  85. void ThreadID::WaitAddHandle(HANDLE handle)
  86. {
  87. // wakeHandle already got released once by nature of this function being called
  88. EnterCriticalSection(&handle_lock);
  89. wait_handles.push_back(handle);
  90. LeaveCriticalSection(&handle_lock);
  91. ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
  92. }
  93. void ThreadID::AddHandle(HANDLE new_handle)
  94. {
  95. if (!TryAddHandle(new_handle))
  96. WaitAddHandle(new_handle);
  97. }
  98. bool ThreadID::TryRemoveHandle(HANDLE handle)
  99. {
  100. // let's see if we get lucky and can access the handle list directly
  101. if (TryEnterCriticalSection(&handle_lock))
  102. {
  103. RemoveHandle_Internal(handle);
  104. LeaveCriticalSection(&handle_lock);
  105. return true;
  106. }
  107. else
  108. {
  109. ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
  110. return false;
  111. }
  112. return false;
  113. }
  114. void ThreadID::WaitRemoveHandle(HANDLE handle)
  115. {
  116. // wakeHandle already got released once by nature of this function being called
  117. EnterCriticalSection(&handle_lock);
  118. RemoveHandle_Internal(handle);
  119. LeaveCriticalSection(&handle_lock);
  120. ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
  121. }
  122. void ThreadID::RemoveHandle(HANDLE handle)
  123. {
  124. if (!TryRemoveHandle(handle))
  125. WaitRemoveHandle(handle);
  126. }
  127. void ThreadID::RemoveHandle_Internal(HANDLE handle)
  128. {
  129. // first three handles are reserved, so start after that
  130. for (size_t i=3;i<wait_handles.size();i++)
  131. {
  132. if (wait_handles[i] == handle)
  133. {
  134. wait_handles.erase(wait_handles.begin() + i);
  135. i--;
  136. }
  137. }
  138. }
  139. bool ThreadID::IsReserved() const
  140. {
  141. return !!reserved;
  142. }
  143. DWORD CALLBACK ThreadID::ThreadFunction()
  144. {
  145. switch(com_type)
  146. {
  147. case api_threadpool::FLAG_REQUIRE_COM_MT:
  148. CoInitializeEx(0, COINIT_MULTITHREADED);
  149. break;
  150. case api_threadpool::FLAG_REQUIRE_COM_STA:
  151. CoInitialize(0);
  152. break;
  153. }
  154. while (1)
  155. {
  156. InterlockedIncrement(num_threads_available);
  157. EnterCriticalSection(&handle_lock);
  158. DWORD ret = WaitForMultipleObjectsEx((DWORD)wait_handles.size(), wait_handles.data(), FALSE, INFINITE, TRUE);
  159. // cut: LeaveCriticalSection(&handle_lock);
  160. if (InterlockedDecrement(num_threads_available) == 0 && !reserved)
  161. SetEvent(max_load_event); // notify the watch dog if all the threads are used up
  162. if (ret == WAIT_OBJECT_0)
  163. {
  164. // killswitch
  165. LeaveCriticalSection(&handle_lock);
  166. break;
  167. }
  168. else if (ret == WAIT_OBJECT_0 + 1)
  169. {
  170. // we got woken up to release the handles lock
  171. // wait for the second signal
  172. LeaveCriticalSection(&handle_lock);
  173. InterlockedIncrement(num_threads_available);
  174. WaitForSingleObject(wakeHandle, INFINITE);
  175. InterlockedDecrement(num_threads_available);
  176. }
  177. else if (ret == WAIT_OBJECT_0 + 2)
  178. {
  179. LeaveCriticalSection(&handle_lock);
  180. api_threadpool::ThreadPoolFunc func;
  181. void *user_data;
  182. intptr_t id;
  183. if (reserved)
  184. {
  185. // per-thread queued functions
  186. if (PopFunction(&func, &user_data, &id))
  187. {
  188. func(0, user_data, id);
  189. }
  190. }
  191. else
  192. {
  193. // global queued functions
  194. if (global_functions->PopFunction(&func, &user_data, &id))
  195. {
  196. func(0, user_data, id);
  197. }
  198. }
  199. }
  200. else if (ret > WAIT_OBJECT_0 && ret < (WAIT_OBJECT_0 + wait_handles.size()))
  201. {
  202. DWORD index = ret - WAIT_OBJECT_0;
  203. HANDLE handle = wait_handles[index];
  204. LeaveCriticalSection(&handle_lock);
  205. /* !!! race condition here if someone calls ThreadPool::RemoveHandle and then CloseHandle() !!!
  206. before calling RemoveHandle, caller needs to either
  207. ensure that Event is unsignalled (And won't be signalled)
  208. or call RemoveHandle from within the function callback */
  209. api_threadpool::ThreadPoolFunc func;
  210. void *user_data;
  211. intptr_t id;
  212. if (global_functions->Get(handle, &func, &user_data, &id))
  213. {
  214. func(handle, user_data, id);
  215. }
  216. }
  217. else
  218. {
  219. LeaveCriticalSection(&handle_lock);
  220. }
  221. }
  222. if (com_type & api_threadpool::MASK_COM_FLAGS)
  223. CoUninitialize();
  224. return 0;
  225. }
  226. bool ThreadID::CanRunCOM(int flags) const
  227. {
  228. switch(com_type)
  229. {
  230. case api_threadpool::FLAG_REQUIRE_COM_MT: // if we're a CONIT_MULTITHREADEX thread (default)
  231. return !(flags & api_threadpool::FLAG_REQUIRE_COM_STA); // don't let STA stuff run
  232. case api_threadpool::FLAG_REQUIRE_COM_STA: // if we're a CoInitialize(0) thread
  233. return !(flags & api_threadpool::FLAG_REQUIRE_COM_MT); // don't let MT stuff run
  234. }
  235. return false; // shouldn't get here
  236. }
  237. bool ThreadID::IsReleased() const
  238. {
  239. return released;
  240. }
  241. void ThreadID::Reserve()
  242. {
  243. released=false;
  244. }
  245. void ThreadID::Release()
  246. {
  247. released=true;
  248. }