netty从4.x开始,已经不再是jboss的一部分,所以引包中,发现还有org.jboss.netty.*等字样说明你还用的是3.x即以下版本,现在已经到5.0了该更新了,新的包名统一为io.netty.*开头
5.0以后内容:
netty初体验,netty是个高性能的java通信框架,至于oio还是nio,它都支持,核心概念:基于事件驱动的架构,很容易让人联想到观察者模式。它提供的数据结构为ByteBuf,这个是个什么东西?可以理解为:一个数据的载体,比如我接收和发送的消息,得到的都是ByteBuf对象,它是对字节流的一个高度抽象化,并提供比NIO的ByteBuffer更多的功能,不至于同一个buffer中经常操作,flip,compact等方法,更为简洁实用,同时如果客户端和服务端都采用java,那么它也可以提供自定义的object类型的数据载体。netty官方提供的example很多,客户端和服务端如何写,照猫画虎即可。
把握住两个关键点即可:*handler和传递的内容(即发送和接收的消息),*handler里面包含具体事件的触发方法:比如exceptionCaught方法(出现异常时)、messageReceived方法(接收消息时)、channelActive(连接刚建立时)等方法,采用最新的SimpleChannelInboundHandler<T> 这个handler,T可以为自定义的任何对象,如果不需要自定义对象,那么传递Object即可,如果是自定义对象或者java基本类型或String类型,那么必须得有个大前提:客户端和服务端都必须得进行一定的转换,换句话说:我的客户端和服务端必须都得用netty的相关API封装一次(具体看netty例子)。如果不是自定义对象,而传递的是Object,那么在收到消息时,必须进行强制转换为ByteBuf对象,通过ChannelHandlerContext进行发送,这个时候发送的是ByteBuf对象,如果是自定义对象,那么ChannelHandlerContext.write(自定义对象)即可,同时必须调用flush方法才能发送出去,也可调用wirteAndFlush(自定义对象)方法。
深究了两天netty,得出的结论是:如果客户端和服务端都基于netty,那么互发消息,各种类型协议消息,基本都不成问题。官方example很多,照猫画虎,自定义随便玩
但是,如果我只用netty的服务端,而客户端是一个纯粹的socket,比如其它语言的客户端,比如纯粹只是一个硬件,进行socket连接等等,即不采用netty的API,而且不是java语言,那么就会有一些问题。
首先我用java的oio和nio写了2个极其简单的客户端不使用netty,同时netty服务端要给返回响应,先看两个极其简单的客户端代码:
1
//首先oio客户端
2
public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException
{
3
Socket socket = new Socket("127.0.0.1",8080);
4
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
5
BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
6
bw.write("hello world!");
7
bw.flush();
8
9
while(true)
{
10
String str = br.readLine();
11
System.out.println(str);
Thread.sleep(2000);
}
12
}
1
//其次nio客户端,其实也可改成阻塞,效果一样
2
public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException
{
3
4
SocketChannel socket = SocketChannel.open();
5
boolean status = socket.connect(new InetSocketAddress("127.0.0.1",8080) );
6
System.out.println(status);
7
socket.configureBlocking(false);
8
// Selector selection = Selector.open();
9
String msg = "hello world!";
10
ByteBuffer buffer = ByteBuffer.allocate(msg.getBytes().length);
11
buffer.put(msg.getBytes());
12
buffer.flip();
13
socket.write(buffer);
14
buffer.compact();
15
16
17
18
Thread.sleep(1000000);
19
} netty服务端的handler是DiscardServerHandler extends SimpleChannelInboundHandler<Object> 里面的messageReceived方法
1
@Override
2
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception
{
3
// discard
4
ByteBuf buffer = (ByteBuf) msg;
5
byte[] bytes = new byte[buffer.readableBytes()];
6
System.out.println("readableBytes="+buffer.readableBytes());
7
buffer.readBytes(bytes);
8
String str = new String(bytes);
9
logger.info("=========="+str+"==============");
10
11
boolean close =false;
12
if(str.equals("bye"))
{
13
close = true;
14
}
15
16
System.out.println("buffer capacity="+buffer.capacity() +" str length="+str.length()
17
+"readableBytes="+buffer.readableBytes());
18
String str1 = "hi I'am server this is my info : @111111@";
19
buffer.writeBytes(str1.getBytes());
20
ctx.writeAndFlush(buffer);
21
// ChannelFuture future = ctx.writeAndFlush(buffer);
22
23
// Close the connection after sending 'Have a good day!'
24
// if the client has sent 'bye'.
25
// if (close) {
26
// future.addListener(ChannelFutureListener.CLOSE);
27
// }
28
}
服务端,客户端也能收到消息,但服务端抛出以下异常:
警告: Unexpected exception from downstream.
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:115)
at io.netty.buffer.WrappedByteBuf.release(WrappedByteBuf.java:819)
at io.netty.buffer.SimpleLeakAwareByteBuf.release(SimpleLeakAwareByteBuf.java:34)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:68)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:110)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:74)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:138)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:320)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:127)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:794)
at java.lang.Thread.run(Unknown Source)
原因是:refCnt=0了,表明使用的这个ByteBuf已经被回收了,代码中调用ctx.writeAndFlush(buff)会使此次ByteBuf回收也即将refCnt置为0,那么在SimpleChannelInboundHandler里面,会接着调用代码如下:
1
@Override
2
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
3
boolean release = true;
4
try
{
5
if (acceptInboundMessage(msg))
{
6
@SuppressWarnings("unchecked")
7
I imsg = (I) msg;
8
messageReceived(ctx, imsg);
9
} else
{
10
release = false;
11
ctx.fireChannelRead(msg);
12
}
13
} finally
{
14
if (autoRelease && release)
{
15
ReferenceCountUtil.release(msg);
16
}
17
}
18
} 里面的15行ReferenceCountUtil.release(msg);这是netty提供的一个释放ByteBuf内存的方法,如果不采用这个,直接调用ByteBuf.release方法也可以,但是因为调用了writeAndFlush方法,已经将ByteBuf的refCnt置为0了,这个里面调用的时候又会在设置一次,但是发现已经为0了,所以就抛出的该异常。
该问题需要在定位。。。。未完待续
posted on 2014-05-03 14:11
朔望魔刃 阅读(5460)
评论(0) 编辑 收藏 所属分类:
netty