来源:百度文库 编辑:中财网 时间:2024/04/29 20:58:40
#ifndef _ThreadPool_H_
#define _ThreadPool_H_
#pragma warning(disable: 4530)
#pragma warning(disable: 4786)
#include
#include
#include
#include
using
namespace
std;
class
ThreadJob
{
public
:
virtual
void
DoJob(
void
*pPara) = 0;
};
class
ThreadPool
{
public
:
struct
JobItem
{
void
(*_pFunc)(
void
*);
void
*_pPara;
JobItem(
void
(*pFunc)(
void
*) = NULL,
void
*pPara = NULL) : _pFunc(pFunc), _pPara(pPara)
{
};
};
struct
ThreadItem
{
HANDLE
_Handle;
ThreadPool *_pThreadPool;
DWORD
_dwLastBeginTime;
DWORD
_dwCount;
bool
_fIsRunning;
ThreadItem(ThreadPool *pThreadPool = NULL) : _pThreadPool(pThreadPool), _Handle(NULL), _dwLastBeginTime(0),
_dwCount(0), _fIsRunning(
false
)
{
};
~ThreadItem()
{
if
(_Handle)
{
CloseHandle(_Handle);
_Handle = NULL;
}
}
};
public
:
std::queue _JobQueue;
std::vector _ThreadVector;
CRITICAL_SECTION _csThreadVector;
CRITICAL_SECTION _csWorkQueue;
HANDLE
_EventEnd;
HANDLE
_EventComplete;
HANDLE
_SemaphoreCall;
HANDLE
_SemaphoreDel;
long
_lRunningThreadNum;
long
_lDoingJobThreadNum;
public
:
ThreadPool(
DWORD
dwThreadNum = 4) : _lRunningThreadNum(0), _lDoingJobThreadNum(0)
{
InitializeCriticalSection(&_csThreadVector);
InitializeCriticalSection(&_csWorkQueue);
_EventComplete = CreateEvent(0,
false
,
false
, NULL);
_EventEnd = CreateEvent(0,
true
,
false
, NULL);
_SemaphoreCall = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);
_SemaphoreDel = CreateSemaphore(0, 0, 0x7FFFFFFF, NULL);
assert
(_SemaphoreCall != INVALID_HANDLE_VALUE);
assert
(_EventComplete != INVALID_HANDLE_VALUE);
assert
(_EventEnd != INVALID_HANDLE_VALUE);
assert
(_SemaphoreDel != INVALID_HANDLE_VALUE);
AdjustSize(dwThreadNum <= 0 ? 4 : dwThreadNum);
}
~ThreadPool()
{
DeleteCriticalSection(&_csWorkQueue);
CloseHandle(_EventEnd);
CloseHandle(_EventComplete);
CloseHandle(_SemaphoreCall);
CloseHandle(_SemaphoreDel);
vector::iterator iter;
for
(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
{
if
(*iter)
delete
*iter;
}
DeleteCriticalSection(&_csThreadVector);
}
int
AdjustSize(
int
iThreadNum)
{
if
(iThreadNum > 0)
{
ThreadItem *pNew;
EnterCriticalSection(&_csThreadVector);
for
(
int
_i=0; _i
{
_ThreadVector.push_back(pNew =
new
ThreadItem(
this
));
pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
SetThreadPriority(pNew->_Handle, THREAD_PRIORITY_BELOW_NORMAL);
}
LeaveCriticalSection(&_csThreadVector);
}
else
{
iThreadNum *= -1;
ReleaseSemaphore(_SemaphoreDel, iThreadNum > _lRunningThreadNum ? _lRunningThreadNum : iThreadNum, NULL);
}
return
(
int
)_lRunningThreadNum;
}
void
Call(
void
(*pFunc)(
void
*),
void
*pPara = NULL)
{
assert
(pFunc);
EnterCriticalSection(&_csWorkQueue);
_JobQueue.push(
new
JobItem(pFunc, pPara));
LeaveCriticalSection(&_csWorkQueue);
ReleaseSemaphore(_SemaphoreCall, 1, NULL);
}
inline
void
Call(ThreadJob * p,
void
*pPara = NULL)
{
Call(CallProc,
new
CallProcPara(p, pPara));
}
bool
EndAndWait(
DWORD
dwWaitTime = INFINITE)
{
SetEvent(_EventEnd);
return
WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
}
inline
void
End()
{
SetEvent(_EventEnd);
}
inline
DWORD
GetRunningThreadNum()
{
return
(
DWORD
)_lRunningThreadNum;
}
inline
DWORD
GetDoingJobThreadNum()
{
return
(
DWORD
)_lDoingJobThreadNum;
}
bool
IsDoingJob();
protected
:
static
DWORD
WINAPI DefaultJobProc(
LPVOID
lpParameter = NULL)
{
ThreadItem *pThread =
static_cast
(lpParameter);
assert
(pThread);
ThreadPool *pThreadPoolObj = pThread->_pThreadPool;
assert
(pThreadPoolObj);
InterlockedIncrement(&pThreadPoolObj->_lRunningThreadNum);
HANDLE
hWaitHandle[3];
hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
hWaitHandle[2] = pThreadPoolObj->_EventEnd;
JobItem * pJob;
bool
fHasJob;
while
(
true
)
{
DWORD
wr = WaitForMultipleObjects(3, hWaitHandle,
false
, INFINITE);
if
(wr == WAIT_OBJECT_0 + 1)
{
break
;
}
EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
fHasJob = !pThreadPoolObj->_JobQueue.empty();
if
(fHasJob)
{
pJob = pThreadPoolObj->_JobQueue.front();
pThreadPoolObj->_JobQueue.pop();
assert
(pJob);
}
LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);
if
(wr == WAIT_OBJECT_0 + 2 && !fHasJob)
{
break
;
}
if
(fHasJob && pJob)
{
InterlockedIncrement(&pThreadPoolObj->_lDoingJobThreadNum);
pThread->_dwLastBeginTime = GetTickCount();
pThread->_dwCount++;
pThread->_fIsRunning =
true
;
pJob->_pFunc(pJob->_pPara);
delete
pJob;
pJob = NULL;
pThread->_fIsRunning =
false
;
InterlockedDecrement(&pThreadPoolObj->_lDoingJobThreadNum);
}
}
EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);
delete
pThread;
pThread = NULL;
InterlockedDecrement(&pThreadPoolObj->_lRunningThreadNum);
if
(!pThreadPoolObj->_lRunningThreadNum)
{
SetEvent(pThreadPoolObj->_EventComplete);
}
return
0;
}
public
:
static
void
CallProc(
void
*pPara)
{
CallProcPara *cp =
static_cast
(pPara);
assert
(cp);
if
(cp)
{
cp->_pObj->DoJob(cp->_pPara);
delete
cp;
cp = NULL;
}
}
struct
CallProcPara
{
ThreadJob* _pObj;
void
*_pPara;
CallProcPara(ThreadJob* p = NULL,
void
*pPara = NULL) : _pObj(p), _pPara(pPara)
{
};
};
};
#endif