
Notes on Hand-Writing an RPC Framework (2)


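In the previous version, the RPC server was bound to a single business class, so it could not handle any additional business requirement. The server side therefore needs to be redesigned so that the server is decoupled from the business logic. The approach used here is service exposure: the server's service interfaces are exposed in a registry (in this version, a map from interface name to implementation object), and a thread pool runs worker tasks that look up the requested interface and handle the call.

The main class that creates the thread-pool server: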

public class TestServer {
    public static void main(String[] args) {
        UserServiceImpl userService = new UserServiceImpl();
        BlogServiceImpl blogService = new BlogServiceImpl();
        // HashMap<String, Object> serviceProvide = new HashMap<>();
        // serviceProvide.put("com.twk.service.UserService", userService);
        // serviceProvide.put("com.twk.service.BlogService", blogService);
        ServiceProvider serviceProvider = new ServiceProvider();
        serviceProvider.provideServiceInterface(userService);
        serviceProvider.provideServiceInterface(blogService);
        RPCServer rpcServer = new ThreadPoolRPCServer(serviceProvider.getProvide());
        rpcServer.start(8899);
    }
}

The thread-pool server class:

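import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolRPCServer implements RPCServer {
    private final ThreadPoolExecutor threadPool;
    private final Map<String, Object> serviceProvide;

    // Default pool: one core thread per CPU, at most 1000 threads,
    // 60 s idle timeout, and a bounded queue of 100 pending tasks.
    public ThreadPoolRPCServer(Map<String, Object> serviceProvide) {
        threadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
                1000, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100));
        this.serviceProvide = serviceProvide;
    }

    // Accepts any BlockingQueue, the queue type ThreadPoolExecutor itself takes.
    public ThreadPoolRPCServer(Map<String, Object> serviceProvide, int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue) {
        threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.serviceProvide = serviceProvide;
    }

    @Override
    public void start(int port) {
        System.out.println("Server started");
        // try-with-resources closes the listening socket if the accept loop fails
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            while (true) {
                // Each accepted connection is handed to the pool as a WorkThread task
                Socket socket = serverSocket.accept();
                threadPool.execute(new WorkThread(socket, serviceProvide));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void stop() {
        System.out.println("Server stopped");
    }
}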
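The default constructor above combines a core size, a maximum size, and a bounded queue, and the order in which ThreadPoolExecutor uses them is easy to misread. The following is a minimal sketch, not part of the original framework: PoolSaturationDemo and countRejected are hypothetical names, and the pool is deliberately tiny so the behavior is visible. It shows that tasks first occupy core threads, then the queue, then extra threads up to the maximum, and that further tasks are rejected under the default policy.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the framework in this article.
class PoolSaturationDemo {

    // Submits `tasks` blocking tasks to a small pool and returns how many were rejected.
    static int countRejected(int core, int max, int queueSize, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task parks until released, so threads and queue slots stay occupied
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Default AbortPolicy: the queue is full and max threads are all busy
                    rejected++;
                }
            }
        } finally {
            release.countDown();   // let the parked tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        // 1 core thread + 1 queue slot + 1 extra thread = 3 accepted, so 2 of 5 are rejected
        System.out.println(countRejected(1, 2, 1, 5)); // prints 2
    }
}
```

Applied to the article's defaults, this suggests that threads beyond the core size are only created once 100 connections are already waiting in the queue, and that the accept loop would see a RejectedExecutionException if the queue is full while 1000 threads are busy.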

The worker task class:

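import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;

public class WorkThread implements Runnable {
    private final Socket socket;
    private final Map<String, Object> serviceProvide;

    public WorkThread(Socket socket, Map<String, Object> serviceProvide) {
        this.socket = socket;
        this.serviceProvide = serviceProvide;
    }

    @Override
    public void run() {
        // try-with-resources closes the socket (and its streams) once the response is sent
        try (Socket s = socket) {
            ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
            ObjectInputStream ois = new ObjectInputStream(s.getInputStream());
            // Read one request, invoke the target method, and write back the result
            RPCRequest request = (RPCRequest) ois.readObject();
            RPCResponse response = getResponse(request);
            oos.writeObject(response);
            oos.flush();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    private RPCResponse getResponse(RPCRequest request) {
        // Look up the implementation registered under the requested interface name
        String interfaceName = request.getInterfaceName();
        Object service = serviceProvide.get(interfaceName);
        if (service == null) {
            System.out.println("Service not found: " + interfaceName);
            return RPCResponse.fail();
        }
        try {
            // Resolve the method by name and parameter types, then invoke it reflectively
            Method method = service.getClass().getMethod(request.getMethodName(), request.getParamsTypes());
            Object invoke = method.invoke(service, request.getParams());
            return RPCResponse.success(invoke);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
            System.out.println("Method invocation failed");
            return RPCResponse.fail();
        }
    }
}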
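The lookup-then-invoke step in getResponse can be tried without sockets or the RPCRequest/RPCResponse classes. The sketch below is an illustration under assumptions: GreetingService, GreetingServiceImpl, and ReflectiveDispatchDemo are hypothetical stand-ins, and the plain method arguments take the place of the fields that the article reads from RPCRequest.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical service used only for this illustration.
interface GreetingService {
    String greet(String name);
}

class GreetingServiceImpl implements GreetingService {
    @Override
    public String greet(String name) {
        return "Hello, " + name;
    }
}

// Hypothetical demo class, not part of the framework in this article.
class ReflectiveDispatchDemo {

    // Mirrors the core of getResponse: find the service by interface name,
    // resolve the method by name and parameter types, then invoke it.
    static Object dispatch(Map<String, Object> registry, String interfaceName,
                           String methodName, Class<?>[] paramTypes, Object[] params)
            throws ReflectiveOperationException {
        Object service = registry.get(interfaceName);
        if (service == null) {
            throw new IllegalArgumentException("No service registered for " + interfaceName);
        }
        Method method = service.getClass().getMethod(methodName, paramTypes);
        return method.invoke(service, params);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Map<String, Object> registry = new HashMap<>();
        registry.put(GreetingService.class.getName(), new GreetingServiceImpl());

        Object result = dispatch(registry, GreetingService.class.getName(),
                "greet", new Class<?>[]{String.class}, new Object[]{"RPC"});
        System.out.println(result); // prints Hello, RPC
    }
}
```

Note that getMethod matches parameter types exactly, so a client that sends int.class for a method declared with Integer (as findBlogById is) would get a NoSuchMethodException on the server.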

The service exposure class:

import java.util.HashMap;
import java.util.Map;

public class ServiceProvider {

    // Maps a fully qualified interface name to the object that implements it
    private final Map<String, Object> interfaceProvider;

    public ServiceProvider() {
        this.interfaceProvider = new HashMap<>();
    }

    public Map<String, Object> getProvide() {
        return this.interfaceProvider;
    }

    // Registers the service under every interface its class directly implements
    public void provideServiceInterface(Object service) {
        Class<?>[] interfaces = service.getClass().getInterfaces();

        for (Class<?> clazz : interfaces) {
            interfaceProvider.put(clazz.getName(), service);
        }
    }

    public Object getService(String interfaceName) {
        return interfaceProvider.get(interfaceName);
    }
}
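One detail of provideServiceInterface is worth checking: Class.getInterfaces() returns only the interfaces a class declares itself, not those inherited from a superclass. The sketch below is one possible way to collect inherited ones as well. PingService, BasePing, LoggingPing, and InterfaceScanDemo are hypothetical names, and the helper does not follow interfaces that extend other interfaces.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical types used only for this illustration.
interface PingService {
    String ping();
}

class BasePing implements PingService {
    @Override
    public String ping() {
        return "pong";
    }
}

// Inherits PingService from BasePing but declares no interface itself.
class LoggingPing extends BasePing {
}

// Hypothetical demo class, not part of the framework in this article.
class InterfaceScanDemo {

    // Collects the names of interfaces declared on the class and on each superclass.
    static Set<String> interfaceNames(Class<?> type) {
        Set<String> names = new LinkedHashSet<>();
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            for (Class<?> iface : c.getInterfaces()) {
                names.add(iface.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // LoggingPing declares no interfaces directly, so this prints 0
        System.out.println(LoggingPing.class.getInterfaces().length);
        // Walking the superclass chain finds PingService via BasePing
        System.out.println(interfaceNames(LoggingPing.class));
    }
}
```

With the article's ServiceProvider as written, a service like LoggingPing would be registered under no name at all, so a lookup by interface name would return null.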

The business classes:

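// Blog.java
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Blog implements Serializable {
    private Integer id;
    private Integer userId;
    private String title;
}

// BlogService.java
public interface BlogService {
    Blog findBlogById(Integer id);
}

// BlogServiceImpl.java
import java.util.Random;

public class BlogServiceImpl implements BlogService {
    @Override
    public Blog findBlogById(Integer id) {
        // Builds a dummy blog with a random user id to simulate a lookup
        Blog blog = Blog.builder().id(id).userId(new Random().nextInt(100)).title("My blog").build();
        System.out.println("Client queried the blog with id " + id);
        return blog;
    }
}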

A diagram illustrating the process:

[Image not included in the source]
