这里首先分析下ServerBootstrap的启动过程,在netty中,channel可以看成是socketchannel的抽象
channelpipeline里存放着channelhandler,channelpipeline根据不同的channelevent触发对应的操作
如channel的open,bind,connect等
下面以TelnetServer为例来一步步看server启动
/**
 * Entry point of the TelnetServer example: creates a ServerBootstrap backed by a
 * NioServerSocketChannelFactory, installs the pipeline factory and binds to port 8080.
 */
public static void main(String[] args) throws Exception {
// Configure the server.
// NOTE(review): per the original article, constructing NioServerSocketChannelFactory also
// creates a NioServerSocketPipelineSink that receives downstream events; not visible in this excerpt.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new TelnetServerPipelineFactory());
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080));
}
直接看serverbootstrap的bind方法
/**
 * Excerpt of ServerBootstrap.bind: builds the boss pipeline containing a Binder and asks
 * the channel factory for a new channel.
 * NOTE: the excerpt stops right after newChannel(); the remainder of the method
 * (including the return of the Channel) is not shown here.
 *
 * @param localAddress the address to bind to; must not be null
 * @throws NullPointerException if localAddress is null
 */
public Channel bind(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
// Queue of ChannelFuture; the Binder's channelOpen offers the bind future into it
// (it holds futures, not the Binder itself).
final BlockingQueue<ChannelFuture> futureQueue =
new LinkedBlockingQueue<ChannelFuture>();
// Binder extends SimpleChannelUpstreamHandler and reacts to channelOpen.
ChannelHandler binder = new Binder(localAddress, futureQueue);
// In this walkthrough no parent handler is configured, so this is null.
ChannelHandler parentHandler = getParentHandler();
// Create the boss pipeline (a DefaultChannelPipeline per the original article).
// Before the port is bound it contains only the Binder as an upstream handler.
ChannelPipeline bossPipeline = pipeline();
// addLast wraps the handler in a DefaultChannelHandlerContext (per the original article);
// the contexts are linked to each other as a list.
// At this point the list holds only the Binder.
bossPipeline.addLast("binder", binder);
if (parentHandler != null) {
bossPipeline.addLast("userHandler", parentHandler);
}
// Everything starts here; in this example getFactory() is the NioServerSocketChannelFactory.
Channel channel = getFactory().newChannel(bossPipeline);
}
NioServerSocketChannelFactory.newChannel(ChannelPipeline pipeline)如下
/**
 * Creates the server channel for the given pipeline.
 *
 * @param pipeline pipeline handed to the new channel (in this walkthrough it contains the Binder)
 * @return a new NioServerSocketChannel built from this factory, the pipeline and the factory's sink
 */
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
// The sink is the NioServerSocketPipelineSink (per the original article).
return new NioServerSocketChannel(this, pipeline, sink);
}
在NioServerSocketChannel的构造函数中,可以看到这么一句:fireChannelOpen(this);该方法来自Channels类
/**
 * Sends an upstream OPEN=TRUE state event through the channel's pipeline, after
 * notifying the parent channel when one exists.
 *
 * @param channel the channel that has just been opened
 */
public static void fireChannelOpen(Channel channel) {
// Notify the parent handler.
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}
// Calls sendUpstream on the channel's pipeline (DefaultChannelPipeline in this walkthrough).
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));
}
DefaultChannelPipeline.sendUpstream(ChannelEvent e)
/**
 * Dispatches an event to the context returned by getActualUpstreamContext(head), or
 * logs a warning and discards the event when that lookup returns null.
 *
 * @param e the event to send upstream
 */
public void sendUpstream(ChannelEvent e) {
// In this walkthrough this.head is the context that holds the Binder.
DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
if (head == null) {
logger.warn(
"The pipeline contains no upstream handlers; discarding: " + e);
return;
}
sendUpstream(head, e);
}
执行
/**
 * Invokes handleUpstream on the handler of the given context; any Throwable it raises
 * is handed to notifyHandlerException.
 */
void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
try {
// Here ctx.getHandler() is the Binder, which is a SimpleChannelUpstreamHandler.
((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
} catch (Throwable t) {
notifyHandlerException(e, t);
}
}
这里会在SimpleChannelUpstreamHandler.handleUpstream(ctx, e);中调用binder的channelOpen
/**
 * Binder's channelOpen handler: stores the pipeline factory in the channel's config,
 * forwards the event upstream, then starts the bind and queues the resulting future.
 */
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {
try {
// Store the pipeline factory in the channel's config (per the original article, the
// DefaultServerSocketChannelConfig of the NioServerSocketChannel).
// The boss later reads this factory to obtain the pipeline for each accepted
// connection, i.e. the pipeline produced by TelnetServerPipelineFactory.
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
} finally {
// The event is forwarded even if setting the factory threw.
ctx.sendUpstream(evt);
}
// Runs the channel's bind, which ends up in Channels.bind(Channel, SocketAddress);
// the returned ChannelFuture is offered to the queue.
boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
assert finished;
}
Channels.bind方法如下:
/**
 * Requests a bind by sending a downstream BOUND state event that carries the address.
 *
 * @param channel      the channel to bind
 * @param localAddress the address to bind to; must not be null
 * @return the future attached to the downstream event
 * @throws NullPointerException if localAddress is null
 */
public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
ChannelFuture future = future(channel);
// Calls sendDownstream on the pipeline; the event state is BOUND with localAddress as value.
channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
channel, future, ChannelState.BOUND, localAddress));
return future;
}
DefaultChannelPipeline的senddownstream
/**
 * Sends an event downstream. When no downstream-capable context exists the event goes
 * straight to the sink; a Throwable from the sink is passed to notifyHandlerException.
 *
 * @param e the event to send downstream
 */
public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
// No downstream handler in the pipeline: hand the event to the sink directly.
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}
sendDownstream(tail, e);
}
从getActualDownstreamContext返回的是null,所以上面会执行 getSink().eventSunk(this, e);
/**
 * Walks backwards (via prev) from the given context until one that can handle
 * downstream events is found.
 *
 * @param ctx the context to start from; may be null
 * @return the first context at or before ctx whose canHandleDownstream() is true,
 *         or null if ctx is null or no such context exists
 */
DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
if (ctx == null) {
return null;
}
DefaultChannelHandlerContext realCtx = ctx;
// In this walkthrough the only handler is the Binder, which is upstream-only,
// so the loop runs off the list and null is returned.
while (!realCtx.canHandleDownstream()) {
realCtx = realCtx.prev;
if (realCtx == null) {
return null;
}
}
return realCtx;
}
sendDownstream将执行 getSink().eventSunk(this, e);
getSink()获得的是NioServerSocketPipelineSink,
/**
 * Receives an event that reached the end of the downstream path and routes it by the
 * concrete type of the event's channel. Events for any other channel type are ignored.
 */
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
// Event belongs to the listening server channel.
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
// Event belongs to a socket channel.
handleAcceptedSocket(e);
}
}
/**
 * Handles a downstream event for the server channel. Only ChannelStateEvents are
 * processed: OPEN=FALSE closes the channel, BOUND with a non-null value binds to that
 * address, BOUND with a null value closes the channel. Other states are ignored.
 */
private void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}
ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();
// For the event built in Channels.bind, i.e.
// new DownstreamChannelStateEvent(channel, future, ChannelState.BOUND, localAddress),
// state is BOUND and value is the local address.
switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
close(channel, future);
}
break;
case BOUND:
if (value != null) {
// The socket address is actually bound here.
bind(channel, future, (SocketAddress) value);
} else {
close(channel, future);
}
break;
}
}
对应的NioServerSocketPipelineSink.bind方法
/**
 * Binds the server socket, completes the future, fires channelBound and submits a Boss
 * task to the boss executor. On any Throwable the future is failed and exceptionCaught
 * is fired; if the socket was bound but the boss was not started, the channel is closed.
 */
private void bind(
NioServerSocketChannel channel, ChannelFuture future,
SocketAddress localAddress) {
boolean bound = false;
boolean bossStarted = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
// Fire the upstream channelBound event.
fireChannelBound(channel, channel.getLocalAddress());
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
bossExecutor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new Boss(channel),
"New I/O server boss #" + id +
" (" + channel + ')')));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
// Bound but no boss running to accept connections: release the socket.
if (!bossStarted && bound) {
close(channel, future);
}
}
}
先来看Channels.fireChannelBound方法做了什么
/**
 * Sends an upstream BOUND state event carrying the local address through the channel's pipeline.
 */
public static void fireChannelBound(Channel channel, SocketAddress localAddress) {
// In this walkthrough the channel's pipeline contains only the Binder, so the event
// reaches SimpleChannelUpstreamHandler.handleUpstream, which calls channelBound(ctx, evt).
channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.BOUND, localAddress));
}
接着看bind方法
// Fragment repeated from the sink's bind method: obtain the factory's boss executor.
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
// Submit a Boss task to the boss executor; it runs as the boss thread.
// The boss hands newly accepted client connections over to the workers.
// NOTE(review): the original article states the number of workers is CPU count * 2;
// that is not visible in this excerpt - confirm against NioServerSocketChannelFactory.
bossExecutor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new Boss(channel),
"New I/O server boss #" + id +
" (" + channel + ')')));
在new Boss的时候,注册channel的accept事件
/**
 * Opens a selector and registers the server channel's socket for OP_ACCEPT. If the
 * registration fails, the selector is closed before the exception propagates.
 *
 * @throws IOException if the selector cannot be opened or the registration fails
 */
Boss(NioServerSocketChannel channel) throws IOException {
this.channel = channel;
selector = Selector.open();
boolean registered = false;
try {
// Register interest in accept events for the listening socket.
channel.socket.register(selector, SelectionKey.OP_ACCEPT);
registered = true;
} finally {
if (!registered) {
closeSelector();
}
}
// Publish the selector on the channel.
channel.selector = selector;
}
最终调用Boss.run()
/**
 * Boss accept loop: while holding the channel's shutdownLock, repeatedly selects with a
 * one second timeout, accepts a pending connection and hands it to a worker.
 * NOTE: in this excerpt the inner try has no catch clauses, so the exception handling
 * and the loop exit are not shown here.
 */
public void run() {
// The current thread is the boss thread; it plays the main-reactor role.
final Thread currentThread = Thread.currentThread();
channel.shutdownLock.lock();
try {
for (;;) {
try {
if (selector.select(1000) > 0) {
selector.selectedKeys().clear();
}
// Accept a new client connection, if one is pending.
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket != null) {
// Hand the connection over to a worker, i.e. a sub-reactor.
registerAcceptedChannel(acceptedSocket, currentThread);
}
}
}
} finally {
channel.shutdownLock.unlock();
closeSelector();
}
}
/**
 * Wraps an accepted SocketChannel in a NioAcceptedSocketChannel with a fresh pipeline
 * and registers it with the next worker.
 * NOTE: in this excerpt the try block has no catch clauses; the error handling is not shown here.
 */
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
try {
// Obtain the user's pipeline from the pipeline factory stored in the server channel's
// config. That factory was set by the first statement of Binder.channelOpen:
// evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());
// Up to this point the only pipeline involved was the boss pipeline holding the Binder.
// From here on, each NioAcceptedSocketChannel gets the pipeline produced by TelnetServerPipelineFactory.
ChannelPipeline pipeline =
channel.getConfig().getPipelineFactory().getPipeline();
// The NioWorker acts as the sub-reactor.
NioWorker worker = nextWorker();
worker.register(new NioAcceptedSocketChannel(
channel.getFactory(), pipeline, channel,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
}
}
这里调用NioWorker.register
/**
 * Queues a registration task for the channel on this worker, starting the worker thread
 * with a new selector if it is not running yet, and wakes the selector up.
 *
 * @param channel the channel to register with this worker's selector
 * @param future  future handed to the RegisterTask; may be null
 * @throws ChannelException if a selector cannot be opened
 */
void register(NioSocketChannel channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
// Create the task that will register the channel with this worker's selector,
// attaching the channel to the resulting SelectionKey.
Runnable registerTask = new RegisterTask(channel, future, server);
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
String threadName =
(server ? "New I/O server worker #"
: "New I/O client worker #") + bossId + '-' + id;
boolean success = false;
try {
// Start this worker's own thread. This happens only while the worker is not
// started; later registrations reuse the running worker and its selector.
executor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(this, threadName)));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
// Add the task to the register task queue; the worker loop drains it.
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
// Wake the selector only if no wake-up is already pending.
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
来看registertask的run方法
/**
 * RegisterTask body: registers the channel's socket with the selector. A channel
 * without a local or remote address is closed instead. For client channels it then
 * fires channelBound (unless bound manually) and channelConnected.
 *
 * @throws ChannelException if registration fails with an IOException other than
 *         ClosedChannelException
 */
public void run() {
SocketAddress localAddress = channel.getLocalAddress();
SocketAddress remoteAddress = channel.getRemoteAddress();
if (localAddress == null || remoteAddress == null) {
// Missing address: fail the future (if any) and close the channel.
if (future != null) {
future.setFailure(new ClosedChannelException());
}
close(channel, succeededFuture(channel));
return;
}
try {
if (server) {
channel.socket.configureBlocking(false);
}
synchronized (channel.interestOpsLock) {
// Register the channel's socket with the selector using its raw interest ops and
// attach the channel to the key. The original article describes this as
// registering for read events - presumably the default interest is OP_READ; confirm.
channel.socket.register(
selector, channel.getRawInterestOps(), channel);
}
if (future != null) {
channel.setConnected();
future.setSuccess();
}
} catch (IOException e) {
if (future != null) {
future.setFailure(e);
}
close(channel, succeededFuture(channel));
if (!(e instanceof ClosedChannelException)) {
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
}
if (!server) {
if (!((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, localAddress);
}
fireChannelConnected(channel, remoteAddress);
}
}
其中executor.execute(new IoWorkerRunnable(new ThreadRenamingRunnable(this, threadName)));
这里的this指向当前的nioworker,调用nioworker.run
/**
 * NioWorker selector loop: each round selects, then processes the register task queue,
 * the write task queue and the selected keys. When the selector has no keys for two
 * consecutive rounds (or the executor is shut down) and no registration is pending,
 * the selector is closed and the loop ends. Unexpected Throwables are logged and
 * followed by a one second sleep.
 */
public void run() {
// Remember the thread that runs this worker.
thread = Thread.currentThread();
boolean shutdown = false;
Selector selector = this.selector;
for (;;) {
wakenUp.set(false);
if (CONSTRAINT_LEVEL != 0) {
selectorGuard.writeLock().lock();
// This empty synchronization block prevents the selector
// from acquiring its lock.
selectorGuard.writeLock().unlock();
}
try {
SelectorUtil.select(selector);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select()'. (BAD)
// 2) Selector is waken up between 'selector.select()' and
// 'if (wakenUp.get()) { }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select()' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select()' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select().
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
cancelledKeys = 0;
// Drain pending registrations and writes, then handle ready keys.
processRegisterTaskQueue();
processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys());
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
synchronized (startStopLock) {
// Re-check under the lock so a registration queued concurrently is not lost.
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
started = false;
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
} finally {
this.selector = null;
}
break;
} else {
shutdown = false;
}
}
} else {
// Give one more second.
shutdown = true;
}
} else {
shutdown = false;
}
} catch (Throwable t) {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
来看processSelectedKeys(selector.selectedKeys());
/**
 * Iterates over the selected keys, removing each from the set, and performs the read
 * and/or write indicated by its ready ops. A CancelledKeyException closes the key.
 * The loop stops early when cleanUpCancelledKeys() returns true.
 */
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
try {
int readyOps = k.readyOps();
// Readable (or no ready ops reported): read from the channel.
// NOTE(review): in Netty 3 received data is delivered as upstream events; the
// original comment labelled this "downstream", which appears inverted - confirm.
if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
if (!read(k)) {
// Connection already closed - no need to handle write.
continue;
}
}
// Writable: flush pending writes from the selector loop.
// NOTE(review): writes originate from downstream events; the original comment
// labelled this "upstream", which appears inverted - confirm.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
close(k);
}
if (cleanUpCancelledKeys()) {
break; // break the loop to avoid ConcurrentModificationException
}
}
}
从这个过程来看,在netty中,boss线程负责侦听并接受socket连接,然后把接受到的连接分派给某个nioworker;每个nioworker持有自己的selector,以及注册任务队列和写任务队列(是任务队列,而不是线程池),nioworker线程在循环中依次处理注册任务、写任务和已就绪的SelectionKey,从而完成读写操作