Celery + RabbitMQ signal实现同类型任务同步顺序执行

task.py 代码如下:

"""
celery appp
"""
import time

from celery import Celery
from celery.signals import before_task_publish

app = Celery('tasks', broker='pyamqp://guest@localhost//')


@before_task_publish.connect
def handler(sender=None, headers=None, body=None, **kwargs):
    task_id = headers["id"]
    x = body[0][0]
    websocket_key = headers.get("websocket_key")
    while True:
        ai = app.control.inspect()
        active_tasks = ai.active().get("celery@yangle-legion")
        idle = True
        for t in active_tasks:
            if t["args"][0] == x and t["id"] != task_id:
                idle = False
        if idle:
            return
        time.sleep(2)


@app.task
def add(x, y):
    """add async task
    """
    print(f"{x} {y} task start")
    for i in range(10):
        time.sleep(1)
        print(f"{x} {y} {i}")
    print(f"result is {x + y}")
    return x + y

启动 celery worker:

celery -A tasks worker --loglevel=INFO --concurrency=10 --logfile=celery.log -D

启动 task

>>> from tasks import add
>>> add.delay(1, 2)
>>> add.delay(1, 3)

也可以用apply_async传递额外的参数

add.apply_async((2, 2), headers={"websocket": "bbb"})
add.delay(2, 3)

这两个函数的第一个参数相同, 按照上面的逻辑, 第一个 task 执行完之后, 第二个 task 才会执行
可以通过日志看到效果:

tail -f celery.log