设计目标
Ø 提供一个线程池的组件,具有良好的伸缩性,当线程够用时,销毁不用线程,当线程不够用时,自动增加线程数量;
Ø 提供一个工作任务接口和工作队列,实际所需要的任务都必须实现这个工作任务接口,然后放入工作队列中;
Ø 线程池中的线程从工作队列中,自动取得工作任务,执行任务。
主要控制类和功能接口设计
线程池管理器 ThreadPoolManager 的功能:
Ø 管理线程池中的各个属性变量
ü 最大工作线程数
ü 最小工作线程数
ü 激活的工作线程总数
ü 睡眠的工作线程总数
ü 工作线程总数 (即:激活的工作线程总数+睡眠的工作线程总数)
Ø 创建工作线程
Ø 销毁工作线程
Ø 启动处于睡眠的工作线程
Ø 睡眠处于激活的工作线程
Ø 缩任务:当工作线程总数小于或等于最小工作线程数时,销毁多余的睡眠的工作线程,使得现有工作线程总数等于最小工作任务总数
Ø 伸任务:当任务队列任务总数大于工作线程数时,增加工作线程总数至最大工作线程数
Ø 提供线程池启动接口
Ø 提供线程池销毁接口
工作线程 WorkThread 的功能:
Ø 从工作队列取得工作任务
Ø 执行工作任务接口中的指定任务
工作任务接口 ITask 的功能:
Ø 提供指定任务动作
工作队列 IWorkQueue 的功能:
Ø 提供获取任务接口,并删除工作队列中的任务;
Ø 提供加入任务接口;
Ø 提供删除任务接口;
Ø 提供取得任务总数接口;
Ø 提供自动填任务接口;(当任务总数少于或等于默认总数的25%时,自动装填)
Ø 提供删除所有任务接口;
Code
ThreadPoolManager:
=====================================
CODE:
package test.thread.pool1;
import java.util.ArrayList;
import java.util.List;
import test.thread.pool1.impl.MyWorkQueue;
/**
* <p>Title: 线程池管理器</p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class ThreadPoolManager {
/*最大线程数*/
private int threads_max_num;
/*最小线程数*/
private int threads_min_num;
/* 线程池线程增长步长 */
private int threads_increase_step = 5;
/* 任务工作队列 */
private IWorkQueue queue;
/* 线程池监视狗 */
private PoolWatchDog poolWatchDog ;
/* 队列线程 */
private Thread queueThread ;
/* 线程池 封装所有工作线程的数据结构 */
private List pool = new ArrayList();
/* 线程池中 封装所有钝化后的数据结构*/
private List passivePool = new ArrayList();
/* 空闲60秒 */
private static final long IDLE_TIMEOUT = 60000L;
/* 关闭连接池标志位 */
private boolean close = false;
/**
* 线程池管理器
* @param queue 任务队列
* @param threads_min_num 工作线程最小数
* @param threads_max_num 工作线程最大数
*/
public ThreadPoolManager(int threads_max_num
,int threads_min_num
,IWorkQueue queue){
this.threads_max_num = threads_max_num;
this.threads_min_num = threads_min_num;
this.queue = queue;
}
/**
* 线程池启动
*/
public void startPool(){
System.out.println("=== startPool..........");
poolWatchDog = new PoolWatchDog("PoolWatchDog");
poolWatchDog.setDaemon(true);
poolWatchDog.start();
System.out.println("=== startPool..........over");
}
/**
* 线程池销毁接口
*/
public void destoryPool(){
System.out.println("==========================DestoryPool starting ...");
this.close = true;
int pool_size = this.pool.size();
//中断队列线程
System.out.println("===Interrupt queue thread ... ");
queueThread.interrupt();
queueThread = null;
System.out.println("===Interrupt thread pool ... ");
Thread pool_thread = null;
for(int i=0; i<pool_size; i++){
pool_thread = (Thread)pool.get(i);
if(pool_thread !=null
&& pool_thread.isAlive()
&& !pool_thread.isInterrupted()){
pool_thread.interrupt();
System.out.println("Stop pool_thread:"
+pool_thread.getName()+"[interrupt] "
+pool_thread.isInterrupted());
}
}//end for
if(pool != null){
pool.clear();
}
if(passivePool != null){
pool.clear();
}
try{
System.out.println("=== poolWatchDog.join() starting ...");
poolWatchDog.join();
System.out.println("=== poolWatchDog.join() is over ...");
}
catch(Throwable ex){
System.out.println("###poolWatchDog ... join method throw a exception ... "
+ex.toString());
}
poolWatchDog =null;
System.out.println("==============================DestoryPool is over ...");
}
public static void main(String[] args) throws Exception{
ThreadPoolManager threadPoolManager1 = new ThreadPoolManager(10,5,new MyWorkQueue(50,30000));
threadPoolManager1.startPool();
Thread.sleep(60000);
threadPoolManager1.destoryPool();
}
/**
* 线程池监视狗
*/
private class PoolWatchDog extends Thread{
public PoolWatchDog(String name){
super(name);
}
public void run(){
Thread workThread = null;
Runnable run = null;
//开启任务队列线程,获取数据--------
System.out.println("===QueueThread starting ... ... ");
queueThread = new Thread(new QueueThread(),"QueueThread");
queueThread.start();
System.out.println("===Initial thread Pool ... ...");
//初始化线程池的最小线程数,并放入池中
for(int i=0; i<threads_min_num; i++){
run = new WorkThread();
workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
workThread.start();
if(i == threads_min_num -1){
workThread = null;
run = null;
}
}
System.out.println("===Initial thread Pool..........over ,and get pool's size:"+pool.size());
//线程池线程动态增加线程算法--------------
while(!close){
//等待5秒钟,等上述线程都启动----------
synchronized(this){
try{
System.out.println("===Wait the [last time] threads starting ....");
this.wait(15000);
}
catch(Throwable ex){
System.out.println("###PoolWatchDog invoking is failure ... "+ex);
}
}//end synchronized
//开始增加线程-----------------------spread动作
int queue_size = queue.getTaskSize();
int temp_size = (queue_size - threads_min_num);
if((temp_size > 0) && (temp_size/threads_increase_step > 2) ){
System.out.println("================Spread thread pool starting ....");
for(int i=0; i<threads_increase_step && (pool.size() < threads_max_num); i++){
System.out.println("=== Spread thread num : "+i);
run = new WorkThread();
workThread = new Thread(run,"WorkThread_"+System.currentTimeMillis()+i);
workThread.start();
}//end for
workThread = null;
run = null;
System.out.println("===Spread thread pool is over .... and pool size:"+pool.size());
}//end if
//删除已经多余的睡眠线程-------------shrink动作
int more_sleep_size = pool.size() - threads_min_num;//最多能删除的线程数
int sleep_threads_size = passivePool.size();
if(more_sleep_size >0 && sleep_threads_size >0){
System.out.println("================Shrink thread pool starting ....");
for(int i=0; i < more_sleep_size && i < sleep_threads_size ; i++){
System.out.println("=== Shrink thread num : "+i);
Thread removeThread = (Thread)passivePool.get(0);
if(removeThread != null && removeThread.isAlive() && !removeThread.isInterrupted()){
removeThread.interrupt();
}
}
System.out.println("===Shrink thread pool is over .... and pool size:"+pool.size());
}
System.out.println("===End one return [shrink - spread operator] ....");
}//end while
}//end run
}//end private class
/**
* 工作线程
*/
class WorkThread implements Runnable{
public WorkThread(){
}
public void run(){
String name = Thread.currentThread().getName();
System.out.println("===Thread.currentThread():"+name);
pool.add(Thread.currentThread());
while(true){
//获取任务---------
ITask task = null;
try{
System.out.println("===Get task from queue is starting ... ");
//看线程是否被中断,如果被中断停止执行任务----
if(Thread.currentThread().isInterrupted()){
System.out.println("===Breaking current thread and jump whlie [1] ... ");
break;
}
task = queue.getTask();
}
catch(Throwable ex){
System.out.println("###No task in queue:"+ex);
}//end tryc
if(task != null){
//执行任务---------
try{
System.out.println("===Execute the task is starting ... ");
//看线程是否被中断,如果被中断停止执行任务----
if(Thread.currentThread().isInterrupted()){
System.out.println("===Breaking current thread and jump whlie [1] ... ");
break;
}
task.executeTask();
//任务执行完毕-------
System.out.println("===Execute the task is over ... ");
}
catch(Throwable ex){
System.out.println("###Execute the task is failure ... "+ex);
}//end tryc
}else{
//没有任务,则钝化线程至规定时间--------
synchronized(this){
try{
System.out.println("===Passivate into passivePool ... ");
//看线程是否被中断,如果被中断停止执行任务----
boolean isInterrupted = Thread.currentThread().isInterrupted();
if(isInterrupted){
System.out.println("===Breaking current thread and jump whlie [1] ... ");
break;
}
// passivePool.add(this);
passivePool.add(Thread.currentThread());
//准备睡眠线程-------
isInterrupted = Thread.currentThread().isInterrupted();
if(isInterrupted){
System.out.println("===Breaking current thread and jump whlie [2] ... ");
break;
}
this.wait(IDLE_TIMEOUT);
}
catch(Throwable ex1){
System.out.println("###Current Thread passivate is failure ... break while cycle. "+ex1);
break;
}
}
}
}//end while--------
if(pool.contains(passivePool)){
pool.remove(this);
}
if(passivePool.contains(passivePool)){
passivePool.remove(this);
}
System.out.println("===The thread execute over ... ");
}//end run----------
}
class QueueThread implements Runnable{
public QueueThread(){
}
public void run(){
while(true){
//自动装在任务--------
queue.autoAddTask();
System.out.println("===The size of queue's task is "+queue.getTaskSize());
synchronized(this){
if(Thread.currentThread().isInterrupted()){
break;
}else{
try{
this.wait(queue.getLoadDataPollingTime());
}
catch(Throwable ex){
System.out.println("===QueueThread invoked wait is failure ... break while cycle."+ex);
break;
}
}//end if
}//end synchr
}//end while
}//end run
}
}
WorkQueue
=====================================
CODE:
package test.thread.pool1;
import java.util.LinkedList;
import test.thread.pool1.impl.MyTask;
/**
* <p>Title: 工作队列对象 </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public abstract class WorkQueue implements IWorkQueue{
/* 预计装载量 */
private int load_size;
/* 数据装载轮循时间 */
private long load_polling_time;
/* 队列 */
private LinkedList queue = new LinkedList();
/**
*
* @param load_size 预计装载量
* @param load_polling_time 数据装载轮循时间
*/
public WorkQueue(int load_size,long load_polling_time){
this.load_size = (load_size <= 10) ? 10 : load_size;
this.load_polling_time = load_polling_time;
}
/* 数据装载轮循时间 */
public long getLoadDataPollingTime(){
return this.load_polling_time;
}
/*获取任务,并删除队列中的任务*/
public synchronized ITask getTask(){
ITask task = (ITask)queue.getFirst();
queue.removeFirst();
return task;
}
/*加入任务*/
public void addTask(ITask task){
queue.addLast(task);
}
/*删除任务*/
public synchronized void removeTask(ITask task){
queue.remove(task);
}
/*任务总数*/
public synchronized int getTaskSize(){
return queue.size();
}
/*自动装填任务*/
public synchronized void autoAddTask(){
synchronized(this){
float load_size_auto = load_size - getTaskSize() / load_size;
System.out.println("===load_size_auto:"+load_size_auto);
if(load_size_auto > 0.25){
autoAddTask0();
}
else {
System.out.println("=== Not must load new work queue ... Now! ");
}
}
}
/*删除所有任务*/
public synchronized void clearAllTask(){
queue.clear();
}
/**
* 程序员自己实现该方法
*/
protected abstract void autoAddTask0();
}
MyWorkQueue
=====================================
CODE:
package test.thread.pool1.impl;
import java.util.LinkedList;
import test.thread.pool1.WorkQueue;
/**
* <p>Title: 例子工作队列对象 </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class MyWorkQueue extends WorkQueue{
/**
* @param load_size 预计装载量
* @param load_polling_time 数据装载轮循时间
*/
public MyWorkQueue(int load_size,long load_polling_time){
super(load_size,load_polling_time);
}
/**
* 自动加载任务
*/
protected synchronized void autoAddTask0(){
//-------------------
System.out.println("===MyWorkQueue ... invoked autoAddTask0() method ...");
for(int i=0; i<10; i++){
System.out.println("===add task :"+i);
this.addTask(new MyTask());
}
//-------------------
}
}
MyTask
=====================================
CODE:
package test.thread.pool1.impl;
import test.thread.pool1.ITask;
/**
* <p>Title: 工作任务接口 </p>
* <p>Description: </p>
* <p>Copyright: Copyright (c) 2005</p>
* <p>Company: </p>
* @author not attributable
* @version 1.0
*/
public class MyTask implements ITask {
/**
* 执行的任务
* @throws java.lang.Throwable
*/
public void executeTask() throws Throwable{
System.out.println("["+this.hashCode()+"] MyTask ... invoked executeTask() method ... ");
}
}