手撸RPC框架 完善消费者发送消息基础功能

大家好,我是小趴菜,接下来我会从0到1手写一个RPC框架,该专题包括以下专题,有兴趣的小伙伴就跟着我一起学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发消息基础实现 -> opt-01
自定义网络传输协议的实现 -> opt-02
自定义编解码实现 -> opt-03
服务提供者调用真实方法实现 -> opt-04
完善服务消费者发送消息基础功能 -> opt-05
注册中心基础功能实现 -> opt-06
服务提供者整合注册中心 -> opt-07
服务消费者整合注册中心 -> opt-08
完善服务消费者接收响应结果 -> opt-09
服务消费者,服务提供者整合SpringBoot -> opt-10
动态代理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制基础功能实现 -> opt-11
SPI机制扩展随机负载均衡策略 -> opt-12
SPI机制扩展轮询负载均衡策略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

前言

在之前的章节中,我们已经实现了服务提供者的收发消息功能,并且服务提供者能够调用真实方法并返回响应结果。

但是我们发现,发送消息是在服务消费者注册的时候,发送了一条消息,这显然不符合我们的需求。

这一章我们完善下服务消费者发送消息的基础功能

实现

修改服务消费者:com.xpc.rpc.consumer.RpcConsumer

package com.xpc.rpc.consumer;

import com.xpc.rpc.codec.RpcDecoder;
import com.xpc.rpc.codec.RpcEncoder;
import com.xpc.rpc.consumer.handler.RpcConsumerHandler;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.request.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RpcConsumer {

     private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class);

     private Bootstrap bootstrap;

     private EventLoopGroup eventLoopGroup;

     private Map handlerMap = new ConcurrentHashMap();

     public RpcConsumer() {
         eventLoopGroup = new NioEventLoopGroup();
         bootstrap = new Bootstrap();

         bootstrap.group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer() {
                     @Override
                     protected void initChannel(SocketChannel channel) throws Exception {
                         ChannelPipeline pipeline = channel.pipeline();
                         //我们自己的编解码器
                         pipeline.addLast(new RpcDecoder());
                         pipeline.addLast(new RpcEncoder());
                         pipeline.addLast(new RpcConsumerHandler());
                     }
                 });
     }

     public void sendRequest(ProtocolMessage requestProtocolMessage) {
         //先写死,后续会通过注册中心获取
         String host = "127.0.0.1";
         int port = 21778;
         RpcRequest request = requestProtocolMessage.getT();
         String key = buildHandlerMapKey(request.getClassName(), request.getMethodName());
         RpcConsumerHandler consumerHandler;
         if(handlerMap.containsKey(key)) {
             consumerHandler = handlerMap.get(key);
             Channel channel = consumerHandler.getChannel();
             if(!channel.isOpen() || !channel.isActive()) {
                 consumerHandler = getConsumerHandler(key, host, port);
                 handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler);
             }
         }else {
             consumerHandler = getConsumerHandler(key, host, port);
             if(consumerHandler == null) {
                 throw new RuntimeException("");
             }
             handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler);
         }
         //发送消息
         consumerHandler.sendRequest(requestProtocolMessage);
     }

    private RpcConsumerHandler getConsumerHandler(String key,String host, int port) {
        try {
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.addListener(new GenericFutureListener