Authentication & Authorization

Deep dive into the authentication and authorization mechanisms.

Authentication Overview

The system uses JWT (JSON Web Tokens) for stateless authentication with a dual-token strategy:

  • Access Token: Short-lived (30 minutes), used for API requests
  • Refresh Token: Long-lived (7 days), used to obtain new access tokens
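The two lifetimes translate directly into the `exp` claim of each token. The following is a minimal sketch, not the project's actual code: `build_claims` and the two lifetime constants are hypothetical names, and the claim layout follows the payloads documented under Token Structure.

```python
from datetime import datetime, timedelta, timezone

# Lifetimes taken from the overview above.
ACCESS_TOKEN_LIFETIME = timedelta(minutes=30)
REFRESH_TOKEN_LIFETIME = timedelta(days=7)

_LIFETIMES = {
    "access": ACCESS_TOKEN_LIFETIME,
    "refresh": REFRESH_TOKEN_LIFETIME,
}


def build_claims(user_id: int, token_type: str, issued_at: datetime) -> dict:
    """Build the JWT payload for an access or refresh token (hypothetical helper)."""
    if token_type not in _LIFETIMES:
        raise ValueError(f"unknown token type: {token_type!r}")
    expires_at = issued_at + _LIFETIMES[token_type]
    return {
        "sub": str(user_id),                 # user ID as a string subject
        "exp": int(expires_at.timestamp()),  # expiry as a Unix timestamp
        "type": token_type,
    }


issued_at = datetime(2024, 1, 1, 8, 30, tzinfo=timezone.utc)
access_claims = build_claims(123, "access", issued_at)
refresh_claims = build_claims(123, "refresh", issued_at)
```

Keeping the token type in the payload lets the server reject a refresh token that is presented where an access token is expected.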

Authentication Flow

Registration

sequenceDiagram
    Client->>API: POST /auth/register
    API->>API: Validate email/password
    API->>Database: Create user (hash password)
    API->>API: Generate access + refresh tokens
    API->>Client: Return tokens

Implementation:

  1. Email validation: Pydantic validates email format
  2. Password hashing: bcrypt with automatic salt
  3. User creation: Insert into users table
  4. Token generation: JWT with user_id as subject

Login

sequenceDiagram
    Client->>API: POST /auth/login (email, password)
    API->>Database: Find user by email
    API->>API: Verify password (bcrypt)
    API->>API: Generate tokens
    API->>Client: Return access + refresh tokens

Token Refresh

sequenceDiagram
    Client->>API: POST /auth/refresh (with refresh token)
    API->>API: Verify refresh token
    API->>Database: Check user exists & is_active
    API->>API: Generate new tokens
    API->>Client: Return new access + refresh tokens

Why refresh?

  • Access tokens expire quickly (security)
  • Refresh tokens allow seamless re-authentication
  • Reduces password exposure

Protected Endpoint Access

sequenceDiagram
    Client->>API: GET /users/me (with access token)
    API->>API: Extract Bearer token
    API->>API: Verify JWT signature
    API->>API: Check expiration
    API->>Database: Load user by token.sub (user_id)
    API->>API: Check is_active=True
    API->>API: Execute endpoint logic
    API->>Client: Return response
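The "Extract Bearer token" step amounts to parsing the `Authorization` header. In a FastAPI application this is normally handled by the OAuth2 scheme dependency; the sketch below only illustrates the parsing rule, and `extract_bearer_token` is a hypothetical helper name.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value.

    Returns None when the header is missing, uses another scheme,
    or carries no credentials.
    """
    if not authorization:
        return None
    scheme, _, credentials = authorization.strip().partition(" ")
    if scheme.lower() != "bearer":  # the scheme name is case-insensitive
        return None
    credentials = credentials.strip()
    return credentials or None
```

A `None` result corresponds to the "Missing token" case, which the API answers with 401.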

Token Structure

Access Token Payload

{
  "sub": "123",           // User ID
  "exp": 1704099600,      // Expiration timestamp
  "type": "access"        // Token type
}

Refresh Token Payload

{
  "sub": "123",
  "exp": 1704704400,      // Longer expiration
  "type": "refresh"
}

Signature: HMAC-SHA256 using SECRET_KEY
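To make the signature step concrete, here is a standard-library sketch of HS256 signing and verification. It is for illustration only: the application itself should rely on its JWT library, and `encode_token` / `decode_token` are hypothetical names rather than functions from the codebase.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url_encode(data: bytes) -> str:
    """Base64url-encode without '=' padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> str:
    """HMAC-SHA256 over 'header.payload', returned base64url-encoded."""
    digest = hmac.new(
        secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()
    return _b64url_encode(digest)


def encode_token(payload: dict, secret: str) -> str:
    """Build header.payload.signature using HS256."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    ]
    signing_input = ".".join(parts)
    return f"{signing_input}.{_sign(signing_input, secret)}"


def decode_token(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the payload if signature and expiry are valid, else raise ValueError."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None

    # Verify the signature first, using a constant-time comparison.
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    if not hmac.compare_digest(expected.encode("ascii"), signature_b64.encode("utf-8")):
        raise ValueError("invalid signature")

    header = json.loads(_b64url_decode(header_b64))
    if header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")

    payload = json.loads(_b64url_decode(payload_b64))
    current_time = time.time() if now is None else now
    if payload.get("exp", 0) <= current_time:
        raise ValueError("token expired")
    return payload
```

Because the signature covers both the header and the payload, changing any claim (for example `type` or `exp`) invalidates the token unless the attacker also knows SECRET_KEY.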

Password Security

Hashing Strategy

Algorithm: bcrypt (via passlib)

Process:

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Hash password
hashed = pwd_context.hash(password)  # Includes automatic salt

# Verify password
is_valid = pwd_context.verify(plain_password, hashed)

Key features:

  • Automatic salt generation
  • Adaptive work factor
  • Constant-time comparison

bcrypt 72-Byte Limit

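bcrypt only uses the first 72 bytes of a password. Cutting at a byte boundary can split a multi-byte UTF-8 character, so the helper below truncates the encoded bytes and drops any partial trailing character: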

def normalize_password(password: str) -> str:
    """Normalize password to handle bcrypt 72-byte limit"""
    password_bytes = password.encode('utf-8')
    if len(password_bytes) > 72:
        password_bytes = password_bytes[:72]
    return password_bytes.decode('utf-8', errors='ignore')
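A short worked example shows what the truncation does at the boundary. The function is repeated so the snippet runs on its own; the sample passwords are made up for illustration.

```python
def normalize_password(password: str) -> str:
    """Normalize password to handle bcrypt 72-byte limit"""
    password_bytes = password.encode("utf-8")
    if len(password_bytes) > 72:
        password_bytes = password_bytes[:72]
    return password_bytes.decode("utf-8", errors="ignore")


# 80 ASCII characters are 80 bytes, so exactly 72 characters survive.
long_ascii = "a" * 80

# "é" takes 2 bytes in UTF-8: 71 ASCII bytes + "é" = 73 bytes.
# The cut at byte 72 lands in the middle of "é", and errors="ignore"
# drops the orphaned first byte instead of raising UnicodeDecodeError.
split_character = "a" * 71 + "é"

normalized_ascii = normalize_password(long_ascii)
normalized_split = normalize_password(split_character)
```

The same normalization has to be applied both when hashing and when verifying, otherwise a password longer than 72 bytes would no longer match its own hash.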

Authorization Mechanisms

Role-Based Access Control (RBAC)

Model: UserUniversityRole

class UserUniversityRole:
    user_id: int
    university_id: int
    role: Enum  # STUDENT, INSTRUCTOR, TA, ADMIN, etc.
    department_id: Optional[int]  # Optional department scope

Roles:

  • STUDENT - Basic access to enrolled courses
  • INSTRUCTOR - Teach courses, view enrolled students
  • TA - Assist instructor
  • DEPARTMENT_ADMIN - Manage department
  • UNIVERSITY_ADMIN - Manage university
  • PLATFORM_ADMIN - Global admin

Multi-Tenant Scoping

Users can have different roles in different universities:

# User is a student at MIT
UserUniversityRole(user_id=1, university_id=1, role="STUDENT")

# Same user is an instructor at Stanford
UserUniversityRole(user_id=1, university_id=2, role="INSTRUCTOR")
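Because roles are scoped per university, a permission check has to name the university it applies to. The sketch below is one possible shape for such a check; `has_role` is a hypothetical helper, and the dataclass stands in for the real ORM model.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterable, Optional, Set


class Role(str, Enum):
    STUDENT = "STUDENT"
    INSTRUCTOR = "INSTRUCTOR"
    TA = "TA"
    DEPARTMENT_ADMIN = "DEPARTMENT_ADMIN"
    UNIVERSITY_ADMIN = "UNIVERSITY_ADMIN"
    PLATFORM_ADMIN = "PLATFORM_ADMIN"


@dataclass(frozen=True)
class UserUniversityRole:
    """Plain stand-in for the ORM model, for illustration only."""
    user_id: int
    university_id: int
    role: Role
    department_id: Optional[int] = None


def has_role(
    assignments: Iterable[UserUniversityRole],
    university_id: int,
    allowed: Set[Role],
) -> bool:
    """True if any assignment grants one of the allowed roles at this university."""
    return any(
        a.university_id == university_id and a.role in allowed
        for a in assignments
    )


assignments = [
    UserUniversityRole(user_id=1, university_id=1, role=Role.STUDENT),
    UserUniversityRole(user_id=1, university_id=2, role=Role.INSTRUCTOR),
]
can_teach_at_stanford = has_role(assignments, 2, {Role.INSTRUCTOR, Role.TA})
can_teach_at_mit = has_role(assignments, 1, {Role.INSTRUCTOR, Role.TA})
```

How PLATFORM_ADMIN is stored (for example, without a university scope) is not specified here, so the sketch does not special-case it.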

Dependency Injection for Auth

Common dependencies:

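# app/dependencies.py
from typing import Optional

from fastapi import Depends, HTTPException
from jose import JWTError, jwt  # assuming python-jose; adjust for another JWT library
from sqlalchemy.ext.asyncio import AsyncSession

# oauth2_scheme, oauth2_scheme_optional, get_db, crud, User, SECRET_KEY and
# ALGORITHM are defined elsewhere in the project.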

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Extract and validate user from JWT"""
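    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        # Decode JWT (verifies signature and expiration)
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload.get("sub")
        token_type = payload.get("type")

        # Reject tokens without a subject, and refresh tokens used as access tokens
        if user_id is None or token_type != "access":
            raise credentials_exception
        user_id = int(user_id)
    except (JWTError, ValueError):
        # ValueError covers a non-numeric "sub" claim
        raise credentials_exception

    # Load user from database
    user = await crud.user.get(db, id=user_id)
    if user is None:
        raise credentials_exception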

    return user

async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Ensure user is active"""
    if not current_user.is_active:
        raise HTTPException(status_code=403, detail="Inactive user")
    return current_user

async def get_optional_current_user(
    token: Optional[str] = Depends(oauth2_scheme_optional),
    db: AsyncSession = Depends(get_db)
) -> Optional[User]:
    """Return user if authenticated, None otherwise (for public endpoints)"""
    if not token:
        return None
    try:
        return await get_current_user(token, db)
    except HTTPException:
        return None

Usage in routes:

# Requires authentication
@router.get("/me")
async def get_profile(
    current_user: User = Depends(get_current_active_user)
):
    return current_user

# Optional authentication
@router.get("/courses")
async def list_courses(
    user: Optional[User] = Depends(get_optional_current_user)
):
    # Show different results for authenticated vs anonymous
    if user:
        return get_user_courses(user)
    return get_public_courses()

Resource Ownership Checks

Verify user owns the resource:

@router.get("/enrollments/{enrollment_id}")
async def get_enrollment(
    enrollment_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_active_user)
):
    enrollment = await crud.enrollment.get(db, id=enrollment_id)

    if not enrollment:
        raise HTTPException(status_code=404)

    # Check ownership
    if enrollment.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not your enrollment")

    return enrollment

Security Best Practices

Implemented

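Password hashing - bcrypt with a per-password salt
JWT expiration - Access tokens expire after 30 minutes, refresh tokens after 7 days
Active user checks - Disabled accounts can't access protected endpoints
CORS configuration - Restrict origins
SQL injection prevention - Parameterized queries via the SQLAlchemy ORM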
Input validation - Pydantic schemas
HTTPS recommended - Use TLS in production

Considerations

⚠️ Token revocation - JWTs can't be revoked (stateless trade-off)
⚠️ Rate limiting - Not implemented (add nginx/API gateway)
⚠️ Password policy - No complexity requirements
⚠️ 2FA - Not implemented
⚠️ Account lockout - No brute-force protection
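As an illustration of what brute-force protection could look like, the sketch below counts failed logins per email in a sliding window. It is not part of the current system: `LoginAttemptTracker`, its thresholds, and the in-memory storage are all assumptions, and a multi-process deployment would need a shared store instead.

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class LoginAttemptTracker:
    """Track failed logins per email within a sliding time window."""

    def __init__(self, max_failures: int = 5, window_seconds: float = 900.0) -> None:
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self._failures: Dict[str, Deque[float]] = defaultdict(deque)

    def _recent(self, email: str, now: float) -> Deque[float]:
        """Drop failures older than the window and return the remainder."""
        attempts = self._failures[email]
        while attempts and now - attempts[0] >= self.window_seconds:
            attempts.popleft()
        return attempts

    def record_failure(self, email: str, now: Optional[float] = None) -> None:
        """Call after a failed password check."""
        now = time.monotonic() if now is None else now
        self._recent(email, now).append(now)

    def record_success(self, email: str) -> None:
        """Call after a successful login to reset the counter."""
        self._failures.pop(email, None)

    def is_locked(self, email: str, now: Optional[float] = None) -> bool:
        """True while the number of recent failures has reached the limit."""
        now = time.monotonic() if now is None else now
        return len(self._recent(email, now)) >= self.max_failures
```

A login handler would check `is_locked` before verifying the password and respond with an error (commonly 429) while the account is locked.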

Token Management Best Practices

Client-Side Storage

Access Token:

  • Store in memory (variable)
  • Never in localStorage (XSS risk)

Refresh Token:

  • httpOnly cookie (best)
  • Or secure storage (mobile)
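For the httpOnly cookie option, the server sets the refresh token in a `Set-Cookie` header so that page scripts cannot read it. The sketch below builds such a header value with the standard library; `build_refresh_cookie`, the cookie name, and the path are assumptions, and a FastAPI app would more likely call `response.set_cookie` with the same attributes.

```python
from http.cookies import SimpleCookie

SEVEN_DAYS_IN_SECONDS = 7 * 24 * 60 * 60


def build_refresh_cookie(
    refresh_token: str,
    path: str = "/auth/refresh",  # assumed path; limits where the cookie is sent
    max_age_seconds: int = SEVEN_DAYS_IN_SECONDS,
) -> str:
    """Return the value of a Set-Cookie header for the refresh token."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = refresh_token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True        # not readable from JavaScript
    morsel["secure"] = True          # only sent over HTTPS
    morsel["samesite"] = "Strict"    # not sent on cross-site requests
    morsel["path"] = path
    morsel["max-age"] = max_age_seconds
    return morsel.OutputString()


set_cookie_value = build_refresh_cookie("abc.def.ghi")
```

Restricting the cookie path to the refresh endpoint keeps the refresh token out of every other request.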

Token Rotation

On refresh, issue new tokens:

@router.post("/auth/refresh")
async def refresh_tokens(
    current_user: User = Depends(get_current_user_from_refresh_token)
):
    # Generate NEW access and refresh tokens
    access_token = create_access_token(subject=str(current_user.id))
    refresh_token = create_refresh_token(subject=str(current_user.id))

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

Old tokens remain valid until expiration (stateless limitation).

Logout

Challenge: Can't invalidate JWTs

Solutions:

  1. Client-side logout:
       • Delete tokens from client
       • Effective but not server-enforced
  2. Token blacklist (future):
       • Store revoked tokens in Redis
       • Check on every request (adds overhead)
  3. Short token lifetime:
       • Minimize exposure window
       • Balance UX vs security
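The token blacklist option can be sketched with an in-memory store. This is an assumption-laden illustration: it presumes each token carries a unique `jti` claim (the payloads documented above do not include one), and `TokenDenylist` is a hypothetical class standing in for a Redis-backed store.

```python
import time
from typing import Dict, Optional


class TokenDenylist:
    """Remember revoked token IDs until the tokens would have expired anyway."""

    def __init__(self) -> None:
        self._revoked: Dict[str, float] = {}  # jti -> expiry timestamp

    def revoke(self, jti: str, expires_at: float) -> None:
        """Mark a token as revoked; expires_at is the token's own exp claim."""
        self._revoked[jti] = expires_at

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        """Check a token ID, purging entries whose tokens have expired."""
        now = time.time() if now is None else now
        self._purge(now)
        return jti in self._revoked

    def _purge(self, now: float) -> None:
        expired = [jti for jti, exp in self._revoked.items() if exp <= now]
        for jti in expired:
            del self._revoked[jti]
```

Entries only need to live as long as the token itself, because an expired token is rejected by the expiry check regardless. With Redis, the same effect is typically achieved by storing each key with a matching time-to-live.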

Common Authentication Errors

401 Unauthorized

Causes:

  • Missing token
  • Expired token
  • Invalid signature
  • User not found

Response:

{
  "detail": "Could not validate credentials"
}

403 Forbidden

Causes:

  • User account disabled (is_active=False)
  • Insufficient permissions
  • Resource ownership violation

Response:

{
  "detail": "Inactive user"
}

422 Validation Error

Causes:

  • Invalid email format
  • Missing required fields

Response:

{
  "detail": [
    {
      "loc": ["body", "email"],
      "msg": "value is not a valid email address",
      "type": "value_error.email"
    }
  ]
}

Testing Authentication

Obtaining Tokens in Tests

@pytest.fixture
async def test_user_token(client: AsyncClient):
    """Get access token for testing"""
    response = await client.post(
        "/api/v1/auth/register",
        json={
            "email": "test@example.com",
            "password": "testpass",
            "full_name": "Test User"
        }
    )
    return response.json()["access_token"]

@pytest.mark.asyncio
async def test_protected_endpoint(client, test_user_token):
    response = await client.get(
        "/api/v1/users/me",
        headers={"Authorization": f"Bearer {test_user_token}"}
    )
    assert response.status_code == 200

Security Configuration

Environment Variables

# JWT Configuration
SECRET_KEY=your-secret-key-min-32-chars
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7

# CORS
BACKEND_CORS_ORIGINS=["https://app.example.com"]
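These variables have to be parsed and validated at startup. The project may well use a settings library for this; the standard-library sketch below only shows the idea, and `AuthSettings` and `load_auth_settings` are hypothetical names. The 32-character minimum follows the hint in the SECRET_KEY placeholder above.

```python
import json
import os
from dataclasses import dataclass
from typing import List, Mapping, Optional


@dataclass(frozen=True)
class AuthSettings:
    secret_key: str
    algorithm: str
    access_token_expire_minutes: int
    refresh_token_expire_days: int
    backend_cors_origins: List[str]


def load_auth_settings(env: Optional[Mapping[str, str]] = None) -> AuthSettings:
    """Read auth settings from environment variables, failing fast on bad values."""
    env = os.environ if env is None else env

    secret_key = env.get("SECRET_KEY", "")
    if len(secret_key) < 32:
        raise ValueError("SECRET_KEY must be at least 32 characters long")

    # BACKEND_CORS_ORIGINS is a JSON array, e.g. ["https://app.example.com"]
    origins = json.loads(env.get("BACKEND_CORS_ORIGINS", "[]"))
    if not isinstance(origins, list):
        raise ValueError("BACKEND_CORS_ORIGINS must be a JSON array")

    return AuthSettings(
        secret_key=secret_key,
        algorithm=env.get("ALGORITHM", "HS256"),
        access_token_expire_minutes=int(env.get("ACCESS_TOKEN_EXPIRE_MINUTES", "30")),
        refresh_token_expire_days=int(env.get("REFRESH_TOKEN_EXPIRE_DAYS", "7")),
        backend_cors_origins=[str(origin) for origin in origins],
    )
```

Failing at startup on a short or missing key is safer than discovering at request time that tokens were signed with a weak secret.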

Production Recommendations

  1. Use long, random SECRET_KEY:

    openssl rand -hex 32

  2. Enable HTTPS:
       • Use TLS certificates
       • Redirect HTTP → HTTPS
  3. Restrict CORS:
       • Only allow trusted origins
       • No wildcards in production
  4. Shorter token lifetimes:
       • Consider 15-minute access tokens
       • 1-day refresh tokens
  5. Monitor for suspicious activity:
       • Failed login attempts
       • Token reuse patterns
       • Unusual access patterns
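Where `openssl` is not available, the key from the first recommendation can be generated from Python's standard library instead. `generate_secret_key` is a hypothetical wrapper; the underlying call is `secrets.token_hex`.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return num_bytes of OS randomness as a hex string (2 characters per byte)."""
    return secrets.token_hex(num_bytes)


secret_key = generate_secret_key()  # 32 random bytes, like `openssl rand -hex 32`
```

The generated value belongs in the deployment's secret store or environment, not in source control.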

Future Enhancements

  1. OAuth2/OIDC - Social login (Google, GitHub)
  2. 2FA - TOTP/SMS verification
  3. API keys - For service accounts
  4. Session tracking - Track active sessions
  5. Token blacklist - Redis-based revocation
  6. Rate limiting - Prevent brute force
  7. Password policies - Enforce complexity
  8. Audit logging - Track auth events

Next Steps