1.CumulativeProtocolDecoder A {@link ProtocolDecoder} that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.If the received {@link IoBuffer} is only a part of a message.decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
即解决'粘包'->即一次接收数据不能完全体现一个完整的消息数据->通过应用层数据协议,如协议中通过4字节描述消息大小或以结束符.
2.CumulativeProtocolDecoder#decode实现
/** *//**
* 1.缓存decode中的IoBuffer in至session的attribute
* 2.循环调用doDecode方法直到其返回false
* 3.解码结束后缓存的buffer->压缩
*/
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
// 判断传输层是否存在消息分片,如果不分片则直接doDecode.(可参考TCP/IP详解)
if (!session.getTransportMetadata().hasFragmentation()) {
while (in.hasRemaining()) {
if (!doDecode(session, in, out)) {
break;
}
}
return;
}
boolean usingSessionBuffer = true;
IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
// 如果session中有BUFFER这个attribute则直接执行追加,否则直接用网络层读到的buffer
if (buf != null) {
boolean appended = false;
// Make sure that the buffer is auto-expanded.
if (buf.isAutoExpand()) {
try {
buf.put(in);
appended = true;
} catch (IllegalStateException e) {
// 可能调用了类似slice的方法,会使父缓冲区的自动扩展属性失效(1.可参考AbstractIoBuffer#recapacityAllowed 2.可参考IoBuffer的实现)
} catch (IndexOutOfBoundsException e) {
// 取消了自动扩展属性(可参考IoBuffer实现)
}
}
if (appended) {
// 追加成功的话,直接flip
buf.flip();
} else {
// 因为用了派生的方法(父子缓冲区)如slice或取消了自动扩展而导致追加失败->重新分配一个Buffer
buf.flip();
IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
newBuf.order(buf.order());
newBuf.put(buf);
newBuf.put(in);
newBuf.flip();
buf = newBuf;
// 更新session属性
session.setAttribute(BUFFER, buf);
}
} else {
// 此else表示session无BUFFER属性,直接赋值
buf = in;
usingSessionBuffer = false;
}
// 无限循环直到break 1.doDecode返回false 2.doDecode返回true且buf已无数据 3.异常
for (;;) {
int oldPos = buf.position();
boolean decoded = doDecode(session, buf, out);
if (decoded) {
if (buf.position() == oldPos) {
throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
}
if (!buf.hasRemaining()) {
break;
}
} else {
break;
}
}
// 如果经过decode,buffer依然有剩余数据则存储到session->这样下次decode的时候就可以从session取出buffer并执行追加了
if (buf.hasRemaining()) {
if (usingSessionBuffer && buf.isAutoExpand()) {
// 压缩
buf.compact();
} else {
storeRemainingInSession(buf, session);
}
} else {
if (usingSessionBuffer) {
removeSessionBuffer(session);
}
}
}
注.
1.doDecode在消息非完整的时候返回false.
2.如果doDecode成功解码出一条完整消息则返回true->如果此时buffer中依然有剩余数据则继续执行for->doDecode->直到buffer中的数据不足以解码出一条成功消息返回false.或者恰恰有n条完整的消息->从for跳出.
3.CumulativeProtocolDecoder example
/** *//**
* 解码以CRLF(回车换行)作为结束符的消息
*/
public class CrLfTerminatedCommandLineDecoder
extends CumulativeProtocolDecoder {
private Command parseCommand(IoBuffer in) {
// 实现将二进制byte[]转为业务逻辑消息对象Command
}
// 只需实现doDecode方法即可
protected boolean doDecode(
IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
// 初始位置
int start = in.position();
// 查找'\r\n'标记
byte previous = 0;
while (in.hasRemaining()) {
byte current = in.get();
// 找到了\r\n
if (previous == '\r' && current == '\n') {
// Remember the current position and limit.
int position = in.position();
int limit = in.limit();
try {
in.position(start);
in.limit(position);//设置当前的位置为limit
// position和limit之间是一个完整的CRLF消息
out.write(parseCommand(in.slice()));//调用slice方法获得positon和limit之间的子缓冲区->调用write方法加入消息队列(因为网络层一个包可能有多个完整消息)->后经调用flush(遍历消息队列的消息)->nextFilter.messageReceived
filter
} finally {
// 设置position为解码后的position.limit设置为旧的limit
in.position(position);
in.limit(limit);
}
// 直接返回true.因为在父类的decode方法中doDecode是循环执行的直到不再有完整的消息返回false.
return true;
}
previous = current;
}
// 没有找到\r\n,则重置position并返回false.使得父类decode->for跳出break.
in.position(start);
return false;
}
}
4.DemuxingProtocolDecoder
1.public class DemuxingProtocolDecoder extends CumulativeProtocolDecoder
2.这是一个复合的decoder->多路复用->找到一个合适的MessageDecoder.(不同的消息协议)
3.其doDecode实现为迭代候选的MessageDecoder列表->调用MessageDecoder#decodable方法->如果解码结果为MessageDecoderResult#NOT_OK,则从候选列表移除;如果解码结果为MessageDecoderResult#NEED_DATA,则保留该候选decoder并在更多数据到达的时候会再次调用decodable;如果返回结果为MessageDecoderResult#OK,则表明找到了正确的decoder;如果没有剩下任何的候选decoder,则抛出异常.
4.doDecode源码
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
// 从Session中获取一个State.State包含一个MessageDecoder数组以及一个当前的decoder
State state = getState(session);
// 如果当前decoder为空
if (state.currentDecoder == null) {
MessageDecoder[] decoders = state.decoders;
int undecodables = 0;
// 遍历decoder候选列表
for (int i = decoders.length - 1; i >= 0; i--) {
MessageDecoder decoder = decoders[i];
int limit = in.limit();
int pos = in.position();
MessageDecoderResult result;
try {
// 执行decodable方法并返回result(decodable方法是检查特定的buffer是否可以decoder解码)
result = decoder.decodable(session, in);
} finally {
// 一定要重置回旧的position和limit
in.position(pos);
in.limit(limit);
}
if (result == MessageDecoder.OK) {
// 如果返回结果为OK,则设置为state的当前decoder并break
state.currentDecoder = decoder;
break;
} else if (result == MessageDecoder.NOT_OK) {
// 如果返回结果为NOT_OK,则记录undecodables数目++
undecodables++;
} else if (result != MessageDecoder.NEED_DATA) {
// 如果结果都不是,即也不是NEED_DATA,则直接抛出异常
throw new IllegalStateException("Unexpected decode result (see your decodable()): " + result);
}
}
// 如果没有找到合适的decoder,则抛出异常
if (undecodables == decoders.length) {
// Throw an exception if all decoders cannot decode data.
String dump = in.getHexDump();
in.position(in.limit()); // 跳过这段数据
ProtocolDecoderException e = new ProtocolDecoderException("No appropriate message decoder: " + dump);
e.setHexdump(dump);
throw e;
}
// 迭代结束,如果还没有找到合适的decoder则表示可能需要更多的数据->所以返回false->跳出父类的for-dodecode循环
if (state.currentDecoder == null) {
// Decoder is not determined yet (i.e. we need more data)
return false;
}
}
// 这里表示已找到合适的decoder,调用decode方法进行解码二进制或者特定的协议数据为更高业务层的消息对象
try {
MessageDecoderResult result = state.currentDecoder.decode(session, in, out);
if (result == MessageDecoder.OK) {
// 重置为null
state.currentDecoder = null;
return true;
} else if (result == MessageDecoder.NEED_DATA) {
return false;
} else if (result == MessageDecoder.NOT_OK) {
state.currentDecoder = null;
throw new ProtocolDecoderException("Message decoder returned NOT_OK.");
} else {
state.currentDecoder = null;
throw new IllegalStateException("Unexpected decode result (see your decode()): " + result);
}
} catch (Exception e) {
state.currentDecoder = null;
throw e;
}
}
5.一个特定消息协议的编解码的例子,{@link org.apache.mina.example.sumup}
1.AbstractMessageEncoder
/** *//**
* 1.编码消息头,消息体编码由子类实现.
* 2.AbstractMessage中只有一个sequence字段
*/
public abstract class AbstractMessageEncoder<T extends AbstractMessage> implements MessageEncoder<T> {
// 类型字段
private final int type;
protected AbstractMessageEncoder(int type) {
this.type = type;
}
public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception {
IoBuffer buf = IoBuffer.allocate(16);
buf.setAutoExpand(true); // Enable auto-expand for easier encoding
// 编码消息头
buf.putShort((short) type);//type字段占2个字节(short)
buf.putInt(message.getSequence());// sequence字段占4个字节(int)
// 编码消息体,由子类实现
encodeBody(session, message, buf);
buf.flip();
out.write(buf);
}
// 子类实现编码消息体
protected abstract void encodeBody(IoSession session, T message, IoBuffer out);
}
2.AbstractMessageDecoder
/** *//**
* 解码消息头,消息体由子类实现解码
*/
public abstract class AbstractMessageDecoder implements MessageDecoder {
private final int type;
private int sequence;
private boolean readHeader;
protected AbstractMessageDecoder(int type) {
this.type = type;
}
// 需覆写decodable方法,检查解码结果
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
// HEADER_LEN为type+sequence的长度,共占6个字节.如果此时buffer剩余数据不足header的长度,则返回NEED_DATA的result.
if (in.remaining() < Constants.HEADER_LEN) {
return MessageDecoderResult.NEED_DATA;
}
// 第一个if判断ok->读取2字节(short),如果和type匹配则返回OK.
if (type == in.getShort()) {
return MessageDecoderResult.OK;
}
// 两个if判断都不ok,则返回NOT_OK
return MessageDecoderResult.NOT_OK;
}
// 终极解码
public MessageDecoderResult decode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
// 如果header数据已ok且消息体数据不足则下次直接略过
if (!readHeader) {
in.getShort(); // Skip 'type'.
sequence = in.getInt(); // Get 'sequence'.
readHeader = true;
}
// 解码消息体,如果数据不足以解析消息体,则返回null
AbstractMessage m = decodeBody(session, in);
// 消息数据体数据不足->返回NEED_DATA
if (m == null) {
return MessageDecoderResult.NEED_DATA;
} else {
readHeader = false; // 成功解码出一条完成消息,则重置readHeader->下次继续读取header
}
m.setSequence(sequence);
out.write(m);
return MessageDecoderResult.OK;
}
/** *//**
* 数据完整不足以解析整个消息体则返回null
*/
protected abstract AbstractMessage decodeBody(IoSession session,
IoBuffer in);
}
3.AddMessageEncoder
/** *//**
* 1.AddMessage的encoder.AddMessage继承自AbstractMessage,又增加了一个字段value
* 2.该encoder的type为Constants.ADD,值为1
*/
public class AddMessageEncoder<T extends AddMessage> extends AbstractMessageEncoder<T> {
public AddMessageEncoder() {
super(Constants.ADD);
}
@Override
protected void encodeBody(IoSession session, T message, IoBuffer out) { // 实现了编码消息体,向buffer追加了AddMessage的消息体value(4个字节-int)
out.putInt(message.getValue());
}
public void dispose() throws Exception {
}
}
4.AddMessageDecoder
/** *//**
* AddMessage的decoder.type为Constants.ADD(1)
*/
public class AddMessageDecoder extends AbstractMessageDecoder {
public AddMessageDecoder() {
super(Constants.ADD);
}
@Override
protected AbstractMessage decodeBody(IoSession session, IoBuffer in) { // ADD_BODY_LEN为AddMessage的消息体长度(value属性),即为4字节(int),如果此时不足4字节,则返回null,表示body数据不足
if (in.remaining() < Constants.ADD_BODY_LEN) {
return null;
}
AddMessage m = new AddMessage();
m.setValue(in.getInt());// 读取一个int
return m;
}
public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception {
}
}
6.总结:使用CumulativeProtocolDecoder可以方便的进行特定消息协议的消息解码并完美的解决了'粘包'问题.另外DemuxingProtocolDecoder结合MessageDecoder可更完美实现解码方案.
posted on 2013-12-02 18:55
landon 阅读(3373)
评论(2) 编辑 收藏 所属分类:
Sources