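A Simple RPC Framework in Three Hundred Lines of Code
I spent half a day writing a simple RPC framework. When I first read the Dubbo source code, I noticed that although Dubbo looks huge, I had a nagging sense that the vast majority of its features exist to serve extensibility and service governance. I have read the source code of both Dubbo and gRPC, and both are excellent RPC frameworks. To help readers who are new to RPC grasp its core quickly, I wrote this article. I hope that anyone who reads it and then goes back to the Dubbo source can read that code with this core in mind.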
1: The RPC protocol interface
RpcProtocol.java
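public interface RpcProtocol {
    // Provider side: expose the registered services on the given port.
    void export(int port);

    // Consumer side: obtain a local proxy for a service hosted at host:port.
    Object refer(Class interfaceClass, String host, int port);
}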
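This interface declares only two methods because, in essence, that is all an RPC framework needs: one for the consumer to obtain a reference to the provider's service, and one for the provider to expose its service so that it can receive and answer the consumer's requests.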
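The consumer half of this contract can be illustrated without any networking. The following is a minimal sketch, not the article's implementation: it assumes a hypothetical `Greeter` interface and a hypothetical `referLocal` helper, and it answers calls in-process at the point where a real `refer` would forward them to the provider.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface, used only for this illustration.
interface Greeter {
    String greet(String name);
}

class ProxySketch {
    // Hypothetical helper: returns a JDK dynamic proxy that intercepts every call
    // made through the interface. A real refer() would send the method name and
    // arguments to the provider here instead of building a string locally.
    @SuppressWarnings("unchecked")
    static <T> T referLocal(Class<T> interfaceClass) {
        InvocationHandler handler = (proxy, method, args) ->
                interfaceClass.getSimpleName() + "." + method.getName() + Arrays.toString(args);
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, handler);
    }

    public static void main(String[] args) {
        Greeter greeter = referLocal(Greeter.class);
        // The caller sees an ordinary Greeter; the handler decides what actually happens.
        System.out.println(greeter.greet("zhangsan"));
    }
}
```

The caller never learns whether the object behind the interface is local or remote, which is why a single `refer` method is enough on the consumer side.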
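The concrete implementation follows. The code is not complicated, so you can copy it straight into IDEA and step through it in the debugger.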
2: The concrete implementation of the RPC protocol
RpcCore.java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RpcCore implements RpcProtocol {
    private Socket socket;
    private ObjectOutputStream objectOutputStream;
    private ObjectInputStream objectInputStream;
    private ServerSocket serverSocket;
    // Provider side: service key (interface&group&version) -> implementation instances.
    private Map<String, List<Object>> services = new ConcurrentHashMap<String, List<Object>>();
    // Consumer side: interface name -> attributes (group, version, fullName).
    private Map<String, Map<String, Object>> interfaceAttributes = new ConcurrentHashMap<>();

    @Override
    public void export(int port) {
        start(port);
    }

    @Override
    public Object refer(final Class interfaceClass, String host, int port) {
        connect(host, port);
        return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        String interfaceName = interfaceClass.getName();
                        String fullName = (String) interfaceAttributes.get(interfaceName).get("fullName");
                        return get(fullName, method, args);
                    }
                });
    }

    // Sends one request and reads the reply. The streams are closed afterwards,
    // so a proxy returned by refer() supports a single call.
    public Object get(String interfaceFullName, Method method, Object[] params) {
        Object result = null;
        try {
            objectOutputStream.writeUTF(interfaceFullName);
            objectOutputStream.writeUTF(method.getName());
            objectOutputStream.writeObject(method.getParameterTypes());
            objectOutputStream.writeObject(params);
            objectOutputStream.flush();
            result = objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            try {
                if (objectOutputStream != null) {
                    objectOutputStream.close();
                }
                if (objectInputStream != null) {
                    objectInputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    private void start(int port) {
        try {
            serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress("localhost", port));
            init();
        } catch (IOException e) {
            e.printStackTrace();
            return; // without a bound socket there is nothing to accept on
        }
        System.out.println("server has started success port is --->" + port);
        while (true) {
            try {
                Socket socket = serverSocket.accept();
                // One thread per connection keeps the example simple.
                new Thread(new Processor(socket, services)).start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // Registers the demo service under its full key.
    public void init() {
        RpcDemo rpcDemo = new RpcDemoImplProvider();
        String group = "rpcDemo";
        String version = "1.0.0";
        String fullName = RpcDemo.class.getName() + "&" + group + "&" + version;
        List<Object> rpcDemoInstances = services.get(fullName);
        if (rpcDemoInstances == null) {
            rpcDemoInstances = new ArrayList<Object>();
            rpcDemoInstances.add(rpcDemo);
        }
        services.put(fullName, rpcDemoInstances);
    }

    public void connect(String host, int port) {
        try {
            storeInterface();
            socket = new Socket();
            socket.connect(new InetSocketAddress(host, port));
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectInputStream = new ObjectInputStream(socket.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void storeInterface() {
        String group = "rpcDemo";
        String version = "1.0.0";
        String interfaceName = RpcDemo.class.getName();
        String fullName = interfaceName + "&" + group + "&" + version;
        // Look up and store under the same key (the plain interface name) that refer() reads.
        Map<String, Object> attributes = interfaceAttributes.get(interfaceName);
        if (attributes == null) {
            attributes = new ConcurrentHashMap<String, Object>(100);
            attributes.put("group", group);
            attributes.put("version", version);
            attributes.put("fullName", fullName);
        }
        interfaceAttributes.put(interfaceName, attributes);
    }

    // Handles one consumer connection: reads the request, invokes the service, writes the result.
    class Processor implements Runnable {
        private Socket socket;
        private ObjectInputStream objectInputStream;
        private ObjectOutputStream objectOutputStream;
        private Map<String, List<Object>> services;

        private Processor(Socket socket, Map<String, List<Object>> services) {
            this.socket = socket;
            this.services = services;
        }

        @Override
        public void run() {
            System.out.println(((InetSocketAddress) socket.getRemoteSocketAddress()).getPort());
            try {
                objectInputStream = new ObjectInputStream(socket.getInputStream());
                objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                // Read the fields in the same order in which get() wrote them.
                String interfaceFullName = objectInputStream.readUTF();
                String methodName = objectInputStream.readUTF();
                Class[] parameterTypes = (Class[]) objectInputStream.readObject();
                Object[] objects = (Object[]) objectInputStream.readObject();
                String interfaceName = interfaceFullName.split("&")[0];
                Class service = Class.forName(interfaceName);
                Method method = service.getMethod(methodName, parameterTypes);
                Object instance = services.get(interfaceFullName).get(0);
                Object result = method.invoke(instance, objects);
                objectOutputStream.writeObject(result);
                objectOutputStream.flush();
                objectOutputStream.close();
                objectInputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
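The request that `get` writes and the server-side handler reads back can be reproduced in memory, which makes the wire format easier to see. This is a minimal sketch under assumptions: the `Echo` interface, the `EchoImpl` class, and the `encode`, `dispatch` and `roundTrip` helpers are hypothetical names, and a byte array stands in for the socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;

// Hypothetical service used only for this illustration.
interface Echo {
    String echo(String text, Integer times);
}

class EchoImpl implements Echo {
    public String echo(String text, Integer times) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < times; i++) {
            sb.append(text);
        }
        return sb.toString();
    }
}

class WireFormatSketch {
    // Consumer side: service key, method name, parameter types, arguments, in that order.
    static byte[] encode(String serviceKey, Method method, Object[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeUTF(serviceKey);
            out.writeUTF(method.getName());
            out.writeObject(method.getParameterTypes());
            out.writeObject(args);
        }
        return bytes.toByteArray();
    }

    // Provider side: read the fields in the same order, then invoke the target reflectively.
    static Object dispatch(byte[] request, Object target) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(request))) {
            String serviceKey = in.readUTF();
            String methodName = in.readUTF();
            Class<?>[] parameterTypes = (Class<?>[]) in.readObject();
            Object[] args = (Object[]) in.readObject();
            // The interface name is the part of the key before the first '&'.
            Class<?> service = Class.forName(serviceKey.split("&")[0]);
            Method method = service.getMethod(methodName, parameterTypes);
            return method.invoke(target, args);
        }
    }

    // Encodes a call to Echo.echo and dispatches it against a local EchoImpl.
    static Object roundTrip(String text, int times) {
        try {
            String serviceKey = Echo.class.getName() + "&demoGroup&1.0.0";
            Method method = Echo.class.getMethod("echo", String.class, Integer.class);
            byte[] request = encode(serviceKey, method, new Object[]{text, times});
            return dispatch(request, new EchoImpl());
        } catch (Exception e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ab", 3));
    }
}
```

Both sides must agree on the field order, and every argument and return type must be `Serializable`, because Java serialization is the only encoding involved.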
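3: The RPC test interface
An interface is, put bluntly, a protocol. In essence it is no different from other protocols such as HTTP or MQTT, except that RPC is a strong dependency (the consumer compiles against the provider's Java interface), while the other two are weak ones. The entity is defined as a nested class to make a point: the entity classes and exceptions of an RPC service are part of the protocol and should be kept together with it.
RpcDemo.java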
import java.io.Serializable;

public interface RpcDemo {
    Student getStudent(Integer id, String name);

    // The entity is nested in the interface because it is part of the protocol.
    // It must be Serializable so that it can travel over the object streams.
    class Student implements Serializable {
        public long id;
        public String name;
        public int age;
        public boolean man;

        public Student(long id, String name, int age, boolean man) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.man = man;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        public boolean isMan() {
            return man;
        }

        public void setMan(boolean man) {
            this.man = man;
        }

        @Override
        public String toString() {
            return "Student{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + ", man=" + man + '}';
        }
    }
}
4: The implementation of the interface
RpcDemoImplProvider.java
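public class RpcDemoImplProvider implements RpcDemo {
    @Override
    public Student getStudent(Integer id, String name) {
        // Returns fixed demo data; the arguments are ignored.
        return new Student(1234, "zhangsan", 20, true);
    }

    public static void main(String[] args) {
        RpcCore rpcCore = new RpcCore();
        // Blocks here, accepting consumer connections on port 8087.
        rpcCore.export(8087);
    }
}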
5: RpcDemoConsumer.java
The consumer side.
public class RpcDemoConsumer {
    public static void main(String[] args) {
        RpcCore rpcCore = new RpcCore();
        RpcDemo rpcDemo = (RpcDemo) rpcCore.refer(RpcDemo.class, "127.0.0.1", 8087);
        // The remote call happens here, when a method is invoked on the proxy.
        RpcDemo.Student student = rpcDemo.getStudent(111, "zhangsan");
        System.out.println("Remote call succeeded");
        System.out.println("The result is ---->" + student);
    }
}
6: Start RpcDemoImplProvider first
7: Then start RpcDemoConsumer
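The start order matters because the consumer opens its connection as soon as `refer` is called, so the provider must already be listening. The sketch below reproduces both steps inside one process. It is an illustration under assumptions: `StartOrderSketch` and `callOnce` are hypothetical names, and a trivial upper-casing handler stands in for the real service.

```java
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

class StartOrderSketch {
    // Hypothetical helper: starts a one-shot provider, then connects a consumer to it.
    static String callOnce(String message) {
        // Step 1: the provider binds first. Port 0 asks the OS for any free port.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread provider = new Thread(() -> {
                try (Socket client = server.accept();
                     ObjectInputStream in = new ObjectInputStream(client.getInputStream());
                     ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream())) {
                    // Stand-in for the real service: reply with the upper-cased request.
                    out.writeObject(in.readUTF().toUpperCase());
                    out.flush();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            provider.start();

            // Step 2: only now does the consumer connect, send its request, and read the reply.
            try (Socket socket = new Socket(server.getInetAddress(), server.getLocalPort());
                 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                 ObjectInputStream in = new ObjectInputStream(socket.getInputStream())) {
                out.writeUTF(message);
                out.flush();
                String reply = (String) in.readObject();
                provider.join();
                return reply;
            }
        } catch (Exception e) {
            throw new IllegalStateException("loopback call failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callOnce("hello"));
    }
}
```

If the consumer ran before the server socket was bound, its connection attempt would be refused, which is the failure that starting the provider first avoids.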
The whole thing comes to only about 250 lines of code. That's all.