Flask中常见的数据库定义和数据库迁移
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from info import constants
from . import db
class BaseModel(object):
"""模型基类,为每个模型补充创建时间与更新时间"""
create_time = db.Column(db.DateTime, default=datetime.now) # 记录的创建时间
update_time = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now) # 记录的更新时间
# 用户收藏表,建立用户与其收藏新闻多对多的关系
tb_user_collection = db.Table(
"info_user_collection",
db.Column("user_id", db.Integer, db.ForeignKey("info_user.id"), primary_key=True), # 新闻编号
db.Column("news_id", db.Integer, db.ForeignKey("info_news.id"), primary_key=True), # 分类编号
db.Column("create_time", db.DateTime, default=datetime.now) # 收藏创建时间
)
tb_user_follows = db.Table(
"info_user_fans",
db.Column('follower_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True), # 粉丝id
db.Column('followed_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True) # 被关注人的id
)
class User(BaseModel, db.Model):
"""用户"""
__tablename__ = "info_user"
id = db.Column(db.Integer, primary_key=True) # 用户编号
nick_name = db.Column(db.String(32), unique=True, nullable=False) # 用户昵称
password_hash = db.Column(db.String(128), nullable=False) # 加密的密码
mobile = db.Column(db.String(11), unique=True, nullable=False) # 手机号
avatar_url = db.Column(db.String(256)) # 用户头像路径
last_login = db.Column(db.DateTime, default=datetime.now) # 最后一次登录时间
is_admin = db.Column(db.Boolean, default=False)
signature = db.Column(db.String(512)) # 用户签名
gender = db.Column( # 订单的状态
db.Enum(
"MAN", # 男
"WOMAN" # 女
),
default="MAN")
# 当前用户收藏的所有新闻
collection_news = db.relationship("News", secondary=tb_user_collection, lazy="dynamic") # 用户收藏的新闻
# 用户所有的粉丝,添加了反向引用followed,代表用户都关注了哪些人
followers = db.relationship('User',
secondary=tb_user_follows,
primaryjoin=id == tb_user_follows.c.followed_id,
secondaryjoin=id == tb_user_follows.c.follower_id,
backref=db.backref('followed', lazy='dynamic'),
lazy='dynamic')
# 当前用户所发布的新闻
news_list = db.relationship('News', backref='user', lazy='dynamic')
@property
def password(self):
raise AttributeError("当前属性不可读")
@password.setter
def password(self, value):
self.password_hash = generate_password_hash(value)
def check_passowrd(self, password):
return check_password_hash(self.password_hash, password)
def to_dict(self):
resp_dict = {
"id": self.id,
"nick_name": self.nick_name,
"avatar_url": constants.QINIU_DOMIN_PREFIX + self.avatar_url if self.avatar_url else "",
"mobile": self.mobile,
"gender": self.gender if self.gender else "MAN",
"signature": self.signature if self.signature else "",
"followers_count": self.followers.count(),
"news_count": self.news_list.count()
}
return resp_dict
def to_admin_dict(self):
resp_dict = {
"id": self.id,
"nick_name": self.nick_name,
"mobile": self.mobile,
"register": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"last_login": self.last_login.strftime("%Y-%m-%d %H:%M:%S"),
}
return resp_dict
class News(BaseModel, db.Model):
"""新闻"""
__tablename__ = "info_news"
id = db.Column(db.Integer, primary_key=True) # 新闻编号
title = db.Column(db.String(256), nullable=False) # 新闻标题
source = db.Column(db.String(64), nullable=False) # 新闻来源
digest = db.Column(db.String(512), nullable=False) # 新闻摘要
content = db.Column(db.Text, nullable=False) # 新闻内容
clicks = db.Column(db.Integer, default=0) # 浏览量
index_image_url = db.Column(db.String(256)) # 新闻列表图片路径
category_id = db.Column(db.Integer, db.ForeignKey("info_category.id"))
user_id = db.Column(db.Integer, db.ForeignKey("info_user.id")) # 当前新闻的作者id
status = db.Column(db.Integer, default=0) # 当前新闻状态 如果为0代表审核通过,1代表审核中,-1代表审核不通过
reason = db.Column(db.String(256)) # 未通过原因,status = -1 的时候使用
# 当前新闻的所有评论
comments = db.relationship("Comment", lazy="dynamic")
def to_review_dict(self):
resp_dict = {
"id": self.id,
"title": self.title,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"status": self.status,
"reason": self.reason if self.reason else ""
}
return resp_dict
def to_basic_dict(self):
resp_dict = {
"id": self.id,
"title": self.title,
"source": self.source,
"digest": self.digest,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"index_image_url": self.index_image_url,
"clicks": self.clicks,
}
return resp_dict
def to_dict(self):
resp_dict = {
"id": self.id,
"title": self.title,
"source": self.source,
"digest": self.digest,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"content": self.content,
"comments_count": self.comments.count(),
"clicks": self.clicks,
"category": self.category.to_dict(),
"index_image_url": self.index_image_url,
"author": self.user.to_dict() if self.user else None
}
return resp_dict
class Comment(BaseModel, db.Model):
"""评论"""
__tablename__ = "info_comment"
id = db.Column(db.Integer, primary_key=True) # 评论编号
user_id = db.Column(db.Integer, db.ForeignKey("info_user.id"), nullable=False) # 用户id
news_id = db.Column(db.Integer, db.ForeignKey("info_news.id"), nullable=False) # 新闻id
content = db.Column(db.Text, nullable=False) # 评论内容
parent_id = db.Column(db.Integer, db.ForeignKey("info_comment.id")) # 父评论id
parent = db.relationship("Comment", remote_side=[id]) # 自关联
like_count = db.Column(db.Integer, default=0) # 点赞条数
def to_dict(self):
resp_dict = {
"id": self.id,
"create_time": self.create_time.strftime("%Y-%m-%d %H:%M:%S"),
"content": self.content,
"parent": self.parent.to_dict() if self.parent else None,
"user": User.query.get(self.user_id).to_dict(),
"news_id": self.news_id,
"like_count": self.like_count
}
return resp_dict
class CommentLike(BaseModel, db.Model):
"""评论点赞"""
__tablename__ = "info_comment_like"
comment_id = db.Column("comment_id", db.Integer, db.ForeignKey("info_comment.id"), primary_key=True) # 评论编号
user_id = db.Column("user_id", db.Integer, db.ForeignKey("info_user.id"), primary_key=True) # 用户编号
class Category(BaseModel, db.Model):
"""新闻分类"""
__tablename__ = "info_category"
id = db.Column(db.Integer, primary_key=True) # 分类编号
name = db.Column(db.String(64), nullable=False) # 分类名
news_list = db.relationship('News', backref='category', lazy='dynamic')
def to_dict(self):
resp_dict = {
"id": self.id,
"name": self.name
}
return resp_dict
####################################
数据库基本操作在Flask-SQLAlchemy中,插入、修改、删除操作,均由数据库会话管理。
会话用 db.session 表示。在准备把数据写入数据库前,要先将数据添加到会话中然后调用 commit() 方法提交会话。
在 Flask-SQLAlchemy 中,查询操作是通过 query 对象操作数据。
最基本的查询是返回表中所有数据,可以通过过滤器进行更精确的数据库查询。
在视图函数中定义模型类fromflaskimportFlaskfromflask_sqlalchemyimportSQLAlchemyapp=Flask(__name__)#设置连接数据库的URLapp.config['SQLALCHEMY_DATABASE_URI']='mysql://root:mysql@127.0.0.1:3306/test'app.config['SQLALCHEMY_TRACK_MODIFICATIONS']=True#查询时会显示原始SQL语句app.config['SQLALCHEMY_ECHO']=Truedb=SQLAlchemy(app)classRole(db.Model):#定义表名__tablename__='roles'#定义列对象id=db.Column(db.Integer,primary_key=True)name=db.Column(db.String(64),unique=True)us=db.relationship('User',backref='role')#repr()方法显示一个可读字符串def__repr__(self):return'Role:%s'%self.nameclassUser(db.Model):__tablename__='users'id=db.Column(db.Integer,primary_key=True)name=db.Column(db.String(64),unique=True,index=True)email=db.Column(db.String(64),unique=True)password=db.Column(db.String(64))role_id=db.Column(db.Integer,db.ForeignKey('roles.id'))def__repr__(self):return'User:%s'%self.nameif__name__=='__main__':app.run(debug=True)模型之前的关联一对多
classRole(db.Model):...#关键代码us=db.relationship('User',backref='role',lazy='dynamic')...classUser(db.Model):...role_id=db.Column(db.Integer,db.ForeignKey('roles.id'))
其中realtionship描述了Role和User的关系。在此文中,第一个参数为对应参照的类"User"
第二个参数backref为类User申明新属性的方法
第三个参数lazy决定了什么时候SQLALchemy从数据库中加载数据
设置为 dynamic 的话,role.users 返回查询对象,并没有做真正的查询,可以利用查询对象做其他逻辑,比如:先排序再返回结果
设置为 subquery 的话,role.users 返回所有数据列表
如果设置为子查询方式(subquery),则会在加载完Role对象后,就立即加载与其关联的对象,这样会让总查询数量减少,但如果返回的条目数量很多,就会比较慢
另外,也可以设置为动态方式(dynamic),这样关联对象会在被使用的时候再进行加载,并且在返回前进行过滤,如果返回的对象数很多,或者未来会变得很多,那最好采用这种方式
常用的SQLAlchemy查询过滤器过滤器说明filter()把过滤器添加到原查询上,返回一个新查询filter_by()把等值过滤器添加到原查询上,返回一个新查询limit使用指定的值限定原查询返回的结果offset()偏移原查询返回的结果,返回一个新查询order_by()根据指定条件对原查询结果进行排序,返回一个新查询group_by()根据指定条件对原查询结果进行分组,返回一个新查询常用的SQLAlchemy查询执行器方法说明all()以列表形式返回查询的所有结果first()返回查询的第一个结果,如果未查到,返回Nonefirst_or_404()返回查询的第一个结果,如果未查到,返回404get()返回指定主键对应的行,如不存在,返回Noneget_or_404()返回指定主键对应的行,如不存在,返回404count()返回查询结果的数量paginate()返回一个Paginate对象,它包含指定范围内的结果创建表:db.create_all()删除表
db.drop_all()插入一条数据
ro1=Role(name='admin')db.session.add(ro1)db.session.commit()#再次插入一条数据ro2=Role(name='user')db.session.add(ro2)db.session.commit()一次插入多条数据
us1=User(name='wang',email='wang@163.com',password='123456',role_id=ro1.id)us2=User(name='zhang',email='zhang@189.com',password='201512',role_id=ro2.id)us3=User(name='chen',email='chen@126.com',password='987654',role_id=ro2.id)us4=User(name='zhou',email='zhou@163.com',password='456789',role_id=ro1.id)us5=User(name='tang',email='tang@itheima.com',password='158104',role_id=ro2.id)us6=User(name='wu',email='wu@gmail.com',password='5623514',role_id=ro2.id)us7=User(name='qian',email='qian@gmail.com',password='1543567',role_id=ro1.id)us8=User(name='liu',email='liu@itheima.com',password='867322',role_id=ro1.id)us9=User(name='li',email='li@163.com',password='4526342',role_id=ro2.id)us10=User(name='sun',email='sun@163.com',password='235523',role_id=ro2.id)db.session.add_all([us1,us2,us3,us4,us5,us6,us7,us8,us9,us10])db.session.commit()"""查询所有用户数据查询有多少个用户查询第1个用户查询id为4的用户[3种方式]查询名字结尾字符为g的所有数据[开始/包含]查询名字不等于wang的所有数据[2种方式]查询名字和邮箱都以li开头的所有数据[2种方式]查询password是`123456`或者`email`以`itheima.com`结尾的所有数据查询id为[1,3,5,7,9]的用户列表查询name为liu的角色数据查询所有用户数据,并以邮箱排序每页3个,查询第2页的数据"""查询:filter_by精确查询
返回名字等于wang的所有人
User.query.filter_by(name='wang').all()
first()返回查询到的第一个对象User.query.first()all()返回查询到的所有对象
User.query.all()
filter模糊查询,返回名字结尾字符为g的所有数据。User.query.filter(User.name.endswith('g')).all()
get():参数为主键,如果主键不存在没有返回内容User.query.get()逻辑非,返回名字不等于wang的所有数据
User.query.filter(User.name!='wang').all()
not_ 相当于取反fromsqlalchemyimportnot_User.query.filter(not_(User.name=='chen')).all()
逻辑与,需要导入and,返回and()条件满足的所有数据fromsqlalchemyimportand_User.query.filter(and_(User.name!='wang',User.email.endswith('163.com'))).all()逻辑或,需要导入or_
fromsqlalchemyimportor_User.query.filter(or_(User.name!='wang',User.email.endswith('163.com'))).all()
查询数据后删除user=User.query.first()db.session.delete(user)db.session.commit()User.query.all()更新数据
user=User.query.first()user.name='dong'db.session.commit()User.query.first()
关联查询示例:角色和用户的关系是一对多的关系,一个角色可以有多个用户,一个用户只能属于一个角色。
查询角色的所有用户
#查询roles表id为1的角色ro1=Role.query.get(1)#查询该角色的所有用户ro1.us.all()
查询用户所属角色
#查询users表id为3的用户us1=User.query.get(3)#查询用户属于什么角色us1.role
################################################
数据库迁移在开发过程中,需要修改数据库模型,而且还要在修改之后更新数据库。最直接的方式就是删除旧表,但这样会丢失数据。
更好的解决办法是使用数据库迁移框架,它可以追踪数据库模式的变化,然后把变动应用到数据库中。
在Flask中可以使用Flask-Migrate扩展,来实现数据迁移。并且集成到Flask-Script中,所有操作通过命令就能完成。
为了导出数据库迁移命令,Flask-Migrate提供了一个MigrateCommand类,可以附加到flask-script的manager对象上。
首先要在虚拟环境中安装Flask-Migrate。
pipinstallflask-migrate
代码文件内容:
#coding=utf-8fromflaskimportFlaskfromflask_sqlalchemyimportSQLAlchemyfromflask_migrateimportMigrate,MigrateCommandfromflask_scriptimportShell,Managerapp=Flask(__name__)manager=Manager(app)app.config['SQLALCHEMY_DATABASE_URI']='mysql://root:mysql@127.0.0.1:3306/Flask_test'app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']=Trueapp.config['SQLALCHEMY_TRACK_MODIFICATIONS']=Truedb=SQLAlchemy(app)#第一个参数是Flask的实例,第二个参数是Sqlalchemy数据库实例migrate=Migrate(app,db)#manager是Flask-Script的实例,这条语句在flask-Script中添加一个db命令manager.add_command('db',MigrateCommand)#定义模型RoleclassRole(db.Model):#定义表名__tablename__='roles'#定义列对象id=db.Column(db.Integer,primary_key=True)name=db.Column(db.String(64),unique=True)user=db.relationship('User',backref='role')#repr()方法显示一个可读字符串,def__repr__(self):return'Role:'.format(self.name)#定义用户classUser(db.Model):__talbe__='users'id=db.Column(db.Integer,primary_key=True)username=db.Column(db.String(64),unique=True,index=True)#设置外键role_id=db.Column(db.Integer,db.ForeignKey('roles.id'))def__repr__(self):return'User:'.format(self.username)if__name__=='__main__':manager.run()创建迁移仓库
#这个命令会创建migrations文件夹,所有迁移文件都放在里面。pythondatabase.pydbinit
创建迁移脚本自动创建迁移脚本有两个函数
upgrade():函数把迁移中的改动应用到数据库中。
downgrade():函数则将改动删除。
自动创建的迁移脚本会根据模型定义和数据库当前状态的差异,生成upgrade()和downgrade()函数的内容。
对比不一定完全正确,有可能会遗漏一些细节,需要进行检查
pythondatabase.pydbmigrate-m'initialmigration'
更新数据库pythondatabase.pydbupgrade返回以前的版本
可以根据history命令找到版本号,然后传给downgrade命令:
pythonapp.pydbhistory输出格式:<base>->版本号(head),initialmigration
回滚到指定版本
pythonapp.pydbdowngrade版本号实际操作顺序:
1.python 文件 db init
2.python 文件 db migrate -m"版本名(注释)"
3.python 文件 db upgrade 然后观察表结构
4.根据需求修改模型
5.python 文件 db migrate -m"新版本名(注释)"
6.python 文件 db upgrade 然后观察表结构
7.若返回版本,则利用 python 文件 db history查看版本号
8.python 文件 db downgrade(upgrade) 版本号
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。