随笔-124  评论-194  文章-0  trackbacks-0

最近需要一个能根据请求数变化的线程池,JAVA有这样的东西,可是C++下好像一般只是固定大小的线程池。所以就基于ACE写了个,只做了初步测试。

主要思想是:
1. 重载ACE_Task,这相当于是个固定线程池,用一个信号量(ACE_Thread_Semaphore)来记数空闲线程数。
2. 初始化时根据用户的输入,确定最少线程数minnum和最大线程数maxnum,当多个请求到来,并且无空闲线程(信号量用光),判断总线程数小于maxnum,就开始强迫增加线程数。
3. 当线程响应完一个请求(任务)后,如果当前任务队列为空,且线程数大于minnum,就退出本线程。这里做了一个优化,就算满足条件,线程也会在队列上再等待10秒,防止线程池抖动带来不必要的开销。

使用:
重载这个类,重载service_func函数实现自己的任务处理。
start_pool初始化线程池,之后,就可以用add_task向线程池添加任务。
它会根据请求的数量自动控制池大小进行处理。
已经在LINUX下测试通过。由于ACE是跨平台的,所以这个实现也应该可以在WINDOWS下工作。

编译:
带THREAD_POOL_UNIT_TEST选项,则编译出自测程序test
gcc -g -Wall -O2 -g -Wall -I. -I../ -I../mon/comm/ACE_wrappers -g -DTHREAD_POOL_UNIT_TEST -o test thread_pool.cpp -lpthread -lm -lz -lstdc++ ../mon/comm/ACE_wrappers/ace/libACE.a -ldl


thread_pool.h头文件:

#ifndef THREAD_POOL
#define THREAD_POOL

#include 
"ace/Task.h"
#include 
"ace/Thread_Mutex.h"
#include 
"ace/Thread_Semaphore.h"

class thread_pool : public ACE_Task<ACE_MT_SYNCH>
{
public:
    thread_pool ();

    
~thread_pool ();

    
// begin the initial threads and waiting for request
    int start_pool (
        
int minnum = 5// min number of thread
        int maxnum = 100,  // max number of thread
        int waitsize = 1024// request queue length
        int parsize = 1024); // your parameter size


    
// pending request in work queue
    int wait_cnt ();

    
// add one task to thread pool
    int add_task (void *arg, int size);

    
// user defined work thread function
    virtual int service_func (void* arg);

    
// overide base class function for thread pool logical
    virtual int svc (void);

    
// not use
    virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg);

private:
    
int minnum_, maxnum_;
    
int waitsize_, parsize_;

//    ACE_Recursive_Thread_Mutex free_thread_cnt__mutex_;

    ACE_Thread_Semaphore 
*pfree_thread_; // for free thread count

    
long thread_flags_; // ace thread create flag
}
;


#endif 
/* THREAD_POOL */




thread_pool.cpp实现文件:
#include "thread_pool.h"

#define THREAD_POOL_DONOT_ACQUIRE    
0x1001 // do not aquire again in new added thread

thread_pool::thread_pool () 
{
    thread_flags_ 
= THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED;
    pfree_thread_ 
= NULL;
}


thread_pool::
~thread_pool () {
    
if (pfree_thread_)
        delete pfree_thread_;
}


int thread_pool::wait_cnt () {
    
return this->msg_queue()->message_count ();
}


int thread_pool::handle_timeout (const ACE_Time_Value &tv, const void *arg) {
    
return 0;
}


int thread_pool::start_pool (
    
int minnum,
    
int maxnum, 
    
int waitsize, 
    
int parsize) {
    minnum_ 
= minnum;
    maxnum_ 
= maxnum;
    waitsize_ 
= waitsize;
    parsize_ 
= parsize;
    
    
this->msg_queue()->high_water_mark (waitsize * parsize);

    pfree_thread_ 
= new ACE_Thread_Semaphore (minnum);

    
int ret = this->activate (thread_flags_, minnum);

    
return ret;
}


int thread_pool::add_task (void *arg, int size) {
    ACE_Message_Block 
*mb = new ACE_Message_Block (parsize_);
    
    
// test free threads condition
    if (pfree_thread_->tryacquire () == -1// acquire one free thread to do work
        printf ("free thread used up\n");

        
if (this->thr_count () < maxnum_) {
            
this->activate (thread_flags_, 11);
            
            printf (
"new thread release\n");
            pfree_thread_
->release ();
            
            printf (
"new one thread, now %d\n"this->thr_count ());
        }
 else {
            printf (
"can't new more threads, queue len %d\n", wait_cnt () + 1);
        }

    }
 else {
        
// pfree_thread_->release (); // restore cnt, let svc function do acquire work
        printf ("new task acquire\n");
        mb
->set_flags (THREAD_POOL_DONOT_ACQUIRE);
    }

    
    
// create msg
    printf ("add msg\n");

    memcpy (mb
->wr_ptr (), (char*) arg, size);
            
    
this->putq (mb);

    
return 0;
}



int thread_pool::service_func (void* arg) {
    sleep (
1);
    printf (
"finished task %d in thread %02X\n"*(int*) arg, (int)ACE_Thread::self ());
    
return 0;
}



int thread_pool::svc (void{
    printf (
"thread started\n");

    
while (1)
    
{                
        ACE_Message_Block 
*= 0;
        ACE_Time_Value wait 
= ACE_OS::gettimeofday ();
        wait.sec (wait.sec () 
+ 10); // timeout in 10 secs to test if more tasks need to do or we'll exit
        
        
if (this->getq (b, &wait) < 0{
            
if (this->thr_count () > minnum_) {
                printf (
"over task acquire\n");
                pfree_thread_
->acquire ();
                printf (
"delete one thread, now %d\n"this->thr_count ()-1);
                
                
return 0;
            }
 else 
                
continue// I'm the one of last min number of threads
        }


        
if (b->flags () & THREAD_POOL_DONOT_ACQUIRE == 0{
            printf (
"queue task acquire\n");
            pfree_thread_
->acquire (); // I'll use one free thread
        }

        
else 
            printf (
"no need to acquire\n");

        
this->service_func ((void*)b->rd_ptr());
                            
        printf (
"finished release\n");
        b
->release();
        
        pfree_thread_
->release (); // added one free thread
    }


    
return 0;
}



#ifdef THREAD_POOL_UNIT_TEST 
int main (int argc, int ** argv) {
    printf (
"begin test:\n");
/*
    ACE_Thread_Semaphore* s = new ACE_Thread_Semaphore (0);
    s->release (3);
    s->acquire ();
    s->acquire ();
    s->acquire ();
    printf ("ok");
    return 0;
*/
    
    thread_pool t;
    t.start_pool (
10100);

    
for (int i=0; i<200; i++{
        t.add_task (
&i, sizeof(i));
        
if (i % 20 == 0)
            sleep (
1);
    }


    sleep (
1000);
    
    printf (
"end test:\n");
    
return 0;
}


#endif

posted on 2007-08-14 17:56 我爱佳娃 阅读(6067) 评论(4)  编辑  收藏 所属分类: 自写类库

评论:
# re: C++实现的带最大最小线程数的线程池(基于ACE) 2007-08-14 21:08 | pass86
怎么写道了BLOGJAVA.COM,不过学ACE是好的。  回复  更多评论
  
# re: C++实现的带最大最小线程数的线程池(基于ACE) 2007-08-16 23:35 | alwayscy
嘿嘿,大部分BLOGJAVA的同学都只有一个技术博客吧,只要保证大部分与JAVA有关就好了。  回复  更多评论
  
# re: C++实现的带最大最小线程数的线程池(基于ACE) 2008-01-13 14:40 | liuruigong
编译错误修改
1#include <ace/OS.h>
2.ACE_OS::sleep();
3.最好把主函数的sleep(2000) 修改为
ACE_Thread_Manager::instance()->wait();

这个线程池写的不错  回复  更多评论
  
# re: C++实现的带最大最小线程数的线程池(基于ACE)[未登录] 2008-01-14 22:09 | 我爱佳娃
以前搞C++,ACE是个不错的框架,最近接触了不少JAVA的东西,感觉JAVA这东西琳琅满目。  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: