Let’s look at how to implement authentication and authorization in FastAPI. I’m going to assume you use VS Code or another IDE that can resolve the required imports automatically. The FastAPI documentation already has a page on authentication, but I’m going to extend that information to show you how to add authorization using JWT when your roles sit in a database. And it’s going to be so easy.

When we’re all done, we’ll be protecting routes like this:

@router.post("/", tags=["Notes"])
@security.authorize(roles='user,admin')
async def create_note(note_create:schemas.NoteCreate,  db: Session = Depends(get_db), token_data:TokenData=Depends(security.get_token_data)):
    # do something that only users with the user or admin role can do
    return Response(status_code = status.HTTP_201_CREATED)

Here, only authenticated users with the user or admin role can execute the POST method. All of the checking is handled transparently: if the user is not authenticated the API returns a 401 Unauthorized response, and if the user is authenticated but lacks a required role it returns a 403 Forbidden response.

Database setup

My project uses the SQLAlchemy ORM, so users and roles are stored in a database. This is the code I used to set up the database connection and define the models.

database.py

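from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from utils import settings

# check_same_thread=False is a SQLite-specific option; drop it for other databases
engine = create_engine(settings.Config.SQLALCHEMY_DATABASE_URL, connect_args={'check_same_thread':False}, future=True, echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that the models inherit from (assumed to be defined here)
Base = declarative_base()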

models.py

role_users = Table("user_roles", Base.metadata,
                   Column("role_id", ForeignKey("roles.id"), primary_key=True),
                   Column("user_id", ForeignKey("users.id"), primary_key=True))

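class User(Base):
    __tablename__="users"
    id = Column(Integer, primary_key=True)
    # queried by get_user_by_email in crud.py; unique/index are assumed
    email_address = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    roles = relationship("Role", secondary="user_roles", back_populates="users")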

class Role(Base):
    __tablename__ ="roles"
    id = Column(Integer, primary_key=True)
    description = Column(String, unique=True,index=True)
    users = relationship("User", secondary="user_roles", back_populates="roles")

Since a User can have multiple roles assigned to them, and a Role can be assigned to multiple users, this is a many-to-many relationship. Another place you might see this kind of relationship is tagging blog posts with keywords. The user_roles association table relates the users and roles tables as follows:

[Figure: many-to-many relationship between the users, user_roles, and roles tables]
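To make the association table concrete, here is a minimal sketch using only the standard library's sqlite3 module rather than SQLAlchemy. The table and column names mirror the models above; the sample rows and the helper names roles_for and users_with are made up for illustration. The LEFT OUTER JOIN in roles_for is roughly the shape of query an eager load of roles ends up issuing.

```python
import sqlite3
from typing import List

# In-memory database; the sample users and roles are illustrative only.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, email_address TEXT UNIQUE, hashed_password TEXT);
    CREATE TABLE roles (id INTEGER PRIMARY KEY, description TEXT UNIQUE);
    CREATE TABLE user_roles (
        role_id INTEGER REFERENCES roles(id),
        user_id INTEGER REFERENCES users(id),
        PRIMARY KEY (role_id, user_id)  -- at most one row per (role, user) pair
    );
    INSERT INTO users (id, email_address, hashed_password)
        VALUES (1, 'alice@example.com', 'x'), (2, 'bob@example.com', 'y');
    INSERT INTO roles (id, description) VALUES (1, 'admin'), (2, 'user');
    INSERT INTO user_roles (role_id, user_id) VALUES (1, 1), (2, 1), (2, 2);
""")


def roles_for(email: str) -> List[str]:
    """Return the role descriptions linked to a user through user_roles."""
    rows = conn.execute(
        """
        SELECT r.description
        FROM users u
        LEFT OUTER JOIN user_roles ur ON ur.user_id = u.id
        LEFT OUTER JOIN roles r ON r.id = ur.role_id
        WHERE u.email_address = ?
        ORDER BY r.description
        """,
        (email,),
    ).fetchall()
    # A user without roles yields one row with NULL, which is filtered out here.
    return [description for (description,) in rows if description is not None]


def users_with(role: str) -> List[str]:
    """Return the email addresses of all users holding the given role."""
    rows = conn.execute(
        """
        SELECT u.email_address
        FROM roles r
        JOIN user_roles ur ON ur.role_id = r.id
        JOIN users u ON u.id = ur.user_id
        WHERE r.description = ?
        ORDER BY u.email_address
        """,
        (role,),
    ).fetchall()
    return [email for (email,) in rows]


print(roles_for("alice@example.com"))  # ['admin', 'user']
print(users_with("user"))              # ['alice@example.com', 'bob@example.com']
```

The composite primary key on user_roles is what stops the same role from being assigned to the same user twice.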

We need a way to load the User record that also loads its associated roles.

In my crud.py file I have something like the following (assuming the load is by email_address). Note how it’s doing a joinedload, joining the User to the Roles table and populating the roles property.

def get_user_by_email(db:Session, email:str):
    return db.query(models.User).options(joinedload(models.User.roles)).where(models.User.email_address == email).one_or_none()            

This function is going to be called from the route function, which receives the database session through dependency injection.

note_routes.py

In every route module in my application I have the following code:

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Here I create a database session and yield it to the caller; when the caller is done with it, the session is closed.
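If the yield/finally pattern is unfamiliar, this small standard-library sketch shows the lifecycle. FakeSession is a hypothetical stand-in for a real SQLAlchemy session, and contextlib.contextmanager drives the generator roughly the way a framework would: run up to the yield, hand the value to the caller, then resume so the finally block runs.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a database session; it only tracks close()."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def get_db():
    db = FakeSession()
    try:
        yield db
    finally:
        # The call needs its parentheses; a bare `db.close` would do nothing.
        db.close()


with contextmanager(get_db)() as session:
    # The session is still open while the "route" body runs.
    assert not session.closed

print(session.closed)  # True
```

The finally block also runs when the caller raises, so the session is closed even when a route fails.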

Here is an example of a route injecting the database session

@router.get("/document/src/{chat_src}", tags=["Notes"], response_model=List[schemas.NoteForList])
@security.authorize(roles='user')
async def get_notes_by_document(chat_src:str,  db: Session = Depends(get_db), token_data:TokenData=Depends(security.get_token_data)):                    
    return crud.get_notes_by_document_str(db, chat_src)
    

You can see here that get_db is injected as a parameter into get_notes_by_document endpoint.

FastAPI and JWT

In my project, when the user logs in they receive a signed JWT that carries the sub, id, and roles claims. This JWT is then passed to the API for authentication in subsequent requests. An Angular service calling this API will have code that looks something like this.

identifyme(token:string){
    let headers = {'Authorization': `Bearer ${token}`};
    return this.httpClient.get(`${environment.apiUrl}/identify/me`,{headers:headers})
  }

You can see the token is being passed in the request Authorization header with a Bearer prefix.
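On the server side, something has to pull the token back out of that header. In this project OAuth2PasswordBearer takes care of it; the sketch below is only an approximation of the idea in plain Python, and extract_bearer_token is a hypothetical helper name, not part of FastAPI.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, else None."""
    if not authorization:
        return None
    # Split into the scheme ("Bearer") and everything after the first space.
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("Basic dXNlcjpwdw=="))  # None
```

A missing or malformed header is what ultimately turns into the 401 response.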

Let’s see how this token is generated on login.

Because I am using the OAuth2 password flow, the login route must accept form data at /token (the tokenUrl given to OAuth2PasswordBearer), so that route looks like the one below. Notice that Depends is used here as well.


credentials_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Invalid authentication credentials",
    headers={"WWW-Authenticate":"Bearer"}
)

@app.post("/token", response_model=Token)
async def login(form_data:Annotated[OAuth2PasswordRequestForm,Depends()], db:Session=Depends(get_db)):
    user = authenticate_user(form_data.username, form_data.password,db)
    if not user:
        raise credentials_exception
    access_token_expires = datetime.timedelta(minutes=int(Config.ACCESS_TOKEN_EXPIRE_IN_MINUTES))
    access_token = create_access_token(data={"sub":user.email_address,"id":user.id, "roles":",".join([r.description for r in user.roles])}, expires_delta=access_token_expires)
    return Token(access_token=access_token, token_type="bearer")    

You can see that authenticate_user takes the submitted username (here the email address) and password, along with the database session, to do the authentication.

def get_user(email_address:str, db:Session):
    db_user = crud.get_user_by_email(db, email=email_address)
    if db_user is None:
        raise credentials_exception
    return db_user        

def authenticate_user(email_address:str, password:str, db:Session):
    user = get_user(email_address, db)
    if not user:
        return False    
    if not verify_password(password, user.hashed_password):
        return False    
    return user
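The verify_password helper is not shown in this post. A real project would more likely use a dedicated password hashing library, so treat the following as a sketch under that caveat: it uses only the standard library (PBKDF2 via hashlib), and the "salt$digest" storage format, the ITERATIONS value, and the hash_password helper are all my own assumptions.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor, for illustration only


def hash_password(password: str) -> str:
    """Hash a password with a random salt; stored as '<salt hex>$<digest hex>'."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, _, digest_hex = hashed_password.partition("$")
    try:
        salt = bytes.fromhex(salt_hex)
        expected = bytes.fromhex(digest_hex)
    except ValueError:
        return False  # not in the expected format
    candidate = hashlib.pbkdf2_hmac("sha256", plain_password.encode("utf-8"), salt, ITERATIONS)
    return hmac.compare_digest(candidate, expected)


stored = hash_password("s3cret")
print(verify_password("s3cret", stored))  # True
print(verify_password("wrong", stored))   # False
```

The point is that only the salted hash is stored in hashed_password, never the password itself.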

Another thing to notice is the response_model, which is a Token class. Its access_token field holds the signed JWT, and the claims inside that JWT map to the TokenData class.

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str
    roles:Union[List[str], None] = []
    id:int

Here is the create_access_token method

def create_access_token(data:dict, expires_delta:Union[datetime.timedelta, None]=None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.datetime.now(datetime.timezone.utc) + expires_delta
    else:
        expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=int(Config.ACCESS_TOKEN_EXPIRE_IN_MINUTES))
    to_encode.update({"exp":expire})
    encoded_jwt = jwt.encode(to_encode, Config.JWT_SECRET_KEY, algorithm=Config.JWT_ALGORITHM)
    return encoded_jwt

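It accepts a dictionary of claims that maps to the TokenData class and returns the encoded JWT string, which the login route then wraps in a Token as access_token. Note that the JWT is signed, not encrypted: anyone holding the token can read its claims, so don’t put secrets in it.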
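One point worth making concrete: a JWT produced with an HMAC algorithm such as HS256 is signed rather than encrypted. The sketch below builds one by hand with the standard library instead of the jwt package, purely to show the structure. The SECRET value and the helper names are hypothetical, and a real application should keep using a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET = b"demo-secret"  # hypothetical key, stands in for Config.JWT_SECRET_KEY


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(claims: dict, secret: bytes) -> str:
    """Build header.payload.signature, signing the first two parts with HMAC-SHA256."""
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}, separators=(",", ":")).encode())
    payload = b64url(json.dumps(claims, separators=(",", ":")).encode())
    signature = hmac.new(secret, f"{header}.{payload}".encode("ascii"), hashlib.sha256).digest()
    return f"{header}.{payload}.{b64url(signature)}"


def read_claims_unverified(token: str) -> dict:
    """Decode the payload without any key: the claims are readable by anyone."""
    return json.loads(b64url_decode(token.split(".")[1]))


def verify_hs256(token: str, secret: bytes) -> bool:
    """Recompute the signature and compare; this is what detects tampering."""
    header, payload, signature = token.split(".")
    expected = hmac.new(secret, f"{header}.{payload}".encode("ascii"), hashlib.sha256).digest()
    return hmac.compare_digest(expected, b64url_decode(signature))


claims = {"sub": "alice@example.com", "id": 1, "roles": "user,admin", "exp": int(time.time()) + 900}
token = sign_hs256(claims, SECRET)
print(read_claims_unverified(token)["roles"])  # user,admin
print(verify_hs256(token, SECRET))             # True
print(verify_hs256(token, b"wrong-key"))       # False
```

The roles claim is trustworthy because changing it invalidates the signature, not because it is hidden.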

Here is the complete flow

[Figure: authentication flow]

Bringing it all together

I have added some records into my tables like so

[Figure: sample rows in the users, roles, and user_roles tables]

The wrapper

Recall that earlier I showed you that we will be protecting our routes like so:

@router.post("/?operation=split", tags=["PdfConversation"])
@security.authorize(roles='admin,system,user')
async def rebuild_pdf(item: PdfRebuildInstructions, token_data:TokenData=Depends(security.get_token_data)):        
    fn = os.path.join(settings.BASE_FILES_DIRECTORY, item.id, f"{item.id}")
    if not os.path.exists(fn):
        # raise (not return) so FastAPI turns this into a 404 response
        raise HTTPException(status_code=404, detail="Document not found")
    pdf = Pdf.open(fn)

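You can see that token_data is injected into the route after being retrieved by the get_token_data function in security.py.

And you can see that security.authorize is a decorator that wraps the route function. The wrapper reads token_data from the keyword arguments, so every protected route must declare a parameter with exactly that name.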

security.py

def get_token_data(token:Annotated[str , Depends(oauth2_scheme)]) -> TokenData:
    try:
        # algorithms is the list of algorithms we are willing to accept
        payload = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=[Config.JWT_ALGORITHM])
        username:str = payload.get("sub")
        if username is None:
            raise credentials_exception
        # roles is a comma-separated string; tolerate a missing or empty claim
        roles:List[str] = [r for r in (payload.get("roles") or "").split(',') if r]
        result = TokenData(username=username,roles=roles,id=payload.get("id"))
        return result
    except jwt.InvalidTokenError:
        raise credentials_exception

def authorize(roles: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # FastAPI passes the resolved token_data dependency as a keyword argument
            user_roles = kwargs.get("token_data").roles or []
            for n in roles.split(','):
                if n in user_roles:
                    return await func(*args, **kwargs)
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")
        return wrapper
    return decorator

We can see that get_token_data takes the token passed in via the Authorization request header and validates its signature and expiry. If the token is invalid, a 401 Unauthorized response is returned. Otherwise it makes available an instance of the TokenData class we saw earlier.

And finally we can see that the authorize decorator uses this TokenData instance to compare the user’s roles to the roles required for executing the given route. If the user has none of the required roles, it raises a 403 Forbidden exception.
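To try the role check without starting a server, here is a standalone sketch that runs with only the standard library. It is a variation on the decorator above, not the same code: Forbidden is a hypothetical stand-in for the 403 HTTPException, SimpleNamespace stands in for TokenData, and I have added a guard for a missing token_data.

```python
import asyncio
from functools import wraps
from types import SimpleNamespace


class Forbidden(Exception):
    """Hypothetical stand-in for HTTPException(status_code=403)."""


def authorize(roles: str):
    # Parse the comma-separated list once, when the route is decorated.
    allowed = {r.strip() for r in roles.split(",")}

    def decorator(func):
        @wraps(func)  # keeps the original name and signature metadata
        async def wrapper(*args, **kwargs):
            token_data = kwargs.get("token_data")
            # Deny when there is no token data or no overlap between the role sets.
            if token_data is None or not allowed.intersection(token_data.roles or []):
                raise Forbidden("Forbidden")
            return await func(*args, **kwargs)

        return wrapper

    return decorator


@authorize(roles="user,admin")
async def create_note(title: str, token_data=None) -> str:
    return f"created {title}"


def call_as(*roles: str) -> str:
    """Call the protected function as a caller holding the given roles."""
    token_data = SimpleNamespace(roles=list(roles))
    try:
        return asyncio.run(create_note(title="todo", token_data=token_data))
    except Forbidden:
        return "403"


print(call_as("admin"))  # created todo
print(call_as("guest"))  # 403
```

Having any one of the listed roles is enough; the check is an "or", not an "and".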

There you have it.