netty3.2.3源码分析--服务器端读数据分析

Posted on 2010-12-03 21:26 alex_zheng 阅读(2084) 评论(0)  编辑  收藏 所属分类: java
上一篇分析了ServerBootstrap的启动,接下来分析netty的数据读取。
在NioWorker中,负责读取操作的是下面的read(SelectionKey k)方法。如果当前channel的 (readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0,且此时
ch.read(bb) < 0(或者读取过程中出现异常),则判断客户端已经断开连接,并关闭该channel
private boolean read(SelectionKey k) {
        
final SocketChannel ch = (SocketChannel) k.channel();
        
final NioSocketChannel channel = (NioSocketChannel) k.attachment();

        
final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
        
//默认1024个字节空间
        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

        
int ret = 0;
        
int readBytes = 0;
        
boolean failure = true;
        
//分配连续的1024个byte空间
        ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
        
try {
            
while ((ret = ch.read(bb)) > 0) {
                readBytes 
+= ret;
                
if (!bb.hasRemaining()) {
                    
break;
                }
            }
            failure 
= false;
        } 
catch (ClosedChannelException e) {
            
// Can happen, and does not need a user attention.
        } catch (Throwable t) {
            fireExceptionCaught(channel, t);
        }

        
if (readBytes > 0) {
            bb.flip();

            
final ChannelBufferFactory bufferFactory =
                channel.getConfig().getBufferFactory();
            
final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
            buffer.setBytes(
0, bb);
            buffer.writerIndex(readBytes);

            recvBufferPool.release(bb);

            
// Update the predictor.
            predictor.previousReceiveBufferSize(readBytes);

            
//触发消息接收事件,根据pipeline中upstreamhandler由上到下的顺序,调用messageReceived方法
            fireMessageReceived(channel, buffer);
        } 
else {
            recvBufferPool.release(bb);
        }

        
if (ret < 0 || failure) {
            close(channel, succeededFuture(channel));
            
return false;
        }

        
return true;
    }
    

在pipelinefactory中的第一个upstreamhandler为DelimiterBasedFrameDecoder,继承自FrameDecoder
public ChannelPipeline getPipeline() throws Exception {
        
// Create a default pipeline implementation.
        ChannelPipeline pipeline = pipeline();

        
// Add the text line codec combination first,
        pipeline.addLast("framer"new DelimiterBasedFrameDecoder(
                
8192, Delimiters.lineDelimiter()));
        pipeline.addLast(
"decoder"new StringDecoder());
        pipeline.addLast(
"encoder"new StringEncoder());

        
// and then business logic.
        pipeline.addLast("handler"new TelnetServerHandler());

        
return pipeline;
    }
会调用FrameDecoder的messageReceived
 
public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) 
throws Exception {

        Object m 
= e.getMessage();
        
if (!(m instanceof ChannelBuffer)) {
            ctx.sendUpstream(e);
            
return;
        }

        ChannelBuffer input 
= (ChannelBuffer) m;
        
if (!input.readable()) {
            
return;
        }

        ChannelBuffer cumulation 
= cumulation(ctx);
        
if (cumulation.readable()) {
            cumulation.discardReadBytes();
            cumulation.writeBytes(input);
            callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
        } 
else {
            
//这里调用子类的decode方法
            callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
            
if (input.readable()) {
                cumulation.writeBytes(input);
            }
        }
    }

 //在这个upstreamhandler中,会一直读取数据,直到遇到协议约定的结束标志才将messagereceived事件传给下一个
 
private void callDecode(
            ChannelHandlerContext context, Channel channel,
            ChannelBuffer cumulation, SocketAddress remoteAddress) 
throws Exception {

        
while (cumulation.readable()) {
            
int oldReaderIndex = cumulation.readerIndex();
            Object frame 
= decode(context, channel, cumulation);
            
if (frame == null) {
                
if (oldReaderIndex == cumulation.readerIndex()) {
                    
// Seems like more data is required.
                    
// Let us wait for the next notification.
                    break;
                } 
else {
                    
// Previous data has been discarded.
                    
// Probably it is reading on.
                    continue;
                }
            } 
else if (oldReaderIndex == cumulation.readerIndex()) {
                
throw new IllegalStateException(
                        
"decode() method must read at least one byte " +
                        
"if it returned a frame (caused by: " + getClass() + ")");
            }
            
//将messagereceive事件传给下个upstreamhandler
            unfoldAndFireMessageReceived(context, remoteAddress, frame);
        }
    }
看子类的decode是如何判断数据读取完毕
protected Object decode(
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) 
throws Exception {
        
// Try all delimiters and choose the delimiter which yields the shortest frame.
        int minFrameLength = Integer.MAX_VALUE;
        ChannelBuffer minDelim 
= null;
        
//获取\r\n的位置
        for (ChannelBuffer delim: delimiters) {
            
int frameLength = indexOf(buffer, delim);
            
if (frameLength >= 0 && frameLength < minFrameLength) {
                minFrameLength 
= frameLength;
                minDelim 
= delim;
            }
        }
        
//如果找到\r\n,表明客户端数据发送完毕
        if (minDelim != null) {
            
int minDelimLength = minDelim.capacity();
            ChannelBuffer frame;

            
if (discardingTooLongFrame) {
                
// We've just finished discarding a very large frame.
                
// Go back to the initial state.
                discardingTooLongFrame = false;
                buffer.skipBytes(minFrameLength 
+ minDelimLength);

                
// TODO Let user choose when the exception should be raised - early or late?
                
//      If early, fail() should be called when discardingTooLongFrame is set to true.
                int tooLongFrameLength = this.tooLongFrameLength;
                
this.tooLongFrameLength = 0;
                fail(ctx, tooLongFrameLength);
                
return null;
            }

            
if (minFrameLength > maxFrameLength) {
                
// Discard read frame.
                buffer.skipBytes(minFrameLength + minDelimLength);
                fail(ctx, minFrameLength);
                
return null;
            }

            
if (stripDelimiter) {
                
//这里读取全部数据
                frame = buffer.readBytes(minFrameLength);
                buffer.skipBytes(minDelimLength);
            } 
else {
                frame 
= buffer.readBytes(minFrameLength + minDelimLength);
            }

            
return frame;
        } 
else {
            
if (!discardingTooLongFrame) {
                
if (buffer.readableBytes() > maxFrameLength) {
                    
// Discard the content of the buffer until a delimiter is found.
                    tooLongFrameLength = buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                    discardingTooLongFrame 
= true;
                }
            } 
else {
                
// Still discarding the buffer since a delimiter is not found.
                tooLongFrameLength += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
            }
            
return null;
        }
    }
因为unfold默认是false,unfoldAndFireMessageReceived会把解码得到的frame直接作为一个messageReceived事件传给下一个upstreamhandler,这里是StringDecoder;通过StringDecoder,将ChannelBuffer中的数据转为String,
然后再触发下一个upstreamhandler的messageReceived,这里是TelnetServerHandler
public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {

        
// Cast to a String first.
        
// We know it is a String because we put some codec in TelnetPipelineFactory.
        String request = (String) e.getMessage();

        
// Generate and write a response.
        String response;
        
boolean close = false;
        
if (request.length() == 0) {
            response 
= "Please type something."r"n";
        } 
else if (request.toLowerCase().equals("bye")) {
            response 
= "Have a good day!"r"n";
            close 
= true;
        } 
else {
            response 
= "Did you say '" + request + "'?"r"n";
        }

        
// We do not need to write a ChannelBuffer here.
        
// We know the encoder inserted at TelnetPipelineFactory will do the conversion.
        ChannelFuture future = e.getChannel().write(response);

        
// Close the connection after sending 'Have a good day!'
        
// if the client has sent 'bye'.
        if (close) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

数据读取分析完毕,接着继续分析服务器端数据的发送



只有注册用户登录后才能发表评论。


网站导航:
 

posts - 10, comments - 9, trackbacks - 0, articles - 15

Copyright © alex_zheng