使用Python怎么实现SQL Server数据库的对象同步轻量级
这期内容当中小编将会给大家带来有关使用Python怎么实现SQL Server数据库的对象同步轻量级,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
1,表的存在依赖于schema,需要考虑到表的schema是否存在,如果不存在先在target库上创建表对应的schema
2,target表中是否有数据?如果有数据,是否以覆盖的方式执行
对于存储过程的同步:
1,类似于表,需要考虑存储过程的schema是否存在,如果不存在先在target库上创建表对应的schema
2,类似于表,arget数据库中是否已经存在对应的存储过程,是否以覆盖的方式执行
3,存储过程可能依赖于b表,某些函数,用户自定义表变量等等,同步存储过程的时候需要先同步依赖的对象,这一点比较复杂,实现过程中遇到在很多很多的坑
可能存在对象A依赖于对象B,对象B依赖于对象C……,这里有点递归的意思
这一点导致了重构大量的代码,一开始都是直来直去的同步,无法实现这个逻辑,切实体会到代码的“单一职责”原则
参数说明
参数说明如下,大的包括四类:
1,源服务器信息 (服务器地址,实例名,数据库名称,用户名,密码),没有用户名密码的情况下,使用windows身份认证模式
2,目标服务器信息(服务器地址,实例名,数据库名称,用户名,密码),没有用户名密码的情况下,使用windows身份认证模式
3,同步的对象类型以及对象
4,同步的对象在目标服务器上存在的情况下,是否强制覆盖
其实在同步数据的时候,也可以把需要同步的行数提取出来做参数,比较简单,这里暂时没有做。
比如需要快速搭建一个测试环境,需要同步所有的表结构和每个表的一部分数据即可。
表以及数据同步
表同步的原理是,创建目标表,遍历源数据的表,生成insert into values(***),(***),(***)格式的sql,然后插入目标数据库,这里大概步骤如下:
1,表依赖于schema,所以同步表之前先同步schema
2,强制覆盖的情况下,会drop掉目标表(如果存在的话),防止目标表与源表结构不一致,非强制覆盖的情况下,如果字段不一致,则抛出异常
3,同步表结构,包括字段,索引,约束等等,但是无法支持外键,刻意去掉了外键,想想为什么?因吹斯汀。
4,需要筛选出来非计算列字段,insert语句只能是非计算列字段(又导致重构了部分代码)
5,转义处理,在拼凑SQL的时候,需要进行转义处理,否则会导致SQL语句错误,目前处理了字符串中的'字符,二进制字段,时间字段的转义处理(最容易发生问题的地方)
6,鉴于insert into values(***),(***),(***)
语法上允许的最大值是1000,因此每生成1000条数据,就同步一次
7,自增列的identity_insert
标识打开与关闭处理
使用如下参数,同步源数据库的三张表到目标数据库,因为这里是在本机命名实例下测试,因此实例名和端口号输入
执行同步的效果
说明:
1,如果输入obj_type="tab" 且-obj=为None的情况下,会同步源数据库中的所有表。
2,这个效率取决于机器性能和网络传输,本机测试的话,每秒中可以提交3到4次,也就是每秒钟可以提交3000~4000行左右的数据。
已知的问题:
1,当表的索引为filter index的时候,无法生成包含where条件的索引创建语句,那个看起来蛋疼的表结构导出语句,暂时没时间改它。
2,暂时不支持其他少用的类型字段,比如地理空间字段什么的。
存储过程对象的同步
存储过程同步的原理是,在源数据库上生成创建存储过程的语句,然后写入目标库,这里大概步骤如下:
1,存储过程依赖于schema,所以同步存储过程之前先同步schema(同表)
2,同步的过程会检查依赖对象,如果依赖其他对象,暂停当前对象同步,先同步依赖对象
3,重复第二步骤,直至完成
4,对于存储过程的同步,如果是强制覆盖的话,强制覆盖仅仅对存储过程自己生效(删除&重建),对依赖对象并不生效,如果依赖对象不存在,就创建,否则不做任何事情
使用如下参数,同步源数据库的两个存储过程到目标数据库,因为这里是在本机命名实例下测试,因此实例名和端口号输入
说明:测试要同步的存储过程之一为[dbo].[sp_test01],它依赖于其他两个对象:dbo.table01和dbo.fn_test01()
createproc[dbo].[sp_test01]asbeginsetnocounton;deletefromdbo.table01whereid=1000selectdbo.fn_test01()end
而dbo.fn_test01()的如下,依赖于另外一个对象:dbo.table02
createfunction[dbo].[fn_test01]()RETURNSintASBEGINdeclare@countint=0select@count=count(1)fromdbo.table02return@countEND
因此,这个测试的[dbo].[sp_test01]就依赖于其他对象,如果其依赖的对象不存在,同步的时候,仅仅同步这个存储过程本身,是没有意义的
同步某一个对象的依赖对象,使用如下SQL查出来对象依赖信息,因此这里就层层深入,同步依赖对象。
这里就类似于同步A的时候,A依赖于B和C,然后停止同步A,先同步B和C,同步B或者C的时候,可能又依赖于其他对象,然后继续先同步其依赖对象。
效果如下
如果输入obj_type="sp" 且-obj=为None的情况下,会同步源数据库中的所有存储过程以及其依赖对象
已知的问题:
1,加密的存储过程或者函数是无法实现同步的,因为无法生成创建对象的脚本
1,table type的同步也是一个蛋疼的过程,目前支持,但是支持的并不好,原因是创建table type之前,先删除依赖于table type的对象,否则无法删除与创建。
特别说明
依赖对象的解决,还是比较蛋疼的
如果在默认schema为dbo的对象,在存储过程或者函数中没有写schema(参考如下修改后的sp,不写相关表的schema dbo,dbo.test01==>test01),
使用 sys.dm_sql_referenced_entities这个系统函数是无法找到其依赖的对象的,奇葩的是可以找到schema的类型,却没有返回对象本身。
这一点导致在代码中层层深入,进行了长时间的debug,完全没有想到这个函数是这个鸟样子,因为这里找到依赖对象的类型,却找不到对象本身,次奥!!!
另外一种情况就是动态SQL了,无法使用 sys.dm_sql_referenced_entities这个系统函数找到其依赖的对象。
其他对象的同步
支持其他数据库对象的同步,比如function,table type等,因为可以在同步其他存储过程对象的时候附带的同步function,table type,这个与表或者存储过程类似,不做过多说明。
已知问题:
1,201906122030:经测试,目前暂时不支持Sequence对象的同步。
需要改进的地方
1,代码结构优化,更加清晰和条例的结构(一开始用最直接简单粗暴的方式快速实现,后面重构了很多代码,现在自己看起来还有很多不舒服的痕迹)
2,数据同步的效率问题,对于多表的导入导出操作,依赖于单线程,多个大表导出串行的话,可能存在效率上的瓶颈,如何根据表的数据量,尽可能平均地分配多多个线程中,提升效率
3,更加友好清晰的异常提示以及日志记录,生成导出日志信息。
4,异构数据同步,MySQL《==》SQL Server《==》Oracle《==》PGSQL
代码端午节写好了,这几天抽空进行了一些测试以及bug fix,应该还潜在不少未知的bug,工作量比想象中的大的多了去了。
#-*-coding:utf-8-*-#!/usr/bin/envpython3__author__='MSSQL123'__date__='2019-06-0709:36'importosimportsysimporttimeimportdatetimeimportpymssqlfromdecimalimportDecimalusage='''-----parameterexplain-----sourcedatabaseparameter-s_h:souredatabasehost-----mustrequireparameter-s_i:souredatabaseinstacename-----defaultinstancenameMSSQL-s_d:souredatabasename-----mustrequireparameter-s_u:souredatabaselogin-----defaultwindowsidentifier-s_p:souredatabaseloginpassword-----mustrequirewhens_uisnotnull-s_P:souredatabaseinstanceport-----defaultport1433targetdatabaseparameter-t_h:targetdatabasehost-----mustrequireparameter-t_i:targetdatabaseinstacename-----defaultinstancenameMSSQL-t_d:targetdatabasename-----mustrequireparameter-t_u:targetdatabaselogin-----defaultwindowsidentifier-t_p:targetdatabaseloginpassword-----mustrequirewhens_uisnotnull-t_P:targetdatabaseinstanceport-----defaultport1433syncobjectparameter-obj_type:tableorsporfunctionorotherdatabseobject-----taborsporfnortp-obj:table|sp|function|typename-----whicktableorspsyncoverwirteparameter-f:forceoverwirtetargetdatabaseobject-----ForN--help:helpdocumentExample:pythonDataTransfer.py-s_h=127.0.0.1-s_P=1433-s_i="MSSQL"-s_d="DB01"-obj_type="tab"-obj="dbo.t1,dbo.t2"-t_h=127.0.0.1-t_P=1433-t_i="MSSQL"-t_d="DB02"-f="Y"pythonDataTransfer.py-s_h=127.0.0.1-s_P=1433-s_i="MSSQL"-s_d="DB01"-obj_type="sp"-obj="dbo.sp1,dbo.sp2"-t_h=127.0.0.1-t_P=1433-t_i="MSSQL"-t_d="DB02"-f="Y"'''classSyncDatabaseObject(object):#sourcedatabses_h=Nones_i=Nones_P=Nones_u=Nones_p=Nones_d=None#objtypes_obj_type=None#syncobjectss_obj=None#targetdatabaset_h=Nonet_i=Nonet_P=Nonet_u=Nonet_p=Nonet_d=Nonef=Nonefile_path=Nonedef__init__(self,*args,**kwargs):fork,vinkwargs.items():setattr(self,k,v)#connecttosqlserverdefget_connect(self,_h,_i,_P,_u,_p,_d):cursor=Falsetry:if(_u)and(_p):conn=pymssql.connect(host=_h,server=_i,port=_P,user=_u,password=_p,database=_d)else:conn=pymssql.connect(host=_h,server=_i,port=_P,database=_d)if(conn):returnconnexcept:raisereturnconn#checkconnectiondefvalidated_connect(self,_h,_i,_P,_u,_p,_d):ifnot(self.get_connect(_h,_i,_P,_u,_p,_d)):print("connectto"+str(_h)+"failed,pleasecheckyouparameter")exit(0)'''thisissupposedtobeavalidobjectnamejustlikexxx_name,ordbo.xxx_name,or[schema].xxx_nameorschema.[xxx_name]thentransferthiskindofvalidobjectnametoformatobjectnamelike[dbo].[xxx_name](giveadefaultdboschemanamewhennoschemaname)otherformatobjectnameconsiderasunvalid,willberasieerrorinprocessformatobjectname1,xxx_name======>[dbo].[xxx_name]2,dbo.xxx_name======>[dbo].[xxx_name]3,[schema].xxx_name======>[dbo].[xxx_name]3,schema.xxx_name======>[schema].[xxx_name]4,[schema].[xxx_name]======>[schema].[xxx_name]5,[schema].[xxx_name======>rasieerrorformatmessage'''@staticmethoddefformat_object_name(name):format_name=""if("."inname):schema_name=name[0:name.find(".")]object_name=name[name.find(".")+1:]ifnot("["inschema_name):schema_name="["+schema_name+"]"ifnot("["inobject_name):object_name="["+object_name+"]"format_name=schema_name+"."+object_nameelse:if("["inname):format_name="[dbo]."+nameelse:format_name="[dbo]."+"["+name+"]"returnformat_name'''checkuserinputobjectisavalidobject'''defexits_object(self,conn,name):conn=conncursor_source=conn.cursor()#getobjectbynamefromsourcedbsql_script=r'''selecttop11from(selectconcat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name))asobj_namefromsys.objectsunionallselectconcat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name))asobj_namefromsys.types)twhereobj_name='{0}''''.format(self.format_object_name(name))cursor_source.execute(sql_script)result=cursor_source.fetchall()ifnotresult:return0else:return1conn.cursor.close()conn.close()#tablevariablesyncdefsync_table_variable(self,tab_name,is_reference):conn_source=self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)conn_target=self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)cursor_source=conn_source.cursor()cursor_target=conn_target.cursor()if(self.exits_object(conn_source,self.format_object_name(tab_name)))>0:passelse:print("-----------------------warningmessage-----------------------")print("--------warning:object"+tab_name+"notexistinginsourcedatabase------------")print("-----------------------warningmessage-----------------------")print()returnexists_in_target=0sql_script=r'''selecttop11fromsys.table_typestpwhereis_user_defined=1andconcat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name))='{0}''''\.format((self.format_object_name(tab_name)))#ifthetableschemaexistsintargetserver,skipcursor_target.execute(sql_script)exists_in_target=cursor_target.fetchone()#weatherexistsintargetserverdatabaseif(self.f=="Y"):if(is_reference!="Y"):#skiped,tabletypecannotdropwhenusedbyspsql_script=r'''ifOBJECT_ID('{0}')isnotnulldroptype{0}'''.format(self.format_object_name(tab_name))cursor_target.execute(sql_script)conn_target.commit()else:ifexists_in_target:print("-----------------------warningmessage-----------------------")print("thetargettabletype"+tab_name+"exists,skipedsynctabletypefromsource")print("-----------------------warningmessage-----------------------")print()returnsql_script=r'''DECLARE@SQLNVARCHAR(MAX)=''SELECT@SQL='CREATETYPE'+'{0}'+'ASTABLE'+CHAR(13)+'('+CHAR(13)+STUFF((SELECTCHAR(13)+',['+c.name+']'+CASEWHENc.is_computed=1THEN'AS'+OBJECT_DEFINITION(c.[object_id],c.column_id)ELSECASEWHENc.system_type_id!=c.user_type_idTHEN'['+SCHEMA_NAME(tp.[schema_id])+'].['+tp.name+']'ELSE'['+UPPER(y.name)+']'END+CASEWHENy.nameIN('varchar','char','varbinary','binary')THEN'('+CASEWHENc.max_length=-1THEN'MAX'ELSECAST(c.max_lengthASVARCHAR(5))END+')'WHENy.nameIN('nvarchar','nchar')THEN'('+CASEWHENc.max_length=-1THEN'MAX'ELSECAST(c.max_length/2ASVARCHAR(5))END+')'WHENy.nameIN('datetime2','time2','datetimeoffset')THEN'('+CAST(c.scaleASVARCHAR(5))+')'WHENy.name='decimal'THEN'('+CAST(c.[precision]ASVARCHAR(5))+','+CAST(c.scaleASVARCHAR(5))+')'ELSE''END+CASEWHENc.collation_nameISNOTNULLANDc.system_type_id=c.user_type_idTHEN'COLLATE'+c.collation_nameELSE''END+CASEWHENc.is_nullable=1THEN'NULL'ELSE'NOTNULL'END+CASEWHENc.default_object_id!=0THEN'CONSTRAINT['+OBJECT_NAME(c.default_object_id)+']'+'DEFAULT'+OBJECT_DEFINITION(c.default_object_id)ELSE''ENDENDFromsys.table_typestpInnerjoinsys.columnsconc.object_id=tp.type_table_object_idInnerjoinsys.typesyONy.system_type_id=c.system_type_idWHEREtp.is_user_defined=1andy.name<>'sysname'andconcat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name))='{0}'ORDERBYc.column_idFORXMLPATH(''),TYPE).value('.','NVARCHAR(MAX)'),1,7,'')+');'select@SQLasscript'''.format(self.format_object_name(self.format_object_name((tab_name))))cursor_target=conn_target.cursor()cursor_source.execute(sql_script)row=cursor_source.fetchone()try:ifnotexists_in_target:#executethescriptontargetservercursor_target.execute(str(row[0]))#dropcurrentstored_procudreifexistsconn_target.commit()print("*************tabletype"+self.format_object_name(tab_name)+"synced*********************")print()#giveablankrowwhenfinishexcept:print("-----------------------errormessage-----------------------")print("-----------tabletype"+self.format_object_name(tab_name)+"syncederror---------------")print("-----------------------errormessage-----------------------")print()#raisecursor_source.close()conn_source.close()cursor_target.close()conn_target.close()#schemasyncdefsync_schema(self):conn_source=self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)conn_target=self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)cursor_source=conn_source.cursor()cursor_target=conn_target.cursor()arr_schema=[]#getalltableindatabasewhennotdefinetablenameschema_result=cursor_source.execute(r'''selectnamefromsys.schemaswhereschema_id>4andschema_id<16384''')forrowincursor_source.fetchall():cursor_target.execute(r'''ifnotexists(select*fromsys.schemaswherename='{0}')beginexec('createschema[{0}]')end'''.format(str(row[0])))conn_target.commit()cursor_source.close()conn_source.close()cursor_target.close()conn_target.close()defsync_table_schema_byname(self,tab_name,is_reference):conn_source=self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)conn_target=self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)cursor_source=conn_source.cursor()cursor_target=conn_target.cursor()if(self.exits_object(conn_source,self.format_object_name(tab_name))==0):print("-----------------------warningmessage-----------------------")print("---------------warning:object"+tab_name+"notexistinginsourcedatabase----------------")print("-----------------------warningmessage-----------------------")print()return#ifexistsareferencetableforsp,notsyncthetableagaginif(self.exits_object(conn_target,self.format_object_name(tab_name))>0):if(self.f!="Y"):print("-----------------------warningmessage-----------------------")print("---------------warning:object"+tab_name+"existingintargetdatabase----------------")print("-----------------------warningmessage-----------------------")print()returnsql_script=r'''selecttop11fromsys.tableswheretype_desc='USER_TABLE'andconcat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name))='{0}''''.format((self.format_object_name(tab_name)))#ifthetableschemaexistsintargetserver,skipcursor_target.execute(sql_script)exists_in_target=cursor_target.fetchone()ifexists_in_target:if(self.f=="Y"):if(is_reference!="Y"):cursor_target.execute("droptable{0}".format(tab_name))else:print("-----------------------warningmessage-----------------------")print("thetargettable"+tab_name+"exists,skipedsynctableschemafromsource")print("-----------------------warningmessage-----------------------")print()returnsql_script=r'''DECLARE@object_nameSYSNAME,@object_idINTSELECT@object_name='['+s.name+'].['+o.name+']',@object_id=o.[object_id]FROMsys.objectsoWITH(NOWAIT)JOINsys.schemassWITH(NOWAIT)ONo.[schema_id]=s.[schema_id]WHEREQUOTENAME(s.name)+'.'+QUOTENAME(o.name)='{0}'ANDo.[type]='U'ANDo.is_ms_shipped=0DECLARE@SQLNVARCHAR(MAX)='';WITHindex_columnAS(SELECTic.[object_id],ic.index_id,ic.is_descending_key,ic.is_included_column,c.nameFROMsys.index_columnsicWITH(NOWAIT)JOINsys.columnscWITH(NOWAIT)ONic.[object_id]=c.[object_id]ANDic.column_id=c.column_idWHEREic.[object_id]=@object_id),fk_columnsAS(SELECTk.constraint_object_id,cname=c.name,rcname=rc.nameFROMsys.foreign_key_columnskWITH(NOWAIT)JOINsys.columnsrcWITH(NOWAIT)ONrc.[object_id]=k.referenced_object_idANDrc.column_id=k.referenced_column_idJOINsys.columnscWITH(NOWAIT)ONc.[object_id]=k.parent_object_idANDc.column_id=k.parent_column_idWHEREk.parent_object_id=@object_id)SELECT@SQL='CREATETABLE'+@object_name+''+'('+''+STUFF((SELECT''+',['+c.name+']'+CASEWHENc.is_computed=1THEN'AS'+cc.[definition]ELSEUPPER(tp.name)+CASEWHENtp.nameIN('varchar','char','varbinary','binary','text')THEN'('+CASEWHENc.max_length=-1THEN'MAX'ELSECAST(c.max_lengthASVARCHAR(5))END+')'WHENtp.nameIN('nvarchar','nchar')THEN'('+CASEWHENc.max_length=-1THEN'MAX'ELSECAST(c.max_length/2ASVARCHAR(5))END+')'WHENtp.nameIN('datetime2','time2','datetimeoffset')THEN'('+CAST(c.scaleASVARCHAR(5))+')'WHENtp.name='decimal'THEN'('+CAST(c.[precision]ASVARCHAR(5))+','+CAST(c.scaleASVARCHAR(5))+')'ELSE''END+CASEWHENc.collation_nameISNOTNULLTHEN'COLLATE'+c.collation_nameELSE''END+CASEWHENc.is_nullable=1THEN'NULL'ELSE'NOTNULL'END+CASEWHENdc.[definition]ISNOTNULLTHEN'DEFAULT'+dc.[definition]ELSE''END+CASEWHENic.is_identity=1THEN'IDENTITY('+CAST(ISNULL(/*ic.seed_value*/1,'0')ASCHAR(1))+','+CAST(ISNULL(ic.increment_value,'1')ASCHAR(1))+')'ELSE''ENDEND+''FROMsys.columnscWITH(NOWAIT)JOINsys.typestpWITH(NOWAIT)ONc.user_type_id=tp.user_type_idLEFTJOINsys.computed_columnsccWITH(NOWAIT)ONc.[object_id]=cc.[object_id]ANDc.column_id=cc.column_idLEFTJOINsys.default_constraintsdcWITH(NOWAIT)ONc.default_object_id!=0ANDc.[object_id]=dc.parent_object_idANDc.column_id=dc.parent_column_idLEFTJOINsys.identity_columnsicWITH(NOWAIT)ONc.is_identity=1ANDc.[object_id]=ic.[object_id]ANDc.column_id=ic.column_idWHEREc.[object_id]=@object_idORDERBYc.column_idFORXMLPATH(''),TYPE).value('.','NVARCHAR(MAX)'),1,2,''+'')+ISNULL((SELECT''+',CONSTRAINT['+k.name+']PRIMARYKEY('+(SELECTSTUFF((SELECT',['+c.name+']'+CASEWHENic.is_descending_key=1THEN'DESC'ELSE'ASC'ENDFROMsys.index_columnsicWITH(NOWAIT)JOINsys.columnscWITH(NOWAIT)ONc.[object_id]=ic.[object_id]ANDc.column_id=ic.column_idWHEREic.is_included_column=0ANDic.[object_id]=k.parent_object_idANDic.index_id=k.unique_index_idFORXMLPATH(N''),TYPE).value('.','NVARCHAR(MAX)'),1,2,''))+')'+''FROMsys.key_constraintskWITH(NOWAIT)WHEREk.parent_object_id=@object_idANDk.[type]='PK'),'')+')'+''+ISNULL((SELECT(SELECT''+'ALTERTABLE'+@object_name+'WITH'+CASEWHENfk.is_not_trusted=1THEN'NOCHECK'ELSE'CHECK'END+'ADDCONSTRAINT['+fk.name+']FOREIGNKEY('+STUFF((SELECT',['+k.cname+']'FROMfk_columnskWHEREk.constraint_object_id=fk.[object_id]and1=2FORXMLPATH(''),TYPE).value('.','NVARCHAR(MAX)'),1,2,'')+')'+'REFERENCES['+SCHEMA_NAME(ro.[schema_id])+'].['+ro.name+']('+STUFF((SELECT',['+k.rcname+']'FROMfk_columnskWHEREk.constraint_object_id=fk.[object_id]FORXMLPATH(''),TYPE).value('.','NVARCHAR(MAX)'),1,2,'')+')'+CASEWHENfk.delete_referential_action=1THEN'ONDELETECASCADE'WHENfk.delete_referential_action=2THEN'ONDELETESETNULL'WHENfk.delete_referential_action=3THEN'ONDELETESETDEFAULT'ELSE''END+CASEWHENfk.update_referential_action=1THEN'ONUPDATECASCADE'WHENfk.update_referential_action=2THEN'ONUPDATESETNULL'WHENfk.update_referential_action=3THEN'ONUPDATESETDEFAULT'ELSE''END+''+'ALTERTABLE'+@object_name+'CHECKCONSTRAINT['+fk.name+']'+''FROMsys.foreign_keysfkWITH(NOWAIT)JOINsys.objectsroWITH(NOWAIT)ONro.[object_id]=fk.referenced_object_idWHEREfk.parent_object_id=@object_idFORXMLPATH(N''),TYPE).value('.','NVARCHAR(MAX)')),'')+ISNULL(((SELECT''+'CREATE'+CASEWHENi.is_unique=1THEN'UNIQUE'ELSE''END+'NONCLUSTEREDINDEX['+i.name+']ON'+@object_name+'('+STUFF((SELECT',['+c.name+']'+CASEWHENc.is_descending_key=1THEN'DESC'ELSE'ASC'ENDFROMindex_columncWHEREc.is_included_column=0ANDc.index_id=i.index_idFORXMLPATH(''),TYPE).value('.','NVARCHAR(MAX)'),1,2,'')+')'+ISNULL(''+'INCLUDE('+STUFF((SELECT',['+c.name+']'FROMindex_columncWHEREc.is_included_column=1ANDc.index_id=i.index_idFORXMLPATH(''),TYPE).value('.','NVARCHAR(MAX)'),1,2,'')+')','')+''FROMsys.indexesiWITH(NOWAIT)WHEREi.[object_id]=@object_idANDi.is_primary_key=0ANDi.[type]=2FORXMLPATH(''),TYPE).value('.','NVARCHAR(MAX)')),'')select@SQLasscript'''.format(self.format_object_name(tab_name))cursor_target=conn_target.cursor()cursor_source.execute(sql_script)row=cursor_source.fetchone()ifnotrow[0]:returntry:cursor_target.execute(row[0])#dropcurrenttableschemaifexistsconn_target.commit()print("*************schema"+self.format_object_name(tab_name)+"synced*************")print()#giveablankrowwhenfinishexcept:print("-----------------------warningmessage-----------------------")print("-----------schema"+self.format_object_name(tab_name)+"syncedfailed---------------")print("-----------------------warningmessage-----------------------")print()cursor_source.close()conn_source.close()cursor_target.close()conn_target.close()defget_table_column(self,conn,tab_name):column_names=""conn=conncursor_source=conn.cursor()#getobjectbynamefromsourcedbsql_script=r'''selectnamefromsys.columnswhereobject_id=object_id('{0}')andis_computed=0orderbyobject_id'''.format(self.format_object_name(tab_name))cursor_source.execute(sql_script)result=cursor_source.fetchall()forrowinresult:column_names=column_names+row[0]+","returncolumn_names[0:len(column_names)-1]conn.cursor.close()conn.close()defsync_table_schema(self):#defaultnotsyncbyreferencedotherobjectis_reference="N"conn_source=self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)conn_target=self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)cursor_source=conn_source.cursor()cursor_target=conn_target.cursor()arr_table=[]if(self.s_obj):fortab_nameinself.s_obj.split(","):if(tab_name)and(self.exits_object(conn_source,tab_name)>0):self.sync_table_schema_byname(tab_name,is_reference)else:print("-----------------------warningmessage-----------------------")print("-----------schema"+self.format_object_name(tab_name)+"notexistinginsourcedatabase---------------")print("-----------------------warningmessage-----------------------")print()else:#syncalltables#getalltableindatabasewhennotdefinetablenamesql_script='''SELECTQUOTENAME(s.name)+'.'+QUOTENAME(o.name)FROMsys.objectsoWITH(NOWAIT)JOINsys.schemassWITH(NOWAIT)ONo.[schema_id]=s.[schema_id]WHEREo.[type]='U'ANDo.is_ms_shipped=0'''cursor_source.execute(sql_script)forrowincursor_source.fetchall():self.sync_table_schema_byname(str(row[0]),is_reference)#syncdatafromsouretabletotargettabledefsync_table_data(self):conn_source=self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)conn_target=self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)cursor_source=conn_source.cursor()cursor_target=conn_target.cursor()arr_table=[]if(self.s_obj):arr_table=self.s_obj.split(',')fortab_nameinarr_table:if(self.exits_object(conn_target,self.format_object_name(tab_name))==0):arr_table.remove(tab_name)print("-----------------warningmessage-----------------------")print("-----------------warning:table"+tab_name+"notexistingintargetdatabase---------------------")print("-----------------warningmessage-----------------------")else:#getalltableindatabasewhennotdefinetablenametab_result=cursor_source.execute(r'''SELECTQUOTENAME(s.name)+'.'+QUOTENAME(o.name)FROMsys.objectsoWITH(NOWAIT)JOINsys.schemassWITH(NOWAIT)ONo.[schema_id]=s.[schema_id]WHEREo.[type]='U'ANDo.is_ms_shipped=0''')forrowincursor_source.fetchall():arr_table.append(str(row[0]))insert_columns=""insert_columns=self.get_table_column(conn_source,tab_name)fortab_nameinarr_table:if(self.f!="Y"):sql_script="selecttop1{0}from{1}".format(insert_columns,tab_name)#ifexistsdataintargettable,breakcursor_target.execute(sql_script)exists=cursor_target.fetchone()ifexists:print("-----------------------warningmessage-----------------------")print("thetargettable"+tab_name+"existsdata,skipedsynctabletypefromsource")print("-----------------------warningmessage-----------------------")print()continueelse:sql_script="truncatetable{0}".format(tab_name)#ifexistsdataintargettable,breakcursor_target.execute(sql_script)conn_target.commit()insert_columns=""insert_columns=self.get_table_column(conn_source,tab_name)insert_prefix=""#weatherhasidentitycolumncursor_source.execute(r'''select1fromsys.columnswhereobject_id=OBJECT_ID('{0}')andis_identity=1'''.format(tab_name))exists_identity=Noneexists_identity=cursor_source.fetchone()if(exists_identity):insert_prefix="setidentity_insert{0}on;".format(tab_name)#datasourceinsert_sql=""values_sql=""current_row=""counter=0sql_script=r'''select{0}from{1}'''.format(insert_columns,tab_name)cursor_source.execute(sql_script)#createinsertcolumns'''forfieldincursor_source.description:insert_columns=insert_columns+str(field[0])+","insert_columns=insert_columns[0:len(insert_columns)-1]'''insert_prefix=insert_prefix+"insertinto{0}({1})values".format(tab_name,insert_columns)forrowincursor_source.fetchall():counter=counter+1forkeyinrow:if(str(key)=="None"):current_row=current_row+r'''null,'''else:if(type(key)isdatetime.datetime):current_row=current_row+r''''{0}','''.format(str(key)[0:23])elif(type(key)isstr):#我槽!!!,这里又有一个坑:https://blog.csdn.net/dadaowuque/article/details/81016127current_row=current_row+r''''{0}','''.format(key.replace("'","''").replace('\u0000','').replace('\x00',''))elif(type(key)isDecimal):d=Decimal(key)s='{0:f}'.format(d)current_row=current_row+r''''{0}','''.format(s)elif(type(key)isbytes):#print(hex(int.from_bytes(key,'big',signed=True)))current_row=current_row+r'''{0},'''.format(hex(int.from_bytes(key,'big',signed=False)))else:current_row=current_row+r''''{0}','''.format(key)current_row=current_row[0:len(current_row)-2]#removethethelastonechar","values_sql=values_sql+"("+current_row+"),"current_row=""#executetheonebatchwhenif(counter==1000):insert_sql=insert_prefix+values_sqlinsert_sql=insert_sql[0:len(insert_sql)-1]#removethethelastonechar","if(exists_identity):insert_sql=insert_sql+";setidentity_insert{0}off;".format(tab_name)try:cursor_target.execute(insert_sql)except:print("----------------------error"+tab_name+"datasyncedfailed-------------------------")raiseconn_target.commit()insert_sql=""values_sql=""current_row=""counter=0print(time.strftime("%Y-%m-%d%H:%M:%S",time.localtime())+"***************"+self.format_object_name(tab_name)+""+str(1000)+"rowssynced*************")if(values_sql):insert_sql=insert_prefix+values_sqlinsert_sql=insert_sql[0:len(insert_sql)-1]#removethethelastonechar","if(exists_identity):insert_sql=insert_sql+";setidentity_insert{0}off;".format(tab_name)#executethelastbatchtry:cursor_target.execute(insert_sql)except:print("------------------error"+tab_name+"datasyncedfailed------------------------")raiseconn_target.commit()insert_sql=""values_sql=""current_row=""print(time.strftime("%Y-%m-%d%H:%M:%S",time.localtime())+"***************"+self.format_object_name(tab_name)+""+str(counter)+"rowssynced*************")print(time.strftime("%Y-%m-%d%H:%M:%S",time.localtime())+"----------------synced"+self.format_object_name(tab_name)+"datafinished---------------")print()cursor_source.close()conn_source.close()cursor_target.close()conn_target.close()defsync_dependent_object(self,obj_name):#强制覆盖,不需要对依赖对象生效,如果是因为属于依赖对象而被同步的,先检查target中是否存在,如果存在就不继续同步,这里打一个标记来实现is_refernece="Y"conn_source=self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)conn_target=self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)cursor_source=conn_source.cursor()cursor_target=conn_target.cursor()'''finddependentobjectsifexistsdependentobjects,syncDependentobjectsobjectsinadvance'''sql_check_dependent=r'''SELECT*FROM(SELECTdistinctrtrim(lower(s.type))COLLATEChinese_PRC_CI_ASasobj_type,QUOTENAME(d.referenced_schema_name)+'.'+QUOTENAME(d.referenced_entity_name)COLLATEChinese_PRC_CI_ASasobjFROMsys.dm_sql_referenced_entities('{0}','OBJECT')asdinnerjoinsys.sysobjectssons.id=d.referenced_idunionallSELECTdistinctrtrim(lower(d.referenced_class_desc))COLLATEChinese_PRC_CI_ASasobj_type,QUOTENAME(d.referenced_schema_name)+'.'+QUOTENAME(d.referenced_entity_name)COLLATEChinese_PRC_CI_ASasobjFROMsys.dm_sql_referenced_entities('{0}','OBJECT')asdinnerjoinsys.typessons.user_type_id=d.referenced_id)t'''.format(self.format_object_name(obj_name))cursor_source.execute(sql_check_dependent)result=cursor_source.fetchall()forrowinresult:ifrow[1]:if(row[0]=="u"):if(row[1]):self.sync_table_schema_byname(row[1],is_refernece)elif(row[0]=="fn"orrow[0]=="if"):if(row[1]):self.sync_procudre_by_name("f",row[1],is_refernece)elif(row[0]=="type"):if(row[1]):self.sync_table_variable(row[1],is_refernece)defsync_procudre_by_name(self,type,obj_name,is_reference):conn_source=self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)conn_target=self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)cursor_source=conn_source.cursor()cursor_target=conn_target.cursor()if(self.exits_object(conn_source,self.format_object_name(obj_name))==0):print("---------------warningmessage----------------")print("---------------warning:object"+obj_name+"notexistinginsourcedatabase----------------")print("---------------warningmessage----------------")print()returnif(self.exits_object(conn_target,self.format_object_name(obj_name))>0):if(self.f!="Y"):print("---------------warningmessage----------------")print("---------------warning:object"+obj_name+"existingintargetdatabase----------------")print("---------------warningmessage----------------")print()return'''本来想直接生成删除语句的:这里有一个该死的转义,怎么都弄不好,中午先去吃饭吧,下午回来想了一下,换一种方式,不要死磕转义问题了sql_script=select'ifobject_id('+''''+QUOTENAME(schema_name(uid))+''+QUOTENAME(name)+''''+')isnotnull'+'dropproc'+QUOTENAME(schema_name(uid))+'.'+QUOTENAME(name),OBJECT_DEFINITION(id)fromsys.sysobjectswherextype='P'anduidnotin(16,19)'''sql_script=r'''selectQUOTENAME(schema_name(uid))+'.'+QUOTENAME(name),OBJECT_DEFINITION(id)fromsys.sysobjectswherextypein('P','IF','FN')anduidnotin(16,19)'''if(obj_name):sql_script=sql_script+"andQUOTENAME(schema_name(uid))+'.'+QUOTENAME(name)='{0}'".format(self.format_object_name(obj_name))cursor_source.execute(sql_script)row=cursor_source.fetchone()try:iftype=="f":sql_script=r'''ifobject_id('{0}')isnotnulldropfunction{0}'''.format(self.format_object_name(row[0]))eliftype=="p":sql_script=r'''ifobject_id('{0}')isnotnulldropproc{0}'''.format(self.format_object_name(row[0]))cursor_target.execute(sql_script)#dropcurrentstored_procudreifexistsconn_target.commit()#syncdependentobjectif(is_reference!="N"):self.sync_dependent_object(self.format_object_name(row[0]))#syncobjectitselfcursor_target.execute(str(row[1]))#executecreatestored_procudrescriptconn_target.commit()print("*************syncsp:"+self.format_object_name(row[0])+"finished*****************")print()except:print("---------------errormessage----------------")print("------------------sync"+row[0]+"sperror--------------------------")print("---------------errormessage----------------")print()cursor_source.close()conn_source.close()cursor_target.close()conn_target.close()defsync_procudre(self,type):is_reference="N"conn_source=self.get_connect(self.s_h,self.s_i,self.s_P,self.s_u,self.s_p,self.s_d)conn_target=self.get_connect(self.t_h,self.t_i,self.t_P,self.t_u,self.t_p,self.t_d)cursor_source=conn_source.cursor()cursor_target=conn_target.cursor()if(self.s_obj):forproc_nameinself.s_obj.split(","):self.sync_dependent_object(proc_name)self.sync_procudre_by_name(type,proc_name,is_reference)#syncallspandfunctionelse:sql_script=r'''selectQUOTENAME(schema_name(uid))+'.'+QUOTENAME(name),OBJECT_DEFINITION(id)fromsys.sysobjectswherextype=upper('{0}')anduidnotin(16,19)'''.format(type)cursor_source.execute(sql_script)forrowincursor_source.fetchall():self.sync_dependent_object(row[0])self.sync_procudre_by_name(type,row[0],is_reference)if__name__=="__main__":'''sync=SyncDatabaseObject(s_h="127.0.0.1",s_i="sql2017",s_P=49744,s_d="DB01",t_h="127.0.0.1",t_i="sql2017",t_P=49744,t_d="DB02",s_obj_type="sp",s_obj="dbo.sp_test01",f="Y")sync.sync_procudre("p")'''p_s_h=""p_s_i="MSSQL"p_s_P=1433p_s_d=""p_s_u=Nonep_s_p=Nonep_s_obj=""p_type=""p_t_s=""p_t_i="MSSQL"p_t_P="1433"p_t_d=""p_t_u=Nonep_t_p=None#forceconvertargetdatabaseobject,defaultnotforcecovertargetdatabaseobjectp_f="N"#syncobjtypetable|spp_obj_type=None#syncwhickdatabaseobjectp_obj=Noneiflen(sys.argv)==1:print(usage)sys.exit(1)elifsys.argv[1]=='--help':print(usage)sys.exit()eliflen(sys.argv)>=2:foriinsys.argv[1:]:_argv=i.split('=')#sourceservernameif_argv[0]=='-s_h':p_s_h=_argv[1]#sourceserverinstancenameif_argv[0]=='-s_i':if(_argv[1]):p_s_i=_argv[1]#sourceserverinstancePORTif_argv[0]=='-s_P':if(_argv[1]):p_s_P=_argv[1]#sourcedatabasenameif_argv[0]=='-s_d':p_s_d=_argv[1]if_argv[0]=='-s_u':p_s_u=_argv[1]if_argv[0]=='-s_p':p_s_p=_argv[1]if_argv[0]=='-t_h':p_t_h=_argv[1]if_argv[0]=='-t_i':if(_argv[1]):p_t_i=_argv[1]if_argv[0]=='-t_P':if(_argv[1]):p_t_P=_argv[1]if_argv[0]=='-t_d':p_t_d=_argv[1]if_argv[0]=='-t_u':p_t_u=_argv[1]if_argv[0]=='-t_p':p_t_p=_argv[1]if_argv[0]=='-f':if(_argv[1]):p_f=_argv[1]#objecttypeif_argv[0]=='-obj_type':ifnot(_argv[1]):print("-obj_typecannotbenull(-obj=tab|-obj=sp|-obj=fn|-obj=type)")exit(0)else:p_obj_type=_argv[1]#objectnameif_argv[0]=='-obj':if(_argv[1]):p_obj=_argv[1]#requireparaifp_s_h.strip()=="":print("sourceserverhostcannotbenull")exit(0)ifp_s_d.strip()=="":print("sourceserverhostdatabasenamecannotbenull")exit(0)ifp_t_h.strip()=="":print("targetserverhostcannotbenull")exit(0)ifp_t_d.strip()=="":print("targetserverhostdatabasenamecannotbenull")exit(0)sync=SyncDatabaseObject(s_h=p_s_h,s_i=p_s_i,s_P=p_s_P,s_d=p_s_d,s_u=p_s_u,s_p=p_s_p,s_obj=p_obj,t_h=p_t_h,t_i=p_t_i,t_P=p_t_P,t_d=p_t_d,t_u=p_t_u,t_p=p_t_p,f=p_f)sync.validated_connect(p_s_h,p_s_i,p_s_P,p_s_d,p_s_u,p_s_p)sync.validated_connect(p_t_h,p_t_i,p_t_P,p_t_d,p_t_u,p_t_p)if(p_f.upper()=="Y"):confirm=input("confirmyouwanttooverwritethetargetobject?")ifconfirm.upper()!="Y":exit(0)print("--------------------------syncbegin----------------------------------")print()if(p_obj_type=="tab"):#syncschemasync.sync_schema()#synctableschemasync.sync_table_schema()#syncdatasync.sync_table_data()elif(p_obj_type=="sp"):#syncschemasync.sync_schema()#syncspsync.sync_procudre("p")elif(p_obj_type=="fn"):#syncschemasync.sync_schema()#syncspsync.sync_procudre("fn")elif(p_obj_type=="tp"):#syncschemasync.sync_schema()#syncspsync.sync_table_variable()else:print("-obj_typeisnotvalidated")print()print("--------------------------syncfinish----------------------------------")
上述就是小编为大家分享的使用Python怎么实现SQL Server数据库的对象同步轻量级了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。