Netty项目中,自带了很多使用的例子,对于刚刚开始接触和学习Netty源码的开发者来说,可以通过例子来更好的理解Netty的具体实现。源码可以再netty 4.0的example找到。
1 public class EchoServerHandler extends ChannelInboundByteHandlerAdapter {
2 private static final Logger logger = Logger.getLogger(
3 EchoServerHandler.class.getName());
4
5 @Override
6 public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
7 ByteBuf out = ctx.nextOutboundByteBuffer();
8 out.discardReadBytes();
9 out.writeBytes(in);
10 ctx.flush();
11 }
12
13 @Override
14 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
15 // Close the connection when an exception is raised.
16 logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
17 ctx.close();
18 }
19 }
Line 1: 声明一个EchoServerHandler, 并且继承了ChannelInboundByteHandlerAdapter。 这样EchoServerHandler就可以处理client发送过来的request。
Line 6: 重写inboundBufferUpdated方法,对client发送过来的request进行对ByteBuffer对象的操作。关于ByteBuffer的概念将在以后章节讨论。
Line 7: ctx.nextOutboundByteBuffer()将返回一个ByteBuffer对象。如果该对象不存在,那么抛出UnsupportedOperationException异常。
Line 14: 重写exceptionCaught在server端捕获异常。
1 public class EchoServer {
2
3 private final int port;
4
5 public EchoServer(int port) {
6 this.port = port;
7 }
8
9 public void run() throws Exception {
10 // Configure the server.
11 ServerBootstrap b = new ServerBootstrap();
12 try {
13 b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
14 .channel(NioServerSocketChannel.class)
15 .option(ChannelOption.SO_BACKLOG, 100)
16 .localAddress(new InetSocketAddress(port))
17 .childOption(ChannelOption.TCP_NODELAY, true)
18 .handler(new LoggingHandler(LogLevel.INFO))
19 .childHandler(new ChannelInitializer<SocketChannel>() {
20 @Override
21 public void initChannel(SocketChannel ch) throws Exception {
22 ch.pipeline().addLast(
23 new LoggingHandler(LogLevel.INFO),
24 new EchoServerHandler());
25 }
26 });
27
28 // Start the server.
29 ChannelFuture f = b.bind().sync();
30
31 // Wait until the server socket is closed.
32 f.channel().closeFuture().sync();
33 } finally {
34 // Shut down all event loops to terminate all threads.
35 b.shutdown();
36 }
37 }
38
39 public static void main(String[] args) throws Exception {
40 int port;
41 if (args.length > 0) {
42 port = Integer.parseInt(args[0]);
43 } else {
44 port = 8080;
45 }
46 new EchoServer(port).run();
47 }
48 }
Line 11: 通过ServerBootStrap对象,来启动服务器
Line 13: 通过group方法,来绑定EventLoopGroup,EventLoopGroup用来处理SocketChannel和Channel上面的所有时间和IO。
Line 16: localAddress方法用于绑定服务器地址和端口。
Line 18,19: handler方法和childhandler方法用于指定各种ChannelHandler对象,指定的ChannelHandler对象将用于处理client端来的request。
Line 21: 初始化一个Channel对象,并且绑定之前定义的EchoServerHandler类。
Line 22: 将EchoServerHandler添加到Pipeline中。
Line 29: ServerBootstrap对象准备就绪,启动server,
Line 32: 等待直到server socket关闭
public class EchoClientHandler extends ChannelInboundByteHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoClientHandler.class.getName());
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler(int firstMessageSize) {
if (firstMessageSize <= 0) {
throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
}
firstMessage = Unpooled.buffer(firstMessageSize);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.write(firstMessage);
}
@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf out = ctx.nextOutboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
EchoClientHandler类的实现与EchoServerHandler类似,也都继承了ChannelInboundByteHandlerAdapter。不同在于重写了channelActive()方法。
1 public class EchoClient {
2
3 private final String host;
4 private final int port;
5 private final int firstMessageSize;
6
7 public EchoClient(String host, int port, int firstMessageSize) {
8 this.host = host;
9 this.port = port;
10 this.firstMessageSize = firstMessageSize;
11 }
12
13 public void run() throws Exception {
14 // Configure the client.
15 Bootstrap b = new Bootstrap();
16 try {
17 b.group(new NioEventLoopGroup())
18 .channel(NioSocketChannel.class)
19 .option(ChannelOption.TCP_NODELAY, true)
20 .remoteAddress(new InetSocketAddress(host, port))
21 .handler(new ChannelInitializer<SocketChannel>() {
22 @Override
23 public void initChannel(SocketChannel ch) throws Exception {
24 ch.pipeline().addLast(
25 new LoggingHandler(LogLevel.INFO),
26 new EchoClientHandler(firstMessageSize));
27 }
28 });
29
30 // Start the client.
31 ChannelFuture f = b.connect().sync();
32
33 // Wait until the connection is closed.
34 f.channel().closeFuture().sync();
35 } finally {
36 // Shut down the event loop to terminate all threads.
37 b.shutdown();
38 }
39 }
40
41 public static void main(String[] args) throws Exception {
42 // Print usage if no argument is specified.
43 if (args.length < 2 || args.length > 3) {
44 System.err.println(
45 "Usage: " + EchoClient.class.getSimpleName() +
46 " <host> <port> [<first message size>]");
47 return;
48 }
49
50 // Parse options.
51 final String host = args[0];
52 final int port = Integer.parseInt(args[1]);
53 final int firstMessageSize;
54 if (args.length == 3) {
55 firstMessageSize = Integer.parseInt(args[2]);
56 } else {
57 firstMessageSize = 256;
58 }
59
60 new EchoClient(host, port, firstMessageSize).run();
61 }
62 }
Line 20: 指定远程服务器的地址和端口(localhost:8080)
备注:因为笔者开始写Netty源码分析的时候,Netty 4.0还是处于Alpha阶段,之后的API可能还会有改动,笔者将会及时更改。使用开源已经有好几年的时间了,一直没有时间和精力来具体研究某个开源项目的具体实现,这次是第一次写开源项目的源码分析,如果文中有错误的地方,欢迎读者可以留言指出。对于转载的读者,请注明文章的出处。
希望和广大的开发者/开源爱好者进行交流,欢迎大家的留言和讨论。
-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;