什么是线程池,为什么要使用线程池?
什么是线程池?
首先顾名思义,就是把一堆开辟好的线程放在一个池子统一管理,就是一个线程池。
为什么要使用线程池?
难道来一个请求给它申请一个线程,请求处理完了释放线程不行吗?也行,但是如果创建线程和销毁线程的时间比线程处理请求时间长,而且请求很多的情况下,我们的 CPU 资源都浪费在创建线程和销毁线程上了线程池linux,所以这种方法效率会比较低。于是,我们可以将若干已经创建好的线程放在一起统一管理,如果来了一个请求,我们就从线程池中取出一个线程来处理请求,处理完了放回池内等待下一个任务,线程池的好处是避免了繁琐的创建和结束线程的时间,有效的利用了 CPU 资源。
还有一个很重要的作用就是异步解耦。我们在做开发的时候,日志也是很重要的。对于日志的保存,一般都是保存到磁盘上,要知道磁盘的读写要比内存慢很多倍。如果日志直接写入磁盘的话,会发现整个服务器吞吐量的性能被压在磁盘的读写上面。如何解决性能不被压在磁盘上面,我们引入线程池。线程池起到了一个异步解耦的作用。那什么是异步解耦?就是写磁盘和落盘这个两个动作不在一个流程里面。
线程池的模型
按照我的理解,线程池的作用和双缓冲的作用类似,可以完成任务处理的 ”鱼贯动作“。
最后,如何才能创建一个线程池的模型呢?一般需要以下三个参与者:
线程池结构,它负责管理多个线程并提供任务队列的接口工作线程,它们负责处理任务任务队列,存放待处理的任务
有了三个参与者,下一个问题就是怎么使线程池安全有序的工作,可以使用 POSIX 中的信号量、互斥锁和条件变量等同步手段。下面描述了一个客户端发起请求时,服务器端是如何从线程池中取出一个线程进行处理的。
#include
#include
#include
#include
#include
#define LL_ADD(item, list) do{ \
item->prev = NULL; \
item->next = list; \
if(list) \
list->prev = item; \
list = item; \
}while(0);
#define LL_REMOVE(item, list) do{ \
if(item->prev) \
item->prev->next = item->next; \
if(item->next) \
item->next->prev = item->prev; \
if(list == item) \
list = item->next; \
item->next = item->prev = NULL;
}while(0);
struct NWORKER {
pthread_t thread;
struct threadpool_t *pool;
int terminate;
struct NWORKER *prev;
struct NWORKER *next;
};
struct NJOB{
void (*func)(struct NJOB *job);
void *user_data;
struct NJOB *prev;
struct NJOB *next;
};
struct threadpool_t{
struct NWORKER *workers;
struct NJOB *jobs;
pthread_cond_t jobs_cond;
pthread_mutex_t jobs_mutex;
};
static void *ThreadCallback(void *args)
{
struct NWORKER *worker = (struct NWORKER *)args;
while(1)
{
pthread_mutex_lock(&worker->pool->jobs_mutex);
while(worker->pool->jobs == NULL)
{
if(worker->terminate)
break;
pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mutex);
}
if(worker->terminate)
{
pthread_mutex_unlock(&worker->pool->jobs_mutex);
break;
}
struct NJOB *job = worker->pool->jobs;
LL_REMOVE(job, worker->pool->jobs);
pthread_mutex_unlock(&worker->pool->jobs_mutex);
job->func(job->user_data);
}
free(worker);
pthread_exit(NULL);
}
int ThreadPoolCreate(struct threadpool_t *pool, int workers)
{
if(workers < 1)
workers = 1;
if(!pool)
return -1;
memset(pool, 0, sizeof(struct threadpool_t));
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
memcpy(pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));
int i = 0;
for(; i < workers; i++)
{
struct NWORKER *worker = (struct NWORKER *)malloc(sizeof( struct NWORKER));
if(!worker)
assert(worker != NULL);
memset(worker, 0, sizeof(worker));
worker->pool = pool;
int ret = pthread_create(worker->thread, NULL, ThreadCallback, (void *)worker);
assert(ret == 0);
LL_ADD(worker, pool->workers);
}
return 0;
}
void ThreadPoolPush(struct threadpool_t *pool, struct NJOB *job)
{
pthread_mutex_lock(&pool->jobs_mutex);
LL_ADD(job, pool->jobs);
pthread_cond_signal(&pool->jobs_cond);
pthread_mutex_unlock(&pool->jobs_mutex);
}
int ThreadPoolDestroy(struct threadpool_t *pool)
{
if(!pool)
return -1;
struct NWORKER *worker = NULL;
for(worker = pool->workers; worker != NULL; worker = worker->next)
{
worker->terminate = 1;
}
pthread_mutex_lock(&pool->jobs_mutex);
pthread_cond_broadcast(&pool->jobs_cond);
pthread_mutex_unlock(&pool->jobs_mutex);
}
(编辑:开发网_商丘站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|