发现网上找到的许多NIO的用例在跑起来后都有许多问题,最常见的就是没有对interest event进行合理的register和unregister,导致程序一直在空转(looping),又或者当客户端或服务器端连接断开时有死循环的迹象。忍不住自己做了一个demo,我想可以作为一个NIO应用的模板去修改,只要把doRead,doWrite之类的用基于线程的Handler去处理,那就基本可以满足需求了。
这个Demo的目的是在Client和Server间建立类似QQ聊天那样的功能,让客户端和服务器端都支持用户输入和异步消息显示(因为服务器端要支持用户的console输入,所以不要用多个客户端进行连接,否则可能会出现难以预测的问题)。
代码中用红色显示的地方是我认为需要注意的地方,说老实话NIO虽然很强大,但完全用Non-Blocking来编程,有许多需要小心的地方,一不小心还可能造成死循环。就像ReentrantLock之于Synchronized,如果基本的IO能满足需求,就不必强求应用NIO。
注意:OP_WRITE应该只在有数据等待写入的时候才添加到SelectionKey的interestOps里面去,而且在写入完成后一定要去除;否则只要socket处于可写状态,selector.select()方法就会立即返回而不再阻塞,从而造成死循环。
MyNioServer.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
/**
 * Selector-driven chat server demo.
 *
 * <p>One thread (the caller of {@link #startup()}) runs the selector loop and performs
 * all socket I/O. For every accepted client a console thread is started that reads lines
 * from {@code System.in}, queues them on the client's key attachment and arms
 * {@code OP_WRITE}. The queue is a {@code LinkedList} guarded by its own monitor, because
 * it is touched by both the selector thread and the console thread.
 *
 * <p>Because every console thread reads the same {@code System.in}, the demo is meant to
 * be used with a single client at a time.
 */
public class MyNioServer {
    /** Size in bytes of the scratch buffer used by each read call. */
    private static final int BUFFERSIZE = 1024 * 10;
    /** Name of the charset used both to encode outgoing and to decode incoming text. */
    private static final String CHARSET = "GBK";

    private final Selector sel;

    /**
     * Opens a non-blocking server socket bound to the local host address and registers it
     * for {@code OP_ACCEPT}.
     *
     * @param port TCP port to listen on
     * @throws IOException if the channel or selector cannot be opened, bound or registered
     */
    public MyNioServer(int port) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        Selector selector = null;
        boolean ready = false;
        try {
            ssc.configureBlocking(false);
            ssc.socket().bind(
                    new InetSocketAddress(InetAddress.getLocalHost(), port));
            selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            ready = true;
        } finally {
            // Do not leak the channel/selector when bind or register fails.
            if (!ready) {
                closeQuietly(selector);
                closeQuietly(ssc);
            }
        }
        sel = selector;
    }

    /**
     * Runs the selector loop on the calling thread until the thread is interrupted or the
     * selector fails, then releases all resources via {@link #shutdown()}.
     */
    public void startup() {
        System.out.println("Server start...");
        try {
            while (!Thread.interrupted()) {
                int keysCount = sel.select();
                System.out.println("Catched " + keysCount + " SelectionKeys");
                if (keysCount < 1) {
                    // Woken up by wakeup(): interest sets are re-read by the next select().
                    continue;
                }
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Selected keys must be removed by hand or they are reported again.
                    it.remove();
                    dispatch(key);
                }
            }
            System.err.println("Program is interrupted.");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Server stop...");
            shutdown();
        }
    }

    /** Closes every registered channel and then the selector itself. */
    public void shutdown() {
        if (sel.isOpen()) {
            for (SelectionKey key : sel.keys()) {
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        try {
            sel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Routes one selected key to its handlers. Validity is re-checked before every
     * readiness test because an earlier handler may have closed the channel.
     */
    private void dispatch(SelectionKey key) {
        try {
            if (key.isValid() && key.isAcceptable()) {
                System.out.println("Key isAcceptable");
                doAccept(key);
            }
            if (key.isValid() && key.isReadable()) {
                System.out.println("Key isReadable");
                doRead(key);
            }
            if (key.isValid() && key.isWritable()) {
                System.out.println("Key isWritable");
                doWrite(key);
            }
        } catch (java.nio.channels.CancelledKeyException ignored) {
            // The key was cancelled between the validity check and its use; nothing to do.
        }
    }

    /**
     * Accepts a pending connection, registers it for {@code OP_READ} with an empty
     * outgoing queue and starts the console thread that feeds that queue.
     */
    private void doAccept(SelectionKey key) {
        SocketChannel sc = null;
        try {
            sc = ((ServerSocketChannel) key.channel()).accept();
            if (sc == null) {
                // Non-blocking accept() returns null when no connection is pending.
                return;
            }
            sc.configureBlocking(false);
            SelectionKey newkey = sc.register(sel, SelectionKey.OP_READ);
            newkey.attach(new LinkedList<ByteBuffer>());
            Thread console = new Thread(new UserInteractive(newkey));
            // Daemon: a thread blocked on System.in must not keep the JVM alive after shutdown.
            console.setDaemon(true);
            console.start();
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("Failed to accept new client.");
            // Do not leak a half-initialised client channel.
            closeQuietly(sc);
        }
        System.out.println("end doAccept");
    }

    // TODO buffersize performance testing
    /**
     * Drains everything currently readable from the client and prints it. A read result of
     * -1 means the peer closed the connection; the channel is then closed so the key is
     * cancelled and no longer reported as readable.
     */
    private void doRead(SelectionKey key) {
        try {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
            // Collect raw bytes first and decode once, so a multi-byte character that
            // straddles two read() calls is not decoded in halves.
            java.io.ByteArrayOutputStream received = new java.io.ByteArrayOutputStream();
            int count;
            while ((count = sc.read(bb)) > 0) {
                received.write(bb.array(), 0, bb.position());
                // clear() (not flip()) restores the full capacity for the next read.
                bb.clear();
            }
            String message = Charset.forName(CHARSET)
                    .decode(ByteBuffer.wrap(received.toByteArray())).toString();
            if (count == -1) {
                if (message.length() > 0) {
                    System.out.println("message received from client:" + message);
                }
                System.out.println("client disconnected");
                disconnect(key);
            } else {
                System.out.println("message received from client:" + message);
            }
        } catch (IOException e) {
            disconnect(key);
            e.printStackTrace();
        }
        System.out.println("end doRead");
    }

    /**
     * Writes queued buffers for as long as the socket accepts data. If the socket's send
     * buffer fills up, the partially written buffer stays at the head of the queue and
     * {@code OP_WRITE} remains registered; once the queue is empty {@code OP_WRITE} is
     * removed so that {@code select()} blocks again.
     */
    private void doWrite(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        LinkedList<ByteBuffer> outseq = outgoing(key);
        // Same monitor as the console thread: enqueue + arming OP_WRITE there and
        // drain + disarming OP_WRITE here can never interleave.
        synchronized (outseq) {
            try {
                ByteBuffer bb;
                while ((bb = outseq.peek()) != null) {
                    sc.write(bb);
                    if (bb.hasRemaining()) {
                        // Send buffer is full: wait for the next OP_WRITE instead of spinning.
                        return;
                    }
                    outseq.poll();
                }
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
                // The key is cancelled now; touching interestOps would throw.
                return;
            }
            System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
            key.interestOps(SelectionKey.OP_READ);
        }
        System.out.println("end doWrote");
    }

    /** Closes the key's channel, which also cancels the key. */
    private void disconnect(SelectionKey key) {
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns the outgoing queue attached to a client key. */
    @SuppressWarnings("unchecked")
    private static LinkedList<ByteBuffer> outgoing(SelectionKey key) {
        return (LinkedList<ByteBuffer>) key.attachment();
    }

    /** Best-effort close used on failure paths; a secondary close error is not reported. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Already handling a failure; nothing useful can be done with this one.
        }
    }

    /**
     * Reads console lines and queues them for one client. The thread ends when the console
     * reaches end-of-input, when reading fails, or when the client's key is no longer valid.
     */
    private static final class UserInteractive implements Runnable {
        private final SelectionKey key;

        public UserInteractive(SelectionKey key) {
            this.key = key;
        }

        public void run() {
            System.out.println("UserInteractive thread start...");
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    System.in));
            LinkedList<ByteBuffer> outseq = outgoing(key);
            try {
                String inputLine;
                // readLine() returns null at end-of-input.
                while ((inputLine = br.readLine()) != null) {
                    if (!key.isValid()) {
                        System.out.println("client is gone, UserInteractive thread stop...");
                        return;
                    }
                    // Encode with the charset the receiving side decodes with.
                    ByteBuffer bb = Charset.forName(CHARSET).encode(inputLine);
                    synchronized (outseq) {
                        outseq.offer(bb);
                        System.out
                                .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                    // The changed interest set only takes effect on the next select().
                    key.selector().wakeup();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (java.nio.channels.CancelledKeyException e) {
                // The channel was closed between the validity check and interestOps().
                System.out.println("client is gone, UserInteractive thread stop...");
            }
        }
    }

    /**
     * Starts the server on port 10001.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            MyNioServer server = new MyNioServer(10001);
            server.startup();
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Exception caught, program exiting…");
        }
    }
}
|
MyNioClient.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.text.MessageFormat;
import java.util.LinkedList;
import java.util.Set;
import java.util.Iterator;
/**
 * Selector-driven chat client demo.
 *
 * <p>One thread (the caller of {@link #startup()}) runs the selector loop and performs
 * all socket I/O. Once the connection is established a console thread is started that
 * reads lines from {@code System.in}, queues them on the key attachment and arms
 * {@code OP_WRITE}. The queue is a {@code LinkedList} guarded by its own monitor, because
 * it is touched by both the selector thread and the console thread.
 */
public class MyNioClient {
    /** Size in bytes of the scratch buffer used by each read call. */
    private static final int BUFFERSIZE = 1024 * 10;
    /** Name of the charset used both to encode outgoing and to decode incoming text. */
    private static final String CHARSET = "GBK";

    private final Selector sel;

    /**
     * Opens a non-blocking socket and starts connecting to the given port on the local
     * host address.
     *
     * @param port TCP port of the server
     * @throws IOException if the channel or selector cannot be opened, or the connection
     *     attempt fails immediately
     */
    public MyNioClient(int port) throws IOException {
        SocketChannel sc = SocketChannel.open();
        Selector selector = null;
        boolean ready = false;
        try {
            // Must be non-blocking before connect() for OP_CONNECT to be delivered.
            sc.configureBlocking(false);
            boolean connected = sc.connect(
                    new InetSocketAddress(InetAddress.getLocalHost(), port));
            selector = Selector.open();
            if (connected) {
                // connect() may complete at once for a local peer; there is then no pending
                // connection to wait for with OP_CONNECT, so finish the setup right here.
                onConnected(sc.register(selector, SelectionKey.OP_READ));
            } else {
                sc.register(selector, SelectionKey.OP_CONNECT);
            }
            ready = true;
        } finally {
            // Do not leak the channel/selector when connect or register fails.
            if (!ready) {
                closeQuietly(selector);
                closeQuietly(sc);
            }
        }
        sel = selector;
    }

    /**
     * Runs the selector loop on the calling thread until the thread is interrupted or the
     * selector fails, then releases all resources via {@link #shutdown()}.
     */
    public void startup() {
        System.out.println("Client start...");
        try {
            while (!Thread.interrupted()) {
                int keysCount = sel.select();
                System.out.println("Catched " + keysCount + " SelectionKeys");
                if (keysCount < 1) {
                    // Woken up by wakeup(): interest sets are re-read by the next select().
                    continue;
                }
                Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Selected keys must be removed by hand or they are reported again.
                    it.remove();
                    //printKeyInfo(key);
                    dispatch(key);
                }
            }
            System.err.println("Program is interrupted.");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Client stop...");
            shutdown();
        }
    }

    /** Closes every registered channel and then the selector itself. */
    public void shutdown() {
        if (sel.isOpen()) {
            for (SelectionKey key : sel.keys()) {
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        try {
            sel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Routes one selected key to its handlers. Validity is re-checked before every
     * readiness test because an earlier handler, or the console thread on "quit", may have
     * closed the channel.
     */
    private void dispatch(SelectionKey key) {
        try {
            if (key.isValid() && key.isConnectable()) {
                System.out.println("Key isConnectable");
                doConnect(key);
            }
            if (key.isValid() && key.isReadable()) {
                System.out.println("Key isReadable");
                doRead(key);
            }
            if (key.isValid() && key.isWritable()) {
                System.out.println("Key isWritable");
                doWrite(key);
            }
        } catch (java.nio.channels.CancelledKeyException ignored) {
            // The key was cancelled between the validity check and its use; nothing to do.
        }
    }

    /** Debug helper: prints the interest set, ready set and readiness flags of a key. */
    private void printKeyInfo(SelectionKey key) {
        String keyStr = MessageFormat.format(
                "IntOps:{0},ReadyOps:{1},isVal:{2},isAcc:{3},isCnn:{4},isRead:{5},isWrite:{6}",
                key.interestOps(), key.readyOps(), key.isValid(),
                key.isAcceptable(), key.isConnectable(),
                key.isReadable(), key.isWritable());
        System.out.println(keyStr);
    }

    /**
     * Completes a pending non-blocking connect. On failure the JVM is terminated with
     * status 1. If the connection is not complete yet, {@code OP_CONNECT} stays registered
     * and the method is invoked again on the next readiness notification.
     */
    private void doConnect(SelectionKey key) {
        try {
            if (!((SocketChannel) key.channel()).finishConnect()) {
                return;
            }
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        System.out.println("unregister OP_CONNECT from interestOps");
        onConnected(key);
    }

    /**
     * Switches a freshly connected key to {@code OP_READ}, attaches its outgoing queue and
     * starts the console thread that feeds that queue.
     */
    private static void onConnected(SelectionKey key) {
        key.interestOps(SelectionKey.OP_READ);
        key.attach(new LinkedList<ByteBuffer>());
        Thread console = new Thread(new UserInteractive(key));
        // Daemon: a thread blocked on System.in must not keep the JVM alive after shutdown.
        console.setDaemon(true);
        console.start();
    }

    /**
     * Drains everything currently readable from the server and prints it. A read result of
     * -1 means the server closed the connection; the channel is then closed and the JVM is
     * terminated, because otherwise the key would be reported readable on every
     * {@code select()} and the loop would spin.
     */
    private void doRead(SelectionKey key) {
        try {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
            // Collect raw bytes first and decode once, so a multi-byte character that
            // straddles two read() calls is not decoded in halves.
            java.io.ByteArrayOutputStream received = new java.io.ByteArrayOutputStream();
            int count;
            while ((count = sc.read(bb)) > 0) {
                received.write(bb.array(), 0, bb.position());
                // clear() (not flip()) restores the full capacity for the next read.
                bb.clear();
            }
            String message = Charset.forName(CHARSET)
                    .decode(ByteBuffer.wrap(received.toByteArray())).toString();
            if (count == -1) {
                if (message.length() > 0) {
                    System.out.println("message received from server:" + message);
                }
                System.out.println("server disconnected");
                disconnect(key);
                System.exit(0);
            }
            System.out.println("message received from server:" + message);
        } catch (IOException e) {
            e.printStackTrace();
            disconnect(key);
            System.exit(1);
        }
        System.out.println("now end readMessage");
    }

    /**
     * Writes queued buffers for as long as the socket accepts data. If the socket's send
     * buffer fills up, the partially written buffer stays at the head of the queue and
     * {@code OP_WRITE} remains registered; once the queue is empty {@code OP_WRITE} is
     * removed so that {@code select()} blocks again.
     */
    private void doWrite(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        LinkedList<ByteBuffer> outseq = outgoing(key);
        // Same monitor as the console thread: enqueue + arming OP_WRITE there and
        // drain + disarming OP_WRITE here can never interleave.
        synchronized (outseq) {
            try {
                ByteBuffer bb;
                while ((bb = outseq.peek()) != null) {
                    sc.write(bb);
                    if (bb.hasRemaining()) {
                        // Send buffer is full: wait for the next OP_WRITE instead of spinning.
                        return;
                    }
                    outseq.poll();
                }
            } catch (IOException e) {
                disconnect(key);
                e.printStackTrace();
                // The key is cancelled now; touching interestOps would throw.
                return;
            }
            System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
            key.interestOps(SelectionKey.OP_READ);
        }
        System.out.println("end doWrote");
    }

    /** Closes the key's channel, which also cancels the key. */
    private void disconnect(SelectionKey key) {
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns the outgoing queue attached to the connection key. */
    @SuppressWarnings("unchecked")
    private static LinkedList<ByteBuffer> outgoing(SelectionKey key) {
        return (LinkedList<ByteBuffer>) key.attachment();
    }

    /** Best-effort close used on failure paths; a secondary close error is not reported. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Already handling a failure; nothing useful can be done with this one.
        }
    }

    /**
     * Reads console lines and queues them for the server. Entering "quit" (any case) closes
     * the channel and terminates the JVM. The thread ends when the console reaches
     * end-of-input, when reading fails, or when the connection key is no longer valid.
     */
    private static final class UserInteractive implements Runnable {
        private final SelectionKey key;

        public UserInteractive(SelectionKey key) {
            this.key = key;
        }

        public void run() {
            LinkedList<ByteBuffer> outseq = outgoing(key);
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            try {
                String inputLine;
                // readLine() returns null at end-of-input.
                while ((inputLine = br.readLine()) != null) {
                    if ("quit".equalsIgnoreCase(inputLine)) {
                        key.channel().close();
                        System.exit(1);
                        break;
                    }
                    if (!key.isValid()) {
                        System.out.println("connection is closed, UserInteractive thread stop...");
                        return;
                    }
                    // Encode with the charset the receiving side decodes with.
                    ByteBuffer bb = Charset.forName(CHARSET).encode(inputLine);
                    synchronized (outseq) {
                        outseq.offer(bb);
                        System.out
                                .println("after input, register OP_WRITE to interestOps and wakeup selector");
                        key.interestOps(SelectionKey.OP_READ
                                | SelectionKey.OP_WRITE);
                    }
                    // The changed interest set only takes effect on the next select().
                    key.selector().wakeup();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (java.nio.channels.CancelledKeyException e) {
                // The channel was closed between the validity check and interestOps().
                System.out.println("connection is closed, UserInteractive thread stop...");
            }
        }
    }

    /**
     * Starts the client against port 10001.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            MyNioClient client = new MyNioClient(10001);
            client.startup();
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Exception caught, program exiting...");
        }
    }
}
|