Some tasks can be VIP tasks, they demand a new queue in itself! This is also done so that they are not competing with slow tasks or low-priority tasks.

In the previous post only we got a gist of how can we achieve it by defining a queue argument in the decorator. In this post, we are going to configure manual routing so that all our task routing information can be in one place. It is much easier to keep a note of tasks, the load on a queue and to easily modify it if needed.

Let's start by defining a TASK_ROUTE property in our config. We would like it to be prepended by CELERY_ namespace so that Celery can extract all the necessary setting variables. This is our with CELERY_TASK_ROUTES . This configuration makes sure that all tasks in the file named are routed to high_priority queue. The tasks in to low_priority queue.

import os
from dotenv import load_dotenv


class Config:
	CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL","redis://")
	CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND","redis://")
    'tasks.*': {
        'queue': 'high_priority',
    'low_priority_tasks.*': {
        'queue': 'low_priority',

settings = Config()

Let's modify our file. We can tell Celery to take the configuration information from the settings instance. Below is our updated file. Notice the celery.config_from_object(settings, namespace="CELERY")

from fastapi import FastAPI
from celery import Celery

from config import settings
from tasks import send_notification
from low_priority_tasks import generate_transaction_report

app = FastAPI()

celery = Celery()
celery.config_from_object(settings, namespace="CELERY")

async def notify(device_token: str):
    return {"message": "Notification sent"}

async def notify():
    return {"message": "Generating report"}

Let's not define the queue argument in below This time we are going to rely on CELERY_TASK_ROUTES for the routing.

import time
from celery import shared_task

def send_notification(self,device_token: str):
    except Exception as e:
        raise self.retry(exc=e, countdown=10, max_retries=3)

Finally, we need to put our report_generation task in a file named

import time
from celery import shared_task

def generate_transaction_report(user_id=1):
    return True

That's it, now all we need is to start uvicorn, high_priority worker, low_priority worker, and flower to monitor.
uvicorn main:app --reload
watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A main.celery worker -Q high_priority --loglevel=info -n higher_priority
celery -A main.celery flower --port=5555
watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A main.celery worker -Q low_priority --loglevel=info -n lower_priority_queue


If you hit the route for sending a notification the notification task should be routed to the high priority queue, because this task resides in the files named If we invoke report generation this task would be routed to low_priority_queue.


