#include "ThreadPool.h"

/*
  Sets up per-type bookkeeping (available-thread counters and "max load"
  events), the shared kill switch, one worker thread for each non-reserved
  thread type, and finally the watchdog thread.

  All events are created manual-reset and initially non-signalled.
*/
ThreadPool::ThreadPool()
{
	for ( int i = 0; i < THREAD_TYPES; i++ )
	{
		num_threads_available[ i ] = 0;
		max_load_event[ i ]        = CreateEvent( NULL, TRUE, FALSE, NULL );
	}

	killswitch = CreateEvent( NULL, TRUE, FALSE, NULL );

	// one thread of each type to start
	// (named constants instead of the literal range 0..1 the code used to
	// rely on; the reserved types get their threads on demand in ReserveThread)
	CreateNewThread_Internal( TYPE_MT );
	CreateNewThread_Internal( TYPE_STA );

	watchdog_thread_handle = CreateThread( 0, 0, WatchDogThreadProcedure_stub, this, 0, 0 );
}

/*
  Shuts the pool down: signals the kill switch, waits for the watchdog to
  exit, kills and deletes every ThreadID, then closes the pool's own handles.

  The thread list is emptied and the handles are nulled afterwards so that no
  dangling ThreadID pointers or closed handle values remain in the object.
*/
void ThreadPool::Kill()
{
	SetEvent( killswitch );

	// the watchdog must be gone before the thread list is torn down,
	// because it is the only other party that adds threads on its own
	WaitForSingleObject( watchdog_thread_handle, INFINITE );
	CloseHandle( watchdog_thread_handle );
	watchdog_thread_handle = NULL;

	for ( ThreadID *l_thread : threads )
	{
		l_thread->Kill();
		delete l_thread;
	}
	threads.clear(); // do not keep pointers to deleted ThreadID objects

	CloseHandle( killswitch );
	killswitch = NULL;

	for ( int i = 0; i < THREAD_TYPES; i++ )
	{
		CloseHandle( max_load_event[ i ] );
		max_load_event[ i ] = NULL;
	}
}

/*
  Thread entry point handed to CreateThread.
  param is the ThreadPool instance passed by the constructor; the call is
  forwarded to the member function.
*/
DWORD ThreadPool::WatchDogThreadProcedure_stub( LPVOID param )
{
	ThreadPool *_this = static_cast<ThreadPool *>( param );
	return _this->WatchDogThreadProcedure();
}

/*
watchdog will get woken up when number of available threads hits zero
it creates a new thread, sleeps for a bit to let things "settle" and then reset the event
it will need a copy of all "any-thread" handles to build the new thread, and will need to manage in a thread safe way
(so a new thread doesn't "miss" a handle that is added in the interim)

Returns 0 when the kill switch is signalled or when waiting fails.
*/
DWORD CALLBACK ThreadPool::WatchDogThreadProcedure()
{
	// we ignore the max load event for reserved threads
	HANDLE events[ 3 ] = { killswitch, max_load_event[ TYPE_MT ], max_load_event[ TYPE_STA ] };

	// thread type that belongs to events[ 1 ] and events[ 2 ] respectively,
	// so the mapping does not depend on the numeric values of the constants
	const int event_thread_type[ 2 ] = { TYPE_MT, TYPE_STA };

	while ( 1 )
	{
		DWORD ret = WaitForMultipleObjects( 3, events, FALSE, INFINITE );

		if ( ret != WAIT_OBJECT_0 + 1 && ret != WAIT_OBJECT_0 + 2 )
		{
			// WAIT_OBJECT_0 is the kill switch. Anything else (e.g. WAIT_FAILED)
			// means waiting is no longer possible; looping again would spin,
			// because the wait would return immediately every time.
			break;
		}

		int thread_type = event_thread_type[ ret - ( WAIT_OBJECT_0 + 1 ) ];

		// this signal is for "full thread load reached"
		// lets make sure we're actually at max capacity
		// (timed wait on the kill switch instead of a plain sleep, so that
		// shutdown is not delayed and no thread is spawned during it)
		if ( WaitForSingleObject( killswitch, 10 ) != WAIT_TIMEOUT ) // sleep a bit
			break;

		if ( num_threads_available[ thread_type ] != 0 ) // see if we're still fully-loaded
			continue;

		{
			Nullsoft::Utility::AutoLock threadlock( guard );
			CreateNewThread_Internal( thread_type );
		}

		// give the system time to 'settle down' so we don't spawn a ton of threads in a row
		if ( WaitForSingleObject( killswitch, 250 ) != WAIT_TIMEOUT )
			break;

		ResetEvent( max_load_event[ thread_type ] );
	}

	return 0;
}

/*
  Hands out a thread for exclusive use.

  flags  COM requirements, passed to ThreadID::CanRunCOM and GetThreadType.

  Returns a previously released reserved thread that satisfies flags if there
  is one, otherwise a newly created thread of the type GetThreadType( flags, 1 )
  selects.

  NOTE(review): guard is held here and taken again inside
  CreateNewThread_Internal, so it has to be re-entrant - confirm.
*/
ThreadID *ThreadPool::ReserveThread( int flags )
{
	// first, check to see if there's any released threads we can grab
	Nullsoft::Utility::AutoLock threadlock( guard );
	for ( ThreadID *t : threads )
	{
		if ( t->IsReserved() && t->IsReleased() && t->CanRunCOM( flags ) )
		{
			t->Reserve();
			return t;
		}
	}

	// TODO: if there are enough free threads available, mark one as reserved
	// this will involve signalling the thread to switch to 'reserved' mode
	// swapping out the 'function list' semaphore with a local one
	// and removing all 'busy handles' from the queue
	// can probably use the 'wake' handle to synchronize this
	/*
	int thread_type = GetThreadType(flags);
	if (num_threads_available[thread_type > 2])
	{
		for (size_t i=0;i!=threads.size();i++)
		{
			if (threads[i]->IsReserved() == false && threads[i]->CanRunCOM(flags))
			{
			}
		}
	}
	*/

	ThreadID *new_thread = CreateNewThread_Internal( GetThreadType( flags, 1 ) );
	return new_thread;
}

/*
  Gives a thread obtained from ReserveThread back to the pool.
  A null thread_id is ignored.
*/
void ThreadPool::ReleaseThread( ThreadID *thread_id )
{
	if ( thread_id )
	{
		// lock so there's no race condition between ReserveThread() and ReleaseThread()
		Nullsoft::Utility::AutoLock threadlock( guard );
		thread_id->Release();
	}
}

/*
  Registers func/user_data/id as the callback for handle and makes threads
  wait on that handle.

  thread_id  specific thread to attach the handle to, or null to offer it to
             every non-reserved thread that satisfies flags
  flags      COM requirements

  Returns 0 on success, 1 if thread_id was given but cannot satisfy flags.
  In the failure case nothing is registered.
*/
int ThreadPool::AddHandle( ThreadID *thread_id, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags )
{
	// reject an unsuitable thread before touching thread_functions, otherwise
	// the callback would stay registered for a handle no thread waits on
	if ( thread_id && !thread_id->CanRunCOM( flags ) )
		return 1;

	// TODO: need to ensure that handles are not duplicated
	thread_functions.Add( handle, func, user_data, id );

	if ( thread_id )
	{
		thread_id->AddHandle( handle );
		return 0;
	}

	/* increment thread counts temporarily - because the massive wake-up
	   causes thread counts to go to 0 */
	for ( int i = 0; i < THREAD_TYPES; i++ )
		InterlockedIncrement( &num_threads_available[ i ] );

	{
		Nullsoft::Utility::AutoLock threadlock( guard );

		AddHandle_Internal( 0, handle, flags );

		// remember the handle per thread type; CreateNewThread_Internal passes
		// these lists to every thread it creates later
		bool thread_types[ THREAD_TYPES ];
		GetThreadTypes( flags, thread_types );
		for ( int i = 0; i < THREAD_TYPES; i++ )
		{
			if ( thread_types[ i ] )
				any_thread_handles[ i ].push_back( handle );
		}
	}

	for ( int i = 0; i < THREAD_TYPES; i++ )
		InterlockedDecrement( &num_threads_available[ i ] );

	return 0;
}

/*
helper functions for adding/removing handles,
we keep going through the list as long as we can add/remove immediately.
once we have to block, we recurse the function starting at the next handle
when the function returns, we wait.
this lets us do some work rather than sit and wait for each thread's lock
*/

/*
  Removes handle from threads[ start ] and every thread after it.
*/
void ThreadPool::RemoveHandle_Internal( size_t start, HANDLE handle )
{
	for ( ; start != threads.size(); start++ )
	{
		ThreadID *t = threads[ start ];
		if ( !t->TryRemoveHandle( handle ) ) // try to remove
		{
			// have to wait
			RemoveHandle_Internal( start + 1, handle ); // recurse start with the next thread
			t->WaitRemoveHandle( handle );              // finish the job
			return;
		}
	}
}

/*
  Adds handle to threads[ start ] and every thread after it, skipping reserved
  threads and threads that cannot satisfy flags.

  NOTE(review): the loop header and the declaration of t were corrupted in the
  source text ("for (;startIsReserved()) continue;"). They are restored here
  to mirror RemoveHandle_Internal - confirm against the original file.
*/
void ThreadPool::AddHandle_Internal( size_t start, HANDLE handle, int flags )
{
	for ( ; start < threads.size(); start++ )
	{
		ThreadID *t = threads[ start ];

		if ( t->IsReserved() )
			continue;

		if ( !t->CanRunCOM( flags ) )
			continue;

		if ( !t->TryAddHandle( handle ) ) // try to add
		{
			// have to wait,
			AddHandle_Internal( start + 1, handle, flags ); // recurse start with the next thread
			t->WaitAddHandle( handle );                     // finish the job
			return;
		}
	}
}

/*
  Stops waiting on handle: on thread_id only if it is non-null, otherwise on
  the pool's threads.
*/
void ThreadPool::RemoveHandle(ThreadID *thread_id, HANDLE handle)
{
	if (thread_id)
	{
		thread_id->RemoveHandle(handle);
	}
	else
	{
		/* increment thread counts temporarily - because the massive wake-up
		   causes thread counts to go to 0 */
		// NOTE(review): the source text is corrupted from the loop condition
		// below up to the middle of the NEXT function (the rest of
		// RemoveHandle and the name and signature of the function that
		// follows are missing). The remaining text is kept exactly as found
		// and must be restored from the original file.
		for (int i=0;iQueueFunction(func, user_data, id); else thread_functions.QueueFunction(func, user_data, id); return 0; }

/*
  Creates a ThreadID of the given type, appends it to the thread list and
  returns it.

  thread_type  one of the TYPE_* constants; the *_RESERVED variants create a
               reserved thread with the same COM model as their plain
               counterpart

  Returns nullptr if thread_type is not a valid index into the per-type arrays.
*/
ThreadID *ThreadPool::CreateNewThread_Internal( int thread_type )
{
	// thread_type indexes any_thread_handles, num_threads_available and
	// max_load_event below
	if ( thread_type < 0 || thread_type >= THREAD_TYPES )
		return nullptr;

	int reserved = 0;
	int com_type = api_threadpool::FLAG_REQUIRE_COM_MT; // default

	switch ( thread_type )
	{
		case TYPE_STA_RESERVED:
			reserved = 1;
			// fall through
		case TYPE_STA:
			com_type = api_threadpool::FLAG_REQUIRE_COM_STA;
			break;

		case TYPE_MT_RESERVED:
			reserved = 1;
			// fall through
		case TYPE_MT:
			com_type = api_threadpool::FLAG_REQUIRE_COM_MT;
			break;
	}

	Nullsoft::Utility::AutoLock threadlock( guard ); // lock here (rather than after new ThreadID) to protect any_thread_handles

	ThreadID *new_thread = new ThreadID( &thread_functions, killswitch, thread_functions.functions_semaphore,
	                                     any_thread_handles[ thread_type ],
	                                     &num_threads_available[ thread_type ], max_load_event[ thread_type ],
	                                     reserved, com_type );
	threads.push_back( new_thread );

	return new_thread;
}

/*
  Returns the number of ThreadID objects currently in the pool.
*/
size_t ThreadPool::GetNumberOfThreads()
{
	Nullsoft::Utility::AutoLock threadlock( guard );
	return threads.size();
}

// NOTE(review): the definition below runs past the end of the visible text;
// it is kept exactly as found.
size_t ThreadPool::GetNumberOfActiveThreads() { size_t numThreads = GetNumberOfThreads(); for (int i=0;i