Chan Chen Coding...

Netty 4.0 源码分析(二):Echo Server

Netty项目中,自带了很多使用的例子,对于刚刚开始接触和学习Netty源码的开发者来说,可以通过例子来更好的理解Netty的具体实现。源码可以再netty 4.0的example找到。

 

 1 public class EchoServerHandler extends ChannelInboundByteHandlerAdapter {            
 2     private static final Logger logger = Logger.getLogger(
 3             EchoServerHandler.class.getName());
 4 
 5     @Override
 6     public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
 7         ByteBuf out = ctx.nextOutboundByteBuffer();
 8         out.discardReadBytes();
 9         out.writeBytes(in);
10         ctx.flush();
11     }
12  
13     @Override
14     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
15         // Close the connection when an exception is raised.
16         logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
17         ctx.close();
18     }
19 }

Line 1: 声明一个EchoServerHandler, 并且继承了ChannelInboundByteHandlerAdapter 这样EchoServerHandler就可以处理client发送过来的request

Line 6: 重写inboundBufferUpdated方法,对client发送过来的request进行对ByteBuffer对象的操作。关于ByteBuffer的概念将在以后章节讨论。

Line 7: ctx.nextOutboundByteBuffer()将返回一个ByteBuffer对象。如果该对象不存在,那么抛出UnsupportedOperationException异常。

Line 14: 重写exceptionCaughtserver端捕获异常。

 

 1 public class EchoServer {
 2  
 3     private final int port;
 4  
 5     public EchoServer(int port) {
 6         this.port = port;
 7     }
 8  
 9     public void run() throws Exception {
10         // Configure the server.
11         ServerBootstrap b = new ServerBootstrap();
12         try {
13             b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
14              .channel(NioServerSocketChannel.class)
15              .option(ChannelOption.SO_BACKLOG, 100)
16              .localAddress(new InetSocketAddress(port))
17              .childOption(ChannelOption.TCP_NODELAY, true)
18              .handler(new LoggingHandler(LogLevel.INFO))
19              .childHandler(new ChannelInitializer<SocketChannel>() {
20                  @Override
21                  public void initChannel(SocketChannel ch) throws Exception {
22                      ch.pipeline().addLast(
23                              new LoggingHandler(LogLevel.INFO),
24                              new EchoServerHandler());
25                  }
26              });
27  
28             // Start the server.
29             ChannelFuture f = b.bind().sync();
30  
31             // Wait until the server socket is closed.
32             f.channel().closeFuture().sync();
33         } finally {
34             // Shut down all event loops to terminate all threads.
35             b.shutdown();
36         }
37     }
38  
39     public static void main(String[] args) throws Exception {
40         int port;
41         if (args.length > 0) {
42             port = Integer.parseInt(args[0]);
43         } else {
44             port = 8080;
45         }
46         new EchoServer(port).run();
47     }
48 }

 

 

Line 11: 通过ServerBootStrap对象,来启动服务器

Line 13:  通过group方法,来绑定EventLoopGroupEventLoopGroup用来处理SocketChannelChannel上面的所有时间和IO

Line 16: localAddress方法用于绑定服务器地址和端口。

Line 18,19: handler方法和childhandler方法用于指定各种ChannelHandler对象,指定的ChannelHandler对象将用于处理client端来的request

Line 21: 初始化一个Channel对象,并且绑定之前定义的EchoServerHandler类。

Line 22:  EchoServerHandler添加到Pipeline中。

Line 29: ServerBootstrap对象准备就绪,启动server

Line 32: 等待直到server socket关闭

 

public class EchoClientHandler extends ChannelInboundByteHandlerAdapter {
 
    private static final Logger logger = Logger.getLogger(
            EchoClientHandler.class.getName());
 
    private final ByteBuf firstMessage;
 
    /**
     * Creates a client-side handler.
     
*/
    public EchoClientHandler(int firstMessageSize) {
        if (firstMessageSize <= 0) {
            throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
        }
        firstMessage = Unpooled.buffer(firstMessageSize);
        for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
    }
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.write(firstMessage);
    }
 
    @Override
    public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
        ByteBuf out = ctx.nextOutboundByteBuffer();
        out.discardReadBytes();
        out.writeBytes(in);
        ctx.flush();
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
        ctx.close();
    }
}

 

EchoClientHandler类的实现与EchoServerHandler类似,也都继承了ChannelInboundByteHandlerAdapter。不同在于重写了channelActive()方法。

 

 1 public class EchoClient {
 2  
 3     private final String host;
 4     private final int port;
 5     private final int firstMessageSize;
 6  
 7     public EchoClient(String host, int port, int firstMessageSize) {
 8         this.host = host;
 9         this.port = port;
10         this.firstMessageSize = firstMessageSize;
11     }
12  
13     public void run() throws Exception {
14         // Configure the client.
15         Bootstrap b = new Bootstrap();
16         try {
17             b.group(new NioEventLoopGroup())
18              .channel(NioSocketChannel.class)
19              .option(ChannelOption.TCP_NODELAY, true)
20              .remoteAddress(new InetSocketAddress(host, port))
21              .handler(new ChannelInitializer<SocketChannel>() {
22                  @Override
23                  public void initChannel(SocketChannel ch) throws Exception {
24                      ch.pipeline().addLast(
25                              new LoggingHandler(LogLevel.INFO),
26                              new EchoClientHandler(firstMessageSize));
27                  }
28              });
29  
30             // Start the client.
31             ChannelFuture f = b.connect().sync();
32  
33             // Wait until the connection is closed.
34             f.channel().closeFuture().sync();
35         } finally {
36             // Shut down the event loop to terminate all threads.
37             b.shutdown();
38         }
39     }
40  
41     public static void main(String[] args) throws Exception {
42         // Print usage if no argument is specified.
43         if (args.length < 2 || args.length > 3) {
44             System.err.println(
45                     "Usage: " + EchoClient.class.getSimpleName() +
46                     " <host> <port> [<first message size>]");
47             return;
48         }
49  
50         // Parse options.
51         final String host = args[0];
52         final int port = Integer.parseInt(args[1]);
53         final int firstMessageSize;
54         if (args.length == 3) {
55             firstMessageSize = Integer.parseInt(args[2]);
56         } else {
57             firstMessageSize = 256;
58         }
59  
60         new EchoClient(host, port, firstMessageSize).run();
61     }
62 }

 

Line 20: 指定远程服务器的地址和端口(localhost:8080


备注:因为笔者开始写Netty源码分析的时候,Netty 4.0还是处于Alpha阶段,之后的API可能还会有改动,笔者将会及时更改。使用开源已经有好几年的时间了,一直没有时间和精力来具体研究某个开源项目的具体实现,这次是第一次写开源项目的源码分析,如果文中有错误的地方,欢迎读者可以留言指出。对于转载的读者,请注明文章的出处。
希望和广大的开发者/开源爱好者进行交流,欢迎大家的留言和讨论。



-----------------------------------------------------
Silence, the way to avoid many problems;
Smile, the way to solve many problems;

posted on 2012-11-24 12:38 Chan Chen 阅读(7071) 评论(1)  编辑  收藏 所属分类: Netty

评论

# re: Netty 4.0 源码分析(二):Echo Server[未登录] 2013-01-23 15:05

源码有,但是具体怎么使用。不怎么会!请教。实现一个最简单的例子的讲解  回复  更多评论   


只有注册用户登录后才能发表评论。


网站导航: