Periodic Tasks | Cronjob 🔁

Every business needs some periodic analysis, cleanup, and processing of data, often at night. We could handle such common scenarios with shell-script-based cronjobs. However, there is a much easier tool in the celery toolbox called beat. That's not all: we can also use beat for serious tasks like remembering to send a birthday wish to our boyfriend/girlfriend 😁.

Celery Beat is a scheduler: a dedicated process that runs in the background and, whenever a task is due, sends it to the task queue, where an ordinary worker picks it up and executes it. The tasks themselves are defined in Celery as functions or methods and can be scheduled to run at specific times or intervals using a simple syntax. Let's define a celery beat schedule in our config.py:

import os
from dotenv import load_dotenv

load_dotenv()


class Config:
    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")
    CELERY_TASK_ROUTES: dict = {
        'tasks.*': {
            'queue': 'high_priority',
        },
        'low_priority_tasks.*': {
            'queue': 'low_priority',
        },
    }


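    # new: schedule read by celery beat
    CELERY_BEAT_SCHEDULE: dict = {
        "send_credit_report": {
            "task": "credit_report",  # name given in @shared_task(name=...)
            "schedule": 10,  # run every 10 seconds
            "options": {"queue": "periodic"},  # route to the periodic queue
        },
    }
    CELERY_TIMEZONE: str = "US/Mountain"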


settings = Config()


We have told celery beat to send the task registered as credit_report, that is, the function decorated with @shared_task(name="credit_report"), every 10 seconds. Workers consuming from a queue named periodic will process this task. That's not all: the schedule value can be a plain number of seconds, a timedelta, or a schedule, crontab, or solar object from celery.schedules. Plain intervals and crontab are the most used formats.
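
As a rough illustration of these formats, the same task could be scheduled in three ways. This is only a sketch: the entry names send_credit_report_half_hourly and send_credit_report_nightly, and the 30-minute and 02:30 timings, are made up for the example and are not part of the tutorial's config.py.

```python
from datetime import timedelta

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # Plain number: run every 10 seconds (the form used in config.py).
    "send_credit_report": {
        "task": "credit_report",
        "schedule": 10,
        "options": {"queue": "periodic"},
    },
    # timedelta: run every 30 minutes (hypothetical entry).
    "send_credit_report_half_hourly": {
        "task": "credit_report",
        "schedule": timedelta(minutes=30),
        "options": {"queue": "periodic"},
    },
    # crontab: run once a day at 02:30 (hypothetical entry).
    "send_credit_report_nightly": {
        "task": "credit_report",
        "schedule": crontab(hour=2, minute=30),
        "options": {"queue": "periodic"},
    },
}
```

Each entry needs its own key in the dictionary. Reusing a key would silently overwrite the earlier entry.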

If you want your crontab schedules to respect your timezone, set the CELERY_TIMEZONE key to a timezone name such as "US/Mountain"; without it, Celery uses UTC. Let us now define the function in tasks.py that celery beat will trigger every 10 seconds.

import time
from celery import shared_task


@shared_task(bind=True)
def send_notification(self, device_token: str):
    return True


# The name must match the "task" value in CELERY_BEAT_SCHEDULE.
@shared_task(name="credit_report")
def send_credit_analysis():
    print("Generated and sent credit analysis to all active users")

Now, all we need is a beat process, a worker on a queue named periodic, and the flower dashboard. Run each command in its own terminal:

celery -A main.celery beat -s /home/nofoobar/Documents/beatlog/
celery -A main.celery flower --port=5555
celery -A main.celery worker -Q periodic --loglevel=info -n periodic

If you look at the terminal running the periodic queue worker, you should see the credit_report task being processed every 10 seconds 🎉.
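
If it is unclear why both a beat process and a separate worker are needed, the toy model below may help. It is only a standard-library sketch of the division of labour, not how Celery is implemented: Entry, ToyBeat, and toy_worker are hypothetical names, and unlike this toy, real beat does not necessarily fire at second zero.

```python
import queue
from dataclasses import dataclass


@dataclass
class Entry:
    """One schedule entry: which task to send, how often, and to which queue."""
    task: str
    every: float          # interval in seconds
    queue_name: str
    next_run: float = 0.0  # toy simplification: first run is due immediately


class ToyBeat:
    """Toy scheduler: decides when a task is due and enqueues its name."""

    def __init__(self, entries, queues):
        self.entries = entries
        self.queues = queues

    def tick(self, now):
        """Enqueue every entry that is due at time `now` (in seconds)."""
        for entry in self.entries:
            if now >= entry.next_run:
                self.queues[entry.queue_name].put(entry.task)
                entry.next_run = now + entry.every


def toy_worker(q, registry):
    """Toy worker: drains one queue and runs the matching functions."""
    results = []
    while not q.empty():
        task_name = q.get()
        results.append(registry[task_name]())
    return results


def send_credit_analysis():
    return "Generated and sent credit analysis to all active users"


queues = {"periodic": queue.Queue()}
beat = ToyBeat([Entry("credit_report", every=10, queue_name="periodic")], queues)
registry = {"credit_report": send_credit_analysis}

# Simulate 25 seconds of time, one tick per second.
for now in range(25):
    beat.tick(now)

results = toy_worker(queues["periodic"], registry)
print(len(results))  # 3: the task was due at t=0, t=10 and t=20
```

The scheduler never runs the function itself. It only puts the task name on the periodic queue, which is why nothing is processed if no worker is listening on that queue.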


© Copyright 2022-23 Team FastAPITutorial