Netty 网络编程

Netty 入门

概述

什么是 Netty?

  • 异步:用的多线程,不是异步 IO
  • 基于事件驱动:表示用的 Selector

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

Netty 的地位

Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位

以下的框架都使用了 Netty,因为它们有网络通信需求!

  • Cassandra – nosql 数据库
  • Spark – 大数据分布式计算框架
  • Hadoop – 大数据分布式存储框架
  • RocketMQ – ali 开源的消息队列
  • ElasticSearch – 搜索引擎
  • gRPC – rpc 框架
  • Dubbo – rpc 框架
  • Spring 5.x – flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
  • Zookeeper – 分布式协调框架

Netty 的优势

Netty vs NIO,工作量大,bug 多

  • 需要自己构建协议
  • 解决 TCP 传输问题,如粘包、半包
  • epoll 空轮询导致 CPU 100%
  • 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer

Netty vs 其它网络应用框架

  • Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀

创建服务器 / 客户端

  • 首先创建启动器
  • 创建NioEventLoopGroup基于NIO服务端实现
  • childHandler表示添加的处理器都是给SocketChannel用的
  • ChannelInitializer 仅仅执行一次
  • 客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
  • new ServerBootstrap().bind 绑定监听端口
  • 服务端

    // 1. 启动器,负责组装 netty组件,启动服务器
    new ServerBootstrap()
            // BossEventLoop,WorkerEventLoop(selector, thread)
            // 创建一个事件循环线程组,适用于 NIO 实现,可以简单理解为 `线程池 + Selector`
            .group(new NioEventLoopGroup())
            // 指定要使用的服务器 Channel 实现,适用于 NIO 实现
            .channel(NioServerSocketChannel.class) // OIO BIO
            // BOSS 负责处理连接Work(child)负责处理读写,决定了work(child)能执行哪些(handler)
            // 配置服务器 Channel 处理器,即 ChannelPipeline 中的一组 ChannelHandler
            .childHandler(
                    // 5. channel代表客户端进行数据读写的通道Initializer初始化,负责添加到别的 handler
                    new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            // 添加具体的 handler
                            // 将 bytebuf 转换为字符串
                            nioSocketChannel.pipeline().addLast(new StringDecoder());
                            // 自定义handler
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override// 读事件
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    // 打印转好的字符串
                                    System.out.println(msg);
                                }
                            });
                        }
                    })
            // 监听端口
            .bind(8080);
    

    客户端

    // 启动类
    new Bootstrap()
            // 添加 EventLoop
            .group(new NioEventLoopGroup())
            // 选择客户端 channel 实现
            .channel(NioSocketChannel.class)
            // 添加处理器
            .handler(new ChannelInitializer() {
                @Override // 在建立连接后被调用
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    // 把字符串编码成字节
                    nioSocketChannel.pipeline().addLast(new StringEncoder());
                }
            })
            // 连接服务器
            .connect(new InetSocketAddress("localhost", 8080))
            .sync()
            .channel()
            // 向服务器发送数据
            .writeAndFlush("hello, world");
    

    流程分析

  • 先创建启动器类
  • 添加组件,eventloop(内部就有线程和选择器不断循环,查找事件)
  • 选择NIOServerSocket 实现
  • 添加处理器,只有连接事件发生之后才会执行 initChannel初始化方法
  • 绑定监听端口(服务端就到这里)
  • (客户端)创建启动器和eventloop
  • 客户端选择socket事件
  • 添加处理器,也是等连接建立才会执行初始化方法
  • 最后连接服务器
  • 服务器监听到accept事件之后
  • 最后找处理器处理这个事件
  • (我们看不懂事线),连接建立后调用初始化方法
  • 客户端 sync 只有连接之后才会继续执行
  • channel() 这个是连接对象
  • 最后就可以读写
  • 发数据就会走到处理器内部
  • 进行转为字节数组进行 bytebuf进行发送
  • 服务端的eventloop就会监听到读事件
  • 走到了服务器的处理器,进行处理
  • channel数据传输通道,可读出来可写进。和nio概念一致

    handel中的message流动数据,handel 是一个工序,对原始数据进行一道道工序进行处理

    pipeline就是流水线,一道工序,进行添加工序。

    handel 分为 inbound和outbound
    入站和出站 读入就走入站,写出就走出站

    eventloop就是线程,相当于工人
    一旦某一个工人负责一个事件,那就会负责到底。一个工人是可以管理多个事件的

    eventloop既可以执行io操作,也可以进行普通任务,每个工人都有自己的任务队列,依次处理。底层用的单线程的线程池。

    任务可以是定时任务也可以是普通任务

    组件

    EventLoop

    EventLoop是事件循环对象

    本质是一个单线程执行器,同时维护了Selector,里面有run方法处理 channel 上源源不断的 io 事件。

    继承关系:

    • 一条线是继承自 java.util.concurrent.ScheduledExecutorService
    • 因此包含了线程池的所有方法
    • 另一条是继承自 netty 自己的 OrderedEventExecutor
    • 提供了 Boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
    • 提供了 parent方法来看看自己属于哪个 EventLoopGroup
    // eventloop
    public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
        EventLoopGroup parent();
    }
    
    // 继承线程池接口
    public interface EventExecutorGroup extends ScheduledExecutorService, Iterable {
    

    EventLoopGroup 事件循环组,我们一般使用这个

    EventLoopGroup 是一组 EventLoop,channel 一般会调用 EventLoopGroup 的 register 方法俩绑定其中一个 EventLoop,后续这个 channel 上的 io 事件都由同一个 EventLoop 来处理(保证了 io 事件处理时的线程安全)

    • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

    我们平常使用的是下面两个循环组实现类

    // 可以实现普通任务,io事件,定时任务
    EventLoopGroup nioGroup = new NioEventLoopGroup();
    // 普通任务和定时任务
    EventLoopGroup defaultGroup = new DefaultEventLoopGroup();
    

    那么我们空参创建对象,线程数是多少呢

    下面这段代码,是用来初始化 EventLoopGroup对象的。

    如果没有指定线程数,会采用默认的,也就是当前系统的CPU核心数 * 2

    /**
     * 构造函数,创建MultithreadEventLoopGroup对象
     * @param nThreads 表示EventLoopGroup中EventLoop的数量,如果为0则使用默认的线程数
     * @param executor 用于执行任务的Executor对象
     * @param args 可选参数列表
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // 调用父类的构造函数,初始化EventLoopGroup对象
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    

    获取当前CPU核数–我的是 16核

    NettyRuntime.availableProcessors();
    

    指定线程数量

    EventLoopGroup nioGroup = new NioEventLoopGroup(2);
    

    获取下一个线程

    // 创建两个线程的事件循环组
    EventLoopGroup nioGroup = new NioEventLoopGroup(2);
    // 获取下一个线程
    System.out.println(nioGroup.next()); // 第一次打印第一个
    System.out.println(nioGroup.next()); // 打印第二个
    System.out.println(nioGroup.next()); // 打印第一个,因为一共就两个
    System.out.println(nioGroup.next()); // 打印第二个
    // io.netty.channel.nio.NioEventLoop@737996a0
    // io.netty.channel.nio.NioEventLoop@61dc03ce
    // io.netty.channel.nio.NioEventLoop@737996a0
    // io.netty.channel.nio.NioEventLoop@61dc03ce
    

    执行普通任务

    • 这里的execute和submit都是一样的效果
    // 创建两个线程的事件循环组
    EventLoopGroup nioGroup = new NioEventLoopGroup(2);
    // 执行普通任务
    nioGroup.next().execute(() -> {
       log.debug("ok");
    });
    log.debug("main");
    

    定时任务

    // 创建两个线程的事件循环组
    EventLoopGroup nioGroup = new NioEventLoopGroup(2);
    // 执行定时任务 以一定的频率执行
    // 参数1:执行的方法
    // 参数2:表示第一次启动多久开始
    // 参数3:表示间隔多久再次触发
    // 参数4:单位
    nioGroup.next().scheduleAtFixedRate(() -> {
       log.debug("ok");
    }, 3, 5, TimeUnit.SECONDS);
    log.debug("main");
    

    IO 事件

    下面是服务端,客户端同上面。

    一个客户端绑定一个服务的 EventLoop线程。

    下面再编写客户端的时候,如果要打上断点实现阻塞,需要将idea的断点卡成单线程的

    image.png

    new ServerBootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        // 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            // 这个实际开发是要指定字符类型的,不要默认
                            log.debug(buf.toString(Charset.defaultCharset()));
                        }
                    });
                }
            })
            .bind(8080);
    

    我们来分工细化一下

  • 我们可以把 eventloop 划分为 boss 和 work
  • 将accept 和 read 分开处理
  • 那 第一个事件循环组 线程是否可以设置为 1 呢

    因为服务器这一有个,它也只会和里面一个 eventloop 进行绑定

    new ServerBootstrap()
            // 将 参数1:只处理accept 参数2:处理read
            .group(new NioEventLoopGroup(), new NioEventLoopGroup())
    

    继续分工细化–如果其中一个Nio线程执行中
    在读操作时执行太久,会影响其他 channel读操作
    最好不要让它占用 work nio线程,所以我们继续细分

  • 创建独立的事件循环对象,因为不需要进行io所以是普通的
  • 将下个处理器绑定上。
  • // 细分2:创建独立的 EventLoopGroup
    DefaultEventLoopGroup group = new DefaultEventLoopGroup();
    new ServerBootstrap()
            .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline()
                            .addLast("handler1", new ChannelInboundHandlerAdapter() {
                                @Override
                                // 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    // 这个实际开发是要指定字符类型的,不要默认
                                    log.debug(buf.toString(Charset.defaultCharset()));
                                    // 让消息传递给下一个handler
                                    ctx.fireChannelRead(msg);
                                }
                            })
                            // 指定事件循环组和名称
                            .addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                                @Override
                                // 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    // 这个实际开发是要指定字符类型的,不要默认
                                    log.debug(buf.toString(Charset.defaultCharset()));
                                }
                            });
                }
            })
            .bind(8080);
    

    image.png

    那么是怎么实现切换事件循环组,也就是换人处理的呢?

    如果两个 handler 绑定的是同一个线程,那么直接调用,否则调用的代码封装为一个任务对象。由一下一个 handler的线程调用

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
        /**
        *    EventExecutor 是事件循环组
        */
        EventExecutor executor = next.executor();
        
        // 当前 handler 中的线程是否和 eventloop 是同一个线程
        // 是,直接调用
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } 
        // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
        else {
        // 使用runnable 执行一个事件
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    

    Channel & 连接 & 关闭问题 & 异步

    channel 的主要作用

    • close() 可以用来关闭 channel
    • closeFuture() 用来处理 channel 的关闭
      • sync 方法作用是同步等待 channel 关闭
      • 而 addListener 方法是异步等待 channel 关闭
    • pipeline() 方法添加处理器
    • write() 方法将数据写入,只是写进入缓冲区
    • writeAndFlush() 方法将数据写入并刷出

    connect 连接问题

    connect:是异步阻塞,main方法发起,执行是Nio的线程

    所以如果在发送数据前,一定要保证已经连接好了,否则数据是发送不出去的

    ChannelFuture:带有Future或者 promise都是和异步配套使用,用来处理结果

    ChannelFuture channelFuture = new Bootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder());
                }
            })
            // connect 是异步阻塞,main方法发起调用,真正执行连接的是Nio线程
            .connect(new InetSocketAddress("localhost", 8080));
    //        channelFuture.sync();
    // 会无阻塞向下执行获取channel,最后发送数据
    Channel channel = channelFuture.channel();
    //        log.debug("{}", channel); // 如果没有获取到打印的就是没有连接的 channel,所以需要执行 sync
    channel.writeAndFlush("123");
    

    解决连接问题,保证发送数据前一定是正确连接的

  • sync
  • // 阻塞当前线程,直到连接建立完毕
    channelFuture.sync();
    Channel channel = channelFuture.channel();
    log.debug("{}", channel);
    channel.writeAndFlush("123");
    
  • 将执行发送的代码,交给nio线程。
    • addListener(回调对象)方法异步处理结果
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        // 在nio 线程连接建立好之后,会调用operationComplete
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            Channel channel = channelFuture.channel();
            log.debug("{}", channel);
            channel.writeAndFlush("123");
        }
    });
    

    关闭问题

    我们看下面这段代码

    我们可以输入内容,发送给服务端,按 q 关闭传输通道

    但是我们关闭的时候,想做一些操作,怎么整
    直接在close下面写吗,那是不对的,因为close方法是异步的

    channelFuture.sync();
    Channel channel = channelFuture.channel();
    new Thread(new Runnable() {
        @Override
        public void run() {
            Scanner sc = new Scanner(System.in);
            while (true) {
                String line = sc.nextLine();
                if ("q".equals(line)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }
    }, "input").start();
    

    解决异步问题

    添加日志调试

    nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
    
  • 同步解决—由主线程处理关闭操作
  • new Thread(() -> {
        Scanner sc = new Scanner(System.in);
        while (true) {
            String line = sc.nextLine();
            if ("q".equals(line)) {
                channel.close();
                break;
            }
            channel.writeAndFlush(line);
        }
    }, "input").start();
    // 添加关闭处理器
    ChannelFuture closeFuture = channel.closeFuture();
    closeFuture.sync();// 设置同步
    log.debug("处理关闭之后....");
    
  • 异步解决–由 nio 线程处理
  • new Thread(() -> {
        Scanner sc = new Scanner(System.in);
        while (true) {
            String line = sc.nextLine();
            if ("q".equals(line)) {
                channel.close();
                break;
            }
            channel.writeAndFlush(line);
        }
    }, "input").start();
    ChannelFuture closeFuture = channel.closeFuture();
    
    // 设置关闭回调函数
    closeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            log.debug("处理关闭之后....");
        }
    });
    

    优雅的关闭

    • 它会等待所有正在处理的任务完成后,再进行关闭。
    NioEventLoopGroup group = new NioEventLoopGroup();
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
    
    closeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            log.debug("处理关闭之后的操作");
            group.shutdownGracefully(); // 优雅的关闭
        }
    });
    

    异步

    异步:就是一个线程发起连接一个线程去建立连接

    思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96个病人

    image.png

    那我们就可以细分一下,四个医生分别处理四个事情

    只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12 效率几乎是原来的四倍

    image.png

    重点:

  • 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势
  • 异步并没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键
  • Future & Promise

    处理异步时:经常用到的两个接口, futer 核 promise

    首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

    • jdk Future:只能同步等待任务结束(或成功、或失败)才能得到结果
    • netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
    • netty Promise;不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
    功能/名称 jdk Future netty Future Promise
    cancel 取消任务
    isCanceled 任务是否取消
    isDone 任务是否完成,不能区分成功失败
    get 获取任务结果,阻塞等待
    getNow 获取任务结果,非阻塞,还未产生结果时返回 null
    await 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
    sync 等待任务结束,如果任务失败,抛出异常
    isSuccess 判断任务是否成功
    cause 获取失败信息,非阻塞,如果没有失败,返回null
    addLinstener 添加回调,异步接收结果
    setSuccess 设置成功结果
    setFailure 设置失败结果

    jdk future

    // 线程池
    ExecutorService pool = Executors.newFixedThreadPool(2);
    // 提交任务
    Future future = pool.submit(new Callable() {
        @Override
        public Integer call() throws Exception {
            log.debug("执行计算"); // poll 线程
            Thread.sleep(1000);
            return 50;
        }
    });
    // 主线程通过 future 获取结果
    log.debug("等待结果"); // 主线程
    log.debug("结果是 {}", future.get()); // 主线程
    

    nio future

    // 创建事件循环组
    NioEventLoopGroup eventGroup = new NioEventLoopGroup();
    // 获取执行事件
    EventLoop eventLoop = eventGroup.next();
    // 执行方法
    Future future = eventLoop.submit(new Callable() {
        @Override
        public Integer call() throws Exception {
            log.debug("执行计算"); // nio 线程
            Thread.sleep(1000);
            return 50;
        }
    });
    

    同步获取线程返回结果

    // 主线程通过 future 获取结果
    log.debug("等待结果"); // 主线程
    log.debug("结果是 {}", future.get()); // 主线程
    

    异步

    future.addListener(new GenericFutureListener

    上一篇 细节决定成败:探究Mybatis中javaType和ofType的区别
    下一篇 全网注释第二全的GO教程数组与切片(Array&Slice)