上一篇分析了serverboostrap的启动,接下来分析netty的数据读取。
在nioworker的,负责读取操作是由,在该方法中,如果当前channel的(readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0,且此时
ch.read(buff)<0,则判断客户端已经断开连接
private boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel();
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
final ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
//默认1024个字节空间
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
int ret = 0;
int readBytes = 0;
boolean failure = true;
//分配连续的1024个byte空间
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
try {
while ((ret = ch.read(bb)) > 0) {
readBytes += ret;
if (!bb.hasRemaining()) {
break;
}
}
failure = false;
} catch (ClosedChannelException e) {
// Can happen, and does not need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (readBytes > 0) {
bb.flip();
final ChannelBufferFactory bufferFactory =
channel.getConfig().getBufferFactory();
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
buffer.setBytes(0, bb);
buffer.writerIndex(readBytes);
recvBufferPool.release(bb);
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
//触发消息接收事件,根据pipeline中upstreamhandler由上到下的顺序,调用messageReceived方法
fireMessageReceived(channel, buffer);
} else {
recvBufferPool.release(bb);
}
if (ret < 0 || failure) {
close(channel, succeededFuture(channel));
return false;
}
return true;
}
在pipelinefactory中的第一个upstreamhandler为DelimiterBasedFrameDecoder,继承自FrameDecoder
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
// Add the text line codec combination first,
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// and then business logic.
pipeline.addLast("handler", new TelnetServerHandler());
return pipeline;
}
会调用FrameDecoder的messageReceived
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {
ctx.sendUpstream(e);
return;
}
ChannelBuffer input = (ChannelBuffer) m;
if (!input.readable()) {
return;
}
ChannelBuffer cumulation = cumulation(ctx);
if (cumulation.readable()) {
cumulation.discardReadBytes();
cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
} else {
//这里调用子类的decode方法
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
if (input.readable()) {
cumulation.writeBytes(input);
}
}
}
//在这个upstreamhandler中,会一直读取数据,直到遇到协议约定的结束标志才将messagereceived事件传给下一个
private void callDecode(
ChannelHandlerContext context, Channel channel,
ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
while (cumulation.readable()) {
int oldReaderIndex = cumulation.readerIndex();
Object frame = decode(context, channel, cumulation);
if (frame == null) {
if (oldReaderIndex == cumulation.readerIndex()) {
// Seems like more data is required.
// Let us wait for the next notification.
break;
} else {
// Previous data has been discarded.
// Probably it is reading on.
continue;
}
} else if (oldReaderIndex == cumulation.readerIndex()) {
throw new IllegalStateException(
"decode() method must read at least one byte " +
"if it returned a frame (caused by: " + getClass() + ")");
}
//将messagereceive事件传给下个upstreamhandler
unfoldAndFireMessageReceived(context, remoteAddress, frame);
}
}
看子类的decode是如何判断数据读取完毕
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ChannelBuffer minDelim = null;
//获取\r\n的位置
for (ChannelBuffer delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
//如果找到\r\n,表明客户端数据发送完毕
if (minDelim != null) {
int minDelimLength = minDelim.capacity();
ChannelBuffer frame;
if (discardingTooLongFrame) {
// We've just finished discarding a very large frame.
// Go back to the initial state.
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
// TODO Let user choose when the exception should be raised - early or late?
// If early, fail() should be called when discardingTooLongFrame is set to true.
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
fail(ctx, tooLongFrameLength);
return null;
}
if (minFrameLength > maxFrameLength) {
// Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(ctx, minFrameLength);
return null;
}
if (stripDelimiter) {
//这里读取全部数据
frame = buffer.readBytes(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readBytes(minFrameLength + minDelimLength);
}
return frame;
} else {
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
// Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true;
}
} else {
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
因为unfold默认是false,会执行,调用下一个upstreamhandler,这里是stringdecoder,通过stringdecoder,将channelbuffer中的数据转为string
然后再触发下一个upstreamhandler的messagereceive,这里是TelnetServerHandler
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Cast to a String first.
// We know it is a String because we put some codec in TelnetPipelineFactory.
String request = (String) e.getMessage();
// Generate and write a response.
String response;
boolean close = false;
if (request.length() == 0) {
response = "Please type something."r"n";
} else if (request.toLowerCase().equals("bye")) {
response = "Have a good day!"r"n";
close = true;
} else {
response = "Did you say '" + request + "'?"r"n";
}
// We do not need to write a ChannelBuffer here.
// We know the encoder inserted at TelnetPipelineFactory will do the conversion.
ChannelFuture future = e.getChannel().write(response);
// Close the connection after sending 'Have a good day!'
// if the client has sent 'bye'.
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
数据读取分析完毕,接着继续分析服务器端数据的发送