Routing tasks to specific queues

Some tasks are VIP tasks and deserve a queue of their own! A dedicated queue also ensures that they do not compete with slow or low-priority tasks.

In the previous post we got the gist of how to achieve this by passing a queue argument to the task decorator. In this post, we are going to configure manual routing so that all our task routing information lives in one place. That makes it much easier to keep track of tasks and the load on each queue, and to modify the routing when needed.

Let's start by defining a TASK_ROUTES setting in our config. We prefix it with the CELERY_ namespace so that Celery can extract all the settings it needs from the same object. This is our config.py with CELERY_TASK_ROUTES. The keys are glob patterns matched against task names, so this configuration makes sure that all tasks in the file named tasks.py are routed to the high_priority queue, and the tasks in low_priority_tasks.py to the low_priority queue.
 

import os
from dotenv import load_dotenv

load_dotenv()


class Config:
    # Broker and result backend fall back to a local Redis instance.
    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")
    # Glob patterns on the task name decide which queue a task lands in.
    CELERY_TASK_ROUTES = {
        'tasks.*': {
            'queue': 'high_priority',
        },
        'low_priority_tasks.*': {
            'queue': 'low_priority',
        },
    }


settings = Config()
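
To make the pattern matching concrete, here is a minimal standard-library sketch of how a glob-style route table can resolve a task name to a queue. It is an illustration only, not Celery's own router: resolve_queue is a hypothetical helper, and the fallback name "celery" assumes Celery's default queue name is left unchanged.

```python
from fnmatch import fnmatchcase

# Same routing table as CELERY_TASK_ROUTES in config.py.
TASK_ROUTES = {
    "tasks.*": {"queue": "high_priority"},
    "low_priority_tasks.*": {"queue": "low_priority"},
}


def resolve_queue(task_name: str, routes: dict = TASK_ROUTES, default: str = "celery") -> str:
    """Return the queue of the first glob pattern that matches task_name.

    Hypothetical helper for illustration; Celery does this internally.
    """
    for pattern, options in routes.items():
        if fnmatchcase(task_name, pattern):
            return options["queue"]
    # No pattern matched: fall back to the default queue (assumed name).
    return default


print(resolve_queue("tasks.send_notification"))                         # high_priority
print(resolve_queue("low_priority_tasks.generate_transaction_report"))  # low_priority
print(resolve_queue("reports.cleanup"))                                 # celery
```

Note that the pattern has to match the whole task name, so low_priority_tasks.generate_transaction_report is not caught by 'tasks.*' even though the module name ends in "tasks".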


Let's modify our main.py file so that Celery takes its configuration from the settings instance. Below is our updated main.py file. Notice the call to celery.config_from_object(settings, namespace="CELERY").

from fastapi import FastAPI
from celery import Celery

from config import settings
from tasks import send_notification
from low_priority_tasks import generate_transaction_report


app = FastAPI()

celery = Celery()
celery.config_from_object(settings, namespace="CELERY")


@app.get("/push/{device_token}")
async def notify(device_token: str):
    send_notification.delay(device_token)
    return {"message": "Notification sent"}


@app.get("/export/report")
async def export_report():
    generate_transaction_report.delay()
    return {"message": "Generating report"}
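
For intuition about what namespace="CELERY" does, the sketch below mimics the idea with plain Python: it collects every attribute that starts with CELERY_ and maps it to the lowercase setting name Celery uses (for example CELERY_TASK_ROUTES becomes task_routes). The function extract_namespaced and the stand-in DemoConfig class are hypothetical and only illustrate the naming convention; they are not Celery's implementation.

```python
class DemoConfig:
    # Stand-in for the Config class in config.py (values hard-coded here).
    CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
    CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"
    CELERY_TASK_ROUTES = {
        "tasks.*": {"queue": "high_priority"},
        "low_priority_tasks.*": {"queue": "low_priority"},
    }
    DEBUG = True  # no CELERY_ prefix, so it is ignored below


def extract_namespaced(obj, namespace: str = "CELERY") -> dict:
    """Collect NAMESPACE_* attributes and strip the prefix (illustration only)."""
    prefix = namespace + "_"
    return {
        name[len(prefix):].lower(): getattr(obj, name)
        for name in dir(obj)
        if name.startswith(prefix)
    }


celery_settings = extract_namespaced(DemoConfig())
print(sorted(celery_settings))  # ['broker_url', 'result_backend', 'task_routes']
```

Keeping the prefix means the same config object can hold settings that are unrelated to Celery without them leaking into the Celery app.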

We no longer define the queue argument in tasks.py below. This time we are going to rely on CELERY_TASK_ROUTES for the routing.

import time
from celery import shared_task


@shared_task(bind=True)
def send_notification(self, device_token: str):
    try:
        time.sleep(1)  # stand-in for the real notification call
    except Exception as e:
        # Retry after 10 seconds, giving up after 3 retries.
        raise self.retry(exc=e, countdown=10, max_retries=3)

Finally, we need to put our report generation task, generate_transaction_report, in a file named low_priority_tasks.py.

import time
from celery import shared_task


@shared_task
def generate_transaction_report(user_id=1):
    time.sleep(5)
    return True

That's it. Now all we need is to start uvicorn, the high_priority worker, the low_priority worker, and Flower for monitoring, each in its own terminal:
uvicorn main:app --reload
watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A main.celery worker -Q high_priority --loglevel=info -n higher_priority
celery -A main.celery flower --port=5555
watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A main.celery worker -Q low_priority --loglevel=info -n lower_priority_queue

 

If you hit the route for sending a notification, the notification task should be routed to the high_priority queue, because this task resides in the file named tasks.py. If you invoke report generation, that task is routed to the low_priority queue.
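
If you would rather trigger both routes from a script than from the browser, a small standard-library client along these lines should do. It is a sketch under assumptions: BASE_URL assumes uvicorn is running on its default host and port, and endpoint_urls and hit_endpoints are hypothetical helper names, as is the "demo-token" value.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

BASE_URL = "http://127.0.0.1:8000"  # assumption: uvicorn's default host and port


def endpoint_urls(device_token: str, base_url: str = BASE_URL) -> list:
    """Build the URLs of the two routes defined in main.py."""
    return [
        f"{base_url}/push/{quote(device_token, safe='')}",
        f"{base_url}/export/report",
    ]


def hit_endpoints(device_token: str = "demo-token") -> list:
    """Call both routes and return their decoded JSON responses."""
    responses = []
    for url in endpoint_urls(device_token):
        with urlopen(url, timeout=5) as response:
            responses.append(json.load(response))
    return responses
```

With uvicorn and both workers running, calling hit_endpoints() enqueues one task of each kind, and you can then check in Flower which worker picked up which task.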

© Copyright 2022-23 Team FastAPITutorial