programmer's home, welcome here!

technical issues and my life

常用链接

统计

最新评论

Java 通用线程池-转自CSDN

感谢ryang的劳动!

Java实现通用线程池

线程池通俗的描述就是预先创建若干空闲线程,等到需要用多线程去处理事务的时候去唤醒某些空闲线程执行处理任务,这样就省去了频繁创建线程的时间,因为频繁创建线程是要耗费大量的CPU资源的。如果一个应用程序需要频繁地处理大量并发事务,不断的创建销毁线程往往会大大地降低系统的效率,这时候线程池就派上用场了。
      本文旨在使用Java语言编写一个通用的线程池。当需要使用线程池处理事务时,只需按照指定规范封装好事务处理对象,然后用已有的线程池对象去自动选择空闲线程自动调用事务处理对象即可。并实现线程池的动态修改(修改当前线程数,最大线程数等)。下面是实现代码:

//ThreadTask .java

 

package polarman.threadpool;

/** *//**
 * 线程任务
 * @author ryang
 * 2006-8-8
 
*/

public 
interface ThreadTask ...{
    
public void run();
}



//PooledThread.java

package polarman.threadpool;

import java.util.Collection;
import java.util.Vector;

/** *//**
 * 接受线程池管理的线程
 * @author ryang
 * 2006-8-8
 
*/

public 
class PooledThread extends Thread ...{
   
    
protected Vector tasks = new Vector();
    
protected boolean running = false;
    
protected boolean stopped = false;
    
protected boolean paused = false;
    
protected boolean killed = false;
    private ThreadPool pool;
   
    
public PooledThread(ThreadPool pool)...{
        
this.pool = pool;
    }

   
    public 
void putTask(ThreadTask task)...{
        tasks.add(task);
    }

   
    public 
void putTasks(ThreadTask[] tasks)...{
        
for(int i=0; i<tasks.length; i++)
            
this.tasks.add(tasks[i]);
    }

   
    public 
void putTasks(Collection tasks)...{
        
this.tasks.addAll(tasks);
    }

   
    protected
 ThreadTask popTask()...{
        
if(tasks.size() > 0)
            
return (ThreadTask)tasks.remove(0);
        else
            return
 null;
    }

   
    public 
boolean isRunning()...{
        
return running;
    }

   
    public 
void stopTasks()...{
        stopped 
= true;
    }

   
    public 
void stopTasksSync()...{
        stopTasks();
        
while(isRunning())...{
            
try ...{
                sleep(5);
            }
 catch
 (InterruptedException e) ...{
            }

        }

    }

   
    public 
void pauseTasks()...{
        paused 
= true;
    }

   
    public 
void pauseTasksSync()...{
        pauseTasks();
        
while(isRunning())...{
            
try ...{
                sleep(5);
            }
 catch
 (InterruptedException e) ...{
            }

        }

    }

   
    public 
void kill()...{
        
if(!running)
            interrupt();
        else
            killed =
 true;
    }

   
    public 
void killSync()...{
        kill();
        
while(isAlive())...{
            
try ...{
                sleep(5);
            }
 catch
 (InterruptedException e) ...{
            }

        }

    }

   
    public 
synchronized void startTasks()...{
        running 
= true;
        
this.notify();
    }

   
    public 
synchronized void run()...{
        
try...{
            
while(true)...{
                
if(!running || tasks.size() == 0)...{
                    pool.notifyForIdleThread();
                    
//System.out.println(Thread.currentThread().getId() + ": 空闲");
                    this.wait();
                }
else
...{
                    ThreadTask task;
                    
while((task = popTask()) != null)...{
                        task.run();
                        
if(stopped)...{
                            stopped 
= false;
                            
if(tasks.size() > 0)...{
                                tasks.clear();
                                System.out.println(Thread.currentThread().getId() 
+ ": Tasks are stopped");
                                
break;
                            }

                        }

                        if
(paused)...{
                            paused 
= false;
                            
if(tasks.size() > 0)...{
                                System.out.println(Thread.currentThread().getId() 
+ ": Tasks are paused");
                                
break;
                            }

                        }

                    }

                    running =
 false;
                }


                if
(killed)...{
                    killed 
= false;
                    
break;
                }

            }

        }
catch
(InterruptedException e)...{
            
return;
        }

       
        //System.out.println(Thread.currentThread().getId() + ": Killed");
    }

}



//ThreadPool.java


下面是线程池的测试程序
//ThreadPoolTest.java

ThreadPool pool = new ThreadPool(3, 2);
pool.init();

要处理的任务实现ThreadTask...接口即可(如测试代码里的SimpleTask),这个接口只有一个方法run()
两行代码即可调用:

ThreadTask task = ... //实例化你的任务对象
pool.processTask(task);

package polarman.threadpool;

import java.util.Collection;
import java.util.Iterator;
import java.util.Vector;

/** *//**
 * 线程池
 * @author ryang
 * 2006-8-8
 
*/

public 
class ThreadPool ...{
   
    
protected int maxPoolSize;
    
protected int initPoolSize;
    
protected Vector threads = new Vector();
    
protected boolean initialized = false;
    
protected boolean hasIdleThread = false;
   
    
public ThreadPool(int maxPoolSize, int initPoolSize)...{
        
this.maxPoolSize = maxPoolSize;
        
this.initPoolSize = initPoolSize;
    }

   
    public 
void init()...{
        initialized 
= true;
        
for(int i=0; i<initPoolSize; i++)...{
            PooledThread thread 
= new PooledThread(this);
            thread.start();
            threads.add(thread);
        }

       
        //System.out.println("线程池初始化结束,线程数=" + threads.size() + " 最大线程数=" + maxPoolSize);
    }

   
    public 
void setMaxPoolSize(int maxPoolSize)...{
        
//System.out.println("重设最大线程数,最大线程数=" + maxPoolSize);
        
this.maxPoolSize = maxPoolSize;
        
if(maxPoolSize < getPoolSize())
            setPoolSize(maxPoolSize);
    }

   
    
/**
     * 重设当前线程数
     * 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事务处理完成
     * 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
     * @param size
     
*/

    public 
void setPoolSize(int size)...{
        
if(!initialized)...{
            initPoolSize 
= size;
            return;
        }
else 
if(size > getPoolSize())...{
            
for(int i=getPoolSize(); i<size && i<maxPoolSize; i++)...{
                PooledThread thread 
= new PooledThread(this);
                thread.start();
                threads.add(thread);
            }

        }
else 
if(size < getPoolSize())...{
            
while(getPoolSize() > size)...{
                PooledThread th 
= (PooledThread)threads.remove(0);
                th.kill();
            }

        }

       
        //System.out.println("重设线程数,线程数=" + threads.size());
    }

   
    public 
int getPoolSize()...{
        
return threads.size();
    }

   
    protected 
void notifyForIdleThread()...{
        hasIdleThread 
= true;
    }

   
    protected 
boolean waitForIdleThread()...{
        hasIdleThread 
= false;
        
while(!hasIdleThread && getPoolSize() >= maxPoolSize)...{
            
try ...{
                Thread.sleep(5);
            }
 catch
 (InterruptedException e) ...{
                
return false;
            }

        }

       
        return
 true;
    }

   
    public 
synchronized PooledThread getIdleThread()...{
        
while(true)...{
            
for(Iterator itr=threads.iterator(); itr.hasNext();)...{
                PooledThread th = (PooledThread)itr.next();
                
if(!th.isRunning())
                    
return th;
            }

           
            if(getPoolSize() 
< maxPoolSize)...{
                PooledThread thread 
= new PooledThread(this);
                thread.start();
                threads.add(thread);
                
return thread;
            }

           
            //System.out.println("线程池已满,等待...");
            
if(waitForIdleThread() == false)
                
return null;
        }

    }

   
    public 
void processTask(ThreadTask task)...{
        PooledThread th = getIdleThread();
        
if(th != null)...{
            th.putTask(task);
            th.startTasks();
        }

    }

   
    public 
void processTasksInSingleThread(ThreadTask[] tasks)...{
        PooledThread th = getIdleThread();
        
if(th != null)...{
            th.putTasks(tasks);
            th.startTasks();
        }

    }

   
    public 
void processTasksInSingleThread(Collection tasks)...{
        PooledThread th = getIdleThread();
        
if(th != null)...{
            th.putTasks(tasks);
            th.startTasks();
        }

    }

}


 

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import polarman.threadpool.ThreadPool;
import polarman.threadpool.ThreadTask;

public class ThreadPoolTest ...{

    
public static void main(String[] args) ...{
        System.out.println(
""quit" 退出");
        System.out.println(
""task A 10" 启动任务B,时长为10秒");
        System.out.println(
""size 2" 设置当前线程池大小为2");
        System.out.println(
""max 3" 设置线程池最大线程数为3");
       
System.out.println();

       
        final ThreadPool pool = new ThreadPool(3, 2);
        pool.init();
        
        Thread cmdThread 
= new Thread()...{
            
public void run()...{
                
                BufferedReader reader 
= new BufferedReader(new InputStreamReader(System.in));
                
                
while(true)...{
                    
try ...{
                        String line = reader.readLine();
                        String words[] 
= line.split(" ");
                        
if(words[0].equalsIgnoreCase("quit"))...{
                            System.exit(0);
                        }
else 
if(words[0].equalsIgnoreCase("size"&& words.length >= 2)...{
                            
try...{
                                
int size = Integer.parseInt(words[1]);
                                pool.setPoolSize(size);
                            }
catch
(Exception e)...{
                            }

                        }
else 
if(words[0].equalsIgnoreCase("max"&& words.length >= 2)...{
                            
try...{
                                
int max = Integer.parseInt(words[1]);
                                pool.setMaxPoolSize(max);
                            }
catch
(Exception e)...{
                            }

                        }
else 
if(words[0].equalsIgnoreCase("task"&& words.length >= 3)...{
                            
try...{
                                
int timelen = Integer.parseInt(words[2]);
                                SimpleTask task 
= new SimpleTask(words[1], timelen * 1000);
                                pool.processTask(task);
                            }
catch
(Exception e)...{
                            }

                        }

                        
                    }
 catch
 (IOException e) ...{
                        e.printStackTrace();
                    }

                }

            }

        }
;
        
        cmdThread.start();
        
/**//*
        for(int i=0; i<10; i++){
            SimpleTask task = new SimpleTask("Task" + i, (i+10)*1000);
            pool.processTask(task);
        }
*/

    }


}


class SimpleTask 
implements ThreadTask...{
    
    private String taskName;
    
private int timeLen;
    
    
public SimpleTask(String taskName, int timeLen)...{
        
this.taskName = taskName;
        
this.timeLen = timeLen;
    }

    
    public 
void run() ...{
        System.out.println(Thread.currentThread().getId() +
                "
: START TASK "" + taskName + """);
        
try ...{
            Thread.sleep(timeLen);
        }
 
catch (InterruptedException e) ...{
        }

        
        System.out.println(Thread.currentThread().getId() 
+
                ": END TASK "
" + taskName + """);
    }

    
}



使用此线程池相当简单,下面两行代码初始化线程池:

posted on 2007-04-12 23:33 crazy zerlot 阅读(469) 评论(0)  编辑  收藏 所属分类: J2SE & J2EE ABC


只有注册用户登录后才能发表评论。


网站导航: