FP&A Platform — Test Strategy Document

Version: 1.0
Last Updated: 2026-02-03
Document ID: SPEC-004
Classification: Internal


1. Executive Summary

This document defines the comprehensive test strategy for the AI-First FP&A Platform, covering all test levels from unit testing through compliance validation. The strategy ensures quality gates are enforced at every stage while maintaining velocity for rapid iteration.

Key Metrics Targets:

  • Code Coverage: ≥80% (unit), ≥70% (integration)
  • Defect Escape Rate: <2% to production
  • Test Automation: ≥90% of regression suite
  • Mean Time to Detect: <15 minutes for critical issues
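For concreteness, the escape-rate and detection-time targets can be computed from defect and incident records roughly as follows. This is a sketch; the `Defect`/`Incident` record shapes are illustrative, not an existing schema:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Defect:
    found_in: str  # where the defect was first detected: "qa" or "production"

@dataclass
class Incident:
    started_at: datetime   # when the issue began
    detected_at: datetime  # when monitoring/tests flagged it

def defect_escape_rate(defects: list[Defect]) -> float:
    """Fraction of defects first found in production (target: < 0.02)."""
    if not defects:
        return 0.0
    escaped = sum(1 for d in defects if d.found_in == "production")
    return escaped / len(defects)

def mean_time_to_detect(incidents: list[Incident]) -> timedelta:
    """Average gap between incident start and detection (target: < 15 min)."""
    total = sum((i.detected_at - i.started_at for i in incidents), timedelta())
    return total / len(incidents)

# Example: 2 escapes out of 100 defects sits exactly at the 2% threshold.
defects = [Defect("qa")] * 98 + [Defect("production")] * 2
print(defect_escape_rate(defects))  # 0.02
```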

2. Test Levels

2.1 Unit Testing

Scope: Individual functions, classes, and components in isolation

Tools:

| Language   | Framework | Coverage Tool | Mocking                |
|------------|-----------|---------------|------------------------|
| Python     | pytest    | coverage.py   | pytest-mock, responses |
| TypeScript | Jest      | istanbul/nyc  | jest.mock, msw         |
| SQL        | pgTAP     | N/A           | N/A                    |

Standards:

# Example: Journal Entry Validation Unit Test
import pytest
from decimal import Decimal
from fpa.accounting.journal import JournalEntry, JournalLine, ValidationError


class TestJournalEntryValidation:
    """Unit tests for journal entry validation logic."""

    def test_balanced_entry_passes_validation(self):
        """Journal entry with equal debits and credits should validate."""
        entry = JournalEntry(
            entity_id="ent_001",
            date="2026-01-31",
            description="Test entry"
        )
        entry.add_line(JournalLine(account="1000", debit=Decimal("100.00")))
        entry.add_line(JournalLine(account="2000", credit=Decimal("100.00")))

        assert entry.validate() is True
        assert entry.is_balanced() is True

    def test_unbalanced_entry_fails_validation(self):
        """Journal entry with unequal debits/credits should fail."""
        entry = JournalEntry(entity_id="ent_001", date="2026-01-31")
        entry.add_line(JournalLine(account="1000", debit=Decimal("100.00")))
        entry.add_line(JournalLine(account="2000", credit=Decimal("50.00")))

        with pytest.raises(ValidationError) as exc_info:
            entry.validate()

        assert "Entry is not balanced" in str(exc_info.value)
        assert exc_info.value.imbalance == Decimal("50.00")

    def test_future_dated_entry_requires_approval(self):
        """Future-dated entries should require additional approval."""
        entry = JournalEntry(
            entity_id="ent_001",
            date="2026-12-31",  # Future date
            description="Future entry"
        )
        entry.add_line(JournalLine(account="1000", debit=Decimal("100.00")))
        entry.add_line(JournalLine(account="2000", credit=Decimal("100.00")))

        assert entry.requires_approval is True
        assert "future_dated" in entry.approval_reasons

Coverage Thresholds:

# pyproject.toml
[tool.coverage.report]
fail_under = 80
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "raise NotImplementedError",
    "if TYPE_CHECKING:",
]

[tool.coverage.run]
branch = true
source = ["fpa"]
omit = ["*/tests/*", "*/migrations/*"]

2.2 Integration Testing

Scope: Service interactions, database operations, external API calls

Tools:

  • Testcontainers: PostgreSQL, Redis, Kafka containers
  • pytest-asyncio: Async service testing
  • httpx: API client testing
  • Factory Boy: Test data generation

Database Integration Tests:

import pytest
from decimal import Decimal
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from testcontainers.postgres import PostgresContainer

from fpa.accounting.journal import JournalEntry, JournalLine
from fpa.accounting.repository import JournalEntryRepository  # adjust to actual module path


@pytest.fixture(scope="session")
def postgres_container():
    """Spin up a PostgreSQL container for integration tests."""
    postgres = PostgresContainer("postgres:16-alpine")
    postgres.with_env("POSTGRES_DB", "fpa_test")  # configure before starting
    with postgres:
        yield postgres


@pytest.fixture
async def db_session(postgres_container):
    """Create an async database session, rolled back after each test."""
    engine = create_async_engine(postgres_container.get_connection_url())
    async with AsyncSession(engine) as session:
        yield session
        await session.rollback()


class TestJournalEntryRepository:
    """Integration tests for journal entry persistence."""

    async def test_create_and_retrieve_journal_entry(self, db_session):
        """Journal entry should persist and retrieve correctly."""
        repo = JournalEntryRepository(db_session)

        entry = JournalEntry(
            entity_id="ent_001",
            date="2026-01-31",
            description="Integration test entry"
        )
        entry.add_line(JournalLine(account="1000", debit=Decimal("100.00")))
        entry.add_line(JournalLine(account="2000", credit=Decimal("100.00")))

        created = await repo.create(entry)
        retrieved = await repo.get_by_id(created.id)

        assert retrieved.description == "Integration test entry"
        assert len(retrieved.lines) == 2
        assert retrieved.total_debit == Decimal("100.00")

    async def test_rls_prevents_cross_tenant_access(self, db_session):
        """Row-level security should prevent accessing other tenants' data."""
        # Set tenant context
        await db_session.execute(text("SET app.tenant_id = 'tenant_A'"))

        # Create entry for tenant A
        repo = JournalEntryRepository(db_session)
        entry_a = await repo.create(make_journal_entry(entity_id="ent_A"))

        # Switch to tenant B
        await db_session.execute(text("SET app.tenant_id = 'tenant_B'"))

        # Attempt to access tenant A's entry
        result = await repo.get_by_id(entry_a.id)
        assert result is None  # RLS should filter it out

API Contract Tests:

from pact import Consumer, Provider, Like


class TestGLServiceContract:
    """Contract tests between API Gateway and GL Service."""

    def test_create_journal_entry_contract(self):
        """Verify contract for creating journal entries."""
        pact = Consumer('APIGateway').has_pact_with(Provider('GLService'))

        payload = {
            'entity_id': 'ent_001',
            'date': '2026-01-31',
            'description': 'Test entry',
            'lines': [
                {'account': '1000', 'debit': '100.00'},
                {'account': '2000', 'credit': '100.00'}
            ]
        }

        pact.given(
            'a valid journal entry payload'
        ).upon_receiving(
            'a request to create a journal entry'
        ).with_request(
            method='POST',
            path='/gl/journal-entries',
            headers={'Content-Type': 'application/json'},
            body=payload
        ).will_respond_with(
            status=201,
            headers={'Content-Type': 'application/json'},
            body={
                'id': Like('je_abc123'),
                'status': 'draft',
                'created_at': Like('2026-01-31T12:00:00Z')
            }
        )

        with pact:
            response = gl_client.create_journal_entry(payload)
            assert response.status_code == 201

2.3 End-to-End Testing

Scope: Complete user workflows through UI and API

Tools:

  • Playwright: Browser automation
  • pytest-playwright: Python integration

UI E2E Tests:

import pytest
from playwright.sync_api import Page, expect


class TestMonthEndCloseWorkflow:
    """E2E tests for month-end close process."""

    def test_complete_month_end_close(self, page: Page, authenticated_user):
        """User should be able to complete full month-end close."""
        # Navigate to close checklist
        page.goto("/close/2026-01")
        expect(page.locator("h1")).to_have_text("January 2026 Close")

        # Verify initial status
        expect(page.locator("[data-testid='close-status']")).to_have_text("In Progress")

        # Complete reconciliation task
        page.click("[data-testid='task-bank-recon']")
        page.click("[data-testid='mark-complete']")
        expect(page.locator("[data-testid='task-bank-recon'] .status")).to_have_text("Complete")

        # Run trial balance
        page.click("[data-testid='task-trial-balance']")
        page.click("[data-testid='generate-tb']")
        page.wait_for_selector("[data-testid='tb-balanced']")
        expect(page.locator("[data-testid='tb-balanced']")).to_be_visible()

        # Generate financial statements
        page.click("[data-testid='task-financials']")
        page.click("[data-testid='generate-statements']")
        page.wait_for_selector("[data-testid='statements-ready']", timeout=30000)

        # Close period
        page.click("[data-testid='close-period-btn']")
        page.click("[data-testid='confirm-close']")

        # Verify closed status
        expect(page.locator("[data-testid='close-status']")).to_have_text("Closed")
        expect(page.locator("[data-testid='closed-by']")).to_have_text(authenticated_user.name)

    def test_close_blocked_with_unbalanced_entries(self, page: Page):
        """Period close should be blocked if unposted entries exist."""
        page.goto("/close/2026-02")

        # Attempt to close with pending entries
        page.click("[data-testid='close-period-btn']")

        # Verify blocker message
        expect(page.locator("[data-testid='close-blocker']")).to_be_visible()
        expect(page.locator("[data-testid='blocker-reason']")).to_contain_text(
            "3 unposted journal entries"
        )

2.4 Performance Testing

Scope: Load testing, stress testing, endurance testing

Tools:

  • k6: Load testing scripting
  • Grafana k6 Cloud: Distributed load generation
  • Prometheus: Metrics collection

Load Test Scripts:

// k6/scenarios/month-end-close.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';

const errorRate = new Rate('errors');
const trialBalanceDuration = new Trend('trial_balance_duration');

export const options = {
  scenarios: {
    // Simulate 50 concurrent users during month-end close
    month_end_load: {
      executor: 'ramping-vus',
      startVUs: 0,
      stages: [
        { duration: '2m', target: 50 },   // Ramp up
        { duration: '10m', target: 50 },  // Sustained load
        { duration: '2m', target: 100 },  // Spike
        { duration: '5m', target: 100 },  // Sustained spike
        { duration: '2m', target: 0 },    // Ramp down
      ],
    },
  },
  thresholds: {
    http_req_duration: ['p(95)<2000', 'p(99)<5000'],
    errors: ['rate<0.01'],
    trial_balance_duration: ['p(95)<10000'],
  },
};

export default function () {
  const baseUrl = __ENV.BASE_URL || 'https://api.fpa-platform.com';
  const token = __ENV.AUTH_TOKEN;

  const headers = {
    'Authorization': `Bearer ${token}`,
    'Content-Type': 'application/json',
  };

  // Simulate trial balance generation
  const start = Date.now();
  const tbResponse = http.get(
    `${baseUrl}/gl/trial-balance?entity_id=ent_001&period=2026-01`,
    { headers }
  );
  trialBalanceDuration.add(Date.now() - start);

  check(tbResponse, {
    'trial balance status 200': (r) => r.status === 200,
    'trial balance is balanced': (r) => {
      const body = JSON.parse(r.body);
      return body.total_debit === body.total_credit;
    },
  }) || errorRate.add(1);

  sleep(Math.random() * 3 + 1); // 1-4 second think time
}

Performance Baselines:

| Operation                       | P50   | P95   | P99   | Target |
|---------------------------------|-------|-------|-------|--------|
| API Authentication              | 50ms  | 150ms | 300ms | <500ms |
| Journal Entry Create            | 100ms | 300ms | 500ms | <1s    |
| Trial Balance (1K accounts)     | 500ms | 2s    | 5s    | <10s   |
| Reconciliation Match (10K txns) | 5s    | 15s   | 30s   | <60s   |
| Forecast Generation             | 10s   | 30s   | 60s   | <120s  |
| Report Generation               | 3s    | 10s   | 20s   | <30s   |

2.5 Security Testing

Scope: Vulnerability assessment, penetration testing, security scanning

Tools:

  • OWASP ZAP: Dynamic application security testing
  • Snyk: Dependency vulnerability scanning
  • Trivy: Container image scanning
  • Checkov: Infrastructure-as-code scanning

CI Security Scanning Configuration (dependency, container, and IaC scans):

# .github/workflows/security-scan.yml
name: Security Scanning

on: [push, pull_request]

jobs:
  dependency-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run Snyk to check for vulnerabilities
        uses: snyk/actions/python@master
        with:
          args: --severity-threshold=high --sarif-file-output=snyk.sarif
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}

      - name: Upload results to GitHub Security
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: snyk.sarif

  container-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Build image
        run: docker build -t fpa-platform:${{ github.sha }} .

      - name: Run Trivy vulnerability scanner
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: 'fpa-platform:${{ github.sha }}'
          severity: 'CRITICAL,HIGH'
          exit-code: '1'

  iac-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run Checkov
        uses: bridgecrewio/checkov-action@master
        with:
          directory: terraform/
          framework: terraform
          soft_fail: false

DAST Configuration (OWASP ZAP):

# zap-config.yaml
env:
  contexts:
    - name: "FPA Platform"
      urls:
        - "https://staging.fpa-platform.com"
      includePaths:
        - "https://staging.fpa-platform.com/.*"
      excludePaths:
        - ".*\\.js$"
        - ".*\\.css$"
      authentication:
        method: "script"
        parameters:
          script: "auth-script.js"
          username: "${ZAP_USERNAME}"
          password: "${ZAP_PASSWORD}"

jobs:
  - type: spider
    parameters:
      maxDuration: 30
      maxDepth: 10

  - type: spiderAjax
    parameters:
      maxDuration: 30

  - type: activeScan
    parameters:
      maxRuleDurationInMins: 5
      maxScanDurationInMins: 60

  - type: report
    parameters:
      template: "sarif-json"
      reportFile: "zap-results.sarif"

2.6 Chaos Testing

Scope: Fault injection, resilience validation

Tools:

  • Chaos Mesh: Kubernetes chaos engineering
  • Gremlin: Commercial chaos platform
  • Litmus: CNCF chaos engineering

Chaos Experiments:

# chaos-mesh/database-failure.yaml
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: postgres-primary-kill
  namespace: fpa-platform
spec:
  action: pod-kill
  mode: one
  selector:
    namespaces:
      - fpa-platform
    labelSelectors:
      app: postgresql
      role: primary
  scheduler:
    cron: "@every 24h"  # Run daily in staging

---
# chaos-mesh/network-partition.yaml
apiVersion: chaos-mesh.org/v1alpha1
kind: NetworkChaos
metadata:
  name: gl-service-partition
  namespace: fpa-platform
spec:
  action: partition
  mode: all
  selector:
    namespaces:
      - fpa-platform
    labelSelectors:
      app: gl-service
  direction: both
  target:
    mode: all
    selector:
      namespaces:
        - fpa-platform
      labelSelectors:
        app: reconciliation-service
  duration: "5m"

3. Compliance Testing

3.1 SOX Control Testing

Automated Control Validation:

from datetime import datetime

from sqlalchemy import text


class TestSOXControls:
    """Automated SOX Section 404 control testing."""

    async def test_sox_itgc_01_access_control(self, db_session, audit_log):
        """ITGC-01: User access is appropriately provisioned and reviewed."""
        # Verify no orphaned accounts
        orphaned = (await db_session.execute(text("""
            SELECT u.id, u.email
            FROM users u
            LEFT JOIN user_roles ur ON u.id = ur.user_id
            WHERE ur.id IS NULL AND u.status = 'active'
        """))).fetchall()
        assert len(orphaned) == 0, f"Found {len(orphaned)} orphaned active users"

        # Verify privileged access is limited
        admins = await db_session.execute(text("""
            SELECT COUNT(*) FROM user_roles
            WHERE role = 'admin' AND status = 'active'
        """))
        assert admins.scalar() <= 5, "Too many active admin accounts"

        # Verify access reviews completed
        last_review = await db_session.execute(text("""
            SELECT MAX(completed_at) FROM access_reviews
            WHERE review_type = 'quarterly'
        """))
        review_date = last_review.scalar()
        assert review_date is not None, "No quarterly access review found"
        assert (datetime.now() - review_date).days <= 90, "Access review overdue"

    async def test_sox_itgc_02_segregation_of_duties(self, db_session):
        """ITGC-02: Incompatible duties are segregated."""
        # Verify no user can both create and approve journal entries
        violations = (await db_session.execute(text("""
            SELECT u.id, u.email
            FROM users u
            JOIN user_permissions up1 ON u.id = up1.user_id AND up1.permission = 'journal_entry.create'
            JOIN user_permissions up2 ON u.id = up2.user_id AND up2.permission = 'journal_entry.approve'
            WHERE u.status = 'active'
        """))).fetchall()
        assert len(violations) == 0, f"SoD violation: {len(violations)} users can create AND approve"

    async def test_sox_itgc_03_change_management(self, db_session):
        """ITGC-03: Changes follow documented procedures."""
        # Verify all production changes have approvals
        unapproved = (await db_session.execute(text("""
            SELECT d.id, d.deployed_at
            FROM deployments d
            WHERE d.environment = 'production'
              AND d.approval_id IS NULL
              AND d.deployed_at > NOW() - INTERVAL '90 days'
        """))).fetchall()
        assert len(unapproved) == 0, "Found unapproved production deployments"

    async def test_sox_ac_01_journal_entry_controls(self, db_session, audit_log):
        """AC-01: Journal entries require appropriate approval."""
        # Sample 25 material journal entries from the period
        sample = await db_session.execute(text("""
            SELECT je.id, je.amount, je.status, je.approved_by, je.created_by
            FROM journal_entries je
            WHERE je.posted_at BETWEEN '2026-01-01' AND '2026-01-31'
              AND je.amount > 10000  -- Material entries
            ORDER BY RANDOM()
            LIMIT 25
        """))

        for entry in sample:
            assert entry.status == 'posted', f"Entry {entry.id} not posted"
            assert entry.approved_by is not None, f"Entry {entry.id} missing approval"
            assert entry.approved_by != entry.created_by, f"Entry {entry.id} self-approved"

            # Verify audit trail exists
            audit = await audit_log.get_events(
                object_type='journal_entry',
                object_id=entry.id,
                action='approve'
            )
            assert len(audit) >= 1, f"Entry {entry.id} missing approval audit trail"

3.2 HIPAA Testing (Healthcare Customers)

from datetime import datetime, timedelta

from sqlalchemy import text


class TestHIPAACompliance:
    """HIPAA technical safeguard validation."""

    async def test_hipaa_access_control(self, db_session):
        """164.312(a)(1): Unique user identification."""
        # Flag accounts logging in from an unusual number of IPs (possible sharing)
        shared = (await db_session.execute(text("""
            SELECT email, COUNT(*) AS login_count
            FROM user_sessions
            WHERE created_at > NOW() - INTERVAL '24 hours'
            GROUP BY email
            HAVING COUNT(DISTINCT ip_address) > 10
        """))).fetchall()
        assert len(shared) == 0, "Potential shared account detected"

    async def test_hipaa_audit_controls(self, audit_log):
        """164.312(b): Audit controls for PHI access."""
        # Verify PHI access is logged
        phi_tables = ['patients', 'diagnoses', 'treatments', 'prescriptions']

        for table in phi_tables:
            recent_access = await audit_log.get_events(
                object_type=table,
                start_time=datetime.now() - timedelta(hours=24)
            )
            # Verify each access has required fields
            for event in recent_access:
                assert event.user_id is not None, "PHI access missing user_id"
                assert event.ip_address is not None, "PHI access missing IP"
                assert event.timestamp is not None, "PHI access missing timestamp"
                assert event.action in ['read', 'write', 'delete'], "Invalid action"

    async def test_hipaa_transmission_security(self):
        """164.312(e)(1): Encryption of PHI in transit."""
        # Verify TLS is enforced
        import ssl
        import socket

        hostname = 'api.fpa-platform.com'
        context = ssl.create_default_context()

        with socket.create_connection((hostname, 443)) as sock:
            with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                cert = ssock.getpeercert()
                protocol = ssock.version()

        assert protocol in ['TLSv1.2', 'TLSv1.3'], f"Weak TLS: {protocol}"
        assert cert is not None, "No certificate presented"

3.3 FDA 21 CFR Part 11 Testing

from decimal import Decimal

from sqlalchemy import text


class TestFDA21CFRPart11:
    """FDA electronic records and signatures validation."""

    async def test_11_10_a_validation(self, db_session):
        """11.10(a): System validation for accuracy and reliability."""
        # Verify calculation accuracy (expected sums as strings to avoid
        # float/Decimal comparison pitfalls)
        test_cases = [
            {"input": ["100", "200", "300"], "expected_sum": "600"},
            {"input": ["1.23", "4.56", "7.89"], "expected_sum": "13.68"},
        ]

        for case in test_cases:
            result = await db_session.execute(text("""
                SELECT SUM(amount) FROM (
                    SELECT unnest(CAST(:amounts AS decimal[])) AS amount
                ) t
            """), {"amounts": case["input"]})
            assert result.scalar() == Decimal(case["expected_sum"])

    async def test_11_10_e_audit_trail(self, audit_log, db_session):
        """11.10(e): Secure, computer-generated, time-stamped audit trails."""
        # Create test record
        record_id = (await db_session.execute(text("""
            INSERT INTO test_records (value, created_by)
            VALUES ('original', 'test_user')
            RETURNING id
        """))).scalar()

        # Modify record
        await db_session.execute(text("""
            UPDATE test_records SET value = 'modified' WHERE id = :id
        """), {"id": record_id})

        # Verify audit trail
        audit = await audit_log.get_events(
            object_type='test_records',
            object_id=record_id
        )

        assert len(audit) >= 2, "Missing audit trail entries"

        # Verify audit trail is immutable (using immudb)
        for event in audit:
            verification = await audit_log.verify_integrity(event.id)
            assert verification.valid is True, f"Audit trail tampered: {event.id}"

    async def test_11_50_signature_manifestations(self, db_session):
        """11.50: Signature manifestations include name, date/time, meaning."""
        signatures = await db_session.execute(text("""
            SELECT es.* FROM electronic_signatures es
            WHERE es.signed_at > NOW() - INTERVAL '30 days'
            LIMIT 100
        """))

        for sig in signatures:
            assert sig.signer_name is not None, "Signature missing signer name"
            assert sig.signed_at is not None, "Signature missing timestamp"
            assert sig.meaning is not None, "Signature missing meaning"
            assert sig.meaning in [
                'approved', 'reviewed', 'authored',
                'verified', 'rejected'
            ], f"Invalid signature meaning: {sig.meaning}"

4. AI/ML Testing

4.1 Model Accuracy Testing

import pandas as pd
import pytest
from sklearn.metrics import (
    accuracy_score,
    mean_absolute_percentage_error,
    precision_score,
    recall_score,
)


class TestReconciliationModel:
    """Tests for bank reconciliation ML model."""

    @pytest.fixture
    def test_dataset(self):
        """Load labeled test dataset."""
        return pd.read_parquet("tests/data/recon_test_set.parquet")

    def test_match_accuracy(self, test_dataset, model):
        """Model should achieve ≥85% match accuracy."""
        predictions = model.predict(test_dataset.features)
        accuracy = accuracy_score(test_dataset.labels, predictions)

        assert accuracy >= 0.85, f"Match accuracy {accuracy:.2%} below threshold"

    def test_precision_recall(self, test_dataset, model):
        """Model should maintain precision/recall balance."""
        predictions = model.predict(test_dataset.features)

        precision = precision_score(test_dataset.labels, predictions)
        recall = recall_score(test_dataset.labels, predictions)

        assert precision >= 0.90, f"Precision {precision:.2%} too low"
        assert recall >= 0.80, f"Recall {recall:.2%} too low"

    def test_confidence_calibration(self, test_dataset, model):
        """High-confidence predictions should be accurate."""
        predictions = model.predict_proba(test_dataset.features)

        # Bin by confidence
        for threshold in [0.9, 0.95, 0.99]:
            high_conf_mask = predictions.max(axis=1) >= threshold
            high_conf_accuracy = accuracy_score(
                test_dataset.labels[high_conf_mask],
                predictions[high_conf_mask].argmax(axis=1)
            )

            assert high_conf_accuracy >= threshold, \
                f"Confidence {threshold} accuracy {high_conf_accuracy:.2%} miscalibrated"

    def test_no_data_leakage(self, model):
        """Model should not use future information."""
        # Test with timestamp-ordered data
        train_data = load_data(end_date="2025-12-31")
        test_data = load_data(start_date="2026-01-01")

        # Ensure no test data in training
        train_ids = set(train_data.transaction_id)
        test_ids = set(test_data.transaction_id)

        assert train_ids.isdisjoint(test_ids), "Data leakage detected"


class TestForecastingModel:
    """Tests for cash flow forecasting model."""

    def test_forecast_mape(self, historical_data, model):
        """Forecast MAPE should be ≤10%."""
        forecasts = model.predict(historical_data, horizon=13)  # 13 weeks
        actuals = load_actuals(historical_data.end_date, weeks=13)

        mape = mean_absolute_percentage_error(actuals, forecasts)
        assert mape <= 0.10, f"MAPE {mape:.2%} exceeds threshold"

    def test_prediction_intervals(self, historical_data, model):
        """90% prediction intervals should contain ~90% of actuals."""
        forecasts = model.predict(historical_data, horizon=13, intervals=[0.9])
        actuals = load_actuals(historical_data.end_date, weeks=13)

        coverage = calculate_interval_coverage(
            actuals,
            forecasts.lower_90,
            forecasts.upper_90
        )

        assert coverage >= 0.85, f"90% interval only covers {coverage:.2%}"

4.2 Bias Detection

class TestModelFairness:
    """Fairness and bias testing for AI models."""

    def test_demographic_parity(self, model, test_data):
        """Model predictions should not vary by protected attributes."""
        # Test across entity sizes (proxy for customer size)
        small_entities = test_data[test_data.entity_size == 'small']
        large_entities = test_data[test_data.entity_size == 'large']

        small_match_rate = model.predict(small_entities).mean()
        large_match_rate = model.predict(large_entities).mean()

        # Match rates should be within 5 percentage points of each other
        assert abs(small_match_rate - large_match_rate) < 0.05, \
            f"Size bias detected: small={small_match_rate:.2%}, large={large_match_rate:.2%}"

    def test_error_rate_parity(self, model, test_data):
        """Error rates should be similar across segments."""
        segments = test_data.groupby('industry')

        error_rates = {}
        for industry, segment in segments:
            preds = model.predict(segment)
            error_rate = 1 - accuracy_score(segment.labels, preds)
            error_rates[industry] = error_rate

        max_error = max(error_rates.values())
        min_error = min(error_rates.values())

        assert max_error - min_error < 0.10, \
            f"Error rate disparity: {error_rates}"

4.3 Hallucination Detection

class TestNLGAccuracy:
    """Tests for natural language generation accuracy."""

    async def test_factual_grounding(self, nlg_agent, test_cases):
        """Generated commentary should be factually grounded."""
        for case in test_cases:
            commentary = await nlg_agent.generate(
                variance_data=case.variance_data,
                context=case.context
            )

            # Extract factual claims
            claims = extract_claims(commentary)

            for claim in claims:
                # Verify each claim against source data
                verified = verify_claim(claim, case.variance_data)
                assert verified, f"Ungrounded claim: {claim}"

    async def test_no_invented_numbers(self, nlg_agent, test_cases):
        """Generated text should not contain invented numbers."""
        for case in test_cases:
            commentary = await nlg_agent.generate(case.data)

            # Extract all numbers from commentary
            numbers = extract_numbers(commentary)
            source_numbers = extract_numbers(str(case.data))

            for num in numbers:
                # Allow for rounding/formatting differences
                assert any(
                    abs(num - src) / max(abs(src), 1) < 0.01
                    for src in source_numbers
                ), f"Invented number: {num}"

    async def test_citation_accuracy(self, nlg_agent):
        """Citations should point to correct sources."""
        commentary = await nlg_agent.generate_with_citations(test_data)

        for citation in commentary.citations:
            # Verify citation source exists
            source = await fetch_source(citation.source_id)
            assert source is not None, f"Invalid citation: {citation.source_id}"

            # Verify cited text appears in source
            assert citation.quoted_text in source.content, \
                f"Citation mismatch: {citation.quoted_text}"

5. Data Quality Testing

5.1 Great Expectations Integration

# great_expectations/expectations/fpa_expectations.py
import great_expectations as gx

context = gx.get_context()

# Define expectation suite for journal entries
suite = context.add_or_update_expectation_suite("journal_entries_suite")

# Basic validity
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="entity_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="date")
)

# Accounting equation
suite.add_expectation(
    gx.expectations.ExpectColumnPairValuesToBeEqual(
        column_A="total_debit",
        column_B="total_credit",
        mostly=1.0  # Must always balance
    )
)

# Status domain
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status",
        value_set=["draft", "pending_approval", "approved", "posted", "reversed"]
    )
)

# Date validity
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="date",
        min_value="2020-01-01",
        max_value={"$PARAMETER": "today"}
    )
)

5.2 dbt Tests

# dbt/models/marts/fct_journal_entries.yml
version: 2

models:
  - name: fct_journal_entries
    description: "Fact table for journal entries"
    columns:
      - name: journal_entry_id
        tests:
          - unique
          - not_null

      - name: entity_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_legal_entities')
              field: entity_id

      - name: posted_date
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: "'2020-01-01'"
              max_value: "current_date"

      - name: total_debit
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

      - name: total_credit
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

    tests:
      # Custom test: entries must balance
      - dbt_utils.expression_is_true:
          expression: "ABS(total_debit - total_credit) < 0.01"
          config:
            severity: error

      # Custom test: no future-dated posted entries
      - dbt_utils.expression_is_true:
          expression: "posted_date <= current_date OR status != 'posted'"

6. Test Environments

6.1 Environment Matrix

| Environment | Purpose                | Data            | Integrations    | Access     |
|-------------|------------------------|-----------------|-----------------|------------|
| Local       | Developer testing      | Fixtures/mocks  | Mocked          | Individual |
| CI          | Automated testing      | Synthetic       | Testcontainers  | Automated  |
| Dev         | Feature development    | Synthetic       | Sandbox APIs    | Team       |
| Staging     | Pre-release validation | Anonymized prod | Production APIs | QA Team    |
| Production  | Live system            | Real            | Production      | Operations |

6.2 Test Data Management

# tests/factories.py
import factory
from factory.alchemy import SQLAlchemyModelFactory
from faker import Faker

fake = Faker()


class TenantFactory(SQLAlchemyModelFactory):
    class Meta:
        model = Tenant

    id = factory.LazyFunction(lambda: f"tenant_{fake.uuid4()[:8]}")
    name = factory.Faker('company')
    status = 'active'


class LegalEntityFactory(SQLAlchemyModelFactory):
    class Meta:
        model = LegalEntity

    id = factory.LazyFunction(lambda: f"ent_{fake.uuid4()[:8]}")
    tenant = factory.SubFactory(TenantFactory)
    name = factory.Faker('company')
    currency = 'USD'


class JournalEntryFactory(SQLAlchemyModelFactory):
    class Meta:
        model = JournalEntry

    id = factory.LazyFunction(lambda: f"je_{fake.uuid4()[:8]}")
    entity = factory.SubFactory(LegalEntityFactory)
    date = factory.Faker('date_this_year')
    description = factory.Faker('sentence')
    status = 'draft'

    @factory.post_generation
    def lines(self, create, extracted, **kwargs):
        if extracted:
            for line in extracted:
                self.lines.append(line)
        else:
            # Create a balanced entry by default
            amount = fake.pydecimal(min_value=100, max_value=10000, right_digits=2)
            self.lines.append(JournalLineFactory(
                journal_entry=self,
                account_id='1000',
                debit=amount
            ))
            self.lines.append(JournalLineFactory(
                journal_entry=self,
                account_id='2000',
                credit=amount
            ))

7. Test Automation & CI/CD

7.1 GitHub Actions Pipeline

# .github/workflows/test.yml
name: Test Suite

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-test.txt

      - name: Run unit tests
        run: |
          pytest tests/unit \
            --cov=fpa \
            --cov-report=xml \
            --cov-fail-under=80 \
            -v

      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          files: coverage.xml

  integration-tests:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_DB: fpa_test
          POSTGRES_PASSWORD: test
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
        ports:
          - 5432:5432

      redis:
        image: redis:7
        ports:
          - 6379:6379

    steps:
      - uses: actions/checkout@v4

      - name: Run integration tests
        run: |
          pytest tests/integration \
            --cov=fpa \
            --cov-report=xml \
            -v
        env:
          DATABASE_URL: postgresql://postgres:test@localhost:5432/fpa_test
          REDIS_URL: redis://localhost:6379

  e2e-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install Playwright
        run: |
          pip install playwright pytest-playwright
          playwright install chromium

      - name: Run E2E tests
        run: |
          pytest tests/e2e \
            --browser chromium \
            --video on \
            --screenshot only-on-failure

      - name: Upload artifacts
        if: failure()
        uses: actions/upload-artifact@v4
        with:
          name: playwright-results
          path: test-results/

  compliance-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run compliance tests
        run: |
          pytest tests/compliance \
            --junitxml=compliance-results.xml \
            -v

      - name: Generate compliance report
        run: python scripts/generate_compliance_report.py

      - name: Upload compliance evidence
        uses: actions/upload-artifact@v4
        with:
          name: compliance-evidence
          path: compliance-evidence/

8. Acceptance Criteria Templates

8.1 Functional Acceptance Criteria

# features/journal_entry.feature

Feature: Journal Entry Management
  As a Finance Manager
  I want to create and post journal entries
  So that I can record financial transactions accurately

  Background:
    Given I am logged in as a Finance Manager
    And I have access to entity "ACME Corp"
    And the period "2026-01" is open

  Scenario: Create a balanced journal entry
    Given I am on the journal entry creation page
    When I enter the following journal lines:
      | Account | Description | Debit | Credit |
      | 1000    | Cash        | 1000  |        |
      | 4000    | Revenue     |       | 1000   |
    And I enter description "Monthly revenue recognition"
    And I select date "2026-01-15"
    And I click "Save as Draft"
    Then the journal entry should be created with status "Draft"
    And the entry should show total debits of $1,000.00
    And the entry should show total credits of $1,000.00
    And an audit trail entry should be created

  Scenario: Reject unbalanced journal entry
    Given I am on the journal entry creation page
    When I enter the following journal lines:
      | Account | Description | Debit | Credit |
      | 1000    | Cash        | 1000  |        |
      | 4000    | Revenue     |       | 500    |
    And I click "Save as Draft"
    Then I should see error "Entry is not balanced. Difference: $500.00"
    And the journal entry should not be saved

  Scenario: Approve journal entry with proper segregation
    Given there is a draft journal entry created by "john@acme.com"
    And I am logged in as "jane@acme.com" with role "Approver"
    When I navigate to the pending approvals queue
    And I select the journal entry
    And I click "Approve"
    Then the journal entry status should change to "Approved"
    And the approved_by field should show "jane@acme.com"
    And an audit trail entry should record the approval

8.2 Non-Functional Acceptance Criteria

# acceptance_criteria/nfr.yaml

performance:
  api_response_time:
    description: "API endpoints must respond within acceptable time"
    criteria:
      - endpoint: "/gl/journal-entries"
        method: POST
        p95_latency_ms: 500
        p99_latency_ms: 1000
      - endpoint: "/gl/trial-balance"
        method: GET
        p95_latency_ms: 2000
        p99_latency_ms: 5000
      - endpoint: "/reports/generate"
        method: POST
        p95_latency_ms: 10000
        p99_latency_ms: 30000

  throughput:
    description: "System must handle expected load"
    criteria:
      - scenario: "Month-end close peak"
        concurrent_users: 100
        transactions_per_second: 50
        error_rate_max: 0.01

availability:
  uptime:
    description: "System availability targets"
    criteria:
      - tier: "core_services"
        target: "99.9%"
        measurement_period: "monthly"
      - tier: "ai_services"
        target: "99.5%"
        measurement_period: "monthly"

security:
  authentication:
    description: "Authentication requirements"
    criteria:
      - mfa_required: true
        for_roles: ["admin", "approver"]
      - session_timeout_minutes: 30
      - failed_login_lockout_attempts: 5

  encryption:
    description: "Data encryption requirements"
    criteria:
      - data_at_rest: "AES-256"
      - data_in_transit: "TLS 1.2+"
      - pii_fields: "field-level encryption"

compliance:
  audit_trail:
    description: "Audit trail requirements"
    criteria:
      - all_data_changes_logged: true
      - immutable_storage: true
      - retention_days: 2555  # 7 years
      - timestamp_precision: "millisecond"

9. Appendix: Test Metrics Dashboard

Key Metrics to Track

| Metric                      | Target     | Measurement  |
|-----------------------------|------------|--------------|
| Code Coverage (Unit)        | ≥80%       | Per PR       |
| Code Coverage (Integration) | ≥70%       | Per PR       |
| Test Pass Rate              | ≥99%       | Per build    |
| Flaky Test Rate             | <1%        | Weekly       |
| Mean Time to Detect         | <15 min    | Per incident |
| Defect Escape Rate          | <2%        | Monthly      |
| Compliance Test Pass        | 100%       | Per release  |
| Performance Baseline        | Within 10% | Per release  |
| Security Scan Clean         | 0 critical | Per PR       |
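As one way to make the flaky-rate metric concrete: a test can be counted as flaky within a reporting window if it both passed and failed against the same code. A sketch, assuming a simple (test id, passed) run-history format that is illustrative rather than an existing schema:

```python
from collections import defaultdict

def flaky_test_rate(runs: list[tuple[str, bool]]) -> float:
    """Fraction of distinct tests that both passed and failed in the window."""
    outcomes: dict[str, set[bool]] = defaultdict(set)
    for test_id, passed in runs:
        outcomes[test_id].add(passed)
    if not outcomes:
        return 0.0
    flaky = sum(1 for seen in outcomes.values() if len(seen) == 2)
    return flaky / len(outcomes)

runs = [
    ("test_balanced_entry", True), ("test_balanced_entry", True),
    ("test_rls_isolation", True), ("test_rls_isolation", False),  # flaky
]
print(flaky_test_rate(runs))  # 0.5
```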

Test Strategy Document v1.0 — FP&A Platform Document ID: SPEC-004