Let’s look at how to implement authentication and authorization in FastAPI. I’m going to assume you use VS Code or an IDE that’s capable of automatically resolving the required import
s. There’s already a page in the FastAPIdocumentation about authentication but I’m going to extend that information to show you how to add authorization using JWT where your roles sit in a database. And it’s going to be so easy.
When we’re all done we’ll be protecting routes like this
@router.post("/", tags=["Notes"])
@security.authorize(roles='user,admin')
async def create_note(note_create:schemas.NoteCreate, db: Session = Depends(get_db), token_data:TokenData=Depends(security.get_token_data)):
#do something that only users with the user and admin profiles can do
return Response(status_code = status.HTTP_201_CREATED)
Here, only authenticated users with the user
or admin
role can execute the POST
method. All of the checking is handled transparently, and if the user is not authenticated or authorized it will return a 400
-Unauthorized 401
-Forbidden response.
Database setup
My project is using the SQLAlchemy ORM so I am storing these entities in a database. This is the code I used to define the models.
database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from utils import settings
engine = create_engine(settings.Config.SQLALCHEMY_DATABASE_URL, connect_args={'check_same_thread':False}, future=True, echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
models.py
role_users = Table("user_roles", Base.metadata,
Column("role_id", ForeignKey("roles.id"), primary_key=True),
Column("user_id", ForeignKey("users.id"), primary_key=True))
class User(Base):
__tablename__="users"
id = Column(Integer, primary_key=True)
hashed_password = Column(String)
roles = relationship("Role", secondary="user_roles", back_populates="users")
class Role(Base):
__tablename__ ="roles"
id = Column(Integer, primary_key=True)
description = Column(String, unique=True,index=True)
users = relationship("User", secondary="user_roles", back_populates="roles")
Since a User can have multiple roles assigned to them, and a Role can be assigned multiple users this is called a Many:Many relationship. Another place you might see this kind of relationship is when you want to tag a keyword to a blog post. The role_users
table relates to the Users
and Roles
tables as follows
We need a way to load the User record in a way that also loads its associated roles
In my crud.py
file I have something like the following (assuming the load is by email_address
). Note how it’s doing a joinedload, joining the User
to the Roles
table and populating the roles
property.
def get_user_by_email(db:Session, email:str):
return db.query(models.User).options(joinedload(models.User.roles)).where(models.User.email_address == email).one_or_none()
This method is going to be called from the route method which is going to instantiate the database session for us using dependency injection.
note_routes.py
In every route in my application I have the following code
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close
Here I create a session database session and yield it to the caller, when the caller is done with it the session will be closed.
Here is an example of a route injecting the database session
@router.get("/document/src/{chat_src}", tags=["Notes"], response_model=List[schemas.NoteForList])
@security.authorize(roles='user')
async def get_notes_by_document(chat_src:str, db: Session = Depends(get_db), token_data:TokenData=Depends(security.get_token_data)):
return crud.get_notes_by_document_str(db, chat_src)
You can see here that get_db
is injected as a parameter into get_notes_by_document
endpoint.
FastAPI and JWT
In my project when the user logs in, they receive an encrypted JWT that has the sub
, and roles
claims. This JWT will then be passed to the API for authentication purposes in subsequent requests. So an Angular service calling this API will have code which looks something like this.
identifyme(token:string){
let headers = {'Authorization': `Bearer ${token}`};
return this.httpClient.get(`${environment.apiUrl}/identify/me`,{headers:headers})
}
You can see the token is being passed in the request Authorization
header with a Bearer
prefix.
Let’s see how this token is generated on login
Because I am using oauth2
the login route must accept a form at /token
so that route looks like the below. Notice this Depends
method being called as well.
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate":"Bearer"}
)
@app.post("/token", response_model=Token)
async def login(form_data:Annotated[OAuth2PasswordRequestForm,Depends()], db:Session=Depends(get_db)):
user = authenticate_user(form_data.username, form_data.password,db)
if not user:
raise credentials_exception
access_token_expires = datetime.timedelta(minutes=int(Config.ACCESS_TOKEN_EXPIRE_IN_MINUTES))
access_token = create_access_token(data={"sub":user.email,"id":user.id, "roles":",".join([r.description for r in user.roles])}, expires_delta=access_token_expires)
return Token(access_token=access_token, token_type="bearer")
You can see that authenticate_user
requires a username
and password
to do the authentication.
def get_user(email_address:str, db:Session):
db_user = crud.get_user_by_email(db, email=email_address)
if db_user is None:
raise credentials_exception
return db_user
def authenticate_user(email_address:str, password:str, db:Session):
user = get_user(email_address, db)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
Another thing to notice is the response_model
which is a Token
class. It is an encrypted representation of the TokenData
class.
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str
roles:Union[List[str], None] = []
id:int
Here is the create_access_token
method
def create_access_token(data:dict, expires_delta:Union[datetime.timedelta, None]=None):
to_encode = data.copy()
if expires_delta:
expire = datetime.datetime.now(datetime.timezone.utc) + expires_delta
else:
expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=Config.ACCESS_TOKEN_EXPIRE_IN_MINUTES)
to_encode.update({"exp":expire})
encoded_jwt = jwt.encode(to_encode, Config.JWT_SECRET_KEY, algorithm=Config.JWT_ALGORITHM)
return encoded_jwt
It accepts a dictionary that can map to the class TokenData
and return a Token
where property access_token
is the encrypted version of TokenData
.
Here is the complete flow
Bringing it all together
I have added some records into my tables like so
The wrapper
Recall that earlier I showed you that we will be protecting our roles like so
@router.post("/?operation=split", tags=["PdfConversation"])
@security.authorize(roles='admin,system,user')
async def rebuild_pdf(item: PdfRebuildInstructions, token_data:TokenData=Depends(security.get_token_data)):
fn = os.path.join(settings.BASE_FILES_DIRECTORY, item.id,f"{item.id}")
if (not os.path.exists(fn)):
return HTTPException(status_code=404, detail="Document not found")
pdf = Pdf.open(fn)
You can see that token_data
is injected into the route after being retrieved by method get_token_data
in security.py
And you can see that security.authorize
is a wrapper function.
security.py
def get_token_data(token:Annotated[str , Depends(oauth2_scheme)]) -> TokenData:
try:
payload = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=Config.JWT_ALGORITHM)
username:str = payload.get("sub")
if username is None:
raise credentials_exception
roles:List[str] = payload.get("roles").split(',')
result = TokenData(username=username,roles=roles,id=payload.get("id"))
return result
except jwt.InvalidTokenError:
raise credentials_exception
def authorize(roles: str):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs, ):
user_roles = kwargs.get("token_data").roles
for n in roles.split(','):
if n in user_roles:
return await func(*args, **kwargs)
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")
return wrapper
return decorator
We can see get_token_data
extracts the encrypted token passed in via the request headers and validates it. If the token is invalid a 401-Unauthorized
exception will be returned. It makes available an instance of the TokenData
class we saw earler.
And we can finally see that the wrapper function authorize
uses this TokenData
class to compare the user’s roles to the roles requires for executing the given route. If the user does not have the roles, it will raise a 403-Forbidden
exception.
There you have it.