Reactor 模式的 Java NIO 多线程服务器,这是比较完善的一版了。Java 的 NIO 网络模型实在是不好用,还是使用现成的框架好。
 public class NIOServer implements Runnable
public class NIOServer implements Runnable 


 {
{
 private static final Log log = LogFactory.getLog(NIOServer.class);
    private static final Log log = LogFactory.getLog(NIOServer.class);

 private ExecutorService executor=null;
    private ExecutorService executor=null;
 private final Selector sel;
    private final Selector sel;
 private final ServerSocketChannel ssc;
    private final ServerSocketChannel ssc;
 private HandleUtil ho;
    private HandleUtil ho;
 
    
 public NIOServer(int portnumber,HandleUtil ho) throws IOException
    public NIOServer(int portnumber,HandleUtil ho) throws IOException

 
     {
{
 this.ho=ho;
        this.ho=ho;
 sel = Selector.open();
        sel = Selector.open();
 ssc = ServerSocketChannel.open();
        ssc = ServerSocketChannel.open();
 ssc.socket().bind(new InetSocketAddress(portnumber));
        ssc.socket().bind(new InetSocketAddress(portnumber));
 ssc.configureBlocking(false);
        ssc.configureBlocking(false);
 ssc.register(sel,SelectionKey.OP_ACCEPT,new Acceptor());
        ssc.register(sel,SelectionKey.OP_ACCEPT,new Acceptor());
 }
    }
 
    
 public NIOServer(int portnumber,HandleUtil ho,ExecutorService executor) throws IOException
    public NIOServer(int portnumber,HandleUtil ho,ExecutorService executor) throws IOException

 
     {
{
 this.ho=ho;
        this.ho=ho;
 this.executor=executor;
        this.executor=executor;
 sel = Selector.open();
        sel = Selector.open();
 ssc = ServerSocketChannel.open();
        ssc = ServerSocketChannel.open();
 ssc.socket().bind(new InetSocketAddress(portnumber));
        ssc.socket().bind(new InetSocketAddress(portnumber));
 ssc.configureBlocking(false);
        ssc.configureBlocking(false);
 ssc.register(sel,SelectionKey.OP_ACCEPT,new Acceptor());
        ssc.register(sel,SelectionKey.OP_ACCEPT,new Acceptor());
 }
    }
 
    
 @Override
    @Override
 public void run()
    public void run()

 
     {
{
 try
        try

 
         {
{
 while(sel.isOpen())
            while(sel.isOpen())

 
             {
{
 int nKeys=sel.select(100);
                int nKeys=sel.select(100);
 if(nKeys==0)
                if(nKeys==0)
 Thread.sleep(100);
                    Thread.sleep(100);
 else if(nKeys>0)
                else if(nKeys>0)

 
                 {
{
 Iterator<SelectionKey> it = sel.selectedKeys().iterator();
                    Iterator<SelectionKey> it = sel.selectedKeys().iterator();
 while (it.hasNext())
                    while (it.hasNext()) 

 
                     {
{
 SelectionKey sk = it.next();
                        SelectionKey sk = it.next();
 it.remove();
                        it.remove();
 if(sk.isAcceptable()||sk.isReadable()||sk.isWritable())
                        if(sk.isAcceptable()||sk.isReadable()||sk.isWritable())

 
                         {
{
 Runnable r = (Runnable)sk.attachment();
                            Runnable r = (Runnable)sk.attachment();
 r.run();
                            r.run();
 }
                        }
 }
                    }
 }
                }
 }
            }
 }
        }

 catch(IOException | InterruptedException e)
        catch(IOException | InterruptedException e)         { log.info(ExceptionUtil.getExceptionMessage(e));    }
{ log.info(ExceptionUtil.getExceptionMessage(e));    }
 }
    }
 
    
 class Acceptor implements Runnable
    class Acceptor implements Runnable 

 
     {
{
 @Override
        @Override
 public void run()
        public void run() 

 
         {
{
 try
            try

 
             {
{
 SocketChannel sc = ssc.accept();
                SocketChannel sc = ssc.accept();
 if (sc != null)
                if (sc != null)

 
                 {
{
 sc.configureBlocking(false);
                    sc.configureBlocking(false);
 sc.socket().setTcpNoDelay(true);
                    sc.socket().setTcpNoDelay(true);
 sc.socket().setSoLinger(false, -1);
                    sc.socket().setSoLinger(false, -1);
 SelectionKey sk=sc.register(sel, SelectionKey.OP_READ);
                    SelectionKey sk=sc.register(sel, SelectionKey.OP_READ);
 sk.attach(new Reader(sk));
                    sk.attach(new Reader(sk));
 sel.wakeup();
                    sel.wakeup();
 }
                }
 }
            }

 catch(IOException e)
            catch(IOException e)  { log.info(ExceptionUtil.getExceptionMessage(e)); }
{ log.info(ExceptionUtil.getExceptionMessage(e)); }
 }
        }
 }
    }
 
    
 class Reader implements Runnable
    class Reader implements Runnable 

 
     {
{
 private byte[] bytes=new byte[0];
        private byte[] bytes=new byte[0];
 private SelectionKey sk;
        private SelectionKey sk;
 
        
 public Reader(SelectionKey sk)
        public Reader(SelectionKey sk)

 
         {
{
 this.sk=sk;
            this.sk=sk;
 }
        }
 
        
 @Override
        @Override
 public void run()
        public void run()

 
         {
{
 try
            try

 
             {
{
 SocketChannel sc = (SocketChannel) sk.channel();
                SocketChannel sc = (SocketChannel) sk.channel();
 Handle handle=null;
                Handle handle=null;
 if(ho.getParameterTypes()==null)
                if(ho.getParameterTypes()==null)
 handle=(Handle)HandleUtil.getObjectByClassName(ho.getClassname());
                    handle=(Handle)HandleUtil.getObjectByClassName(ho.getClassname());
 else
                else
 handle=(Handle)HandleUtil.getObjectByClassName(ho.getClassname(), ho.getParameterTypes(), ho.getParameters());
                    handle=(Handle)HandleUtil.getObjectByClassName(ho.getClassname(), ho.getParameterTypes(), ho.getParameters());
 handle.setSocketChannel(sc);
                handle.setSocketChannel(sc);
 ByteBuffer buffer=ByteBuffer.allocate(1024);
                ByteBuffer buffer=ByteBuffer.allocate(1024);
 int len=-1;
                int len=-1;
 while(sc.isConnected() && (len=sc.read(buffer))>0)
                while(sc.isConnected() && (len=sc.read(buffer))>0)

 
                 {
{
 buffer.flip();
                    buffer.flip();
 byte [] content = new byte[buffer.limit()];
                      byte [] content = new byte[buffer.limit()];
 buffer.get(content);
                    buffer.get(content);
 bytes=StringUtil.arrayCoalition(bytes,content);
                    bytes=StringUtil.arrayCoalition(bytes,content);
 buffer.clear();
                    buffer.clear();
 }
                }
 if(len==0)
                if(len==0)

 
                 {
{
 if(executor==null)
                    if(executor==null)

 
                     {
{
 byte[] bb=handle.execute(bytes);
                        byte[] bb=handle.execute(bytes);
 sk.interestOps(SelectionKey.OP_WRITE);
                        sk.interestOps(SelectionKey.OP_WRITE);
 sk.attach(new Writer(sk,ByteBuffer.wrap(bb)));
                        sk.attach(new Writer(sk,ByteBuffer.wrap(bb)));
 sk.selector().wakeup();
                        sk.selector().wakeup();
 }
                    }
 else
                    else

 
                     {
{
 handle.setData(bytes);
                        handle.setData(bytes);
 Future<byte[]> future=executor.submit(handle);
                        Future<byte[]> future=executor.submit(handle);
 sk.interestOps(SelectionKey.OP_WRITE);
                        sk.interestOps(SelectionKey.OP_WRITE);
 sk.attach(new Writer(sk,future));
                        sk.attach(new Writer(sk,future));
 sk.selector().wakeup();
                        sk.selector().wakeup();
 }
                    }
 }
                }
 else if(len==-1)
                else if(len==-1)

 
                 {
{
 sk.cancel();
                    sk.cancel();
 sk.selector().selectNow();
                    sk.selector().selectNow();
 sc.close();
                    sc.close();
 }
                }
 }
            }
 catch(Exception e)
            catch(Exception e)

 
             {
{
 sk.cancel();
                sk.cancel();
 log.info(ExceptionUtil.getExceptionMessage(e));
                log.info(ExceptionUtil.getExceptionMessage(e));
 }
            }
 }
        }
 }
    }
 
    
 public class Writer implements Runnable
    public class Writer implements Runnable 

 
     {
{
 private SelectionKey sk;
        private SelectionKey sk;
 private ByteBuffer output;
        private ByteBuffer output;
 
        
 public Writer(SelectionKey sk,ByteBuffer output)
        public Writer(SelectionKey sk,ByteBuffer output)

 
         {
{
 this.sk=sk;
            this.sk=sk;
 this.output=output;
            this.output=output;
 }
        }
 
        
 public Writer(SelectionKey sk,Future<byte[]> future) throws InterruptedException, ExecutionException
        public Writer(SelectionKey sk,Future<byte[]> future) throws InterruptedException, ExecutionException

 
         {
{
 this.sk=sk;
            this.sk=sk;
 this.output=ByteBuffer.wrap(future.get());
            this.output=ByteBuffer.wrap(future.get());
 }
        }
 
        
 @Override
        @Override
 public void run()
        public void run()

 
         {
{
 SocketChannel sc = (SocketChannel) sk.channel();
            SocketChannel sc = (SocketChannel) sk.channel();
 try
            try

 
             {
{
 while(sc.isConnected() && output.hasRemaining())
                while(sc.isConnected() && output.hasRemaining())

 
                 {
{
 int len=sc.write(output);
                    int len=sc.write(output);
 if(len<0)
                    if(len<0)
 throw new EOFException();
                        throw new EOFException();
 else if(len==-1)
                    else if(len==-1)

 
                     {
{
 sk.cancel();
                        sk.cancel();
 sk.selector().selectNow();
                        sk.selector().selectNow();
 sc.close();
                        sc.close();
 }
                    }
 }
                }
 if(!output.hasRemaining())
                if(!output.hasRemaining())

 
                 {
{
 output.clear();
                    output.clear();
 sk.interestOps(SelectionKey.OP_READ);
                    sk.interestOps(SelectionKey.OP_READ);
 sk.attach(new Reader(sk));
                    sk.attach(new Reader(sk));
 sk.selector().wakeup();
                    sk.selector().wakeup();
 }
                }
 }
            }
 catch(Exception e)
            catch(Exception e)

 
             {
{
 sk.cancel();
                sk.cancel();
 log.info(ExceptionUtil.getExceptionMessage(e));
                log.info(ExceptionUtil.getExceptionMessage(e));
 }
            }
 }
        }
 }
    }
 
    
 public void send(SocketChannel sc,byte[] bytes) throws ClosedChannelException
    public void send(SocketChannel sc,byte[] bytes) throws ClosedChannelException

 
     {
{
 SelectionKey sk=sc.register(sel, SelectionKey.OP_WRITE);
        SelectionKey sk=sc.register(sel, SelectionKey.OP_WRITE);
 sk.attach(new Writer(sk,ByteBuffer.wrap(bytes)));
        sk.attach(new Writer(sk,ByteBuffer.wrap(bytes)));
 sel.wakeup();
        sel.wakeup();
 }
    }
 }
}posted on 2013-05-14 16:31 
nianzai 阅读(2743) 
评论(1)  编辑  收藏  所属分类: 
NIO