Django测试Celery

如果我们想测试celery任务是否执行正确, 我们可以把任务函数当作普通函数来测试, 但是还有一种情况是测试接口, 接口调用celery任务, 这样应该怎么测试呢?

如果我们启动临时的broker和worker, 天啦, 这样太麻烦了, 怎么可以不启动服务, 还能测试celery任务是否正确呢?

在Django里可以这样做: 在测试函数里加上装饰器@override_settings(task_always_eager=True) 这样运行测试的时候就会直接执行任务, 而不需要启动celery服务

from django.contrib.auth.models import User
from django.test import TestCase, override_settings
from rest_framework.test import APIClient


class ReportTest(TestCase):

    def setUp(self):
        self.client = APIClient()
        user = User.objects.first()
        self.client.force_authenticate(user=user)

    @override_settings(task_always_eager=True)
    def test_stock_daily_summary_report(self):
        post_data = {
            "company_id": 5,
            "report_name": "stock_daily_summary_detail",
            "query_params": {"date_from": "2019-08-27", "date_to": "2019-08-27"}
        }
        response = self.client.post('/api/report/tasks/', post_data, format='json')
        self.assertContains(response, '备注')