最近在使用 Flask 时,需要定期从远端服务器拉取 IP 数据,以提供相关 API 使用。所以使用了 apscheduler 这个定时任务框架来执行数据库读取以及本地数据更新等功能。

这里把踩过的一些坑整理一下。

1 如何设置定时任务

安装 APScheduler(Advanced Python Scheduler):

sudo pip3 install apscheduler

一个简单的定时任务如下:

import sys
import time
import atexit
from apscheduler.schedulers.background import BackgroundScheduler


def task():
    sys.stdout.write('\nrun task\n')
    sys.stdout.flush()


def main():
    scheduler = BackgroundScheduler()
    scheduler.add_job(func=task, trigger="interval", seconds=10)
    scheduler.start()
    atexit.register(lambda: scheduler.shutdown())

    for i in range(60):
        sys.stdout.write('.')
        sys.stdout.flush()
        time.sleep(1)


if __name__ == '__main__':
    main()

输出内容如下:

..........
run task
..........
run task
..........
run task
..........
run task
..........
run task
..........
run task

这个简单的测试代码,说明了以下问题:

定时任务不是 start 时候立即执行,而是到期后执行第一次,所以对于读取数据库的任务,应该先调用触发一次,否则会有一段时间没有任何数据。

2 使用 APScheduler 遇到的问题

在 uwsgi + Flask 中使用 APScheduler 时,出现了以下问题:

当 uwsgi 设置 processes 个数不为 0 时,APScheduler 定期读取数据库更新的数据,只在父进程得到了更新,而在 worker 子进程中的数据没有更新。所以当用户请求到 worker 进程时,会获取最新的数据失败。

为了解决这个问题,尝试使用了几种方式:

  1. 为每个 process 设置一个 APScheduler;
  2. 将父进程的更新同步到子进程;
  3. 使用 Flask-APScheduler;

以上两个方式,最终均以失败告终,原因如下:

  1. APScheduler 官方文档中,有是否能够在多个 worker 间共享一个定期任务,答案是否定的,并给出了相关的推荐做法;
  2. 多个进程间共享数据,有读写锁的问题,解决更加复杂;
  3. Flask-APScheduler 仅仅是将 APScheduler 与 Flask 结合在一起,没有改变实际的实现方式,但是提供了更适合 Flask 的实践方式;

3 如何解决

解决这个问题,有两种方式:

  1. APScheduler 官方建议的,通过单独的一个服务来提供数据访问;
  2. 使用 redis 等数据库来达到数据同步;

这里使用第二种方式,第 1 种方式需要自己来单独写服务,并且要保证服务的可用性,其实实现难度增大。并且使用 Flask-APScheduler 来提供统一的 APScheduler。

安装 Flask-APScheduler:

sudo pip3 install Flask-APScheduler

实现逻辑如下:

import redis
import pymysql
import flask
import logging
from flask_apscheduler import APScheduler

app = flask.Flask(__name__)
app.logger.setLevel(logging.INFO)
app.config.from_mapping(
    JOBS=[{
        'id': 'reload_server',
        'func': 'app:reload_server',
        'args': (app.logger,),
        'trigger': 'interval',
        'seconds': 60
    }],
    SCHEDULER_API_ENABLED=True,
)

reload_scheduler = APScheduler()
reload_scheduler.init_app(app)
reload_scheduler.start()

redis_pool = redis.ConnectionPool(decode_responses=True)
redis_client = redis.Redis(connection_pool=redis_pool)


def reload_server(logger):
    logger.info('reload servers scheduler start')
    conn = pymysql.connect(host='127.0.0.1', port=3306,
                           user='root', passwd='root',
                           db='servers')
    exec_sql = 'SELECT `ip` FROM `servers`'
    cur = conn.cursor()
    cur.execute(exec_sql)
    for ip in cur:
        redis_client.set(ip, '', ex=60)
    conn.close()
    logger.info('reload servers scheduler stop')


if __name__ == '__main__':
    app.run()

在子进程中,则可以直接通过 redis 来读取数据。

4 解决 redis 写效率

测试过程中,发现 redis 从远端 mysql 获取到数据写到本地,速度是有点慢的,几千条数据需要花费几秒才能结束,所以针对写操作部分,进行了一定的优化。

使用 pipeline 的方式批量写入 redis:

def reload_server(logger):
    logger.info('reload servers scheduler start')
    conn = pymysql.connect(host='127.0.0.1', port=3306,
                           user='root', passwd='root',
                           db='servers')
    exec_sql = 'SELECT `ip` FROM `servers`'
    cur = conn.cursor()
    cur.execute(exec_sql)
    with redis_client.pipeline(transaction=False) as pipe:
        for ip in cur:
            pipe.set(ip, '', ex=60)
        pipe.execute()
    conn.close()
    logger.info('reload servers scheduler stop')