Authentication & Authorization¶
Deep dive into the authentication and authorization mechanisms.
Authentication Overview¶
The system uses JWT (JSON Web Tokens) for stateless authentication with a dual-token strategy:
- Access Token: Short-lived (30 minutes), used for API requests
- Refresh Token: Long-lived (7 days), used to obtain new access tokens
Authentication Flow¶
Registration¶
sequenceDiagram
Client->>API: POST /auth/register
API->>API: Validate email/password
API->>Database: Create user (hash password)
API->>API: Generate access + refresh tokens
API->>Client: Return tokens
Implementation:
- Email validation: Pydantic validates email format
- Password hashing: bcrypt with automatic salt
- User creation: Insert into the users table
- Token generation: JWT with user_id as subject
Login¶
sequenceDiagram
Client->>API: POST /auth/login (email, password)
API->>Database: Find user by email
API->>API: Verify password (bcrypt)
API->>API: Generate tokens
API->>Client: Return access + refresh tokens
Token Refresh¶
sequenceDiagram
Client->>API: POST /auth/refresh (with refresh token)
API->>API: Verify refresh token
API->>Database: Check user exists & is_active
API->>API: Generate new tokens
API->>Client: Return new access + refresh tokens
Why refresh?
- Access tokens expire quickly (security)
- Refresh tokens allow seamless re-authentication
- Reduces password exposure
Protected Endpoint Access¶
sequenceDiagram
Client->>API: GET /users/me (with access token)
API->>API: Extract Bearer token
API->>API: Verify JWT signature
API->>API: Check expiration
API->>Database: Load user by token.sub (user_id)
API->>API: Check is_active=True
API->>API: Execute endpoint logic
API->>Client: Return response
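The "Extract Bearer token" step in the flow above can be sketched as follows. This is a minimal, framework-independent illustration; `extract_bearer_token` is a hypothetical helper name, and in a FastAPI application this work is normally done by the OAuth2 security scheme rather than by hand.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value.

    Returns None when the header is missing, uses another scheme,
    or carries no token.
    """
    if not authorization:
        return None
    # Split "Bearer <token>" into the scheme and the remainder.
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer":
        return None
    token = token.strip()
    return token or None
```

A missing or malformed header maps to the 401 case; signature and expiration checks only run once a token has been extracted.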
Token Structure¶
Access Token Payload¶
{
  "sub": "123",        // User ID
  "exp": 1704099600,   // Expiration timestamp
  "type": "access"     // Token type
}
Refresh Token Payload¶
Signature: HMAC-SHA256 using SECRET_KEY
Password Security¶
Hashing Strategy¶
Algorithm: bcrypt (via passlib)
Process:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Hash password
hashed = pwd_context.hash(password) # Includes automatic salt
# Verify password
is_valid = pwd_context.verify(plain_password, hashed)
Key features:
- Automatic salt generation
- Adaptive work factor
- Constant-time comparison
bcrypt 72-Byte Limit¶
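bcrypt uses at most the first 72 bytes of a password. Truncating at the byte level can cut a multi-byte UTF-8 character in half, so the helper truncates the encoded bytes and then drops any partial trailing character:
def normalize_password(password: str) -> str:
    """Normalize password to handle bcrypt 72-byte limit"""
    password_bytes = password.encode('utf-8')
    if len(password_bytes) > 72:
        password_bytes = password_bytes[:72]
    # errors='ignore' discards a character split by the 72-byte cut
    return password_bytes.decode('utf-8', errors='ignore')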
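A short, self-contained demonstration of the 72-byte handling may help. It repeats the helper so the snippet runs on its own; the sample passwords are made up for illustration.

```python
def normalize_password(password: str) -> str:
    """Truncate to at most 72 UTF-8 bytes without leaving a partial character."""
    password_bytes = password.encode("utf-8")
    if len(password_bytes) > 72:
        password_bytes = password_bytes[:72]
    # errors="ignore" drops a trailing multi-byte character cut by the slice
    return password_bytes.decode("utf-8", errors="ignore")


# Pure ASCII: one byte per character, so exactly 72 characters survive.
print(len(normalize_password("a" * 80)))  # 72

# "é" is two bytes in UTF-8; here the cut lands in the middle of it,
# so the whole character is dropped and 71 characters remain.
print(len(normalize_password("a" * 71 + "é" + "tail")))  # 71
```

One consequence worth keeping in mind: two passwords that share their first 72 bytes normalize to the same string and therefore verify against the same hash.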
Authorization Mechanisms¶
Role-Based Access Control (RBAC)¶
Model: UserUniversityRole
class UserUniversityRole:
    user_id: int
    university_id: int
    role: Enum  # STUDENT, INSTRUCTOR, TA, ADMIN, etc.
    department_id: Optional[int]  # Optional department scope
Roles:
- STUDENT - Basic access to enrolled courses
- INSTRUCTOR - Teach courses, view enrolled students
- TA - Assist instructor
- DEPARTMENT_ADMIN - Manage department
- UNIVERSITY_ADMIN - Manage university
- PLATFORM_ADMIN - Global admin
Multi-Tenant Scoping¶
Users can have different roles in different universities:
# User is a student at MIT
UserUniversityRole(user_id=1, university_id=1, role="STUDENT")
# Same user is an instructor at Stanford
UserUniversityRole(user_id=1, university_id=2, role="INSTRUCTOR")
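A tenant-scoped permission check over such assignments could look like the sketch below. It uses plain dataclasses instead of the real ORM model, and `Role`, `RoleAssignment`, and `has_role` are hypothetical names introduced for illustration. It does not treat PLATFORM_ADMIN as a global override; that would need an extra rule.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterable, Optional


class Role(str, Enum):
    STUDENT = "STUDENT"
    INSTRUCTOR = "INSTRUCTOR"
    TA = "TA"
    DEPARTMENT_ADMIN = "DEPARTMENT_ADMIN"
    UNIVERSITY_ADMIN = "UNIVERSITY_ADMIN"
    PLATFORM_ADMIN = "PLATFORM_ADMIN"


@dataclass(frozen=True)
class RoleAssignment:
    """In-memory stand-in for a UserUniversityRole row."""
    user_id: int
    university_id: int
    role: Role
    department_id: Optional[int] = None


def has_role(
    assignments: Iterable[RoleAssignment],
    user_id: int,
    university_id: int,
    allowed: Iterable[Role],
) -> bool:
    """True if the user holds one of the allowed roles at this university."""
    allowed_set = set(allowed)
    return any(
        a.user_id == user_id
        and a.university_id == university_id
        and a.role in allowed_set
        for a in assignments
    )


assignments = [
    RoleAssignment(user_id=1, university_id=1, role=Role.STUDENT),
    RoleAssignment(user_id=1, university_id=2, role=Role.INSTRUCTOR),
]
```

With these assignments, user 1 passes an instructor check at university 2 but not at university 1, which is the point of scoping roles per tenant.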
Dependency Injection for Auth¶
Common dependencies:
# app/dependencies.py
from typing import Optional

from fastapi import Depends, HTTPException
from jose import JWTError, jwt  # assumes python-jose; adjust for another JWT library
from sqlalchemy.ext.asyncio import AsyncSession

# oauth2_scheme, oauth2_scheme_optional, get_db, crud, User, SECRET_KEY and
# ALGORITHM are defined elsewhere in the application.


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Extract and validate user from JWT"""
    credentials_exception = HTTPException(status_code=401)
    try:
        # Decode JWT (verifies signature and expiration)
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload.get("sub")
        token_type = payload.get("type")
        if user_id is None or token_type != "access":
            raise credentials_exception
        # A non-numeric subject is treated as an invalid token, not a server error
        user_id = int(user_id)
    except (JWTError, ValueError):
        raise credentials_exception
    # Load user from database
    user = await crud.user.get(db, id=user_id)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Ensure user is active"""
    if not current_user.is_active:
        raise HTTPException(status_code=403, detail="Inactive user")
    return current_user


async def get_optional_current_user(
    token: Optional[str] = Depends(oauth2_scheme_optional),
    db: AsyncSession = Depends(get_db)
) -> Optional[User]:
    """Return user if authenticated, None otherwise (for public endpoints)"""
    if not token:
        return None
    try:
        return await get_current_user(token, db)
    except HTTPException:
        return None
Usage in routes:
# Requires authentication
@router.get("/me")
async def get_profile(
    current_user: User = Depends(get_current_active_user)
):
    return current_user


# Optional authentication
@router.get("/courses")
async def list_courses(
    user: Optional[User] = Depends(get_optional_current_user)
):
    # Show different results for authenticated vs anonymous
    if user:
        return get_user_courses(user)
    return get_public_courses()
Resource Ownership Checks¶
Verify user owns the resource:
@router.get("/enrollments/{enrollment_id}")
async def get_enrollment(
    enrollment_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    enrollment = await crud.enrollment.get(db, id=enrollment_id)
    if not enrollment:
        raise HTTPException(status_code=404)
    # Check ownership
    if enrollment.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not your enrollment")
    return enrollment
Security Best Practices¶
Implemented¶
✅ Password hashing - bcrypt with salt
✅ JWT expiration - Access tokens expire after 30 minutes, refresh tokens after 7 days
✅ Active user checks - Disabled accounts can't access
✅ CORS configuration - Restrict origins
✅ SQL injection prevention - SQLAlchemy ORM
✅ Input validation - Pydantic schemas
✅ HTTPS recommended - Use TLS in production
Considerations¶
⚠️ Token revocation - Issued JWTs can't be revoked before they expire (stateless trade-off)
⚠️ Rate limiting - Not implemented (add nginx/API gateway)
⚠️ Password policy - No complexity requirements
⚠️ 2FA - Not implemented
⚠️ Account lockout - No brute-force protection
Token Management Best Practices¶
Client-Side Storage¶
Access Token:
- Store in memory (variable)
- Never in localStorage (XSS risk)
Refresh Token:
- httpOnly cookie (best)
- Or secure storage (mobile)
Token Rotation¶
On refresh, issue new tokens:
@router.post("/auth/refresh")
async def refresh_tokens(
    current_user: User = Depends(get_current_user_from_refresh_token)
):
    # Generate NEW access and refresh tokens
    access_token = create_access_token(subject=str(current_user.id))
    refresh_token = create_refresh_token(subject=str(current_user.id))
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }
Old tokens remain valid until expiration (stateless limitation).
Logout¶
Challenge: Can't invalidate JWTs
Solutions:
- Client-side logout:
  - Delete tokens from client
  - Effective but not server-enforced
- Token blacklist (future):
  - Store revoked tokens in Redis
  - Check on every request (adds overhead)
- Short token lifetime:
  - Minimize exposure window
  - Balance UX vs security
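The token blacklist option is not implemented, but its shape can be sketched with an in-memory dictionary standing in for Redis. Everything here is an assumption for illustration: `InMemoryTokenDenylist` is a hypothetical class, and it presumes each token carries a unique identifier (for example a `jti` claim), which the payloads shown earlier do not include.

```python
import time
from typing import Dict, Optional


class InMemoryTokenDenylist:
    """Stand-in for a Redis-backed blacklist: token ID -> expiry timestamp."""

    def __init__(self) -> None:
        self._revoked: Dict[str, float] = {}

    def revoke(self, token_id: str, expires_at: float) -> None:
        """Remember a revoked token until its own expiration time."""
        self._revoked[token_id] = expires_at

    def is_revoked(self, token_id: str, now: Optional[float] = None) -> bool:
        """Check a token ID; entries for already-expired tokens are purged."""
        now = time.time() if now is None else now
        expires_at = self._revoked.get(token_id)
        if expires_at is None:
            return False
        if expires_at <= now:
            # The token is invalid on its own by now, so the entry can go.
            del self._revoked[token_id]
            return False
        return True


denylist = InMemoryTokenDenylist()
denylist.revoke("token-abc", expires_at=time.time() + 1800)
```

Storing each entry only until the token's own `exp` keeps the list small. With Redis, a per-key TTL would play the role of the manual purge.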
Common Authentication Errors¶
401 Unauthorized¶
Causes:
- Missing token
- Expired token
- Invalid signature
- User not found
Response:
403 Forbidden¶
Causes:
- User account disabled (is_active=False)
- Insufficient permissions
- Resource ownership violation
Response:
422 Validation Error¶
Causes:
- Invalid email format
- Missing required fields
Response:
{
  "detail": [
    {
      "loc": ["body", "email"],
      "msg": "value is not a valid email address",
      "type": "value_error.email"
    }
  ]
}
Testing Authentication¶
Obtaining Tokens in Tests¶
import pytest
from httpx import AsyncClient  # assumes the test client is httpx.AsyncClient


# With pytest-asyncio in strict mode, use @pytest_asyncio.fixture instead.
@pytest.fixture
async def test_user_token(client: AsyncClient):
    """Get access token for testing"""
    response = await client.post(
        "/api/v1/auth/register",
        json={
            "email": "test@example.com",
            "password": "testpass",
            "full_name": "Test User"
        }
    )
    return response.json()["access_token"]


@pytest.mark.asyncio
async def test_protected_endpoint(client, test_user_token):
    # Send the token as a Bearer credential
    response = await client.get(
        "/api/v1/users/me",
        headers={"Authorization": f"Bearer {test_user_token}"}
    )
    assert response.status_code == 200
Security Configuration¶
Environment Variables¶
# JWT Configuration
SECRET_KEY=your-secret-key-min-32-chars
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
# CORS
BACKEND_CORS_ORIGINS=["https://app.example.com"]
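One way these variables might be read and validated at startup is sketched below, using only the standard library. `AuthSettings` and `load_auth_settings` are hypothetical names, the defaults mirror the values listed above, and the 32-character minimum is taken from the placeholder text in SECRET_KEY. The real project may use a settings library instead.

```python
import json
import os
from dataclasses import dataclass
from typing import List, Mapping


@dataclass(frozen=True)
class AuthSettings:
    secret_key: str
    algorithm: str
    access_token_expire_minutes: int
    refresh_token_expire_days: int
    cors_origins: List[str]


def load_auth_settings(env: Mapping[str, str] = os.environ) -> AuthSettings:
    """Read auth settings from a mapping of environment variables."""
    secret_key = env["SECRET_KEY"]  # required: KeyError if missing
    if len(secret_key) < 32:
        raise ValueError("SECRET_KEY must be at least 32 characters")
    return AuthSettings(
        secret_key=secret_key,
        algorithm=env.get("ALGORITHM", "HS256"),
        access_token_expire_minutes=int(env.get("ACCESS_TOKEN_EXPIRE_MINUTES", "30")),
        refresh_token_expire_days=int(env.get("REFRESH_TOKEN_EXPIRE_DAYS", "7")),
        # BACKEND_CORS_ORIGINS is written as a JSON list of origins
        cors_origins=json.loads(env.get("BACKEND_CORS_ORIGINS", "[]")),
    )


settings = load_auth_settings({
    "SECRET_KEY": "x" * 32,  # dummy value for the example
    "BACKEND_CORS_ORIGINS": '["https://app.example.com"]',
})
```

Passing the environment as a parameter keeps the loader easy to test without touching the real process environment.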
Production Recommendations¶
- Use long, random SECRET_KEY
- Enable HTTPS:
  - Use TLS certificates
  - Redirect HTTP → HTTPS
- Restrict CORS:
  - Only allow trusted origins
  - No wildcards in production
- Shorter token lifetimes:
  - Consider 15-minute access tokens
  - 1-day refresh tokens
- Monitor for suspicious activity:
  - Failed login attempts
  - Token reuse patterns
  - Unusual access patterns
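For the first recommendation, a long random SECRET_KEY can be generated with Python's standard `secrets` module. This is one reasonable approach, not necessarily the command the project itself documents; `generate_secret_key` is a hypothetical helper name.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a random key as a hex string (two characters per byte)."""
    return secrets.token_hex(num_bytes)


key = generate_secret_key()
print(len(key))  # 64
```

Generate the key once, store it in the deployment's secret manager or environment, and keep it out of version control. Changing it invalidates every token already issued.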
Future Enhancements¶
- OAuth2/OIDC - Social login (Google, GitHub)
- 2FA - TOTP/SMS verification
- API keys - For service accounts
- Session tracking - Track active sessions
- Token blacklist - Redis-based revocation
- Rate limiting - Prevent brute force
- Password policies - Enforce complexity
- Audit logging - Track auth events