1. 线程池原理

我们使用线程的时候就去创建一个线程,这样实现起来非常简便
但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用:
执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

线程池是一种多线程处理形式,处理过程中将任务添加到队列
在创建线程后自动启动这些任务。线程池线程都是后台线程。
每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。
如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。
超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

在各个编程语言的语种中都有线程池的概念,并且很多语言中直接提供了线程池,作为程序猿直接使用就可以了,下面给大家介绍一下线程池的实现原理:

  • 线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:

    1. 任务队列,存储需要处理的任务,由工作的线程来处理这些任务
      • 通过线程池提供的API函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
      • 已处理的任务会被从任务队列中删除
      • 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
    2. 工作的线程(任务队列任务的消费者) ,N个
      • 线程池中维护了一定数量的工作线程, 他们的作用是是不停的读任务队列, 从里边取出任务并处理
      • 工作的线程相当于是任务队列的消费者角色,
      • 如果任务队列为空, 工作的线程将会被阻塞 (使用条件变量/信号量阻塞)
      • 如果阻塞之后有了新的任务, 由生产者将阻塞解除, 工作线程开始工作
    3. 管理者线程(不处理任务队列中的任务),1个
      • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
        • 当任务过多的时候, 可以适当的创建一些新的工作线程
        • 当任务过少的时候, 可以适当的销毁一些工作的线程

在这里插入图片描述


2. C实现线程池

2.1 头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#pragma once
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<string.h>
#include<unistd.h>
#define NUMBER 2

typedef struct Task
{
void (*function)(void* args);
void* args;
}Task;

typedef struct ThreadPool
{
//任务队列
Task* task_queue;
int queue_capacity; //容量
int queue_size; //当前任务个数
int queue_front; //队头->消费用
int queue_tail; //队尾->生产用


pthread_t managerID; //管理者线程ID(1个)
pthread_t* threadID; //工作的线程ID(多个,用指针)
int min_num; //最小的线程数量
int max_num; //最大的线程数量
int busy_num; //在忙的线程数量
int live_num; //存活的线程数量
int exit_num; //将要销毁的线程数量
pthread_mutex_t mutex_pool; //线程池的锁
pthread_mutex_t mutex_busy_num; //busy_num的锁
pthread_cond_t producer_wait_consumer; //生产者等待消费者
pthread_cond_t consumer_wait_producer; //消费者等待生产者

int shutdown; //是否销毁线程池,销毁为1,否则为0
}ThreadPool;


//创建线程池并且初始化
ThreadPool* Thread_Pool_Create(int min, int max, int queuesize);

//销毁线程池
int Thread_Pool_Destroy(ThreadPool* pool);

//给线程池添加任务(相当于生产者)
void Thread_Pool_Add(ThreadPool* pool, void(*function)(void*), void* args);

//获取线程池中工作的线程个数
int Get_Busy_Num(ThreadPool* pool);

//获取线程池中存活的个数
int Get_Live_Num(ThreadPool* pool);

//以下为被封装的函数
//工作的线程(消费者线程)任务函数
void* Worker(void* args);

//管理者线程任务函数
void* Manager(void* args);

//单个线程的退出
void Thread_Exit(ThreadPool* pool);

2.2 源文件的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
#include"threadpool.h"

ThreadPool* Thread_Pool_Create(int min, int max, int queuesize)
{
//开线程池
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail\n");
break;
}

// 开线程ID
pool->threadID = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadID == NULL)
{
printf("malloc threadID fail\n");
break;
}
//初始线程ID都初始化为0
memset(pool->threadID, 0, sizeof(pthread_t) * max);
pool->max_num = max;
pool->min_num = min;
pool->busy_num = 0;
pool->live_num = min;
pool->exit_num = 0;

//初始化锁和条件变量
if (pthread_mutex_init(&pool->mutex_pool, NULL) != 0 ||
pthread_mutex_init(&pool->mutex_busy_num, NULL) != 0 ||
pthread_cond_init(&pool->producer_wait_consumer, NULL) != 0 ||
pthread_cond_init(&pool->consumer_wait_producer, NULL) != 0)
{
printf("mutex or cond init fail\n");
break;
}

//初始化任务队列
pool->task_queue = (Task*)malloc(sizeof(Task) * queuesize);
if (pool->task_queue == NULL)
{
printf("malloc task_queue fail...\n");
break;
}

pool->queue_capacity = queuesize;
pool->queue_size = 0;
pool->queue_front = 0;
pool->queue_tail = 0;

pool->shutdown = 0;

//创建线程
pthread_create(&pool->managerID, NULL, Manager, pool);

for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadID[i], NULL, Worker, pool);
}
return pool; //到这里说明成功,返回创建出来的线程池
} while (0);

//跳到这里说明失败了,释放资源
if (pool && pool->threadID)
free(pool->threadID);
if (pool)
free(pool);

return NULL;
}



int Thread_Pool_Destroy(ThreadPool* pool)
{
//如果线程池本就不存在
if (pool == NULL)
return -1;

//关闭线程池
pool->shutdown = 1;

//阻塞并回收管理者线程
pthread_join(pool->managerID, NULL);

//唤醒阻塞的消费者进程
for (int i = 0; i < pool->live_num; ++i)
pthread_cond_signal(&pool->consumer_wait_producer);

//释放堆内存
if (pool->task_queue)
free(pool->task_queue);
if (pool->threadID)
free(pool->threadID);

//销毁锁和条件变量
pthread_mutex_destroy(&pool->mutex_pool);
pthread_mutex_destroy(&pool->mutex_busy_num);
pthread_cond_destroy(&pool->consumer_wait_producer);
pthread_cond_destroy(&pool->producer_wait_consumer);

//销毁线程池
free(pool);
pool = NULL;

return 0;
}



void Thread_Pool_Add(ThreadPool* pool, void(*function)(void*), void* args)
{
pthread_mutex_lock(&pool->mutex_pool);
//如果任务队列满了
while (pool->queue_size == pool->queue_capacity && !pool->shutdown)
{
//生产者阻塞,等待消费者进程
pthread_cond_wait(&pool->producer_wait_consumer, &pool->mutex_pool);
}

//如果线程池要关闭
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutex_pool);
return;
}

//添加任务
//队尾添加任务,队尾向后移动,队列任务数加一
pool->task_queue[pool->queue_tail].function = function;
pool->task_queue[pool->queue_tail].args = args;
pool->queue_tail = (pool->queue_tail + 1) % pool->queue_capacity;
pool->queue_size++;

//提醒消费者消费
pthread_cond_signal(&pool->consumer_wait_producer);
pthread_mutex_unlock(&pool->mutex_pool);
}



int Get_Busy_Num(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutex_busy_num);
int busynum = pool->busy_num;
pthread_mutex_unlock(&pool->mutex_busy_num);
return busynum;
}



int Get_Live_Num(ThreadPool* pool)
{
pthread_mutex_lock(&pool->mutex_pool);
int livenum = pool->live_num;
pthread_mutex_unlock(&pool->mutex_pool);
return livenum;
}



void* Worker(void* args)
{
ThreadPool* pool = (ThreadPool*)args;

while (1)
{
pthread_mutex_lock(&pool->mutex_pool);

//如果当前任务队列为空
while (pool->queue_size == 0 && !pool->shutdown)
{
//阻塞消费者线程
pthread_cond_wait(&pool->consumer_wait_producer,&pool->mutex_pool);

//如果要销毁线程
if (pool->exit_num > 0)
{
pool->exit_num--;
//同时如果存活线程大于最小值
if (pool->live_num > pool->min_num)
{
pool->live_num--;
pthread_mutex_unlock(&pool->mutex_pool);
Thread_Exit(pool);
}
}
}


//如果线程池要关闭
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutex_pool);
Thread_Exit(pool);
}


//从任务队列取出任务
Task task;
task.function = pool->task_queue[pool->queue_front].function;
task.args = pool->task_queue[pool->queue_front].args;

//队头向后移动以下次取任务
pool->queue_front = (pool->queue_front + 1) % pool->queue_capacity;
pool->queue_size--;

//解锁提醒生产者
pthread_cond_signal(&pool->producer_wait_consumer);
pthread_mutex_unlock(&pool->mutex_pool);

//打印,busynum++
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutex_busy_num);
pool->busy_num++;
pthread_mutex_unlock(&pool->mutex_busy_num);

//执行函数,完成任务,销毁任务队列中这个数据(函数)
task.function(task.args);
free(task.args);
task.args = NULL;

//打印,busynum--
printf("thread %ld end working\n", pthread_self());
pthread_mutex_lock(&pool->mutex_busy_num);
pool->busy_num--;
pthread_mutex_unlock(&pool->mutex_busy_num);

if (pool->shutdown)
Thread_Exit(pool);
}
return NULL;
}



void* Manager(void* args)
{
ThreadPool* pool = (ThreadPool*)args;
while (!pool->shutdown)
{
//每隔三秒检查一次
sleep(3);

//取出线程池中任务的数量和当前的线程数量
pthread_mutex_lock(&pool->mutex_pool);
int queue_size = pool->queue_size;
int live_num = pool->live_num;
pthread_mutex_unlock(&pool->mutex_pool);

//取出忙的线程数量
pthread_mutex_lock(&pool->mutex_busy_num);
int busy_num = pool->busy_num;
pthread_mutex_unlock(&pool->mutex_busy_num);

//添加线程
//任务个数>存活线程个数 && 存活线程个数<最大线程数(消费者太少了,任务队列消化太慢
if (queue_size > live_num && live_num < pool->max_num)
{
pthread_mutex_lock(&pool->mutex_pool);
int counter = 0;
for (int i = 0; i < pool->max_num &&
counter < NUMBER &&
pool->live_num < pool->max_num; ++i)
{
if (pool->threadID[i] == 0)
{
pthread_create(&pool->threadID[i], NULL, Worker, pool);
counter++;
pool->live_num++;
}
}
pthread_mutex_unlock(&pool->mutex_pool);
}

//销毁线程
//忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if (busy_num * 2 < live_num && live_num > pool->min_num)
{
pthread_mutex_lock(&pool->mutex_pool);
pool->exit_num = NUMBER;
pthread_mutex_unlock(&pool->mutex_pool);

//让工作的线程自杀
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->consumer_wait_producer);
}
}
}
return NULL;
}

void Thread_Exit(ThreadPool* pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->max_num; ++i)
{
if (pool->threadID[i] == tid)
{
pool->threadID[i] = 0;
printf("thread %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}

2.3 测试部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "threadpool.h"

void Task_Test(void* args)
{
int num = *(int*)args;
printf("thread %ld is working,number = %d\n", pthread_self(), num);
sleep(1);
return;
}

int main()
{
//创建线程池
ThreadPool* pool = Thread_Pool_Create(3, 10, 100);
for (int i = 0; i < 100; ++i)
{
int* num = (int*)malloc(sizeof(int));
*num = i + 100;
Thread_Pool_Add(pool, Task_Test, num);
}

sleep(30);

Thread_Pool_Destroy(pool);
return 0;
}

3. C++实现线程池

3.1 头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
#define _CRT_SECURE_NO_WARNINGS
#pragma once
#include<iostream>
#include<string.h>
#include<string>
#include<pthread.h>
#include<stdlib.h>
#include<queue>
#include<unistd.h>
using namespace std;


using callback = void(*)(void*);
//任务的结构体
template<typename T>
struct Task
{
Task()
{
function = nullptr;
args = nullptr;
}
Task(callback fun, void* args)
{
function = fun;
this -> args = (T*)args;
}
callback function;
T* args;
};

//任务队列
template<typename T>
class TaskQueue
{
public:
TaskQueue()
{
pthread_mutex_init(&mutex,NULL);
}

~TaskQueue()
{
pthread_mutex_destroy(&mutex);
}

//添加任务
void AddTask(Task<T> task)
{
pthread_mutex_lock(&mutex);
queue.push(task);
pthread_mutex_unlock(&mutex);
}
void AddTask(callback fun, void* args)
{
pthread_mutex_lock(&mutex);
Task<T> task(fun,args);
queue.push(task);
pthread_mutex_unlock(&mutex);
}

//取出一个任务
Task<T> TakeTask()
{
Task<T> task;
pthread_mutex_lock(&mutex);
if (queue.size() > 0)
{
task = queue.front();
queue.pop();
}
pthread_mutex_unlock(&mutex);
return task;
}

//获取当前队列中的任务个数
inline int GetTaskNum()
{
return queue.size();
}
private:
pthread_mutex_t mutex; //互斥锁
std::queue<Task<T>> queue;
};


//线程池
template<typename T>
class ThreadPool
{
public:
ThreadPool(int min , int max)
{
//实例化任务队列
taskqueue = new TaskQueue<T>;

//初始化线程池
min_num = min;
max_num = max;
busy_num = 0;
live_num = min;

//根据线程最大上限,给线程数组分配内存
threadID = new pthread_t[max];
if (threadID == nullptr)
{
cout << "new threadID fail" << endl;
}

//初始化线程ID
memset(threadID, 0, sizeof(pthread_t) * max);

//初始化互斥锁和条件变量
if (pthread_mutex_init(&mutex_pool, NULL) != 0 ||
pthread_cond_init(&notempty, NULL) != 0)
{
cout << "mutex or cond init fail" << endl;
}

//创建线程
for (size_t i = 0; i < min; ++i)
{
pthread_create(&threadID[i], NULL, Work, this);
cout << "create thread ID :" << to_string(threadID[i]) << endl;
}
pthread_create(&managerID, NULL, Manage, this);

}

~ThreadPool()
{
shutdown = true;
//销毁管理者进程
pthread_join(managerID, NULL);

//唤醒消费者
for (int i = 0; i < live_num; ++i)
{
pthread_cond_signal(&notempty);
}

if (taskqueue)
{
delete taskqueue;
}

if (threadID)
{
delete[] threadID;
}

pthread_mutex_destroy(&mutex_pool);
pthread_cond_destroy(&notempty);
}

//添加任务
void Add_Task(Task<T> task)
{
if (shutdown)
return;

//添加任务,不需加锁,队列中有
taskqueue->AddTask(task);

//唤醒消费者
pthread_cond_signal(&notempty);
}

//获取忙线程个数
int Get_Busy_Num()
{
int busynum = 0;
pthread_mutex_lock(&mutex_pool);
busynum = busy_num;
pthread_mutex_unlock(&mutex_pool);
return busynum;
}

//获取存活线程个数
int Get_Live_Num()
{
int livenum = 0;
pthread_mutex_lock(&mutex_pool);
livenum = live_num;
pthread_mutex_unlock(&mutex_pool);
return livenum;
}

private:
//工作的线程任务函数
static void* Work(void* args)
{
ThreadPool* pool = static_cast<ThreadPool*>(args);

while (true)
{
//访问任务队列加锁
pthread_mutex_lock(&pool->mutex_pool);
//判断任务队列是否为空,空了就堵塞
while (pool->taskqueue->GetTaskNum() == 0 && !pool->shutdown)
{
cout << "thread :" << to_string(pthread_self()) << " waiting..." << endl;
pthread_cond_wait(&pool->notempty, &pool->mutex_pool);

//解除后 判断是否要销毁进程
if (pool->exit_num > 0)
{
pool->exit_num--;
if (pool->live_num > pool->min_num)
{
pool->live_num--;
pthread_mutex_unlock(&pool->mutex_pool);
pool->Thread_Exit();
}
}
}

//判断线程池是否要关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutex_pool);
pool->Thread_Exit();
}

//从任务队列取出任务
Task<T> task = pool->taskqueue->TakeTask();
pool->busy_num++;
pthread_mutex_unlock(&pool->mutex_pool);

cout << "thread :" << to_string(pthread_self()) << " start working..." << endl;
task.function(task.args);

delete task.args;
task.args = nullptr;

//任务结束
cout << "thread :" << to_string(pthread_self()) << " end working..." << endl;
pthread_mutex_lock(&pool->mutex_pool);
pool->busy_num--;
pthread_mutex_unlock(&pool->mutex_pool);

}
return nullptr;
}

//管理者线程任务函数
static void* Manage(void* args)
{
ThreadPool* pool = static_cast<ThreadPool*>(args);
while (!pool->shutdown)
{
//5秒检测一次
sleep(5);
pthread_mutex_lock(&pool->mutex_pool);
int livenum = pool->live_num;
int busynum = pool->busy_num;
int queuesize = pool->taskqueue->GetTaskNum();
pthread_mutex_unlock(&pool->mutex_pool);

const int NUMBER = 2;
//创建
if (queuesize > livenum && livenum < pool->max_num)
{
pthread_mutex_lock(&pool->mutex_pool);
int num = 0;
for (int i = 0; i < pool->max_num &&
num < NUMBER &&
pool->live_num < pool->max_num ; ++i)
{
if (pool->threadID[i] == 0)
{
pthread_create(&pool->threadID[i], NULL, Work, pool);
num++;
pool->live_num++;
}
}
pthread_mutex_unlock(&pool->mutex_pool);
}

//销毁
if (busynum * 2 < livenum && livenum > pool->min_num)
{
pthread_mutex_lock(&pool->mutex_pool);
pool->exit_num = NUMBER;
pthread_mutex_unlock(&pool->mutex_pool);
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notempty);
}
}
}
return nullptr;
}

void Thread_Exit()
{
pthread_t tid = pthread_self();
for (int i = 0; i < max_num; ++i)
{
if (threadID[i] == tid)
{
cout << "thread :" << to_string(pthread_self()) << "exiting" << endl;
threadID[i] = 0;
break;
}
}
pthread_exit(NULL);
}
private:
pthread_mutex_t mutex_pool;
pthread_cond_t notempty;
pthread_t* threadID;
pthread_t managerID;
TaskQueue<T>* taskqueue;
int min_num;
int max_num;
int busy_num;
int live_num;
int exit_num;
bool shutdown = false;

};

3.2 测试部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include"ThreadPool.h"

void Task_Test(void* args)
{
int num = *(int*)args;
cout<<"thread :" << pthread_self() << " is working " << "number =" << num <<endl;
sleep(1);
return;
}

int main()
{
//创建线程池
ThreadPool<int> pool(3, 10);
for (int i = 0; i < 100; ++i)
{
int* num = new int(i+100);
pool.Add_Task(Task<int>(Task_Test,num));
}
sleep(40);
return 0;
}



以上只是基于C修改出对应于C++的代码

并且以上代码存在一个问题
输出的结果有时会因为线程原因出现混乱
可以通过加锁来解决,但锁的数量超过1就容易导致死锁问题,所以暂且搁置


4. C++11实现线程池

并非原创,摘于此处

4.1 头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#pragma once
#include<queue>
#include<thread>
#include<condition_variable>
#include<atomic>
#include<stdexcept>
#include<future>
#include<vector>
#include<functional>

namespace std
{
#define THREADPOOL_MAX_NUM 16
class threadpool
{
unsigned short _initsize;
using Task = function<void()>;
vector<thread> _pool;
queue<Task> _tasks;
mutex _lock;
mutex _lockGrow;
condition_variable _task_cv;
atomic<bool> _run{true};
atomic<int> _spa_trd_num{0};

public:
inline threadpool(unsigned short size = 4)
{
_initsize = size;
Add_Thread(size);
}
inline ~threadpool()
{
_run = false;
_task_cv.notify_all();
for (thread& thread : _pool)
{
if (thread.joinable())
thread.join();
}
}

template<typename F,typename... Args>
auto commit(F&& f, Args&& ...args) -> future<decltype(f(args...)) >
{
if (!_run)
throw runtime_error{"commit auto stop"};
using RetType = decltype(f(args...));
auto task = make_shared<packaged_task<RetType()>>(bind(forward<F>(f), forward<Args>(args)...));
future<RetType> future = task->get_future();
{
lock_guard<mutex> lock{_lock};
_tasks.emplace([task]() {(*task)(); });
}
if (_spa_trd_num < 1 && _pool.size() < THREADPOOL_MAX_NUM)
Add_Thread(1);
_task_cv.notify_one();
return future;
}

template<typename F>
void commit2(F&& f)
{
if (!_run)
return;
{
lock_guard<mutex> lock{_lock};
_tasks.emplace(forward<F>(f));
}
if (_spa_trd_num < 1 && _pool.size() < THREADPOOL_MAX_NUM)
Add_Thread(1);
_task_cv.notify_one();
}

int idlCount() { return _spa_trd_num; }
int thrCount() { return _pool.size(); }

private:
void Add_Thread(unsigned short size)
{
if (!_run)
throw runtime_error{"Add_Thread stop"};
unique_lock<mutex> lockgrow{_lockGrow};
for (; _pool.size() < THREADPOOL_MAX_NUM && size > 0; --size)
{
_pool.emplace_back([this]
{
while (true)
{
Task task;
{
unique_lock<mutex> lock{_lock};
_task_cv.wait(lock, [this] {return !_run || !_tasks.empty(); });
if (!_run && _tasks.empty())
return;
_spa_trd_num--;
task = move(_tasks.front());
_tasks.pop();
}
task();
if (_spa_trd_num > 0 && _pool.size() > _initsize)
return;
{
unique_lock<mutex> lock{_lock};
_spa_trd_num++;
}
}
});
{
unique_lock<mutex> lock{_lock};
_spa_trd_num++;
}
}
}
};
}

要使用pthread依赖库


4.2 测试部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include"ThreadPool.hpp"
#include<iostream>

void fun1(int slp)
{
printf("fun1 %ld\n", std::this_thread::get_id());
if (slp > 0)
{
printf("fun1 sleep %ld ========= %ld\n", slp, std::this_thread::get_id());
std::this_thread::sleep_for(std::chrono::milliseconds(slp));
}
}

struct gfun
{
int operator()(int n)
{
printf("gfun %ld\n", n, std::this_thread::get_id());
return 42;
}
};

class A
{
public:
static int Afun(int n = 0) //函数必须是 static 的才能直接使用线程池
{
std::cout << n << "Afun " << std::this_thread::get_id() << std::endl;
return n;
}

static std::string Bfun(int n, std::string str, char c)
{
std::cout << n << "Bfun " << str.c_str() << " " << (int)c << " " << std::this_thread::get_id() << std::endl;
return str;
}
};

int main()
try {
std::threadpool executor{ 50 };
std::future<void> ff = executor.commit(fun1, 0);
std::future<int> fg = executor.commit(gfun{}, 0);
//std::future<int> gg = executor.commit(A::Afun, 9999); //IDE提示错误,但可以编译运行
std::future<std::string> gh = executor.commit(A::Bfun, 9998, "mult args", 123);
std::future<std::string> fh = executor.commit([]()->std::string { std::cout << "hello, fh ! " << std::this_thread::get_id() << std::endl; return "hello,fh ret !\n"; });

std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << fg.get() << " " << fh.get().c_str() << " " << std::this_thread::get_id() << std::endl;

std::cout << " ======= fun1,55 ========= " << std::this_thread::get_id() << std::endl;
executor.commit(fun1, 55).get(); //调用.get()获取返回值会等待线程执行完

std::threadpool pool(4);
std::vector< std::future<int> > results;

for (int i = 0; i < 8; ++i)
{
results.emplace_back(
pool.commit([i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "world " << i << std::endl;
return i * i;
})
);
}
std::this_thread::sleep_for(std::chrono::seconds(15));
for (auto&& result : results)
std::cout << result.get() << ' ';
std::cout << std::endl;
return 0;
}
catch (std::exception& e)
{
std::cout << "some error " << std::this_thread::get_id() << e.what() << std::endl;
}
  • 测试结果
    在这里插入图片描述