Software Design Document (SDD)

AI-First Open-Source FP&A Platform

Version: 1.0
Date: February 2026
Classification: CODITECT Engineering


1. System Architecture Overview

1.1 Architectural Vision

The platform follows a microservices architecture with event-driven communication, designed for multi-tenancy, horizontal scalability, and regulatory compliance.

1.2 Architecture Principles

| Principle | Description | Implementation |
|---|---|---|
| Separation of Concerns | Each service owns its domain | Bounded contexts via DDD |
| Event Sourcing | Immutable event log | All state changes as events |
| CQRS | Separate read/write models | Optimized query paths |
| API-First | Contract-driven development | OpenAPI 3.1 specifications |
| Zero Trust | Never trust, always verify | mTLS, RBAC, audit logging |
| Observability | Full system visibility | OpenTelemetry instrumentation |

1.3 High-Level Architecture Diagram


2. Component Design

2.1 Core Services

2.1.1 General Ledger Service

Responsibility: Manage chart of accounts, journal entries, trial balance, and financial statement generation.

Technology Stack:

  • Runtime: Python 3.12 / FastAPI
  • Database: PostgreSQL 16 with RLS
  • Cache: Redis 7
  • Events: Kafka/NATS

Key Components:

Data Model:

-- Core entities (simplified)
CREATE TABLE tenants (
    id UUID PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    settings JSONB DEFAULT '{}'
);

CREATE TABLE accounts (
    id UUID PRIMARY KEY,
    tenant_id UUID REFERENCES tenants(id),
    code VARCHAR(50) NOT NULL,
    name VARCHAR(255) NOT NULL,
    account_type VARCHAR(20) NOT NULL,
    parent_id UUID REFERENCES accounts(id),
    is_active BOOLEAN DEFAULT true,
    UNIQUE(tenant_id, code)
);

CREATE TABLE journal_entries (
    id UUID PRIMARY KEY,
    tenant_id UUID REFERENCES tenants(id),
    entry_date DATE NOT NULL,
    description TEXT,
    status VARCHAR(20) DEFAULT 'draft',
    posted_at TIMESTAMP,
    posted_by UUID,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE journal_lines (
    id UUID PRIMARY KEY,
    journal_entry_id UUID REFERENCES journal_entries(id),
    account_id UUID REFERENCES accounts(id),
    debit DECIMAL(18,2) DEFAULT 0,
    credit DECIMAL(18,2) DEFAULT 0,
    memo TEXT
);
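Before an entry moves from draft to posted, the service must enforce the double-entry invariant: total debits equal total credits. A minimal sketch of that check, assuming hypothetical names (`JournalLine`, `validate_entry`) that are not part of the GL service's actual API:

```python
# Illustrative sketch: double-entry balance check before posting.
# JournalLine and validate_entry are hypothetical names.
from dataclasses import dataclass
from decimal import Decimal
from typing import List

@dataclass
class JournalLine:
    account_id: str
    debit: Decimal = Decimal("0")
    credit: Decimal = Decimal("0")

def validate_entry(lines: List[JournalLine]) -> None:
    """Raise ValueError unless the entry has >= 2 lines and balances."""
    if len(lines) < 2:
        raise ValueError("a journal entry needs at least two lines")
    if any(l.debit < 0 or l.credit < 0 for l in lines):
        raise ValueError("debits and credits must be non-negative")
    total_debit = sum(l.debit for l in lines)
    total_credit = sum(l.credit for l in lines)
    if total_debit != total_credit:
        raise ValueError(f"unbalanced entry: {total_debit} != {total_credit}")

# Balanced entry: passes silently
validate_entry([
    JournalLine("cash", debit=Decimal("100.00")),
    JournalLine("revenue", credit=Decimal("100.00")),
])
```

Using `Decimal` rather than floats mirrors the `DECIMAL(18,2)` columns above and avoids rounding drift in monetary sums.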

2.1.2 Planning Service

Responsibility: Budget management, forecasting, scenario modeling, and driver-based planning.

Technology Stack:

  • Runtime: Python 3.12 / FastAPI
  • ML Engine: NeuralProphet, XGBoost
  • Database: PostgreSQL + TimescaleDB

Key Components:

Forecasting Architecture:

from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

class ForecastMethod(Enum):
    NEURAL_PROPHET = "neural_prophet"
    ARIMA = "arima"
    XGBOOST = "xgboost"
    ENSEMBLE = "ensemble"

@dataclass
class ForecastConfig:
    method: ForecastMethod
    horizon_weeks: int = 13
    confidence_level: float = 0.95
    include_holidays: bool = True
    include_regressors: bool = True
    auto_seasonality: bool = True

@dataclass
class ForecastResult:
    values: List[float]
    lower_bound: List[float]
    upper_bound: List[float]
    confidence: float
    method_used: ForecastMethod
    feature_importance: Optional[dict] = None
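To make the result contract concrete, here is a baseline linear-trend forecaster that fills the same fields. This is a sketch only: the production service would dispatch to NeuralProphet or XGBoost per `ForecastConfig`, and the `ForecastResult` below is a simplified self-contained copy of the dataclass above.

```python
# Illustrative baseline: linear-trend forecast with naive confidence bands.
# Not the service's actual model dispatch; shows only the result shape.
from dataclasses import dataclass
from typing import List
import numpy as np

@dataclass
class ForecastResult:  # simplified copy of the service's dataclass
    values: List[float]
    lower_bound: List[float]
    upper_bound: List[float]
    confidence: float

def linear_trend_forecast(history: List[float], horizon: int,
                          confidence: float = 0.95) -> ForecastResult:
    y = np.asarray(history, dtype=float)
    x = np.arange(len(y))
    slope, intercept = np.polyfit(x, y, 1)       # fit y = slope*x + intercept
    resid_std = float(np.std(y - (slope * x + intercept)))
    z = 1.96 if confidence == 0.95 else 2.58     # crude z-score lookup
    future_x = np.arange(len(y), len(y) + horizon)
    point = slope * future_x + intercept
    return ForecastResult(
        values=point.tolist(),
        lower_bound=(point - z * resid_std).tolist(),
        upper_bound=(point + z * resid_std).tolist(),
        confidence=confidence,
    )
```

A strictly increasing history such as `[1, 2, 3, 4]` with `horizon=2` extrapolates to roughly `[5, 6]` with zero-width bands, since the linear fit is exact.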

2.1.3 Reporting Service

Responsibility: Financial statement generation, variance analysis, KPI dashboards, and NLG commentary.

Key Components:

2.2 AI/ML Services

2.2.1 Orchestrator Agent (LangGraph)

Responsibility: Coordinate AI agents, manage conversation state, route tasks to specialized agents.

Architecture:

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    task_type: str
    current_agent: str
    context: dict
    results: dict
    compliance_status: str

def create_fpa_orchestrator() -> StateGraph:
    workflow = StateGraph(AgentState)

    # Add nodes for each specialized agent
    workflow.add_node("classifier", classify_task)
    workflow.add_node("reconciliation_agent", reconciliation_agent)
    workflow.add_node("variance_agent", variance_analysis_agent)
    workflow.add_node("forecast_agent", forecasting_agent)
    workflow.add_node("compliance_agent", compliance_check_agent)
    workflow.add_node("synthesizer", synthesize_results)

    # Define routing logic
    workflow.add_conditional_edges(
        "classifier",
        route_to_agent,
        {
            "reconciliation": "reconciliation_agent",
            "variance": "variance_agent",
            "forecast": "forecast_agent",
            "compliance": "compliance_agent",
        }
    )

    # All agents go through compliance check
    for agent in ["reconciliation_agent", "variance_agent", "forecast_agent"]:
        workflow.add_edge(agent, "compliance_agent")

    workflow.add_edge("compliance_agent", "synthesizer")
    workflow.add_edge("synthesizer", END)

    workflow.set_entry_point("classifier")

    return workflow.compile(checkpointer=PostgresSaver.from_conn_string(DB_URL))
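The orchestrator references `route_to_agent` without defining it. A minimal sketch of the routing side, assuming the classifier node has already written `task_type` into the state (the real classifier would call the LLM to do this):

```python
# Hypothetical routing function consumed by add_conditional_edges above.
# Assumes the classifier has set state["task_type"].
ROUTES = {"reconciliation", "variance", "forecast", "compliance"}

def route_to_agent(state: dict) -> str:
    """Map the classified task type onto a conditional-edge key."""
    task = state.get("task_type", "")
    if task not in ROUTES:
        # Fail closed: unclassifiable work goes straight to compliance review.
        return "compliance"
    return task
```

Routing unknown task types to the compliance agent (rather than raising) keeps the graph's "all paths pass through compliance" property even for misclassified input.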

Agent Workflow Diagram:

2.2.2 LLM Service (DeepSeek-R1)

Deployment Architecture:

# Kubernetes deployment for DeepSeek-R1
apiVersion: apps/v1
kind: Deployment
metadata:
  name: deepseek-r1-inference
spec:
  replicas: 2
  selector:
    matchLabels:
      app: deepseek-r1
  template:
    metadata:
      labels:
        app: deepseek-r1   # must match the selector above
    spec:
      containers:
        - name: vllm
          image: vllm/vllm-openai:latest
          args:
            - "--model"
            - "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B"
            - "--tensor-parallel-size"
            - "2"
            - "--max-model-len"
            - "32768"
            - "--gpu-memory-utilization"
            - "0.90"
          resources:
            limits:
              nvidia.com/gpu: 2
              memory: 80Gi
            requests:
              nvidia.com/gpu: 2
              memory: 64Gi
          ports:
            - containerPort: 8000

2.3 Integration Layer

2.3.1 ELT Engine (Airbyte)

Connector Architecture:

2.3.2 Event Bus (Kafka/NATS)

Event Schema:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "event_id": {
      "type": "string",
      "format": "uuid"
    },
    "event_type": {
      "type": "string",
      "enum": [
        "journal_entry.created",
        "journal_entry.posted",
        "reconciliation.completed",
        "forecast.generated",
        "variance.analyzed",
        "compliance.checked"
      ]
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "tenant_id": {
      "type": "string",
      "format": "uuid"
    },
    "correlation_id": {
      "type": "string",
      "format": "uuid"
    },
    "payload": {
      "type": "object"
    },
    "metadata": {
      "type": "object",
      "properties": {
        "user_id": { "type": "string" },
        "source_service": { "type": "string" },
        "version": { "type": "string" }
      }
    }
  },
  "required": ["event_id", "event_type", "timestamp", "tenant_id", "payload"]
}

3. Data Architecture

3.1 Database Strategy

| Database | Purpose | Data Type |
|---|---|---|
| PostgreSQL 16 | OLTP, application data | Structured, transactional |
| TimescaleDB | Time-series analytics | Metrics, forecasts |
| immudb | Audit trail | Immutable logs |
| Redis 7 | Caching, sessions | Ephemeral |
| pgvector | Vector embeddings | AI/ML |

3.2 Multi-Tenant Data Model

Row-Level Security Pattern:

-- Enable RLS on all tenant tables
ALTER TABLE accounts ENABLE ROW LEVEL SECURITY;
ALTER TABLE journal_entries ENABLE ROW LEVEL SECURITY;
ALTER TABLE journal_lines ENABLE ROW LEVEL SECURITY;

-- Create policies
CREATE POLICY tenant_isolation ON accounts
    USING (tenant_id = current_setting('app.current_tenant')::UUID);

CREATE POLICY tenant_isolation ON journal_entries
    USING (tenant_id = current_setting('app.current_tenant')::UUID);

-- journal_lines has no tenant_id column, so scope it through its parent
-- entry; without a policy, enabling RLS would deny all access to the table
CREATE POLICY tenant_isolation ON journal_lines
    USING (EXISTS (
        SELECT 1 FROM journal_entries je
        WHERE je.id = journal_lines.journal_entry_id
          AND je.tenant_id = current_setting('app.current_tenant')::UUID
    ));

-- Application sets tenant context
SET app.current_tenant = '550e8400-e29b-41d4-a716-446655440000';
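One operational caveat: with pooled connections, a plain `SET` is connection-scoped and can leak across requests. The safer pattern is `set_config(..., true)` inside the request's transaction, so the setting resets on commit or rollback. A sketch of composing that statement (the helper name is hypothetical):

```python
# Hypothetical helper: build a parameterized, transaction-local
# tenant-context statement for use inside each request's transaction.
import uuid

def tenant_context_sql(tenant_id: str) -> tuple[str, tuple[str]]:
    """Return (sql, params) scoping RLS to one tenant for this transaction."""
    uuid.UUID(tenant_id)  # raises ValueError on malformed tenant ids
    # The final 'true' makes the setting local to the current transaction.
    return ("SELECT set_config('app.current_tenant', $1, true)", (tenant_id,))
```

Validating the tenant id as a UUID before it reaches SQL, and passing it as a bind parameter rather than interpolating it, keeps the RLS context both well-formed and injection-safe.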

3.3 Data Flow Architecture


4. Security Architecture

4.1 Authentication & Authorization

OpenFGA Integration:

# OpenFGA Authorization Model (DSL)
model
  schema 1.1

type user

type organization
  relations
    define member: [user]
    define admin: [user]
    define owner: [user]

type tenant
  relations
    define organization: [organization]
    define viewer: member from organization
    define editor: admin from organization
    define admin: owner from organization

type account
  relations
    define tenant: [tenant]
    define can_view: viewer from tenant
    define can_edit: editor from tenant

type journal_entry
  relations
    define tenant: [tenant]
    define author: [user]
    define can_view: can_view from tenant
    define can_edit: author or (can_edit from tenant)
    define can_approve: admin from tenant but not author
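The `can_approve` rule encodes segregation of duties: a tenant admin may approve an entry unless they authored it. A toy illustration of that exclusion semantics in plain Python — this is not the OpenFGA engine, just the rule spelled out so the intent is unambiguous:

```python
# Toy model of "admin from tenant but not author" (NOT the OpenFGA engine).
def can_approve(user: str, entry_author: str, tenant_admins: set[str]) -> bool:
    """True iff user is a tenant admin AND did not author the entry."""
    return user in tenant_admins and user != entry_author
```

So an admin approving someone else's entry succeeds, while self-approval is denied regardless of role, which is the SOX-style control the model is after.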

4.2 Audit Trail Architecture

immudb Integration:

from immudb import ImmudbClient
from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class AuditEntry:
    timestamp: datetime
    tenant_id: str
    user_id: str
    action: str
    resource_type: str
    resource_id: str
    old_value: dict
    new_value: dict
    ip_address: str
    user_agent: str

class AuditTrailService:
    def __init__(self, immudb_client: ImmudbClient):
        self.client = immudb_client

    async def log_event(self, entry: AuditEntry) -> str:
        payload = json.dumps({
            "timestamp": entry.timestamp.isoformat(),
            "tenant_id": entry.tenant_id,
            "user_id": entry.user_id,
            "action": entry.action,
            "resource_type": entry.resource_type,
            "resource_id": entry.resource_id,
            "old_value": entry.old_value,
            "new_value": entry.new_value,
            "ip_address": entry.ip_address,
            "user_agent": entry.user_agent,
        })

        key = f"audit:{entry.tenant_id}:{entry.resource_type}:{entry.resource_id}:{entry.timestamp.timestamp()}"
        tx = await self.client.set(key.encode(), payload.encode())

        return tx.id

    async def verify_integrity(self, tx_id: int) -> bool:
        entry = await self.client.txById(tx_id)
        # Verify Merkle tree inclusion proof
        return await self.client.verifyTx(tx_id)

5. Integration Architecture

5.1 API Design

OpenAPI Specification (excerpt):

openapi: 3.1.0
info:
  title: FP&A Platform API
  version: 1.0.0

paths:
  /api/v1/journal-entries:
    post:
      operationId: createJournalEntry
      tags: [Journal Entries]
      security:
        - bearerAuth: []
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/JournalEntryCreate'
      responses:
        '201':
          description: Created
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/JournalEntry'
        '400':
          $ref: '#/components/responses/ValidationError'
        '401':
          $ref: '#/components/responses/Unauthorized'
        '403':
          $ref: '#/components/responses/Forbidden'

components:
  schemas:
    JournalEntryCreate:
      type: object
      required:
        - entry_date
        - lines
      properties:
        entry_date:
          type: string
          format: date
        description:
          type: string
          maxLength: 1000
        lines:
          type: array
          minItems: 2
          items:
            $ref: '#/components/schemas/JournalLineCreate'

    JournalLineCreate:
      type: object
      required:
        - account_id
      properties:
        account_id:
          type: string
          format: uuid
        debit:
          type: number
          minimum: 0
        credit:
          type: number
          minimum: 0
        memo:
          type: string
5.2 ERP Connector Interface

from abc import ABC, abstractmethod
from typing import List, AsyncIterator
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Transaction:
    id: str
    date: datetime
    account_code: str
    amount: float
    description: str
    reference: str
    metadata: dict

class ERPConnector(ABC):
    @abstractmethod
    async def authenticate(self) -> bool:
        """Establish connection to ERP system"""
        pass

    @abstractmethod
    async def get_chart_of_accounts(self) -> List[dict]:
        """Retrieve chart of accounts"""
        pass

    @abstractmethod
    async def get_transactions(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> AsyncIterator[Transaction]:
        """Stream transactions for date range"""
        pass

    @abstractmethod
    async def post_journal_entry(
        self,
        entry: dict
    ) -> str:
        """Post journal entry to ERP, return reference"""
        pass

# Implementation example for QuickBooks
class QuickBooksConnector(ERPConnector):
    def __init__(self, client_id: str, client_secret: str, realm_id: str):
        self.client_id = client_id
        self.client_secret = client_secret
        self.realm_id = realm_id
        self.access_token = None

    async def authenticate(self) -> bool:
        # OAuth 2.0 flow implementation
        pass

    async def get_chart_of_accounts(self) -> List[dict]:
        response = await self._api_call("GET", "/account")
        return [self._map_account(acc) for acc in response["QueryResponse"]["Account"]]
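The `AsyncIterator` return type lets callers consume transactions as they stream in, rather than buffering a full export. A consumption sketch against an in-memory stub — `FakeConnector` and the sample rows are hypothetical, useful as a test double for the real interface:

```python
# Streaming-consumption sketch with a hypothetical in-memory connector stub.
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import AsyncIterator, List

@dataclass
class Transaction:  # simplified copy of the connector's dataclass
    id: str
    date: datetime
    account_code: str
    amount: float

class FakeConnector:
    def __init__(self, rows: List[Transaction]):
        self.rows = rows

    async def get_transactions(self, start: datetime,
                               end: datetime) -> AsyncIterator[Transaction]:
        for tx in self.rows:
            if start <= tx.date <= end:
                yield tx  # one at a time, like a paginated ERP API

async def main() -> List[str]:
    conn = FakeConnector([
        Transaction("t1", datetime(2026, 1, 5), "4000", 120.0),
        Transaction("t2", datetime(2026, 3, 1), "4000", 80.0),
    ])
    # Async comprehension drains the stream for January only
    return [tx.id async for tx in conn.get_transactions(
        datetime(2026, 1, 1), datetime(2026, 1, 31))]

ids = asyncio.run(main())  # → ["t1"]
```

Because the generator filters while yielding, only in-range rows ever cross the interface, which matters when the upstream ERP export is large.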

6. Deployment Architecture

6.1 Kubernetes Architecture

# Namespace structure
apiVersion: v1
kind: Namespace
metadata:
  name: fpa-platform
  labels:
    istio-injection: enabled

---
# Core services deployment pattern
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gl-service
  namespace: fpa-platform
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: gl-service
  template:
    metadata:
      labels:
        app: gl-service
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8080"
    spec:
      serviceAccountName: gl-service
      containers:
        - name: gl-service
          image: fpa-platform/gl-service:v1.0.0
          ports:
            - containerPort: 8080
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: database-credentials
                  key: url
            - name: KAFKA_BROKERS
              value: "kafka-cluster:9092"
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5

6.2 Infrastructure Diagram


7. Observability Architecture

7.1 Monitoring Stack

# Prometheus configuration
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true

# Key metrics to collect
# - fpa_journal_entries_total
# - fpa_reconciliation_match_rate
# - fpa_forecast_accuracy
# - fpa_variance_analysis_duration_seconds
# - fpa_api_request_duration_seconds
# - fpa_llm_inference_duration_seconds
# - fpa_compliance_check_results

7.2 Distributed Tracing

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

# Initialize tracing
trace.set_tracer_provider(TracerProvider())
otlp_exporter = OTLPSpanExporter(endpoint="otel-collector:4317")
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(otlp_exporter))

# Instrument frameworks
FastAPIInstrumentor.instrument_app(app)
SQLAlchemyInstrumentor().instrument(engine=engine)

# Custom span example
tracer = trace.get_tracer(__name__)

async def reconcile_transactions(tenant_id: str, account_id: str):
    with tracer.start_as_current_span("reconcile_transactions") as span:
        span.set_attribute("tenant_id", tenant_id)
        span.set_attribute("account_id", account_id)

        # Reconciliation logic
        matched_count = await perform_matching()

        span.set_attribute("matched_count", matched_count)
        return matched_count

8. Appendices

Appendix A: Technology Stack Summary

| Layer | Technology | Version |
|---|---|---|
| Frontend | React + TypeScript | 18.x |
| API | FastAPI | 0.109+ |
| Agent Framework | LangGraph | 0.1+ |
| LLM | DeepSeek-R1-32B | Latest |
| Forecasting | NeuralProphet | 0.8+ |
| Database | PostgreSQL | 16 |
| Time Series | TimescaleDB | 2.x |
| Audit | immudb | 1.9+ |
| Cache | Redis | 7.x |
| Events | Kafka | 3.x |
| ELT | Airbyte | 0.50+ |
| Orchestration | Dagster | 1.6+ |
| Auth | OpenFGA + Keycloak | Latest |
| Container | Kubernetes | 1.28+ |
| Observability | OpenTelemetry | 1.x |
Appendix B: Referenced Documents

  • SPEC-001: FPA-PRD.md (Product Requirements)
  • SPEC-003: FPA-TDD.md (Technical Design)
  • ADR-001 through ADR-015 (Architecture Decisions)

Appendix C: Revision History

| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-02-03 | Claude | Initial SDD creation |