线程池原理详解及如何用C语言实现线程池
mhr18 2024-12-06 16:54 24 浏览 0 评论
线程池是一种多线程处理形式,大多用于高并发服务器上,它能合理有效的利用高并发服务器上的线程资源;线程与进程用于处理各项分支子功能,我们通常的操作是:接收消息 ==> 消息分类 ==> 线程创建 ==> 传递消息到子线程 ==> 线程分离 ==> 在子线程中执行任务 ==> 任务结束退出;
对大多数小型局域网的通信来说,上述方法足够满足需求;但当我们的通信范围扩大到广域网或大型局域网通信中时,我们将面临大量消息频繁请求服务器;在这种情况下,创建与销毁线程都已经成为一种奢侈的开销,特别对于嵌入式服务器来说更应保证内存资源的合理利用;
因此,线程池技术应运而生;线程池允许一个线程可以多次复用,且每次复用的线程内部的消息处理可以不相同,将创建与销毁的开销省去而不必来一个请求开一个线程;
结构讲解:
线程池是一个抽象的概念,其内部由任务队列,一堆线程,管理者线程组成;
我们将以上图为例,实现一个最基础的线程池,接下来将分部分依次讲解;讲解顺序为:1.线程池总体结构 2.线程数组 3.任务队列 4.管理者线程 5.使用线程池接口的例子
一、线程池总体结构
这里讲解线程池在逻辑上的结构体;看下方代码,该结构体threadpool_t中包含线程池状态信息,任务队列信息以及多线程操作中的互斥锁;在任务结构体中包含了一个可以放置多种不同任务函数的函数指针,一个传入该任务函数的void*类型的参数;
注意:在使用时需要将你的消息分类处理函数装入任务的(*function);然后放置到任务队列并通知空闲线程;
线程池状态信息:描述当前线程池的基本信息,如是否开启、最小线程数、最大线程数、存活线程数、忙线程数、待销毁线程数等… …
任务队列信息:描述当前任务队列基本信息,如最大任务数、队列不为满条件变量、队列不为空条件变量等… …
多线程互斥锁:保证在同一时间点上只有一个线程在任务队列中取任务并修改任务队列信息、修改线程池信息;
函数指针:在打包消息阶段,将分类后的消息处理函数放在(*function);
void*类型参数:用于传递消息处理函数需要的信息;
/*任务*/
typedef struct {
void *(*function)(void *);
void *arg;
} threadpool_task_t;
/*线程池管理*/
struct threadpool_t{
pthread_mutex_t lock; /* 锁住整个结构体 */
pthread_mutex_t thread_counter; /* 用于使用忙线程数时的锁 */
pthread_cond_t queue_not_full; /* 条件变量,任务队列不为满 */
pthread_cond_t queue_not_empty; /* 任务队列不为空 */
pthread_t *threads; /* 存放线程的tid,实际上就是管理了线 数组 */
pthread_t admin_tid; /* 管理者线程tid */
threadpool_task_t *task_queue; /* 任务队列 */
/*线程池信息*/
int min_thr_num; /* 线程池中最小线程数 */
int max_thr_num; /* 线程池中最大线程数 */
int live_thr_num; /* 线程池中存活的线程数 */
int busy_thr_num; /* 忙线程,正在工作的线程 */
int wait_exit_thr_num; /* 需要销毁的线程数 */
/*任务队列信息*/
int queue_front; /* 队头 */
int queue_rear; /* 队尾 */
int queue_size;
/* 存在的任务数 */
int queue_max_size; /* 队列能容纳的最大任务数 */
/*线程池状态*/
int shutdown; /* true为关闭 */
};
**/*创建线程池*/**
threadpool_t *
threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{ /* 最小线程数 最大线程数 最大任务数*/
int i;
threadpool_t *pool = NULL;
do
{
/* 线程池空间开辟 */
if ((pool=(threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)
{
printf("malloc threadpool false; \n");
break;
}
/*信息初始化*/
pool->min_thr_num = min_thr_num;
pool->max_thr_num = max_thr_num;
pool->busy_thr_num = 0;
pool->live_thr_num = min_thr_num;
pool->wait_exit_thr_num = 0;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->queue_size = 0;
pool->queue_max_size = queue_max_size;
pool->shutdown = false;
/* 根据最大线程数,给工作线程数组开空间,清0 */
pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
if (pool->threads == NULL)
{
printf("malloc threads false;\n");
break;
}
memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);
/* 队列开空间 */
pool->task_queue =
(threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
if (pool->task_queue == NULL)
{
printf("malloc task queue false;\n");
break;
}
/* 初始化互斥锁和条件变量 */
if ( pthread_mutex_init(&(pool->lock), NULL) != 0 ||
pthread_mutex_init(&(pool->thread_counter), NULL) !=0 ||
pthread_cond_init(&(pool->queue_not_empty), NULL) !=0 ||
pthread_cond_init(&(pool->queue_not_full), NULL) !=0)
{
printf("init lock or cond false;\n");
break;
}
/* 启动min_thr_num个工作线程 */
for (i=0; i<min_thr_num; i++)
{
/* pool指向当前线程池 threadpool_thread函数在后面讲解 */
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
printf("start thread 0x%x... \n", (unsigned int)pool->threads[i]);
}
/* 管理者线程 admin_thread函数在后面讲解 */
pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool);
return pool;
} while(0);
/* 释放pool的空间 */
threadpool_free(pool);
return NULL;
}
二、线程数组
线程数组实际上是在线程池初始化时开辟的一段存放一堆线程tid的空间,在逻辑上形成一个池,里面放置着提前创建的线程;这段空间中包含了正在工作的线程,等待工作的线程(空闲线程),等待被销毁的线程,申明但没有初始化的线程空间;
/*工作线程*/
void *
threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
while (true)
{
pthread_mutex_lock(&(pool->lock));
/* 无任务则阻塞在 “任务队列不为空” 上,有任务则跳出 */
while ((pool->queue_size == 0) && (!pool->shutdown))
{
printf("thread 0x%x is waiting \n", (unsigned int)pthread_self());
pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
/* 判断是否需要清除线程,自杀功能 */
if (pool->wait_exit_thr_num > 0)
{
pool->wait_exit_thr_num--;
/* 判断线程池中的线程数是否大于最小线程数,是则结束当前线程 */
if (pool->live_thr_num > pool->min_thr_num)
{
printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());
pool->live_thr_num--;
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);//结束线程
}
}
}
/* 线程池开关状态 */
if (pool->shutdown) //关闭线程池
{
pthread_mutex_unlock(&(pool->lock));
printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());
pthread_exit(NULL); //线程自己结束自己
}
//否则该线程可以拿出任务
task.function = pool->task_queue[pool->queue_front].function; //出队操作
task.arg = pool->task_queue[pool->queue_front].arg;
pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; //环型结构
pool->queue_size--;
//通知可以添加新任务
pthread_cond_broadcast(&(pool->queue_not_full));
//释放线程锁
pthread_mutex_unlock(&(pool->lock));
//执行刚才取出的任务
printf("thread 0x%x start working \n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter)); //锁住忙线程变量
pool->busy_thr_num++;
pthread_mutex_unlock(&(pool->thread_counter));
(*(task.function))(task.arg); //执行任务
//任务结束处理
printf("thread 0x%x end working \n", (unsigned int)pthread_self());
pthread_mutex_lock(&(pool->thread_counter));
pool->busy_thr_num--;
pthread_mutex_unlock(&(pool->thread_counter));
}
pthread_exit(NULL);
}
三、任务队列
任务队列的存在形式与线程数组相似;在线程池初始化时根据传入的最大任务数开辟空间;当服务器前方后请求到来后,分类并打包消息成为任务,将任务放入任务队列并通知空闲线程来取;不同之处在于任务队列有明显的先后顺序,先进先出;而线程数组中的线程则是一个竞争关系去拿到互斥锁争取任务;
/*向线程池的任务队列中添加一个任务*/
int
threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
pthread_mutex_lock(&(pool->lock));
/*如果队列满了,调用wait阻塞*/
while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
/*如果线程池处于关闭状态*/
if (pool->shutdown)
{
pthread_mutex_unlock(&(pool->lock));
return -1;
}
/*清空工作线程的回调函数的参数arg*/
if (pool->task_queue[pool->queue_rear].arg != NULL)
{
free(pool->task_queue[pool->queue_rear].arg);
pool->task_queue[pool->queue_rear].arg = NULL;
}
/*添加任务到任务队列*/
pool->task_queue[pool->queue_rear].function = function;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 逻辑环 */
pool->queue_size++;
/*添加完任务后,队列就不为空了,唤醒线程池中的一个线程*/
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
四、管理者线程
作为线程池的管理者,该线程的主要功能包括:检查线程池内线程的存活状态,工作状态;负责根据服务器当前的请求状态去动态的增加或删除线程,保证线程池中的线程数量维持在一个合理高效的平衡上;
说到底,它就是一个单独的线程,定时的去检查,根据我们的一个维持平衡算法去增删线程;
/*管理线程*/
void *
admin_thread(void *threadpool)
{
int i;
threadpool_t *pool = (threadpool_t *)threadpool;
while (!pool->shutdown)
{
printf("admin -----------------\n");
sleep(DEFAULT_TIME); /*隔一段时间再管理*/
pthread_mutex_lock(&(pool->lock)); /*加锁*/
int queue_size = pool->queue_size; /*任务数*/
int live_thr_num = pool->live_thr_num; /*存活的线程数*/
pthread_mutex_unlock(&(pool->lock)); /*解锁*/
pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num; /*忙线程数*/
pthread_mutex_unlock(&(pool->thread_counter));
printf("admin busy live -%d--%d-\n", busy_thr_num, live_thr_num);
/*创建新线程 实际任务数量大于 最小正在等待的任务数量,存活线程数小于最大线程数*/
if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num)
{
printf("admin add-----------\n");
pthread_mutex_lock(&(pool->lock));
int add=0;
/*一次增加 DEFAULT_THREAD_NUM 个线程*/
for (i=0; i<pool->max_thr_num && add<DEFAULT_THREAD_NUM
&& pool->live_thr_num < pool->max_thr_num; i++)
{
if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
{
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
add++;
pool->live_thr_num++;
printf("new thread -----------------------\n");
}
}
pthread_mutex_unlock(&(pool->lock));
}
/*销毁多余的线程 忙线程x2 都小于 存活线程,并且存活的大于最小线程数*/
if ((busy_thr_num*2) < live_thr_num && live_thr_num > pool->min_thr_num)
{
// printf("admin busy --%d--%d----\n", busy_thr_num, live_thr_num);
/*一次销毁DEFAULT_THREAD_NUM个线程*/
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
pthread_mutex_unlock(&(pool->lock));
for (i=0; i<DEFAULT_THREAD_NUM; i++)
{
//通知正在处于空闲的线程,自杀
pthread_cond_signal(&(pool->queue_not_empty));
printf("admin cler --\n");
}
}
}
return NULL;
/*线程是否存活*/
int
is_thread_alive(pthread_t tid)
{
int kill_rc = pthread_kill(tid, 0); //发送0号信号,测试是否存活
if (kill_rc == ESRCH) //线程不存在
{
return false;
}
return true;
}
五、释放
/*释放线程池*/
int
threadpool_free(threadpool_t *pool)
{
if (pool == NULL)
return -1;
if (pool->task_queue)
free(pool->task_queue);
if (pool->threads)
{
free(pool->threads);
pthread_mutex_lock(&(pool->lock)); /*先锁住再销毁*/
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
}
free(pool);
pool = NULL;
return 0;
}
/*销毁线程池*/
int
threadpool_destroy(threadpool_t *pool)
{
int i;
if (pool == NULL)
{
return -1;
}
pool->shutdown = true;
/*销毁管理者线程*/
pthread_join(pool->admin_tid, NULL);
//通知所有线程去自杀(在自己领任务的过程中)
for (i=0; i<pool->live_thr_num; i++)
{
pthread_cond_broadcast(&(pool->queue_not_empty));
}
/*等待线程结束 先是pthread_exit 然后等待其结束*/
for (i=0; i<pool->live_thr_num; i++)
{
pthread_join(pool->threads[i], NULL);
}
threadpool_free(pool);
return 0;
}
六、接口
/* 线程池初始化,其管理者线程及工作线程都会启动 */
threadpool_t *thp = threadpool_create(10, 100, 100);
printf("threadpool init ... ... \n");
/* 接收到任务后添加 */
threadpool_add_task(thp, do_work, (void *)p);
// ... ...
/* 销毁 */
threadpool_destroy(thp);
需要C/C++ Linux服务器开发学习资料私信“资料”(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
相关推荐
- 一文带您了解数据库的行列之争:行式与列式存储的异同
-
数据库存储格式是数据库管理系统中一个至关重要的方面,它直接影响到数据的组织和检索效率。在数据库中,有两种主要的存储格式,即行式存储和列式存储。这两者采用截然不同的方法来组织和存储数据,各自具有一系列优...
- NL2SQL(三)开源项目怎么选:talk is cheap, show me the code!
-
老规矩,先看效果下面的demo来自试用的SuperSonic,将会在下面详细介绍:大模型时代Text-to-SQL特点随着基于LLM技术的发展,RAG/AIAgent/Fine...
- JDK25长期支持版九月降临:18项王炸功能全解析
-
Java要放大招啦!9月份推出的JDK25长期支持版已经锁定18个超能力,从稳定值到结构化并发,还有Linux系统下的"预知未来"性能分析!下面我用打游戏的术语给你们掰扯明白:1、飞...
- OceanBase 推出单机版 高度兼容MySQL和Oracle
-
【环球网科技综合报道】3月27日,独立数据库厂商OceanBase正式发布单机版产品。据悉,这一产品基于自主研发的单机分布式一体化架构设计,具备极简数据库架构和高度兼容性,为中小规模业务提供兼具性能与...
- 黄远邦:应对7月1日闰秒对Oracle数据库影响
-
由于今年7月1日全世界会多出一秒,这可能对时间敏感的IT系统造成较大影响。中亦科技数据库团队对此问题做了深入的研究,并对用户系统提出了相应的解决方法及建议。中亦科技数据库产品总监黄远邦认为,闰秒调整会...
- MySQL数据库密码忘记了,怎么办?(mysql 数据库密码)
-
#头条创作挑战赛#MySQL数据库密码忘记了且没有其他可以修改账号密码的账户时怎么办呢?登录MySQL,密码输入错误/*密码错误,报如下错误*/[root@TESTDB~]#mysql-u...
- Chinese AI Talent in Spotlight as Nvidia and Meta Escalate Talent War
-
OntherightisBanghuaZhu,ChiefResearchScientistatNVIDIATMTPOST--SiliconValley’stoptech...
- 用Cursor开启JAVA+AI生涯(javascirpt怎么开启)
-
Cursor是基于VSCode开发的一款编辑器,支持多种语言的开发编辑。与传统的开发工具相比,它有多种优势:与AI无缝集成,响应速度快,占用内存小。但很多同学在"起步"过程中遇到了...
- 毕业十年了,自从做了开发用了很多软件,但距离写开发工具还很远
-
办公系统类:办公软件Word、Excel、PowerPoint三大必备技能+腾讯/金山在线文档解压缩操作:7-zip/winrar文件文本处理:Notepad++(文本编辑器正则表达式超级好...
- 盘点Java中最没用的知识⑤:这3个老古董你还在代码里“考古”?
-
一、Stack类:“继承Vector”的历史bug,为何成了性能拖油瓶?你是不是在学Java集合时,老师说过“栈结构用Stack类”?是不是在老代码里见过"newStack<>(...
- Gemini 2.5 Pro 0506发布,编程最强大模型, 碾压 Claude3.7 sonnent
-
一、Gemini2.5Pro(I/Oedition)发布1、为何叫I/Oedition?谷歌史上最强编程模型Gemini2.5Pro(I/Oedition)发布,具体型号是Gemin...
- 如何让无聊变得有趣(附本人大量美图)
-
文/图:金冬成在这条长300公里的公路上,我已经来回往返了无数次。3小时车程,一个人,想想都是多么无聊的一件事。其实,人生道路上,类似这种无聊的事情有很多很多。无聊的事情、枯燥的工作,往往让我们容易失...
- Oracle 推出 Java 24,增强 AI 支持和后量子加密
-
导读:Oracle宣布正式发布Java24,该语言增加了几个新功能,例如StreamGatherersAPI和Class-FileAPI的可用性,以及专门为AI推理和量子安全设计...
- 公司ERP突然变慢?“索引重建”这颗“药”可不能随便吃!
-
各位老板、IT小哥、财务小姐姐,有没有遇到过公司ERP系统突然卡顿得像“老爷车”,点个按钮半天没反应,急得直跺脚?这时候,可能有人会跳出来说:“我知道,重建一下数据库索引就好了!”听起来像个“神操作”...
- 基于Java实现,支持在线发布API接口读取数据库,有哪些工具?
-
基于java实现,不需要编辑就能发布api接口的,有哪些工具、平台?还能一键发布、快速授权和开放提供给第三方请求调用接口的解决方案。架构方案设计:以下是一些基于Java实现的无需编辑或只需少量编辑...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- 一文带您了解数据库的行列之争:行式与列式存储的异同
- NL2SQL(三)开源项目怎么选:talk is cheap, show me the code!
- JDK25长期支持版九月降临:18项王炸功能全解析
- OceanBase 推出单机版 高度兼容MySQL和Oracle
- 黄远邦:应对7月1日闰秒对Oracle数据库影响
- MySQL数据库密码忘记了,怎么办?(mysql 数据库密码)
- Chinese AI Talent in Spotlight as Nvidia and Meta Escalate Talent War
- 用Cursor开启JAVA+AI生涯(javascirpt怎么开启)
- 毕业十年了,自从做了开发用了很多软件,但距离写开发工具还很远
- 盘点Java中最没用的知识⑤:这3个老古董你还在代码里“考古”?
- 标签列表
-
- oracle位图索引 (74)
- oracle批量插入数据 (65)
- oracle事务隔离级别 (59)
- oracle 空为0 (51)
- oracle主从同步 (55)
- oracle 乐观锁 (51)
- redis 命令 (78)
- php redis (88)
- redis 存储 (66)
- redis 锁 (69)
- 启动 redis (66)
- redis 时间 (56)
- redis 删除 (67)
- redis内存 (57)
- redis并发 (52)
- redis 主从 (69)
- redis 订阅 (51)
- redis 登录 (54)
- redis 面试 (58)
- 阿里 redis (59)
- redis 搭建 (53)
- redis的缓存 (55)
- lua redis (58)
- redis 连接池 (61)
- redis 限流 (51)