python执行SQL语句怎么实现
这篇文章主要介绍了python执行SQL语句怎么实现,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
安装pip3installdirectsql导入
directsql 目前只提供三个外部类
__all__=["SqlGenerator","MysqlConnection","MysqlPool"]
导入方式
fromdirectsql.sqlgeneratorimportSqlGenerator#该类用于生成sql语句#下面是一个池化连接对象MysqlPool和一个简单连接对象MysqlConnectorfromdirectsql.connectorimportMysqlConnection,MysqlConnector使用
1 创建连接
#1.传入有名参数conn=MysqlConnection(host='127.0.0.1',port=3306,password='123456',database='test_base')print(conn.database)conn=MysqlPool(host='127.0.0.1',port=3306,password='123456',database='test_base')#也可使用直接参数字典conn_args={'host':'127.0.0.1','port':3306,'password':'123456','database':'test_base',}conn=MysqlConnection(**conn_args)#单个连接print(conn.database)conn=MysqlPool(**conn_args)#池化连接对象print(conn.database)#2直接使用字符串#以下字符串是常用的终端连接命令string_arg="mysql-uroot-h227.0.0.1-P3306-p123456-Dtest_base"conn=MysqlConnection(string_arg=string_arg)print(conn.database)conn=MysqlPool(string_arg=string_arg)print(conn.database)2 执行sql语句
事实上directsql 封装了 很多 语句。可以满足很大一部分日常使用场景。但是如果有复杂的语句,仍然需要调用原生的sql 执行。而且directsql 中很多封装好的方法是先拼接sql 再 调用该语句,所以这里还是先简单介绍下,directsql 如何执行原生sql。
无论是 MysqlConnection 类 还是 MysqlPool 类 都通过 execute_sql 方法 来执行sql。
例如 :
conn=MysqlConnection(string_arg="mysql-uroot-h227.0.0.1-P3306-p123456-Dtest")result,count=conn.execute_sql("select*fromtest_table")print(result)print(count)>>>((1,'罗辑','28'),(2,'庄颜','25'),(3,'叶文洁','54'),(4,'程心','25'),(5,'云天明','27'))>>>5#这里默认是普通游标,你也可以指定使用字典游标:result,count=conn.execute_sql("select*fromtest_table",cursor_type='dict')>>>[{'ID':1,'name':'罗辑','age':'28'},{'ID':2,'name':'庄颜','age':'25'},{'ID':3,'name':'叶文洁','age':'54'},{'ID':4,'name':'程心','age':'25'},{'ID':5,'name':'云天明','age':'27'}]>>>5
execute_sql 方法 返回的是一个元组,(结果集,条数)
下文出现的所有方法无特殊说明都是返回元组,且支持dict游标
附带参数执行语句
这里的参数使用起来和 pymysql 提供的 execute 以及executemany 没有任何 差别,以下简单提供几个示例:
#传元组result,count=conn.execute_sql("select*fromtest_tablewhereage=%s",param=(25,))#传字典result,count=conn.execute_sql("select*fromtest_tablewhereage=%(age)s",param={'age':25})#元组列表result,count=conn.execute_sql("insertintotest_table(`age`,`name`)values(%s,%s)",param=[('宋运辉',37),('程开颜',33)])#字典列表result,count=conn.execute_sql("insertintotest_table(`age`,`name`)values(%(age)s,%(name)s)",param=[{"name":"宋运辉",'age':37},{"name":"程开颜",'age':33}])3 select 方法
select 方法 可以接受多参数,参数列表如下。
defselect(self,columns='id',table=None,where=None,group_by:str=None,order_by:str=None,limit:int=None,offset=None,cursor_type=None):
》》》 conn.select('*', 'test_table')
select id from test_table where age=25
》》》 conn.select('*', 'test_table', where={'age': 25})
select name,age from test_table where age=25 and id=2
多字段直接传入字符串
》》》 conn.select("age,name", 'test_table', where={'age': 25,'id':2})
传入列表/元组
》》》 conn.select(['age','name'], 'test_table', where={'age': 25,'id':2})
select * from test_table group by id order by age desc limit 1 offset 1
》》》conn.select('*', 'test_table', order_by='age desc',group_by='id',limit=1,offset=1)
select 功能看起来甚至不如直接写原生sql 快,但是如果查询条件是在不断变化的,尤其是where条件,那么使用select 方法 会比自行拼接更方便。
例如,需要不断地读取一个字典变量,然后根据这个变量中的条件去查询数据,而这个字典的键个数会变化,但是键都恰好是表的字段。这个时候使用select 方法会十分简便,只需要令where参数等于那个字典即可。
平心而论,这个方法确实用处不大。
4 insert_into 方法definsert_into(self,table,data:dictorlist,columns=None,ignroe=False,on_duplicate_key_update:str=None,return_id=False):
该方法可以接受传入字典或者 字典列表,并且可选 返回 游标影响的条数 或者是 新插入的数据的id。
columns 为空时,将取第一条数据的所有键,此时请确保所有数据键相同。
#传入字典data_1={"age":44,'name':"雷东宝"}count=conn.insert_into('test_table',data_1)#默认返回受影响条数print(count)#>>>1return_id=conn.insert_into('test_table',data_1,return_id=True)#可选返回idprint(return_id)>>>22533#传入字典列表data_2={"age":22,'name':"宋运萍"}all_data=[data_1,data_2]count=conn.insert_into('test_table',all_data)#限定插入的字段。(字典有多字段,但是只需要往表里插入指定的字段时)data_3={"age":44,'name':"雷东宝","title":"村支书"}#title不需要,只要age和namecount=conn.insert_into('test_table',data_1,columns=["age","name"])#ignore参数data_1={"age":44,'name':"雷东宝","id":22539}count=conn.insert_into('test_table',ignore=True)print(count)>>>0#由于表中id22539已经存在,该条记录不会插入,影响0条数据#on_duplicate_key_update参数data_1={"age":44,'name':"雷东宝","id":22539}#id=22539已经存在count=conn.insert_into('test_table',data_1,on_duplicate_key_update='name="雷copy"')print(count)#返回影响条数>>>2#尝试插入一条,但是发生重复,于是删除新数据,并更新旧数据。实际上影响了两条。
在insert_into 方法中提供了 on_duplicate_key_update 参数,但是实际上使用起来比较鸡肋,需要自己传入 on_duplicate_key_update 后的语句进行拼接。
如果你仅仅只是需要在发生重复时将旧数据的特定字段更新为新数据对应字段的值时。merge_into 方法更适合。
5 merge_into 方法在 其他关系型数据库中,提供有merge into 的语法,但是mysql 中没有提供。 不过这里我们通过insert 和 on_duplicate_key_update 语法 封装出了一个 类似merge_into 的方法。 该方法返回的是影响的条数
def* merge_into(self, table, data, columns=None, need_merge_columns: list = None):
columns 为空时,将取第一条数据的所有键,此时请确保所有数据键相同。
need_merge_columns 为在发生重复时需要替换(覆盖)的字段。
data_1={"age":44,'name':"雷东宝","id":22539}data_2={"age":22,'name':"宋运萍","id":22540}all_data=[data_1,data_2,]count=conn.merge_into('test_table',all_data,need_merge_columns=['name',])print(count)>>>4#两条数据正好都是重复的,插入两条又删除后修改两条,返回46 replace_into 方法
该方法简单,不做过多说明。该方法 返回的是影响的条数
def replace_into(self,table, data: dict or list, columns=None)
data_1={"age":44,'name':"雷东宝","id":22539}data_2={"age":22,'name':"宋运萍","id":22540}all_data=[data_1,data_2,]count=conn.replace_into('test_table',all_data)7 update 方法
def update(self,table, data: dict, where, columns: None or list = None, limit=None):
该方法data 参数只接受传入字典。该方法 返回的是影响的条数
data_1={"age":44,'name':"雷copy"}count=conn.update('test_table',data_1,where={'id':22539})#更新id=22539的数据为新的data_1print(count)>>>1
除此之外,还提供了一个衍生的方法
def update_by_primary(self, table, data: dict, pri_value, columns=None, primary: str = 'id'):
用于通过主键去更新数据。pri_value 即为主键的值。primary 为主键,默认为id
data_1={"age":44,'name':"雷cpy"}count=conn.update_by_primary('test_table',data_1,pri_value=22539)8 delete 方法
defdelete_by_primary(self,table,pri_value,primary='id'):"""通过主键删除数据"""defdelete(self,table,where:strordict,limit:int=0):"""通过where条件删除数据"""count=conn.delete('test_table',where={'name':'雷东宝'})#删除name=雷东宝的数据count=conn.delete_by_primary('test_table',pri_value=22539)#删除主键等于22539的数据9 使用 事务
def do_transaction(self, sql_params: list, cursor_type=None):
sql_params 为 元组列表。 【(sql_1,param_1),(sql_2,param_2】
如果sql 不需要参数也要传入 None ,如 【(sql_1,None),】
sql_params=[("updatetest_tablesetname=%(name)swhereid=%(id)s",{'name':'洛基','id':22539}),("updatetest_tablesetname=%(name)swhereid=%(id)s",{'name':'mask','id':22540}),]count=conn.do_transaction(sql_params)>>>((),1)#返回最后一条执行语句的结果和影响条数10 读取流式游标结果
def read_ss_result(self, sql, param=None, cursor_type='ss'):
cursor_type 可选 ss 和 ssdict
注意,该方法返回的是 生成器对象,拿到结果需要不断进行遍历。
result=conn.read_ss_result("select*fromtest_table")fordatainresult:print(data)
感谢你能够认真阅读完这篇文章,希望小编分享的“python执行SQL语句怎么实现”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。