Continuing the flow above, the request now reaches the Poller's #register() method.
public void register(final NioChannel socket) {
    socket.setPoller(this);
    // KeyAttachment wraps the NioChannel's state; it too is recycled from a cache instead of being left to the GC
    KeyAttachment key = keyCache.poll();
    final KeyAttachment ka = key != null ? key : new KeyAttachment(socket);
    ka.reset(this, socket, getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    // PollerEvent is obtained the same way: recycled from a cache, not reallocated
    PollerEvent r = eventCache.poll();
    // this is what OP_REGISTER turns into.
    // we are interested in read events
    ka.interestOps(SelectionKey.OP_READ);
    if (r == null)
        r = new PollerEvent(socket, ka, OP_REGISTER);
    else
        r.reset(socket, ka, OP_REGISTER);
    // hand the event to the Poller
    addEvent(r);
}
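public void addEvent(Runnable event) {
    // put the event on the queue
    events.offer(event);
    // ++wakeupCounter; a result of 0 means the Poller had set it to -1 before blocking in select(), so wake it up
    if (wakeupCounter.incrementAndGet() == 0) selector.wakeup();
}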
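The "recycled, not left to the GC" comments above describe a poll-or-create idiom: take a used object from a queue if one is available, otherwise allocate, and put it back after resetting it. Here is a minimal sketch of that idiom, assuming a ConcurrentLinkedQueue as the cache. RecyclePoolSketch and Event are hypothetical names for illustration, not Tomcat classes.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the poll-or-create / reset-and-offer recycling idiom.
final class RecyclePoolSketch {
    // Stand-in for PollerEvent: a mutable object that can be reset and reused.
    static final class Event {
        String payload;

        Event(String payload) {
            this.payload = payload;
        }

        void reset(String payload) {
            this.payload = payload;
        }
    }

    private final ConcurrentLinkedQueue<Event> cache = new ConcurrentLinkedQueue<>();

    // Reuse a cached instance when there is one, otherwise allocate.
    Event acquire(String payload) {
        Event e = cache.poll();
        if (e == null) {
            e = new Event(payload);
        } else {
            e.reset(payload);
        }
        return e;
    }

    // Clear the instance and make it available to the next acquire().
    void release(Event e) {
        e.reset(null);
        cache.offer(e);
    }

    public static void main(String[] args) {
        RecyclePoolSketch pool = new RecyclePoolSketch();
        Event first = pool.acquire("a");
        pool.release(first);
        Event second = pool.acquire("b");
        System.out.println("same instance reused: " + (first == second));
    }
}
```

The trade-off is the usual one for object pools: fewer allocations per connection, at the cost of having to reset every field correctly before reuse.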
This is actually easy to follow: the NioChannel is wrapped in an OP_REGISTER event and queued on the Poller, so that the Poller's #run() method can process the queued events.
public void run() {
    while (running) {
        try {
            while (paused && (!close)) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            boolean hasEvents = false;
            hasEvents = (hasEvents | events());
            // Time to terminate?
            if (close) {
                timeout(0, false);
                break;
            }
            try {
                if (!close) {
                    if (wakeupCounter.get() > 0) {
                        // return immediately with the keys of the channels that are ready for I/O
                        keyCount = selector.selectNow();
                    } else {
                        keyCount = selector.keys().size();
                        // set wakeupCounter to -1 so that the next addEvent() brings it to 0 and wakes the selector
                        wakeupCounter.set(-1);
                        // blocking select
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                if (close) {
                    timeout(0, false);
                    selector.close();
                    break;
                }
            } catch (NullPointerException x) {
                // sun bug 5076772 on windows JDK 1.5
                if (log.isDebugEnabled())
                    log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                if (wakeupCounter == null || selector == null)
                    throw x;
                continue;
            } catch (CancelledKeyException x) {
                // sun bug 5076772 on windows JDK 1.5
                if (log.isDebugEnabled())
                    log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5", x);
                if (wakeupCounter == null || selector == null)
                    throw x;
                continue;
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error("", x);
                continue;
            }
            // either we timed out or we woke up, process events first
            if (keyCount == 0)
                hasEvents = (hasEvents | events());
            Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator()
                    : null;
            // Walk through the collection of ready keys and dispatch
            // any active event.
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                // this KeyAttachment is the one attached when the channel was registered via #register()
                KeyAttachment attachment = (KeyAttachment) sk.attachment();
                attachment.access();
                iterator.remove();
                // continue the flow
                processKey(sk, attachment);
            }// while
            // process timeouts
            timeout(keyCount, hasEvents);
            if (oomParachute > 0 && oomParachuteData == null)
                checkParachute();
        } catch (OutOfMemoryError oom) {
            try {
                oomParachuteData = null;
                releaseCaches();
                log.error("", oom);
            } catch (Throwable oomt) {
                try {
                    System.err.println(oomParachuteMsg);
                    oomt.printStackTrace();
                } catch (Throwable letsHopeWeDontGetHere) {
                    ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                }
            }
        }
    }// while
    synchronized (this) {
        this.notifyAll();
    }
    stopLatch.countDown();
}
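The interplay between addEvent() and the select branch in #run() is easy to miss, so here is a stripped-down sketch of just the wakeupCounter handshake. It assumes an AtomicLong counter and a plain queue of Runnables; WakeupCounterSketch and selectOnce() are hypothetical names, and pausing, closing, timeouts and error handling are left out.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the wakeupCounter handshake between producer and selector loop.
final class WakeupCounterSketch {
    final Queue<Runnable> events = new ConcurrentLinkedQueue<>();
    final AtomicLong wakeupCounter = new AtomicLong(0);
    final Selector selector;

    WakeupCounterSketch() throws IOException {
        selector = Selector.open();
    }

    // Producer side. Returns true when this call had to wake the selector,
    // i.e. the counter was -1 (consumer about to block or blocked in select()).
    boolean addEvent(Runnable event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0) {
            selector.wakeup();
            return true;
        }
        return false;
    }

    // Consumer side, one pass of the loop.
    int selectOnce(long timeoutMillis) throws IOException {
        int keyCount;
        if (wakeupCounter.get() > 0) {
            // events arrived since the last pass: do not block
            keyCount = selector.selectNow();
        } else {
            // announce "I am going to block", then block
            wakeupCounter.set(-1);
            keyCount = selector.select(timeoutMillis);
        }
        wakeupCounter.set(0);
        return keyCount;
    }

    public static void main(String[] args) throws IOException {
        WakeupCounterSketch poller = new WakeupCounterSketch();
        poller.wakeupCounter.set(-1); // pretend the poller thread is parked in select()
        boolean woke = poller.addEvent(() -> System.out.println("event ran"));
        poller.selectOnce(5000); // a pending wakeup() makes the next select return at once
        Runnable r;
        while ((r = poller.events.poll()) != null) {
            r.run();
        }
        System.out.println("wakeup issued: " + woke);
        poller.selector.close();
    }
}
```

The point of the counter is that selector.wakeup() is only called when the consumer has announced it is blocking; while the consumer is busy, producers just bump the counter and the next pass uses selectNow().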
Two methods called from here deserve a closer look: #events() and #processKey():
public boolean events() {
    boolean result = false;
    // synchronized (events) {
    Runnable r = null;
    // the return value tells whether the event queue held any events
    result = (events.size() > 0);
    while ((r = events.poll()) != null) {
        try {
            // execute the PollerEvent's #run()
            r.run();
            if (r instanceof PollerEvent) {
                ((PollerEvent) r).reset();
                // recycle the PollerEvent
                eventCache.offer((PollerEvent) r);
            }
        } catch (Throwable x) {
            log.error("", x);
        }
    }
    // events.clear();
    // }
    return result;
}
This executes the #run() method of the PollerEvent that belongs to the SocketChannel, and that method registers the SocketChannel for OP_READ:
public void run() {
    if (interestOps == OP_REGISTER) {
        try {
            // register the SocketChannel with the Poller's selector for OP_READ
            socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ,
                    key);
        } catch (Exception x) {
            log.error("", x);
        }
    } else {
        // this branch is probably there to support comet; skipped for now
        ......
    }// end if
}// run
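The third argument of register() is what later comes back from sk.attachment() in the Poller loop. A minimal sketch of that round trip, using a java.nio.channels.Pipe instead of a socket so it runs without a network; AttachmentSketch and Attachment are hypothetical stand-ins, not Tomcat's KeyAttachment.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical sketch: the object attached at register() time is returned by key.attachment().
final class AttachmentSketch {
    // Stand-in for KeyAttachment: per-channel state carried on the SelectionKey.
    static final class Attachment {
        long lastAccess;

        void access() {
            lastAccess = System.currentTimeMillis();
        }
    }

    // Returns true if the attachment seen in the select loop is the registered instance.
    static boolean roundTrip() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                Attachment registered = new Attachment();
                source.register(selector, SelectionKey.OP_READ, registered);
                sink.write(ByteBuffer.wrap(new byte[] {1})); // make the source readable

                selector.select(1000);
                boolean same = false;
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey sk = it.next();
                    it.remove();
                    Attachment seen = (Attachment) sk.attachment();
                    seen.access();
                    same = (seen == registered);
                }
                return same;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("attachment round trip ok: " + roundTrip());
    }
}
```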
The second one is #processKey(). Much of its logic is not what I care about right now, so I have left it out:
protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
    boolean result = true;
    try {
        if (close) {
            cancelledKey(sk, SocketStatus.STOP, false);
        } else if (sk.isValid() && attachment != null) {
            attachment.access();// make sure we don't time out valid sockets
            sk.attach(attachment);// cant remember why this is here
            NioChannel channel = attachment.getChannel();
            if (sk.isReadable() || sk.isWritable()) {
                if (attachment.getSendfileData() != null) {
                    processSendfile(sk, attachment, true, false);
                } else if (attachment.getComet()) {// this is probably the comet support
                    ......
                } else {
                    // this is the branch I care about right now
                    if (isWorkerAvailable()) {// this does not seem to be really implemented yet
                        // #unreg() is the clever part: it keeps the selector from
                        // reporting the same event on this channel over and over
                        unreg(sk, attachment, sk.readyOps());
                        boolean close = (!processSocket(channel, null, true));
                        if (close) {
                            cancelledKey(sk, SocketStatus.DISCONNECT, false);
                        }
                    } else {
                        result = false;
                    }
                }
            }
        } else {
            // invalid key
            cancelledKey(sk, SocketStatus.ERROR, false);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk, SocketStatus.ERROR, false);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("", t);
    }
    return result;
}
protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
    reg(sk, attachment, sk.interestOps() & (~readyOps));
}
protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
    sk.interestOps(intops);
    attachment.interestOps(intops);
    attachment.setCometOps(intops);
}
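As I understand it, #unreg() neatly solves the problem of the same I/O event being reported repeatedly: it removes the operations that are currently ready from the key's interest set, so the selector stops reporting them until interest is registered again. My own NIO test code ran into exactly this problem.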
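To see the effect in isolation, here is a small sketch that uses a java.nio.channels.Pipe in place of a socket. As long as the data stays unread and OP_READ stays in the interest set, every select reports the key again; after masking with interestOps & ~readyOps, the same expression #unreg() uses, the selector goes quiet. UnregSketch and demo() are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical sketch of why clearing ready ops from the interest set stops repeated reports.
final class UnregSketch {
    // Returns {keys reported on a repeated select, keys reported after masking}.
    static int[] demo() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap(new byte[] {1})); // data is now pending and never read

                selector.select(1000);           // first report of OP_READ
                selector.selectedKeys().clear();
                int again = selector.select(1000); // data still unread, so it is reported again
                selector.selectedKeys().clear();

                // same masking as unreg(): drop the ready ops from the interest set
                key.interestOps(key.interestOps() & ~key.readyOps());
                int masked = selector.selectNow();
                return new int[] {again, masked};
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] r = demo();
        System.out.println("reported again before masking: " + r[0]);
        System.out.println("reported after masking: " + r[1]);
    }
}
```

In the real flow the worker thread would read the data and then register interest in OP_READ again; the sketch stops before that step.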
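With that, the flow arrives at the last method on the Poller side, #processSocket() (it is defined on NioEndpoint, the class that encloses Poller):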
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
    try {
        KeyAttachment attachment = (KeyAttachment) socket.getAttachment(false);
        attachment.setCometNotify(false); // will get reset upon next reg
        // take a SocketProcessor from the cache, or create one
        SocketProcessor sc = processorCache.poll();
        if (sc == null)
            sc = new SocketProcessor(socket, status);
        else
            sc.reset(socket, status);
        if (dispatch && getExecutor() != null)// if an executor is configured, run the task on it
            getExecutor().execute(sc);
        else
            sc.run();
    } catch (RejectedExecutionException rx) {
        log.warn("Socket processing request was rejected for:" + socket, rx);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        log.error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
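Stripped of the Tomcat specifics, the dispatch decision above is: run the task on the executor if one is configured and dispatch is requested, otherwise run it inline on the calling thread, and report a rejection as false so the caller can cancel the key. A minimal sketch under those assumptions; DispatchSketch and process() are hypothetical names.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical sketch of the "executor if available, else inline" dispatch.
final class DispatchSketch {
    // Returns false when the executor refuses the task, true otherwise.
    static boolean process(Runnable task, boolean dispatch, Executor executor) {
        try {
            if (dispatch && executor != null) {
                executor.execute(task);
            } else {
                task.run(); // no executor or no dispatch requested: run on the calling thread
            }
        } catch (RejectedExecutionException rx) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Executor rejecting = command -> {
            throw new RejectedExecutionException("pool and queue are full");
        };
        System.out.println("inline: " + process(() -> System.out.println("ran inline"), true, null));
        System.out.println("rejected: " + process(() -> { }, true, rejecting));
    }
}
```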
SocketProcessor's #run() method is not listed here. At its end it hands the flow over to the Http11NioProtocol class through the statements below, where handler is the reference to Http11NioProtocol:
SocketState state = SocketState.OPEN;
state = (status==null)?handler.process(socket):handler.event(socket,status);
Finally, a short summary of how Acceptor and Poller process a request; see the figure below:
posted on 2010-12-08 08:48
臭美
Category:
Tomcat