I want to fly higher
programming Explorer
posts - 114,comments - 263,trackbacks - 0
 1.CumulativeProtocolDecoder
      A {@link ProtocolDecoder} that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.If the received {@link IoBuffer} is only a part of a message.decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
     即解决'
粘包'->即一次接收数据不能完全体现一个完整的消息数据->通过应用层数据协议,如协议中通过4字节描述消息大小或以结束符.

2.CumulativeProtocolDecoder#decode实现
/**
    * 1.缓存decode中的IoBuffer in至session的attribute
    * 2.循环调用doDecode方法直到其返回false
    * 3.解码结束后缓存的buffer->压缩
   
*/

   
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
      
// 判断传输层是否存在消息分片,如果不分片则直接doDecode.(可参考TCP/IP详解)
        if (!session.getTransportMetadata().hasFragmentation()) {
           
while (in.hasRemaining()) {
               
if (!doDecode(session, in, out)) {
                   
break;
                }

            }


           
return;
        }


       
boolean usingSessionBuffer = true;
        IoBuffer buf
= (IoBuffer) session.getAttribute(BUFFER);
       
// 如果session中有BUFFER这个attribute则直接执行追加,否则直接用网络层读到的buffer
        if (buf != null) {
           
boolean appended = false;
           
// Make sure that the buffer is auto-expanded.
            if (buf.isAutoExpand()) {
               
try {
                    buf.put(in);
                    appended
= true;
                }
catch (IllegalStateException e) {
                   
// 可能调用了类似slice的方法,会使父缓冲区的自动扩展属性失效(1.可参考AbstractIoBuffer#recapacityAllowed 2.可参考IoBuffer的实现)
                }
catch (IndexOutOfBoundsException e) {
                   
// 取消了自动扩展属性(可参考IoBuffer实现)
                }

            }


           
if (appended) {
   
// 追加成功的话,直接flip
                buf.flip();
            }
else {
    
// 因为用了派生的方法(父子缓冲区)如slice或取消了自动扩展而导致追加失败->重新分配一个Buffer
                buf.flip();
                IoBuffer newBuf
= IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
                newBuf.order(buf.order());
                newBuf.put(buf);
                newBuf.put(in);
                newBuf.flip();
                buf
= newBuf;

               
// 更新session属性
                session.setAttribute(BUFFER, buf);
            }

        }
else {
   
// 此else表示session无BUFFER属性,直接赋值
            buf = in;
            usingSessionBuffer
= false;
        }


       
// 无限循环直到break 1.doDecode返回false 2.doDecode返回true且buf已无数据 3.异常
        for (;;) {
           
int oldPos = buf.position();
           
boolean decoded = doDecode(session, buf, out);
           
if (decoded) {
               
if (buf.position() == oldPos) {
                   
throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
                }


               
if (!buf.hasRemaining()) {
                   
break;
                }

            }
else {
               
break;
            }

        }


       
// 如果经过decode,buffer依然有剩余数据则存储到session->这样下次decode的时候就可以从session取出buffer并执行追加了
        if (buf.hasRemaining()) {
           
if (usingSessionBuffer && buf.isAutoExpand()) {
      
// 压缩
                buf.compact();
            }
else {
                storeRemainingInSession(buf, session);
            }

        }
else {
           
if (usingSessionBuffer) {
                removeSessionBuffer(session);
            }

        }

    }

            注.
                    1.doDecode在消息非完整的时候返回false. 
                    2.如果doDecode成功解码出一条完整消息则返回true->如果此时buffer中依然有剩余数据则继续执行for->doDecode->直到buffer中的数据不足以解码出一条成功消息返回false.或者恰恰有n条完整的消息->从for跳出.

3.CumulativeProtocolDecoder example
    /**
      * 解码以CRLF(回车换行)作为结束符的消息
             
*/

   
public class CrLfTerminatedCommandLineDecoder
         
extends CumulativeProtocolDecoder {

    
private Command parseCommand(IoBuffer in) {
   
// 实现将二进制byte[]转为业务逻辑消息对象Command
      }


  
// 只需实现doDecode方法即可
    protected boolean doDecode(
             IoSession session, IoBuffer in, ProtocolDecoderOutput out)
             
throws Exception {

       
// 初始位置
          int start = in.position();

         
// 查找'\r\n'标记
          byte previous = 0;
        
while (in.hasRemaining()) {
            
byte current = in.get();

             
// 找到了\r\n
              if (previous == '\r' && current == '\n') {
                
// Remember the current position and limit.
                  int position = in.position();
               
int limit = in.limit();
               
try {
                      in.position(start);
                    in.limit(position);
//设置当前的位置为limit

          
// position和limit之间是一个完整的CRLF消息
                    out.write(parseCommand(in.slice()));//调用slice方法获得positon和limit之间的子缓冲区->调用write方法加入消息队列(因为网络层一个包可能有多个完整消息)->后经调用flush(遍历消息队列的消息)->nextFilter.messageReceived
filter
                  }
finally {
                    
// 设置position为解码后的position.limit设置为旧的limit
                     in.position(position);
                    in.limit(limit);
                  }


   
// 直接返回true.因为在父类的decode方法中doDecode是循环执行的直到不再有完整的消息返回false.
               return true;
           }


            previous
= current;
         }


         
// 没有找到\r\n,则重置position并返回false.使得父类decode->for跳出break.
          in.position(start);

         
return false;
      }

  }

 4.
DemuxingProtocolDecoder
    
 1.public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder
     2.这是一个复合的decoder->多路复用->找到一个合适的MessageDecoder.(不同的消息协议)

     3.其doDecode实现为迭代候选的MessageDecoder列表->调用MessageDecoder#decodable方法->如果解码结果为MessageDecoderResult#NOT_OK,则从候选列表移除;如果解码结果为MessageDecoderResult#NEED_DATA,则保留该候选decoder并在更多数据到达的时候会再次调用decodable;如果返回结果为MessageDecoderResult#OK,则表明找到了正确的decoder;如果没有剩下任何的候选decoder,则抛出异常.

    4.doDecode源码
      protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
             
// 从Session中获取一个State.State包含一个MessageDecoder数组以及一个当前的decoder
            State state = getState(session);

          
// 如果当前decoder为空
            if (state.currentDecoder == null) {
                MessageDecoder[] decoders
= state.decoders;
               
int undecodables = 0;
       
       
// 遍历decoder候选列表
                for (int i = decoders.length - 1; i >= 0; i--) {
                    MessageDecoder decoder
= decoders[i];
                   
int limit = in.limit();
                   
int pos = in.position();

                    MessageDecoderResult result;

                   
try {
                
// 执行decodable方法并返回result(decodable方法是检查特定的buffer是否可以decoder解码)
                        result = decoder.decodable(session, in);
                    }
finally {
             
// 一定要重置回旧的position和limit
                        in.position(pos);
                        in.limit(limit);
                    }


                   
if (result == MessageDecoder.OK) {
             
// 如果返回结果为OK,则设置为state的当前decoder并break
                        state.currentDecoder = decoder;
                       
break;
                    }
else if (result == MessageDecoder.NOT_OK) {
             
// 如果返回结果为NOT_OK,则记录undecodables数目++
                        undecodables++;
                    }
else if (result != MessageDecoder.NEED_DATA) {
              
// 如果结果都不是,即也不是NEED_DATA,则直接抛出异常
                        throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
                    }

                }


       
// 如果没有找到合适的decoder,则抛出异常
                if (undecodables == decoders.length) {
                   
// Throw an exception if all decoders cannot decode data.
                    String dump = in.getHexDump();
                    in.position(in.limit());
// 跳过这段数据
                    ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
                    e.setHexdump(dump);
                   
throw e;
                }

       
       
// 迭代结束,如果还没有找到合适的decoder则表示可能需要更多的数据->所以返回false->跳出父类的for-dodecode循环
                if (state.currentDecoder == null) {
                   
// Decoder is not determined yet (i.e. we need more data)
                    return false;
                }

            }


          
// 这里表示已找到合适的decoder,调用decode方法进行解码二进制或者特定的协议数据为更高业务层的消息对象
            try {
                MessageDecoderResult result
= state.currentDecoder.decode(session, in, out);
               
if (result == MessageDecoder.OK) {
          
// 重置为null
                    state.currentDecoder = null;
                   
return true;
                }
else if (result == MessageDecoder.NEED_DATA) {
                   
return false;
                }
else if (result == MessageDecoder.NOT_OK) {
                    state.currentDecoder
= null;
                   
throw new ProtocolDecoderException("Message decoder returned NOT_OK.");
                }
else {
                    state.currentDecoder
= null;
                   
throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
                }

            }
catch (Exception e) {
                state.currentDecoder
= null;
               
throw e;
            }

        }

5.一个特定消息协议的编解码的例子,{@link org.apache.mina.example.sumup}
    1.AbstractMessageEncoder
    /**
     * 1.编码消息头,消息体编码由子类实现.
     * 2.AbstractMessage中只有一个sequence字段
    
*/

   
public abstract class AbstractMessageEncoder<T extends AbstractMessage> implements MessageEncoder<T> {
      
// 类型字段
        private final int type;

       
protected AbstractMessageEncoder(int type) {
           
this.type = type;
        }


       
public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception {
            IoBuffer buf
= IoBuffer.allocate(16);
            buf.setAutoExpand(
true); // Enable auto-expand for easier encoding

           
// 编码消息头
            buf.putShort((short) type);//type字段占2个字节(short)
            buf.putInt(message.getSequence());// sequence字段占4个字节(int)

           
// 编码消息体,由子类实现
            encodeBody(session, message, buf);
            buf.flip();
            out.write(buf);
        }


       
// 子类实现编码消息体
        protected abstract void encodeBody(IoSession session, T message, IoBuffer out);
    }

    2.AbstractMessageDecoder
    /**
        * 解码消息头,消息体由子类实现解码
       
*/

       
public abstract class AbstractMessageDecoder implements MessageDecoder {
       
private final int type;

       
private int sequence;

       
private boolean readHeader;

       
protected AbstractMessageDecoder(int type) {
           
this.type = type;
        }


       
// 需覆写decodable方法,检查解码结果
        public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
           
// HEADER_LEN为type+sequence的长度,共占6个字节.如果此时buffer剩余数据不足header的长度,则返回NEED_DATA的result.
            if (in.remaining() < Constants.HEADER_LEN) {
               
return MessageDecoderResult.NEED_DATA;
            }


           
// 第一个if判断ok->读取2字节(short),如果和type匹配则返回OK.
            if (type == in.getShort()) {
               
return MessageDecoderResult.OK;
            }


           
// 两个if判断都不ok,则返回NOT_OK
            return MessageDecoderResult.NOT_OK;
        }

       
                
// 终极解码
        public MessageDecoderResult decode(IoSession session, IoBuffer in,
                ProtocolDecoderOutput out)
throws Exception {
           
// 如果header数据已ok且消息体数据不足则下次直接略过
            if (!readHeader) {
                in.getShort();
// Skip 'type'.
                sequence = in.getInt(); // Get 'sequence'.
                readHeader = true;
            }


           
// 解码消息体,如果数据不足以解析消息体,则返回null
            AbstractMessage m = decodeBody(session, in);
           
// 消息数据体数据不足->返回NEED_DATA
            if (m == null) {
               
return MessageDecoderResult.NEED_DATA;
            }
else {
                readHeader
= false; // 成功解码出一条完成消息,则重置readHeader->下次继续读取header
            }

            m.setSequence(sequence);
            out.write(m);

           
return MessageDecoderResult.OK;
        }


       
/**
         * 数据完整不足以解析整个消息体则返回null
        
*/

       
protected abstract AbstractMessage decodeBody(IoSession session,
                IoBuffer in);
    }

    3.AddMessageEncoder
    /**
                           * 1.AddMessage的encoder.AddMessage继承自AbstractMessage,又增加了一个字段value
                           * 2.该encoder的type为Constants.ADD,值为1
        
*/

       
public class AddMessageEncoder<T extends AddMessage> extends AbstractMessageEncoder<T> {
       
public AddMessageEncoder() {
           
super(Constants.ADD);
        }


        @Override
       
protected void encodeBody(IoSession session, T message, IoBuffer out) {                 // 实现了编码消息体,向buffer追加了AddMessage的消息体value(4个字节-int)
            out.putInt(message.getValue());
        }


       
public void dispose() throws Exception {
        }

    }


       4.AddMessageDecoder
    /**
        *  AddMessage的decoder.type为Constants.ADD(1)
       
*/

       
public class AddMessageDecoder extends AbstractMessageDecoder {

       
public AddMessageDecoder() {
           
super(Constants.ADD);
        }


        @Override
       
protected AbstractMessage decodeBody(IoSession session, IoBuffer in) {                  // ADD_BODY_LEN为AddMessage的消息体长度(value属性),即为4字节(int),如果此时不足4字节,则返回null,表示body数据不足
            if (in.remaining() < Constants.ADD_BODY_LEN) {
               
return null;
            }


            AddMessage m
= new AddMessage();
            m.setValue(in.getInt());
// 读取一个int
            return m;
        }


       
public void finishDecode(IoSession session, ProtocolDecoderOutput out)
               
throws Exception {
        }

    }

6.总结:使用CumulativeProtocolDecoder可以方便的进行特定消息协议的消息解码并完美的解决了'粘包'问题.另外DemuxingProtocolDecoder结合MessageDecoder可更完美实现解码方案.
posted on 2013-12-02 18:55 landon 阅读(3375) 评论(2)  编辑  收藏 所属分类: Sources

FeedBack:
# re: apache-mina-2.07源码笔记4-codec
2013-12-03 09:38 | 鹏达锁业
谢谢博主分享。。。。。。。。。。。  回复  更多评论
  
# re: apache-mina-2.07源码笔记4-codec
2013-12-05 17:26 | 左岸
好东西啊,谢谢分享  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: