Celery + RabbitMQ

参考资料

用 celery 和 rabbitmq 做异步任务调度
celery 官方文档
任务队列在 Web 服务里的应用

启动 broker

macOS10.13

$ brew install rabbitmq
==> Summary
🍺  /usr/local/Cellar/rabbitmq/3.7.4: 232 files, 12.6MB, built in 2 seconds

可以看到安装的路径是/usr/local/Cellar/rabbitmq/3.7.4,需要将此路径加入到环境变量里,这样才能直接输入 rabbitmq-server 启动,而不用输入全部路径,将下面的内容添加到.zshrc里(注:不用的 shell 不同的文件,这里以 zsh 为例):

PATH=$PATH:/usr/local/Cellar/rabbitmq/3.7.4/sbin

启动

# 守护进程启动服务
$ sudo rabbitmq-server -detached

Ubuntu

sudo apt-get install rabbitmq-server

安装之后就自动启动了
可以用这个命令来看状态

systemctl status rabbitmq-server

也可以停止和重启

创建用户密码和 virtual host:

https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/rabbitmq.html

我们在服务器上运行了 rabbitmq, 可能要供多个应用使用, 那么我们可以给不同应用创建不同的用户和 virtual host:

sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

之后在应用里的配置需要用到, 上面的文档里也有讲到
virtual host 的理解: https://www.cnblogs.com/jxxblogs/p/12260029.html

docker

https://hub.docker.com/_/rabbitmq

docker run -d --restart always --hostname systest-rabbit --name systest-rabbit -p 5672:5672 -e RABBITMQ_DEFAULT_USER=simu -e RABBITMQ_DEFAULT_PASS=simu123 -e RABBITMQ_DEFAULT_VHOST=simu rabbitmq:latest

this command have config user, password and vhost

安装 Celery

$ pip install celery

Celery 应用

创建一个 celery 应用 application, 这个 app 是入口, 可以创建任务, 管理 worker 等等 我在 Django 项目下, 在 app 目录下创建一个 tasks.py 文件, 代码示例如下:

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

在 Flask application 里, 可以这么写:

from celery import Celery
from celery.schedules import crontab

from . import create_app


def make_celery(flask_app):
    """celery factory
    """
    celery = Celery(flask_app.import_name)
    celery.conf.update(flask_app.config["CELERY_CONFIG"])

    class ContextTask(celery.Task):
        """celery task context
        """

        def __call__(self, *args, **kwargs):
            with flask_app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


app = make_celery(create_app())

app.conf.beat_schedule = {
    'cal_everyday': {
        'task': 'surge.tasks.my_task',
        'schedule': crontab(hour=1),
    },
}
app.conf.timezone = 'Asia/Shanghai'


@app.task
def cal_task():
    add.delay(1, 2)


@app.task
def add(x, y):
  print(x + y)

start Celery worker

celery -A tasks worker --loglevel=info -f /tmp/surge/logs/celery-worker.log

-A指的是 APP,就是当前的 tasks.py,如果在某个路径下, 则输入完整路径, 例如mmflowproject.tasks, worker 后面可以加-c 2 制定 worker 数量 如果要守护进程运行, 需要用到 supervisord, 暂且不表
-f is log file path

在 Django 里面可以加上配置 DJANGO_SETTINGS_MODULE='fsp.settings_env' celery -A fsp -l info worker

start celery beat

celery -A surge.tasks beat

caution: celery beat can’t work alone, it must work with celery worker, because task is call in the worker, not in beat, beat is only schedule, so we need to start celery worker first. or, if your only have beat task, and only need one worker, you can use this command:

celery -A surge.tasks worker -B

触发任务

进入 python shell:

>>> from tasks import add
>>> add.delay(1, 2)
<AsyncResult: 8d1b69e5-dd51-4194-9d9e-4e5f68a94d81>

我们在 celery worker 下面也会看到日志:

...
[2019-06-27 11:37:14,038: INFO/MainProcess] Received task: mmflowproject.tasks.add[d4ab8105-d267-4deb-9957-505d8810cddc]
[2019-06-27 11:37:14,047: INFO/ForkPoolWorker-2] Task mmflowproject.tasks.add[d4ab8105-d267-4deb-9957-505d8810cddc] succeeded in 0.0011335039999949004s: 3

这样就是一个基本的 Celery+RabbitMQ 的一个基本的过程

celery command

  • get active tasks
celery -A surge.tasks inspect active
  • get reserved tasks
celery -A surge.tasks inspect reserved

测试

测试采用 mock 的方式, task 函数单独测试, 参照官方文档https://docs.celeryproject.org/en/latest/userguide/testing.html

其他

自定义 task id

add.apply_async(args, kwargs, task_id=i)
add.apply_async((1, 4), task_id=i)

根据 id 获取 task

from celery.result import AsyncResult
from cel.tasks import app

task = app.AsyncResult('432890aa-4f02-437d-aaca-1999b70efe8d')
task.state

更新 state

https://docs.celeryq.dev/en/latest/userguide/tasks.html#custom-states

根据 id 获取的 task, 如果这个 task 不存在, task 的 state 是 PENDING, 如果 task 还在执行中, state 也是 PENDING, 如何区分呢?
我们可以更新 task 的 state:

@celery.task(bind=True)
def push_version(self, token, task_id, version_name, car_id):
    """往车上推送版本
    """
    self.update_state(state="PROGRESS", meta={"current": 1, "total": 100})
    return

meta 信息可以通过 task.result 获取

前端获取进度

https://buildwithdjango.com/blog/post/celery-progress-bars/

kill subprocess in celery task

celery task contains subprocess process, if we revoke celery task, the subprocess task will not be killed autonomously, if we want kill them, we can follow this code:

import os
import subprocess

from app import celery_app

def kill_subprocesses_decorator(original_function):
    """kill subprocesses decorator
    """

    def wrapper_function(*args, **kwargs):
        subprocesses = []
        try:
            original_function(subprocesses, *args, **kwargs)
        except Exception:
            print("exception * 100")
            print(subprocesses)
            for pid in subprocesses:
                os.killpg(pid, signal.SIGTERM)
        return

    return wrapper_function

@celery_app.task
@kill_subprocesses_decorator
def my_task(subprocesses, my_arg):
    ...
    process = subprocess.Popen(
        command, shell=True, stdout=subprocess.PIPE,
        stderr=subprocess.PIPE, universal_newlines=True,
        preexec_fn=os.setsid)
    subprocesses.append(os.getpgid(process.pid))
    ...

if we revoke:

celery_app.control.revoke(celery_task_id, terminate=True)

subprocess will be killed

kill all tasks

from celery.result import AsyncResult
from app import celery_app

task_ids = []
active_tasks = celery_app.control.inspect().active().values()
pending_tasks = celery_app.control.inspect().reserved().values()
for task in active_tasks:
    for t in task:
        task_ids.append(t["id"])
for task in pending_tasks:
    for t in task:
        task_ids.append(t["id"])

for task_id in task_ids:
    print(task_id)
    result = AsyncResult(task_id, app=celery_app)
    result.revoke(terminate=True)