随笔-23  评论-58  文章-0  trackbacks-0
基于 Reactor 模式的 Java NIO 多线程服务器,这是目前比较完善的一版。Java 原生 NIO 的网络模型实在不好用,实际项目中还是使用现成的网络框架更好。
public class NIOServer implements Runnable 
{
    
private static final Log log = LogFactory.getLog(NIOServer.class);

    
private ExecutorService executor=null;
    
private final Selector sel;
    
private final ServerSocketChannel ssc;
    
private HandleUtil ho;
    
    
public NIOServer(int portnumber,HandleUtil ho) throws IOException
    
{
        
this.ho=ho;
        sel 
= Selector.open();
        ssc 
= ServerSocketChannel.open();
        ssc.socket().bind(
new InetSocketAddress(portnumber));
        ssc.configureBlocking(
false);
        ssc.register(sel,SelectionKey.OP_ACCEPT,
new Acceptor());
    }

    
    
public NIOServer(int portnumber,HandleUtil ho,ExecutorService executor) throws IOException
    
{
        
this.ho=ho;
        
this.executor=executor;
        sel 
= Selector.open();
        ssc 
= ServerSocketChannel.open();
        ssc.socket().bind(
new InetSocketAddress(portnumber));
        ssc.configureBlocking(
false);
        ssc.register(sel,SelectionKey.OP_ACCEPT,
new Acceptor());
    }

    
    @Override
    
public void run()
    
{
        
try
        
{
            
while(sel.isOpen())
            
{
                
int nKeys=sel.select(100);
                
if(nKeys==0)
                    Thread.sleep(
100);
                
else if(nKeys>0)
                
{
                    Iterator
<SelectionKey> it = sel.selectedKeys().iterator();
                    
while (it.hasNext()) 
                    
{
                        SelectionKey sk 
= it.next();
                        it.remove();
                        
if(sk.isAcceptable()||sk.isReadable()||sk.isWritable())
                        
{
                            Runnable r 
= (Runnable)sk.attachment();
                            r.run();
                        }

                    }

                }

            }

        }

        
catch(IOException | InterruptedException e)        { log.info(ExceptionUtil.getExceptionMessage(e));    }
    }

    
    
class Acceptor implements Runnable 
    
{
        @Override
        
public void run() 
        
{
            
try
            
{
                SocketChannel sc 
= ssc.accept();
                
if (sc != null)
                
{
                    sc.configureBlocking(
false);
                    sc.socket().setTcpNoDelay(
true);
                    sc.socket().setSoLinger(
false-1);
                    SelectionKey sk
=sc.register(sel, SelectionKey.OP_READ);
                    sk.attach(
new Reader(sk));
                    sel.wakeup();
                }

            }

            
catch(IOException e) { log.info(ExceptionUtil.getExceptionMessage(e)); }
        }

    }

    
    
class Reader implements Runnable 
    
{
        
private byte[] bytes=new byte[0];
        
private SelectionKey sk;
        
        
public Reader(SelectionKey sk)
        
{
            
this.sk=sk;
        }

        
        @Override
        
public void run()
        
{
            
try
            
{
                SocketChannel sc 
= (SocketChannel) sk.channel();
                Handle handle
=null;
                
if(ho.getParameterTypes()==null)
                    handle
=(Handle)HandleUtil.getObjectByClassName(ho.getClassname());
                
else
                    handle
=(Handle)HandleUtil.getObjectByClassName(ho.getClassname(), ho.getParameterTypes(), ho.getParameters());
                handle.setSocketChannel(sc);
                ByteBuffer buffer
=ByteBuffer.allocate(1024);
                
int len=-1;
                
while(sc.isConnected() && (len=sc.read(buffer))>0)
                
{
                    buffer.flip();
                      
byte [] content = new byte[buffer.limit()];
                    buffer.get(content);
                    bytes
=StringUtil.arrayCoalition(bytes,content);
                    buffer.clear();
                }

                
if(len==0)
                
{
                    
if(executor==null)
                    
{
                        
byte[] bb=handle.execute(bytes);
                        sk.interestOps(SelectionKey.OP_WRITE);
                        sk.attach(
new Writer(sk,ByteBuffer.wrap(bb)));
                        sk.selector().wakeup();
                    }

                    
else
                    
{
                        handle.setData(bytes);
                        Future
<byte[]> future=executor.submit(handle);
                        sk.interestOps(SelectionKey.OP_WRITE);
                        sk.attach(
new Writer(sk,future));
                        sk.selector().wakeup();
                    }

                }

                
else if(len==-1)
                
{
                    sk.cancel();
                    sk.selector().selectNow();
                    sc.close();
                }

            }

            
catch(Exception e)
            
{
                sk.cancel();
                log.info(ExceptionUtil.getExceptionMessage(e));
            }

        }

    }

    
    
public class Writer implements Runnable 
    
{
        
private SelectionKey sk;
        
private ByteBuffer output;
        
        
public Writer(SelectionKey sk,ByteBuffer output)
        
{
            
this.sk=sk;
            
this.output=output;
        }

        
        
public Writer(SelectionKey sk,Future<byte[]> future) throws InterruptedException, ExecutionException
        
{
            
this.sk=sk;
            
this.output=ByteBuffer.wrap(future.get());
        }

        
        @Override
        
public void run()
        
{
            SocketChannel sc 
= (SocketChannel) sk.channel();
            
try
            
{
                
while(sc.isConnected() && output.hasRemaining())
                
{
                    
int len=sc.write(output);
                    
if(len<0)
                        
throw new EOFException();
                    
else if(len==-1)
                    
{
                        sk.cancel();
                        sk.selector().selectNow();
                        sc.close();
                    }

                }

                
if(!output.hasRemaining())
                
{
                    output.clear();
                    sk.interestOps(SelectionKey.OP_READ);
                    sk.attach(
new Reader(sk));
                    sk.selector().wakeup();
                }

            }

            
catch(Exception e)
            
{
                sk.cancel();
                log.info(ExceptionUtil.getExceptionMessage(e));
            }

        }

    }

    
    
public void send(SocketChannel sc,byte[] bytes) throws ClosedChannelException
    
{
        SelectionKey sk
=sc.register(sel, SelectionKey.OP_WRITE);
        sk.attach(
new Writer(sk,ByteBuffer.wrap(bytes)));
        sel.wakeup();
    }

}


posted on 2013-05-14 16:31 nianzai 阅读(2714) 评论(1)  编辑  收藏 所属分类: NIO

评论:
# re: JAVA NIO 多线程服务器 1.3版 [未登录] 2013-09-18 15:34 | z
Handle 这个方法里面写的是什么处理呢?能否也贴出来看看  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: