Reactor 模式的 Java NIO 多线程服务器,这是比较完善的一版了。Java 的 NIO 网络模型实在是不好用,还是使用现成的框架更好。
public class NIOServer implements Runnable
data:image/s3,"s3://crabby-images/16507/1650758e64773369e558bf6a35239aa629f2eb9d" alt=""
data:image/s3,"s3://crabby-images/f4fe2/f4fe2905e6a68eecdb5a9c900ae477a6bd055e40" alt=""
{
private static final Log log = LogFactory.getLog(NIOServer.class);
data:image/s3,"s3://crabby-images/a0398/a0398c5eaea7654f53f3ad01f4ef86b30b77f7b1" alt=""
private ExecutorService executor=null;
private final Selector sel;
private final ServerSocketChannel ssc;
private HandleUtil ho;
public NIOServer(int portnumber,HandleUtil ho) throws IOException
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
this.ho=ho;
sel = Selector.open();
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(portnumber));
ssc.configureBlocking(false);
ssc.register(sel,SelectionKey.OP_ACCEPT,new Acceptor());
}
public NIOServer(int portnumber,HandleUtil ho,ExecutorService executor) throws IOException
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
this.ho=ho;
this.executor=executor;
sel = Selector.open();
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(portnumber));
ssc.configureBlocking(false);
ssc.register(sel,SelectionKey.OP_ACCEPT,new Acceptor());
}
@Override
public void run()
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
try
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
while(sel.isOpen())
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
int nKeys=sel.select(100);
if(nKeys==0)
Thread.sleep(100);
else if(nKeys>0)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
Iterator<SelectionKey> it = sel.selectedKeys().iterator();
while (it.hasNext())
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
SelectionKey sk = it.next();
it.remove();
if(sk.isAcceptable()||sk.isReadable()||sk.isWritable())
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
Runnable r = (Runnable)sk.attachment();
r.run();
}
}
}
}
}
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
catch(IOException | InterruptedException e)
{ log.info(ExceptionUtil.getExceptionMessage(e)); }
}
class Acceptor implements Runnable
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
@Override
public void run()
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
try
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
SocketChannel sc = ssc.accept();
if (sc != null)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
sc.configureBlocking(false);
sc.socket().setTcpNoDelay(true);
sc.socket().setSoLinger(false, -1);
SelectionKey sk=sc.register(sel, SelectionKey.OP_READ);
sk.attach(new Reader(sk));
sel.wakeup();
}
}
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
catch(IOException e)
{ log.info(ExceptionUtil.getExceptionMessage(e)); }
}
}
class Reader implements Runnable
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
private byte[] bytes=new byte[0];
private SelectionKey sk;
public Reader(SelectionKey sk)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
this.sk=sk;
}
@Override
public void run()
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
try
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
SocketChannel sc = (SocketChannel) sk.channel();
Handle handle=null;
if(ho.getParameterTypes()==null)
handle=(Handle)HandleUtil.getObjectByClassName(ho.getClassname());
else
handle=(Handle)HandleUtil.getObjectByClassName(ho.getClassname(), ho.getParameterTypes(), ho.getParameters());
handle.setSocketChannel(sc);
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len=-1;
while(sc.isConnected() && (len=sc.read(buffer))>0)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
buffer.flip();
byte [] content = new byte[buffer.limit()];
buffer.get(content);
bytes=StringUtil.arrayCoalition(bytes,content);
buffer.clear();
}
if(len==0)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
if(executor==null)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
byte[] bb=handle.execute(bytes);
sk.interestOps(SelectionKey.OP_WRITE);
sk.attach(new Writer(sk,ByteBuffer.wrap(bb)));
sk.selector().wakeup();
}
else
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
handle.setData(bytes);
Future<byte[]> future=executor.submit(handle);
sk.interestOps(SelectionKey.OP_WRITE);
sk.attach(new Writer(sk,future));
sk.selector().wakeup();
}
}
else if(len==-1)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
sk.cancel();
sk.selector().selectNow();
sc.close();
}
}
catch(Exception e)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
sk.cancel();
log.info(ExceptionUtil.getExceptionMessage(e));
}
}
}
public class Writer implements Runnable
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
private SelectionKey sk;
private ByteBuffer output;
public Writer(SelectionKey sk,ByteBuffer output)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
this.sk=sk;
this.output=output;
}
public Writer(SelectionKey sk,Future<byte[]> future) throws InterruptedException, ExecutionException
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
this.sk=sk;
this.output=ByteBuffer.wrap(future.get());
}
@Override
public void run()
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
SocketChannel sc = (SocketChannel) sk.channel();
try
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
while(sc.isConnected() && output.hasRemaining())
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
int len=sc.write(output);
if(len<0)
throw new EOFException();
else if(len==-1)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
sk.cancel();
sk.selector().selectNow();
sc.close();
}
}
if(!output.hasRemaining())
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
output.clear();
sk.interestOps(SelectionKey.OP_READ);
sk.attach(new Reader(sk));
sk.selector().wakeup();
}
}
catch(Exception e)
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
sk.cancel();
log.info(ExceptionUtil.getExceptionMessage(e));
}
}
}
public void send(SocketChannel sc,byte[] bytes) throws ClosedChannelException
data:image/s3,"s3://crabby-images/4989c/4989c5aa5aeee035dc328aff8277d531300533ab" alt=""
{
SelectionKey sk=sc.register(sel, SelectionKey.OP_WRITE);
sk.attach(new Writer(sk,ByteBuffer.wrap(bytes)));
sel.wakeup();
}
}
posted on 2013-05-14 16:31
nianzai 阅读(2719)
评论(1) 编辑 收藏 所属分类:
NIO