posts - 167,  comments - 30,  trackbacks - 0

markdown形式,简书地址:基于Netty构建服务的基本步骤
基于netty构建服务的基本步骤
我们通过netty实现一个Http服务器的功能,来说明通过netty构建的Server基本步骤。
学习一个新的知识点,都是通过Hello world开始的,对于netty的学习写一个Hello world程序不像写其他程序那么简单,这里涉及很多非常重要的组件,比如ChannelHandler、EeventLoopGroup、ChannelPipeline等,这些组件随着后续不断学习再一一分析其实现原理。
基于netty构建Http服务器基本步骤实践:
1. 首先我们定义两个线程组 也叫做事件循环组
EevetLoopGroup bossGroup =  new NioEevetLoopGroup();
EevetLoopGroup workerGroup =  new NioEevetLoopGroup();
为什么定义两个线程组,实际上一个线程组也能完成所需的功能,不过netty建议我们使用两个线程组,分别具有不同的职责。bossGroup目的是获取客户端连接,连接接收到之后再将连接转发给workerGroup去处理。
2. 定义一个轻量级的启动服务类
 ServerBootstrap serverBootstrap = new ServerBootstrap(); 
    serverBootstrap.group(bossGroup, wokrerGroup).channel(NioServerSocketChannel.class).childHandler(null);
    // 服务启动后通过绑定到8899端口上,返回ChannelFuture。
    ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
    channelFuture.channel().closeFuture().sync();   
3. 通过ChannelPipeline初始化处理器,类似于拦截器Chain,当客户端首次连接后即调用initChannel方法完成初始化动作。
[示例代码]
public class TestServerInitializer extends ChannelInitializer<SocketChannel>{
    // 初始化器,服务端启动后会自动调用这个方法,它是一个回调方法。
    @Override protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("initChannel invoked "); // 有客户端连接就会执行.
        ChannelPipeline channelPipeline = ch.pipeline(); // pipeline一个管道里面可以有很多的ChannelHandler,相当于包含很多个拦截器。
        
// 添加处理器,可以添加多个,并且可以将处理器放到pipeline管道的不同位置上。
        channelPipeline.addLast("httpServerCodec", new HttpServerCodec()); //HttpServerCodec也是一个很重要的组件.
        channelPipeline.addLast("httpServerHandler", new TestHttpServerHandler()); // 自定义处理器
    }
}
4. 创建自定义处理器,通常继承SimpleChannelInboundHandler<T>,  该处理器覆写channelRead0方法,该方法负责请求接入,读取客户端请求,发送响应给客户端。
[示例代码]
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    private final String FAVICON_ICO = "/favicon.ico";
    // 读取客户端请求,向客户端响应的方法,所以这里要构造响应返回给客户端。
    
// 注意:这里面跟Servlet没有任何关系,也符合Servlet规范,所以不会涉及到HttpServerltRequest和HttpServeletResponse对象。
    @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        System.out.println("--------------httpserverHandler, remote_address " + ctx.channel().remoteAddress() + ", msg_class:" + msg.getClass());
//        Thread.sleep(3000); // 休眠5秒钟,lsof -i:8899 查看TCP连接状态
        if (msg instanceof HttpRequest) {
            HttpRequest httpRequest = (HttpRequest) msg;
            URI uri = new URI(httpRequest.uri());
            System.out.println("请求方法: " + httpRequest.method() + ", 请求path: " + uri.getPath());
            if (FAVICON_ICO.equals(uri.getPath())) {
                System.out.println("请求/favicon.ico");
                return;
            }
            // BytBuf:构造给客户端的响应内容, 制定好编码
            ByteBuf byteBuf = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8);
            // 接下构造响应对象
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            // 调用flush才会将内容真正返回给客户端
            System.out.println("响应给客户端对象: " + response);
            ctx.writeAndFlush(response);
            ctx.channel().closeFuture();
        }
    }
    //------以下重写了ChannelInboundHandlerAdapter父类的方法,分析不同事件方法的调用时机------
    @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel register invoked");
        super.channelRegistered(ctx);
    }
    @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel unregister invoked");
        super.channelUnregistered(ctx);
    }
    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel active invoked");
        super.channelActive(ctx);
    }
    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel inactive invoked");
        super.channelInactive(ctx);
    }
    // TODO 这里执行了2次,具体有待分析
    @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channel read complete");
        super.channelReadComplete(ctx);
    }
    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("exception caught invoked");
        super.exceptionCaught(ctx, cause);
    }
    @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handler add invoked");
        super.handlerAdded(ctx);
    }
}
这里注意,我使用的netty4.x版本,方法名叫做channelRead0,如果在其他文章中看到是messageReceived方法,则使用的是netty5.x,另外,因netty5.x已被废弃,故建议都使用netty4.x稳定版。
5. 将步骤1和2整合,写Main方法启动服务
[示例代码]
public class TestNettyServer {
    public static void main(String[] args) throws InterruptedException {
        // 服务器端可以理解为while(true){}死循环,去不断的接受请求连接。        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap(); // 启动服务端,这里的处理器都要是多实例的.            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());
            System.out.println("服务端已启动..");
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            System.out.println("服务端shutdown");
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
6. 通过浏览器或者curl方式访问8899端口。
最后,通过 curl ‘localhost:8899’访问成功返回Hello World字符串,
如果TestHttpServerHandler的channelRead0中不加msg instanceof HttpRequest的判断,则运行时会抛出如下异常:
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1100)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:372)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:745)
```
debug代码时会发现有2个请求执行了channelRead0方法,两个msg分别是
--------------httpserverHandler msg:class io.netty.handler.codec.http.DefaultHttpRequest
--------------httpserverHandler msg:class io.netty.handler.codec.http.LastHttpContent$1
我们看到第二次请求并不是HttpRequest对象,所以此处理器无法处理,通过浏览器访问时,浏览器会自动的发起favion.cio图片的请求,所以可以增加个判断如果path是favion.cio则不往下执行。
![自定义处理器实现类图](http://upload-images.jianshu.io/upload_images/3609866-d5f324f513f98fd3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
TestHttpServerHandler是自己实现的处理器,继承了SimpleChannelInboundHandler,SimpleChannelInboundHandler继承了ChannelInboundHandlerAdapter类。
**子类可重写的方法及其含义**:
`channelActive() `           >在到服务器的连接已经建立之后将被调用(成为活跃状态)
` channelRead0()`           > 当从服务器接受到一条消息时被调用
` exceptionCaught()`        >在处理过程中引发异常时调用
` channelReigster() `        >注册到EventLoop上
` handlerAdd() `                >Channel被添加方法
` handlerRemoved()`        >Channel被删除方法
` channelInActive() `         > Channel离开活跃状态,不再连接到某一远端时被调用
` channelUnRegistered()` >Channel从EventLoop上解除注册
` channelReadComplete()` >当Channel上的某个读操作完成时被调用
在步骤4中有打印输出,通过curl ‘http://localhost:8899'访问,执行结果顺序:
服务端已启动..
initChannel invoked...
handler add invoked
channel register invoked
channel active invoked
--------------httpserverHandler, remote_address /127.0.0.1:50061, msg_class:class io.netty.handler.codec.http.DefaultHttpRequest
请求方法: GET, 请求path: /
响应给客户端对象: DefaultFullHttpResponse(decodeResult: success, version: HTTP/1.1, content: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 11, cap: 33))
HTTP/1.1 200 OK
content-length: 11
content-type: text/plain
--------------httpserverHandler, remote_address /127.0.0.1:50061, msg_class:class io.netty.handler.codec.http.LastHttpContent$1
channel read complete   // ?
channel read complete
channel inactive invoked
channel unregister invoked
基本的hellworld程序已经运行起来,并且自行实现的处理器调用过程通过重写方法打印也能够有所了解了。
这里要注意的是,对于Netty来说,上层应用获取客户端请求之后,当请求是基于Http1.1协议的话会有个keepalive时间,比如30秒钟时间,如果在这段时间内没有接受到新的请求则由[服务端]主动关闭连接。当请求是基于Http1.0短连接协议,请求发过来之后,服务器就将这个连接关闭掉,上述示例中可以根据判断调用ctx .channel().close()来关闭连接。
**基于netty构建服务基本流程总结:**
1. 创建EventLoopGroup实例
2. 通过ServerBootstrap启动服务,bind到一个端口. 如果是客户端,则使用Bootstrap,连接主机和端口. 
3. 创建ChannelInitializer实例,通过ChannelPipieline初始化处理器链.
4. 创建ChannelServerHandler实例,继承SimpleChannelInboundHandler,重写channelRead0方法(netty4.x).
5. 将ChannelServerHandler实例addLast到ChannelPipeline上.
6. 将ChannelInitializer实例childHandler到bootstrap上.

posted on 2017-05-30 19:26 David1228 阅读(3267) 评论(0)  编辑  收藏 所属分类: Netty

只有注册用户登录后才能发表评论。


网站导航:
 

<2017年5月>
30123456
78910111213
14151617181920
21222324252627
28293031123
45678910

常用链接

留言簿(4)

随笔分类

随笔档案

文章档案

新闻分类

新闻档案

相册

收藏夹

Java

Linux知识相关

Spring相关

云计算/Linux/虚拟化技术/

友情博客

多线程并发编程

开源技术

持久层技术相关

搜索

  •  

积分与排名

  • 积分 - 356952
  • 排名 - 154

最新评论

阅读排行榜

评论排行榜