posts - 101,  comments - 29,  trackbacks - 0

增加对结果的处理:

1、修改Job,实现Callable接口

Java代码  收藏代码
  1. public abstract class Job implements Callable<Object> {  
  2.   
  3.     @Override  
  4.     public Object call() throws Exception {  
  5.         Object result = this.execute();//执行子类具体任务  
  6.         synchronized (Executer.LOCK) {  
  7.             //处理完业务后,任务结束,递减线程数,同时唤醒主线程  
  8.             Executer.THREAD_COUNT--;  
  9.             Executer.LOCK.notifyAll();  
  10.         }  
  11.         return result;  
  12.     }  
  13.     /** 
  14.      * 业务处理函数 
  15.      */  
  16.     public abstract Object execute();  
  17.   
  18. }  

 

2、修改Executer,增加对结果的处理

Java代码  收藏代码
  1. public class Executer {  
  2.     //计算已经派发的任务数(条件谓词)  
  3.     public static int THREAD_COUNT = 0;  
  4.     //存储任务的执行结果  
  5.     private List<Future<Object>> futres = new ArrayList<Future<Object>>();   
  6.     //条件队列锁  
  7.     public static final Object LOCK = new Object();  
  8.     //线程池  
  9.     private ExecutorService pool = null;  
  10.     public Executer() {  
  11.         this(1);  
  12.     }  
  13.     public Executer(int threadPoolSize) {  
  14.         pool = Executors.newFixedThreadPool(threadPoolSize);  
  15.     }  
  16.     /** 
  17.      * 任务派发 
  18.      * @param job 
  19.      */  
  20.     public void fork(Job job){  
  21.         //将任务派发给线程池去执行  
  22.         futres.add(pool.submit(job));  
  23.         //增加线程数  
  24.         synchronized (LOCK) {  
  25.             THREAD_COUNT++;  
  26.         }  
  27.     }  
  28.     /** 
  29.      * 统计任务结果 
  30.      */  
  31.     public List<Object> join(){  
  32.         synchronized (LOCK) {  
  33.             while(THREAD_COUNT > 0){//检查线程数,如果为0,则表示所有任务处理完成  
  34.                 System.out.println("threadCount: "+THREAD_COUNT);  
  35.                 try {  
  36.                     LOCK.wait();//如果任务没有全部完成,则挂起。等待完成的任务给予通知  
  37.                 } catch (InterruptedException e) {  
  38.                     e.printStackTrace();  
  39.                 }  
  40.             }  
  41.         }  
  42.         List<Object> list = new ArrayList<Object>();  
  43.         //取出每个任务的处理结果,汇总后返回  
  44.         for (Future<Object> future : futres) {  
  45.             try {  
  46.                 Object result = future.get();//因为任务都已经完成,这里直接get  
  47.                 list.add(result);  
  48.             } catch (Exception e) {  
  49.                 e.printStackTrace();  
  50.             }   
  51.         }  
  52.         return list;  
  53.     }  
  54. }  

 

 3、测试:

Java代码  收藏代码
  1. public static void main(String[] args) {  
  2.         //初始化任务池  
  3.         Executer exe = new Executer(5);  
  4.         //初始化任务  
  5.         long time = System.currentTimeMillis();  
  6.         for (int i = 0; i < 10; i++) {  
  7.             MyJob job = new MyJob();  
  8.             exe.fork(job);//派发任务  
  9.         }  
  10.         //汇总任务结果  
  11.         List<Object> list = exe.join();  
  12.         System.out.println("Result: "+list);  
  13.         System.out.println("time: "+(System.currentTimeMillis() - time));  
  14.     }  

 

4、执行结果:

Java代码  收藏代码
  1. threadCount: 10  
  2. running thread id = 9  
  3. running thread id = 11  
  4. running thread id = 8  
  5. running thread id = 10  
  6. running thread id = 12  
  7. threadCount: 5  
  8. running thread id = 9  
  9. running thread id = 8  
  10. running thread id = 11  
  11. running thread id = 12  
  12. running thread id = 10  
  13. Result: [8910111281112910]  
  14. time: 2000  

 

5、附件是完整代码

posted on 2012-07-15 01:21 mixer-a 阅读(1120) 评论(0)  编辑  收藏

只有注册用户登录后才能发表评论。


网站导航: