一 概述1 运维管理的阶段1 人工阶段

人工盯着服务器,出了问题,到机器前面,翻日志,查状态,手动操作

2 脚本阶段

开始写一些自动化脚本,启动计划任务,自动启动服务,监控服务等

3 工具阶段

脚本功能太弱,开发了大量工具,某种工具解决某个特定领域的问题,常用的有ansible,puppet等

4 平台阶段

将工具整合,自主研发,实现标准化,实现自动化流程控制,而今,平台已经开始迈向智能化的发展方向。

二 mschedule 设计1 完整代码链接

https://gitee.com/ChangPaoZhe/mschedule

2要求

1 分发任务
分发脚本到目前节点上去执行


2 控制
控制并发,控制多少个节点同时执行
对错误做出响应,由用户设定,最多允许失败的比例或者数量,当超过范围时,需要终止任务执行


3 能跨机房部署


4 能对作业做版本控制,这是辅助功能,可过后实现

3 项目基本概述1 基本概述

本项目的出发点,是只需要会使用shell脚本就可以了,可以通过使用shell脚本的方式来完成远程任务的下发和处理流程。

2 其他自动化工具二次开发缺点

ansible,salt等需要学习特定的内部语言,如果觉得ansible这样的工具不能满足需求,二次开发难度过高,代码量不小,本身它们开发接口不完善,而且熟悉它的叫也比较难,就算开发出来维护也难。

从这些项目上二次开发,等于拉一个分支,如果主分支有了新的特性,想要合并也是比较困难的。

自己开发,满足自己需求,完全适合自己需求,代码规模可控,便于他人接收维护。

3 项目初始版本目标

自己开发就是造轮子,造轮子不是不好,其起初要实现的功能应该是比较简单的。后面可以逐步进行完善操作。

4 项目基本架构图

浏览器端和webSERVER端交互是通过HTTP实现的,而WEB server和master server 是通过TCP链接来实现的,master server 和agent之间也是通过TCP 链接来实现的

4 分发任务设计1 分发任务分类1 有agent 类

有agent类,被控节点需要安装或运行特殊的软件,用于和服务器端进行通信,服务器端把脚本,命令传递给agent端,由agent端控制来执行

2 无agent类

被控节点不需要安装或者运行特殊软件,如通过SSH来实现,这其实也是有agent的,不过不是自己写的程序


优缺点

1 通用,简单,易实现,但管理不善,容易出现安全问题

2 并行效率不高,有agent的并行执行可以不和管理服务器通信,可以并发很高,ssh执行要和master之间通信

3 ssh链接是有状态的,任务执行的时候,master不能挂了,否则任务将执行失败。

5 执行脚本(subprocess)

python 中有很多运行进程的方式,不过都过时了。
建议使用标准库subprocess模块,启动一个子进程。

1 初始化类源码

def __init__(self, args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=()):

第一个是参数,后面是可选,但shell默认为False,可将其置为True, stdout 后面跟文件或管道

def wait(self, timeout=None, endtime=None): """Wait for child process to terminate. Returns returncode attribute.""" if endtime is not None: timeout = self._remaining_time(endtime) if timeout is None: timeout_millis = _winapi.INFINITE else: timeout_millis = int(timeout * 1000) if self.returncode is None: result = _winapi.WaitForSingleObject(self._handle, timeout_millis) if result == _winapi.WAIT_TIMEOUT: raise TimeoutExpired(self.args, timeout) self.returncode = _winapi.GetExitCodeProcess(self._handle) return self.returncode

此处返回是状态,0为成功,其他为失败


stdout 方法调用的是一个文件,因此可使用文件的形式进行处理

if c2pread != -1: self.stdout = io.open(c2pread, 'rb', bufsize) if universal_newlines: self.stdout = io.TextIOWrapper(self.stdout)2 基本代码如下

#!/usr/bin/poython3.6#conding:utf-8import subprocessfrom subprocess import Popen,PIPEout=Popen("echo 'hello'",shell=True,stdout=PIPE)code=out.wait(10)txt=out.stdout.read()print ("code={} txt={}".format(code,txt.decode()))

结果如下

6 项目基本构建1 创建文件并添加虚拟环境

mkdir mschedule -pcd mschedule/pyenv virtualenv 3.5.3 mschpyenv local msch

2 构建模块agent,并创建执行程序executor.py

#!/usr/bin/poython3.6#conding:utf-8from subprocess import PIPE,Popenclass Executor: def run(self,script,timeout): p=Popen(script,shell=True,stdout=PIPE) code=p.wait(timeout=timeout) txt=p.stdout.read() return (code,txt)if __name__ == "__main__": exec=Executor() print (exec.run("echo 'hello'",3))

结果如下

7 agent 和master设计

用户和master server 通信,提交任务,此处是通过HTTP的方式提交任务
master 按照用户要求将任务分发到指定的节点上,这些节点上需要有agent用于和master通信,接受master发布的任务,并执行这些任务


设计agent,越简单越好,越简单bug越少,越稳定。
从本质上来说,master,agent设计是典型的CS编程模式
master作为CS中的server,agent作为CS中的client

8 消息设计1 注册信息

agent启动后,需要主动连接server,并注册自己
信息包括
hostname:报告自己的主机名称,此主机名称可能会重复

UUID,用于唯一标识这台主机

IP: 用于更加方便的管理主机

其它相关信息视情况而定

{ "type": "register", # 此处用于定义消息类型 "payload":{ "id" : uuid, #用于唯一标识一台主机 "hostname": "xxxx", # 对应agent名称 "IP": [], # agent IP地址,其可能包含多个IP地址,因此此处使用列表进行存储 } }2 心跳信息

agent定时向master发送心跳包,包含UUID这个唯一标识,附带hostname和ip地址,hostname和ip都可能变动,但agent不变,其UUID便不会发生变化,其他相关信息科一附加, 如更加flag,用于标识agent是否有正在执行的任务。

{ "type": "heartbeat", # 此处用于定义消息类型 "payload":{ "id" : uuid, #用于唯一标识一台主机 "hostname": "xxxx", # 对应agent名称 "IP": [], # agent IP地址,其可能包含多个IP地址,因此此处使用列表进行存储 }}3 任务消息

master分派任务给agent,发送任务描述信息到agent。
注意脚本字符串使用base64编码

{ "type" :"task", "payload" :{ "id" :"task-uuid", # 定义任务的唯一标识 "script" : "base64code", #定义执行任务的内容 "timeout" :0, # 定义超时时长 "parallel" :1, # 定义并行执行数 "fail_rate" :0, # 定义失败率,及百分比为多少代表失败 "fail_count" :-1 # 定义失败的次数为多少次表示失败,-1表示不关心 } }4 任务结果消息

当agent任务执行完成后,返回给master该任务执行的状态码和输出结果。

{ "type" :"result", "payload" :{ "id": "task-uuid", # 定义任务唯一标识 "agent_id": "agent-uuid", #定义任务执行者 "code" : 0, #定义任务执行结果返回值。0 表示成功,其他表示失败 "output" :"base64encode" # 定义任务执行结果,及输出到控制台的结果 }}

以上的master,agent之间需要传递消息,消息采用json格式。

三 agent端代码实现1 日志实现

具体代码如下

#!/usr/bin/poython3.6#conding:utf-8import loggingdef getlogger(mod_name:str,filepath:str='/var/log/mschedule'): logger=logging.getLogger(mod_name) # 获取名字 logger.setLevel(logging.INFO) # 添加日志级别 logger.propagate=False # 配置不想上传递 handler=logging.FileHandler("{}/{}.log".format(filepath,mod_name)) fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s (%(filename)s:L%(lineno)d)", datefmt='%Y-%m-%d %H:%M:%S') handler.setFormatter(fmt) logger.addHandler(handler) return loggerif __name__ == "__main__": log = getlogger('test') log.info('13234545654')

结果如下

2 通信模块实现(zerorpc )1 介绍和安装

原生的socket编程过于底层,很少使用,任何一门语言都要避开直接使用socket库开发,太过底层,难写难维护。

zeroprc 是基于 ZeroMQ和MessagePack 来实现的通信工具。

官网地址

http://www.zerorpc.io

安装

pip install zerorpc 2 基本代码实现

根目录创建app.py和appserver.py

server 端配置

#!/usr/bin/poython3.6#conding:utf-8import zerorpcclass HelloRPC(object): #定义方法 def hello(self, name): return "Hello, %s" % names = zerorpc.Server(HelloRPC()) # 方法注入s.bind("tcp://0.0.0.0:8080") # 绑定方法 s.run() # 运行方法

client端配置

#!/usr/bin/poython3.6#conding:utf-8import zerorpcc = zerorpc.Client()c.connect("tcp://127.0.0.1:8080")print (c.hello("RPC"))

#!/usr/bin/poython3.6#conding:utf-8import zerorpcimport threadingc = zerorpc.Client()c.connect("tcp://127.0.0.1:8080")e=threading.Event()while not e.wait(3): print(c.hello('test client')) print ('```````````````')

结果如下

3 注册消息实现1 uuid唯一主机标识

使用uuid.uuid4().hex 获取一个uuid,一个节点起始运行的时候是没有uuid的,一旦运行会生成一个uuid,并持久化到一个文件中,下次运行先找这个文件,如果文件中有uuid,就直接读取,没有uuid就重新生成并写入到该文件中。

#!/usr/bin/poython3.6#conding:utf-8#!/usr/bin/poython3.6#conding:utf-8import uuidprint (uuid.uuid4().hex)print (uuid.uuid4().hex)print (uuid.uuid4().hex)

结果如下

2 hostname

windows 和Linux 获取主机名称的方式是不同的

可以在所有平台上是使用socket.gethostname()获取主机名。

#!/usr/bin/poython3.6#conding:utf-8import socketprint (socket.gethostname())

3 ip 列表

pip install netifaces

netifaces.interfaces() 返回接口列表

netifaces.ifaddresss(interface) 获取指定接口的IP地址,返回相关信息

ip地址判断

#!/usr/bin/poython3.6#conding:utf-8import ipaddressips=['127.0.0.1','192.168.0.1','169.254.123.1','0.0.0.0','239.168.0.255','224.0.0.1','8.8.8.8']for ip in ips: print (ip) ip=ipaddress.ip_address(ip) print ('Linklocal {}'.format(ip.is_link_local)) # 169.254地址 print ('回环 {}'.format(ip.is_loopback)) # 回环 print ('多播 {}'.format(ip.is_multicast)) # 多播 print ('公网 {}'.format(ip.is_global)) # 公网,全球范围地址 print ('私有 {}'.format(ip.is_private)) # 私有地址 print ('保留 {}'.format(ip.is_reserved)) # 保留地址 print ('版本 {}'.format(ip.version)) #ipv4地址 print ('----------------------------')

结果如下


#!/usr/bin/poython3.6#conding:utf-8import netifacesprint (netifaces.interfaces()) # 获取所有的网卡接口for i in netifaces.interfaces(): print ('i....',netifaces.ifaddresses(i)) # 使用ifaddress获取端口对应的IP地址 print () print ('------------------------------') print () print ('[2]',netifaces.ifaddresses(i)[2]) # 获取字典key为2的对应的值

结果如下

其是一个字典,key为2就是ipv4地址
每一个接口返回的ipv4地址是一个列表,也就是说可以有多个,ipv4地址描述是在addr上

#!/usr/bin/poython3.6#conding:utf-8import netifacesprint (netifaces.interfaces()) # 获取所有的网卡接口for i in netifaces.interfaces(): for p in netifaces.ifaddresses(i)[2]: if p['addr']: print ('ip',p['addr']) # 获取ip地址

结果如下

#!/usr/bin/poython3.6#conding:utf-8import netifacesimport ipaddressprint (netifaces.interfaces()) # 获取所有的网卡接口for i in netifaces.interfaces(): for p in netifaces.ifaddresses(i)[2]: if p['addr']: ip=ipaddress.ip_address(p['addr']) #获取ip地址 if ip.is_loopback or ip.is_multicast or ip.is_link_local or ip.is_reserved: # 判断IP地址 continue print (ip)

结果如下

4 注册信息和相关信息处理

在agent文件包中创建msg.py文件,用于存储相关主从信息和配置信息

#!/usr/bin/poython3.6#conding:utf-8import socketimport uuidimport netifacesimport ipaddressimport osclass Messgae: def __init__(self,myidpath): if os.path.exists(myidpath): # 如果存在 with open(myidpath) as f: self.id=f.readline().strip() else: self.id=uuid.uuid4().hex with open(myidpath,'w') as f: f.write(self.id) def get_ipaddress(self): address=[] for p in netifaces.interfaces(): # 获取网口列表 n=netifaces.ifaddresses(p) # 获取字典 if n.get(2): # 查看是否存在ipv4地址 for ip in n[2]: # 此处获取对应列表的值 if ip['addr']: # 查看ip地址是否存在 ip=ipaddress.ip_address(ip['addr']) if ip.is_reserved or ip.is_multicast or ip.is_link_local or ip.is_loopback: continue address.append(str(ip)) return address def hearbeat(self): return { "type" :"hearbeat", "payload" :{ "ip" : self.get_ipaddress(), "hostname" : socket.gethostname(), "id" : self.id } } def reg(self): return { "type" :"register", "payload" :{ "ip" : self.get_ipaddress(), "hostname" : socket.gethostname(), "id" : self.id } }if __name__ == "__main__": msg=Messgae('/var/log/mschedule/uuid') print (msg.reg())

测试结果如下

5 处理链接相关配置

agent中创建config模块用于添加相关链接服务端IP地址
agent中创建cm 模块用于处理链接相关配置


config.py 配置如下

#!/usr/bin/poython3.6#conding:utf-8CONN_URL="tcp://127.0.0.1:9000"

cm.py 模块配置如下

#!/usr/bin/poython3.6#conding:utf-8import zerorpc #添加模块import threading # 用于处理中断相关from .msg import Messgae # 获取消息from .config import CONN_URLfrom utils import getloggerclass Conn_Manager: def __init__(self,timeout=3): self.timeout=timeout self.client=zerorpc.Client() self.event=threading.Event() self.message=Messgae('/var/log/mschedule/uuid') # 此处用于初始化消息 self.log=getlogger('agent') # 此处填写相关的log日志名称 def start(self): self.client.connect(CONN_URL) # 链接处理 self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg()))) # 发送心跳信息 self.client.send(self.message.reg()) #处理注册消息 while not self.event.wait(self.timeout): # 等待的时间 self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hearbeat()))) # 发送心跳信息 def shutdown(self): self.log.info("关闭操作") self.client.close() self.event.set()

agent 中 _init_.py 端配置

#!/usr/bin/poython3.6#conding:utf-8from .cm import Conn_Managerclass app: def __init__(self,timeout): self.conn=Conn_Manager(timeout) def start(self): self.conn.start() def shutdown(self): self.conn.shutdown()

全局根目录下 app.py 端配置如下

#!/usr/bin/poython3.6#conding:utf-8from agent import appif __name__ == "__main__": agent=app(3) try: agent.start() except KeyboardInterrupt: agent.shutdown()

服务端测试文件appserver 配置如下

#!/usr/bin/poython3.6#conding:utf-8import zerorpcclass HelloRPC(object): #定义方法 def send(self, name): return "Hello, %s" % names = zerorpc.Server(HelloRPC()) # 方法注入s.bind("tcp://0.0.0.0:9000") # 绑定方法s.run() # 运行方法

启动结果如下

日志结果如下

处理客户端重连机制

默认的,服务端关闭后,客户端结果如下

处理结果如下

cm.py如下

#!/usr/bin/poython3.6#conding:utf-8import zerorpc #添加模块import threading # 用于处理中断相关from .msg import Messgae # 获取消息from .config import CONN_URLfrom utils import getloggerclass Conn_Manager: def __init__(self,timeout=3): self.timeout=timeout self.client=zerorpc.Client() self.event=threading.Event() self.message=Messgae('/var/log/mschedule/uuid') # 此处用于初始化消息 self.log=getlogger('agent') # 此处填写相关的log日志名称 def start(self): try: self.client.connect(CONN_URL) # 链接处理 self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg()))) # 发送心跳信息 self.client.send(self.message.reg()) #处理注册消息 while not self.event.wait(self.timeout): # 等待的时间 self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hreadbeat()))) # 发送心跳信息 except Exception as e: print ('--------------------') self.event.set() raise e # 此处是抛出异常到上一级 def shutdown(self): self.log.info("关闭操作") self.client.close() self.event.set()

agent._init_.py 结果如下

#!/usr/bin/poython3.6#conding:utf-8from .cm import Conn_Managerimport threadingclass app: def __init__(self,timeout): self.conn=Conn_Manager(timeout) self.event=threading.Event() def start(self): while not self.event.is_set(): try: self.conn.start() except Exception as e: print('重连') self.conn.shutdown() self.event.wait(3) def shutdown(self): self.event.set() self.conn.shutdown()

app.py 如下

#!/usr/bin/poython3.6#conding:utf-8from agent import appif __name__ == "__main__": agent=app(3) try: agent.start() except KeyboardInterrupt: agent.shutdown()

结果如下

四 master端实现1 基本功能1 TCP Server

绑定端口,启动监听,等待agent链接。

2 信息存储

存储agent列表
存储用户提交的Task列表,用户通过WEB提交的任务信息存储下来。

3 接受注册

将注册信息写入agent列表
接受心跳信息
接受agent端发送的心跳信息

4 派发任务

将用户提交的任务分配到agent端

2 基本代码实现1 master.config 模块

用于指定服务端绑定IP地址和端口号

#!/usr/bin/poython3.6#conding:utf-8MASTER_URL="tcp://0.0.0.0:9000"if __name__ == "__main__": pass2 master.handler 模块

主要负责客户端数据的调度

#!/usr/bin/poython3.6#conding:utf-8from utils import getloggerlog=getlogger('handler')class Handler(object): def send(self,msg): # 定义一个可调用的基础函数 log.info(" ack ok {}".format(msg)) return " ack ok {}".format(msg)3 cm.py 模块

用于tcp 链接建立和关闭

#!/usr/bin/poython3.6#conding:utf-8from utils import getloggerfrom .config import MASTER_URLimport zerorpcfrom .handler import Handlerlog=getlogger('server')class Master_Listen: def __init__(self): self.server=zerorpc.Server(Handler()) def start(self): self.server.bind(MASTER_URL) log.info('Master 启动配置') self.server.run() def shutdown(self): self.server.close()4 master._init_.py 模块

#!/usr/bin/poython3.6#conding:utf-8from .cm import Master_Listenclass appserver: def __init__(self): self.appserver=Master_Listen() def start(self): self.appserver.start() def shutdown(self): self.appserver.shutdown()5 appserver.py模块

#!/usr/bin/poython3.6#conding:utf-8from master import appserverif __name__ == "__main__": appserver=appserver() try: appserver.start() except KeyboardInterrupt: appserver.shutdown()

启动服务测试如下

结果如下

上述代码实现了基本的注册,心跳部分的功能

经观察可知,目前注册和心跳除了类型不同外,其可以认为第一次心跳成功就是注册。

3 master的数据设计

master端核心需要存储2中数据:agent端数据,用户客户端浏览器提交的任务Task,构造出一个数据结构,存储相关信息.具体数据结构如下

1 agent客户端数据存储结构

{ "agents" :{ "agent_id" :{ "heartbeat" :"timestamp", "busy" :False, "info" :{ "hostname" :"", "ip" :[] } } }}

数据结构解释如下

1 agents里面记录了所有注册的agent
agent_id,字典的key,每一个agent 都有一个不同uuid,所以这个字典的键就是uuid,
heartbeat 由于设计中并没有让agent端发送心跳时间,所以就在master端记录了收到的时间
busy 如果agent 上有任务在执行。则此值表现为True
info 记录agent上发过来的hostname和ip列表

2 task数据存储结构

{ "tasks" :{ "task_id" :{ "script" :"base64encode", "targets" :{ "agent_id" :{ "state":"WAITING", "output" :"" } }, "state" :"WAITING" } }}

task 记录所有任务及target(agent)的状态

task_id ,字典的key对应一个一个task,item 也是taskid:{} 结构
task 任务,task.json 的payload信息
targets目标,用于指定agent的节点,记录agent上的state和输出output
state状态,单个agent上的执行状态

state 这是一个task的状态,整个任务的状态,比如统计达到了agent失败上限了,这个task的state 就置为失败

状态常量
"WAITING" "RUNNING" "SUCCEED" "FAILED"

4 agent 端信息存储

创建 storage.py 模块
构建Storage 类,用于存储用户信息

#!/usr/bin/poython3.6#conding:utf-8import datetimeclass Storage: def __init__(self): self.agents={} # 此处用于存储用户信息 self.tasks={} # 此处用于存储作业信息 def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址 self.agents[agent_id] = { 'heaerbeat' : datetime.datetime.now(), 'info' :info, 'busy':self.agents.get(agent_id,{}).get('busy',False) } # busy 读不到置False,读到了不变

handler.py端配置如下

#!/usr/bin/poython3.6#conding:utf-8from utils import getloggerfrom .storage import Storagelog=getlogger('handler')class Handler(object): def __init__(self): self.store=Storage() def send(self,msg): # 定义一个可调用的基础函数,此处的msg及就是对应的函数 log.info('客户端agent发送消息为:{}'.format(msg)) try: if msg['type'] in {'hearbeat','register'}: payload=msg['payload'] info={'hostname' :payload['hostname'],'ip' :payload['ip']} self.store.reg_hb(payload['id'],info) log.info("客户端数据列表为:{}".format(self.store.agents)) # 客户端的列表 return "agent信息为: {}".format(msg) except Exception as e: log.error("注册客户端信息错误为:{}".format(e)) return "Bad Request...."

运行结果如下

5 task 任务基本注册和创建1 概述

用户通过WEB(HTTP)提交新的任务,任务json信息有:
1 任务脚本script,base64编码
2 超时时间timeout
3 并行度 parallel
4 失败率 fail_rate
5 失败次数fail_count
6 targets 是跑任务的Agent的agent_id列表,这个目前也是在用户端选好的,如yoghurt需要在主机名为webserver-xxxx的几台设备上运行脚本,为了用户方便,可以使用类似ansible的分组。

在Master端受到信息后,需要添加2个信息

task_id 是Mater 端新建任务时生成的uuid
state 默认状态是WAITING

在WEB server 中最后将用户端发送来的数据组成下面的字典

task={ "task_id" :t.id, "script" :t.script, "timeout":t.timeout, "parallel" :t.parallelm, "fail_rate":t.fail_rate, "fail_count":t.fail_count, "state":t.state, "targets":t.targets}2 构建state类

用于处理相关消息的类型

#!/usr/bin/poython3.6#conding:utf-8WAITING='WAITING'RUNNING='RUNNING'SUCCEED='SUCCEED'FAILED='FAILED'3 构建task类

创建master/task.py 类处理webserver端数据

\

#!/usr/bin/poython3.6#conding:utf-8import uuid # 获取唯一的task_idfrom .state import *class Task: def __init__(self,task_id,script,targets,timeout=0,parallel=1,fail_rate=0,fail_count=-1): self.id=task_id # task唯一标识,用于确定任务 self.script=script # 对应的脚本内容,客户端输入的脚本 self.timeout=timeout # 超时时间 self.parallel=parallel # 并行执行数量 self.fail_rate=fail_rate #失败率 self.fail_count=fail_count #失败数 self.state=WAITING # 对应的消息的状态 self.targets={agent_id:{'state' : WAITING,'output':''} for agent_id in targets} # 此处对应客户端列表 self.target_count=len(self.targets) # 此处对应客户端的数量

在master.storage.py模块中进行相关方法调用,并将其存储进入task中

#!/usr/bin/poython3.6#conding:utf-8import datetimefrom .task import Taskclass Storage: def __init__(self): self.agents={} # 此处用于存储用户信息 self.tasks={} # 此处用于存储作业信息 def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址 self.agents[agent_id] = { 'heaerbeat' : datetime.datetime.now(), 'info' :info, 'busy':self.agents.get(agent_id,{}).get('busy',False) } # busy 读不到置False,读到了不变 def add_task(self,task:dict): # 此处用于从客户端获取相关的数据 t=Task(**task) # 此处进行参数解构 self.tasks[t.id]=t return t.id # 此处用于获取处理id

在master/handler.py 中处理用于webservr调用相关配置

#!/usr/bin/poython3.6#conding:utf-8from utils import getloggerfrom .storage import Storageimport uuidlog=getlogger('handler')class Handler(object): def __init__(self): self.store=Storage() def send(self,msg): # 定义一个可调用的基础函数,此处的msg及就是对应的函数 log.info('客户端agent发送消息为:{}'.format(msg)) try: if msg['type'] in {'hearbeat','register'}: payload=msg['payload'] info={'hostname' :payload['hostname'],'ip' :payload['ip']} self.store.reg_hb(payload['id'],info) log.info("客户端数据列表为:{}".format(self.store.agents)) # 客户端的列表 return "agent信息为: {}".format(msg) except Exception as e: log.error("注册客户端信息错误为:{}".format(e)) return "Bad Request...." def add_task(self,task): # 此处用于在webserver 端创建的agent调用方法返回结果 task['task_id']=uuid.uuid4().hex # 用于生成相关的任务id return self.store.add_task(task) # 此处用于调用相关配置 def get_agents(self): return self.store.get_agents()6 task 任务分派1 任务分派方式

任务在Storage中存储,一旦有了任务,需要将任务分派到指定节点执行,交给这些节点上的agent
不过,目前使用zerorpc,master是被动的接受agent端的数据并进行相关的响应操作,所以可以考虑使用一种agent端主动拉取数据的机制,提供一个接口,让agent访问,如果agent处于空闲状态,则就主动拉取任务,有任务就领走。
当agent少的时候,master推送任务到agent端,或者agent端主动拉取任务都是可以的,但是如果考虑到agent多的时候,或许使用agent拉模式是一个更好的选择。

本次采用agent拉取模式实现,所以master就不需要设计调度器了

2 客户端配置状态参数

agent/state.py

#!/usr/bin/poython3.6#conding:utf-8WAITING='WAITING'RUNNING='RUNNING'SUCCEED='SUCCEED'FAILED='FAILED'3 客户端添加消息类型result

用于返回至server端,用于最后返回至web浏览器端

#!/usr/bin/poython3.6#conding:utf-8import socketimport uuidimport netifacesimport ipaddressimport osclass Messgae: def __init__(self,myidpath): if os.path.exists(myidpath): # 如果存在 with open(myidpath) as f: self.id=f.readline().strip() else: self.id=uuid.uuid4().hex with open(myidpath,'w') as f: f.write(self.id) def get_ipaddress(self): address=[] for p in netifaces.interfaces(): # 获取网口列表 n=netifaces.ifaddresses(p) # 获取字典 if n.get(2): # 查看是否存在ipv4地址 for ip in n[2]: # 此处获取对应列表的值 if ip['addr']: # 查看ip地址是否存在 ip=ipaddress.ip_address(ip['addr']) if ip.is_reserved or ip.is_multicast or ip.is_link_local or ip.is_loopback: continue address.append(str(ip)) return address def hearbeat(self): return { "type" :"hearbeat", "payload" :{ "ip" : self.get_ipaddress(), "hostname" : socket.gethostname(), "id" : self.id } } def reg(self): return { "type" :"register", "payload" :{ "ip" : self.get_ipaddress(), "hostname" : socket.gethostname(), "id" : self.id } } def result(self,task_id,code,output): # 返回数据至web端,处理相关数据执行结果的返回 return { "type" :"result", "payload" :{ "id" : task_id, # 此处用于定义task_id 及任务id "agent_id" :self.id, # 此处用于获取客户端id "code" : code, # 此处用于对执行结果状态进行保存 "output" : output #此处用于对执行结果的输出信息进行保存,并进行相关配置 } }4 agent/cm.py模块

用于处理配置拉取相关事宜

#!/usr/bin/poython3.6#conding:utf-8import zerorpc #添加模块import threading # 用于处理中断相关from .msg import Messgae # 获取消息from .state import *from .config import CONN_URLfrom .executor import Executorfrom utils import getloggerclass Conn_Manager: def __init__(self,timeout=3): self.timeout=timeout self.client=zerorpc.Client() self.event=threading.Event() self.message=Messgae('/var/log/mschedule/uuid') # 此处用于初始化消息 self.log=getlogger('agent') # 此处填写相关的log日志名称 self.state=WAITING self.exec=Executor() def start(self): try: self.event.clear() self.client.connect(CONN_URL) # 链接处理 self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg()))) # 发送心跳信息 self.client.send(self.message.reg()) #处理注册消息 while not self.event.wait(self.timeout): # 等待的时间 self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hearbeat()))) # 发送心跳信息 task=self.client.get_task(self.message.id) # 此处返回三个参数,1 为taskid,二是script ,三是timeout if task: code,output=self.exec.run(task[1],task[2]) self.client.send(self.message.result(task[0],code,output)) else: return "目前无消息" except Exception as e: self.event.set() raise e # 此处是抛出异常到上一级 def shutdown(self): self.log.info("关闭操作") self.client.close() self.event.set()4 服务端相关task获取配置

master/storage.py 用于配置获取agent_id和task相关信息

#!/usr/bin/poython3.6#conding:utf-8import datetimefrom .task import Taskfrom .state import *class Storage: def __init__(self): self.agents={} # 此处用于存储用户信息 self.tasks={} # 此处用于存储作业信息 def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址 self.agents[agent_id] = { 'heaerbeat' : datetime.datetime.now(), 'info' :info, 'busy':self.agents.get(agent_id,{}).get('busy',False) } # busy 读不到置False,读到了不变 def get_agents(self): return self.agents def add_task(self,task:dict): # 此处用于从客户端获取相关的数据 t=Task(**task) # 此处进行参数解构 self.tasks[t.id]=t return t.id # 此处用于获取处理id @property def itme_task(self): yield from (task for task in self.tasks.values()) # 此处返回task def get_task(self,agent_id): return [task.id,task.script,task.timeout]

master/handler.py 配置如下

#!/usr/bin/poython3.6#conding:utf-8from utils import getloggerfrom .storage import Storageimport uuidlog=getlogger('handler')class Handler(object): def __init__(self): self.store=Storage() def send(self,msg): # 定义一个可调用的基础函数,此处的msg及就是对应的函数 log.info('客户端agent发送消息为:{}'.format(msg)) try: if msg['type'] in {'hearbeat','register'}: payload=msg['payload'] info={'hostname' :payload['hostname'],'ip' :payload['ip']} self.store.reg_hb(payload['id'],info) log.info("客户端数据列表为:{}".format(self.store.agents)) # 客户端的列表 return "agent信息为: {}".format(msg) except Exception as e: log.error("注册客户端信息错误为:{}".format(e)) return "Bad Request...." def add_task(self,task): # 此处用于在webserver 端创建的agent调用方法返回结果 task['task_id']=uuid.uuid4().hex # 用于生成相关的任务id return self.store.add_task(task) # 此处用于调用相关配置 def get_agents(self): return self.store.get_agents() def get_task(self,agent_id): return self.store.get_task(agent_id)5 处理服务端接受result 消息处理机制

master/handler.py中配置

#!/usr/bin/poython3.6#conding:utf-8from utils import getloggerfrom .storage import Storageimport uuidlog=getlogger('handler')class Handler(object): def __init__(self): self.store=Storage() def send(self,msg): # 定义一个可调用的基础函数,此处的msg及就是对应的函数 log.info('客户端agent发送消息为:{}'.format(msg)) try: if msg['type'] in {'hearbeat','register'}: payload=msg['payload'] info={'hostname' :payload['hostname'],'ip' :payload['ip']} self.store.reg_hb(payload['id'],info) log.info("客户端数据列表为:{}".format(self.store.agents)) # 客户端的列表 return "agent信息为: {}".format(msg) elif msg['type']=="result": # 此处用于处理相关返回信息 self.store.result(msg['payload']) # 调用对应方法 except Exception as e: log.error("注册客户端信息错误为:{}".format(e)) return "Bad Request...." def add_task(self,task): # 此处用于在webserver 端创建的agent调用方法返回结果 task['task_id']=uuid.uuid4().hex # 用于生成相关的任务id return self.store.add_task(task) # 此处用于调用相关配置 def get_agents(self): return self.store.get_agents() def get_task(self,agent_id): return self.store.get_task(agent_id) def get_result(self,task_id): # 此处返回对应的值 return self.store.get_result(task_id)

master/stroage.py端配置

#!/usr/bin/poython3.6#conding:utf-8import datetimefrom .task import Taskfrom .state import *class Storage: def __init__(self): self.agents={} # 此处用于存储用户信息 self.tasks={} # 此处用于存储作业信息 self.result={} # 用于存储agent端返回的结果 def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址 self.agents[agent_id] = { 'heaerbeat' : datetime.datetime.now().timestamp(), 'info' :info, 'busy':self.agents.get(agent_id,{}).get('busy',False) } # busy 读不到置False,读到了不变 def get_agents(self): return self.agents def add_task(self,task:dict): # 此处用于从客户端获取相关的数据 t=Task(**task) # 此处进行参数解构 self.tasks[t.id]=t return t.id # 此处用于获取处理id @property def itme_task(self): yield from (task for task in self.tasks.values()) # 此处返回task def get_task(self,agent_id): for task in self.itme_task: if agent_id in task.targets: # 此处用于判断当前节点接入任务情况 return [task.id,task.script,task.timeout] def add_result(self,payload:dict): self.result[payload['id']]=payload # 此处以task_id 为键,以payload为值进行处理 def get_result(self,task_id:dict): return self.result.get(task_id['task_id']) # task_id,获取对应的payload值五 web端配置和处理1 概述

用户通过WEB(HTTP)提交新的任务,任务json信息有:
1 任务脚本script,base64编码
2 超时时间timeout
3 并行度 parallel
4 失败率 fail_rate
5 失败次数 fail_count
6 targets 是跑在agent上的agent_id 列表,可以让用户看到一个列表,通过列表的勾选来完成相关的操作

2 代码实现

根目录创建appwebserver.py配置

1 获取agent相关列表

#!/usr/bin/poython3.6#conding:utf-8import zerorpcfrom aiohttp import request,web_response,web,logCONN_URL="tcp://127.0.0.1:9000"client=zerorpc.Client()client.connect(CONN_URL)async def targetshandler(request:web.Request): txt=client.get_agents() #通过zerorpc调用master端接口 return web.json_response(txt) # 返回json端数据app=web.Application()app.router.add_get('/task/targets',targetshandler) # 使用get方法进行处理2 提交任务端配置1 客户端数据如下

{ "script" : "echo hello", "timeout" :20, "targets" :[]}2 添加提交数据接口

async def taskhandler(request:web.Request): j = await request.json() # 获取post 提交的数据,用于task任务数据生成 txt=client.add_task(j) return web.Response(text=txt,status=201)app.router.add_post('/task',taskhandler)3 添加获取执行结果配置

async def taskresult(request:web.Request): j = await request.json() txt =client.get_result(j) return web.json_response(txt)app.router.add_post('/result',taskresult)4 整体代码如下

#!/usr/bin/poython3.6#conding:utf-8import zerorpcfrom aiohttp import request,web_response,web,logCONN_URL="tcp://127.0.0.1:9000"client=zerorpc.Client()client.connect(CONN_URL)async def targetshandler(request:web.Request): txt=client.get_agents() #通过zerorpc调用master端接口 return web.json_response(txt) # 返回json端数据app=web.Application()app.router.add_get('/task/targets',targetshandler) # 使用get方法进行处理async def taskhandler(request:web.Request): j = await request.json() txt=client.add_task(j) return web.Response(text=txt,status=201)app.router.add_post('/task',taskhandler)async def taskresult(request:web.Request): j = await request.json() txt =client.get_result(j) return web.json_response(txt)app.router.add_post('/result',taskresult)if __name__ == "__main__": web.run_app(app,host='0.0.0.0',port=80)3 测试结果如下

六 处理数据和节点状态1 状态管理类型1 节点状态

当节点在进行相关事件调度处理时,其状态应该是RUNNING状态,当处理完成后,其状态应该恢复称为WAITING状态。

2 task 任务状态

当当前agent下的所有该任务都执行完成时的状态,此处设计较为简单,只是全部执行就将其状态置位成功,否则为RUNNING状态或者WAITING,当有一个agent领取任务时,其状态将被置为RUNNING。

3 task中对应的agent的状态

及就是当前节点执行当前任务的状态,此状态保存在task中的targets字典中,用于对其客户端执行结果进行判断而获取其对应状态。

2 客户端调整代码

主要是cm.py调整如下

#!/usr/bin/poython3.6#conding:utf-8import zerorpc #添加模块import threading # 用于处理中断相关from .msg import Messgae # 获取消息from .state import *from .config import CONN_URLfrom .executor import Executorfrom utils import getloggerclass Conn_Manager: def __init__(self,timeout=3): self.timeout=timeout self.client=zerorpc.Client() self.event=threading.Event() self.message=Messgae('/var/log/mschedule/uuid') # 此处用于初始化消息 self.log=getlogger('agent') # 此处填写相关的log日志名称 self.state=WAITING self.exec=Executor() def start(self): try: self.event.clear() self.client.connect(CONN_URL) # 链接处理 self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg()))) # 发送心跳信息 self.client.send(self.message.reg()) #处理注册消息 while not self.event.wait(self.timeout): # 等待的时间 self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hearbeat()))) # 发送心跳信息 if self.state == WAITING: # 如果此处是空闲状态,则进行领任务处理 print('获取任务task') task = self.client.get_task(self.message.id) # 此处返回三个参数,1 为taskid,二是script ,三是timeout if task: # 领取成功,则进行执行相关任务.并上传至服务器端其状态 self.state = RUNNING # 此处任务成功的情况 code,output=self.exec.run(task[1],task[2]) self.client.send(self.message.result(task[0], code, output)) self.state=WAITING #状态更新为当前正常状态 else: return "目前无消息" except Exception as e: self.event.set() raise e # 此处是抛出异常到上一级 def shutdown(self): self.log.info("关闭操作") self.client.close() self.event.set()3 master端代码调整

master/storage.py

#!/usr/bin/poython3.6#conding:utf-8import datetimefrom .task import Taskfrom .state import *from utils import getloggerlog=getlogger('storage')class Storage: def __init__(self): self.agents={} # 此处用于存储用户信息 self.tasks={} # 此处用于存储作业信息 self.result={} # 用于存储agent端返回的结果 self.task_state=0 # 用于处理当所有agent状态都修改为成功或失败时将task的状态也进行相关的修改 def reg_hb(self,agent_id,info): # id 及就是客户端的id ,info 及就是host和ip地址 self.agents[agent_id] = { 'heaerbeat' : datetime.datetime.now().timestamp(), 'info' :info, 'busy':self.agents.get(agent_id,{}).get('busy',False) } # busy 读不到置False,读到了不变 def get_agents(self): return self.agents def add_task(self,task:dict): # 此处用于从客户端获取相关的数据 t=Task(**task) # 此处进行参数解构 self.tasks[t.id]=t return t.id # 此处用于获取处理id @property def itme_task(self): yield from (task for task in self.tasks.values() if task.state in {WAITING,RUNNING}) # 此处返回task,当其中有成功或者失败时,则不用进行相关的操作处理 #当为WAITING或者RUNNING 时,则进行相关的操作,其他情况则不进行相关操作 def get_task(self,agent_id): for task in self.itme_task: if agent_id in task.targets: # 此处用于判断当前节点接入任务情况 if task.state==WAITING: task.state=RUNNING #当前消息的状态 task.targets[agent_id]['state']=RUNNING # 此处是指此消息中的agent是否执行的状态的处理,若获取了,则此处的状态为RUNNING return [task.id,task.script,task.timeout] def add_result(self,payload:dict): for task in self.itme_task: if payload['code']==0: task.targets[payload['agent_id']]['state']=SUCCEED # 此处是指对此消息进行处理,若code=0,则表示客户端执行成功,若为1,则表示失败 self.task_state+=1 else: task.targets[payload['agent_id']]['state']= FAILED# self.task_state+=1 if self.task_state==task.target_count: task.state=SUCCEED self.task_state=0 payload['agent_state']=task.targets[payload['agent_id']]['state'] log.info("当前消息内容为:{}".format(self.result)) self.result[payload['id']]=payload # 此处以task_id 为键,以payload为值进行处理 def get_result(self,task_id:dict): task_id=task_id['task_id'] return self.result.get(task_id) # task_id,获取对应的payload值4 webserver端代码调整如下

webappserver.py

#!/usr/bin/poython3.6#conding:utf-8import zerorpcfrom aiohttp import request,web_response,web,logCONN_URL="tcp://127.0.0.1:9000"client=zerorpc.Client()client.connect(CONN_URL)async def targetshandler(request:web.Request): txt=client.get_agents() #通过zerorpc调用master端接口 return web.json_response(txt) # 返回json端数据app=web.Application()app.router.add_get('/task/targets',targetshandler) # 使用get方法进行处理async def taskhandler(request:web.Request): j = await request.json() txt=client.add_task(j) return web.Response(text=txt,status=201)app.router.add_post('/task',taskhandler)async def taskresult(request:web.Request): j = await request.json() txt =client.get_result(j) if txt['code'] !=0: txt['output']='参数不正确,请重新输入' return web.json_response(txt)app.router.add_post('/result',taskresult)if __name__ == "__main__": web.run_app(app,host='0.0.0.0',port=80)5 结果如下