badqiu


rapid-framework Utility Classes, Part 1: Asynchronous IO Classes

In some special situations we may need asynchronous IO to improve performance substantially, for example when collecting log messages.

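The asynchronous IO classes provided by rapid-framework combine the producer/consumer multithreading pattern with the Decorator pattern. You use them just like ordinary IO: wrapping a stream in one more layer, AsyncWriter or AsyncOutputStream, turns ordinary IO into asynchronous IO. Opening an asynchronous IO starts a background thread that writes the data.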
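The producer/consumer plus Decorator idea described above can be sketched with JDK classes only. The class below is a simplified, hypothetical stand-in (the names QueueingWriter, POISON and QueueingWriterDemo are assumptions made for this illustration) and is not the rapid-framework implementation: callers enqueue copies of their data, and a single background thread drains the queue into the wrapped Writer.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-in for illustration; not the rapid-framework class.
class QueueingWriter extends Writer {
    // End-of-stream marker, recognised by identity rather than by content.
    private static final char[] POISON = new char[0];

    private final Writer target;
    private final BlockingQueue<char[]> queue = new LinkedBlockingQueue<>();
    private final Thread consumer;
    private volatile IOException failure; // first error seen by the consumer, if any
    private boolean closed;               // guarded by lock

    QueueingWriter(Writer target) {
        this.target = target;
        this.consumer = new Thread(this::drain, "QueueingWriter-consumer");
        this.consumer.setDaemon(true);
        this.consumer.start();
    }

    // Consumer side: take chunks until the end-of-stream marker arrives.
    private void drain() {
        try {
            for (char[] chunk = queue.take(); chunk != POISON; chunk = queue.take()) {
                target.write(chunk);
            }
        } catch (IOException e) {
            failure = e; // reported to the caller from close(); later chunks are dropped
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Producer side: copy the caller's data and hand it to the queue.
    @Override
    public void write(char[] buf, int off, int len) throws IOException {
        synchronized (lock) {
            if (closed) throw new IOException("already closed");
            queue.add(Arrays.copyOfRange(buf, off, off + len));
        }
    }

    @Override
    public void flush() {
        // Intentionally a no-op in this sketch: data only reaches the target via the consumer.
    }

    // Blocks until the consumer has written everything queued before close().
    @Override
    public void close() throws IOException {
        synchronized (lock) {
            if (closed) return;
            closed = true;
            queue.add(POISON);
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                target.close();
            }
            if (failure != null) throw failure;
        }
    }
}

class QueueingWriterDemo {
    // Writes the parts through the queueing decorator and returns what reached the sink.
    static String roundTrip(String... parts) {
        StringWriter sink = new StringWriter();
        try (Writer w = new QueueingWriter(sink)) {
            for (String p : parts) {
                w.write(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello, ", "async ", "world"));
    }
}
```

The sketch uses an unbounded queue and always enqueues an end marker on close, which keeps the shutdown logic short; a bounded queue, as in the real class, would instead make producers block when the consumer falls behind.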

Using the asynchronous Writer:

BufferedWriter writer = new BufferedWriter(new AsyncWriter(new FileWriter("c:/debug.log")));
writer.write("xxxxx");



Using the asynchronous OutputStream:

BufferedOutputStream output = new BufferedOutputStream(new AsyncOutputStream(new FileOutputStream("c:/debug.log")));
output.write("foo".getBytes());

 

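Make sure output is closed once you are done with it: close() forces the asynchronous thread to write all remaining data to the final targetOutput. Calling flush(), by contrast, is a no-op and writes no data.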

Asynchronous IO tip (1):

Increase the buffer size of the BufferedWriter/BufferedOutputStream to reduce the number of writes.
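To make the effect of the buffer size concrete, here is a small sketch that counts how many write calls actually reach the wrapped writer. CountingWriter and BufferSizeDemo are hypothetical helpers written for this illustration; with an asynchronous writer underneath, each of these downstream calls would correspond to one copied chunk placed on the queue.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;

// Hypothetical helper: counts how many write calls reach the wrapped writer.
class CountingWriter extends Writer {
    int writeCalls = 0;

    @Override
    public void write(char[] buf, int off, int len) {
        writeCalls++;
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }
}

class BufferSizeDemo {
    // Writes `lines` short lines through a BufferedWriter of the given size and
    // returns the number of write calls that reached the underlying writer.
    static int downstreamWrites(int bufferSize, int lines) {
        CountingWriter counter = new CountingWriter();
        try (BufferedWriter w = new BufferedWriter(counter, bufferSize)) {
            for (int i = 0; i < lines; i++) {
                w.write("a log line of moderate length\n");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counter.writeCalls;
    }

    public static void main(String[] args) {
        System.out.println("8 KiB buffer:  " + downstreamWrites(8 * 1024, 10_000));
        System.out.println("64 KiB buffer: " + downstreamWrites(64 * 1024, 10_000));
    }
}
```

The larger buffer produces fewer, bigger downstream writes. The trade-off is that more data sits in memory and is lost if the process dies before the buffer is handed on.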

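Asynchronous IO tip (2):

Close the asynchronous IO from a separate thread as well, because in real-world scenarios closing an asynchronous IO can be very time-consuming: close() blocks until the queued data has been written.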
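One possible way to follow this tip is to hand the close() call to a single background thread. The sketch below uses only JDK classes; BackgroundCloser and BackgroundCloseDemo are hypothetical names introduced here, and the slow resource merely simulates an asynchronous IO whose close() has to drain a long queue.

```java
import java.io.Closeable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: runs close() calls on one daemon thread.
class BackgroundCloser {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "background-closer");
        t.setDaemon(true); // do not keep the JVM alive just for pending closes
        return t;
    });

    // Schedules resource.close() and returns immediately. Any exception thrown
    // by close() is reported through the returned Future.
    Future<?> closeLater(Closeable resource) {
        return executor.submit(() -> {
            resource.close();
            return null;
        });
    }
}

class BackgroundCloseDemo {
    // Returns true once the background close has completed.
    static boolean closeInBackground() {
        AtomicBoolean closed = new AtomicBoolean(false);
        // Simulated slow resource standing in for an asynchronous writer or stream.
        Closeable slow = () -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            closed.set(true);
        };

        Future<?> pending = new BackgroundCloser().closeLater(slow);
        // The calling thread is free to continue with other work here.
        try {
            pending.get(); // wait only when the result is actually needed
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("closed in background: " + closeInBackground());
    }
}
```

Because the helper thread is a daemon, an application that must not lose data should still wait on the returned Future before exiting.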

AsyncWriter source code:

import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// Log, LogFactory, AsyncExceptinHandler, DefaultAsyncExceptinHandler and BufferCopyUtils
// are not JDK classes; their imports are not shown in this listing.

public class AsyncWriter extends Writer {

    private static Log log = LogFactory.getLog(AsyncWriter.class);

    private static final int DEFAULT_QUEUE_CAPACITY = 50000;
    // Sentinel placed on the queue to tell the consumer thread to stop (compared by identity).
    private final static char[] CLOSED_SIGNEL = new char[0];

    private Writer out;
    private DataProcessorThread dataProcessor;
    private boolean isClosed = false;
    private BlockingQueue<char[]> queue;

    private AsyncExceptinHandler asyncExceptinHandler = new DefaultAsyncExceptinHandler();
    private static long threadSeqNumber;
    private static synchronized long nextThreadID() {
        return ++threadSeqNumber;
    }

    // Consumer: takes buffers from the queue and writes them to the wrapped Writer.
    private class DataProcessorThread extends Thread {

        private boolean enabled = true;
        private boolean hasRuned = false;
        DataProcessorThread() {
            super("AsyncWriter.DataProcessorThread-" + nextThreadID());
            setDaemon(true);
        }

        public void run() {
            hasRuned = true;
            // Keep running until close() disables the thread and the queue has been drained.
            while (this.enabled || !queue.isEmpty()) {

                char[] buf;
                try {
                    buf = queue.take();
                } catch (InterruptedException e) {
//                    e.printStackTrace();
                    continue;
                }

                if (buf == CLOSED_SIGNEL) {
                    return;
                }

                try {
                    out.write(buf);
                } catch (IOException e) {
                    // Errors on the background thread are passed to the configured handler.
                    asyncExceptinHandler.handle(e);
                }
            }
        }
    }

    public AsyncWriter(Writer out) {
        this(out, DEFAULT_QUEUE_CAPACITY, Thread.NORM_PRIORITY + 1);
    }

    public AsyncWriter(Writer out, int queueCapacity) {
        this(out, queueCapacity, Thread.NORM_PRIORITY + 1);
    }

    public AsyncWriter(Writer out, int queueCapacity, int dataProcesserThreadPriority) {
        this(out, new ArrayBlockingQueue<char[]>(queueCapacity), dataProcesserThreadPriority);
    }

    public AsyncWriter(Writer out, BlockingQueue<char[]> queue, int dataProcesserThreadPriority) {
        if (out == null) throw new NullPointerException();
        if (queue == null) throw new NullPointerException();

        this.queue = queue;
        this.dataProcessor = new DataProcessorThread();
        if (dataProcesserThreadPriority != Thread.NORM_PRIORITY) {
            this.dataProcessor.setPriority(dataProcesserThreadPriority);
        }
        this.dataProcessor.start();
        this.out = out;
    }

    public AsyncWriter(Writer out, AsyncExceptinHandler handler) {
        this(out);
        setAsyncExceptinHandler(handler);
    }

    // Producer: copies the caller's data and enqueues it; blocks while the queue is full.
    public void write(char[] buf, int offset, int length) throws IOException {
        synchronized (lock) {
            if (isClosed) throw new IOException("already closed");
            try {
                queue.put(BufferCopyUtils.copyBuffer(buf, offset, length));
            } catch (InterruptedException e) {
                throw new IOException("AsyncWriter occer error", e);
            }
        }
    }

    // Waits for the consumer thread to finish writing queued data, then closes the wrapped Writer.
    public void close() throws IOException {
        synchronized (lock) {
            try {
                isClosed = true;
                dataProcessor.enabled = false;
                if (queue.isEmpty()) {
                    // Wake up a consumer that may be blocked in take().
                    queue.offer(CLOSED_SIGNEL);
                }

                try {
                    dataProcessor.join();
                } catch (InterruptedException e) {
                    // ignore
                }

                if (!dataProcessor.hasRuned) {
                    dataProcessor.run();
                }
            } finally {
                out.close();
            }
        }
    }

    // No-op: data reaches the wrapped Writer only through the consumer thread.
    public void flush() throws IOException {
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (!isClosed) {
            log.warn("AsyncWriter not close:" + this);
            close();
        }
    }

    public void setAsyncExceptinHandler(AsyncExceptinHandler asyncExceptinHandler) {
        if (asyncExceptinHandler == null) throw new NullPointerException();
        this.asyncExceptinHandler = asyncExceptinHandler;
    }

}

 

 

rapid-framework website:
http://code.google.com/p/rapid-framework

 

Online javadoc:
http://www.rapid-framework.org.cn/rapid-javadoc-v2.0.x/

 

posted on 2009-05-08 01:22 by badqiu

