posts - 101,  comments - 29,  trackbacks - 0

最近的工作需要统计一些复杂的报表,为了提高效率,想用多线程去实现,但要在所有线程完成统计任务后,将结果汇总。所以在思考有没有什么办法解决,之所以是“系列一”是因为我想记录下我的思考过程。

1、首先设计一个Executer,负责任务的执行和汇总:

Java代码  收藏代码
  1. public class Executer {  
  2.     //计算已经派发的任务数(条件谓词)  
  3.     public static int THREAD_COUNT = 0;  
  4.     //线程池  
  5.     private Executor pool = null;  
  6.     public Executer() {  
  7.         this(1);  
  8.     }  
  9.     public Executer(int threadPoolSize) {  
  10.         pool = Executors.newFixedThreadPool(threadPoolSize);  
  11.     }  
  12.     /** 
  13.      * 任务派发 
  14.      * @param job 
  15.      */  
  16.     public void fork(Job job){  
  17.         //将任务派发给线程池去执行  
  18.         pool.execute(job);  
  19.         THREAD_COUNT++;  
  20.     }  
  21.     /** 
  22.      * 统计任务结果 
  23.      */  
  24.     public void join(){  
  25.         while(THREAD_COUNT > 0){  
  26.             System.out.println("threadCount: "+THREAD_COUNT);  
  27.             try {  
  28.                 wait();//如果任务没有全部完成,则挂起  
  29.             } catch (Exception e) {}//这里总是抛异常,不知道为什么,好吧!先不管它  
  30.         }  
  31.     }  
  32. }  

 2、写一个抽象的Job类,负责执行具体的任务

Java代码  收藏代码
  1. public abstract class Job implements Runnable {  
  2.   
  3.     @Override  
  4.     public void run() {  
  5.         this.execute();//执行子类具体任务  
  6.         Executer.THREAD_COUNT--;  
  7.         try{  
  8.             notifyAll();//这里总是抛异常,不知道为什么,好吧!先不管它  
  9.         }catch(Exception e){}  
  10.     }  
  11.     /** 
  12.      * 业务处理函数 
  13.      */  
  14.     public abstract void execute();  
  15.   
  16. }  

 

3、测试,先来一个具体的任务实现。

Java代码  收藏代码
  1. public class MyJob extends Job {  
  2.   
  3.     @Override  
  4.     public void execute() {  
  5.         //模拟业务需要处理1秒.  
  6.         try {Thread.sleep(1000);} catch (InterruptedException e) {}  
  7.         System.out.println("running thread id = "+Thread.currentThread().getId());  
  8.     }  
  9.   
  10. }  

 

4、测试。

Java代码  收藏代码
  1. public class Test {  
  2.     public static void main(String[] args) {  
  3.         //初始化任务池  
  4.         Executer exe = new Executer(5);  
  5.         //初始化任务  
  6.         long time = System.currentTimeMillis();  
  7.         for (int i = 0; i < 10; i++) {  
  8.             MyJob job = new MyJob();  
  9.             exe.fork(job);//派发任务  
  10.         }  
  11.         //汇总任务结果  
  12.         exe.join();  
  13.         System.out.println("time: "+(System.currentTimeMillis() - time));  
  14.     }  
  15.   
  16. }  

 

 5、好吧,看一下结果

 

Java代码  收藏代码
  1. threadCount: 10  
  2. ......(表示有N多个)  
  3. threadCount: 10  
  4. running thread id = 8  
  5. running thread id = 9  
  6. running thread id = 11  
  7. running thread id = 10  
  8. running thread id = 12  
  9. threadCount: 5  
  10. ......(表示有N多个)  
  11. threadCount: 5  
  12. running thread id = 9  
  13. running thread id = 10  
  14. running thread id = 12  
  15. running thread id = 8  
  16. running thread id = 11  
  17. threadCount: 3  
  18. time: 2032  

 哈哈,看来是可以了,最后汇总任务的处理时间是2032毫秒,看来是比单个任务顺序执行来的快。但是有几个问题:

1)如果没有catch那个超级Exception的话,就会抛下面的异常:

Java代码  收藏代码
  1. java.lang.IllegalMonitorStateException  
  2.     at java.lang.Object.wait(Native Method)  
  3.     at java.lang.Object.wait(Object.java:485)  
  4.     at com.one.Executer.join(Executer.java:38)  
  5.     at com.test.Test.main(Test.java:21)  

 

2)为啥会打印N多个同样值threadCount呢?

于是和同事(河东)沟通,他说wait要放在synchronized里面才行,好吧,试一下,改进一下Executer和Job

 

Java代码  收藏代码
  1. public class Executer {  
  2.     //计算已经派发的任务数(条件谓词)  
  3.     public static int THREAD_COUNT = 0;  
  4.     //条件队列锁  
  5.     public static final Object LOCK = new Object();  
  6.     //线程池  
  7.     private Executor pool = null;  
  8.     public Executer() {  
  9.         this(1);  
  10.     }  
  11.     public Executer(int threadPoolSize) {  
  12.         pool = Executors.newFixedThreadPool(threadPoolSize);  
  13.     }  
  14.     /** 
  15.      * 任务派发 
  16.      * @param job 
  17.      */  
  18.     public void fork(Job job){  
  19.         //将任务派发给线程池去执行  
  20.         pool.execute(job);  
  21.         //增加线程数  
  22.         synchronized (LOCK) {  
  23.             THREAD_COUNT++;  
  24.         }  
  25.     }  
  26.     /** 
  27.      * 统计任务结果 
  28.      */  
  29.     public void join(){  
  30.         synchronized (LOCK) {  
  31.             while(THREAD_COUNT > 0){  
  32.                 System.out.println("threadCount: "+THREAD_COUNT);  
  33.                 try {  
  34.                     LOCK.wait();//如果任务没有全部完成,则挂起  
  35.                 } catch (InterruptedException e) {  
  36.                     e.printStackTrace();  
  37.                 }  
  38.             }  
  39.         }  
  40.     }  
  41. }  

 

Java代码  收藏代码
  1. public abstract class Job implements Runnable {  
  2.   
  3.     @Override  
  4.     public void run() {  
  5.         this.execute();//执行子类具体任务  
  6.         synchronized (Executer.LOCK) {  
  7.             //处理完业务后,任务结束,递减线程数,同时唤醒主线程  
  8.             Executer.THREAD_COUNT--;  
  9.             Executer.LOCK.notifyAll();  
  10.         }  
  11.     }  
  12.     /** 
  13.      * 业务处理函数 
  14.      */  
  15.     public abstract void execute();  
  16.   
  17. }  

 6、测试一下:

Java代码  收藏代码
  1. threadCount: 10  
  2. running thread id = 8  
  3. running thread id = 11  
  4. running thread id = 9  
  5. threadCount: 7  
  6. running thread id = 10  
  7. threadCount: 6  
  8. running thread id = 12  
  9. threadCount: 5  
  10. running thread id = 11  
  11. running thread id = 12  
  12. running thread id = 10  
  13. threadCount: 2  
  14. running thread id = 9  
  15. running thread id = 8  
  16. threadCount: 1  
  17. time: 2016  

 还真的行,谢谢河东哈!

但是原因是什么呢?回去查了查书《Java并发编程实践》,见附件!

Java代码  收藏代码
  1. 14.2.1节这样说:  
  2.   
  3. 在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。  
  4.   
  5. ...  
  6.   
  7. 由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在一个循环中调用wait,并在每次迭代中都测试条件谓词。  
  8.   
  9. 14.2.4节:  
  10.   
  11. 由于在调用notify或notifyAll时必须持有条件队列对象的锁,而如果这些等待中线程此时不能重新获得锁,那么无法从wait返回,因此发出通知的线程应该尽快地释放,从而确保正在等待的线程尽可能尽快的解除阻塞。  

 

看来之前是不会用wait和notify,哈哈~!

 

感谢河东,和你交流收获很大!

 

顺便测试一下java多线程情况下,多核CPU的利用率,修改上面的线程池大小和任务数(2个线程处理1000000个任务,去掉MyJob的sleep(这样可以多抢些CPU时间),结果如下:

 

看来window下是可以利用多核的,虽然是一个JVM进程。之前和斯亮讨论的结论是错误的。

posted on 2012-07-15 01:20 mixer-a 阅读(3777) 评论(2)  编辑  收藏

只有注册用户登录后才能发表评论。


网站导航: