上一篇分析了服务器端如何读取客户端发送的数据,这篇来看服务器端如何把数据发送给客户端。服务器向外发送数据时,事件依次经过pipeline中的downstream handler,按从下到上(即从最后一个handler到第一个handler)的顺序执行。
发送从 ChannelFuture future = e.getChannel().write(response) 开始,最终会执行Channels类中的write方法:
/**
 * Starts an asynchronous write of {@code message} on {@code channel}.
 *
 * <p>A new future is created for the channel, wrapped together with the message and
 * the remote address in a {@link DownstreamMessageEvent}, and that event is sent
 * downstream through the channel's pipeline.
 *
 * @param channel       the channel to write to
 * @param message       the message to send
 * @param remoteAddress the destination address carried by the event
 * @return the future that was attached to the downstream event
 */
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
    final ChannelFuture writeFuture = future(channel);
    final DownstreamMessageEvent writeEvent =
            new DownstreamMessageEvent(channel, writeFuture, message, remoteAddress);
    channel.getPipeline().sendDownstream(writeEvent);
    return writeFuture;
}
telnet示例的pipeline中最下面一个downstream handler是StringEncoder,它继承自OneToOneEncoder,因此最终执行的是OneToOneEncoder的handleDownstream:
/**
 * Encodes outgoing messages and forwards everything else untouched.
 *
 * <p>Events that are not {@link MessageEvent}s are passed downstream as-is. For a
 * message event the payload is run through {@code encode}: if the very same object
 * comes back the original event is forwarded, if a different non-null object comes
 * back it is written downstream with the original future and remote address, and if
 * {@code null} comes back nothing is sent.
 *
 * @param ctx the context of this handler
 * @param evt the downstream event to process
 * @throws Exception if {@code encode} fails
 */
public void handleDownstream(
        ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
    if (evt instanceof MessageEvent) {
        final MessageEvent messageEvent = (MessageEvent) evt;
        final Object input = messageEvent.getMessage();
        final Object output = encode(ctx, messageEvent.getChannel(), input);

        if (output == input) {
            // Nothing was changed by the encoder - forward the original event.
            ctx.sendDownstream(evt);
            return;
        }
        if (output == null) {
            return;
        }
        // Write the encoded data; per the walkthrough this ends up in
        // DefaultChannelPipeline's sendDownstream.
        write(ctx, messageEvent.getFuture(), output, messageEvent.getRemoteAddress());
        return;
    }
    ctx.sendDownstream(evt);
}
DefaultChannelPipeline的内部类DefaultChannelHandlerContext的sendDownstream方法:
/**
 * Passes {@code e} to the next downstream-capable context before this one, or to the
 * pipeline's sink when no such context exists.
 *
 * <p>Any {@link Throwable} raised by the sink is reported through
 * {@code notifyHandlerException} instead of being propagated.
 *
 * @param e the downstream event to forward
 */
public void sendDownstream(ChannelEvent e) {
    final DefaultChannelHandlerContext target = getActualDownstreamContext(this.prev);
    if (target != null) {
        DefaultChannelPipeline.this.sendDownstream(target, e);
        return;
    }
    try {
        // Per the walkthrough, StringEncoder is the only downstream handler in the
        // example pipeline, so the event reaches the sink here
        // (NioServerSocketPipelineSink.eventSunk).
        getSink().eventSunk(DefaultChannelPipeline.this, e);
    } catch (Throwable cause) {
        notifyHandlerException(e, cause);
    }
}
eventSunk方法会调用handleAcceptedSocket;对于MessageEvent,先把事件放入channel的写队列,再通过NioWorker的writeFromUserCode最终执行write0:
/**
 * Handles a downstream event that reached the sink for an accepted socket channel.
 *
 * <p>State events are translated into worker calls: {@code OPEN} with a value of
 * {@code FALSE}, or {@code BOUND}/{@code CONNECTED} with a {@code null} value, close
 * the channel; {@code INTEREST_OPS} updates the channel's interest ops. Message
 * events are appended to the channel's write buffer and the worker is asked to write.
 * Any other event type is ignored.
 *
 * @param e the event delivered to the sink
 */
private void handleAcceptedSocket(ChannelEvent e) {
    if (e instanceof ChannelStateEvent) {
        final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
        final NioSocketChannel ch = (NioSocketChannel) stateEvent.getChannel();
        final ChannelFuture future = stateEvent.getFuture();
        final ChannelState state = stateEvent.getState();
        final Object value = stateEvent.getValue();

        switch (state) {
        case INTEREST_OPS:
            ch.worker.setInterestOps(ch, future, ((Integer) value).intValue());
            break;
        case OPEN:
            if (Boolean.FALSE.equals(value)) {
                ch.worker.close(ch, future);
            }
            break;
        case BOUND:
        case CONNECTED:
            if (value == null) {
                ch.worker.close(ch, future);
            }
            break;
        }
        return;
    }

    if (e instanceof MessageEvent) {
        final MessageEvent messageEvent = (MessageEvent) e;
        final NioSocketChannel ch = (NioSocketChannel) messageEvent.getChannel();

        // Put the event on the channel's write-request queue; the offer is expected
        // to always succeed.
        boolean queued = ch.writeBuffer.offer(messageEvent);
        assert queued;

        // Hand off to the worker's writeFromUserCode, which (per the walkthrough)
        // goes on to run write0.
        ch.worker.writeFromUserCode(ch);
    }
}
/**
 * Drains the channel's queue of pending write events, transferring each event's
 * message to the channel's socket while holding {@code channel.writeLock}.
 *
 * <p>The loop stops when the queue is empty or when a buffer could not be written
 * completely. After the lock is released a write-complete event is fired with the
 * number of bytes counted in this call, and - unless an IOException closed the
 * channel - setOpWrite or clearOpWrite is called depending on how the loop ended.
 *
 * @param channel the channel whose queued writes should be flushed
 */
private void write0(NioSocketChannel channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
long writtenBytes = 0;
final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final SocketChannel ch = channel.socket;
// The queue that handleAcceptedSocket offered the MessageEvent into.
final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
// Upper bound on transferTo attempts per pass (the original note says the
// default is 16 - TODO confirm against the channel config).
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
// Resume a previously unfinished event if there is one.
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf;
if (evt == null) {
// Take the next write event from the write buffer; an empty queue ends the loop.
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
buf = channel.currentWriteBuffer;
}
ChannelFuture future = evt.getFuture();
try {
long localWrittenBytes = 0;
// Retry only while transferTo writes nothing and the buffer is not finished.
for (int i = writeSpinCount; i > 0; i --) {
// Send the data to the client; per the original note this resolves to
// PooledSendBuffer.transferTo.
localWrittenBytes = buf.transferTo(ch);
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
if (buf.finished()) {
break;
}
}
if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
// currentWriteEvent/currentWriteBuffer stay set so a later call resumes here.
addOpWrite = true;
channel.writeSuspended = true;
if (localWrittenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
} catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore.
} catch (Throwable t) {
// Drop the current event and buffer, fail its future and report the error.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
buf = null;
evt = null;
future.setFailure(t);
fireExceptionCaught(channel, t);
if (t instanceof IOException) {
// Close the channel and skip the OP_WRITE update at the end of the method.
open = false;
close(channel, succeededFuture(channel));
}
}
}
channel.inWriteNowLoop = false;
}
// Fire the write-complete event (after writeLock has been released). Per the
// walkthrough it goes through DefaultChannelPipeline's sendUpstream and ends in
// SimpleChannelUpstreamHandler.writeComplete.
// None of the upstream handlers in the example pipeline override writeComplete,
// so the event is simply passed along.
fireWriteComplete(channel, writtenBytes);
if (open) {
if (addOpWrite) {
setOpWrite(channel);
} else if (removeOpWrite) {
clearOpWrite(channel);
}
}
}