Skip to content

Tax Practice AI - Implementation Patterns

Extracted from ARCHITECTURE.md for use in implementation docs Last updated: 2024-12-24

This document contains code patterns and templates that MUST be applied when implementing features. These patterns are carried forward into implementation sequence docs to ensure nothing is missed.


1. Error Handling

1.1 Exception Classes

All exceptions inherit from AppError. Use these instead of raw HTTPException:

# src/utils/exceptions.py

from src.utils.exceptions import (
    ValidationError,      # 400 - Input validation failed
    AuthenticationError,  # 401 - Not authenticated
    AuthorizationError,   # 403 - No permission
    NotFoundError,        # 404 - Resource not found
    ConflictError,        # 409 - Duplicate, state conflict
    WorkflowError,        # 422 - Invalid state transition
    DatabaseError,        # 500 - DB operation failed
    ExternalServiceError, # 502 - External API failed
    ServiceUnavailableError, # 503 - Temporarily unavailable
)

# Usage examples:
raise NotFoundError("Client", client_id)
raise ConflictError("Email already registered", field="email")
raise ValidationError("Invalid date format", field="date_of_birth")

1.2 Error Logging Pattern

ALWAYS log errors with context before raising:

import structlog

logger = structlog.get_logger()

async def some_operation(resource_id: UUID, user_id: UUID):
    log = logger.bind(resource_id=str(resource_id), user_id=str(user_id))

    try:
        result = await do_something(resource_id)
        log.info("operation_completed", result_count=len(result))
        return result
    except SpecificDatabaseError as e:
        log.error(
            "operation_failed",
            error_type=type(e).__name__,
            error_message=str(e),
            # Never log PII: ssn, passwords, tokens
        )
        raise DatabaseError(
            "Failed to complete operation",
            operation="some_operation",
            details={"resource_id": str(resource_id)}
        )
    except Exception as e:
        log.exception("unexpected_error")  # Includes stack trace
        raise

1.3 Database Error Mapping

import asyncpg

async def execute_with_error_handling(query: str, *args):
    try:
        return await self._pool.fetch(query, *args)
    except asyncpg.UniqueViolationError as e:
        raise ConflictError(
            "Resource already exists",
            field=e.constraint_name,
        )
    except asyncpg.ForeignKeyViolationError as e:
        raise ValidationError(
            "Referenced resource not found",
            field=e.constraint_name,
        )
    except asyncpg.CheckViolationError as e:
        raise ValidationError(
            "Value out of allowed range",
            field=e.constraint_name,
        )
    except asyncpg.PostgresConnectionError as e:
        logger.critical("database_connection_failed", error=str(e))
        raise ServiceUnavailableError("Database temporarily unavailable")

2. Logging

2.1 Logging Setup

# src/utils/logging.py
import logging
import structlog

def setup_logging(environment: str, log_level: str = "INFO"):
    """Configure structured logging for the application."""

    # Set root logger level
    logging.basicConfig(level=getattr(logging, log_level))

    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
    ]

    if environment == "production":
        processors.append(structlog.processors.JSONRenderer())
    else:
        processors.append(structlog.dev.ConsoleRenderer(colors=True))

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level)
        ),
        context_class=dict,
        cache_logger_on_first_use=True,
    )

2.2 Correlation ID Middleware

# src/api/middleware/correlation.py
import uuid
import structlog
from contextvars import ContextVar
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")

class CorrelationIdMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Get from header or generate new
        correlation_id = request.headers.get(
            "X-Correlation-ID",
            str(uuid.uuid4())
        )
        correlation_id_var.set(correlation_id)

        # Bind to structlog context for all subsequent logs
        structlog.contextvars.bind_contextvars(correlation_id=correlation_id)

        # Store on request for error handlers
        request.state.correlation_id = correlation_id

        response = await call_next(request)
        response.headers["X-Correlation-ID"] = correlation_id

        return response

2.3 Log Levels by Layer

Layer Default Level What to Log
API INFO Request received, response sent, status codes
Middleware INFO Auth success/failure, rate limit hits
Workflow INFO Business events, state transitions
Repository DEBUG Query execution (no sensitive data)
Service DEBUG External API calls, retries, timeouts

2.4 Sensitive Data - NEVER Log

  • SSN (even partial)
  • Passwords or tokens
  • Bank account numbers
  • Full credit card numbers
  • PHI fields

3. API Middleware Stack

3.1 Error Handler Middleware

# src/api/middleware/error_handler.py
from fastapi import Request
from fastapi.responses import JSONResponse
from src.utils.exceptions import AppError
import structlog

logger = structlog.get_logger()

async def app_error_handler(request: Request, exc: AppError) -> JSONResponse:
    """Convert application errors to consistent JSON responses."""
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.error_code,
            "message": exc.message,
            "details": exc.details,
            "correlation_id": getattr(request.state, "correlation_id", None),
        }
    )

async def unhandled_error_handler(request: Request, exc: Exception) -> JSONResponse:
    """Catch-all for unexpected errors."""
    correlation_id = getattr(request.state, "correlation_id", "unknown")
    logger.exception(
        "unhandled_error",
        correlation_id=correlation_id,
        path=request.url.path,
    )
    return JSONResponse(
        status_code=500,
        content={
            "error": "INTERNAL_ERROR",
            "message": "An unexpected error occurred",
            "correlation_id": correlation_id,
        }
    )

3.2 Request Logging Middleware

# src/api/middleware/request_logging.py
import time
import structlog
from starlette.middleware.base import BaseHTTPMiddleware

logger = structlog.get_logger()

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        start_time = time.perf_counter()

        # Log request
        logger.info(
            "request_started",
            method=request.method,
            path=request.url.path,
            query=str(request.query_params),
        )

        response = await call_next(request)

        # Log response
        duration_ms = (time.perf_counter() - start_time) * 1000
        logger.info(
            "request_completed",
            method=request.method,
            path=request.url.path,
            status_code=response.status_code,
            duration_ms=round(duration_ms, 2),
        )

        return response

4. Repository Pattern

4.1 Base Repository with Error Handling

# src/repositories/base_repository.py

async def find_by_id(self, id: UUID) -> Optional[T]:
    """Find entity by ID with proper error handling."""
    log = self._logger.bind(table=self._table_name, id=str(id))

    try:
        query = f"SELECT * FROM {self._table_name} WHERE id = $1 AND deleted_at IS NULL"
        row = await self._aurora.execute_one(query, id)

        if row:
            log.debug("entity_found")
        else:
            log.debug("entity_not_found")

        return self._row_to_entity(row) if row else None

    except Exception as e:
        log.error("find_by_id_failed", error=str(e))
        raise DatabaseError(
            f"Failed to find {self._table_name}",
            operation="find_by_id",
            table=self._table_name,
        )

5. Testing Patterns

5.1 Test Distribution

Type Target % Description
Unit 80% Fast, no I/O, mocked dependencies
Integration 15% Real PostgreSQL (Docker), mock external services
E2E 5% Full HTTP stack with real database

5.2 Test Structure (Implemented)

tests/
├── conftest.py                          # Global fixtures (pytest_plugins)
├── unit/                                # 80% - Fast, no I/O
│   ├── test_exceptions.py
│   └── middleware/
│       ├── test_auth.py                 # JWT auth, RBAC
│       ├── test_correlation.py
│       ├── test_error_handler.py
│       └── test_request_logging.py
├── integration/                         # 15% - Real DB (Docker)
│   ├── conftest.py                      # DB fixtures, test schema
│   └── repositories/
│       └── test_client_repository.py    # CRUD, search, contacts
└── e2e/                                 # 5% - Full HTTP stack
    ├── conftest.py                      # API client fixtures
    └── test_client_api.py               # API endpoint tests

5.3 Key Fixtures

Fixture Scope Purpose
mock_aurora function Mocked AuroraService for unit tests
test_db function Fresh PostgreSQL database per test
aurora_service function AuroraService with test pool
api_client function httpx AsyncClient with dependency injection

5.4 Unit Test Template

import pytest
from unittest.mock import AsyncMock

@pytest.fixture
def mock_aurora():
    """Mock Aurora service."""
    aurora = AsyncMock()
    aurora.execute_query = AsyncMock(return_value=[])
    aurora.execute_one = AsyncMock(return_value=None)
    aurora.execute_returning = AsyncMock()
    return aurora

@pytest.mark.asyncio
async def test_create_client_success(mock_aurora):
    # Arrange
    mock_aurora.execute_returning.return_value = {
        "id": uuid4(),
        "name": "Test Client",
        ...
    }
    repo = ClientRepository(mock_aurora)

    # Act
    result = await repo.create(Client(name="Test Client", ...))

    # Assert
    assert result.name == "Test Client"
    mock_aurora.execute_returning.assert_called_once()

5.5 Integration Test Template

import pytest

@pytest.fixture
async def test_db(postgres_container):
    """Real PostgreSQL with schema applied."""
    # Setup - apply schema
    # Yield connection
    # Teardown - rollback

@pytest.mark.asyncio
async def test_client_crud(test_db):
    repo = ClientRepository(test_db)

    # Create
    client = await repo.create(...)
    assert client.id is not None

    # Read
    found = await repo.find_by_id(client.id)
    assert found.name == client.name

    # Delete
    await repo.soft_delete(client.id)
    assert await repo.find_by_id(client.id) is None

6. Async Patterns

6.1 Parallel Operations

import asyncio

async def load_client_context(client_id: UUID) -> ClientContext:
    """Load all client data in parallel."""
    client, contacts, returns, documents = await asyncio.gather(
        client_repo.find_by_id(client_id),
        contact_repo.find_by_client(client_id),
        return_repo.find_by_client(client_id),
        document_repo.find_by_client(client_id),
    )
    return ClientContext(client, contacts, returns, documents)

6.2 Transaction Boundaries

async def register_client(data: ClientCreate) -> Client:
    """
    Transaction scope: client + engagement + audit.
    Side effects (email) happen AFTER commit.
    """
    async with aurora.transaction() as conn:
        # All writes in same transaction
        client = await conn.execute(...)
        await conn.execute(...)  # engagement
        await conn.execute(...)  # audit log
    # Transaction committed

    # Side effects after commit
    await email_service.send_welcome(data.email)

    return client

Checklist

Before marking any feature complete, verify:

  • All database operations wrapped in try/catch
  • Errors logged with context (operation, parameters, error type)
  • Custom exceptions used (not raw HTTPException)
  • Correlation ID flows through all logs
  • No PII in log messages
  • Unit tests exist for business logic
  • Integration tests exist for repositories