最近在使用 Flask 时,需要定期从远端服务器拉取 IP 数据,以提供相关 API 使用。所以使用了 apscheduler 这个定时任务框架来执行数据库读取以及本地数据更新等功能。
这里把踩过的一些坑整理一下。
1 如何设置定时任务
安装 APScheduler(Advanced Python Scheduler):
sudo pip3 install apscheduler
一个简单的定时任务如下:
import sys
import time
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
def task():
sys.stdout.write('\nrun task\n')
sys.stdout.flush()
def main():
scheduler = BackgroundScheduler()
scheduler.add_job(func=task, trigger="interval", seconds=10)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())
for i in range(60):
sys.stdout.write('.')
sys.stdout.flush()
time.sleep(1)
if __name__ == '__main__':
main()
输出内容如下:
..........
run task
..........
run task
..........
run task
..........
run task
..........
run task
..........
run task
这个简单的测试代码,说明了以下问题:
定时任务不是 start 时候立即执行,而是到期后执行第一次,所以对于读取数据库的任务,应该先调用触发一次,否则会有一段时间没有任何数据。
2 使用 APScheduler 遇到的问题
在 uwsgi + Flask 中使用 APScheduler 时,出现了以下问题:
当 uwsgi 设置 processes 个数不为 0 时,APScheduler 定期读取数据库更新的数据,只在父进程得到了更新,而在 worker 子进程中的数据没有更新。所以当用户请求到 worker 进程时,会获取最新的数据失败。
为了解决这个问题,尝试使用了几种方式:
- 为每个 process 设置一个 APScheduler;
- 将父进程的更新同步到子进程;
- 使用 Flask-APScheduler;
以上两个方式,最终均以失败告终,原因如下:
- APScheduler 官方文档中,有是否能够在多个 worker 间共享一个定期任务,答案是否定的,并给出了相关的推荐做法;
- 多个进程间共享数据,有读写锁的问题,解决更加复杂;
- Flask-APScheduler 仅仅是将 APScheduler 与 Flask 结合在一起,没有改变实际的实现方式,但是提供了更适合 Flask 的实践方式;
3 如何解决
解决这个问题,有两种方式:
- APScheduler 官方建议的,通过单独的一个服务来提供数据访问;
- 使用 redis 等数据库来达到数据同步;
这里使用第二种方式,第 1 种方式需要自己来单独写服务,并且要保证服务的可用性,其实实现难度增大。并且使用 Flask-APScheduler 来提供统一的 APScheduler。
安装 Flask-APScheduler:
sudo pip3 install Flask-APScheduler
实现逻辑如下:
import redis
import pymysql
import flask
import logging
from flask_apscheduler import APScheduler
app = flask.Flask(__name__)
app.logger.setLevel(logging.INFO)
app.config.from_mapping(
JOBS=[{
'id': 'reload_server',
'func': 'app:reload_server',
'args': (app.logger,),
'trigger': 'interval',
'seconds': 60
}],
SCHEDULER_API_ENABLED=True,
)
reload_scheduler = APScheduler()
reload_scheduler.init_app(app)
reload_scheduler.start()
redis_pool = redis.ConnectionPool(decode_responses=True)
redis_client = redis.Redis(connection_pool=redis_pool)
def reload_server(logger):
logger.info('reload servers scheduler start')
conn = pymysql.connect(host='127.0.0.1', port=3306,
user='root', passwd='root',
db='servers')
exec_sql = 'SELECT `ip` FROM `servers`'
cur = conn.cursor()
cur.execute(exec_sql)
for ip in cur:
redis_client.set(ip, '', ex=60)
conn.close()
logger.info('reload servers scheduler stop')
if __name__ == '__main__':
app.run()
在子进程中,则可以直接通过 redis 来读取数据。
4 解决 redis 写效率
测试过程中,发现 redis 从远端 mysql 获取到数据写到本地,速度是有点慢的,几千条数据需要花费几秒才能结束,所以针对写操作部分,进行了一定的优化。
使用 pipeline 的方式批量写入 redis:
def reload_server(logger):
logger.info('reload servers scheduler start')
conn = pymysql.connect(host='127.0.0.1', port=3306,
user='root', passwd='root',
db='servers')
exec_sql = 'SELECT `ip` FROM `servers`'
cur = conn.cursor()
cur.execute(exec_sql)
with redis_client.pipeline(transaction=False) as pipe:
for ip in cur:
pipe.set(ip, '', ex=60)
pipe.execute()
conn.close()
logger.info('reload servers scheduler stop')