Debugging with AsyncResult

Let us consider a scenario. We provide a utility that lets our clients export, as a CSV, all of the web requests they made to our server over the last 3 months.

The client starts the export at 9 A.M. By 9:30 A.M. they still have not received the CSV file in their mail, so they reach out to us saying:
 

We exported the CSV at 9 and still have not received it. We need the results by EOD!
- A Frustrated Client 😤

To this, we ask whether they have the task id that we return when they request a data export. They provide us with the task id (very rare!). We try looking up the task id in the logs. However, because of log rotation, the logs are gone!
This is the moment to use another tool in our armory. Introducing ... AsyncResult 🗡️.

When we start a background task, Celery returns an instance of AsyncResult. This object exposes utility properties such as status and result that let us keep track of the task. Note that looking up a task's state later by its id only works when a result backend is configured for Celery. Let's see a working example by modifying our Celery task so that it raises an error.

@celery.task
def send_notification(device_token: str):
    logger.info("starting background task")
    time.sleep(10)  # simulates slow network call
    a = 11 / 0  # deliberately raises ZeroDivisionError
    logger.info(f"notification sent {device_token}")

We can trigger this task by sending a request from the interactive docs page or by making an API call to the corresponding FastAPI route. When you make the request, notice the task_id being printed in the terminal or log file. We can use this task id to inspect the state of our background task. We can also try it out in the terminal.

(env) fastapi@fastapitutorial:~/$ python
Python 3.9.5 


>>> from main import celery
>>> from tasks import send_notification
>>> send_notification.delay("123")
<AsyncResult: d2b6118f-c6ac-4f8a-80ce-22a06d9cec5e>
>>> 
>>> 
>>> task = celery.AsyncResult("d2b6118f-c6ac-4f8a-80ce-22a06d9cec5e")
>>> task.status
'FAILURE'
>>> task.result
ZeroDivisionError('division by zero')
>>> 

AsyncResult offers many more utility attributes and methods, e.g.

>>> dir(task)
['forget', 'get', 'get_leaf', 'graph', 'id', 'ignored', 'info', 'iterdeps', 'kwargs', 'maybe_reraise', 'maybe_throw', 'name', 'on_ready', 'parent', 'queue', 'ready', 'result', 'retries', 'revoke', 'state', 'status', 'successful', 'supports_native_join', 'task_id', 'then', 'throw', 'traceback', 'wait', 'worker']
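
A few of these attributes are usually enough when debugging a single task. The following is a minimal sketch, not part of the original project: summarize_task is a hypothetical helper name, and the SimpleNamespace object is only a stand-in (with a placeholder traceback string) so the snippet runs without a broker. It relies on the id, status, result and traceback attributes and on the ready(), successful() and failed() methods of AsyncResult.

```python
from types import SimpleNamespace
from typing import Any, Dict


def summarize_task(task: Any) -> Dict[str, Any]:
    """Collect the most useful debugging fields from an AsyncResult-like object."""
    summary: Dict[str, Any] = {
        "task_id": task.id,
        "status": task.status,
        "finished": task.ready(),  # True once the task has succeeded or failed
    }
    if task.successful():
        summary["result"] = task.result
    elif task.failed():
        # On failure, .result holds the exception instance that was raised.
        summary["error"] = repr(task.result)
        summary["traceback"] = task.traceback
    return summary


# Stand-in object (assumption) mimicking a failed task; in the tutorial's
# project you would pass celery.AsyncResult(task_id) instead.
fake_task = SimpleNamespace(
    id="d2b6118f-c6ac-4f8a-80ce-22a06d9cec5e",
    status="FAILURE",
    result=ZeroDivisionError("division by zero"),
    traceback="<placeholder traceback text>",
    ready=lambda: True,
    successful=lambda: False,
    failed=lambda: True,
)
print(summarize_task(fake_task))
```

Because the helper only reads attributes, it can be exercised in a unit test with a fake object and then pointed at a real AsyncResult in the shell.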

We can also provide a route to our clients so that they don't have to ping us again and again for the task status. We can use AsyncResult to get the state of the task and return it as an API response.

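@app.get("/status/{task_id}")
async def task_status(task_id: str):
    task = celery.AsyncResult(task_id)
    if task.state == "SUCCESS":
        return {"status": "done", "result": task.result}
    elif task.state == "FAILURE":
        # when a task fails, task.result holds the raised exception
        return {"status": "failed", "error": str(task.result)}
    else:
        # PENDING, STARTED, RETRY, etc.; unknown task ids are also reported as PENDING
        return {"status": "pending", "state": task.state}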
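
On the client side, a route like this is typically polled until the task leaves the pending state. The sketch below is an illustration under assumptions rather than part of the tutorial's code: wait_for_task is a hypothetical helper, fetch_status stands for whatever function performs the HTTP call to /status/{task_id}, and the simulated responses (including the "export.csv" value) are made up so the snippet runs without a server.

```python
import time
from typing import Any, Callable, Dict


def wait_for_task(
    fetch_status: Callable[[], Dict[str, Any]],
    interval: float = 2.0,
    max_attempts: int = 30,
) -> Dict[str, Any]:
    """Call fetch_status until it stops reporting "pending" or attempts run out."""
    payload: Dict[str, Any] = {"status": "pending"}
    for attempt in range(max_attempts):
        payload = fetch_status()
        if payload.get("status") != "pending":
            return payload
        if attempt < max_attempts - 1:
            time.sleep(interval)  # wait before asking again
    return payload  # still pending after max_attempts tries


# Simulated responses standing in for real HTTP calls (made-up data).
responses = iter([
    {"status": "pending"},
    {"status": "pending"},
    {"status": "done", "result": "export.csv"},
])
final = wait_for_task(lambda: next(responses), interval=0.01)
print(final)
```

Capping the number of attempts keeps a client from polling forever when a task id is unknown, since Celery reports unknown ids as PENDING.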

 

© Copyright 2022-23 Team FastAPITutorial