Reactor 模式的 JAVA NIO 多线程服务器,这是比较完善的一版了。Java 的 NIO 网络模型实在是不好用,还是使用现成的好。
public class NIOServer implements Runnable


{
private static final Log log = LogFactory.getLog(NIOServer.class);

private ExecutorService executor=null;
private final Selector sel;
private final ServerSocketChannel ssc;
private HandleUtil ho;
public NIOServer(int portnumber,HandleUtil ho) throws IOException

{
this.ho=ho;
sel = Selector.open();
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(portnumber));
ssc.configureBlocking(false);
ssc.register(sel,SelectionKey.OP_ACCEPT,new Acceptor());
}
public NIOServer(int portnumber,HandleUtil ho,ExecutorService executor) throws IOException

{
this.ho=ho;
this.executor=executor;
sel = Selector.open();
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(portnumber));
ssc.configureBlocking(false);
ssc.register(sel,SelectionKey.OP_ACCEPT,new Acceptor());
}
@Override
public void run()

{
try

{
while(sel.isOpen())

{
int nKeys=sel.select(100);
if(nKeys==0)
Thread.sleep(100);
else if(nKeys>0)

{
Iterator<SelectionKey> it = sel.selectedKeys().iterator();
while (it.hasNext())

{
SelectionKey sk = it.next();
it.remove();
if(sk.isAcceptable()||sk.isReadable()||sk.isWritable())

{
Runnable r = (Runnable)sk.attachment();
r.run();
}
}
}
}
}

catch(IOException | InterruptedException e)
{ log.info(ExceptionUtil.getExceptionMessage(e)); }
}
class Acceptor implements Runnable

{
@Override
public void run()

{
try

{
SocketChannel sc = ssc.accept();
if (sc != null)

{
sc.configureBlocking(false);
sc.socket().setTcpNoDelay(true);
sc.socket().setSoLinger(false, -1);
SelectionKey sk=sc.register(sel, SelectionKey.OP_READ);
sk.attach(new Reader(sk));
sel.wakeup();
}
}

catch(IOException e)
{ log.info(ExceptionUtil.getExceptionMessage(e)); }
}
}
class Reader implements Runnable

{
private byte[] bytes=new byte[0];
private SelectionKey sk;
public Reader(SelectionKey sk)

{
this.sk=sk;
}
@Override
public void run()

{
try

{
SocketChannel sc = (SocketChannel) sk.channel();
Handle handle=null;
if(ho.getParameterTypes()==null)
handle=(Handle)HandleUtil.getObjectByClassName(ho.getClassname());
else
handle=(Handle)HandleUtil.getObjectByClassName(ho.getClassname(), ho.getParameterTypes(), ho.getParameters());
handle.setSocketChannel(sc);
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len=-1;
while(sc.isConnected() && (len=sc.read(buffer))>0)

{
buffer.flip();
byte [] content = new byte[buffer.limit()];
buffer.get(content);
bytes=StringUtil.arrayCoalition(bytes,content);
buffer.clear();
}
if(len==0)

{
if(executor==null)

{
byte[] bb=handle.execute(bytes);
sk.interestOps(SelectionKey.OP_WRITE);
sk.attach(new Writer(sk,ByteBuffer.wrap(bb)));
sk.selector().wakeup();
}
else

{
handle.setData(bytes);
Future<byte[]> future=executor.submit(handle);
sk.interestOps(SelectionKey.OP_WRITE);
sk.attach(new Writer(sk,future));
sk.selector().wakeup();
}
}
else if(len==-1)

{
sk.cancel();
sk.selector().selectNow();
sc.close();
}
}
catch(Exception e)

{
sk.cancel();
log.info(ExceptionUtil.getExceptionMessage(e));
}
}
}
public class Writer implements Runnable

{
private SelectionKey sk;
private ByteBuffer output;
public Writer(SelectionKey sk,ByteBuffer output)

{
this.sk=sk;
this.output=output;
}
public Writer(SelectionKey sk,Future<byte[]> future) throws InterruptedException, ExecutionException

{
this.sk=sk;
this.output=ByteBuffer.wrap(future.get());
}
@Override
public void run()

{
SocketChannel sc = (SocketChannel) sk.channel();
try

{
while(sc.isConnected() && output.hasRemaining())

{
int len=sc.write(output);
if(len<0)
throw new EOFException();
else if(len==-1)

{
sk.cancel();
sk.selector().selectNow();
sc.close();
}
}
if(!output.hasRemaining())

{
output.clear();
sk.interestOps(SelectionKey.OP_READ);
sk.attach(new Reader(sk));
sk.selector().wakeup();
}
}
catch(Exception e)

{
sk.cancel();
log.info(ExceptionUtil.getExceptionMessage(e));
}
}
}
public void send(SocketChannel sc,byte[] bytes) throws ClosedChannelException

{
SelectionKey sk=sc.register(sel, SelectionKey.OP_WRITE);
sk.attach(new Writer(sk,ByteBuffer.wrap(bytes)));
sel.wakeup();
}
}
posted on 2013-05-14 16:31
nianzai 阅读(2722)
评论(1) 编辑 收藏 所属分类:
NIO