线程池代码

ThreadPoolManager

package com.threadpool.test;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.omg.CORBA.TIMEOUT;

import com.nio.test.ReadFileThread;

public class ThreadPoolManager {

    private static ThreadPoolManager tpm = new ThreadPoolManager();

    // 线程池最小线程数
    private final static int MIN_SIZE = 4;
    // 线程池最大线程数
    private final static int MAX_SIZE = 10;
    // 线程池维护线程允许的空闲限制
    private final static int KEEP_ACTIVE_TIME = 0;
    // 线程池用的缓冲队列大小
    private final static int WORK_QUEUE_SIZE = 10;
    // 消息缓冲队列
    Queue queue = new LinkedList();
    
    final Runnable accessBuffeThread = new Runnable()
    {

        public void run() {
            
            if( hasMoreAcquire() ){
                
                String msg = ( String ) queue.poll();
                Runnable task = new AccessDBThread( msg );
                threadpool.execute( task );
            }
        }
        
    };
    // 无法由 ThreadPoolExecutor 执行的任务的处理程序
    final RejectedExecutionHandler handler = new RejectedExecutionHandler()
    {

        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // TODO Auto-generated method stub
            System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行");
            queue.offer((( AccessDBThread ) r ).getMsg() );
        }
        
    };
    
    final ThreadPoolExecutor threadpool = new ThreadPoolExecutor(MIN_SIZE, MAX_SIZE, KEEP_ACTIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE),this.handler);
    
    // 调度线程池

    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 );

    final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(accessBuffeThread, 0, 1, TimeUnit.SECONDS);
    
    public static ThreadPoolManager newinstance()
    {
        return tpm;
    }
    
    private ThreadPoolManager (){}
    
    
    private boolean hasMoreAcquire()
    {
        return !queue.isEmpty();
    }
    
    public void addLogMsg(String msg)
    {
     Runnable task = new AccessDBThread(msg);
     threadpool.execute(task);
    }
}
AccessDBThread
package com.threadpool.test;

public class AccessDBThread implements Runnable{

    private String msg;
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    
    public AccessDBThread(){
        super();
    }
    
    public AccessDBThread(String msg)
    {
        this.msg=msg;
    }
    public void run() {
        // TODO Auto-generated method stub
        System.out.println("Added the message: "+msg+" into the Database");
    }

    
}

测试类:
package com.threadpool.test;


public class TestDriver {

    ThreadPoolManager tpm = ThreadPoolManager.newinstance();
    
    public void addMsg(String msg)
    {
        tpm.addLogMsg(msg);
    }
    public static void main(String[] args) {
        for(int i=0;i<100;i++)
        {
            new TestDriver().addMsg(Integer.toString(i));
        }
    }
}





李阳

posted on 2011-09-02 15:20 crazy-李阳 阅读(412) 评论(0)  编辑  收藏


只有注册用户登录后才能发表评论。


网站导航:
博客园   IT新闻   Chat2DB   C++博客   博问  
 
<2011年9月>
28293031123
45678910
11121314151617
18192021222324
2526272829301
2345678

导航

统计

常用链接

留言簿

随笔档案

搜索

最新评论

阅读排行榜

评论排行榜