1.CumulativeProtocolDecoder A {@link ProtocolDecoder} that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.If the received {@link IoBuffer} is only a part of a message.decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
即解决'粘包'->即一次接收数据不能完全体现一个完整的消息数据->通过应用层数据协议,如协议中通过4字节描述消息大小或以结束符.
2.CumulativeProtocolDecoder#decode实现

/** *//**
* 1.缓存decode中的IoBuffer in至session的attribute
* 2.循环调用doDecode方法直到其返回false
* 3.解码结束后缓存的buffer->压缩
*/

public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
{
// 判断传输层是否存在消息分片,如果不分片则直接doDecode.(可参考TCP/IP详解)

if (!session.getTransportMetadata().hasFragmentation())
{

while (in.hasRemaining())
{

if (!doDecode(session, in, out))
{
break;
}
}

return;
}

boolean usingSessionBuffer = true;
IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
// 如果session中有BUFFER这个attribute则直接执行追加,否则直接用网络层读到的buffer

if (buf != null)
{
boolean appended = false;
// Make sure that the buffer is auto-expanded.

if (buf.isAutoExpand())
{

try
{
buf.put(in);
appended = true;

} catch (IllegalStateException e)
{
// 可能调用了类似slice的方法,会使父缓冲区的自动扩展属性失效(1.可参考AbstractIoBuffer#recapacityAllowed 2.可参考IoBuffer的实现)

} catch (IndexOutOfBoundsException e)
{
// 取消了自动扩展属性(可参考IoBuffer实现)
}
}


if (appended)
{
// 追加成功的话,直接flip
buf.flip();

} else
{
// 因为用了派生的方法(父子缓冲区)如slice或取消了自动扩展而导致追加失败->重新分配一个Buffer
buf.flip();
IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
newBuf.order(buf.order());
newBuf.put(buf);
newBuf.put(in);
newBuf.flip();
buf = newBuf;

// 更新session属性
session.setAttribute(BUFFER, buf);
}

} else
{
// 此else表示session无BUFFER属性,直接赋值
buf = in;
usingSessionBuffer = false;
}

// 无限循环直到break 1.doDecode返回false 2.doDecode返回true且buf已无数据 3.异常

for (;;)
{
int oldPos = buf.position();
boolean decoded = doDecode(session, buf, out);

if (decoded)
{

if (buf.position() == oldPos)
{
throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
}


if (!buf.hasRemaining())
{
break;
}

} else
{
break;
}
}

// 如果经过decode,buffer依然有剩余数据则存储到session->这样下次decode的时候就可以从session取出buffer并执行追加了

if (buf.hasRemaining())
{

if (usingSessionBuffer && buf.isAutoExpand())
{
// 压缩
buf.compact();

} else
{
storeRemainingInSession(buf, session);
}

} else
{

if (usingSessionBuffer)
{
removeSessionBuffer(session);
}
}
}
注.
1.doDecode在消息非完整的时候返回false.
2.如果doDecode成功解码出一条完整消息则返回true->如果此时buffer中依然有剩余数据则继续执行for->doDecode->直到buffer中的数据不足以解码出一条成功消息返回false.或者恰恰有n条完整的消息->从for跳出.
3.CumulativeProtocolDecoder example

/** *//**
* 解码以CRLF(回车换行)作为结束符的消息
*/
public class CrLfTerminatedCommandLineDecoder

extends CumulativeProtocolDecoder
{

private Command parseCommand(IoBuffer in)
{
// 实现将二进制byte[]转为业务逻辑消息对象Command
}

// 只需实现doDecode方法即可
protected boolean doDecode(
IoSession session, IoBuffer in, ProtocolDecoderOutput out)

throws Exception
{

// 初始位置
int start = in.position();
// 查找'\r\n'标记
byte previous = 0;

while (in.hasRemaining())
{
byte current = in.get();
// 找到了\r\n

if (previous == '\r' && current == '\n')
{
// Remember the current position and limit.
int position = in.position();
int limit = in.limit();

try
{
in.position(start);
in.limit(position);//设置当前的位置为limit

// position和limit之间是一个完整的CRLF消息
out.write(parseCommand(in.slice()));//调用slice方法获得positon和limit之间的子缓冲区->调用write方法加入消息队列(因为网络层一个包可能有多个完整消息)->后经调用flush(遍历消息队列的消息)->nextFilter.messageReceived
filter

} finally
{
// 设置position为解码后的position.limit设置为旧的limit
in.position(position);
in.limit(limit);
}

// 直接返回true.因为在父类的decode方法中doDecode是循环执行的直到不再有完整的消息返回false.
return true;
}
previous = current;
}
// 没有找到\r\n,则重置position并返回false.使得父类decode->for跳出break.
in.position(start);
return false;
}
}
4.DemuxingProtocolDecoder
1.public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder
2.这是一个复合的decoder->多路复用->找到一个合适的MessageDecoder.(不同的消息协议)
3.其doDecode实现为迭代候选的MessageDecoder列表->调用MessageDecoder#decodable方法->如果解码结果为MessageDecoderResult#NOT_OK,则从候选列表移除;如果解码结果为MessageDecoderResult#NEED_DATA,则保留该候选decoder并在更多数据到达的时候会再次调用decodable;如果返回结果为MessageDecoderResult#OK,则表明找到了正确的decoder;如果没有剩下任何的候选decoder,则抛出异常.
4.doDecode源码

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception
{
// 从Session中获取一个State.State包含一个MessageDecoder数组以及一个当前的decoder
State state = getState(session);

// 如果当前decoder为空

if (state.currentDecoder == null)
{
MessageDecoder[] decoders = state.decoders;
int undecodables = 0;
// 遍历decoder候选列表

for (int i = decoders.length - 1; i >= 0; i--)
{
MessageDecoder decoder = decoders[i];
int limit = in.limit();
int pos = in.position();

MessageDecoderResult result;


try
{
// 执行decodable方法并返回result(decodable方法是检查特定的buffer是否可以decoder解码)
result = decoder.decodable(session, in);

} finally
{
// 一定要重置回旧的position和limit
in.position(pos);
in.limit(limit);
}


if (result == MessageDecoder.OK)
{
// 如果返回结果为OK,则设置为state的当前decoder并break
state.currentDecoder = decoder;
break;

} else if (result == MessageDecoder.NOT_OK)
{
// 如果返回结果为NOT_OK,则记录undecodables数目++
undecodables++;

} else if (result != MessageDecoder.NEED_DATA)
{
// 如果结果都不是,即也不是NEED_DATA,则直接抛出异常
throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
}
}

// 如果没有找到合适的decoder,则抛出异常

if (undecodables == decoders.length)
{
// Throw an exception if all decoders cannot decode data.
String dump = in.getHexDump();
in.position(in.limit()); // 跳过这段数据
ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
e.setHexdump(dump);
throw e;
}
// 迭代结束,如果还没有找到合适的decoder则表示可能需要更多的数据->所以返回false->跳出父类的for-dodecode循环

if (state.currentDecoder == null)
{
// Decoder is not determined yet (i.e. we need more data)
return false;
}
}

// 这里表示已找到合适的decoder,调用decode方法进行解码二进制或者特定的协议数据为更高业务层的消息对象

try
{
MessageDecoderResult result = state.currentDecoder.decode(session, in, out);

if (result == MessageDecoder.OK)
{
// 重置为null
state.currentDecoder = null;
return true;

} else if (result == MessageDecoder.NEED_DATA)
{
return false;

} else if (result == MessageDecoder.NOT_OK)
{
state.currentDecoder = null;
throw new ProtocolDecoderException("Message decoder returned NOT_OK.");

} else
{
state.currentDecoder = null;
throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
}

} catch (Exception e)
{
state.currentDecoder = null;
throw e;
}
}
5.一个特定消息协议的编解码的例子,{@link org.apache.mina.example.sumup}
1.AbstractMessageEncoder

/** *//**
* 1.编码消息头,消息体编码由子类实现.
* 2.AbstractMessage中只有一个sequence字段
*/

public abstract class AbstractMessageEncoder<T extends AbstractMessage> implements MessageEncoder<T>
{
// 类型字段
private final int type;


protected AbstractMessageEncoder(int type)
{
this.type = type;
}


public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception
{
IoBuffer buf = IoBuffer.allocate(16);
buf.setAutoExpand(true); // Enable auto-expand for easier encoding

// 编码消息头
buf.putShort((short) type);//type字段占2个字节(short)
buf.putInt(message.getSequence());// sequence字段占4个字节(int)

// 编码消息体,由子类实现
encodeBody(session, message, buf);
buf.flip();
out.write(buf);
}

// 子类实现编码消息体
protected abstract void encodeBody(IoSession session, T message, IoBuffer out);
}
2.AbstractMessageDecoder

/** *//**
* 解码消息头,消息体由子类实现解码
*/

public abstract class AbstractMessageDecoder implements MessageDecoder
{
private final int type;

private int sequence;

private boolean readHeader;


protected AbstractMessageDecoder(int type)
{
this.type = type;
}

// 需覆写decodable方法,检查解码结果

public MessageDecoderResult decodable(IoSession session, IoBuffer in)
{
// HEADER_LEN为type+sequence的长度,共占6个字节.如果此时buffer剩余数据不足header的长度,则返回NEED_DATA的result.

if (in.remaining() < Constants.HEADER_LEN)
{
return MessageDecoderResult.NEED_DATA;
}

// 第一个if判断ok->读取2字节(short),如果和type匹配则返回OK.

if (type == in.getShort())
{
return MessageDecoderResult.OK;
}

// 两个if判断都不ok,则返回NOT_OK
return MessageDecoderResult.NOT_OK;
}
// 终极解码
public MessageDecoderResult decode(IoSession session, IoBuffer in,

ProtocolDecoderOutput out) throws Exception
{
// 如果header数据已ok且消息体数据不足则下次直接略过

if (!readHeader)
{
in.getShort(); // Skip 'type'.
sequence = in.getInt(); // Get 'sequence'.
readHeader = true;
}

// 解码消息体,如果数据不足以解析消息体,则返回null
AbstractMessage m = decodeBody(session, in);
// 消息数据体数据不足->返回NEED_DATA

if (m == null)
{
return MessageDecoderResult.NEED_DATA;

} else
{
readHeader = false; // 成功解码出一条完成消息,则重置readHeader->下次继续读取header
}
m.setSequence(sequence);
out.write(m);

return MessageDecoderResult.OK;
}


/** *//**
* 数据完整不足以解析整个消息体则返回null
*/
protected abstract AbstractMessage decodeBody(IoSession session,
IoBuffer in);
}
3.AddMessageEncoder

/** *//**
* 1.AddMessage的encoder.AddMessage继承自AbstractMessage,又增加了一个字段value
* 2.该encoder的type为Constants.ADD,值为1
*/

public class AddMessageEncoder<T extends AddMessage> extends AbstractMessageEncoder<T>
{

public AddMessageEncoder()
{
super(Constants.ADD);
}

@Override

protected void encodeBody(IoSession session, T message, IoBuffer out)
{ // 实现了编码消息体,向buffer追加了AddMessage的消息体value(4个字节-int)
out.putInt(message.getValue());
}


public void dispose() throws Exception
{
}
}
4.AddMessageDecoder

/** *//**
* AddMessage的decoder.type为Constants.ADD(1)
*/

public class AddMessageDecoder extends AbstractMessageDecoder
{


public AddMessageDecoder()
{
super(Constants.ADD);
}

@Override

protected AbstractMessage decodeBody(IoSession session, IoBuffer in)
{ // ADD_BODY_LEN为AddMessage的消息体长度(value属性),即为4字节(int),如果此时不足4字节,则返回null,表示body数据不足

if (in.remaining() < Constants.ADD_BODY_LEN)
{
return null;
}

AddMessage m = new AddMessage();
m.setValue(in.getInt());// 读取一个int
return m;
}

public void finishDecode(IoSession session, ProtocolDecoderOutput out)

throws Exception
{
}
}
6.总结:使用CumulativeProtocolDecoder可以方便的进行特定消息协议的消息解码并完美的解决了'粘包'问题.另外DemuxingProtocolDecoder结合MessageDecoder可更完美实现解码方案.
posted on 2013-12-02 18:55
landon 阅读(3376)
评论(2) 编辑 收藏 所属分类:
Sources