Netty是一个高性能的NIO通信框架,提供异步的、事件驱动的网络编程模型。使用Netty可以方便用户开发各种常用协议的网络程序。例如:TCP、UDP、HTTP等等。 Netty的最新版本是3.2.7,官网地址是:http://www.jboss.org/netty 本文的主要目的是基于Netty实现一个通用二进制协议的高效数据传输。协议是通用的二进制协议,高效并且扩展性很好。 一个好的协议有两个标准: (1)生成的传输数据要少,即数据压缩比要高。这样可以减少网络开销。 (2)传输数据和业务对象之间的转换速度要快。 (友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen) 一、协议的定义 无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后。 (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节: 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、数据包长(4byte) (2)数据:由数据包长指定。请求或回复数据。类型对应为JAVA的Map<String,String> 数据格式定义: 字段1键名长度 字段1键名 字段1值长度 字段1值 字段2键名长度 字段2键名 字段2值长度 字段2值 字段3键名长度 字段3键名 字段3值长度 字段3值 … … … … 长度为整型,占4个字节 代码中用两个Vo对象来表示:XLRequest和XLResponse。 1package org.jboss.netty.example.xlsvr.vo; 2 3import java.util.HashMap; 4import java.util.Map; 5 6/** *//** 7 * @author hankchen 10 * 2012-2-3 下午02:46:52 11 */ 12 13 14/** *//** 15 * 响应数据 16 */ 17 18/** *//** 19 * 通用协议介绍 20 * 21 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后 22 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节: 23 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte) 24 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String> 25 * 数据格式定义: 26 * 字段1键名长度 字段1键名 字段1值长度 字段1值 27 * 字段2键名长度 字段2键名 字段2值长度 字段2值 28 * 字段3键名长度 字段3键名 字段3值长度 字段3值 29 * … … … … 30 * 长度为整型,占4个字节 31 */ 32public class XLResponse { 33 private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1 34 private byte encrypt;// 加密类型。0表示不加密 35 private byte extend1;// 用于扩展协议。暂未定义任何值 36 private byte extend2;// 用于扩展协议。暂未定义任何值 37 private int sessionid;// 会话ID 38 private int result;// 结果码 39 private int length;// 数据包长 40 41 private Map<String,String> values=new HashMap<String, String>(); 42 43 private String ip; 44 45 public void setValue(String key,String value){ 46 values.put(key, value); 47 } 48 49 public String getValue(String key){ 50 if (key==null) { 51 return null; 52 } 53 return values.get(key); 54 } 55 56 public byte getEncode() { 57 return encode; 58 } 59 60 public void setEncode(byte encode) { 61 this.encode = encode; 62 } 63 64 public byte getEncrypt() { 65 return encrypt; 66 } 67 68 public void setEncrypt(byte encrypt) { 69 this.encrypt = encrypt; 70 } 71 72 public byte getExtend1() { 73 return extend1; 74 } 75 76 public void setExtend1(byte extend1) { 77 this.extend1 = extend1; 78 } 79 80 public byte getExtend2() { 81 return extend2; 82 } 83 84 public void setExtend2(byte extend2) { 85 this.extend2 = extend2; 86 } 87 88 public int getSessionid() { 89 return sessionid; 90 } 91 92 public void setSessionid(int sessionid) { 93 this.sessionid = sessionid; 94 } 95 96 public int getResult() { 97 return result; 98 } 99 100 public void setResult(int result) { 101 this.result = result; 102 } 103 104 public int getLength() { 105 return length; 106 } 107 108 public void setLength(int length) { 109 this.length = length; 110 } 111 112 public Map<String, String> getValues() { 113 return values; 114 } 115 116 public String getIp() { 117 return ip; 118 } 119 120 public void setIp(String ip) { 121 this.ip = ip; 122 } 123 124 public void setValues(Map<String, String> values) { 125 this.values = values; 126 } 127 128 @Override 129 public String toString() { 130 return "XLResponse [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2 131 + ", sessionid=" + sessionid + ", result=" + result + ", length=" + length + ", values=" + values + ", ip=" + ip + "]"; 132 } 133} 1package org.jboss.netty.example.xlsvr.vo; 2 3import java.util.HashMap; 4import java.util.Map; 5 6/** *//** 7 * @author hankchen 8 * 2012-2-3 下午02:46:41 9 */ 10 11/** *//** 12 * 请求数据 13 */ 14 15/** *//** 16 * 通用协议介绍 17 * 18 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后 19 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节: 20 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte) 21 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String> 22 * 数据格式定义: 23 * 字段1键名长度 字段1键名 字段1值长度 字段1值 24 * 字段2键名长度 字段2键名 字段2值长度 字段2值 25 * 字段3键名长度 字段3键名 字段3值长度 字段3值 26 * … … … … 27 * 长度为整型,占4个字节 28 */ 29public class XLRequest { 30 private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1 31 private byte encrypt;// 加密类型。0表示不加密 32 private byte extend1;// 用于扩展协议。暂未定义任何值 33 private byte extend2;// 用于扩展协议。暂未定义任何值 34 private int sessionid;// 会话ID 35 private int command;// 命令 36 private int length;// 数据包长 37 38 private Map<String,String> params=new HashMap<String, String>(); //参数 39 40 private String ip; 41 42 public byte getEncode() { 43 return encode; 44 } 45 46 public void setEncode(byte encode) { 47 this.encode = encode; 48 } 49 50 public byte getEncrypt() { 51 return encrypt; 52 } 53 54 public void setEncrypt(byte encrypt) { 55 this.encrypt = encrypt; 56 } 57 58 public byte getExtend1() { 59 return extend1; 60 } 61 62 public void setExtend1(byte extend1) { 63 this.extend1 = extend1; 64 } 65 66 public byte getExtend2() { 67 return extend2; 68 } 69 70 public void setExtend2(byte extend2) { 71 this.extend2 = extend2; 72 } 73 74 public int getSessionid() { 75 return sessionid; 76 } 77 78 public void setSessionid(int sessionid) { 79 this.sessionid = sessionid; 80 } 81 82 public int getCommand() { 83 return command; 84 } 85 86 public void setCommand(int command) { 87 this.command = command; 88 } 89 90 public int getLength() { 91 return length; 92 } 93 94 public void setLength(int length) { 95 this.length = length; 96 } 97 98 public Map<String, String> getParams() { 99 return params; 100 } 101 102 public void setValue(String key,String value){ 103 params.put(key, value); 104 } 105 106 public String getValue(String key){ 107 if (key==null) { 108 return null; 109 } 110 return params.get(key); 111 } 112 113 public String getIp() { 114 return ip; 115 } 116 117 public void setIp(String ip) { 118 this.ip = ip; 119 } 120 121 public void setParams(Map<String, String> params) { 122 this.params = params; 123 } 124 125 @Override 126 public String toString() { 127 return "XLRequest [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2 128 + ", sessionid=" + sessionid + ", command=" + command + ", length=" + length + ", params=" + params + ", ip=" + ip + "]"; 129 } 130} 131 二、协议的编码和解码 对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。 1package org.jboss.netty.example.xlsvr.codec; 2 3import java.nio.ByteBuffer; 4 5import org.jboss.netty.buffer.ChannelBuffer; 6import org.jboss.netty.buffer.ChannelBuffers; 7import org.jboss.netty.channel.ChannelHandlerContext; 8import org.jboss.netty.channel.Channels; 9import org.jboss.netty.channel.MessageEvent; 10import org.jboss.netty.channel.SimpleChannelDownstreamHandler; 11import org.jboss.netty.example.xlsvr.util.ProtocolUtil; 12import org.jboss.netty.example.xlsvr.vo.XLResponse; 13import org.slf4j.Logger; 14import org.slf4j.LoggerFactory; 15 16/** *//** 17 * @author hankchen 18 * 2012-2-3 上午10:48:15 19 */ 20 21/** *//** 22 * 服务器端编码器 23 */ 24public class XLServerEncoder extends SimpleChannelDownstreamHandler { 25 Logger logger=LoggerFactory.getLogger(XLServerEncoder.class); 26 27 @Override 28 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 29 XLResponse response=(XLResponse)e.getMessage(); 30 ByteBuffer headBuffer=ByteBuffer.allocate(16); 31 /** *//** 32 * 先组织报文头 33 */ 34 headBuffer.put(response.getEncode()); 35 headBuffer.put(response.getEncrypt()); 36 headBuffer.put(response.getExtend1()); 37 headBuffer.put(response.getExtend2()); 38 headBuffer.putInt(response.getSessionid()); 39 headBuffer.putInt(response.getResult()); 40 41 /** *//** 42 * 组织报文的数据部分 43 */ 44 ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues()); 45 int length=dataBuffer.readableBytes(); 46 headBuffer.putInt(length); 47 /** *//** 48 * 非常重要 49 * ByteBuffer需要手动flip(),ChannelBuffer不需要 50 */ 51 headBuffer.flip(); 52 ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer(); 53 totalBuffer.writeBytes(headBuffer); 54 logger.info("totalBuffer size="+totalBuffer.readableBytes()); 55 totalBuffer.writeBytes(dataBuffer); 56 logger.info("totalBuffer size="+totalBuffer.readableBytes()); 57 Channels.write(ctx, e.getFuture(), totalBuffer); 58 } 59 60} 61 1package org.jboss.netty.example.xlsvr.codec; 2 3import org.jboss.netty.buffer.ChannelBuffer; 4import org.jboss.netty.buffer.ChannelBuffers; 5import org.jboss.netty.channel.Channel; 6import org.jboss.netty.channel.ChannelHandlerContext; 7import org.jboss.netty.example.xlsvr.util.ProtocolUtil; 8import org.jboss.netty.example.xlsvr.vo.XLResponse; 9import org.jboss.netty.handler.codec.frame.FrameDecoder; 10 11/** *//** 12 * @author hankchen 13 * 2012-2-3 上午10:47:54 14 */ 15 16/** *//** 17 * 客户端解码器 18 */ 19public class XLClientDecoder extends FrameDecoder { 20 21 @Override 22 protected Object decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer) throws Exception { 23 if (buffer.readableBytes()<16) { 24 return null; 25 } 26 buffer.markReaderIndex(); 27 byte encode=buffer.readByte(); 28 byte encrypt=buffer.readByte(); 29 byte extend1=buffer.readByte(); 30 byte extend2=buffer.readByte(); 31 int sessionid=buffer.readInt(); 32 int result=buffer.readInt(); 33 int length=buffer.readInt(); // 数据包长 34 if (buffer.readableBytes()<length) { 35 buffer.resetReaderIndex(); 36 return null; 37 } 38 ChannelBuffer dataBuffer=ChannelBuffers.buffer(length); 39 buffer.readBytes(dataBuffer, length); 40 41 XLResponse response=new XLResponse(); 42 response.setEncode(encode); 43 response.setEncrypt(encrypt); 44 response.setExtend1(extend1); 45 response.setExtend2(extend2); 46 response.setSessionid(sessionid); 47 response.setResult(result); 48 response.setLength(length); 49 response.setValues(ProtocolUtil.decode(encode, dataBuffer)); 50 response.setIp(ProtocolUtil.getClientIp(channel)); 51 return response; 52 } 53 54} 1package org.jboss.netty.example.xlsvr.util; 2 3import java.net.SocketAddress; 4import java.nio.charset.Charset; 5import java.util.HashMap; 6import java.util.Map; 7import java.util.Map.Entry; 8 9import org.jboss.netty.buffer.ChannelBuffer; 10import org.jboss.netty.buffer.ChannelBuffers; 11import org.jboss.netty.channel.Channel; 12 13/** *//** 14 * @author hankchen 15 * 2012-2-4 下午01:57:33 16 */ 17public class ProtocolUtil { 18 19 /** *//** 20 * 编码报文的数据部分 21 * @param encode 22 * @param values 23 * @return 24 */ 25 public static ChannelBuffer encode(int encode,Map<String,String> values){ 26 ChannelBuffer totalBuffer=null; 27 if (values!=null && values.size()>0) { 28 totalBuffer=ChannelBuffers.dynamicBuffer(); 29 int length=0,index=0; 30 ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()]; 31 Charset charset=XLCharSetFactory.getCharset(encode); 32 for(Entry<String,String> entry:values.entrySet()){ 33 String key=entry.getKey(); 34 String value=entry.getValue(); 35 ChannelBuffer buffer=ChannelBuffers.dynamicBuffer(); 36 buffer.writeInt(key.length()); 37 buffer.writeBytes(key.getBytes(charset)); 38 buffer.writeInt(value.length()); 39 buffer.writeBytes(value.getBytes(charset)); 40 channelBuffers[index++]=buffer; 41 length+=buffer.readableBytes(); 42 } 43 44 for (int i = 0; i < channelBuffers.length; i++) { 45 totalBuffer.writeBytes(channelBuffers[i]); 46 } 47 } 48 return totalBuffer; 49 } 50 51 /** *//** 52 * 解码报文的数据部分 53 * @param encode 54 * @param dataBuffer 55 * @return 56 */ 57 public static Map<String,String> decode(int encode,ChannelBuffer dataBuffer){ 58 Map<String,String> dataMap=new HashMap<String, String>(); 59 if (dataBuffer!=null && dataBuffer.readableBytes()>0) { 60 int processIndex=0,length=dataBuffer.readableBytes(); 61 Charset charset=XLCharSetFactory.getCharset(encode); 62 while(processIndex<length){ 63 /** *//** 64 * 获取Key 65 */ 66 int size=dataBuffer.readInt(); 67 byte [] contents=new byte [size]; 68 dataBuffer.readBytes(contents); 69 String key=new String(contents, charset); 70 processIndex=processIndex+size+4; 71 /** *//** 72 * 获取Value 73 */ 74 size=dataBuffer.readInt(); 75 contents=new byte [size]; 76 dataBuffer.readBytes(contents); 77 String value=new String(contents, charset); 78 dataMap.put(key, value); 79 processIndex=processIndex+size+4; 80 } 81 } 82 return dataMap; 83 } 84 85 /** *//** 86 * 获取客户端IP 87 * @param channel 88 * @return 89 */ 90 public static String getClientIp(Channel channel){ 91 /** *//** 92 * 获取客户端IP 93 */ 94 SocketAddress address = channel.getRemoteAddress(); 95 String ip = ""; 96 if (address != null) { 97 ip = address.toString().trim(); 98 int index = ip.lastIndexOf(':'); 99 if (index < 1) { 100 index = ip.length(); 101 } 102 ip = ip.substring(1, index); 103 } 104 if (ip.length() > 15) { 105 ip = ip.substring(Math.max(ip.indexOf("/") + 1, ip.length() - 15)); 106 } 107 return ip; 108 } 109} 110 三、服务器端实现 服务器端提供的功能是: 1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。 2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。 为了展示多协议的运用,这里客户端的请求采用的是基于问本行(\n\r)的协议。 具体代码如下: 1package org.jboss.netty.example.xlsvr; 2 3import java.net.InetSocketAddress; 4import java.util.concurrent.Executors; 5 6import org.jboss.netty.bootstrap.ServerBootstrap; 7import org.jboss.netty.channel.Channel; 8import org.jboss.netty.channel.ChannelPipeline; 9import org.jboss.netty.channel.group.ChannelGroup; 10import org.jboss.netty.channel.group.ChannelGroupFuture; 11import org.jboss.netty.channel.group.DefaultChannelGroup; 12import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 13import org.jboss.netty.example.xlsvr.codec.XLServerEncoder; 14import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; 15import org.jboss.netty.handler.codec.frame.Delimiters; 16import org.jboss.netty.handler.codec.string.StringDecoder; 17import org.jboss.netty.util.CharsetUtil; 18import org.slf4j.Logger; 19import org.slf4j.LoggerFactory; 20 21/** *//** 22 * @author hankchen 23 * 2012-1-30 下午03:21:38 24 */ 25 26public class XLServer { 27 public static final int port =8080; 28 public static final Logger logger=LoggerFactory.getLogger(XLServer.class); 29 public static final ChannelGroup allChannels=new DefaultChannelGroup("XLServer"); 30 private static final ServerBootstrap serverBootstrap=new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); 31 32 public static void main(String [] args){ 33 try { 34 XLServer.startup(); 35 } catch (Exception e) { 36 e.printStackTrace(); 37 } 38 } 39 40 public static boolean startup() throws Exception{ 41 /** *//** 42 * 采用默认ChannelPipeline管道 43 * 这意味着同一个XLServerHandler实例将被多个Channel通道共享 44 * 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能! 45 */ 46 ChannelPipeline pipeline=serverBootstrap.getPipeline(); 47 /** *//** 48 * 解码器是基于文本行的协议,\r\n或者\n\r 49 */ 50 pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter())); 51 pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); 52 pipeline.addLast("encoder", new XLServerEncoder()); 53 pipeline.addLast("handler", new XLServerHandler()); 54 55 serverBootstrap.setOption("child.tcpNoDelay", true); //注意child前缀 56 serverBootstrap.setOption("child.keepAlive", true); //注意child前缀 57 58 /** *//** 59 * ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象 60 */ 61 Channel channel=serverBootstrap.bind(new InetSocketAddress(port)); 62 allChannels.add(channel); 63 logger.info("server is started on port "+port); 64 return false; 65 } 66 67 public static void shutdown() throws Exception{ 68 try { 69 /** *//** 70 * 主动关闭服务器 71 */ 72 ChannelGroupFuture future=allChannels.close(); 73 future.awaitUninterruptibly();//阻塞,直到服务器关闭 74 //serverBootstrap.releaseExternalResources(); 75 } catch (Exception e) { 76 e.printStackTrace(); 77 logger.error(e.getMessage(),e); 78 } 79 finally{ 80 logger.info("server is shutdown on port "+port); 81 System.exit(1); 82 } 83 } 84} 85 1package org.jboss.netty.example.xlsvr; 2 3import java.util.Random; 4 5import org.jboss.netty.channel.Channel; 6import org.jboss.netty.channel.ChannelFuture; 7import org.jboss.netty.channel.ChannelHandlerContext; 8import org.jboss.netty.channel.ChannelHandler.Sharable; 9import org.jboss.netty.channel.ChannelStateEvent; 10import org.jboss.netty.channel.ExceptionEvent; 11import org.jboss.netty.channel.MessageEvent; 12import org.jboss.netty.channel.SimpleChannelHandler; 13import org.jboss.netty.example.xlsvr.vo.XLResponse; 14import org.slf4j.Logger; 15import org.slf4j.LoggerFactory; 16 17/** *//** 18 * @author hankchen 19 * 2012-1-30 下午03:22:24 20 */ 21 22@Sharable 23public class XLServerHandler extends SimpleChannelHandler { 24 private static final Logger logger=LoggerFactory.getLogger(XLServerHandler.class); 25 26 @Override 27 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 28 logger.info("messageReceived"); 29 if (e.getMessage() instanceof String) { 30 String content=(String)e.getMessage(); 31 logger.info("content is "+content); 32 if ("shutdown".equalsIgnoreCase(content)) { 33 //e.getChannel().close(); 34 XLServer.shutdown(); 35 }else { 36 sendResponse(ctx); 37 } 38 }else { 39 logger.error("message is not a String."); 40 e.getChannel().close(); 41 } 42 } 43 44 @Override 45 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 46 logger.error(e.getCause().getMessage(),e.getCause()); 47 e.getCause().printStackTrace(); 48 e.getChannel().close(); 49 } 50 51 @Override 52 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 53 logger.info("channelConnected"); 54 sendResponse(ctx); 55 } 56 57 @Override 58 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 59 logger.info("channelClosed"); 60 //删除通道 61 XLServer.allChannels.remove(e.getChannel()); 62 } 63 64 @Override 65 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 66 logger.info("channelDisconnected"); 67 super.channelDisconnected(ctx, e); 68 } 69 70 @Override 71 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 72 logger.info("channelOpen"); 73 //增加通道 74 XLServer.allChannels.add(e.getChannel()); 75 } 76 77 /** *//** 78 * 发送响应内容 79 * @param ctx 80 * @param e 81 * @return 82 */ 83 private ChannelFuture sendResponse(ChannelHandlerContext ctx){ 84 Channel channel=ctx.getChannel(); 85 Random random=new Random(); 86 XLResponse response=new XLResponse(); 87 response.setEncode((byte)0); 88 response.setResult(1); 89 response.setValue("name","hankchen"); 90 response.setValue("time", String.valueOf(System.currentTimeMillis())); 91 response.setValue("age",String.valueOf(random.nextInt())); 92 /** *//** 93 * 发送接收信息的时间戳到客户端 94 * 注意:Netty中所有的IO操作都是异步的! 95 */ 96 ChannelFuture future=channel.write(response); //发送内容 97 return future; 98 } 99} 100 四、客户端实现 客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。 关键代码如下: 1/** *//** 2 * Copyright (C): 2012 3 * @author hankchen 4 * 2012-1-30 下午03:21:26 5 */ 6 7/** *//** 8 * 服务器特征: 9 * 1、使用专用解码器解析服务器发过来的数据 10 * 2、客户端主动关闭连接 11 */ 12public class XLClient { 13 public static final int port =XLServer.port; 14 public static final String host ="localhost"; 15 private static final Logger logger=LoggerFactory.getLogger(XLClient.class); 16 private static final NioClientSocketChannelFactory clientSocketChannelFactory=new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); 17 private static final ClientBootstrap clientBootstrap=new ClientBootstrap(clientSocketChannelFactory); 18 19 /** *//** 20 * @param args 21 * @throws Exception 22 */ 23 public static void main(String[] args) throws Exception { 24 ChannelFuture future=XLClient.startup(); 25 logger.info("future state is "+future.isSuccess()); 26 } 27 28 /** *//** 29 * 启动客户端 30 * @return 31 * @throws Exception 32 */ 33 public static ChannelFuture startup() throws Exception { 34 /** *//** 35 * 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式 36 * 例如,下面的代码形式是错误的: 37 * ChannelPipeline pipeline=clientBootstrap.getPipeline(); 38 * pipeline.addLast("handler", new XLClientHandler()); 39 */ 40 clientBootstrap.setPipelineFactory(new XLClientPipelineFactory()); //只能这样设置 41 /** *//** 42 * 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象 43 */ 44 clientBootstrap.setOption("tcpNoDelay", true); 45 clientBootstrap.setOption("keepAlive", true); 46 47 ChannelFuture future=clientBootstrap.connect(new InetSocketAddress(host, port)); 48 /** *//** 49 * 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态 50 */ 51 future.awaitUninterruptibly(); 52 /** *//** 53 * 如果连接失败,我们将打印连接失败的原因。 54 * 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。 55 */ 56 if (!future.isSuccess()) { 57 future.getCause().printStackTrace(); 58 }else { 59 logger.info("client is connected to server "+host+":"+port); 60 } 61 return future; 62 } 63 64 /** *//** 65 * 关闭客户端 66 * @param future 67 * @throws Exception 68 */ 69 public static void shutdown(ChannelFuture future) throws Exception{ 70 try { 71 /** *//** 72 * 主动关闭客户端连接,会阻塞等待直到通道关闭 73 */ 74 future.getChannel().close().awaitUninterruptibly(); 75 //future.getChannel().getCloseFuture().awaitUninterruptibly(); 76 /** *//** 77 * 释放ChannelFactory通道工厂使用的资源。 78 * 这一步仅需要调用 releaseExternalResources()方法即可。 79 * 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。 80 */ 81 clientBootstrap.releaseExternalResources(); 82 } catch (Exception e) { 83 e.printStackTrace(); 84 logger.error(e.getMessage(),e); 85 } 86 finally{ 87 System.exit(1); 88 logger.info("client is shutdown to server "+host+":"+port); 89 } 90 } 91} 1public class XLClientPipelineFactory implements ChannelPipelineFactory{ 2 3 @Override 4 public ChannelPipeline getPipeline() throws Exception { 5 ChannelPipeline pipeline=Channels.pipeline(); 6 /** *//** 7 * 使用专用的解码器,解决数据分段的问题 8 * 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。 9 */ 10 pipeline.addLast("decoder", new XLClientDecoder()); 11 /** *//** 12 * 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了! 13 */ 14 pipeline.addLast("handler", new XLClientHandler()); 15 return pipeline; 16 } 17 18} 1/** *//** 2 * Copyright (C): 2012 3 * @author hankchen 4 * 2012-1-30 下午03:21:52 5 */ 6 7/** *//** 8 * 服务器特征: 9 * 1、使用专用的编码解码器,解决数据分段的问题 10 * 2、使用POJO替代ChannelBuffer传输 11 */ 12public class XLClientHandler extends SimpleChannelHandler { 13 private static final Logger logger=LoggerFactory.getLogger(XLClientHandler.class); 14 private final AtomicInteger count=new AtomicInteger(0); //计数器 15 16 @Override 17 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 18 processMethod1(ctx, e); //处理方式一 19 } 20 21 /** *//** 22 * @param ctx 23 * @param e 24 * @throws Exception 25 */ 26 public void processMethod1(ChannelHandlerContext ctx, MessageEvent e) throws Exception{ 27 logger.info("processMethod1……,count="+count.addAndGet(1)); 28 XLResponse serverTime=(XLResponse)e.getMessage(); 29 logger.info("messageReceived,content:"+serverTime.toString()); 30 Thread.sleep(1000); 31 32 if (count.get()<10) { 33 //从新发送请求获取最新的服务器时间 34 ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again\r\n".getBytes())); 35 }else{ 36 //从新发送请求关闭服务器 37 ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown\r\n".getBytes())); 38 } 39 } 40 41 @Override 42 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 43 logger.info("exceptionCaught"); 44 e.getCause().printStackTrace(); 45 ctx.getChannel().close(); 46 } 47 48 @Override 49 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 50 logger.info("channelClosed"); 51 super.channelClosed(ctx, e); 52 } 53 54 55} 全文代码较多,写了很多注释,希望对读者有用,谢谢!
(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)
|