Ansible 2.x 版本 API 调用(适用于 Web 开发)
摘自 jumpserver 中的 ansible 模块,重写了 runner。代码基于 Ansible 2.3(自 2.4 起 Inventory、VariableManager 的导入路径已变更,以下代码无法直接使用),安装:pip3 install "ansible>=2.3,<2.4"
目录结构如下,
按此结构新建文件,其中 callback.py、inventory.py、runner.py 三个文件的内容见下文(__init__.py 的内容本文未给出)。调用示例在 runner.py 的最后。
├── callback.py
├── __init__.py
├── inventory.py
└── runner.py
callback.py
# ~*~ coding: utf-8 ~*~
"""Ansible callback plugins that collect task results into plain dicts.

Three collectors are provided:

* ``CommandResultCallback``  -- keeps cmd/stdout/stderr/rc per host.
* ``AdHocResultCallback``    -- keeps the full raw result list per host.
* ``PlaybookResultCallBack`` -- builds a plays/tasks/hosts tree plus stats.
"""

from collections import defaultdict

from ansible.plugins.callback import CallbackBase


class CommandResultCallback(CallbackBase):
    """Collect the command-related fields of each host's result.

    Results are stored in ``self.result_q``, a dict with two buckets:
    ``"contacted"`` (ok) and ``"dark"`` (failed, unreachable or skipped).
    Each bucket maps a host name to a dict with the keys ``cmd``,
    ``stderr``, ``stdout`` and ``rc``.
    """

    # Fields copied from the raw result; a missing field is stored as None.
    RESULT_KEYS = ('cmd', 'stderr', 'stdout', 'rc')

    def __init__(self, display=None):
        self.result_q = dict(contacted={}, dark={})
        super(CommandResultCallback, self).__init__(display)

    def gather_result(self, n, res):
        """Store the selected fields of ``res`` under bucket ``n``.

        A later result for the same host replaces the earlier one.

        :param n: bucket name, ``"contacted"`` or ``"dark"``.
        :param res: result object exposing ``_host.name`` and ``_result``.
        """
        self.result_q[n][res._host.name] = {
            key: res._result.get(key) for key in self.RESULT_KEYS
        }

    def v2_runner_on_ok(self, result):
        self.gather_result("contacted", result)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        self.gather_result("dark", result)

    def v2_runner_on_unreachable(self, result):
        self.gather_result("dark", result)

    def v2_runner_on_skipped(self, result):
        self.gather_result("dark", result)


class AdHocResultCallback(CallbackBase):
    """Collect the complete raw result of every task, grouped by host.

    ``self.result_q`` has the buckets ``"contacted"`` and ``"dark"``; each
    maps a host name to a list of raw result dicts, one per task event.
    """

    def __init__(self, display=None):
        self.result_q = dict(contacted={}, dark={})
        super(AdHocResultCallback, self).__init__(display)

    def gather_result(self, n, res):
        """Append the raw result of ``res`` to the host's list in bucket ``n``.

        :param n: bucket name, ``"contacted"`` or ``"dark"``.
        :param res: result object exposing ``_host.name`` and ``_result``.
        """
        self.result_q[n].setdefault(res._host.name, []).append(res._result)

    def v2_runner_on_ok(self, result):
        self.gather_result("contacted", result)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        self.gather_result("dark", result)

    def v2_runner_on_unreachable(self, result):
        self.gather_result("dark", result)

    def v2_runner_on_skipped(self, result):
        self.gather_result("dark", result)

    def v2_playbook_on_task_start(self, task, is_conditional):
        pass

    def v2_playbook_on_play_start(self, play):
        pass


class PlaybookResultCallBack(CallbackBase):
    """Collect the output of a playbook run into a nested dict.

    Based on the built-in Ansible callback plugin named ``json``.
    After the run, ``self.output`` is either the string set by
    ``v2_playbook_on_no_hosts_matched`` or a dict of the form
    ``{'plays': [...], 'stats': {host: summary}}``.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'stdout'
    CALLBACK_NAME = 'Dict'

    def __init__(self, display=None):
        super(PlaybookResultCallBack, self).__init__(display)
        self.results = []
        self.output = ""
        # Per-item results of looped tasks, keyed by host: {"host": []}
        self.item_results = {}

    def _new_play(self, play):
        """Return an empty record for ``play`` (name, id and a task list)."""
        return {
            'play': {
                'name': play.name,
                'id': str(play._uuid),
            },
            'tasks': [],
        }

    def _new_task(self, task):
        """Return an empty record for ``task`` (name and a per-host dict)."""
        return {
            'task': {
                'name': task.get_name(),
            },
            'hosts': {},
        }

    def v2_playbook_on_no_hosts_matched(self):
        self.output = "skipping: No match hosts."

    def v2_playbook_on_no_hosts_remaining(self):
        pass

    def v2_playbook_on_task_start(self, task, is_conditional):
        # Tasks are attached to the most recently started play.
        self.results[-1]['tasks'].append(self._new_task(task))

    def v2_playbook_on_play_start(self, play):
        self.results.append(self._new_play(play))

    def v2_playbook_on_stats(self, stats):
        """Build the final output unless an earlier hook already set one."""
        summary = {
            host: stats.summarize(host)
            for host in sorted(stats.processed.keys())
        }
        if not self.output:
            self.output = {
                'plays': self.results,
                'stats': summary,
            }

    def gather_result(self, res):
        """Record ``res`` under the current task of the current play.

        For a looped task whose result carries a ``results`` key, the
        per-item results gathered so far for that host replace it and are
        then dropped from the pending store.
        """
        host_name = res._host.name
        if (res._task.loop and "results" in res._result
                and host_name in self.item_results):
            res._result["results"] = self.item_results.pop(host_name)

        self.results[-1]['tasks'][-1]['hosts'][host_name] = res._result

    def v2_runner_on_ok(self, res, **kwargs):
        # Facts are removed from the stored result before recording it.
        res._result.pop("ansible_facts", None)
        self.gather_result(res)

    def v2_runner_on_failed(self, res, **kwargs):
        self.gather_result(res)

    def v2_runner_on_unreachable(self, res, **kwargs):
        self.gather_result(res)

    def v2_runner_on_skipped(self, res, **kwargs):
        self.gather_result(res)

    def gather_item_result(self, res):
        """Queue one loop-item result for the host until the task finishes."""
        self.item_results.setdefault(res._host.name, []).append(res._result)

    def v2_runner_item_on_ok(self, res):
        self.gather_item_result(res)

    def v2_runner_item_on_failed(self, res):
        self.gather_item_result(res)

    def v2_runner_item_on_skipped(self, res):
        self.gather_item_result(res)
inventory.py
# ~*~ coding: utf-8 ~*~
"""Build an Ansible inventory in memory from a list of asset dicts."""

from ansible.inventory import Inventory, Host, Group
from ansible.vars import VariableManager
from ansible.parsing.dataloader import DataLoader


class JMSHost(Host):
    """An Ansible ``Host`` created from an asset dict.

    Keys read from ``asset``:

    * ``hostname`` / ``ip`` -- host name (``hostname`` wins, ``ip`` is the
      fallback); ``ip`` is also required as the connection address.
    * ``port``        -- SSH port, 22 when missing or falsy.
    * ``username``    -- required login user.
    * ``password``    -- optional SSH password.
    * ``private_key`` -- optional private key file.
    * ``become``      -- optional dict with ``method``, ``user``, ``pass``.
    """

    def __init__(self, asset):
        self.asset = asset
        self.name = name = asset.get('hostname') or asset.get('ip')
        self.port = port = asset.get('port') or 22
        super(JMSHost, self).__init__(name, port)
        self.set_all_variable()

    def set_all_variable(self):
        """Copy connection and privilege-escalation settings to host vars.

        :raises KeyError: if ``ip`` or ``username`` is missing from the asset.
        """
        asset = self.asset
        self.set_variable('ansible_host', asset['ip'])
        # Use the port resolved in __init__ so the default of 22 applies here
        # as well, instead of failing when the asset has no 'port' key.
        self.set_variable('ansible_port', self.port)
        self.set_variable('ansible_user', asset['username'])

        # Password and private key are both optional.
        if asset.get('password'):
            self.set_variable('ansible_ssh_pass', asset['password'])
        if asset.get('private_key'):
            self.set_variable('ansible_ssh_private_key_file',
                              asset['private_key'])

        # Privilege escalation (become) support.
        become = asset.get("become", False)
        if become:
            self.set_variable("ansible_become", True)
            self.set_variable("ansible_become_method",
                              become.get('method', 'sudo'))
            self.set_variable("ansible_become_user",
                              become.get('user', 'root'))
            self.set_variable("ansible_become_pass", become.get('pass', ''))
        else:
            self.set_variable("ansible_become", False)


class JMSInventory(Inventory):
    """An Ansible ``Inventory`` built from a list of asset dicts."""

    def __init__(self, host_list=None):
        """
        :param host_list: list of asset dicts (see ``parse_inventory``);
            ``None`` is treated as an empty list.
        :raises AssertionError: if ``host_list`` is not a list.
        """
        if host_list is None:
            host_list = []
        assert isinstance(host_list, list)
        self.host_list = host_list
        self.loader = DataLoader()
        self.variable_manager = VariableManager()
        super(JMSInventory, self).__init__(self.loader, self.variable_manager,
                                           host_list=host_list)

    def parse_inventory(self, host_list):
        """Populate ``self.groups`` from ``host_list``.

        Each element is a dict such as::

            {
                "hostname": "asset_name",
                "ip": "10.0.0.1",
                "port": 22,
                "username": "root",
                "password": "secret",
                "private_key": "/path/to/key",
                "groups": ["group1", "group2"],
                "become": {"method": "sudo", "user": "root", "pass": ""},
            }

        Every host is added to the ``all`` group. A host with a non-empty
        ``groups`` list is added to each named group (created on first use);
        otherwise it is added to ``ungrouped``.
        """
        # TODO: validate the input
        ungrouped = Group('ungrouped')
        all_group = Group('all')
        all_group.add_child_group(ungrouped)
        self.groups = dict(all=all_group, ungrouped=ungrouped)

        for asset in host_list:
            host = JMSHost(asset=asset)
            asset_groups = asset.get('groups')
            if asset_groups:
                for group_name in asset_groups:
                    group = self.groups.get(group_name)
                    if group is None:
                        group = Group(group_name)
                        self.groups[group_name] = group
                    group.add_host(host)
            else:
                ungrouped.add_host(host)
            all_group.add_host(host)
runner.py
#~*~coding:utf-8~*~from__future__importunicode_literalsimportosfromcollectionsimportnamedtuple,defaultdictimportsyssys.path.append('hostinfo/ansible_runner/')fromansible.executor.task_queue_managerimportTaskQueueManagerfromansible.varsimportVariableManagerfromansible.parsing.dataloaderimportDataLoaderfromansible.executor.playbook_executorimportPlaybookExecutorfromansible.playbook.playimportPlayimportansible.constantsasCfromansible.utils.varsimportload_extra_varsfromansible.utils.varsimportload_options_varsfrominventoryimportJMSInventoryfromcallbackimportAdHocResultCallback,PlaybookResultCallBack,\CommandResultCallback#fromcommon.utilsimportget_logger__all__=["AdHocRunner","PlayBookRunner"]C.HOST_KEY_CHECKING=False#logger=get_logger(__name__)#JumpservernotuseplaybookclassPlayBookRunner(object):"""用于执行AnsiblePlaybook的接口.简化Playbook对象的使用."""Options=namedtuple('Options',['listtags','listtasks','listhosts','syntax','connection','module_path','forks','remote_user','private_key_file','timeout','ssh_common_args','ssh_extra_args','sftp_extra_args','scp_extra_args','become','become_method','become_user','verbosity','check','extra_vars'])def__init__(self,hosts=None,playbook_path=None,forks=C.DEFAULT_FORKS,listtags=False,listtasks=False,listhosts=False,syntax=False,module_path=None,remote_user='root',timeout=C.DEFAULT_TIMEOUT,ssh_common_args=None,ssh_extra_args=None,sftp_extra_args=None,scp_extra_args=None,become=True,become_method=None,become_user="root",verbosity=None,extra_vars=None,connection_type="ssh",passwords=None,private_key_file=None,check=False):C.RETRY_FILES_ENABLED=Falseself.callbackmodule=PlaybookResultCallBack()ifplaybook_pathisNoneornotos.path.exists(playbook_path):raiseAnsibleError("NotFoundtheplaybookfile:%s."%playbook_path)self.playbook_path=playbook_pathself.loader=DataLoader()self.variable_manager=VariableManager()self.passwords=passwordsor{}self.inventory=JMSInventory(hosts)self.options=self.Options(listtags=listtags,listtasks=listtasks,listhosts=listhosts,
syntax=syntax,timeout=timeout,connection=connection_type,module_path=module_path,forks=forks,remote_user=remote_user,private_key_file=private_key_file,ssh_common_args=ssh_common_argsor"",ssh_extra_args=ssh_extra_argsor"",sftp_extra_args=sftp_extra_args,scp_extra_args=scp_extra_args,become=become,become_method=become_method,become_user=become_user,verbosity=verbosity,extra_vars=extra_varsor[],check=check)self.variable_manager.extra_vars=load_extra_vars(loader=self.loader,options=self.options)self.variable_manager.options_vars=load_options_vars(self.options)self.variable_manager.set_inventory(self.inventory)#初始化playbook的executorself.runner=PlaybookExecutor(playbooks=[self.playbook_path],inventory=self.inventory,variable_manager=self.variable_manager,loader=self.loader,options=self.options,passwords=self.passwords)ifself.runner._tqm:self.runner._tqm._stdout_callback=self.callbackmoduledefrun(self):ifnotself.inventory.list_hosts('all'):raiseAnsibleError('Inventoryisempty')self.runner.run()self.runner._tqm.cleanup()returnself.callbackmodule.outputclassAdHocRunner(object):"""ADHoc接口"""Options=namedtuple("Options",['connection','module_path','private_key_file',"remote_user",'timeout','forks','become','become_method','become_user','check','extra_vars',])results_callback_class=AdHocResultCallbackdef__init__(self,hosts=C.DEFAULT_HOST_LIST,forks=C.DEFAULT_FORKS,#5timeout=C.DEFAULT_TIMEOUT,#SSHtimeout=10sremote_user=C.DEFAULT_REMOTE_USER,#rootmodule_path=None,#dirsofcustomemodulesconnection_type="smart",become=None,become_method=None,become_user=None,check=False,passwords=None,extra_vars=None,private_key_file=None,gather_facts='no'):self.pattern=''self.variable_manager=VariableManager()self.loader=DataLoader()self.gather_facts=gather_factsself.results_callback=AdHocRunner.results_callback_class()self.options=self.Options(connection=connection_type,timeout=timeout,module_path=module_path,forks=forks,become=become,become_method=become_method,become_user=become_user,check=check,re
mote_user=remote_user,extra_vars=extra_varsor[],private_key_file=private_key_file,)self.variable_manager.extra_vars=load_extra_vars(self.loader,options=self.options)self.variable_manager.options_vars=load_options_vars(self.options)self.passwords=passwordsor{}self.inventory=JMSInventory(hosts)self.variable_manager.set_inventory(self.inventory)self.tasks=[]self.play_source=Noneself.play=Noneself.runner=None@staticmethoddefcheck_module_args(module_name,module_args=''):ifmodule_nameinC.MODULE_REQUIRE_ARGSandnotmodule_args:err="Noargumentpassedto'%s'module."%module_nameprint(err)returnFalsereturnTruedefrun(self,task_tuple,pattern='all',task_name='AnsibleAd-hoc'):""":paramtask_tuple:(('shell','ls'),('ping','')):parampattern::paramtask_name::return:"""formodule,argsintask_tuple:ifnotself.check_module_args(module,args):returnself.tasks.append(dict(action=dict(module=module,args=args,)))self.play_source=dict(name=task_name,hosts=pattern,gather_facts=self.gather_facts,tasks=self.tasks)self.play=Play().load(self.play_source,variable_manager=self.variable_manager,loader=self.loader,)self.runner=TaskQueueManager(inventory=self.inventory,variable_manager=self.variable_manager,loader=self.loader,options=self.options,passwords=self.passwords,stdout_callback=self.results_callback,)ifnotself.inventory.list_hosts("all"):raiseAnsibleError("Inventoryisempty.")ifnotself.inventory.list_hosts(self.pattern):raiseAnsibleError("pattern:%sdosenotmatchanyhosts."%self.pattern)try:self.runner.run(self.play)exceptExceptionase:logger.warning(e)else:#logger.debug(self.results_callback.result_q)returnself.results_callback.result_qfinally:ifself.runner:self.runner.cleanup()ifself.loader:self.loader.cleanup_all_tmp_files()defclean_result(self):""":return:{"success":['hostname',],"failed":[('hostname','msg'),{}],}"""result={'success':[],'failed':[]}forhostinself.results_callback.result_q['contacted']:result['success'].append(host)forhost,msgsinself.results_callback.result_q['dark'].items():msg='\n'.join
(['{}{}:{}'.format(msg.get('module_stdout',''),msg.get('invocation',{}).get('module_name'),msg.get('msg',''))formsginmsgs])result['failed'].append((host,msg))returnresultdeftest_run():assets=[{"hostname":"192.168.244.129","ip":"192.168.244.129","port":22,"username":"root","password":"redhat",},]task_tuple=(('shell','ls'),)##例子,调用普通的模块命令hoc=AdHocRunner(hosts=assets)hoc.results_callback=CommandResultCallback()ret=hoc.run(task_tuple)print(ret)task_tuple=(('setup',''),)##例子,调用setup,获取资产信息runner=AdHocRunner(assets)result=runner.run(task_tuple=task_tuple,pattern='all',task_name='AnsibleAd-hoc')print(result)#play=PlayBookRunner(assets,playbook_path='/tmp/some.yml')##yml"""#/tmp/some.yml----name:TesttheplabybookAPI.hosts:allremote_user:rootgather_facts:yestasks:-name:execuptimeshell:uptime"""#play.run()if__name__=="__main__":test_run()
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。