"""FastAPI dependencies for authentication and authorization.

This module provides FastAPI dependency functions for protecting routes with JWT-based
authentication and authorization. It supports scope-based, role-based, permission-based,
and object-level ownership access control, with configurable claim names for different
authorization servers.

Example::

    from fastapi import FastAPI, Depends
    from axioms_fastapi import require_auth, require_scopes, check_object_ownership, init_axioms

    app = FastAPI()
    init_axioms(app, AXIOMS_AUDIENCE="api.example.com", AXIOMS_DOMAIN="auth.example.com")

    @app.get("/protected")
    async def protected_route(payload=Depends(require_auth)):
        return {"user": payload.sub}

    @app.get("/admin")
    async def admin_route(payload=Depends(require_auth), _=Depends(require_scopes(["admin"]))):
        return {"message": "Admin access"}

    @app.patch("/articles/{article_id}")
    async def update_article(
        article_id: int,
        article=Depends(check_object_ownership(get_article))
    ):
        return {"message": "Updated"}
"""
import logging
from typing import Callable, List
from box import Box
from fastapi import Depends, Request
from .config import AxiomsConfig, get_config
from .error import AxiomsHTTPException
from .helper import (
check_permissions,
check_roles,
check_scopes,
get_claim_from_token,
get_expected_issuer,
has_bearer_token,
has_valid_token,
)
logger = logging.getLogger(__name__)
def require_auth(
    request: Request,
    config: AxiomsConfig = Depends(get_config),
    safe_methods: List[str] = None,
) -> Box:
    """FastAPI dependency to require valid JWT authentication.

    Extracts the bearer token from the request with ``has_bearer_token``,
    validates it with ``has_valid_token`` and returns the validated payload
    for use in the route handler.

    NOTE(review): ``safe_methods`` is an ordinary function parameter. When this
    function is used directly as ``Depends(require_auth)``, FastAPI may treat
    it as client-supplied request data -- confirm that it cannot be set over
    HTTP, since it controls whether authentication is skipped.

    Args:
        request: FastAPI Request object containing HTTP headers.
        config: Axioms configuration (injected via dependency).
        safe_methods: List of HTTP methods that skip authentication.
            Defaults to ['OPTIONS'] for CORS preflight requests.

    Returns:
        Box: Validated JWT token payload with claims accessible as attributes.
        Returns empty Box for safe methods.

    Raises:
        AxiomsHTTPException: If token is missing, invalid, or expired
            (not raised for safe methods). Any other failure during token
            handling is logged and reported as a generic 401.

    Example::

        @app.get("/api/protected")
        async def protected_route(payload=Depends(require_auth)):
            user_id = payload.sub
            return {"user_id": user_id}

    Example with custom safe methods::

        from functools import partial

        # Allow GET and OPTIONS without auth
        require_auth_safe = partial(require_auth, safe_methods=["GET", "OPTIONS"])

        @app.get("/api/public-read")
        async def public_route(payload=Depends(require_auth_safe)):
            # Returns empty Box for GET requests
            return {"data": "public"}
    """
    # Default safe methods to OPTIONS for CORS preflight
    if safe_methods is None:
        safe_methods = ["OPTIONS"]

    # Skip authentication for safe methods
    if request.method in safe_methods:
        return Box()

    try:
        token = has_bearer_token(request)
        return has_valid_token(token, config)
    except Exception as ex:
        realm = get_expected_issuer(config) or ""

        if hasattr(ex, "error") and hasattr(ex, "status_code"):
            # Error raised by token validation: forward its own error body
            # and status code, keeping the original exception as the cause.
            raise AxiomsHTTPException(
                error=ex.error,
                status_code=ex.status_code,
                realm=realm,
            ) from ex

        # Anything else is unexpected. The client only sees a generic 401, so
        # record the real failure server-side instead of discarding it.
        logger.exception("Unexpected error during token validation")
        raise AxiomsHTTPException(
            {
                "error": "unauthorized_access",
                "error_description": "Authentication failed",
            },
            401,
            realm,
        ) from ex
def require_scopes(required_scopes: List[str]) -> Callable:
    """Build a FastAPI dependency that enforces scope-based authorization.

    The returned dependency reads the scope claim from the authenticated
    token and passes it to ``check_scopes`` together with ``required_scopes``.
    Uses OR logic: the token must have at least ONE of the specified scopes.

    Args:
        required_scopes: List of required scope strings.

    Returns:
        Callable: FastAPI dependency function that enforces the scope check.

    Raises:
        AxiomsHTTPException: Raised by the returned dependency (403) when the
            token doesn't contain the required scopes.

    Example (OR logic - requires EITHER scope)::

        @app.get("/api/resource")
        async def resource_route(
            payload=Depends(require_auth),
            _=Depends(require_scopes(["read:resource", "write:resource"]))
        ):
            return {"data": "protected"}

    Example (AND logic - requires BOTH scopes via chaining)::

        @app.get("/api/strict")
        async def strict_route(
            payload=Depends(require_auth),
            _=Depends(require_scopes(["read:resource"])),
            __=Depends(require_scopes(["write:resource"]))
        ):
            return {"data": "requires both scopes"}
    """

    def scope_dependency(
        payload: Box = Depends(require_auth), config: AxiomsConfig = Depends(get_config)
    ) -> None:
        """Reject the request unless the token passes the scope check."""
        # Look the claim up under the configured claim names; a falsy result
        # is replaced by an empty string before checking.
        granted = get_claim_from_token(payload, "SCOPE", config) or ""
        if check_scopes(granted, required_scopes):
            return None

        denial = {
            "error": "insufficient_permission",
            "error_description": "Insufficient role, scope or permission",
        }
        raise AxiomsHTTPException(denial, 403, get_expected_issuer(config) or "")

    return scope_dependency
def require_roles(required_roles: List[str]) -> Callable:
    """Build a FastAPI dependency that enforces role-based authorization.

    The returned dependency reads the roles claim from the authenticated
    token and passes it to ``check_roles`` together with ``required_roles``.
    Uses OR logic: the token must have at least ONE of the specified roles.

    Args:
        required_roles: List of required role strings.

    Returns:
        Callable: FastAPI dependency function that enforces the role check.

    Raises:
        AxiomsHTTPException: Raised by the returned dependency (403) when the
            token doesn't contain the required roles.

    Example (OR logic - requires EITHER role)::

        @app.get("/admin/users")
        async def admin_route(
            payload=Depends(require_auth),
            _=Depends(require_roles(["admin", "superuser"]))
        ):
            return {"users": []}

    Example (AND logic - requires BOTH roles via chaining)::

        @app.get("/admin/critical")
        async def critical_route(
            payload=Depends(require_auth),
            _=Depends(require_roles(["admin"])),
            __=Depends(require_roles(["superuser"]))
        ):
            return {"message": "requires both roles"}
    """

    def role_dependency(
        payload: Box = Depends(require_auth), config: AxiomsConfig = Depends(get_config)
    ) -> None:
        """Reject the request unless the token passes the role check."""
        # Look the claim up under the configured claim names; a falsy result
        # is replaced by an empty list before checking.
        granted = get_claim_from_token(payload, "ROLES", config) or []
        if check_roles(granted, required_roles):
            return None

        denial = {
            "error": "insufficient_permission",
            "error_description": "Insufficient role, scope or permission",
        }
        raise AxiomsHTTPException(denial, 403, get_expected_issuer(config) or "")

    return role_dependency
def require_permissions(required_permissions: List[str]) -> Callable:
    """Build a FastAPI dependency that enforces permission-based authorization.

    The returned dependency reads the permissions claim from the authenticated
    token and passes it to ``check_permissions`` together with
    ``required_permissions``. Uses OR logic: the token must have at least ONE
    of the specified permissions.

    Args:
        required_permissions: List of required permission strings.

    Returns:
        Callable: FastAPI dependency function that enforces the permission check.

    Raises:
        AxiomsHTTPException: Raised by the returned dependency (403) when the
            token doesn't contain the required permissions.

    Example (OR logic - requires EITHER permission)::

        @app.get("/api/resource")
        async def resource_route(
            payload=Depends(require_auth),
            _=Depends(require_permissions(["resource:read", "resource:write"]))
        ):
            return {"data": "success"}

    Example (AND logic - requires BOTH permissions via chaining)::

        @app.get("/api/critical")
        async def critical_route(
            payload=Depends(require_auth),
            _=Depends(require_permissions(["resource:read"])),
            __=Depends(require_permissions(["resource:admin"]))
        ):
            return {"message": "requires both permissions"}
    """

    def permission_dependency(
        payload: Box = Depends(require_auth), config: AxiomsConfig = Depends(get_config)
    ) -> None:
        """Reject the request unless the token passes the permission check."""
        # Look the claim up under the configured claim names; a falsy result
        # is replaced by an empty list before checking.
        granted = get_claim_from_token(payload, "PERMISSIONS", config) or []
        if check_permissions(granted, required_permissions):
            return None

        denial = {
            "error": "insufficient_permission",
            "error_description": "Insufficient role, scope or permission",
        }
        raise AxiomsHTTPException(denial, 403, get_expected_issuer(config) or "")

    return permission_dependency
def check_object_ownership(
    get_object: Callable,
    owner_field: str = "user",
    claim_field: str = "sub",
) -> Callable:
    """Build a FastAPI dependency that enforces object-level ownership.

    The returned dependency obtains an object through ``get_object``, reads
    ``owner_field`` from it (dict key or attribute) and compares that value
    with the ``claim_field`` claim of the authenticated JWT payload. On a match
    the object itself is returned, so routes can use it directly.

    NOTE(review): ``get_object`` is declared before ``require_auth`` in the
    inner dependency's signature; confirm whether the object lookup can run
    for unauthenticated requests and whether that is acceptable.

    Args:
        get_object: Callable dependency function that retrieves the object.
        owner_field: Field name in the object containing the owner identifier.
            Defaults to "user".
        claim_field: JWT claim field to compare with owner_field.
            Defaults to "sub".

    Returns:
        Callable: FastAPI dependency function that enforces object ownership check.

    Raises:
        AxiomsHTTPException:
            - 400 Bad Request: If object is missing the specified owner_field.
            - 403 Forbidden: If JWT is missing the claim_field or user doesn't
              own the object.

    Example (basic usage with defaults)::

        async def get_article(article_id: int):
            article = db.query(Article).filter(Article.id == article_id).first()
            if not article:
                raise HTTPException(status_code=404, detail="Not found")
            return article

        @app.patch("/articles/{article_id}")
        async def update_article(
            article_id: int,
            title: str,
            article = Depends(check_object_ownership(get_article))
        ):
            article.title = title
            return article

    Example (custom owner field)::

        @app.delete("/comments/{comment_id}")
        async def delete_comment(
            comment_id: int,
            comment = Depends(check_object_ownership(get_comment, owner_field="created_by"))
        ):
            db.delete(comment)
            return {"message": "Deleted"}

    Example (match by email instead of sub)::

        @app.patch("/users/{user_id}")
        async def update_user(
            user_id: int,
            name: str,
            user = Depends(check_object_ownership(
                get_user,
                owner_field="owner_email",
                claim_field="email"
            ))
        ):
            user.name = name
            return user

    Example (with SQLAlchemy)::

        class Article(Base):
            __tablename__ = "articles"
            id = Column(Integer, primary_key=True)
            title = Column(String)
            user = Column(String)

        def get_article(article_id: int, db: Session = Depends(get_db)):
            article = db.query(Article).filter(Article.id == article_id).first()
            if not article:
                raise HTTPException(status_code=404, detail="Not found")
            return article

        @app.patch("/articles/{article_id}")
        async def update_article(
            article_id: int,
            title: str,
            article: Article = Depends(check_object_ownership(get_article)),
            db: Session = Depends(get_db)
        ):
            article.title = title
            db.commit()
            return article
    """

    def ownership_dependency(
        obj=Depends(get_object),
        payload: Box = Depends(require_auth),
        config: AxiomsConfig = Depends(get_config),
    ):
        """Return ``obj`` if its owner field equals the token's claim value."""

        def rejection(error: str, description: str, status: int) -> AxiomsHTTPException:
            # Every failure below has the same shape; only these three vary.
            return AxiomsHTTPException(
                {"error": error, "error_description": description},
                status,
                get_expected_issuer(config) or "",
            )

        # Dicts are read by key, anything else by attribute; a missing
        # key/attribute yields None in both cases.
        owner = (
            obj.get(owner_field)
            if isinstance(obj, dict)
            else getattr(obj, owner_field, None)
        )

        if owner is None:
            # The resource itself lacks the owner field: log the details
            # server-side and send the client only a generic message.
            logger.error(
                f"Object ownership check failed: object missing owner field '{owner_field}'. "
                f"Object type: {type(obj).__name__}"
            )
            raise rejection("bad_request", "Invalid resource configuration", 400)

        claim_value = payload.get(claim_field)
        if claim_value is None:
            raise rejection(
                "insufficient_permission",
                f"JWT missing required claim: {claim_field}",
                403,
            )

        if owner != claim_value:
            raise rejection(
                "insufficient_permission",
                "You don't have permission to access this resource",
                403,
            )

        return obj

    return ownership_dependency