JAVA NIO 多线程服务器：这是 Nut（lucene + hadoop 分布式搜索运行框架）中 Nut Search 层的封装代码。
public interface Reactor
{
void execute(SelectionKey key);
}
/**
 * Read-side {@link Reactor}: accumulates the bytes a client sends and, once the
 * client signals end-of-stream, hands them to a {@code ProcessCallable} and
 * re-registers the channel for writing with a {@code Writer} that carries the
 * result.
 *
 * <p>One instance is attached per accepted connection, so the accumulated
 * request bytes are per-connection state.
 */
public class Reader implements Reactor
{
    private static final Log log = LogFactory.getLog(Reader.class);

    /** Size in bytes of the scratch buffer used for each channel read. */
    private static final int READ_BUFFER_SIZE = 1024;

    /** Request bytes received so far; grows across calls until end-of-stream. */
    private final java.io.ByteArrayOutputStream pending = new java.io.ByteArrayOutputStream();

    private final ExecutorService executor;

    /**
     * @param executor pool on which the request is processed once fully read
     */
    public Reader(ExecutorService executor)
    {
        this.executor = executor;
    }

    /**
     * Drains whatever is currently readable from the key's channel.
     *
     * <p>If the channel simply has no more data for now, the key stays
     * interested in reads. If the channel reports end-of-stream, the collected
     * bytes are processed and the channel is switched to write interest.
     * On any failure the key is cancelled and the channel closed so that a
     * broken connection is not left registered with the selector.
     *
     * @param key the readable key; its channel must be a {@link SocketChannel}
     */
    @Override
    public void execute(SelectionKey key)
    {
        SocketChannel sc = (SocketChannel) key.channel();
        try
        {
            ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            int len;
            while ((len = sc.read(buffer)) > 0)
            {
                // allocate() returns an array-backed buffer, so the bytes just
                // read can be appended without an intermediate copy.
                pending.write(buffer.array(), buffer.arrayOffset(), buffer.position());
                buffer.clear();
            }

            if (len == 0)
            {
                // Nothing more available right now; wait for the next read event.
                key.interestOps(SelectionKey.OP_READ);
                return;
            }

            // len < 0: the peer closed its sending side, the request is complete.
            Callable<byte[]> call = new ProcessCallable(pending.toByteArray());
            Future<byte[]> task = executor.submit(call);
            // NOTE(review): get() blocks the calling thread until processing
            // finishes; if this runs on the selector thread, all other
            // connections wait meanwhile.
            ByteBuffer output = ByteBuffer.wrap(task.get());
            sc.register(key.selector(), SelectionKey.OP_WRITE, new Writer(output));
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while processing request from " + sc, e);
            abandon(key, sc);
        }
        catch (Exception e)
        {
            log.warn("Failed to handle read on " + sc, e);
            abandon(key, sc);
        }
    }

    /** Cancels the key and closes the channel after an unrecoverable failure. */
    private static void abandon(SelectionKey key, SocketChannel sc)
    {
        key.cancel();
        try
        {
            sc.close();
        }
        catch (java.io.IOException closeFailure)
        {
            log.debug("Error closing " + sc, closeFailure);
        }
    }
}
public class Writer implements Reactor
{
private static final Log log = LogFactory.getLog(Writer.class);
private ByteBuffer output;
public Writer(ByteBuffer output)
{
this.output=output;
}
public void execute(SelectionKey key)
{
SocketChannel sc = (SocketChannel) key.channel();
try
{
while(output.hasRemaining())
{
int len=sc.write(output);
if(len<0)
{
throw new EOFException();
}
if(len==0)
{
key.interestOps(SelectionKey.OP_WRITE);
key.selector().wakeup();
break;
}
}
if(!output.hasRemaining())
{
output.clear();
key.cancel();
sc.close();
}
}
catch(IOException e)
{
log.info(e);
}
}
}
/**
 * Single-threaded NIO selector loop: accepts connections on one port, attaches
 * a {@code Reader} to each accepted channel and dispatches read/write readiness
 * events to the {@link Reactor} attached to the ready key.
 *
 * <p>Whenever {@code select()} returns without any ready key, the registered
 * client keys are swept: a key is remembered the first time it is seen in a
 * sweep and discarded if a later sweep finds it older than
 * {@link #STALE_THRESHOLD}.
 */
public class MiniServer
{
    private static final Log log = LogFactory.getLog(MiniServer.class);

    /**
     * Maximum age, in the unit returned by {@code NutUtil.now()}, before a key
     * seen in an earlier sweep is discarded.
     * NOTE(review): the original comment said "two minutes" while the value is
     * 200 - confirm the unit of NutUtil.now().
     */
    private static final long STALE_THRESHOLD = 200;

    private final Selector s;
    private final ServerSocketChannel ssc;
    private ExecutorService executor;

    /**
     * Keys that have not completed, mapped to the time they were first seen by
     * a sweep. Concurrent because the selector thread updates it while the
     * scheduler thread clears it.
     */
    private static final Map<SelectionKey, Long> map =
            new java.util.concurrent.ConcurrentHashMap<SelectionKey, Long>();

    private ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);

    /**
     * Opens the selector, binds the listening socket and starts the periodic
     * map cleanup.
     *
     * @param portnumber TCP port to listen on
     * @param executor   pool handed to every {@code Reader} for request processing
     * @throws IOException if the selector or server socket cannot be set up;
     *                     anything opened so far is closed before rethrowing
     */
    public MiniServer(int portnumber, ExecutorService executor) throws IOException
    {
        this.executor = executor;

        Selector selector = Selector.open();
        ServerSocketChannel server = null;
        boolean ready = false;
        try
        {
            server = ServerSocketChannel.open();
            server.socket().bind(new InetSocketAddress(portnumber));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            ready = true;
        }
        finally
        {
            if (!ready)
            {
                closeAfterFailedStartup(selector, server);
            }
        }
        s = selector;
        ssc = server;

        // Start the scheduler only once setup succeeded, so a failed
        // constructor does not leave a scheduler thread running.
        scheduled.scheduleAtFixedRate(task, 10, 10, TimeUnit.MINUTES); // clear the map every 10 minutes
    }

    /**
     * Runs the selector loop until the selector is closed or an
     * {@link IOException} is raised by selection or accept, which is logged.
     */
    public void execute()
    {
        try
        {
            while (s.isOpen())
            {
                int nKeys = s.select();
                if (nKeys == 0)
                {
                    sweepStaleKeys();
                    continue;
                }
                Iterator<SelectionKey> it = s.selectedKeys().iterator();
                while (it.hasNext())
                {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid() || !key.channel().isOpen())
                        continue;
                    if (key.isAcceptable())
                    {
                        SocketChannel sc = ssc.accept();
                        if (sc != null)
                        {
                            sc.configureBlocking(false);
                            sc.register(s, SelectionKey.OP_READ, new Reader(executor));
                        }
                    }
                    else if (key.isReadable() || key.isWritable())
                    {
                        Reactor reactor = (Reactor) key.attachment();
                        reactor.execute(key);
                    }
                }
            }
        }
        catch (IOException e)
        {
            log.error("Selector loop terminated by I/O error", e);
        }
    }

    /**
     * Logs every registered key and discards client keys that were already
     * recorded by an earlier sweep more than {@link #STALE_THRESHOLD} ago.
     * Keys seen for the first time are only recorded.
     */
    private void sweepStaleKeys()
    {
        for (SelectionKey key : s.keys())
        {
            if (!key.isValid())
            {
                // interestOps() would throw on a cancelled key.
                continue;
            }
            log.info("channel " + key.channel() + " waiting for " + key.interestOps());

            if (key.channel() == ssc)
            {
                // The listening socket must never be expired.
                continue;
            }

            Long firstSeen = map.get(key);
            if (firstSeen == null)
            {
                map.put(key, NutUtil.now());
                continue;
            }

            if ((NutUtil.now() - firstSeen) > STALE_THRESHOLD)
            {
                map.remove(key);
                // The selector's key set is unmodifiable; cancelling the key is
                // what removes it, on the next selection operation.
                key.cancel();
                try
                {
                    key.channel().close();
                }
                catch (IOException e)
                {
                    log.warn("Error closing stale channel " + key.channel(), e);
                }
            }
        }
    }

    /** Best-effort release of whatever the constructor opened before failing. */
    private static void closeAfterFailedStartup(Selector selector, ServerSocketChannel server)
    {
        try
        {
            if (server != null)
            {
                server.close();
            }
        }
        catch (IOException e)
        {
            log.debug("Error closing server channel after failed startup", e);
        }
        try
        {
            selector.close();
        }
        catch (IOException e)
        {
            log.debug("Error closing selector after failed startup", e);
        }
    }

    /** Periodic job that forgets all recorded keys. */
    Runnable task = new Runnable()
    {
        public void run()
        {
            map.clear();
        }
    };
}
posted on 2010-07-26 11:31
nianzai 阅读(2698)
评论(2) 编辑 收藏 所属分类:
NIO