celery例子实现定时发送邮件
1、配置文件
vi setting.py
#coding:utf-8import osfrom datetime import timedeltaBROKER_URL = 'redis://127.0.0.1:6379/13'CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/14'CELERY_TIMEZONE = 'Asia/Shanghai'CELERY_ENABLE_UTC = TrueCELERY_TASK_SERIALIZER = 'json' # 任务序列化和反序列化使用msgpack方案#CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON#CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显CELERY_ACCEPT_CONTENT = ['json'] # 指定接受的内容类型CELERYBEAT_SCHEDULE = { 'send_mail': { 'task': 'celery_test.tasks.send_mail', 'schedule': timedelta(seconds=10), }}
2、函数模块
vi tasks.py
#coding:utf-8from server import appimport random,string,smtplib@app.taskdef send_mail(): SUBJECT="临时登录密码" HOST="smtp.163.com" # TO=mail TO='456@qq.com' FROM="123@163.com" text=str(random.randint(1000,9999)) BODY=string.join(( "From:%s"%FROM, "To:%s"%TO, "Subject:%s"%SUBJECT, "",text),"\r\n") server=smtplib.SMTP(HOST) server.login("123@qq.com","第三方发送验证码") server.sendmail(FROM,[TO],BODY) server.quit() return True
3、启动模块
vi server.py
from celery import Celeryapp=Celery('celery_test',include=['celery_test.tasks'])app.config_from_object('celery_test.setting')if __name__=='__main__': app.start()
4、启动worker,执行任务
celery -A celery_test.server worker -l info
5、启动beat,生成任务
celery -A celery_test.server beat
6、目录结构
celery_test/{server.py,setting.py,tasks.py}
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。