-
Notifications
You must be signed in to change notification settings - Fork 1
/
EasyThreadPool.h
192 lines (156 loc) · 4.42 KB
/
EasyThreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#ifndef CLOUDROOM_LOGIC_EASYTHREADPOOL_H
#define CLOUDROOM_LOGIC_EASYTHREADPOOL_H
#include <queue>
#include "Common/AutoLock.h"
#include "TransStruct.h"
//简易线程池思想+优先任务队列 可根据当前任务状态动态增减任务线程数
//数目成员包括:初始线程数 最多线程数 最少空闲线程数 最多空闲线程数
//原则上接到任务从空闲线程队列取线程执行,取了后判断最多线程数与最少空闲线程数决定是否创建空闲线程
//线程执行从空闲切换到繁忙,执行完后继续从任务队列取任务执行,当任务队列空了再放置空闲队列
//放置到空闲队列后,判断空闲队列最多数选择干掉空闲线程
///////////////////////////////////////////////////////////////////////////////////////////////
//优先队列不允许遍历。。。但为了增加任务的取消移除。。。将就一下吧
template <typename _Ty>
class TravelablePQ : public std::priority_queue<_Ty>
{
public:
bool removeElem(const _Ty& elem)
{
bool bRet = false;
auto it = find_if(c.begin(), c.end(), [&](const _Ty& curElem)->bool
{return elem == curElem;});
if (it != c.end())
{
bRet = true;
c.erase(it);
push_heap(c.begin(), c.end(), comp);
}
return bRet;
}
};
///////////////////////////////////////////////////////////////////////////////////////////////
//启动参数 可能得扩展 单独放个结构
struct stMTPQMgrStartUpPara
{
unsigned int nMaxThrdNum;//最多线程数 busy + idle
unsigned int nMinIdleThrdNum;//最少保留空闲线程数
unsigned int nMaxIdleThrdNum;//最多保留的空闲线程数
bool bQuitImmd;//退出方式 干掉线程|等待执行正常退出
stMTPQMgrStartUpPara()
{
nMaxThrdNum = 5;
nMinIdleThrdNum = 1;
nMaxIdleThrdNum = 4;
bQuitImmd = false;
}
};
class CEasyThreadPoolMgr
{
class CWorkThread;
friend class CWorkThread;
public:
CEasyThreadPoolMgr(int nInitThrdNum = 2, stMTPQMgrStartUpPara* pPara = NULL);
~CEasyThreadPoolMgr();
public:
void ExecJob(stJob* job);
void TerminateJob(LPCSTR szJobKey);
void ClearJobs();
bool IsJobsDone()
{
CAutoLock _lock(&m_csBusyThreads);
return m_listBusyThreads.size() == 0;
}
private:
stJob* GetJob();
CWorkThread* CreatWorkThread();
bool MoveBusyToIdle(CWorkThread* pThread);//超过最大空闲数干掉一些 suspend
bool MoveIdleToBusy(CWorkThread* pThread, stJob* pJob);//resume
// 超过最大空闲,旋转几次再做删除,CPU消耗影响可以接受,若挂起到运行时钟周期消耗比较不好
// 没有考虑好删除空闲个数,目前超过最大空闲删一半,有待分析,旋转次数也有待分析。。。
void DeleteIdleThreads(/*int nDelNum = 1, */int nSpinTime = 500);
private:
class CWorkThread
{
public:
CWorkThread(const char* cstrThreadName);
~CWorkThread();
public:
//mgr将thread从idle到busy中的调用
//自己在busy中不用,通过mgr获取排队任务处理
void SetJob(stJob* pJob)
{
//不需要加锁,除了从空闲到繁忙mgr激活时才会从其他线程setJob
//那时线程阻塞,自己不会操作job,其余都是只有线程自己才操作job
//CAutoLock _lock(&m_csJob);
m_pJob = pJob;
m_bIsSuspend = false;
if (ResumeThread(m_hThread) == -1)
{
RetrieveErrCall(_T("EasyThrdPool:SetJob"));
}
}
const stJob* GetJob() const
{
return m_pJob;
}
HANDLE GetThreadHandle()
{
return m_hThread;
}
unsigned int GetThreadID()
{
return m_nThreadID;
}
void RegisteMgr(CEasyThreadPoolMgr* pMgr)
{
m_pThreadsMgr = pMgr;
}
//正常结束线程 并在结束前释放对象
//但没那么及时,至少得完成当前任务。。。
static void DeleteThread(CWorkThread* pThread)
{
if (pThread == NULL)
{
return;
}
pThread->m_bIsRun = false;
if (pThread->m_bIsSuspend)
{
ResumeThread(pThread->m_hThread);
}
}
void SetThrdSusp(bool bIsSusp = true)
{
m_bIsSuspend = bIsSusp;
}
bool IsRun()
{
return m_bIsRun;
}
private:
static unsigned int __stdcall Run(void*);
private:
CEasyThreadPoolMgr* m_pThreadsMgr;
HANDLE m_hThread;
unsigned int m_nThreadID;
CCriticalSection m_csJob;
stJob* m_pJob;
bool m_bIsSuspend;//通过mgr操作,不用同步
bool m_bIsRun;//通过控制run 结束线程 不用同步
};
private:
unsigned int m_nInitThrdNum;//初始线程数
unsigned int m_nMaxThrdNum;//最多线程数 busy + idle
unsigned int m_nMinIdleThrdNum;//最少保留空闲线程数
unsigned int m_nMaxIdleThrdNum;//最多保留的空闲线程数
std::list<CWorkThread*> m_listBusyThreads;
CCriticalSection m_csBusyThreads;
std::list<CWorkThread*> m_listIdleThreads;
CCriticalSection m_csIdleThreads;
TravelablePQ<stJob*> m_TaskQueue;//job的内存管理在各workThread中做
CCriticalSection m_csTaskQueue;
int m_nCreatedThreadNum;
bool m_bQuitImmd;//退出方式
std::list<HANDLE> m_listQuitBusyThrd;//等待正常退出线程句柄
};
#endif //CLOUDROOM_LOGIC_EASYTHREADPOOL_H