手写RPC笔记(2)
上一版RPC的server与具体的业务类绑定在一起。如果此时有额外的业务需求,则无法处理,因此需要重新设计server端,实现server与业务的解耦。为此,采用服务暴露的思路:把server端的服务接口暴露(注册)到注册中心,然后使用线程池创建工作线程,由工作线程按接口名找到对应的服务来处理业务。 创建启动线程池server的主类:
public class TestServer{
public static void main(String[] args) {
UserServiceImpl userService = new UserServiceImpl(); BlogServiceImpl blogService = new BlogServiceImpl(); // HashMap<String, Object> serviceProvide = new HashMap<>(); // serviceProvide.put("com.twk.service.UserService",userService); // serviceProvide.put("com.twk.service.BlogService",blogService); ServiceProvider serviceProvider = new ServiceProvider(); serviceProvider.provideServiceInterface(userService); serviceProvider.provideServiceInterface(blogService); RPCServer rpcServer = new ThreadPoolRPCServer(serviceProvider.getProvide()); rpcServer.start(8899); } }
线程池server类:
/**
 * {@link RPCServer} that accepts connections on the thread calling {@link #start(int)} and
 * hands every accepted socket to a {@link ThreadPoolExecutor} as a {@link WorkThread} task.
 *
 * <p>An instance is not restartable: {@link #stop()} shuts the pool down for good.
 */
public class ThreadPoolRPCServer implements RPCServer {
    private final ThreadPoolExecutor threadPool;
    private final Map<String, Object> serviceProvide;

    /** Listening socket of the running accept loop, or {@code null} when not listening. */
    private volatile ServerSocket serverSocket;

    /** Set by {@link #stop()} so the accept loop can tell a requested stop from a failure. */
    private volatile boolean stopped;

    /**
     * Creates a server with a default pool: core size equal to the number of available
     * processors, at most 1000 threads, 60 s keep-alive and a bounded queue of 100 tasks.
     *
     * @param serviceProvide services keyed by the name clients put in their requests
     */
    public ThreadPoolRPCServer(Map<String, Object> serviceProvide) {
        this(serviceProvide,
                Runtime.getRuntime().availableProcessors(),
                1000,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(100));
    }

    /**
     * Creates a server with a caller-configured pool.
     *
     * @param serviceProvide  services keyed by the name clients put in their requests
     * @param corePoolSize    core pool size passed to {@link ThreadPoolExecutor}
     * @param maximumPoolSize maximum pool size passed to {@link ThreadPoolExecutor}
     * @param keepAliceTime   keep-alive time for idle threads above the core size
     * @param unit            unit of {@code keepAliceTime}
     * @param workQueue       queue holding tasks before they run; any blocking queue is
     *                        accepted (a {@code BlockingDeque} still works)
     */
    public ThreadPoolRPCServer(Map<String, Object> serviceProvide, int corePoolSize,
                               int maximumPoolSize,
                               long keepAliceTime,
                               TimeUnit unit,
                               java.util.concurrent.BlockingQueue<Runnable> workQueue) {
        this.threadPool =
                new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliceTime, unit, workQueue);
        this.serviceProvide = serviceProvide;
    }

    /**
     * Binds to {@code port} and serves connections until {@link #stop()} is called or the
     * listening socket fails. Blocks the calling thread for that whole time.
     *
     * @param port TCP port to listen on
     */
    @Override
    public void start(int port) {
        // try-with-resources releases the port on every exit path.
        try (ServerSocket listener = new ServerSocket(port)) {
            serverSocket = listener;
            // Announce only after the bind succeeded.
            System.out.println("服务器已启动");
            while (!stopped) {
                dispatch(listener.accept());
            }
        } catch (IOException e) {
            // stop() closes the listener, which makes accept() throw; that is a normal exit.
            if (!stopped) {
                e.printStackTrace();
            }
        } finally {
            serverSocket = null;
        }
    }

    /**
     * Stops accepting connections and shuts the worker pool down. Tasks that were already
     * submitted are allowed to finish.
     */
    @Override
    public void stop() {
        // Flag first, then close: start() publishes the socket before it checks the flag,
        // so one of the two sides always observes the other.
        stopped = true;
        ServerSocket listener = serverSocket;
        if (listener != null) {
            try {
                listener.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        threadPool.shutdown();
        System.out.println("服务器停止运行");
    }

    /** Submits one connection to the pool; drops the connection if the pool rejects it. */
    private void dispatch(Socket socket) {
        try {
            threadPool.execute(new WorkThread(socket, serviceProvide));
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // A saturated (or shut down) pool must not kill the accept loop or leak the socket.
            e.printStackTrace();
            try {
                socket.close();
            } catch (IOException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}
工作任务类:
/**
 * Task that serves a single RPC call on an accepted socket: reads one {@code RPCRequest},
 * invokes the requested method reflectively on the registered service and writes back one
 * {@code RPCResponse}.
 */
public class WorkThread implements Runnable {
    private final Socket socket;
    private final Map<String, Object> serviceProvide;

    /**
     * @param socket         connection to serve; it is closed when {@link #run()} returns
     * @param serviceProvide services keyed by the interface name carried in the request
     */
    public WorkThread(Socket socket, Map<String, Object> serviceProvide) {
        this.socket = socket;
        this.serviceProvide = serviceProvide;
    }

    /**
     * Handles exactly one request/response exchange and then closes the connection.
     */
    @Override
    public void run() {
        // try-with-resources closes both streams and the socket on every path, so a
        // connection is never leaked. The output stream is created first, as before.
        try (Socket connection = socket;
             ObjectOutputStream oos = new ObjectOutputStream(connection.getOutputStream());
             ObjectInputStream ois = new ObjectInputStream(connection.getInputStream())) {
            // NOTE(security): this uses Java native deserialization on data read from the
            // network. It is only safe with trusted peers; consider an ObjectInputFilter or
            // a different wire format before exposing this server.
            RPCRequest request = (RPCRequest) ois.readObject();
            RPCResponse response = getResponse(request);
            oos.writeObject(response);
            oos.flush();
        } catch (IOException | ClassNotFoundException | ClassCastException e) {
            // ClassCastException: the peer sent an object that is not an RPCRequest.
            e.printStackTrace();
        }
    }

    /**
     * Resolves the service and method named in {@code request} and invokes it.
     *
     * @param request the decoded request; may be {@code null} if the peer sent null
     * @return a success response wrapping the invocation result, or a failure response when
     *         the service or method cannot be resolved or the invocation throws
     */
    private RPCResponse getResponse(RPCRequest request) {
        Object service = request == null ? null : serviceProvide.get(request.getInterfaceName());
        if (service == null) {
            // Unknown interface: answer with a failure instead of dying with an NPE and
            // leaving the client without a response.
            System.out.println("方法执行错误");
            return RPCResponse.fail();
        }
        try {
            Method method =
                    service.getClass().getMethod(request.getMethodName(), request.getParamsTypes());
            Object result = method.invoke(service, request.getParams());
            return RPCResponse.success(result);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException
                | IllegalArgumentException e) {
            // IllegalArgumentException: the supplied arguments do not fit the method.
            e.printStackTrace();
            System.out.println("方法执行错误");
            return RPCResponse.fail();
        }
    }
}
服务接口暴露类:
/**
 * Registry that maps interface names to the service object implementing them, so a server
 * can look a service up by the interface name carried in a request.
 */
public class ServiceProvider {
    private final Map<String, Object> interfaceProvider;

    /** Creates an empty registry. */
    public ServiceProvider() {
        this.interfaceProvider = new HashMap<>();
    }

    /**
     * Returns the registry's backing map (the live map, not a copy).
     *
     * @return interface name to service instance
     */
    public Map<String, Object> getProvide() {
        return this.interfaceProvider;
    }

    /**
     * Registers {@code service} under the fully qualified name of every interface it
     * implements.
     *
     * <p>Interfaces declared directly on the service's class replace any earlier entry for
     * the same name. Interfaces that are only inherited (from a superclass or from a parent
     * interface) are registered as well, but never displace an existing entry.
     *
     * @param service the service instance to expose; must not be {@code null}
     */
    public void provideServiceInterface(Object service) {
        Class<?> serviceClass = service.getClass();
        for (Class<?> declared : serviceClass.getInterfaces()) {
            interfaceProvider.put(declared.getName(), service);
        }
        // getInterfaces() only reports directly declared interfaces, so walk the superclass
        // chain to pick up the ones the service inherits.
        for (Class<?> type = serviceClass; type != null; type = type.getSuperclass()) {
            for (Class<?> implemented : type.getInterfaces()) {
                registerIfAbsent(implemented, service);
            }
        }
    }

    /**
     * Looks up the service registered for an interface.
     *
     * @param interfaceName fully qualified interface name
     * @return the registered service, or {@code null} if none is registered
     */
    public Object getService(String interfaceName) {
        return interfaceProvider.get(interfaceName);
    }

    /** Registers {@code iface} and its parent interfaces wherever no entry exists yet. */
    private void registerIfAbsent(Class<?> iface, Object service) {
        String name = iface.getName();
        if (!interfaceProvider.containsKey(name)) {
            interfaceProvider.put(name, service);
        }
        for (Class<?> parent : iface.getInterfaces()) {
            registerIfAbsent(parent, service);
        }
    }
}
业务类合集:
/**
 * Serializable blog value object returned by the blog service.
 *
 * <p>Accessors, the builder and both constructors come from the class-level annotations
 * (presumably Lombok; the imports are not shown here).
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Blog implements Serializable {
    /** Identifier of the blog. */
    private Integer id;

    /** User id attached to the blog (presumably the author; confirm against callers). */
    private Integer userId;

    /** Title of the blog. */
    private String title;
}
/**
 * Service interface for looking up blogs; exposed to RPC clients by its interface name.
 */
public interface BlogService {
/**
 * Returns the blog for the given id.
 *
 * @param id identifier of the requested blog
 * @return the blog for {@code id}; behaviour for unknown ids is left to the implementation
 */
Blog findBlogById(Integer id);
}
/**
 * Demo {@link BlogService} that fabricates a blog for whatever id is requested.
 */
public class BlogServiceImpl implements BlogService {
    /**
     * Builds a blog carrying the requested id, a pseudo-random user id in {@code [0, 100)}
     * and a fixed title, and logs the lookup to standard output.
     *
     * @param id identifier requested by the client; copied into the returned blog as-is
     * @return a newly built blog
     */
    @Override
    public Blog findBlogById(Integer id) {
        // ThreadLocalRandom avoids allocating and seeding a new Random on every call, which
        // matters because this method runs on the server's pool threads.
        int userId = java.util.concurrent.ThreadLocalRandom.current().nextInt(100);
        Blog blog = Blog.builder().id(id).userId(userId).title("我的博客").build();
        System.out.println("客户端查询了id为 " + id + "的博客");
        return blog;
    }
}