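In some special situations, such as collecting log messages, asynchronous IO can improve performance considerably, because the calling thread no longer waits for each write to reach the disk.
The asynchronous IO classes in rapid-framework combine the producer/consumer multi-threading pattern with the Decorator pattern. You use them just like ordinary IO: wrap the target in one more layer, AsyncWriter or AsyncOutputStream, and ordinary IO becomes asynchronous IO. Opening an asynchronous IO object starts a background thread that writes the data.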
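To make the producer/consumer plus Decorator idea concrete, here is a minimal sketch using only the JDK. SketchAsyncWriter and SketchAsyncWriterDemo are hypothetical names for illustration and are not the rapid-framework classes; the sketch assumes an unbounded queue and a single end-of-stream marker, which is simpler than what the real AsyncWriter does.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for illustration only, not the rapid-framework AsyncWriter.
class SketchAsyncWriter extends Writer {
    // End-of-stream marker; recognised by identity, never by content.
    private static final char[] END = new char[0];

    private final BlockingQueue<char[]> queue = new LinkedBlockingQueue<>();
    private final Writer target;
    private final Thread consumer;
    private volatile IOException failure;
    private boolean closed;

    SketchAsyncWriter(Writer target) {
        this.target = target;
        this.consumer = new Thread(this::drain, "sketch-async-writer");
        this.consumer.setDaemon(true);
        this.consumer.start();
    }

    // Consumer side: take chunks off the queue and write them to the wrapped Writer.
    private void drain() {
        try {
            for (char[] chunk = queue.take(); chunk != END; chunk = queue.take()) {
                try {
                    target.write(chunk);
                } catch (IOException e) {
                    failure = e; // remembered and reported from close()
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Producer side: copy the caller's data and enqueue it without touching the target.
    @Override
    public void write(char[] buf, int off, int len) throws IOException {
        synchronized (lock) {
            if (closed) throw new IOException("already closed");
            queue.add(Arrays.copyOfRange(buf, off, off + len));
        }
    }

    @Override
    public void flush() {
        // Intentionally empty in this sketch: data is only guaranteed written after close().
    }

    @Override
    public void close() throws IOException {
        synchronized (lock) {
            if (closed) return;
            closed = true;
            queue.add(END);
            try {
                consumer.join(); // wait until everything queued has been written
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            target.close();
            if (failure != null) throw failure;
        }
    }
}

class SketchAsyncWriterDemo {
    // Writes the parts through the decorator and returns what reached the target.
    static String roundTrip(String... parts) {
        StringWriter sink = new StringWriter();
        try (Writer w = new SketchAsyncWriter(sink)) {
            for (String p : parts) w.write(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toString();
    }
}
```

Because the decorator is itself a Writer, callers can wrap it in a BufferedWriter or use it in try-with-resources exactly as they would an ordinary Writer.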
Using the asynchronous Writer:
BufferedWriter writer = new BufferedWriter(new AsyncWriter(new FileWriter("c:/debug.log")));
writer.write("xxxxx");
Using the asynchronous OutputStream:
BufferedOutputStream output = new BufferedOutputStream(new AsyncOutputStream(new FileOutputStream("c:/debug.log")));
output.write("foo".getBytes());
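Make sure output is closed once you are done with it: close() forces the asynchronous thread to write all remaining data to the final targetOutput. Calling flush(), by contrast, is a no-op and writes nothing.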
Asynchronous IO tip (1):
Increase the buffer size of the BufferedWriter/BufferedOutputStream to reduce the number of writes.
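The effect of the buffer size can be checked with the JDK alone. The sketch below counts how many write calls reach the wrapped Writer for two buffer sizes; CountingWriter and BufferSizeDemo are hypothetical names, and the counter stands in for AsyncWriter, where each such call would become one queue entry.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;

// Hypothetical target that only counts how often it is written to.
class CountingWriter extends Writer {
    int writes;

    @Override
    public void write(char[] buf, int off, int len) {
        writes++;
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }
}

class BufferSizeDemo {
    // Pushes the same data through a BufferedWriter of the given size
    // and returns the number of write calls that reached the target.
    static int countWrites(int bufferSize) {
        CountingWriter target = new CountingWriter();
        try (BufferedWriter w = new BufferedWriter(target, bufferSize)) {
            for (int i = 0; i < 10_000; i++) {
                w.write("a log line of moderate length, repeated many times\n");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target.writes;
    }
}
```

Comparing BufferSizeDemo.countWrites(8192) with BufferSizeDemo.countWrites(65536) shows fewer calls for the larger buffer. With the library, the second constructor argument would be passed the same way, for example new BufferedWriter(new AsyncWriter(...), 65536).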
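Asynchronous IO tip (2):
Consider closing the asynchronous IO on a separate thread as well, because in real applications close() can take a long time while it waits for the queued data to be written.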
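One possible way to follow this tip is to hand the close call to an executor. This is a sketch under assumptions: BackgroundCloser and closeInBackground are hypothetical helpers, not part of rapid-framework, and the slow Closeable in demo() merely simulates an asynchronous writer with a long queue.

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper for illustration only.
class BackgroundCloser {
    // Submits resource.close() to the executor so the caller does not block.
    // Any IOException from close() is reported through the returned Future.
    static Future<?> closeInBackground(Closeable resource, ExecutorService executor) {
        return executor.submit(() -> {
            resource.close();
            return null;
        });
    }

    // Simulates a slow close and reports whether it completed.
    static boolean demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean closed = new AtomicBoolean();
        Closeable slow = () -> {
            try {
                Thread.sleep(50); // stands in for draining a long queue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            closed.set(true);
        };
        try {
            closeInBackground(slow, executor).get(5, TimeUnit.SECONDS);
            return closed.get();
        } catch (Exception e) {
            return false;
        } finally {
            executor.shutdown();
        }
    }
}
```

The caller should still check the returned Future at some point, otherwise a failure during close goes unnoticed. Note also that the consumer thread in the source below is a daemon thread, so the application must not exit before the background close has finished.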
AsyncWriter source code:
public class AsyncWriter extends Writer {

    private static Log log = LogFactory.getLog(AsyncWriter.class);

    private static final int DEFAULT_QUEUE_CAPACITY = 50000;
    private final static char[] CLOSED_SIGNEL = new char[0];

    private Writer out;
    private DataProcessorThread dataProcessor;
    private boolean isClosed = false;
    private BlockingQueue<char[]> queue ;

    private AsyncExceptinHandler asyncExceptinHandler = new DefaultAsyncExceptinHandler();
    private static long threadSeqNumber;
    private static synchronized long nextThreadID() {
        return ++threadSeqNumber;
    }

    private class DataProcessorThread extends Thread {

        private boolean enabled = true;
        private boolean hasRuned = false;
        DataProcessorThread() {
            super("AsyncWriter.DataProcessorThread-"+nextThreadID());
            setDaemon(true);
        }

        public void run() {
            hasRuned = true;
            while (this.enabled || !queue.isEmpty()) {

                char[] buf;
                try {
                    buf = queue.take();
                } catch (InterruptedException e) {
                    // e.printStackTrace();
                    continue;
                }

                if(buf == CLOSED_SIGNEL) {
                    return;
                }

                try {
                    out.write(buf);
                } catch (IOException e) {
                    asyncExceptinHandler.handle(e);
                }
            }
        }
    }

    public AsyncWriter(Writer out) {
        this(out,DEFAULT_QUEUE_CAPACITY,Thread.NORM_PRIORITY + 1);
    }

    public AsyncWriter(Writer out,int queueCapacity) {
        this(out,queueCapacity,Thread.NORM_PRIORITY + 1);
    }

    public AsyncWriter(Writer out,int queueCapacity,int dataProcesserThreadPriority) {
        this(out,new ArrayBlockingQueue(queueCapacity),dataProcesserThreadPriority);
    }

    public AsyncWriter(Writer out,BlockingQueue queue,int dataProcesserThreadPriority) {
        if(out == null) throw new NullPointerException();
        if(queue == null) throw new NullPointerException();

        this.queue = queue;
        this.dataProcessor = new DataProcessorThread();
        if(dataProcesserThreadPriority != Thread.NORM_PRIORITY) {
            this.dataProcessor.setPriority(dataProcesserThreadPriority);
        }
        this.dataProcessor.start();
        this.out = out;
    }

    public AsyncWriter(Writer out,AsyncExceptinHandler handler) {
        this(out);
        setAsyncExceptinHandler(handler);
    }

    public void write(char[] buf, int offset, int length) throws IOException {
        synchronized (lock) {
            if(isClosed) throw new IOException("already closed");
            try {
                queue.put(BufferCopyUtils.copyBuffer(buf, offset, length));
            } catch (InterruptedException e) {
                throw new IOException("AsyncWriter occer error",e);
            }
        }
    }

    public void close() throws IOException {
        synchronized (lock) {
            try {
                isClosed = true;
                dataProcessor.enabled = false;
                if(queue.isEmpty()) {
                    queue.offer(CLOSED_SIGNEL);
                }

                try {
                    dataProcessor.join();
                } catch (InterruptedException e) {
                    //ignore
                }

                if(!dataProcessor.hasRuned) {
                    dataProcessor.run();
                }
            }finally {
                out.close();
            }
        }
    }

    public void flush() throws IOException {
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if(!isClosed) {
            log.warn("AsyncWriter not close:"+this);
            close();
        }
    }

    public void setAsyncExceptinHandler(AsyncExceptinHandler asyncExceptinHandler) {
        if(asyncExceptinHandler == null) throw new NullPointerException();
        this.asyncExceptinHandler = asyncExceptinHandler;
    }

}
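The write method above enqueues the result of BufferCopyUtils.copyBuffer rather than the caller's array. The source of BufferCopyUtils is not shown here, so the following is only a plausible equivalent, assuming it returns a fresh copy of the given range. BufferCopySketch is a hypothetical name. The demo shows why a copy is needed: a caller such as BufferedWriter reuses its internal buffer right after write returns, while the queued data may not have been written yet.

```java
import java.util.Arrays;

// Hypothetical equivalent for illustration; the real BufferCopyUtils may differ.
class BufferCopySketch {
    // Returns a new array holding buf[offset .. offset + length - 1].
    static char[] copyBuffer(char[] buf, int offset, int length) {
        return Arrays.copyOfRange(buf, offset, offset + length);
    }

    // Shows that the queued copy survives when the producer reuses its buffer.
    static String demo() {
        char[] shared = "first".toCharArray();
        char[] queued = copyBuffer(shared, 0, shared.length);
        Arrays.fill(shared, 'x'); // the producer overwrites its buffer
        return new String(queued);
    }
}
```

Without the copy, the consumer thread could see the overwritten contents instead of the data that was originally passed to write.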
rapid-framework website:
http://code.google.com/p/rapid-framework
Online javadoc:
http://www.rapid-framework.org.cn/rapid-javadoc-v2.0.x/