分布式RPC服务器(容灾和服务器识别机制的实现,借助zookeeper)最终完整代码
同样在services.py 中自定义需要实现消息协议、传输控制,并且实现客户端存根clientStub和服务器端存根serverStub,服务器定义以及channel的定义。
此时,添加DistributedChannel分布式的channel,并在ThreadServer中添加了registry方法。
```
importthreadingimportrandomimportstructimporttimefromioimportBytesIOimportsocketimportjsonfromkazoo.clientimportKazooClientclassInvalidOperation(BaseException):def__init__(self,message=None):self.message=messageor'involidoperation'classMethodProtocol(object):''''解读方法名'''def__init__(self,connection):self.conn=connectiondef_read_all(self,size):"""帮助我们读取二进制数据:paramsize:想要读取的二进制数据大小:return:二进制数据bytes"""#self.connifisinstance(self.conn,BytesIO):buff=self.conn.read(size)returnbuffelse:#有时候长度大于每次读取的长度have=0buff=b''whilehave<size:chunk=self.conn.recv(size-have)buff+=chunkl=len(chunk)have+=lifl==0:#表示客户端已经关闭了raiseEOFErrorreturnbuffdefget_method_name(self):#读取字符串长度buff=self._read_all(4)length=struct.unpack('!I',buff)[0]#读取字符串buff=self._read_all(length)name=buff.decode()returnnameclassDivideProtocol(object):"""divide过程消息协议转换工具"""defargs_encode(self,num1,num2=1):"""将原始调用的请求参数转换打包成二进制消息数据:paramnum1:int:paramnum2:int:return:bytes二进制消息数据"""name='divide'#处理函数名buff=struct.pack('!I',6)#无符号intbuff+=name.encode()#处理参数1buff2=struct.pack('!B',1)#无符号bytebuff2+=struct.pack('!i',num1)#处理参数2ifnum2!=1:#没有传参的时候buff2+=struct.pack('!B',2)buff2+=struct.pack('!i',num2)#处理参数边界和组合成完整数据buff+=struct.pack('!I',len(buff2))buff+=buff2returnbuffdef_read_all(self,size):"""帮助我们读取二进制数据:paramsize:想要读取的二进制数据大小:return:二进制数据bytes"""#self.connifisinstance(self.conn,BytesIO):buff=self.conn.read(size)returnbuffelse:#有时候长度大于每次读取的长度have=0buff=b''whilehave<size:chunk=self.conn.recv(size-have)buff+=chunkl=len(chunk)have+=lifl==0:#表示客户端已经关闭了raiseEOFErrorreturnbuffdefargs_decode(self,connection):"""接受调用请求数据病进行解析:paramconnection:链接请求数据socketBytesIO:return:因为有多个参数,定义为字典"""param_len_map={1:4,2:4,}param_fmt_map={1:'!i',2:'!i',}param_name_map={1:'num1',2:'num2',}#保存用来返回的参数字典args={}self.conn=connection#处理方法的名字,已经提前被处理,稍后处理#处理消息边界#1)读取二进制数据----read,------ByteIO.read#2)将二进制数据转换为python的数据类型buff=self._read_all(4)length=struct.unpack('!I',buff)[0]#记录已经读取的长度值have=0#处理第一个参数#解析参数序号buff=self._read_all(1)have+=1param_seq=struct.unpack('!B',buff)[0]#解析参数值param_len=param_len_map[param_seq]buff=self._read_all(param_len)have+=param_lenparam_fmt=param_fmt_map[param_seq]param=struct.unpack(param_fmt,buff)[0]#设置解析后的字典param_name=param_name_map[param_seq]args[param_name]=paramifhave>=length:returnargs#处理第二个参数#解析参数序号buff=self._read_all(1)param_seq=struct.unpack('!B',buff)[0]#解析参数值param_len=param_len_map[param_seq]buff=self._read_all(param_len)param_fmt=param_fmt_map[param_seq]param=struct.unpack(param_fmt,buff)[0]#设置解析后的字典param_name=param_name_map[param_seq]args[param_name]=paramreturnargsdefresult_encode(self,result):"""将原始结果数据转换为消息协议二进制数据:paramresult::return:"""ifisinstance(result,float):#处理返回值类型buff=struct.pack('!B',1)buff+=struct.pack('!f',result)returnbuffelse:buff=struct.pack('!B',2)#处理返回值length=len(result.message)#处理字符串长度buff+=struct.pack('!I',length)buff+=result.message.encode()returnbuffdefresult_decode(self,connection):"""将返回值消息数据转换为原始返回值:paramconnection:socketBytesIo:return:floatInvalidOperation对象"""self.conn=connection#处理返回值类型buff=self._read_all(1)result_type=struct.unpack('!B',buff)[0]ifresult_type==1:#正常情况buff=self._read_all(4)val=struct.unpack('!f',buff)[0]returnvalelse:buff=self._read_all(4)length=struct.unpack('!I',buff)[0]#读取字符串buff=self._read_all(length)message=buff.decode()returnInvalidOperation(message)classChannel(object):"""用于客户端建立网络链接"""def__init__(self,host,port):self.host=hostself.port=portdefget_connection(self):"""获取链接对象:return:与服务器通讯的socket"""sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)sock.connect((self.host,self.port))returnsockclassDistributedChannel(object):"""支持分布式的zookeeper的RPC客户端链接工具"""def__init__(self):#创建kazoo对象,用来跟zookeeper链接,获取信息zk=KazooClient('127.0.0.1:2181')zk.start()self.zk=zkself._servers=[]self._get_servers()#第一次,手动开启def_get_servers(self,event=None):"""从zookeeper中获取所有可用的RPC服务器的地址:return:"""self._servers=[]#每次重新获取地址信息#从zookeeper中获取/rpc节点下的所有可用的rpc服务器节点servers=self.zk.get_children('/rpc',watch=self._get_servers)#监视的回调函数为自身forserverinservers:addr_data=self.zk.get('/rpc/'+server)[0]addr=json.loads(addr_data.decode())self._servers.append(addr)def_get_server(self):"""从可用的服务器列表中选出一台服务器:return:{"host":xxx,"port":xxx}"""returnrandom.choice(self._servers)defget_connection(self):"""提供一个具体的与RPC服务器的链接socket:return:"""whileTrue:addr=self._get_server()print(addr)try:sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)sock.connect((addr['host'],addr['port']))exceptConnectionRefusedError:time.sleep(1)continueelse:returnsockclassThreadServer(object):"""多线成RPC服务器"""def__init__(self,host,port,handlers):sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)#地址复用sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)self.host=hostself.port=port#绑定地址sock.bind((self.host,self.port))#因为在启动的方法中才开启监听,所以不在此处开启#sock.listen(128)self.sock=sockself.handlers=handlersdefserve(self):"""开启服务器运行,提供RPC服务:return:"""#开启服务器的监听,等待客户端的链接请求self.sock.listen(128)print("服务器开启监听,ip地址为%s,port为%d..."%(self.host,self.port))#注册到zookeeperself.register_zookeeper()whileTrue:#不断的接收客户端的链接请求client_sock,client_addr=self.sock.accept()print("与客户端%s建立连接"%str(client_addr))t=threading.Thread(target=self.handle,args=(client_sock,))t.start()defregister_zookeeper(self):"""在zookeeper中心注册本服务器的地址信息:return:"""#创建kazoo的客户端zk=KazooClient('127.0.0.1:2181')#建立与zookeeper的链接zk.start()#在zookeeper中创建节点保存数据zk.ensure_path('/rpc')data=json.dumps({'host':self.host,'port':self.port})zk.create('/rpc/server',data.encode(),ephemeral=True,sequence=True)#子线程函数defhandle(self,client_sock):"""子线程调用的方法,用来处理一个客户段的请求:return:"""#交个ServerStub,完成客户端的具体的RPC的调用请求stub=ServerStub(client_sock,self.handlers)try:whileTrue:#不断的接收stub.process()exceptEOFError:#表示客户端关闭了连接print('客户端关闭了连接')client_sock.close()classServer(object):"""RPC服务器"""def__init__(self,host,port,handlers):sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)#地址复用sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)self.host=hostself.port=port#绑定地址sock.bind((self.host,self.port))#因为在启动的方法中才开启监听,所以不在此处开启#sock.listen(128)self.sock=sockself.handlers=handlersdefserve(self):"""开启服务器运行,提供RPC服务:return:"""#开启服务器的监听,等待客户端的链接请求self.sock.listen(128)print("服务器开启监听,ip地址为%s,port为%d..."%(self.host,self.port))whileTrue:#不断的接收客户端的链接请求client_sock,client_addr=self.sock.accept()print("与客户端%s建立连接"%str(client_addr))#交个ServerStub,完成客户端的具体的RPC的调用请求stub=ServerStub(client_sock,self.handlers)try:whileTrue:#不断的接收stub.process()exceptEOFError:#表示客户端关闭了连接print('客户端关闭了连接')client_sock.close()classClientStub(object):"""用来帮助客户端完成远程过程调用RPC调用stub=ClientStub()stub.divide(200,100)"""def__init__(self,channel):self.channel=channelself.conn=self.channel.get_connection()defdivide(self,num1,num2=1):#将调用的参数打包成消息协议的数据proto=DivideProtocol()args=proto.args_encode(num1,num2)#将消息数据通过网络发送给服务器self.conn.sendall(args)#接受服务器返回的消息数据,并进行解析result=proto.result_decode(self.conn)#将结果之(正常float或异常InvalidOperation)返回给客户端ifisinstance(result,float):returnresultelse:raiseresultclassServerStub(object):"""服务端存根帮助服务端完成远端过程调用"""def__init__(self,connection,handlers):""":paramconnection:与客户端的链接:paramhandlers:真正的本地函数路由此处不以map的形式处理,实现类的形式classHandler:@staticmethoddefdivide():pass@staticmethoddefadd():pass"""self.conn=connectionself.method_proto=MethodProtocol(self.conn)self.process_map={'divide':self._process_divide,'add':self._process_add}self.handlers=handlersdefprocess(self):"""当服务端接受了客户的链接,建立好链接后,完成远端调用的处理:return:"""#接收消息数据,并解析方法的名字name=self.method_proto.get_method_name()#根据解析获得的方法名,调用相应的过程协议,接收并解析消息数据self.process_map[name]()def_process_divide(self):"""处理除法过程调用:return:"""proto=DivideProtocol()args=proto.args_decode(self.conn)#args={'num1':xxx,'num2':xxx}#除法过程的本地调用------------------->>>>>>>>>#将本地调用过程的返回值(包括可能的异常)打包成消息协议的数据,通过网络返回给客户端try:val=self.handlers.divide(**args)exceptInvalidOperationase:ret_message=proto.result_encode(e)else:ret_message=proto.result_encode(val)self.conn.sendall(ret_message)def_process_add(self):"""处理加法过程调用此方法暂时不识闲:return:"""passif__name__=='__main__':#目的:消息协议测试,模拟网络传输#构造消息数据proto=DivideProtocol()#测试一#divide(200,100)#message=proto.args_encode(200,100)#测试二message=proto.args_encode(200)conn=BytesIO()conn.write(message)conn.seek(0)#解析消息数据method_proto=MethodProtocal(conn)name=method_proto.get_method_name()print(name)args=proto.args_decode(conn)print(args)
```
接下来,修改server.py文件
server.py
```
fromservicesimportInvalidOperation#fromservicesimportServerfromservicesimportThreadServerimportsysclassHandlers:@staticmethoddefdivide(num1,num2=1):ifnum2==0:raiseInvalidOperation('ck_god_err')val=num1/num2returnvalif__name__=='__main__':#开启服务器#_server=Server('127.0.0.1',8000,Handlers)#_server.serve()#从启动命令中提取服务器运行的ip地址和端口号,启动的多线程服务器host=sys.argv[1]port=int(sys.argv[2])_server=ThreadServer(host,port,Handlers)_server.serve()
```
最后,将client.py文件也稍作修改。
```
importtimefromservicesimportClientStub#fromservicesimportChannelfromservicesimportDistributedChannelfromservicesimportInvalidOperation#创建与服务器的连接#channel=Channel('127.0.0.1',8000)channel=DistributedChannel()#进行调用foriinrange(50):try:#创建用于rpc调用的工具stub=ClientStub(channel)#初始化的时候才真正的创建连接了,所以放到里面val=stub.divide(i*100,100)#val=stub.divide(i*100)#val=stub.divide(100,0)exceptInvalidOperationase:print(e.message)else:print(val)time.sleep(1)
```
ctrl + shift + T在pycharm中打开多个Terminal窗口
右键运行客户端,可以看到不断地随机切换服务器。
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。