# Software Design Document (SDD)

## AI-First Open-Source FP&A Platform

**Version:** 1.0
**Date:** February 2026
**Classification:** CODITECT Engineering
## 1. System Architecture Overview

### 1.1 Architectural Vision

The platform follows a microservices architecture with event-driven communication, designed for multi-tenancy, horizontal scalability, and regulatory compliance.

### 1.2 Architecture Principles
| Principle | Description | Implementation |
|---|---|---|
| Separation of Concerns | Each service owns its domain | Bounded contexts via DDD |
| Event Sourcing | Immutable event log | All state changes as events |
| CQRS | Separate read/write models | Optimized query paths |
| API-First | Contract-driven development | OpenAPI 3.1 specifications |
| Zero Trust | Never trust, always verify | mTLS, RBAC, audit logging |
| Observability | Full system visibility | OpenTelemetry instrumentation |
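The Event Sourcing and CQRS principles above can be sketched in miniature: state changes are appended to an immutable log, and read models are projections fed from that log. Everything in this snippet (`Event`, `EventStore`, `BalanceProjection`) is illustrative only; the platform itself would persist events to Kafka and PostgreSQL rather than memory:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from collections import defaultdict

@dataclass(frozen=True)
class Event:
    event_type: str                 # e.g. "journal_entry.posted"
    tenant_id: str
    payload: dict
    occurred_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc))

class EventStore:
    """Append-only log: state changes are recorded, never mutated."""
    def __init__(self):
        self._log: list[Event] = []
        self._subscribers = []

    def append(self, event: Event) -> None:
        self._log.append(event)
        for handler in self._subscribers:
            handler(event)          # fan out to read-model projections

    def subscribe(self, handler) -> None:
        self._subscribers.append(handler)

class BalanceProjection:
    """Read model: account balances derived from posted entries."""
    def __init__(self):
        self.balances = defaultdict(float)

    def __call__(self, event: Event) -> None:
        if event.event_type != "journal_entry.posted":
            return
        for line in event.payload["lines"]:
            self.balances[line["account"]] += line["debit"] - line["credit"]

store = EventStore()
projection = BalanceProjection()
store.subscribe(projection)
store.append(Event("journal_entry.posted", "t1", {"lines": [
    {"account": "1000", "debit": 100.0, "credit": 0.0},
    {"account": "4000", "debit": 0.0, "credit": 100.0},
]}))
print(projection.balances["1000"])  # 100.0
```

The write side only ever appends; the read side can be rebuilt at any time by replaying the log, which is what makes the optimized query paths of CQRS safe to denormalize.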
### 1.3 High-Level Architecture Diagram

## 2. Component Design

### 2.1 Core Services

#### 2.1.1 General Ledger Service

**Responsibility:** Manage chart of accounts, journal entries, trial balance, and financial statement generation.

**Technology Stack:**
- Runtime: Python 3.12 / FastAPI
- Database: PostgreSQL 16 with RLS
- Cache: Redis 7
- Events: Kafka/NATS
**Key Components:**

**Data Model:**

```sql
-- Core entities (simplified)
CREATE TABLE tenants (
    id UUID PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    settings JSONB DEFAULT '{}'
);

CREATE TABLE accounts (
    id UUID PRIMARY KEY,
    tenant_id UUID REFERENCES tenants(id),
    code VARCHAR(50) NOT NULL,
    name VARCHAR(255) NOT NULL,
    account_type VARCHAR(20) NOT NULL,
    parent_id UUID REFERENCES accounts(id),
    is_active BOOLEAN DEFAULT true,
    UNIQUE(tenant_id, code)
);

CREATE TABLE journal_entries (
    id UUID PRIMARY KEY,
    tenant_id UUID REFERENCES tenants(id),
    entry_date DATE NOT NULL,
    description TEXT,
    status VARCHAR(20) DEFAULT 'draft',
    posted_at TIMESTAMP,
    posted_by UUID,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE journal_lines (
    id UUID PRIMARY KEY,
    journal_entry_id UUID REFERENCES journal_entries(id),
    account_id UUID REFERENCES accounts(id),
    debit DECIMAL(18,2) DEFAULT 0,
    credit DECIMAL(18,2) DEFAULT 0,
    memo TEXT
);
```
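The schema leaves the double-entry invariant (total debits equal total credits, at least two lines) to the service layer. A minimal sketch of that check, with a hypothetical `validate_entry` helper; `Decimal` avoids the float rounding that would make balanced entries appear unbalanced:

```python
from decimal import Decimal

def validate_entry(lines: list[dict]) -> None:
    """Reject entries that violate the double-entry invariant."""
    if len(lines) < 2:
        raise ValueError("a journal entry needs at least two lines")
    debits = sum(Decimal(str(l.get("debit", 0))) for l in lines)
    credits = sum(Decimal(str(l.get("credit", 0))) for l in lines)
    if debits != credits:
        raise ValueError(f"entry is unbalanced: {debits} != {credits}")

validate_entry([
    {"account_id": "a1", "debit": "250.00"},
    {"account_id": "a2", "credit": "250.00"},
])  # passes silently
```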
#### 2.1.2 Planning Service

**Responsibility:** Budget management, forecasting, scenario modeling, and driver-based planning.

**Technology Stack:**
- Runtime: Python 3.12 / FastAPI
- ML Engine: NeuralProphet, XGBoost
- Database: PostgreSQL + TimescaleDB
**Key Components:**

**Forecasting Architecture:**

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

import numpy as np


class ForecastMethod(Enum):
    NEURAL_PROPHET = "neural_prophet"
    ARIMA = "arima"
    XGBOOST = "xgboost"
    ENSEMBLE = "ensemble"


@dataclass
class ForecastConfig:
    method: ForecastMethod
    horizon_weeks: int = 13
    confidence_level: float = 0.95
    include_holidays: bool = True
    include_regressors: bool = True
    auto_seasonality: bool = True


@dataclass
class ForecastResult:
    values: List[float]
    lower_bound: List[float]
    upper_bound: List[float]
    confidence: float
    method_used: ForecastMethod
    feature_importance: Optional[dict] = None
```
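To show the shape a `ForecastResult` takes, here is a deliberately naive baseline forecaster (stdlib only, not the NeuralProphet/XGBoost path); all names in this snippet are illustrative:

```python
import statistics

def naive_forecast(history: list[float], horizon: int = 13,
                   z: float = 1.96) -> dict:
    """Repeat the last observation; widen bounds by residual stddev."""
    last = history[-1]
    residuals = [b - a for a, b in zip(history, history[1:])]
    sigma = statistics.pstdev(residuals) if len(residuals) > 1 else 0.0
    return {
        "values": [last] * horizon,
        "lower_bound": [last - z * sigma] * horizon,
        "upper_bound": [last + z * sigma] * horizon,
        "confidence": 0.95,
        "method_used": "naive_baseline",
    }

result = naive_forecast([100.0, 102.0, 101.0, 105.0], horizon=4)
print(len(result["values"]))  # 4
```

Baselines like this also serve as the yardstick in model selection: an ensemble method that cannot beat the naive forecast on backtests should not be promoted.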
#### 2.1.3 Reporting Service

**Responsibility:** Financial statement generation, variance analysis, KPI dashboards, and NLG commentary.

**Key Components:**

### 2.2 AI/ML Services

#### 2.2.1 Orchestrator Agent (LangGraph)

**Responsibility:** Coordinate AI agents, manage conversation state, route tasks to specialized agents.

**Architecture:**
```python
import operator
from typing import Annotated, Sequence, TypedDict

from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver


class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    task_type: str
    current_agent: str
    context: dict
    results: dict
    compliance_status: str


def create_fpa_orchestrator() -> StateGraph:
    workflow = StateGraph(AgentState)

    # Add nodes for each specialized agent
    workflow.add_node("classifier", classify_task)
    workflow.add_node("reconciliation_agent", reconciliation_agent)
    workflow.add_node("variance_agent", variance_analysis_agent)
    workflow.add_node("forecast_agent", forecasting_agent)
    workflow.add_node("compliance_agent", compliance_check_agent)
    workflow.add_node("synthesizer", synthesize_results)

    # Define routing logic
    workflow.add_conditional_edges(
        "classifier",
        route_to_agent,
        {
            "reconciliation": "reconciliation_agent",
            "variance": "variance_agent",
            "forecast": "forecast_agent",
            "compliance": "compliance_agent",
        },
    )

    # All agents go through compliance check
    for agent in ["reconciliation_agent", "variance_agent", "forecast_agent"]:
        workflow.add_edge(agent, "compliance_agent")
    workflow.add_edge("compliance_agent", "synthesizer")
    workflow.add_edge("synthesizer", END)

    workflow.set_entry_point("classifier")
    # DB_URL is the PostgreSQL connection string used for checkpoint storage
    return workflow.compile(checkpointer=PostgresSaver.from_conn_string(DB_URL))
```
**Agent Workflow Diagram:**

#### 2.2.2 LLM Service (DeepSeek-R1)

**Deployment Architecture:**
```yaml
# Kubernetes deployment for DeepSeek-R1
apiVersion: apps/v1
kind: Deployment
metadata:
  name: deepseek-r1-inference
spec:
  replicas: 2
  selector:
    matchLabels:
      app: deepseek-r1
  template:
    metadata:
      labels:
        app: deepseek-r1   # must match spec.selector.matchLabels
    spec:
      containers:
        - name: vllm
          image: vllm/vllm-openai:latest
          args:
            - "--model"
            - "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"
            - "--tensor-parallel-size"
            - "2"
            - "--max-model-len"
            - "32768"
            - "--gpu-memory-utilization"
            - "0.90"
          resources:
            limits:
              nvidia.com/gpu: 2
              memory: 80Gi
            requests:
              nvidia.com/gpu: 2
              memory: 64Gi
          ports:
            - containerPort: 8000
```
### 2.3 Integration Layer

#### 2.3.1 ELT Engine (Airbyte)

**Connector Architecture:**

#### 2.3.2 Event Bus (Kafka/NATS)

**Event Schema:**
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "event_id": { "type": "string", "format": "uuid" },
    "event_type": {
      "type": "string",
      "enum": [
        "journal_entry.created",
        "journal_entry.posted",
        "reconciliation.completed",
        "forecast.generated",
        "variance.analyzed",
        "compliance.checked"
      ]
    },
    "timestamp": { "type": "string", "format": "date-time" },
    "tenant_id": { "type": "string", "format": "uuid" },
    "correlation_id": { "type": "string", "format": "uuid" },
    "payload": { "type": "object" },
    "metadata": {
      "type": "object",
      "properties": {
        "user_id": { "type": "string" },
        "source_service": { "type": "string" },
        "version": { "type": "string" }
      }
    }
  },
  "required": ["event_id", "event_type", "timestamp", "tenant_id", "payload"]
}
```
## 3. Data Architecture

### 3.1 Database Strategy
| Database | Purpose | Data Type |
|---|---|---|
| PostgreSQL 16 | OLTP, application data | Structured, transactional |
| TimescaleDB | Time-series analytics | Metrics, forecasts |
| immudb | Audit trail | Immutable logs |
| Redis 7 | Caching, sessions | Ephemeral |
| pgvector | Vector embeddings | AI/ML |
### 3.2 Multi-Tenant Data Model

**Row-Level Security Pattern:**
```sql
-- Enable RLS on all tenant tables
ALTER TABLE accounts ENABLE ROW LEVEL SECURITY;
ALTER TABLE journal_entries ENABLE ROW LEVEL SECURITY;
ALTER TABLE journal_lines ENABLE ROW LEVEL SECURITY;

-- Create policies
CREATE POLICY tenant_isolation ON accounts
    USING (tenant_id = current_setting('app.current_tenant')::UUID);

CREATE POLICY tenant_isolation ON journal_entries
    USING (tenant_id = current_setting('app.current_tenant')::UUID);

-- journal_lines has no tenant_id column, so isolate via the parent entry
-- (with RLS enabled but no policy defined, all access would be denied)
CREATE POLICY tenant_isolation ON journal_lines
    USING (journal_entry_id IN (
        SELECT id FROM journal_entries
        WHERE tenant_id = current_setting('app.current_tenant')::UUID
    ));

-- Application sets tenant context per session or transaction
SET app.current_tenant = '550e8400-e29b-41d4-a716-446655440000';
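On the application side, the tenant setting is typically pinned per transaction with `SET LOCAL`. Custom settings cannot take bind parameters, so the value must be validated before interpolation; `set_tenant_sql` is an illustrative helper:

```python
import uuid

def set_tenant_sql(tenant_id: str) -> str:
    """Build the statement that pins RLS to one tenant for this transaction."""
    uuid.UUID(tenant_id)  # raises ValueError on anything malformed
    return f"SET LOCAL app.current_tenant = '{tenant_id}'"

# Inside a transaction (e.g. with asyncpg or SQLAlchemy):
#   await conn.execute(set_tenant_sql(request_tenant_id))
print(set_tenant_sql("550e8400-e29b-41d4-a716-446655440000"))
```

`SET LOCAL` scopes the setting to the current transaction, so pooled connections cannot leak one tenant's context into the next request.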
### 3.3 Data Flow Architecture

## 4. Security Architecture

### 4.1 Authentication & Authorization

**OpenFGA Integration:**
```
# OpenFGA Authorization Model (DSL)
model
  schema 1.1

type user

type organization
  relations
    define member: [user]
    define admin: [user]
    define owner: [user]

type tenant
  relations
    define organization: [organization]
    define viewer: member from organization
    define editor: admin from organization
    define admin: owner from organization

type account
  relations
    define tenant: [tenant]
    define can_view: viewer from tenant
    define can_edit: editor from tenant

type journal_entry
  relations
    define tenant: [tenant]
    define author: [user]
    define can_view: can_view from tenant
    define can_edit: author or can_edit from tenant
    define can_approve: admin from tenant but not author
```
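The `can_approve` rule encodes segregation of duties: an admin may approve entries, but never their own. Restated as a plain predicate for illustration only (production checks go through the OpenFGA server, not a local function):

```python
def can_approve(user: str, entry_author: str, tenant_admins: set[str]) -> bool:
    # Admins may approve, but never their own entries (segregation of duties)
    return user in tenant_admins and user != entry_author

assert can_approve("alice", "bob", {"alice"})
assert not can_approve("alice", "alice", {"alice"})   # author excluded
assert not can_approve("carol", "bob", {"alice"})     # not an admin
```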
### 4.2 Audit Trail Architecture

**immudb Integration:**
```python
import json
from dataclasses import dataclass
from datetime import datetime

from immudb import ImmudbClient


@dataclass
class AuditEntry:
    timestamp: datetime
    tenant_id: str
    user_id: str
    action: str
    resource_type: str
    resource_id: str
    old_value: dict
    new_value: dict
    ip_address: str
    user_agent: str


class AuditTrailService:
    def __init__(self, immudb_client: ImmudbClient):
        self.client = immudb_client

    async def log_event(self, entry: AuditEntry) -> str:
        payload = json.dumps({
            "timestamp": entry.timestamp.isoformat(),
            "tenant_id": entry.tenant_id,
            "user_id": entry.user_id,
            "action": entry.action,
            "resource_type": entry.resource_type,
            "resource_id": entry.resource_id,
            "old_value": entry.old_value,
            "new_value": entry.new_value,
            "ip_address": entry.ip_address,
            "user_agent": entry.user_agent,
        })
        key = (
            f"audit:{entry.tenant_id}:{entry.resource_type}:"
            f"{entry.resource_id}:{entry.timestamp.timestamp()}"
        )
        tx = await self.client.set(key.encode(), payload.encode())
        return tx.id

    async def verify_integrity(self, tx_id: int) -> bool:
        # Verify Merkle tree inclusion proof for the transaction
        return await self.client.verifyTx(tx_id)
```
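Beyond immudb's own Merkle proofs, the service can attach a client-side digest of each payload so tampering is detectable independently of the store; `audit_digest` is an illustrative helper using canonical JSON:

```python
import hashlib
import json

def audit_digest(payload: dict) -> str:
    """SHA-256 over a canonical JSON serialization of the audit payload."""
    # sort_keys + fixed separators make the digest independent of key order
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

d1 = audit_digest({"action": "post", "resource_id": "je-1"})
d2 = audit_digest({"resource_id": "je-1", "action": "post"})
assert d1 == d2          # key order does not affect the digest
assert len(d1) == 64     # hex-encoded SHA-256
```

Storing the digest alongside the immudb key gives auditors a cheap first-pass integrity check before requesting full inclusion proofs.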
## 5. Integration Architecture

### 5.1 API Design

**OpenAPI Specification (excerpt):**
```yaml
openapi: 3.1.0
info:
  title: FP&A Platform API
  version: 1.0.0
paths:
  /api/v1/journal-entries:
    post:
      operationId: createJournalEntry
      tags: [Journal Entries]
      security:
        - bearerAuth: []
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/JournalEntryCreate'
      responses:
        '201':
          description: Created
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/JournalEntry'
        '400':
          $ref: '#/components/responses/ValidationError'
        '401':
          $ref: '#/components/responses/Unauthorized'
        '403':
          $ref: '#/components/responses/Forbidden'
components:
  schemas:
    JournalEntryCreate:
      type: object
      required:
        - entry_date
        - lines
      properties:
        entry_date:
          type: string
          format: date
        description:
          type: string
          maxLength: 1000
        lines:
          type: array
          minItems: 2
          items:
            $ref: '#/components/schemas/JournalLineCreate'
    JournalLineCreate:
      type: object
      required:
        - account_id
      properties:
        account_id:
          type: string
          format: uuid
        debit:
          type: number
          minimum: 0
        credit:
          type: number
          minimum: 0
        memo:
          type: string
```
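A hand-written service-side mirror of the `JournalEntryCreate` constraints (required fields, `minItems: 2`, non-negative amounts), sketched for illustration; in practice the validators would be generated from the OpenAPI spec rather than written like this:

```python
from datetime import date

def validate_journal_entry_create(body: dict) -> list[str]:
    """Return a list of validation errors; empty means the body is valid."""
    errors = []
    if "entry_date" not in body:
        errors.append("entry_date is required")
    else:
        try:
            date.fromisoformat(body["entry_date"])
        except ValueError:
            errors.append("entry_date must be an ISO date")
    lines = body.get("lines", [])
    if len(lines) < 2:
        errors.append("lines requires at least 2 items")
    for i, line in enumerate(lines):
        if "account_id" not in line:
            errors.append(f"lines[{i}].account_id is required")
        for fld in ("debit", "credit"):
            if line.get(fld, 0) < 0:
                errors.append(f"lines[{i}].{fld} must be >= 0")
    return errors

assert validate_journal_entry_create({"lines": []}) == [
    "entry_date is required", "lines requires at least 2 items"]
```

Collecting all errors instead of failing fast matches the `400 ValidationError` response shape, which should report every problem in one round trip.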
5.2 ERP Connector Interface
from abc import ABC, abstractmethod
from typing import List, AsyncIterator
from dataclasses import dataclass
from datetime import datetime
@dataclass
class Transaction:
id: str
date: datetime
account_code: str
amount: float
description: str
reference: str
metadata: dict
class ERPConnector(ABC):
@abstractmethod
async def authenticate(self) -> bool:
"""Establish connection to ERP system"""
pass
@abstractmethod
async def get_chart_of_accounts(self) -> List[dict]:
"""Retrieve chart of accounts"""
pass
@abstractmethod
async def get_transactions(
self,
start_date: datetime,
end_date: datetime
) -> AsyncIterator[Transaction]:
"""Stream transactions for date range"""
pass
@abstractmethod
async def post_journal_entry(
self,
entry: dict
) -> str:
"""Post journal entry to ERP, return reference"""
pass
# Implementation example for QuickBooks
class QuickBooksConnector(ERPConnector):
def __init__(self, client_id: str, client_secret: str, realm_id: str):
self.client_id = client_id
self.client_secret = client_secret
self.realm_id = realm_id
self.access_token = None
async def authenticate(self) -> bool:
# OAuth 2.0 flow implementation
pass
async def get_chart_of_accounts(self) -> List[dict]:
response = await self._api_call("GET", "/account")
return [self._map_account(acc) for acc in response["QueryResponse"]["Account"]]
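A usage sketch showing how a caller consumes the streaming `get_transactions` interface without buffering the full result set; `InMemoryConnector` is a hypothetical stand-in with a trimmed-down `Transaction`:

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterator

@dataclass
class Transaction:
    id: str
    date: datetime
    amount: float

class InMemoryConnector:
    """Test double: serves transactions from a list instead of an ERP API."""
    def __init__(self, rows):
        self.rows = rows

    async def get_transactions(self, start: datetime,
                               end: datetime) -> AsyncIterator[Transaction]:
        for row in self.rows:
            if start <= row.date <= end:
                yield row   # one at a time: callers never buffer the full set

async def total_for_period(conn, start, end) -> float:
    return sum([t.amount async for t in conn.get_transactions(start, end)])

conn = InMemoryConnector([
    Transaction("t1", datetime(2026, 1, 5), 100.0),
    Transaction("t2", datetime(2026, 2, 1), 50.0),
])
total = asyncio.run(total_for_period(
    conn, datetime(2026, 1, 1), datetime(2026, 1, 31)))
print(total)  # 100.0
```

Because the interface is an async generator, a real connector can page through the ERP API lazily while the caller aggregates, keeping memory flat even for large date ranges.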
## 6. Deployment Architecture

### 6.1 Kubernetes Architecture
```yaml
# Namespace structure
apiVersion: v1
kind: Namespace
metadata:
  name: fpa-platform
  labels:
    istio-injection: enabled
---
# Core services deployment pattern
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gl-service
  namespace: fpa-platform
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: gl-service
  template:
    metadata:
      labels:
        app: gl-service
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8080"
    spec:
      serviceAccountName: gl-service
      containers:
        - name: gl-service
          image: fpa-platform/gl-service:v1.0.0
          ports:
            - containerPort: 8080
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: database-credentials
                  key: url
            - name: KAFKA_BROKERS
              value: "kafka-cluster:9092"
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
```
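The deployment pairs naturally with a HorizontalPodAutoscaler for the horizontal scalability called out in §1.1; the replica counts and CPU threshold below are illustrative defaults, not values mandated by this SDD:

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: gl-service
  namespace: fpa-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: gl-service
  minReplicas: 3          # matches the deployment's baseline replicas
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70   # scale out above 70% average CPU
```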
### 6.2 Infrastructure Diagram

## 7. Observability Architecture

### 7.1 Monitoring Stack
```yaml
# Prometheus configuration
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true

# Key metrics to collect:
# - fpa_journal_entries_total
# - fpa_reconciliation_match_rate
# - fpa_forecast_accuracy
# - fpa_variance_analysis_duration_seconds
# - fpa_api_request_duration_seconds
# - fpa_llm_inference_duration_seconds
# - fpa_compliance_check_results
```
7.2 Distributed Tracing
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
# Initialize tracing
trace.set_tracer_provider(TracerProvider())
otlp_exporter = OTLPSpanExporter(endpoint="otel-collector:4317")
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_exporter))
# Instrument frameworks
FastAPIInstrumentor.instrument_app(app)
SQLAlchemyInstrumentor().instrument(engine=engine)
# Custom span example
tracer = trace.get_tracer(__name__)
async def reconcile_transactions(tenant_id: str, account_id: str):
with tracer.start_as_current_span("reconcile_transactions") as span:
span.set_attribute("tenant_id", tenant_id)
span.set_attribute("account_id", account_id)
# Reconciliation logic
matched_count = await perform_matching()
span.set_attribute("matched_count", matched_count)
return matched_count
## 8. Appendices

### Appendix A: Technology Stack Summary
| Layer | Technology | Version |
|---|---|---|
| Frontend | React + TypeScript | 18.x |
| API | FastAPI | 0.109+ |
| Agent Framework | LangGraph | 0.1+ |
| LLM | DeepSeek-R1-32B | Latest |
| Forecasting | NeuralProphet | 0.8+ |
| Database | PostgreSQL | 16 |
| Time Series | TimescaleDB | 2.x |
| Audit | immudb | 1.9+ |
| Cache | Redis | 7.x |
| Events | Kafka | 3.x |
| ELT | Airbyte | 0.50+ |
| Orchestration | Dagster | 1.6+ |
| Auth | OpenFGA + Keycloak | Latest |
| Container | Kubernetes | 1.28+ |
| Observability | OpenTelemetry | 1.x |
### Appendix B: Related Documents
- SPEC-001: FPA-PRD.md (Product Requirements)
- SPEC-003: FPA-TDD.md (Technical Design)
- ADR-001 through ADR-015 (Architecture Decisions)
### Appendix C: Revision History
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-02-03 | Claude | Initial SDD creation |