Authorization in FastAPI

Authorization and authentication are 2 different topics. Authentication is related to login and authorization is related to permission. Even if a person is logged in he/she may not have the necessary permissions. Consider our blog has admins and users. Now, anyone who knows our endpoints may make a put request and change our post! Even worse if a maniac person puts a delete request for each of our blog posts! 😭

So, we need to verify if the person making the request has the necessary permissions. In our case, we want only the authors or the superuser to be able to modify or delete the post. Enough talk let's see some code. Before that, do you remember dependencies? We had created a dependency get_db which allows us to supply a database connection to each request. This time also, we are going to create a dependency to identify the current_user. We would need the below code in apis > v1 > route_login.py

from datetime import timedelta

from core.config import settings  #new
from core.hashing import Hasher
from core.security import create_access_token
from db.repository.login import get_user
from db.session import get_db
from fastapi import APIRouter
from fastapi import Depends
from fastapi import HTTPException
from fastapi import status
from fastapi.security import OAuth2PasswordRequestForm,OAuth2PasswordBearer #new
from jose import JWTError, jwt   #new
from schemas.tokens import Token
from sqlalchemy.orm import Session


router = APIRouter()
...


@router.post("/token", response_model=Token)
def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)
):
    ###
    ###

 
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")

def get_current_user(token: str = Depends(oauth2_scheme), db: Session= Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials"
    )

    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user(email=username, db=db)
    if user is None:
        raise credentials_exception
    return user
  • We are decoding the token if it is present and we are validating the user. Once we have a user, we are simply returning it. Thus, we get access to the current user.
  • This we can use as a dependency to identify the current user.
  • We can also compare the author_id of the current user with that of the one who is trying to delete a blog and we can allow or disallow.

Let's integrate permissions with our post view to attach a real user id instead of the hardcoded user id. And we are also going to attach this dependency with our route for deletion, So, that only the users who have created the post or is superuser then only can delete the blog post.

from typing import List
...
from sqlalchemy.orm import Session
from db.models.users import User #new
from apis.v1.route_login import get_current_user  #new

router = APIRouter()

@router.put("/blog/{id}", response_model=ShowBlog)
def update_a_blog(id: int, blog: UpdateBlog, db: Session = Depends(get_db), current_user: User=Depends(get_current_user)):
    blog = update_blog(id=id, blog=blog, author_id=current_user.id, db=db)
    if isinstance(blog,dict):
        raise HTTPException(
            detail=blog.get("error"),
            status_code=status.HTTP_404_NOT_FOUND,
        )
    return blog

@router.delete("/delete/{id}")
def delete_a_blog(id: int, db: Session = Depends(get_db), current_user: User=Depends(get_current_user)):
    message = delete_blog(id=id, author_id=current_user.id, db=db)
    if message.get("error"):
        raise HTTPException(
            detail=message.get("error"), status_code=status.HTTP_400_BAD_REQUEST
        )
    return {"msg": f"Successfully deleted blog with id {id}"}

Now, the only thing left is to write some code in db > repository >blog.py to take decisions on the basis of the current user.

def update_blog(id: int, blog: UpdateBlog, author_id: int, db: Session):
    blog_in_db = db.query(Blog).filter(Blog.id == id).first()
    if not blog_in_db:
        return {"error":f"Blog with id {id} does not exist"}
    if not blog_in_db.author_id == author_id:                   #new
        return {"error":f"Only the author can modify the blog"}
    blog_in_db.title = blog.title
    blog_in_db.content = blog.content
    db.add(blog_in_db)
    db.commit()
    return blog_in_db


def delete_blog(id: int, author_id: int, db: Session):
    blog_in_db = db.query(Blog).filter(Blog.id == id)
    if not blog_in_db.first():
        return {"error": f"Could not find blog with id {id}"}
    if not blog_in_db.first().author_id ==author_id:             #new
        return {"error":f"Only the author can delete a blog"}
    blog_in_db.delete()
    db.commit()
    return {"msg": f"deleted blog with id {id}"}

All done, let's give it a try.🙈

FastAPITutorial

Brige the gap between Tutorial hell and Industry. We want to bring in the culture of Clean Code, Test Driven Development.

We know, we might make it hard for you but definitely worth the efforts.

Contacts

Refunds:

Refund Policy
Social

Follow us on our social media channels to stay updated.

© Copyright 2022-23 Team FastAPITutorial