随笔-23  评论-58  文章-0  trackbacks-0
Reactor 模式的 JAVA NIO 多线程服务器

public class MiniServer extends Thread
{
    
private static final Log log = LogFactory.getLog(MiniServer.class);
    
    
private final Selector s;
    
private final ServerSocketChannel ssc;
    
private ExecutorService executor;
    
    
public MiniServer(int portnumber,ExecutorService executor) throws IOException
    
{
        
this.executor=executor;
        s 
= Selector.open();
        ssc 
= ServerSocketChannel.open();
        ssc.socket().bind(
new InetSocketAddress(portnumber));
        ssc.configureBlocking(
false);
        ssc.register(s,SelectionKey.OP_ACCEPT);
    }

    
    
public void run()
    
{
        
try
        
{
            
while(s.isOpen())
            
{
                
int nKeys=s.select();
                
if(nKeys>0)
                
{
                    Iterator
<SelectionKey> it = s.selectedKeys().iterator();
                    
while (it.hasNext()) 
                    
{
                        SelectionKey key 
= it.next();
                        it.remove();
                        
if (!key.isValid() || !key.channel().isOpen())
                            
continue;
                        
if(key.isAcceptable())
                        
{
                            SocketChannel sc 
= ssc.accept();
                            
if (sc != null)
                            
{
                                sc.configureBlocking(
false);
                                sc.register(s, SelectionKey.OP_READ, 
new Reader(executor));
                            }

                        }

                        
else if(key.isReadable()||key.isWritable())
                        
{
                            Reactor reactor 
= (Reactor) key.attachment();
                            reactor.execute(key);
                        }

                    }

                }

            }

        }

        
catch(IOException e)
        
{
            log.info(e);
        }

    }

}



/**
 * Handler attached to a {@link SelectionKey}; the selector loop calls it when
 * the key's channel is ready for the operation it was registered for.
 */
public interface Reactor
{
    /**
     * Handles a readiness event.
     *
     * @param key the selection key whose channel became ready
     */
    void execute(SelectionKey key);
}



public class Reader implements Reactor 
{
    
private static final Log log = LogFactory.getLog(Reader.class);
    
    
private byte[] bytes=new byte[0];
    
private ExecutorService executor;
    
    
public Reader(ExecutorService executor)
    
{
        
this.executor=executor;
    }

    
    @Override
    
public void execute(SelectionKey key)
    
{
        SocketChannel sc 
= (SocketChannel) key.channel();
        
try
        
{
            ByteBuffer buffer
=ByteBuffer.allocate(1024);
            
int len=-1;
            
while(sc.isConnected() && (len=sc.read(buffer))>0)
            
{
                buffer.flip();
                  
byte [] content = new byte[buffer.limit()];
                buffer.get(content);
                bytes
=NutUtil.ArrayCoalition(bytes,content);
                buffer.clear();
            }

            
if(len==0)
            
{
                key.interestOps(SelectionKey.OP_READ);
                key.selector().wakeup(); 
            }

            
else if(len==-1)
            
{
                Callable
<byte[]> call=new ProcessCallable(bytes);
                Future
<byte[]> task=executor.submit(call);
                ByteBuffer output
=ByteBuffer.wrap(task.get());
                sc.register(key.selector(), SelectionKey.OP_WRITE, 
new Writer(output));
            }

        }

        
catch(Exception e)
        
{
            log.info(e);
        }

    }

}



public class Writer implements Reactor 
{
    
private static final Log log = LogFactory.getLog(Writer.class);
    
    
private ByteBuffer output;
    
    
public Writer(ByteBuffer output)
    
{
        
this.output=output;
    }

    
    
public void execute(SelectionKey key)
    
{
        SocketChannel sc 
= (SocketChannel) key.channel();
        
try
        
{
            
while(sc.isConnected() && output.hasRemaining())
            
{
                
int len=sc.write(output);
                
if(len<0)
                

                    
throw new EOFException(); 
                }
 
                
if(len==0
                

                    key.interestOps(SelectionKey.OP_WRITE); 
                    key.selector().wakeup(); 
                    
break
                }

            }

            
if(!output.hasRemaining())
            
{
                output.clear();
                key.cancel();
                sc.close();
            }

        }

        
catch(IOException e)
        
{
            log.info(e);
        }

    }

}

posted on 2011-08-29 18:35 nianzai 阅读(3093) 评论(3)  编辑  收藏 所属分类: NIO

评论:
# re: JAVA NIO 多线程服务器 1.2版 2011-08-30 13:59 | seo千里眼
这个多线程程序挺实用哦。  回复  更多评论
  
# re: JAVA NIO 多线程服务器 1.2版 2011-09-03 16:13 | 阿不都外力
收藏一下!以后看。。。  回复  更多评论
  
# re: JAVA NIO 多线程服务器 1.2版 2011-09-05 23:54 | 步步为营
Tomcat中用NIO比较多,搭建高性能服务器时NIO挺好用的,呵呵  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: