Dict.CN 在线词典, 英语学习, 在线翻译

都市淘沙者

荔枝FM Everyone can be host

统计

留言簿(23)

积分与排名

优秀学习网站

友情连接

阅读排行榜

评论排行榜

用NIO实现的一个Chat Demo [转]

发现网上找到的许多NIO的用例在跑起来后都有许多问题,最常见的就是没有对interest event进行合理的registry和unregistry,导致程序一直在loopling,又或者当客户端或服务器端连接断开时有死循环的迹象。忍不住自己做了一个demo,我想可以作为一个NIO应用的模板去修改,只要把doRead,doWrite之类的用基于线程的Handler去处理,那就基本可以满足需求了。
这个Demo的目的是在Client和Server间建立类似QQ聊天那样的功能,让客户端和服务器端都支持用户输入和异步消息显示(因为服务器端要支持用户的console输入,所以不要用多个客户端进行连接,否则可能会出现难以预测的问题)。
代码中用红色显示的地方是我认为需要注意的地方,说老实话NIO虽然很强大,但完全用Non-Blocking来编程,有许多需要小心的地方,一不小心还可能造成死循环。就像ReentrantLock之于Synchronized,如果基本的IO能满足需求,就不必强求应用NIO。
注意:OP_WRITE应该是在写入准备就绪的时候才添加到SelectionKey里面去,而且在写入完成后一定要去除,否则selector.select()方法就不会被blocking而造成死循环。

MyNioServer.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public class MyNioServer {

    private int BUFFERSIZE = 1024*10;
    private String CHARSET = "GBK";
    private Selector sel;

    public MyNioServer(int port) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.socket().bind(
                new InetSocketAddress(InetAddress.getLocalHost(), port));
        sel = Selector.open();
        ssc.register(sel, SelectionKey.OP_ACCEPT);
    }

    public void startup() {
        System.out.println("Server start...");
        try {
            while (!Thread.interrupted()) {
                int keysCount = sel.select();
                System.out.println("Catched " + keysCount + " SelectionKeys");
                if (keysCount < 1) {
                    continue;
                }
                Set<SelectionKey> set = sel.selectedKeys();
                Iterator<SelectionKey> it = set.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    if (key.isAcceptable()) {
                        System.out.println("Key isAcceptable");
                        doAccept(key);
                    }
                    if (key.isValid() && key.isReadable()) {
                        System.out.println("Key isReadable");
                        doRead(key);
                    }
                    if (key.isValid() && key.isWritable()) {
                        System.out.println("Key isWritable");
                        doWrite(key);
                    }
                }
                set.clear();
            }
            System.err.println("Program is interrupted.");
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Server stop...");
        shutdown();
    }
   
    public void shutdown(){
        Set<SelectionKey> keys = sel.keys();
        for(SelectionKey key:keys){
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            sel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void doAccept(SelectionKey key) {
        try {
            SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
            sc.configureBlocking(false);
            SelectionKey newkey = sc.register(sel, SelectionKey.OP_READ);
            newkey.attach(new LinkedList<ByteBuffer>());
            new Thread(new UserInteractive(newkey)).start();
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("Failed to accept new client.");
        }
        System.out.println("end doAccept");
    }

    // TODO buffersize performance testing
    private void doRead(SelectionKey key) {
        try {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
            StringBuffer sb = new StringBuffer();
            int count = 0;
            while ( (count = sc.read(bb)) > 0) {
                bb.flip();
                sb.append(Charset.forName(CHARSET).decode(bb));
                bb.flip();
            }
            //if client disconnected, read return -1
            if(count == -1){
                System.out.println("client disconnected");
                disconnect(key);   
            } else {
                System.out.println("message received from client:" + sb.toString());
            }
        } catch (IOException e) {
            disconnect(key);
            e.printStackTrace();
        }
        System.out.println("end doRead");
    }

    private void doWrite(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                .attachment();
        ByteBuffer bb = outseq.poll();
        if(bb == null){
            return;
        }
        try {
            while(bb.hasRemaining()){
                sc.write(bb);
            }           
        } catch (IOException e) {
            disconnect(key);
            e.printStackTrace();
        }
        if (outseq.size() == 0) {
            System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
            key.interestOps(SelectionKey.OP_READ);
        }
        System.out.println("end doWrote");
    }

    private void disconnect(SelectionKey key) {
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //TODO find out how to shutdown
    private class UserInteractive implements Runnable {

        SelectionKey key;

        public UserInteractive(SelectionKey key) {
            this.key = key;
        }

        public void run() {
            System.out.println("UserInteractive thread start...");
            BufferedReader br = new BufferedReader(new InputStreamReader(
                    System.in));
            while (true) {
                try {
                    String inputLine = br.readLine();
                    ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                    bb = ByteBuffer.wrap(inputLine.getBytes());
                    ((LinkedList<ByteBuffer>) key.attachment()).offer(bb);
                    System.out
                            .println("after input, register OP_WRITE to interestOps and wakeup selector");
                    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    key.selector().wakeup();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        try {
            MyNioServer server = new MyNioServer(10001);
            server.startup();
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Exception caught, program exiting…");
        }
    }
}


MyNioClient.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.text.MessageFormat;
import java.util.LinkedList;
import java.util.Set;
import java.util.Iterator;

public class MyNioClient {

    private int BUFFERSIZE = 1024*10;
    private String CHARSET = "GBK";
    private Selector sel;

    public MyNioClient(int port) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);    // this operation need to be executed before socket.connnect, for OP_CONNECT event
        sc.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
        sel = Selector.open();
        sc.register(sel, SelectionKey.OP_CONNECT |SelectionKey.OP_READ);
    }

    public void startup() {
        System.out.println("Client start...");
        try {
            while (!Thread.interrupted()) {
                int keysCount = sel.select();
                System.out.println("Catched " + keysCount + " SelectionKeys");
                if (keysCount < 1) {
                    continue;
                }               
                Set<SelectionKey> selectedKeys = sel.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    //printKeyInfo(key);
                    if (key.isConnectable()) {
                        System.out.println("Key isConnectable");
                        doConnect(key);
                    } else if (key.isValid() && key.isReadable()) {
                        System.out.println("Key isReadable");
                        doRead(key);
                    } else if (key.isValid() && key.isWritable()) {
                        System.out.println("Key isWritable");
                        doWrite(key);
                    }
                }
                selectedKeys.clear();
            }
            System.err.println("Program is interrupted.");
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Client stop...");
        shutdown();
    }
   
    public void shutdown(){
        Set<SelectionKey> keys = sel.keys();
        for(SelectionKey key:keys){
            try {
                key.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            sel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void printKeyInfo(SelectionKey key) {
        String keyStr = MessageFormat
                .format(
                        "IntOps:{0},ReadyOps:{1},isVal:{2},isAcc:{3},isCnn:{4},isRead:{5},isWrite:{6}",
                        key.interestOps(), key.readyOps(), key.isValid(), key
                                .isAcceptable(), key.isConnectable(), key
                                .isReadable(), key.isWritable());
        System.out.println(keyStr);
    }

    private void doConnect(SelectionKey key) {
        try {
            boolean flag = ((SocketChannel) key.channel()).finishConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        System.out.println("unregister OP_CONNECT from interestOps");
        key.interestOps(SelectionKey.OP_READ);
        key.attach(new LinkedList<ByteBuffer>());
        new Thread(new UserInteractive(key)).start();
    }

    private void doRead(SelectionKey key) {
        try {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
            StringBuffer sb = new StringBuffer();
            while (sc.read(bb) > 0) {
                bb.flip();
                sb.append(Charset.forName(CHARSET).decode(bb));
                bb.flip();
            }
            System.out.println("message received from server:" + sb.toString());
        } catch (IOException e) {
            e.printStackTrace();
            disconnect(key);
            System.exit(1);
        }
        System.out.println("now end readMessage");
    }

    private void doWrite(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                .attachment();
        ByteBuffer bb = outseq.poll();
        if(bb == null){
            return;
        }
        try {
            while(bb.hasRemaining()){
                sc.write(bb);
            }           
        } catch (IOException e) {
            disconnect(key);
            e.printStackTrace();
        }
        if (outseq.size() == 0) {
            System.out.println("after all buffers wrote, unregister OP_WRITE from interestOps");
            key.interestOps(SelectionKey.OP_READ);
        }
        System.out.println("end doWrote");
    }

    private void disconnect(SelectionKey key) {
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private class UserInteractive implements Runnable {

        SelectionKey key;

        public UserInteractive(SelectionKey key) {
            this.key = key;
        }

        public void run() {
            LinkedList<ByteBuffer> outseq = (LinkedList<ByteBuffer>) key
                    .attachment();
             BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                try {
                    String inputLine = br.readLine();
                    if ("quit".equalsIgnoreCase(inputLine)) {
                        key.channel().close();
                        System.exit(1);
                        break;
                    }
                    ByteBuffer bb = ByteBuffer.allocate(BUFFERSIZE);
                    bb = ByteBuffer.wrap(inputLine.getBytes());
                    outseq.offer(bb);
                    System.out
                    .println("after input, register OP_WRITE to interestOps and wakeup selector");
                    key.interestOps(SelectionKey.OP_READ
                            | SelectionKey.OP_WRITE);
                    sel.wakeup();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        try {
            MyNioClient client = new MyNioClient(10001);
            client.startup();
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Exception caught, program exiting...");
        }
    }

}

posted on 2010-05-29 12:38 都市淘沙者 阅读(1347) 评论(1)  编辑  收藏 所属分类: 多线程并发编程

评论

# re: 用NIO实现的一个Chat Demo [转][未登录] 2015-04-08 23:36 harry

请教一个问题,为什么UserInteractive里面 SelctionKey.wakeup以后,就成了写就绪模式呢(key.isWritable()是true)  回复  更多评论   


只有注册用户登录后才能发表评论。


网站导航: