123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- #include "ThreadID.h"
- DWORD ThreadID::thread_func_stub(LPVOID param)
- {
- ThreadID *t = static_cast<ThreadID*>(param);
- if (t != NULL)
- {
- return t->ThreadFunction();
- }
- else return 0;
- }
- void ThreadID::Kill()
- {
- if (threadHandle && threadHandle != INVALID_HANDLE_VALUE)
- {
- //cut: WaitForSingleObject(threadHandle, INFINITE);
- while (WaitForMultipleObjectsEx(1, &threadHandle, FALSE, INFINITE, TRUE) != WAIT_OBJECT_0)
- {
- }
- }
-
- }
- ThreadID::ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore,
- ThreadPoolTypes::HandleList &inherited_handles,
- volatile LONG *thread_count, HANDLE _max_load_event,
- int _reserved, int _com_type) : ThreadFunctions(_reserved)
- {
- /* initialize values */
- released = false;
- InitializeCriticalSection(&handle_lock);
- /* grab values passed to us */
- reserved = _reserved;
- com_type = _com_type;
- max_load_event = _max_load_event;
- global_functions = t_f;
- num_threads_available = thread_count;
- /* wait_handles[0] is kill switch */
- wait_handles.push_back(killswitch);
- /* wait_handles[1] is wake switch */
- wakeHandle = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0);
- wait_handles.push_back(wakeHandle);
- if (reserved)
- {
- /* if thread is reserved,
- wait_handles[2] is a Funcion Call wake semaphore
- for this thread only. */
- wait_handles.push_back(functions_semaphore); // WAIT_OBJECT_0+1 == per-thread queued functions
- }
- else
- {
- /* if thread is not reserved,
- wait_handles[2] is a Function Call wake semaphore
- global to all threads */
- wait_handles.push_back(global_functions_semaphore); // WAIT_OBJECT_0+2 == any-thread queued functions
- }
- /* add inherited handles
- (handles added to thread pool before this thread was created) */
- for ( ThreadPoolTypes::HandleList::iterator itr = inherited_handles.begin(); itr != inherited_handles.end(); itr++ )
- {
- wait_handles.push_back( *itr );
- }
- /* start thread */
- threadHandle = CreateThread(0, 0, thread_func_stub, this, 0, 0);
- }
- ThreadID::~ThreadID()
- {
- CloseHandle(threadHandle);
- CloseHandle(wakeHandle);
- DeleteCriticalSection(&handle_lock);
- }
- bool ThreadID::TryAddHandle(HANDLE new_handle)
- {
- // let's see if we get lucky and can access the handle list directly
- if (TryEnterCriticalSection(&handle_lock))
- {
- // made it
- wait_handles.push_back(new_handle);
- LeaveCriticalSection(&handle_lock);
- return true;
- }
- else
- {
- ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
- return false;
- }
- }
- void ThreadID::WaitAddHandle(HANDLE handle)
- {
- // wakeHandle already got released once by nature of this function being called
- EnterCriticalSection(&handle_lock);
- wait_handles.push_back(handle);
- LeaveCriticalSection(&handle_lock);
- ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
- }
- void ThreadID::AddHandle(HANDLE new_handle)
- {
- if (!TryAddHandle(new_handle))
- WaitAddHandle(new_handle);
- }
- bool ThreadID::TryRemoveHandle(HANDLE handle)
- {
- // let's see if we get lucky and can access the handle list directly
- if (TryEnterCriticalSection(&handle_lock))
- {
- RemoveHandle_Internal(handle);
- LeaveCriticalSection(&handle_lock);
- return true;
- }
- else
- {
- ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
- return false;
- }
- return false;
- }
- void ThreadID::WaitRemoveHandle(HANDLE handle)
- {
- // wakeHandle already got released once by nature of this function being called
- EnterCriticalSection(&handle_lock);
- RemoveHandle_Internal(handle);
- LeaveCriticalSection(&handle_lock);
- ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
- }
- void ThreadID::RemoveHandle(HANDLE handle)
- {
- if (!TryRemoveHandle(handle))
- WaitRemoveHandle(handle);
- }
- void ThreadID::RemoveHandle_Internal(HANDLE handle)
- {
- // first three handles are reserved, so start after that
- for (size_t i=3;i<wait_handles.size();i++)
- {
- if (wait_handles[i] == handle)
- {
- wait_handles.erase(wait_handles.begin() + i);
- i--;
- }
- }
- }
- bool ThreadID::IsReserved() const
- {
- return !!reserved;
- }
- DWORD CALLBACK ThreadID::ThreadFunction()
- {
- switch(com_type)
- {
- case api_threadpool::FLAG_REQUIRE_COM_MT:
- CoInitializeEx(0, COINIT_MULTITHREADED);
- break;
- case api_threadpool::FLAG_REQUIRE_COM_STA:
- CoInitialize(0);
- break;
- }
- while (1)
- {
- InterlockedIncrement(num_threads_available);
- EnterCriticalSection(&handle_lock);
- DWORD ret = WaitForMultipleObjectsEx((DWORD)wait_handles.size(), wait_handles.data(), FALSE, INFINITE, TRUE);
- // cut: LeaveCriticalSection(&handle_lock);
- if (InterlockedDecrement(num_threads_available) == 0 && !reserved)
- SetEvent(max_load_event); // notify the watch dog if all the threads are used up
- if (ret == WAIT_OBJECT_0)
- {
- // killswitch
- LeaveCriticalSection(&handle_lock);
- break;
- }
- else if (ret == WAIT_OBJECT_0 + 1)
- {
- // we got woken up to release the handles lock
- // wait for the second signal
- LeaveCriticalSection(&handle_lock);
- InterlockedIncrement(num_threads_available);
- WaitForSingleObject(wakeHandle, INFINITE);
- InterlockedDecrement(num_threads_available);
- }
- else if (ret == WAIT_OBJECT_0 + 2)
- {
- LeaveCriticalSection(&handle_lock);
- api_threadpool::ThreadPoolFunc func;
- void *user_data;
- intptr_t id;
- if (reserved)
- {
- // per-thread queued functions
- if (PopFunction(&func, &user_data, &id))
- {
- func(0, user_data, id);
- }
- }
- else
- {
- // global queued functions
- if (global_functions->PopFunction(&func, &user_data, &id))
- {
- func(0, user_data, id);
- }
- }
- }
- else if (ret > WAIT_OBJECT_0 && ret < (WAIT_OBJECT_0 + wait_handles.size()))
- {
- DWORD index = ret - WAIT_OBJECT_0;
- HANDLE handle = wait_handles[index];
- LeaveCriticalSection(&handle_lock);
- /* !!! race condition here if someone calls ThreadPool::RemoveHandle and then CloseHandle() !!!
- before calling RemoveHandle, caller needs to either
- ensure that Event is unsignalled (And won't be signalled)
- or call RemoveHandle from within the function callback */
- api_threadpool::ThreadPoolFunc func;
- void *user_data;
- intptr_t id;
- if (global_functions->Get(handle, &func, &user_data, &id))
- {
- func(handle, user_data, id);
- }
- }
- else
- {
- LeaveCriticalSection(&handle_lock);
- }
- }
- if (com_type & api_threadpool::MASK_COM_FLAGS)
- CoUninitialize();
- return 0;
- }
- bool ThreadID::CanRunCOM(int flags) const
- {
- switch(com_type)
- {
- case api_threadpool::FLAG_REQUIRE_COM_MT: // if we're a CONIT_MULTITHREADEX thread (default)
- return !(flags & api_threadpool::FLAG_REQUIRE_COM_STA); // don't let STA stuff run
- case api_threadpool::FLAG_REQUIRE_COM_STA: // if we're a CoInitialize(0) thread
- return !(flags & api_threadpool::FLAG_REQUIRE_COM_MT); // don't let MT stuff run
- }
- return false; // shouldn't get here
- }
- bool ThreadID::IsReleased() const
- {
- return released;
- }
- void ThreadID::Reserve()
- {
- released=false;
- }
- void ThreadID::Release()
- {
- released=true;
- }
|