RMI(RPC),是一种远程过程调用。这里对其概念就不赘述了,简单说下个人理解。远程调用,便是在客户机端远程调用服务器端的方法,并将结果返回给客户端。这样的话,客户端无需关心方法的具体实现,只需注意调用的参数等。
1.其实这里可以分开思考,最外面的框架无非是,客户端通过代理去执行方法。这也是rpc给人最浅层的感受。就感觉方法只是在本地执行一样。
2.然后再加入网络传输的概念,代理机制里实质上将方法和参数通过网络传给SERVER。
3.server监听到一个客户端连接请求时,交给Executor去处理。
4.Executor通过beanFactory取得相关的方法,再通过反射机制执行。
5.将结果再传回client便结束了。
当然上面的过程只是执行方法的过程,因为执行前server还需要factory的初始化和注册Bean的注册过程。而这些过程,还是在后面结合代码作以叙述吧。
这里先给出RPCClient的代码。可以看到,client层没有干什么事,只给给出代理,并将执行的事物先交给Executor(实际执行者)去处理。这里要注意一点,为了避免调用method的重载方法,这里需要将method作如下的操作,因为重载方法的hashcode()是相同的。
public class RpcClient { private RpcClientExecutor rpcClientExecutor; public RpcClient() { } public RpcClient(String serverIp, int serverPort) { this.rpcClientExecutor = new RpcClientExecutor(serverIp, serverPort); } @SuppressWarnings("unchecked") public <T> T getProxy(Class<?> klass) { return (T) Proxy.newProxyInstance( klass.getClassLoader(), new Class<?>[] { klass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String rpcBeanId = String.valueOf(method.toString().hashCode()); return rpcClientExecutor.rpcExecutor(rpcBeanId, args); } }); } }client层将所有事情甩手交给Executor去处理,由它去负责传输方法执行的参数和beanId.并负责接受有服务器执行的结果。这里也有一点需要注意,那就是inputStream和OutInputStream的执行顺序,因为这里没有用线程去接收,因此必须这么做。
public class RpcClientExecutor { private int serverPort; private String serverIp; public RpcClientExecutor() { } public RpcClientExecutor(String serverIp, int serverPort) { this.serverIp = serverIp; this.serverPort = serverPort; } protected int getServerPort() { return serverPort; } protected void setServerPort(int serverPort) { this.serverPort = serverPort; } protected String getServerIp() { return serverIp; } protected void setServerIp(String serverIp) { this.serverIp = serverIp; } private void closeSocket(ObjectInputStream ois, ObjectOutputStream oos, Socket socket) { try { if(ois != null) { ois.close(); } } catch (IOException e) { e.printStackTrace(); }finally { ois = null; } try { if(oos != null) { oos.close(); } } catch (IOException e) { e.printStackTrace(); }finally { oos = null; } try { if(socket != null && !socket.isClosed()) { socket.close(); } } catch (IOException e) { e.printStackTrace(); }finally { socket = null; } } @SuppressWarnings("unchecked") <T> T rpcExecutor(String rpcBeanId, Object[] agrs) throws Exception { Socket socket = new Socket(serverIp, serverPort); ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); oos.writeUTF(rpcBeanId); oos.writeObject(agrs); ObjectInputStream ois = new ObjectInputStream(socket.getInputStream()); Object result = ois.readObject(); closeSocket(ois, oos, socket); return (T)result; } }这时字节码便会传输到server层,而在server层,功能也是很单调的,每次server接收到一个client连接,便产生一个executor去处理。不影响server层的继续侦听。
public class RpcServer implements Runnable { private ServerSocket server; private int port; private Boolean goon; private RpcBeanFactory beanFactory; private long executorId; public RpcServer() { beanFactory = new RpcBeanFactory(); goon = false; } public void setPort(int port) { this.port = port; } RpcBeanFactory getRpcBeanFactory() { return beanFactory; } public void startRpcServer() throws IOException { if(this.port == 0) { return; } this.server = new ServerSocket(port); this.goon = true; new Thread(this, "RPC_SERVER").start(); } public void stopServer() { if(this.server != null && !this.server.isClosed()) { try { this.server.close(); } catch (IOException e) { e.printStackTrace(); }finally { this.server = null; } } } @Override public void run() { while(goon) { try { Socket client = server.accept(); new RpcServerExecutor(this, client, ++executorId); } catch (IOException e) { e.printStackTrace(); } } } public void registryRpc(Class<?> interfaces, Object object) { RpcBeanRegisty.registyInterface(beanFactory, interfaces, object); } public void registryRpc(Class<?> interfaces, Class<?> implementsClass) { RpcBeanRegisty.registyInterface(beanFactory, interfaces, implementsClass); } }你会发现,客户端所要执行的方法,其实是在这里执行的。但就存在一个问题,客户端传过来的参数是远远不够执行的。其实,server这里是有一个beanfactory的。它负责存储server层所有的bean,简单说,就是可以被远程调用的所有的方法。
public class RpcServerExecutor implements Runnable{ private Socket socket; private RpcServer server; private ObjectInputStream ois; private ObjectOutputStream oos; public RpcServerExecutor(RpcServer server, Socket socket, long executorId) throws IOException { this.server = server; this.socket = socket; this.ois = new ObjectInputStream(socket.getInputStream()); this.oos = new ObjectOutputStream(socket.getOutputStream()); new Thread(this, "RPC_EXECUTOR_"+ executorId).start(); } private void closeSocket() { try { if(this.ois != null) { this.ois.close(); } }catch (Exception e) { e.printStackTrace(); }finally { this.ois = null; } try { if(this.oos != null) { this.oos.close(); } }catch (Exception e) { e.printStackTrace(); }finally { this.oos = null; } try { if(this.socket != null && !this.socket.isClosed()) { this.socket.close(); } }catch (Exception e) { e.printStackTrace(); }finally { this.socket = null; } } @Override public void run() { try { String beanId = ois.readUTF(); Object[] args = (Object[]) ois.readObject(); showParameters(args); BeanDefination beanDefination = server.getRpcBeanFactory().getRpcBean(beanId); Method method = beanDefination.getMethod(); Object object = beanDefination.getObject(); Object result = method.invoke(object, args); oos.writeObject(result); } catch (Exception e) { e.printStackTrace(); }finally { closeSocket(); } } private void showParameters(Object[] args) { for(int i = 0; i < args.length; i++) { System.out.println("第" + i + "个"+ args[i]); } } }因此Beanfactory的存在意义就不用赘述了
public class RpcBeanFactory { private final Map<String, BeanDefination> rpcBeanMap; public RpcBeanFactory() { rpcBeanMap = new HashMap<>(); } public void beanRegisty(String beanId, BeanDefination beanDefination) { if(rpcBeanMap.containsKey(beanId)) { return; } rpcBeanMap.put(beanId, beanDefination); } public BeanDefination getRpcBean(String beanId) { return rpcBeanMap.get(beanId); } } public class BeanDefination { private Class<?> klass; private Method method; private Object object; public BeanDefination() { } protected Class<?> getKlass() { return klass; } protected void setKlass(Class<?> klass) { this.klass = klass; } protected Method getMethod() { return method; } protected void setMethod(Method method) { this.method = method; } protected Object getObject() { return object; } protected void setObject(Object object) { this.object = object; } }可以看到,beanfactory是最基本的单个方法的注入,这是最底层的。这里就的多说点了。我们代理机制用的是JDK代理。也就是说我们在client能只有一个接口,client只知道这个接口中的方法。它可以远程调用接口中的所有的方法。因此服务器这里必须能对接口进行注册。并且有一点,将来的执行方法的对象是在服务器层产生,因此,还需要在注册时,给出接口的实现类,方便之后的实例化。
public class RpcBeanRegisty { public RpcBeanRegisty() { } private static void doRegisty(RpcBeanFactory beanFactory, Class<?> interfaces, Object object) { Method[] methods = interfaces.getDeclaredMethods(); for(Method method : methods) { String beanId = String.valueOf(method.toString().hashCode()); BeanDefination beanDefination = new BeanDefination(); beanDefination.setKlass(interfaces); beanDefination.setMethod(method); beanDefination.setObject(object); beanFactory.beanRegisty(beanId, beanDefination); } } public static void registyInterface(RpcBeanFactory beanFactory, Class<?> interfaces, Object object ) { if(!interfaces.isAssignableFrom(object.getClass())) { return; } doRegisty(beanFactory, interfaces, object); } public static void registyInterface(RpcBeanFactory beanFactory, Class<?> interfaces, Class<?> klass) { if(!interfaces.isAssignableFrom(klass)) { return; } try { doRegisty(beanFactory, interfaces, klass.newInstance()); } catch (Exception e) { e.printStackTrace(); } } }这里我们可以发现,这个注册类,其实只是一个工具,因此我们给的方法都是static的,不需要产生对象,即可直接使用。
rpc终于是写完了。其实最近的小项目上,发现代理机制真的很犀利。好多框架的实现,必须用到代理机制,才能谈及之后的设计。亦可以说,它是比较基本的,底层的东西。