Python 官方文档:入门教程 => 点击学习
目录基本使用trigger启动方式cron启动方式使用装饰器定时启动任务flask-apscheduler将apscheduler移植到了flask应用中,使得在flask中可以非常
flask-apscheduler
将apscheduler
移植到了flask
应用中,使得在flask
中可以非常方便的使用定时任务了,除此之外,它还有如下几个特性
下载安装
pip install flask-apscheduler
flask-apscheduler
的相关配置,我们会将它和其它扩展一起,放在应用的配置里
class Config(object):
// 配置项
JOBS = [
{
'id': 'job1',
'func': 'run:add',
'args': (1, 2),
'trigger': 'interval',
'seconds': 3
}
]
SCHEDULER_API_ENABLED = True
def add(a, b):
print(a+b)
JOBS列表的每一个元素表示一个定时任务,列子中只有一个interval任务,表示每隔3秒运行一次函数add。func指定调用的函数,args表示传入函数的参数,trigger表示启动方式,常用的有两种,分别是trigger和cron。
上边我们设置了SCHEDULER_API_ENABLED = True
,可以通过访问Http://127.0.0.1:5000/scheduler,其中scheduler
是默认的RESTful API
前缀
通过查看源码,可以发现flask-apscheduler提供了以下的接口
def _load_api(self):
"""
Add the routes for the scheduler API.
"""
self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
如果需要查看当前运行的所有定时任务,则请求http://127.0.0.1:5000/scheduler/jobs即可。
trigger表示间隔启动,在trigger方式中,使用seconds配置间隔多久启动一次,单位是秒。
cron表示定时启动
class Config(object):
JOBS = [
{
'id': 'job1',
'func': 'scheduler:task',
'args': (1, 2),
'trigger': 'cron',
'day': '*',
'hour': '13',
'minute': '16',
'second': '20'
}
]
SCHEDULER_API_ENABLED = True
def task(a, b):
print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.fORMat(a, b, a + b))
该配置项则表示每天的13点16分20秒启动一次。*表示全部。
有关常用的cron配置有:
day
hour
minute
second
week
day_of_week
from flask import Flask
from flask_apscheduler import APScheduler
import datetime
class Config(object):
SCHEDULER_API_ENABLED = True
scheduler = APScheduler()
# interval examples
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():
print(str(datetime.datetime.now()) + ' Job 1 executed')
表示每隔30秒调用一次job1函数。
到此这篇关于python flask框架定时任务apscheduler应用介绍的文章就介绍到这了,更多相关Python flask apscheduler内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: Pythonflask框架定时任务apscheduler应用介绍
本文链接: https://www.lsjlt.com/news/120716.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0