Tax Practice AI - Implementation Patterns¶
Extracted from ARCHITECTURE.md for use in implementation docs Last updated: 2024-12-24
This document contains code patterns and templates that MUST be applied when implementing features. These patterns are carried forward into implementation sequence docs to ensure nothing is missed.
1. Error Handling¶
1.1 Exception Classes¶
All exceptions inherit from AppError. Use these instead of raw HTTPException:
# src/utils/exceptions.py
from src.utils.exceptions import (
ValidationError, # 400 - Input validation failed
AuthenticationError, # 401 - Not authenticated
AuthorizationError, # 403 - No permission
NotFoundError, # 404 - Resource not found
ConflictError, # 409 - Duplicate, state conflict
WorkflowError, # 422 - Invalid state transition
DatabaseError, # 500 - DB operation failed
ExternalServiceError, # 502 - External API failed
ServiceUnavailableError, # 503 - Temporarily unavailable
)
# Usage examples:
raise NotFoundError("Client", client_id)
raise ConflictError("Email already registered", field="email")
raise ValidationError("Invalid date format", field="date_of_birth")
1.2 Error Logging Pattern¶
ALWAYS log errors with context before raising:
import structlog
logger = structlog.get_logger()
async def some_operation(resource_id: UUID, user_id: UUID):
log = logger.bind(resource_id=str(resource_id), user_id=str(user_id))
try:
result = await do_something(resource_id)
log.info("operation_completed", result_count=len(result))
return result
except SpecificDatabaseError as e:
log.error(
"operation_failed",
error_type=type(e).__name__,
error_message=str(e),
# Never log PII: ssn, passwords, tokens
)
raise DatabaseError(
"Failed to complete operation",
operation="some_operation",
details={"resource_id": str(resource_id)}
)
except Exception as e:
log.exception("unexpected_error") # Includes stack trace
raise
1.3 Database Error Mapping¶
import asyncpg
async def execute_with_error_handling(query: str, *args):
try:
return await self._pool.fetch(query, *args)
except asyncpg.UniqueViolationError as e:
raise ConflictError(
"Resource already exists",
field=e.constraint_name,
)
except asyncpg.ForeignKeyViolationError as e:
raise ValidationError(
"Referenced resource not found",
field=e.constraint_name,
)
except asyncpg.CheckViolationError as e:
raise ValidationError(
"Value out of allowed range",
field=e.constraint_name,
)
except asyncpg.PostgresConnectionError as e:
logger.critical("database_connection_failed", error=str(e))
raise ServiceUnavailableError("Database temporarily unavailable")
2. Logging¶
2.1 Logging Setup¶
# src/utils/logging.py
import logging
import structlog
def setup_logging(environment: str, log_level: str = "INFO"):
"""Configure structured logging for the application."""
# Set root logger level
logging.basicConfig(level=getattr(logging, log_level))
processors = [
structlog.contextvars.merge_contextvars,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
]
if environment == "production":
processors.append(structlog.processors.JSONRenderer())
else:
processors.append(structlog.dev.ConsoleRenderer(colors=True))
structlog.configure(
processors=processors,
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, log_level)
),
context_class=dict,
cache_logger_on_first_use=True,
)
2.2 Correlation ID Middleware¶
# src/api/middleware/correlation.py
import uuid
import structlog
from contextvars import ContextVar
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")
class CorrelationIdMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Get from header or generate new
correlation_id = request.headers.get(
"X-Correlation-ID",
str(uuid.uuid4())
)
correlation_id_var.set(correlation_id)
# Bind to structlog context for all subsequent logs
structlog.contextvars.bind_contextvars(correlation_id=correlation_id)
# Store on request for error handlers
request.state.correlation_id = correlation_id
response = await call_next(request)
response.headers["X-Correlation-ID"] = correlation_id
return response
2.3 Log Levels by Layer¶
| Layer | Default Level | What to Log |
|---|---|---|
| API | INFO | Request received, response sent, status codes |
| Middleware | INFO | Auth success/failure, rate limit hits |
| Workflow | INFO | Business events, state transitions |
| Repository | DEBUG | Query execution (no sensitive data) |
| Service | DEBUG | External API calls, retries, timeouts |
2.4 Sensitive Data - NEVER Log¶
- SSN (even partial)
- Passwords or tokens
- Bank account numbers
- Full credit card numbers
- PHI fields
3. API Middleware Stack¶
3.1 Error Handler Middleware¶
# src/api/middleware/error_handler.py
from fastapi import Request
from fastapi.responses import JSONResponse
from src.utils.exceptions import AppError
import structlog
logger = structlog.get_logger()
async def app_error_handler(request: Request, exc: AppError) -> JSONResponse:
"""Convert application errors to consistent JSON responses."""
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.error_code,
"message": exc.message,
"details": exc.details,
"correlation_id": getattr(request.state, "correlation_id", None),
}
)
async def unhandled_error_handler(request: Request, exc: Exception) -> JSONResponse:
"""Catch-all for unexpected errors."""
correlation_id = getattr(request.state, "correlation_id", "unknown")
logger.exception(
"unhandled_error",
correlation_id=correlation_id,
path=request.url.path,
)
return JSONResponse(
status_code=500,
content={
"error": "INTERNAL_ERROR",
"message": "An unexpected error occurred",
"correlation_id": correlation_id,
}
)
3.2 Request Logging Middleware¶
# src/api/middleware/request_logging.py
import time
import structlog
from starlette.middleware.base import BaseHTTPMiddleware
logger = structlog.get_logger()
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
start_time = time.perf_counter()
# Log request
logger.info(
"request_started",
method=request.method,
path=request.url.path,
query=str(request.query_params),
)
response = await call_next(request)
# Log response
duration_ms = (time.perf_counter() - start_time) * 1000
logger.info(
"request_completed",
method=request.method,
path=request.url.path,
status_code=response.status_code,
duration_ms=round(duration_ms, 2),
)
return response
4. Repository Pattern¶
4.1 Base Repository with Error Handling¶
# src/repositories/base_repository.py
async def find_by_id(self, id: UUID) -> Optional[T]:
"""Find entity by ID with proper error handling."""
log = self._logger.bind(table=self._table_name, id=str(id))
try:
query = f"SELECT * FROM {self._table_name} WHERE id = $1 AND deleted_at IS NULL"
row = await self._aurora.execute_one(query, id)
if row:
log.debug("entity_found")
else:
log.debug("entity_not_found")
return self._row_to_entity(row) if row else None
except Exception as e:
log.error("find_by_id_failed", error=str(e))
raise DatabaseError(
f"Failed to find {self._table_name}",
operation="find_by_id",
table=self._table_name,
)
5. Testing Patterns¶
5.1 Test Distribution¶
| Type | Target % | Description |
|---|---|---|
| Unit | 80% | Fast, no I/O, mocked dependencies |
| Integration | 15% | Real PostgreSQL (Docker), mock external services |
| E2E | 5% | Full HTTP stack with real database |
5.2 Test Structure (Implemented)¶
tests/
├── conftest.py # Global fixtures (pytest_plugins)
├── unit/ # 80% - Fast, no I/O
│ ├── test_exceptions.py
│ └── middleware/
│ ├── test_auth.py # JWT auth, RBAC
│ ├── test_correlation.py
│ ├── test_error_handler.py
│ └── test_request_logging.py
├── integration/ # 15% - Real DB (Docker)
│ ├── conftest.py # DB fixtures, test schema
│ └── repositories/
│ └── test_client_repository.py # CRUD, search, contacts
└── e2e/ # 5% - Full HTTP stack
├── conftest.py # API client fixtures
└── test_client_api.py # API endpoint tests
5.3 Key Fixtures¶
| Fixture | Scope | Purpose |
|---|---|---|
mock_aurora |
function | Mocked AuroraService for unit tests |
test_db |
function | Fresh PostgreSQL database per test |
aurora_service |
function | AuroraService with test pool |
api_client |
function | httpx AsyncClient with dependency injection |
5.4 Unit Test Template¶
import pytest
from unittest.mock import AsyncMock
@pytest.fixture
def mock_aurora():
"""Mock Aurora service."""
aurora = AsyncMock()
aurora.execute_query = AsyncMock(return_value=[])
aurora.execute_one = AsyncMock(return_value=None)
aurora.execute_returning = AsyncMock()
return aurora
@pytest.mark.asyncio
async def test_create_client_success(mock_aurora):
# Arrange
mock_aurora.execute_returning.return_value = {
"id": uuid4(),
"name": "Test Client",
...
}
repo = ClientRepository(mock_aurora)
# Act
result = await repo.create(Client(name="Test Client", ...))
# Assert
assert result.name == "Test Client"
mock_aurora.execute_returning.assert_called_once()
5.5 Integration Test Template¶
import pytest
@pytest.fixture
async def test_db(postgres_container):
"""Real PostgreSQL with schema applied."""
# Setup - apply schema
# Yield connection
# Teardown - rollback
@pytest.mark.asyncio
async def test_client_crud(test_db):
repo = ClientRepository(test_db)
# Create
client = await repo.create(...)
assert client.id is not None
# Read
found = await repo.find_by_id(client.id)
assert found.name == client.name
# Delete
await repo.soft_delete(client.id)
assert await repo.find_by_id(client.id) is None
6. Async Patterns¶
6.1 Parallel Operations¶
import asyncio
async def load_client_context(client_id: UUID) -> ClientContext:
"""Load all client data in parallel."""
client, contacts, returns, documents = await asyncio.gather(
client_repo.find_by_id(client_id),
contact_repo.find_by_client(client_id),
return_repo.find_by_client(client_id),
document_repo.find_by_client(client_id),
)
return ClientContext(client, contacts, returns, documents)
6.2 Transaction Boundaries¶
async def register_client(data: ClientCreate) -> Client:
"""
Transaction scope: client + engagement + audit.
Side effects (email) happen AFTER commit.
"""
async with aurora.transaction() as conn:
# All writes in same transaction
client = await conn.execute(...)
await conn.execute(...) # engagement
await conn.execute(...) # audit log
# Transaction committed
# Side effects after commit
await email_service.send_welcome(data.email)
return client
Checklist¶
Before marking any feature complete, verify:
- All database operations wrapped in try/catch
- Errors logged with context (operation, parameters, error type)
- Custom exceptions used (not raw HTTPException)
- Correlation ID flows through all logs
- No PII in log messages
- Unit tests exist for business logic
- Integration tests exist for repositories