最近
接口测试的一个项目服务端是使用mina框架写的,顺便
学习了下mina2.0。下面简单介绍下mina框架及测试相关的内容。
一、mina是什么
官方解释:Apache的Mina(Multipurpose Infrastructure Networked Applications)是一个网络应用框架,可以帮助用户开发高性能和高扩展性的网络应用程序;它提供了一个抽象的、事件驱动的异步API,使Java NIO在各种传输协议(如TCP/IP,UDP/IP协议等)下快速高效开发。
官网地址:http://mina.apache.org/
源码分析:http://my.oschina.net/ielts0909/blog/90355/
从上图可以看出,mina分为客户端和服务端,客户端建立连接,同时开启一个IoProcessor线程,服务端监听连接,客户端和服务端的连接周期由IoSession管理,IoFilter用来过滤消息,而IoHandler则主要用于业务的处理。
三、主要的类
1、IoService
IoService是创建服务的顶层接口,无论客户端还是服务端,都是从它继承实现的。
以下是创建服务端的代码
try { acceptor = new NioSocketAcceptor();\\创建一个服务端 acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); \\绑定一个解码器 acceptor.getFilterChain().addLast("logger",new LoggingFilter()); \\绑定一个 日志处理器 acceptor.getSessionConfig().setReadBufferSize(2048);\\设置读缓冲区大小 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);\\设置读写空闲进入时间 acceptor.setHandler(new IoHandlerAdapter());\\设置业务处理器 acceptor.bind(new InetSocketAddress(port));\\绑定端口 logger.info("服务端启动成功... 端口号为:" + port); } catch (Exception e) { logger.error("服务端启动异常....", e); e.printStackTrace(); } |
不难看出,要建立一个服务端的代码很简单,这比Java NIO或单纯用socket编程来的简单很多。
2、IoProcessor
对于一个IoAcceptor或IoConnector线程对应一个IoProcessor线程用于IO的处理,这个IoProcessor线程从IoProcessor线程池中取出。IoProcessor线程池的大小默认为机器的CPU核数+1,例如双核机器的IoProcessor的线程池大小默认为3,通过添加ExecutorFilter可以设置线程池,每个IoProcessor管理多个IoSession
acceptor.getFilterChain().addLast("threadpool",new ExecutorFilter(corePoolSize, maximumPoolSize, keepAliveTime,unit)
3、IoSession
IoSession是用来保持IoService的上下文,一个IoService在建立连接之后建立一个IoSession(一个连接一个session),IoSession的生命周期从Connection建立到断开为止。
主要功能:
1、管理连接(session.getSessionConfig.XX)。这里的管理连接并不是直接去控制我们上次讲的最底层的连接acceptor和connector。如果acceptor和connector建立的一条管道,那session就是在管道内的管理者,他是没有办法将管道对半拆分开的,他只能从内部阻断两边的通信。管理连接还有部分就是可以配置缓冲区的大小,闲置时间等等。
2、存储信息(session.setAttribute())。和web里的session一样,这里的session也有存储attribute的功能,不过一般来说,这里存储的都是和连接有关的东西,并不会像web开发一样存一些业务上的东西。
3、驱动读写操作。如session.write()。
4、统计功能。Session还记录了连接中的byte、message等数量。
4、IoHandler
public interface IoHandler { void sessionCreated(IoSession session) throws Exception; void sessionOpened(IoSession session) throws Exception; void sessionClosed(IoSession session) throws Exception; void sessionIdle(IoSession session, IdleStatus status) throws Exception; void exceptionCaught(IoSession session, Throwable cause) throws Exception; void messageReceived(IoSession session, Object message) throws Exception; void messageSent(IoSession session, Object message) throws Exception; } |
一般情况下,我们最关心的只有messageReceived方法,接收消息并处理,然后调用IoSession的write方法发送出消息。一般情况下很少有人实现IoHandler接口,而是继承它的一个实现类IoHandlerAdapter,这样不用覆盖它的7个方法,只需要根据具体需求覆盖其中的几个方法就可以。
5、IoFilter
Mina最主要的工作就是把底层传输的字节码转换为Java对象,提供给应用程序;或者把应用程序返回的结果转换为字节码,交给底层传输。这些都是由IoFilter完成的。
Filter,过滤器的意思。IoFilter,I/O操作的过滤器。IoFilter和Servlet中的过滤器一样,主要用于拦截和过滤网络传输中I/O操作的各种消息。
IoService实例会绑定一个DefaultIoFilterChainBuilder ---- 过滤器链,我们把自定义的各种过滤器(IoFilter)自由的插放在这个过滤器链上了。
Mina中自带的解码器:
CumulativeProtocolDecoder 累积性解码器
SynchronizedProtocolDecoder 这个解码器用于将任何一个解码器包装为一个线程安全的解码器,用于解决每次执行decode()方法时可能线程不是上一次的线程的问题,但这样会在高并发时,大大降低系统的性能。
TextLineDecoder 按照文本的换行符( Windows:\r\n 、Linux:\n、Mac:\r)解码数据。
如何自定义编解码?
1.继承相关编解码器类,重写实现doDecode/doEncode方法
2.重写ProtocolCodecFactory工厂类
三、如何测试mina的服务端
1、创建自己的mina客户端
public IoConnector creatClient() { NioSocketConnector connector = null; try { connector = new NioSocketConnector(); connector.getSessionConfig().setReadBufferSize(1024 * 1024 * 5); connector.getSessionConfig().setBothIdleTime(10); connector.getSessionConfig().setKeepAlive(true); connector.setHandler(new MyIoHandler()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new DecodeFactory())); connector.setConnectTimeoutMillis(5000); } catch (Exception e) { e.printStackTrace(); } return connector; } |
2、重写Hanlder类,其中最重要的messageReciever方法需要先写好如何处理收到的数据,包括断言等
@Override public void sessionCreated(IoSession session) throws Exception { logger.info("服务端与客户端创建连接..."); } @Override public void sessionOpened(IoSession session) throws Exception { logger.info("服务端与客户端连接打开..."+ "当前第" + session.getId() + "个客户端"); } @Override public void messageReceived(IoSession session, Object message) throws Exception { if (message instanceof IoBuffer) { ServerResponse.getResponseInfo(session,(IoBuffer)message); } } @Override public void messageSent(IoSession session, Object message) throws Exception { logger.info("服务端发送信息成功..."); } @Override public void sessionClosed(IoSession session) throws Exception { logger.info("服务端与客户端连接断开..."); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { logger.info("服务端进入空闲状态..."); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("服务端发送异常...", cause); } |
3、针对服务端通信规则重写编解码方法,支持断包粘包
public class ClientDecoder extends CumulativeProtocolDecoder { public static Logger logger = Logger.getLogger(ClientDecoder.class); @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { IoBuffer buf = IoBuffer.allocate(0).setAutoExpand(true); IoBuffer buffer = IoBuffer.allocate(0).setAutoExpand(true); byte [] head = new byte[16]; logger.info(in.remaining()); if (in.remaining() > 0) { int msgLen = 16;// 16字节判断消息长度 if (in.remaining() > msgLen) { in.mark(); in.get(head, 0, 16); buf.put(XorParamCodec.decryptXOR(head)); buf.flip(); int length=buf.getInt(); if (length - msgLen > in.remaining()) { in.reset(); return false; } else { in.position(0); while (in.hasRemaining()) { buffer.put(in.get()); } buffer.flip(); out.write(buffer); return false; } } } return false; } } |
4、编写Test发送数据
@Before public void beforeTest() { ip = PropertiesHandle.readValue("ip"); port = Integer.valueOf(PropertiesHandle.readValue("port")); mmc = new MyMinaClient(); } @Test public void testUpdate() throws Exception { IoConnector connector = mmc.creatClient(); IoSession session = mmc.getIoSession(connector, ip, port); mmc.sendMsg(session, ClientRegister.getRegisterInfo()); \\ClientRegister和ClientUpdate类是客户端发送的数据 mmc.sendMsg(session, ClientUpdate.getUpdateInfo()); mmc.close(session, connector); Assert.assertEquals(AcctGuardData.Registerr_Rec, HeaderInfo.response); } |