最近需要一个能根据请求数变化的线程池,JAVA有这样的东西,可是C++下好像一般只是固定大小的线程池。所以就基于ACE写了个,只做了初步测试。
主要思想是:
1. 继承ACE_Task,这相当于是个固定线程池,用一个信号量(ACE_Thread_Semaphore)来计数空闲线程数。
2. 初始化时根据用户的输入,确定最少线程数minnum和最大线程数maxnum,当多个请求到来,并且无空闲线程(信号量用光),判断总线程数小于maxnum,就开始强迫增加线程数。
3. 当线程响应完一个请求(任务)后,如果当前任务队列为空,且线程数大于minnum,就退出本线程。这里做了一个优化,就算满足条件,线程也会在队列上再等待10秒,防止线程池抖动带来不必要的开销。
使用:
继承这个类,重写(override)service_func函数实现自己的任务处理。
start_pool初始化线程池,之后,就可以用add_task向线程池添加任务。
它会根据请求的数量自动控制池大小进行处理。
已经在LINUX下测试通过。由于ACE是跨平台的,所以这个实现也应该可以在WINDOWS下工作。
编译:
带THREAD_POOL_UNIT_TEST选项,则编译出自测程序test
gcc -g -Wall -O2 -g -Wall -I. -I../ -I../mon/comm/ACE_wrappers -g -DTHREAD_POOL_UNIT_TEST -o test thread_pool.cpp -lpthread -lm -lz -lstdc++ ../mon/comm/ACE_wrappers/ace/libACE.a -ldl
thread_pool.h头文件:
#ifndef THREAD_POOL
#define THREAD_POOL
#include "ace/Task.h"
#include "ace/Thread_Mutex.h"
#include "ace/Thread_Semaphore.h"
/**
 * Thread pool built on ACE_Task whose size follows the request load.
 *
 * The pool is started with a minimum and a maximum thread count. Tasks are
 * queued with add_task(); extra threads are spawned (up to the maximum) when
 * no free thread is available, and surplus threads exit after idling.
 *
 * Usage: derive from this class, override service_func(), call start_pool()
 * once, then submit work with add_task().
 */
class thread_pool : public ACE_Task<ACE_MT_SYNCH>
{
public:
    thread_pool ();

    ~thread_pool ();

    // Start the initial threads and begin waiting for requests.
    int start_pool (
        int minnum = 5,       // min number of threads
        int maxnum = 100,     // max number of threads
        int waitsize = 1024,  // request queue length
        int parsize = 1024);  // size in bytes of one task parameter

    // Number of pending requests in the work queue.
    int wait_cnt ();

    // Add one task to the thread pool; `size` bytes are copied from `arg`.
    int add_task (void *arg, int size);

    // User-defined work function, called once per task with the copied
    // parameter bytes.
    virtual int service_func (void* arg);

    // Overrides the base-class thread entry point with the pool logic.
    virtual int svc (void);

    // Not used.
    virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg);

private:
    // The pool owns pfree_thread_ through a raw pointer, so copying is
    // disallowed (declared but intentionally not implemented).
    thread_pool (const thread_pool &);
    thread_pool &operator= (const thread_pool &);

    int minnum_, maxnum_;
    int waitsize_, parsize_;

    // ACE_Recursive_Thread_Mutex free_thread_cnt__mutex_;

    ACE_Thread_Semaphore *pfree_thread_; // counts free (idle) threads

    long thread_flags_; // ACE thread creation flags
};
#endif /**//* THREAD_POOL */
thread_pool.cpp实现文件:
#include "thread_pool.h"

#include <errno.h>
// Message-block flag: the free-thread slot for this task was already taken
// when the task was queued, so the worker must not acquire it again.
// The value must lie in the user-flag range of ACE_Message_Block
// (USER_FLAGS = 0x1000 and above) and must not contain bit 0x1, which is
// ACE_Message_Block::DONT_DELETE and would stop ACE from freeing the
// message payload on release().
#define THREAD_POOL_DONOT_ACQUIRE 0x1000
/**
 * Construct an idle pool. No threads are created and no semaphore is
 * allocated until start_pool() is called.
 *
 * All sizing members start at zero so they never hold indeterminate values
 * if a method is called before start_pool().
 */
thread_pool::thread_pool ()
    : minnum_ (0),
      maxnum_ (0),
      waitsize_ (0),
      parsize_ (0),
      pfree_thread_ (NULL),
      thread_flags_ (THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED)
{
}
/**
 * Release the free-thread semaphore, if one was allocated.
 *
 * NOTE(review): this destructor does not itself stop or join the worker
 * threads; confirm they are no longer running when the pool is destroyed.
 */
thread_pool::~thread_pool ()
{
    // Deleting a null pointer is a no-op, so no explicit check is required.
    delete pfree_thread_;
}
/**
 * Report how many requests are currently waiting in the work queue.
 *
 * @return the message count of this task's message queue.
 */
int thread_pool::wait_cnt ()
{
    const int pending = this->msg_queue ()->message_count ();
    return pending;
}
/**
 * Timer callback; the pool does not use timers, so this is a no-op.
 *
 * @param tv  ignored.
 * @param arg ignored.
 * @return always 0.
 */
int thread_pool::handle_timeout (const ACE_Time_Value &tv, const void *arg)
{
    // Both arguments are intentionally unused.
    (void) tv;
    (void) arg;
    return 0;
}
int thread_pool::start_pool (
int minnum,
int maxnum,
int waitsize,
data:image/s3,"s3://crabby-images/2a1f3/2a1f35146451967292b66fa62c8f22027e7067cf" alt=""
int parsize)
{
minnum_ = minnum;
maxnum_ = maxnum;
waitsize_ = waitsize;
parsize_ = parsize;
this->msg_queue()->high_water_mark (waitsize * parsize);
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
pfree_thread_ = new ACE_Thread_Semaphore (minnum);
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
int ret = this->activate (thread_flags_, minnum);
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
return ret;
}
int thread_pool::add_task (void *arg, int size)
{
ACE_Message_Block *mb = new ACE_Message_Block (parsize_);
// test free threads condition
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
if (pfree_thread_->tryacquire () == -1)
{ // acquire one free thread to do work
printf ("free thread used up\n");
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
if (this->thr_count () < maxnum_)
{
this->activate (thread_flags_, 1, 1);
printf ("new thread release\n");
pfree_thread_->release ();
printf ("new one thread, now %d\n", this->thr_count ());
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
} else
{
printf ("can't new more threads, queue len %d\n", wait_cnt () + 1);
}
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
} else
{
// pfree_thread_->release (); // restore cnt, let svc function do acquire work
printf ("new task acquire\n");
mb->set_flags (THREAD_POOL_DONOT_ACQUIRE);
}
// create msg
printf ("add msg\n");
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
memcpy (mb->wr_ptr (), (char*) arg, size);
this->putq (mb);
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
return 0;
}
/**
 * Default task handler, meant to be overridden by derived classes.
 *
 * This sample implementation sleeps for one second, then prints the task
 * parameter (read as an int) together with the id of the calling thread.
 *
 * @param arg pointer to the task parameter bytes.
 * @return always 0.
 */
int thread_pool::service_func (void* arg)
{
    sleep (1);

    const int task_id = *(int*) arg;
    const int thread_id = (int)ACE_Thread::self ();
    printf ("finished task %d in thread %02X\n", task_id, thread_id);

    return 0;
}
int thread_pool::svc (void)
{
printf ("thread started\n");
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
while (1)
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
{
ACE_Message_Block *b = 0;
ACE_Time_Value wait = ACE_OS::gettimeofday ();
wait.sec (wait.sec () + 10); // timeout in 10 secs to test if more tasks need to do or we'll exit
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
if (this->getq (b, &wait) < 0)
{
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
if (this->thr_count () > minnum_)
{
printf ("over task acquire\n");
pfree_thread_->acquire ();
printf ("delete one thread, now %d\n", this->thr_count ()-1);
return 0;
} else
continue; // I'm the one of last min number of threads
}
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
if (b->flags () & THREAD_POOL_DONOT_ACQUIRE == 0)
{
printf ("queue task acquire\n");
pfree_thread_->acquire (); // I'll use one free thread
}
else
printf ("no need to acquire\n");
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
this->service_func ((void*)b->rd_ptr());
printf ("finished release\n");
b->release();
pfree_thread_->release (); // added one free thread
}
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
return 0;
}
#ifdef THREAD_POOL_UNIT_TEST
int main (int argc, int ** argv)
{
printf ("begin test:\n");
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
/**//*
ACE_Thread_Semaphore* s = new ACE_Thread_Semaphore (0);
s->release (3);
s->acquire ();
s->acquire ();
s->acquire ();
printf ("ok");
return 0;
*/
thread_pool t;
t.start_pool (10, 100);
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
data:image/s3,"s3://crabby-images/8d7d9/8d7d99ac571b1efcbf1f7e7a4120707c8e90d1fd" alt=""
for (int i=0; i<200; i++)
{
t.add_task (&i, sizeof(i));
if (i % 20 == 0)
sleep (1);
}
data:image/s3,"s3://crabby-images/96c01/96c01a9005d00151a1af2189b6a9f266915ba654" alt=""
sleep (1000);
printf ("end test:\n");
return 0;
}
#endif
posted on 2007-08-14 17:56
我爱佳娃 阅读(6074)
评论(4) 编辑 收藏 所属分类:
自写类库