
Track L: Data & Business Intelligence — Comprehensive Evidence Document

Classification: Internal — Data & Analytics Engineering Reference
Date: 2026-02-17
Status: Production-Ready
Regulatory Context: FDA 21 CFR Part 11, HIPAA, SOC 2 Type II


Executive Summary

This document provides comprehensive implementation guidance for Track L (Data & Business Intelligence) of the BIO-QMS platform. The data and analytics infrastructure supports regulatory compliance reporting, operational insights, and strategic decision-making for biotech and pharmaceutical organizations operating under FDA, HIPAA, and SOC 2 requirements.

Architecture Overview

Key Design Principles

| Principle | Implementation | Regulatory Alignment |
|---|---|---|
| Immutability | Raw layer never modified; all changes via new records | FDA §11.10(e) audit trail |
| Multi-tenancy | Row-level security, tenant-partitioned tables | HIPAA data isolation |
| Data Lineage | Full transformation tracking from source to report | SOC 2 CC7.2 traceability |
| Real-time + Batch | Streaming for operational metrics, batch for compliance | Hybrid processing model |
| Self-service Analytics | Looker Explores for ad-hoc queries | Business agility |

Technology Stack

| Layer | Technology | Purpose | Scale Target |
|---|---|---|---|
| Source | PostgreSQL 15 | OLTP operational database | 100M+ rows/tenant |
| CDC | Debezium 2.4 | Change data capture | <5s latency |
| Streaming | Cloud Pub/Sub + Dataflow | Real-time event processing | 10K events/sec |
| Warehouse | BigQuery | Columnar analytics database | 10TB+ data |
| Transform | dbt Core 1.6 | SQL-based transformations | 1000+ models |
| BI Platform | Looker | Self-service analytics | 500+ users |
| Orchestration | Cloud Composer (Airflow 2.7) | Workflow scheduling | 100+ DAGs |

L.1: Data Warehouse Architecture

L.1.1: BigQuery Data Warehouse Schema

Dimensional Modeling Approach

The warehouse implements a hybrid dimensional model combining Kimball star schemas for operational reporting with normalized structures for compliance/audit data retention.

BigQuery Schema Definition

Fact Table: Work Orders

-- Fact table: work_orders
CREATE TABLE IF NOT EXISTS `bio_qms_dwh.fact_work_orders` (
-- Surrogate key
work_order_key STRING NOT NULL OPTIONS(description="Surrogate key: tenant_id||work_order_id"),

-- Tenant isolation
tenant_id STRING NOT NULL OPTIONS(description="Tenant identifier for RLS"),

-- Dimension foreign keys
work_order_date DATE NOT NULL OPTIONS(description="FK to dim_date"),
assignee_key STRING NOT NULL OPTIONS(description="FK to dim_person"),
assigner_key STRING NOT NULL OPTIONS(description="FK to dim_person"),
asset_key STRING OPTIONS(description="FK to dim_asset"),
status_key STRING NOT NULL OPTIONS(description="FK to dim_status"),

-- Degenerate dimensions (attributes stored in fact)
work_order_id STRING NOT NULL OPTIONS(description="Natural key from source"),
source_type STRING NOT NULL OPTIONS(description="AUTOMATION|EXTERNAL_VENDOR|MANUAL"),
priority INT64 NOT NULL OPTIONS(description="1=Critical, 5=Low"),

-- Additive measures
billable_hours NUMERIC(10,2) OPTIONS(description="Sum of time entries"),
labor_cost NUMERIC(12,2) OPTIONS(description="Calculated labor cost"),
approval_count INT64 OPTIONS(description="Number of approvals required"),

-- Semi-additive measures (snapshot)
days_to_complete INT64 OPTIONS(description="Days from created to completed"),

-- Flags (boolean dimensions)
regulatory_flag BOOL NOT NULL OPTIONS(description="Requires regulatory approval"),
is_master BOOL NOT NULL OPTIONS(description="Master work order indicator"),
is_completed BOOL NOT NULL OPTIONS(description="Terminal status indicator"),

-- Timestamps
created_at TIMESTAMP NOT NULL OPTIONS(description="Source system creation time"),
completed_at TIMESTAMP OPTIONS(description="Source system completion time"),
scheduled_start TIMESTAMP OPTIONS(description="Planned start time"),
scheduled_end TIMESTAMP OPTIONS(description="Planned end time"),
actual_start TIMESTAMP OPTIONS(description="Actual start time"),
actual_end TIMESTAMP OPTIONS(description="Actual end time"),

-- Audit metadata
etl_inserted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),
etl_source STRING NOT NULL DEFAULT 'cdc_postgres'
)
PARTITION BY DATE(work_order_date)
CLUSTER BY tenant_id, status_key, regulatory_flag
OPTIONS(
description="Fact table for work order analytics with daily partitioning",
require_partition_filter=TRUE,
partition_expiration_days=2555 -- 7 years retention
);

-- Row-level security policy
CREATE ROW ACCESS POLICY tenant_isolation_work_orders
ON `bio_qms_dwh.fact_work_orders`
GRANT TO ('serviceAccount:analytics@bio-qms.iam.gserviceaccount.com')
FILTER USING (tenant_id = SESSION_USER());

Dimension Table: Person (SCD Type 2)

-- Dimension table: person (Slowly Changing Dimension Type 2)
CREATE TABLE IF NOT EXISTS `bio_qms_dwh.dim_person` (
-- Surrogate key
person_key STRING NOT NULL OPTIONS(description="Surrogate key: tenant_id||person_id||effective_from"),

-- Natural key
tenant_id STRING NOT NULL OPTIONS(description="Tenant identifier"),
person_id STRING NOT NULL OPTIONS(description="Natural key from source"),

-- Attributes (tracked for changes)
name STRING NOT NULL OPTIONS(description="Full name"),
email STRING NOT NULL OPTIONS(description="Email address"),
department STRING OPTIONS(description="Department name"),
role STRING OPTIONS(description="Role/title"),
active BOOL NOT NULL OPTIONS(description="Active status"),

-- SCD Type 2 metadata
effective_from TIMESTAMP NOT NULL OPTIONS(description="Start of validity period"),
effective_to TIMESTAMP OPTIONS(description="End of validity period (NULL=current)"),
is_current BOOL NOT NULL OPTIONS(description="Current version flag"),
version INT64 NOT NULL OPTIONS(description="Version number starting at 1"),

-- Additional attributes
hire_date DATE OPTIONS(description="Hire date"),
termination_date DATE OPTIONS(description="Termination date if applicable"),

-- PII handling metadata
pii_anonymized BOOL NOT NULL DEFAULT FALSE OPTIONS(description="PII replaced with hash"),

-- Audit metadata
etl_inserted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),
etl_updated_at TIMESTAMP,
etl_source STRING NOT NULL DEFAULT 'cdc_postgres'
)
CLUSTER BY tenant_id, person_id, is_current
OPTIONS(
description="Person dimension with SCD Type 2 for historical tracking"
);

-- Row-level security
CREATE ROW ACCESS POLICY tenant_isolation_person
ON `bio_qms_dwh.dim_person`
GRANT TO ('serviceAccount:analytics@bio-qms.iam.gserviceaccount.com')
FILTER USING (tenant_id = SESSION_USER());
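The close-and-insert rule implied by the SCD Type 2 metadata above (close the current version by setting `effective_to` and `is_current = FALSE`, then append a new version) can be sketched in memory. This is an illustrative sketch, not the production dbt snapshot logic; the `applyScd2Change` function and its field names are hypothetical simplifications of `dim_person`.

```typescript
// Hypothetical in-memory sketch of the SCD Type 2 versioning rule used by
// dim_person: a tracked-attribute change closes the current row and appends
// a new version; an unchanged attribute produces no new version.
interface PersonVersion {
  personId: string;
  department: string | null;
  effectiveFrom: string;       // ISO timestamp, start of validity
  effectiveTo: string | null;  // null = current version
  isCurrent: boolean;
  version: number;             // starts at 1
}

function applyScd2Change(
  history: PersonVersion[],
  personId: string,
  newDepartment: string,
  changeTs: string
): PersonVersion[] {
  const current = history.find(v => v.personId === personId && v.isCurrent);
  if (!current) {
    // First version: open-ended validity, version 1
    return [...history, {
      personId, department: newDepartment,
      effectiveFrom: changeTs, effectiveTo: null,
      isCurrent: true, version: 1,
    }];
  }
  if (current.department === newDepartment) return history; // no change, no new version
  // Close the current version, append the next one
  const closed: PersonVersion = { ...current, effectiveTo: changeTs, isCurrent: false };
  const next: PersonVersion = {
    personId, department: newDepartment,
    effectiveFrom: changeTs, effectiveTo: null,
    isCurrent: true, version: current.version + 1,
  };
  return history.map(v => (v === current ? closed : v)).concat(next);
}
```

The same invariants are what the `test_scd2_integrity` generic test in the dbt project is meant to enforce: exactly one `is_current` row per natural key, and contiguous validity periods.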

Dimension Table: Date

-- Dimension table: date (pre-populated)
CREATE TABLE IF NOT EXISTS `bio_qms_dwh.dim_date` (
date_key DATE NOT NULL OPTIONS(description="Primary key: YYYY-MM-DD"),

-- Calendar attributes
year INT64 NOT NULL,
quarter INT64 NOT NULL,
month INT64 NOT NULL,
week INT64 NOT NULL,
day_of_month INT64 NOT NULL,
day_of_year INT64 NOT NULL,
day_of_week INT64 NOT NULL,

-- Named attributes
month_name STRING NOT NULL,
month_name_short STRING NOT NULL,
day_name STRING NOT NULL,
day_name_short STRING NOT NULL,
quarter_name STRING NOT NULL,

-- Business calendar
is_business_day BOOL NOT NULL OPTIONS(description="Mon-Fri, excluding holidays"),
is_weekend BOOL NOT NULL,
is_holiday BOOL NOT NULL,
holiday_name STRING OPTIONS(description="Holiday name if applicable"),

-- Fiscal calendar (configurable per tenant)
fiscal_year INT64,
fiscal_quarter INT64,
fiscal_month INT64,

-- Relative date helpers
is_current_day BOOL NOT NULL,
is_current_week BOOL NOT NULL,
is_current_month BOOL NOT NULL,
is_current_quarter BOOL NOT NULL,
is_current_year BOOL NOT NULL
)
OPTIONS(
description="Date dimension covering 2020-2030 with calendar and fiscal attributes"
);

-- Populate date dimension (executed once during warehouse setup)
-- See dbt model: models/dimensions/dim_date.sql
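The population logic lives in the dbt model referenced above; as a sketch of how the basic calendar attributes are derived, a minimal generator might look as follows. The `buildDateRow` function and its field subset are illustrative assumptions, not the actual model.

```typescript
// Illustrative derivation of dim_date calendar fields for a single day.
// Day-of-week uses the BigQuery convention (1 = Sunday ... 7 = Saturday).
interface DateRow {
  dateKey: string;     // YYYY-MM-DD
  year: number;
  quarter: number;
  month: number;
  dayOfMonth: number;
  dayOfWeek: number;
  isWeekend: boolean;
}

function buildDateRow(d: Date): DateRow {
  const dow = d.getUTCDay() + 1; // shift JS 0-6 (Sun-Sat) to 1-7
  return {
    dateKey: d.toISOString().slice(0, 10),
    year: d.getUTCFullYear(),
    quarter: Math.floor(d.getUTCMonth() / 3) + 1,
    month: d.getUTCMonth() + 1,
    dayOfMonth: d.getUTCDate(),
    dayOfWeek: dow,
    isWeekend: dow === 1 || dow === 7,
  };
}
```

Holiday and fiscal-calendar attributes would come from seed tables (e.g. the `compliance_requirements.csv`-style seeds shown later in the dbt project layout) rather than pure date arithmetic.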

Partitioning Strategy

Partition Design Principles:

| Table Type | Partition Column | Granularity | Clustering | Rationale |
|---|---|---|---|---|
| Fact tables | Date column (e.g., work_order_date) | Daily | tenant_id, status, flag | Query patterns filter by date range + tenant |
| Dimension tables | No partitioning | N/A | tenant_id, natural_key | Small-to-medium size; full scan acceptable |
| Audit/log tables | event_timestamp | Hourly | tenant_id, entity_type, entity_id | High volume; time-based queries |
| Raw layer | ingestion_timestamp | Hourly | tenant_id, source_table | Immutable append-only; time-based retention |

Example: Audit Trail Partitioning

CREATE TABLE IF NOT EXISTS `bio_qms_dwh.fact_audit_trail` (
audit_key STRING NOT NULL,
tenant_id STRING NOT NULL,
entity_type STRING NOT NULL,
entity_id STRING NOT NULL,
action STRING NOT NULL,
performed_by STRING NOT NULL,
performer_type STRING NOT NULL, -- HUMAN|AGENT|SYSTEM
event_timestamp TIMESTAMP NOT NULL,
previous_value JSON,
new_value JSON,
etl_inserted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP()
)
PARTITION BY TIMESTAMP_TRUNC(event_timestamp, HOUR)
CLUSTER BY tenant_id, entity_type, entity_id
OPTIONS(
description="Audit trail fact table with hourly partitioning for compliance queries",
require_partition_filter=TRUE,
partition_expiration_days=NULL -- Never expire (regulatory requirement)
);

Partition Pruning Example:

-- Query optimized by partition pruning (scans only 1 day = 24 partitions)
SELECT
entity_type,
COUNT(*) as event_count
FROM `bio_qms_dwh.fact_audit_trail`
WHERE tenant_id = 'tenant_abc'
AND event_timestamp BETWEEN '2026-02-17 00:00:00' AND '2026-02-17 23:59:59'
AND entity_type = 'WORK_ORDER'
GROUP BY entity_type;

-- Execution plan shows: "Partitions scanned: 24 of 50,000" (0.048% scan)

L.1.2: Data Lake Organization

Three-Layer Architecture

gs://bio-qms-data-lake-{env}/
├── raw/ # Immutable source data
│ ├── cdc_events/ # Change data capture events
│ │ ├── postgres/
│ │ │ ├── work_orders/
│ │ │ │ └── dt=2026-02-17/
│ │ │ │ └── hour=14/
│ │ │ │ ├── 00000.avro
│ │ │ │ └── 00001.avro
│ │ │ ├── persons/
│ │ │ └── audit_trail/
│ │ └── _manifest.json
│ ├── api_events/ # Application events
│ └── external_feeds/ # Third-party data

├── cleaned/ # Validated and standardized
│ ├── work_orders/
│ │ └── dt=2026-02-17/
│ │ └── part-00000.parquet
│ ├── persons/
│ └── audit_trail/

└── curated/ # Business-ready models
├── fact_work_orders/
│ └── dt=2026-02-17/
│ └── part-00000.parquet
├── dim_person/
└── regulatory_reports/

Storage Lifecycle Policies

// Infrastructure-as-code: Terraform configuration
// File: terraform/modules/data-lake/lifecycle.tf

resource "google_storage_bucket" "data_lake_raw" {
name = "bio-qms-data-lake-raw-${var.environment}"
location = "US"
storage_class = "STANDARD"

lifecycle_rule {
condition {
age = 90 // days
}
action {
type = "SetStorageClass"
storage_class = "NEARLINE"
}
}

lifecycle_rule {
condition {
age = 365 // 1 year
}
action {
type = "SetStorageClass"
storage_class = "COLDLINE"
}
}

lifecycle_rule {
condition {
age = 2555 // 7 years (regulatory retention)
}
action {
type = "SetStorageClass"
storage_class = "ARCHIVE"
}
}

// NO deletion rule - regulatory requirement
// Deletion requires manual approval + legal hold check

versioning {
enabled = true // Protect against accidental overwrites
}

uniform_bucket_level_access = true
}

resource "google_storage_bucket" "data_lake_cleaned" {
name = "bio-qms-data-lake-cleaned-${var.environment}"
location = "US"
storage_class = "STANDARD"

lifecycle_rule {
condition {
age = 30 // Cleaned data transitions faster
}
action {
type = "SetStorageClass"
storage_class = "NEARLINE"
}
}

lifecycle_rule {
condition {
age = 180
}
action {
type = "SetStorageClass"
storage_class = "COLDLINE"
}
}
}

resource "google_storage_bucket" "data_lake_curated" {
name = "bio-qms-data-lake-curated-${var.environment}"
location = "US"
storage_class = "STANDARD"

// Curated data stays in STANDARD for fast access
// Exported to BigQuery for querying
}

Cost Optimization Through Lifecycle:

| Storage Class | Cost/GB/Month | Access Cost/GB | Use Case | Transition After |
|---|---|---|---|---|
| STANDARD | $0.020 | Free | Active data (0-90 days) | 90 days |
| NEARLINE | $0.010 | $0.01 | Warm data (90-365 days) | 365 days |
| COLDLINE | $0.004 | $0.02 | Cold data (1-7 years) | 7 years |
| ARCHIVE | $0.0012 | $0.05 | Regulatory retention (7+ years) | Never |

Estimated savings: 76% reduction in storage costs after 7 years compared to STANDARD-only.
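A back-of-envelope check of that savings figure, per GB held for 7 years (84 months), using the table rates above and the raw-bucket transition points rounded to whole months (an assumption of this sketch):

```typescript
// Per-GB storage cost over `months` months, STANDARD-only vs lifecycle-tiered.
// Rates are the $/GB/month values from the table above.
function standardOnlyCost(months: number): number {
  return months * 0.020;
}

function lifecycleCost(months: number): number {
  const standardMonths = 3;  // ~90 days in STANDARD
  const nearlineMonths = 9;  // days 90-365 in NEARLINE
  const coldlineMonths = months - standardMonths - nearlineMonths; // remainder in COLDLINE
  return standardMonths * 0.020 + nearlineMonths * 0.010 + coldlineMonths * 0.004;
}

const months = 84; // 7 years
const savings = 1 - lifecycleCost(months) / standardOnlyCost(months);
// savings ≈ 0.74 with these rounded transitions, consistent with the ~76%
// figure quoted above (which also reflects ARCHIVE pricing beyond year 7).
```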


L.1.3: Schema Versioning and Migration

Schema Evolution Strategy

Backward Compatibility Rules:

  1. Additive changes only in production

    • New columns: nullable or with defaults
    • New tables: independent of existing tables
    • New views: versioned (e.g., v1_work_orders, v2_work_orders)
  2. Breaking changes require blue-green deployment

    • Column removal → deprecation period (90 days minimum)
    • Type changes → new column + migration + old column deprecation
    • Constraint tightening → validation + cleanup + enforcement
  3. Schema version tracking in metadata table

-- Schema version registry
CREATE TABLE IF NOT EXISTS `bio_qms_dwh.schema_versions` (
version_id STRING NOT NULL,
table_name STRING NOT NULL,
schema_json STRING NOT NULL, -- Full schema as JSON
applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),
applied_by STRING NOT NULL,
migration_sql STRING, -- DDL statements executed
rollback_sql STRING, -- Rollback procedure
is_breaking BOOL NOT NULL DEFAULT FALSE,
deprecation_date TIMESTAMP, -- For breaking changes
status STRING NOT NULL DEFAULT 'ACTIVE' -- ACTIVE|DEPRECATED|ROLLED_BACK
)
OPTIONS(
description="Schema version history for data warehouse tables"
);

Blue-Green Schema Migration

Scenario: Change person.department from STRING to STRUCT (breaking change)

-- Step 1: Create new column (non-breaking, additive)
ALTER TABLE `bio_qms_dwh.dim_person`
ADD COLUMN department_v2 STRUCT<
id STRING,
name STRING,
code STRING
>;

-- Step 2: Backfill new column from old column
UPDATE `bio_qms_dwh.dim_person`
SET department_v2 = STRUCT(
GENERATE_UUID() AS id,
department AS name,
UPPER(SUBSTR(department, 1, 4)) AS code
)
WHERE department_v2 IS NULL;

-- Step 3: Create versioned views
CREATE OR REPLACE VIEW `bio_qms_dwh.dim_person_v1` AS
SELECT
person_key,
tenant_id,
person_id,
name,
email,
department, -- Old column
role,
effective_from,
effective_to,
is_current
FROM `bio_qms_dwh.dim_person`;

CREATE OR REPLACE VIEW `bio_qms_dwh.dim_person_v2` AS
SELECT
person_key,
tenant_id,
person_id,
name,
email,
department_v2 AS department, -- New column
role,
effective_from,
effective_to,
is_current
FROM `bio_qms_dwh.dim_person`;

-- Step 4: Update dbt models to use v2 view
-- Step 5: After 90-day deprecation period, drop old column
-- ALTER TABLE `bio_qms_dwh.dim_person` DROP COLUMN department;

Migration Validation:

-- dbt test: tests/schema_migrations/test_department_v2_complete.sql
-- Ensures all records have new column populated before deprecating old column

{{ config(severity='error') }}

SELECT
person_key,
tenant_id,
person_id,
department AS old_value,
department_v2 AS new_value
FROM {{ ref('dim_person') }}
WHERE is_current = TRUE
AND department IS NOT NULL
AND department_v2 IS NULL;

-- Expected: 0 rows (all records migrated)

L.1.4: Multi-Tenant Data Isolation in Warehouse

Row-Level Security (RLS) Implementation

BigQuery RLS Policies:

-- Policy 1: Tenant-based isolation for fact_work_orders
CREATE ROW ACCESS POLICY tenant_rls_fact_work_orders
ON `bio_qms_dwh.fact_work_orders`
GRANT TO ('group:analytics-users@bio-qms.com')
FILTER USING (
tenant_id IN (
SELECT tenant_id
FROM `bio_qms_dwh.user_tenant_access`
WHERE user_email = SESSION_USER()
AND is_active = TRUE
AND (access_expires_at IS NULL OR access_expires_at > CURRENT_TIMESTAMP())
)
);
-- NULL access_expires_at means "no expiration"; the IS NULL branch keeps those grants valid.

-- Policy 2: Superuser access (data engineering team)
CREATE ROW ACCESS POLICY superuser_access_fact_work_orders
ON `bio_qms_dwh.fact_work_orders`
GRANT TO ('group:data-engineering@bio-qms.com')
FILTER USING (TRUE); -- Full access for administration

-- Policy 3: Service account access (for scheduled reports)
CREATE ROW ACCESS POLICY service_account_fact_work_orders
ON `bio_qms_dwh.fact_work_orders`
GRANT TO ('serviceAccount:looker-sa@bio-qms.iam.gserviceaccount.com')
FILTER USING (
-- Service account uses impersonation, extracts tenant from session
tenant_id = REGEXP_EXTRACT(SESSION_USER(), r'tenant-(.+?)@')
);

User-Tenant Access Control Table:

CREATE TABLE IF NOT EXISTS `bio_qms_dwh.user_tenant_access` (
user_email STRING NOT NULL,
tenant_id STRING NOT NULL,
access_level STRING NOT NULL, -- VIEWER|ANALYST|ADMIN
granted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),
granted_by STRING NOT NULL,
access_expires_at TIMESTAMP, -- NULL = no expiration
is_active BOOL NOT NULL DEFAULT TRUE
)
CLUSTER BY user_email, tenant_id
OPTIONS(
description="User access control for multi-tenant data warehouse"
);

-- Sample data
INSERT INTO `bio_qms_dwh.user_tenant_access` VALUES
('analyst@company-a.com', 'tenant_abc', 'ANALYST', CURRENT_TIMESTAMP(), 'admin@bio-qms.com', NULL, TRUE),
('exec@company-a.com', 'tenant_abc', 'VIEWER', CURRENT_TIMESTAMP(), 'admin@bio-qms.com', NULL, TRUE),
('analyst@company-b.com', 'tenant_xyz', 'ANALYST', CURRENT_TIMESTAMP(), 'admin@bio-qms.com', NULL, TRUE);
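The predicate the RLS subquery encodes can be stated in plain code: a user sees a tenant's rows only while an active grant exists whose expiration is either unset (`NULL` = no expiration) or still in the future. A minimal sketch, with hypothetical names:

```typescript
// Sketch of the user_tenant_access check behind the RLS FILTER USING subquery.
interface TenantGrant {
  userEmail: string;
  tenantId: string;
  accessExpiresAt: Date | null; // null = no expiration
  isActive: boolean;
}

function accessibleTenants(grants: TenantGrant[], userEmail: string, now: Date): string[] {
  return grants
    .filter(g =>
      g.userEmail === userEmail &&
      g.isActive &&
      (g.accessExpiresAt === null || g.accessExpiresAt > now))
    .map(g => g.tenantId);
}
```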

Tenant Partitioning Strategy

Option 1: Shared Tables with RLS (Recommended for <100 tenants)

-- Single table, RLS enforces isolation
-- Pros: Simple schema, easy cross-tenant analysis (for superusers)
-- Cons: Query performance depends on clustering effectiveness

SELECT * FROM `bio_qms_dwh.fact_work_orders`
WHERE tenant_id = 'tenant_abc'
AND work_order_date BETWEEN '2026-01-01' AND '2026-12-31';

-- Execution: RLS filter automatically applied + partition pruning

Option 2: Table Sharding (For 100+ tenants or data sovereignty)

-- Separate table per tenant: fact_work_orders_tenant_abc
-- Pros: Physical isolation, independent retention policies
-- Cons: Schema management complexity, no easy cross-tenant queries

-- Dynamically route to tenant-specific table
EXECUTE IMMEDIATE FORMAT("""
SELECT * FROM `bio_qms_dwh.fact_work_orders_%s`
WHERE work_order_date BETWEEN '2026-01-01' AND '2026-12-31'
""", tenant_id);

Hybrid Approach (Implemented):

  • Shared tables for dimensions and small fact tables
  • Sharded tables for high-volume audit logs (per-tenant partitions)
  • RLS policies on all shared tables
  • Clustering by tenant_id as first cluster column
-- Audit trail: sharded by tenant for volume + sovereignty
CREATE TABLE IF NOT EXISTS `bio_qms_dwh.fact_audit_trail_tenant_abc` (
-- Same schema as base table
...
)
PARTITION BY TIMESTAMP_TRUNC(event_timestamp, HOUR)
CLUSTER BY entity_type, entity_id;

-- View unifies shards (for superuser cross-tenant analysis)
CREATE OR REPLACE VIEW `bio_qms_dwh.fact_audit_trail_all` AS
SELECT 'tenant_abc' AS tenant_id, * FROM `bio_qms_dwh.fact_audit_trail_tenant_abc`
UNION ALL
SELECT 'tenant_xyz' AS tenant_id, * FROM `bio_qms_dwh.fact_audit_trail_tenant_xyz`;

L.2: ETL/ELT Pipeline

L.2.1: Change Data Capture from PostgreSQL

Debezium CDC Architecture

Debezium Configuration

Connector Configuration (JSON):

{
"name": "bio-qms-postgres-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "10.128.0.10",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "${secret:debezium-password}",
"database.dbname": "bio_qms_production",
"database.server.name": "bio-qms-prod",

"slot.name": "debezium_slot",
"plugin.name": "pgoutput",
"publication.name": "debezium_publication",

"table.include.list": "public.work_orders,public.persons,public.audit_trail,public.approvals,public.electronic_signatures",

"snapshot.mode": "initial",
"snapshot.locking.mode": "none",

"heartbeat.interval.ms": "10000",
"heartbeat.topics.prefix": "__debezium-heartbeat",

"tombstones.on.delete": "true",
"skipped.operations": "none",

"transforms": "route,unwrap",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "cdc.$3",

"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,source.ts_ms,source.txId",

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",

"producer.override.compression.type": "snappy",
"producer.override.max.request.size": "1048576",

"errors.tolerance": "none",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
}
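The `RegexRouter` transform in the config above maps Kafka topics of the form `<server>.<schema>.<table>` onto `cdc.<table>`. The same regex can be replayed outside Connect to sanity-check the routing; `routeTopic` here is an illustrative helper, not part of the connector:

```typescript
// Mirror of the RegexRouter rule: ([^.]+)\.([^.]+)\.([^.]+) -> cdc.$3
// Topics that don't match (e.g. two-segment heartbeat topics) pass through
// unchanged, as with the Connect transform.
function routeTopic(topic: string): string {
  return topic.replace(/^([^.]+)\.([^.]+)\.([^.]+)$/, "cdc.$3");
}
```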

PostgreSQL Setup for CDC:

-- 1. Create replication slot
SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');

-- 2. Create publication for CDC tables
CREATE PUBLICATION debezium_publication FOR TABLE
work_orders,
persons,
audit_trail,
approvals,
electronic_signatures,
teams,
team_members,
vendors,
assets,
tools,
materials,
experiences,
person_experiences,
job_plans,
schedules,
time_entries,
change_items,
parties;

-- 3. Grant replication permissions
CREATE ROLE debezium_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
GRANT USAGE ON SCHEMA public TO debezium_user;

-- 4. Enable logical replication (requires restart)
-- Add to postgresql.conf:
-- wal_level = logical
-- max_replication_slots = 10
-- max_wal_senders = 10

CDC Event Schema

Event Structure:

interface DebeziumCDCEvent {
// Event metadata
op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
ts_ms: number; // Event timestamp (milliseconds)
source: {
version: string; // Debezium version
connector: string; // "postgresql"
name: string; // "bio-qms-prod"
ts_ms: number; // Source timestamp
snapshot: boolean | 'true' | 'false' | 'last';
db: string; // "bio_qms_production"
schema: string; // "public"
table: string; // "work_orders"
txId: number; // Transaction ID
lsn: number; // Log sequence number
xmin: number | null;
};

// Row data
before: Record<string, any> | null; // Previous values (null for INSERT)
after: Record<string, any> | null; // New values (null for DELETE)

// Transaction info (if enabled)
transaction?: {
id: string;
total_order: number;
data_collection_order: number;
};
}

// Example: Work order UPDATE event
const updateEvent: DebeziumCDCEvent = {
op: 'u',
ts_ms: 1708185600000,
source: {
version: '2.4.2.Final',
connector: 'postgresql',
name: 'bio-qms-prod',
ts_ms: 1708185599850,
snapshot: false,
db: 'bio_qms_production',
schema: 'public',
table: 'work_orders',
txId: 12345678,
lsn: 987654321,
xmin: null
},
before: {
id: 'wo_abc123',
tenant_id: 'tenant_abc',
status: 'PLANNED',
updated_at: '2026-02-17T10:00:00Z',
version: 1
},
after: {
id: 'wo_abc123',
tenant_id: 'tenant_abc',
status: 'IN_PROGRESS',
updated_at: '2026-02-17T12:00:00Z',
version: 2
}
};

Event Sourcing Pattern

Immutable Event Store (Raw Layer):

CREATE TABLE IF NOT EXISTS `bio_qms_dwh_raw.cdc_events` (
event_id STRING NOT NULL OPTIONS(description="UUID v7 generated at ingestion"),
event_timestamp TIMESTAMP NOT NULL OPTIONS(description="Event occurrence time"),
ingestion_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP(),

source_system STRING NOT NULL DEFAULT 'postgres',
source_database STRING NOT NULL,
source_schema STRING NOT NULL,
source_table STRING NOT NULL,

operation STRING NOT NULL, -- 'c', 'u', 'd', 'r'
transaction_id INT64,
lsn INT64,

before_image JSON, -- Previous row state
after_image JSON, -- New row state

tenant_id STRING NOT NULL, -- Extracted from payload for RLS

-- Full event envelope for debugging
raw_event JSON NOT NULL
)
PARTITION BY TIMESTAMP_TRUNC(event_timestamp, HOUR)
CLUSTER BY tenant_id, source_table, operation
OPTIONS(
description="Immutable CDC event store - never modified or deleted",
require_partition_filter=TRUE
);

-- Row-level security
CREATE ROW ACCESS POLICY tenant_isolation_cdc_events
ON `bio_qms_dwh_raw.cdc_events`
GRANT TO ('group:data-engineering@bio-qms.com')
FILTER USING (tenant_id = SESSION_USER() OR TRUE); -- Full access for engineering

Event Replay Capability:

// scripts/data-pipeline/replay-cdc-events.ts
import { BigQuery } from '@google-cloud/bigquery';

interface ReplayOptions {
tenantId: string;
table: string;
startTime: Date;
endTime: Date;
targetDataset: string;
}

async function replayCDCEvents(options: ReplayOptions): Promise<void> {
const bigquery = new BigQuery();

const query = `
CREATE OR REPLACE TABLE \`${options.targetDataset}.${options.table}_replayed\` AS
SELECT
COALESCE(after_image, before_image) AS row_data,
operation,
event_timestamp
FROM \`bio_qms_dwh_raw.cdc_events\`
WHERE tenant_id = @tenantId
AND source_table = @table
AND event_timestamp BETWEEN @startTime AND @endTime
ORDER BY event_timestamp, transaction_id, lsn;
`;

const [job] = await bigquery.createQueryJob({
query,
params: {
tenantId: options.tenantId,
table: options.table,
startTime: options.startTime.toISOString(),
endTime: options.endTime.toISOString()
}
});

console.log(`Replay job ${job.id} started`);
const [rows] = await job.getQueryResults();
console.log(`Replayed ${rows.length} events for ${options.table}`);
}
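Replay works because an ordered CDC stream folds deterministically back into current row state: creates and updates replace the row with `after_image`, deletes remove it. A minimal sketch of that fold (field names simplified from the event store schema above):

```typescript
// Fold an ordered CDC event stream into final row state.
// Ordering must match the replay query: event_timestamp, transaction_id, lsn.
interface ReplayEvent {
  operation: "c" | "u" | "d" | "r";
  rowId: string;
  afterImage: Record<string, unknown> | null;
}

function foldEvents(events: ReplayEvent[]): Map<string, Record<string, unknown>> {
  const state = new Map<string, Record<string, unknown>>();
  for (const ev of events) {
    if (ev.operation === "d") {
      state.delete(ev.rowId);          // delete removes the row
    } else if (ev.afterImage) {
      state.set(ev.rowId, ev.afterImage); // create/update/snapshot replaces it
    }
  }
  return state;
}
```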

L.2.2: Transformation Pipeline (dbt)

dbt Project Structure

dbt_bio_qms/
├── dbt_project.yml
├── profiles.yml
├── packages.yml

├── models/
│ ├── staging/ # Layer 1: Raw → Staging
│ │ ├── _staging.yml
│ │ ├── stg_work_orders.sql
│ │ ├── stg_persons.sql
│ │ └── stg_audit_trail.sql
│ │
│ ├── intermediate/ # Layer 2: Business logic
│ │ ├── _intermediate.yml
│ │ ├── int_work_order_lifecycle.sql
│ │ └── int_person_scd2.sql
│ │
│ ├── marts/ # Layer 3: Business-ready models
│ │ ├── core/
│ │ │ ├── _core.yml
│ │ │ ├── fact_work_orders.sql
│ │ │ ├── dim_person.sql
│ │ │ ├── dim_date.sql
│ │ │ └── dim_asset.sql
│ │ │
│ │ ├── compliance/
│ │ │ ├── _compliance.yml
│ │ │ ├── fact_audit_trail.sql
│ │ │ ├── fact_approvals.sql
│ │ │ └── rpt_part11_audit_summary.sql
│ │ │
│ │ └── operations/
│ │ ├── _operations.yml
│ │ ├── rpt_capa_aging.sql
│ │ └── rpt_resource_utilization.sql
│ │
│ └── metrics/ # dbt metrics (v1.6+)
│ ├── _metrics.yml
│ └── work_order_metrics.yml

├── macros/
│ ├── generate_surrogate_key.sql
│ ├── get_tenant_filter.sql
│ └── regulatory_data_masking.sql

├── tests/
│ ├── generic/
│ │ └── test_scd2_integrity.sql
│ └── data_quality/
│ ├── test_no_orphan_work_orders.sql
│ └── test_audit_trail_completeness.sql

├── seeds/
│ ├── dim_date_seed.csv
│ └── compliance_requirements.csv

├── snapshots/
│ ├── person_snapshot.sql
│ └── asset_snapshot.sql

└── analyses/
└── ad_hoc_investigation.sql

Staging Models (Raw → Clean)

stg_work_orders.sql:

{{
config(
materialized='incremental',
unique_key='work_order_key',
on_schema_change='append_new_columns',
partition_by={
"field": "work_order_date",
"data_type": "date",
"granularity": "day"
},
cluster_by=['tenant_id', 'status']
)
}}

WITH source AS (
SELECT *
FROM {{ source('raw', 'cdc_events') }}
WHERE source_table = 'work_orders'
AND operation IN ('c', 'u', 'r') -- Create, Update, Read (snapshot)
{% if is_incremental() %}
AND event_timestamp > (SELECT MAX(etl_source_timestamp) FROM {{ this }})
{% endif %}
),

parsed AS (
SELECT
-- Generate surrogate key
{{ dbt_utils.generate_surrogate_key(['tenant_id', 'work_order_id']) }} AS work_order_key,

-- Extract from after_image JSON
JSON_VALUE(after_image, '$.id') AS work_order_id,
JSON_VALUE(after_image, '$.tenant_id') AS tenant_id,
JSON_VALUE(after_image, '$.source_type') AS source_type,
JSON_VALUE(after_image, '$.status') AS status,
CAST(JSON_VALUE(after_image, '$.priority') AS INT64) AS priority,
CAST(JSON_VALUE(after_image, '$.regulatory_flag') AS BOOL) AS regulatory_flag,
JSON_VALUE(after_image, '$.summary') AS summary,
JSON_VALUE(after_image, '$.assignee_id') AS assignee_id,
JSON_VALUE(after_image, '$.assigner_id') AS assigner_id,
JSON_VALUE(after_image, '$.asset_id') AS asset_id,

-- Timestamps
CAST(JSON_VALUE(after_image, '$.created_at') AS TIMESTAMP) AS created_at,
CAST(JSON_VALUE(after_image, '$.updated_at') AS TIMESTAMP) AS updated_at,
CAST(JSON_VALUE(after_image, '$.completed_at') AS TIMESTAMP) AS completed_at,

-- Audit metadata
event_timestamp AS etl_source_timestamp,
CURRENT_TIMESTAMP() AS etl_updated_at

FROM source
),

deduped AS (
SELECT *
FROM parsed
QUALIFY ROW_NUMBER() OVER (
PARTITION BY work_order_key
ORDER BY etl_source_timestamp DESC
) = 1
)

SELECT * FROM deduped
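The `QUALIFY ROW_NUMBER()` step keeps, per surrogate key, only the row with the latest source event timestamp (last event wins). The same rule in memory, as an illustrative sketch:

```typescript
// Last-event-wins dedup, mirroring the QUALIFY ROW_NUMBER() ... = 1 step:
// for each key, keep the row with the greatest source event timestamp.
interface StagedRow {
  key: string;                 // surrogate key (tenant_id + work_order_id)
  status: string;
  etlSourceTimestamp: number;  // epoch millis of the CDC event
}

function dedupeLatest(rows: StagedRow[]): StagedRow[] {
  const latest = new Map<string, StagedRow>();
  for (const row of rows) {
    const seen = latest.get(row.key);
    if (!seen || row.etlSourceTimestamp > seen.etlSourceTimestamp) {
      latest.set(row.key, row); // later event supersedes earlier ones
    }
  }
  return [...latest.values()];
}
```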

Intermediate Models (Business Logic)

int_work_order_lifecycle.sql:

{{
config(
materialized='table',
partition_by={
"field": "created_date",
"data_type": "date",
"granularity": "day"
}
)
}}

WITH work_orders AS (
SELECT * FROM {{ ref('stg_work_orders') }}
),

approvals AS (
SELECT
work_order_id,
tenant_id,
COUNT(*) AS approval_count,
MIN(decision_timestamp) AS first_approval_at,
MAX(decision_timestamp) AS last_approval_at,
COUNTIF(decision = 'APPROVED') AS approved_count,
COUNTIF(decision = 'REJECTED') AS rejected_count
FROM {{ ref('stg_approvals') }}
GROUP BY work_order_id, tenant_id
),

time_entries AS (
SELECT
work_order_id,
tenant_id,
SUM(billable_hours) AS total_billable_hours,
SUM(labor_cost) AS total_labor_cost,
COUNT(DISTINCT person_id) AS unique_contributors
FROM {{ ref('stg_time_entries') }}
GROUP BY work_order_id, tenant_id
),

lifecycle_metrics AS (
SELECT
wo.*,

-- Join approval data
COALESCE(a.approval_count, 0) AS approval_count,
a.first_approval_at,
a.last_approval_at,

-- Join time entry data
COALESCE(te.total_billable_hours, 0) AS billable_hours,
COALESCE(te.total_labor_cost, 0) AS labor_cost,
COALESCE(te.unique_contributors, 0) AS contributor_count,

-- Calculate lifecycle metrics
-- TIMESTAMP_DIFF (not DATE_DIFF) is required for TIMESTAMP arguments
TIMESTAMP_DIFF(
COALESCE(wo.completed_at, CURRENT_TIMESTAMP()),
wo.created_at,
DAY
) AS days_open,

TIMESTAMP_DIFF(
wo.completed_at,
wo.created_at,
DAY
) AS days_to_complete,

-- Status flags
wo.status IN ('COMPLETED', 'CANCELLED') AS is_terminal_status,
wo.status = 'COMPLETED' AS is_completed,
wo.completed_at IS NOT NULL AS has_completion_date,

-- Date dimension keys
DATE(wo.created_at) AS created_date,
DATE(wo.completed_at) AS completed_date

FROM work_orders wo
LEFT JOIN approvals a
ON wo.work_order_id = a.work_order_id
AND wo.tenant_id = a.tenant_id
LEFT JOIN time_entries te
ON wo.work_order_id = te.work_order_id
AND wo.tenant_id = te.tenant_id
)

SELECT * FROM lifecycle_metrics

Mart Models (Business-Ready)

fact_work_orders.sql:

{{
config(
materialized='incremental',
unique_key='work_order_key',
partition_by={
"field": "work_order_date",
"data_type": "date",
"granularity": "day"
},
cluster_by=['tenant_id', 'status_key', 'regulatory_flag']
)
}}

WITH lifecycle AS (
SELECT * FROM {{ ref('int_work_order_lifecycle') }}
{% if is_incremental() %}
WHERE etl_updated_at > (SELECT MAX(etl_updated_at) FROM {{ this }})
{% endif %}
),

-- Join dimension keys
fact AS (
SELECT
lc.work_order_key,
lc.tenant_id,

-- Dimension foreign keys
lc.created_date AS work_order_date,
{{ dbt_utils.generate_surrogate_key(['lc.tenant_id', 'lc.assignee_id']) }} AS assignee_key,
{{ dbt_utils.generate_surrogate_key(['lc.tenant_id', 'lc.assigner_id']) }} AS assigner_key,
{{ dbt_utils.generate_surrogate_key(['lc.tenant_id', 'lc.asset_id']) }} AS asset_key,
{{ dbt_utils.generate_surrogate_key(['lc.status']) }} AS status_key,

-- Degenerate dimensions
lc.work_order_id,
lc.source_type,
lc.priority,

-- Measures
lc.billable_hours,
lc.labor_cost,
lc.approval_count,
lc.days_to_complete,

-- Flags
lc.regulatory_flag,
lc.is_completed,

-- Timestamps
lc.created_at,
lc.completed_at,

-- Audit
lc.etl_updated_at

FROM lifecycle lc
)

SELECT * FROM fact

Data Quality Tests

tests/data_quality/test_audit_trail_completeness.sql:

{{
config(
severity='error',
error_if='>100'
)
}}

-- error_if '>100' allows up to 100 missing audit entries (race condition tolerance)

-- Ensure every work order status change has a corresponding audit trail entry

WITH work_order_changes AS (
SELECT
tenant_id,
work_order_id,
status,
updated_at
FROM {{ ref('stg_work_orders') }}
WHERE status != 'DRAFT' -- Draft doesn't require audit
),

audit_entries AS (
SELECT
tenant_id,
entity_id AS work_order_id,
JSON_VALUE(new_value, '$.status') AS new_status,
performed_at
FROM {{ ref('stg_audit_trail') }}
WHERE entity_type = 'WORK_ORDER'
AND action = 'STATUS_CHANGE'
),

missing_audits AS (
SELECT
woc.tenant_id,
woc.work_order_id,
woc.status AS current_status,
woc.updated_at
FROM work_order_changes woc
LEFT JOIN audit_entries ae
ON woc.tenant_id = ae.tenant_id
AND woc.work_order_id = ae.work_order_id
AND woc.status = ae.new_status
WHERE ae.work_order_id IS NULL
)

SELECT * FROM missing_audits
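The `LEFT JOIN ... WHERE ... IS NULL` pattern above is a standard anti-join: keep status changes that have no matching audit entry on (tenant_id, work_order_id, status). The same logic in Python, to make the join keys explicit:

```python
def find_missing_audits(status_changes, audit_entries):
    """Anti-join mirroring the dbt test: non-DRAFT status changes lacking a
    matching audit entry on (tenant_id, work_order_id, status)."""
    audited = {
        (a["tenant_id"], a["work_order_id"], a["new_status"]) for a in audit_entries
    }
    return [
        c for c in status_changes
        if c["status"] != "DRAFT"
        and (c["tenant_id"], c["work_order_id"], c["status"]) not in audited
    ]

changes = [
    {"tenant_id": "t1", "work_order_id": "wo-1", "status": "APPROVED"},
    {"tenant_id": "t1", "work_order_id": "wo-2", "status": "DRAFT"},      # exempt
    {"tenant_id": "t1", "work_order_id": "wo-3", "status": "COMPLETED"},  # unaudited
]
audits = [{"tenant_id": "t1", "work_order_id": "wo-1", "new_status": "APPROVED"}]
print([c["work_order_id"] for c in find_missing_audits(changes, audits)])  # ['wo-3']
```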

dbt Macros

macros/generate_surrogate_key.sql:

{# Project-level wrapper so models call a single surrogate-key entry point; the
   hashing implementation can be swapped here without touching any model. #}
{% macro generate_surrogate_key(columns) %}
    {{ return(dbt_utils.generate_surrogate_key(columns)) }}
{% endmacro %}

macros/get_tenant_filter.sql:

{# Appends "AND <col> = '<tenant>'" when run with --vars '{target_tenant: <id>}';
   the caller must already have a WHERE clause for the AND to attach to. #}
{% macro get_tenant_filter(tenant_column='tenant_id') %}
    {% if var('target_tenant', none) %}
        AND {{ tenant_column }} = '{{ var("target_tenant") }}'
    {% endif %}
{% endmacro %}

macros/regulatory_data_masking.sql:

{% macro mask_pii(column_name, mask_type='hash') %}
    {%- if target.name == 'prod' -%}
        {%- if mask_type == 'hash' -%}
            TO_HEX(SHA256({{ column_name }}))
        {%- elif mask_type == 'redact' -%}
            '***REDACTED***'
        {%- elif mask_type == 'partial' -%}
            CONCAT(SUBSTR({{ column_name }}, 1, 3), '***')
        {%- else -%}
            {{ exceptions.raise_compiler_error("mask_pii: unknown mask_type '" ~ mask_type ~ "'") }}
        {%- endif -%}
    {%- else -%}
        {# No masking in dev/test. Note: an inline "--" SQL comment here would
           comment out the rest of the rendered line at every call site. #}
        {{ column_name }}
    {%- endif -%}
{% endmacro %}
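To sanity-check what each masking branch renders in production, here is the equivalent logic in Python. BigQuery's `TO_HEX(SHA256(col))` over a STRING matches SHA-256 of the UTF-8 bytes rendered as lowercase hex, so the `hash` branch below reproduces it exactly:

```python
import hashlib

def mask_pii(value: str, mask_type: str = "hash") -> str:
    """Python equivalents of the SQL each macro branch renders in prod."""
    if mask_type == "hash":      # TO_HEX(SHA256(col)) -> lowercase hex digest
        return hashlib.sha256(value.encode("utf-8")).hexdigest()
    if mask_type == "redact":    # constant literal
        return "***REDACTED***"
    if mask_type == "partial":   # CONCAT(SUBSTR(col, 1, 3), '***')
        return value[:3] + "***"
    raise ValueError(f"unknown mask_type: {mask_type}")

print(mask_pii("jane.doe@example.com", "partial"))  # jan***
print(len(mask_pii("jane.doe@example.com")))        # 64 (hex characters)
```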

L.2.3: Real-Time Streaming Pipeline

Dataflow Pipeline Architecture

// dataflow/src/pipelines/cdc-streaming-pipeline.ts
//
// NOTE: Beam-style sketch. The transform vocabulary (PubSubIO, BigQueryIO,
// WindowInto, Combine) follows the Apache Beam programming model; the
// '@google-cloud/dataflow' module paths are illustrative placeholders, not a
// published Google Cloud SDK.
import { Pipeline, PipelineOptions } from '@google-cloud/dataflow';
import { PubSubIO, BigQueryIO, WindowInto, Combine } from '@google-cloud/dataflow/transforms';

export interface CDCEvent {
  eventId: string;
  eventTimestamp: number;
  tenantId: string;
  sourceTable: string;
  operation: 'c' | 'u' | 'd' | 'r'; // Debezium: create, update, delete, read (snapshot)
  afterImage: Record<string, unknown>;
  beforeImage: Record<string, unknown> | null;
}

export class CDCStreamingPipeline {
  async run(options: PipelineOptions): Promise<void> {
    const pipeline = Pipeline.create(options);

    pipeline
      // 1. Read from Pub/Sub; idAttribute enables Pub/Sub-level deduplication
      .apply('ReadCDCEvents', PubSubIO.read({
        subscription: 'projects/bio-qms-prod/subscriptions/cdc-events',
        idAttribute: 'eventId',
        timestampAttribute: 'eventTimestamp',
      }))

      // 2. Parse JSON
      .apply('ParseJSON', pipeline.pardo(this.parseJSON))

      // 3. Validate schema
      .apply('ValidateSchema', pipeline.pardo(this.validateSchema))

      // 4. Attach tenant/table processing metadata
      .apply('ExtractMetadata', pipeline.pardo(this.extractMetadata))

      // 5. Window into 1-minute fixed windows
      .apply('WindowInto1Min', WindowInto.fixedWindows({
        duration: 60, // seconds
      }))

      // 6. Deduplicate within each window (handles publisher retries);
      //    assumes elements are keyed by eventId ahead of the combine
      .apply('Deduplicate', Combine.perKey(this.deduplicateEvents))

      // 7. Route to the appropriate destination by source table
      .apply('RouteByTable', pipeline.partition({
        partitionFn: (event: CDCEvent) => event.sourceTable,
        numPartitions: 10,
      }))

      // 8. Stream into per-table raw BigQuery tables
      .apply('WriteToBigQuery', BigQueryIO.write({
        table: (event: CDCEvent) => `bio_qms_dwh_raw.cdc_events_${event.sourceTable}`,
        writeDisposition: 'WRITE_APPEND',
        createDisposition: 'CREATE_IF_NEEDED',
        method: 'STREAMING_INSERTS',
      }))

      // 9. Send failed elements to the dead-letter topic
      .apply('WriteFailures', PubSubIO.write({
        topic: 'projects/bio-qms-prod/topics/cdc-dlq',
        errorHandling: 'OUTPUT_TO_DEADLETTER',
      }));

    await pipeline.run();
  }

  private parseJSON(element: string): CDCEvent {
    try {
      return JSON.parse(element) as CDCEvent;
    } catch (error) {
      throw new Error(`Failed to parse JSON: ${(error as Error).message}`);
    }
  }

  private validateSchema(event: CDCEvent): CDCEvent {
    const required = ['eventId', 'eventTimestamp', 'tenantId', 'sourceTable', 'operation'];
    for (const field of required) {
      if (!(field in event)) {
        throw new Error(`Missing required field: ${field}`);
      }
    }
    return event;
  }

  private extractMetadata(event: CDCEvent): CDCEvent & { metadata: object } {
    return {
      ...event,
      metadata: {
        pipelineVersion: '1.0.0',
        processingTimestamp: Date.now(),
        dataflowJobId: process.env.DATAFLOW_JOB_ID,
      },
    };
  }

  private deduplicateEvents(events: CDCEvent[]): CDCEvent {
    // Keep the latest event per key by event timestamp
    return events.reduce((latest, current) =>
      current.eventTimestamp > latest.eventTimestamp ? current : latest
    );
  }
}
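The windowed deduplication in step 6 keeps only the newest record per event key, so a publisher retry that lands in the same window collapses to one row. The grouping logic, sketched in Python:

```python
def deduplicate_events(events):
    """Keep the newest event per eventId, mirroring the windowed Deduplicate step."""
    latest = {}
    for event in events:
        current = latest.get(event["eventId"])
        if current is None or event["eventTimestamp"] > current["eventTimestamp"]:
            latest[event["eventId"]] = event
    return sorted(latest.values(), key=lambda e: e["eventTimestamp"])

# One window's worth of events: e1 arrives twice (retry); the later copy wins
window = [
    {"eventId": "e1", "eventTimestamp": 100, "operation": "c"},
    {"eventId": "e1", "eventTimestamp": 105, "operation": "u"},
    {"eventId": "e2", "eventTimestamp": 101, "operation": "c"},
]
print([(e["eventId"], e["eventTimestamp"]) for e in deduplicate_events(window)])
# [('e2', 101), ('e1', 105)]
```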

Pub/Sub Topic Configuration

// infrastructure/pubsub/topics.ts
import { PubSub } from '@google-cloud/pubsub';

const pubsub = new PubSub({ projectId: 'bio-qms-prod' });

// CDC events topic: 7-day retention, pinned US regions, JSON schema enforcement.
// createTopic takes a single metadata object, not (name, options).
await pubsub.createTopic({
  name: 'cdc-events',
  messageRetentionDuration: { seconds: 604800 }, // 7 days
  messageStoragePolicy: {
    allowedPersistenceRegions: ['us-central1', 'us-east1'],
  },
  schemaSettings: {
    schema: 'projects/bio-qms-prod/schemas/cdc-event-schema',
    encoding: 'JSON',
  },
});

// Subscription consumed by the Dataflow pipeline
await pubsub.topic('cdc-events').createSubscription('cdc-events-dataflow', {
  ackDeadlineSeconds: 60,
  retryPolicy: {
    minimumBackoff: { seconds: 10 },
    maximumBackoff: { seconds: 600 },
  },
  deadLetterPolicy: {
    deadLetterTopic: 'projects/bio-qms-prod/topics/cdc-dlq',
    maxDeliveryAttempts: 5,
  },
  expirationPolicy: {
    ttl: { seconds: 2592000 }, // Delete the subscription if idle for 30 days
  },
  enableMessageOrdering: true, // Requires publishers to set ordering keys
  filter: 'attributes.sourceSystem = "postgresql"',
});
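The retry policy above backs off exponentially between `minimumBackoff` (10 s) and `maximumBackoff` (600 s). The exact growth factor is internal to Pub/Sub, so a factor of 2 is assumed here purely to visualize the shape of the redelivery schedule:

```python
def backoff_schedule(minimum=10, maximum=600, attempts=8, factor=2):
    """Capped exponential backoff: minimum * factor**n, clamped at maximum."""
    return [min(minimum * factor ** n, maximum) for n in range(attempts)]

print(backoff_schedule())  # [10, 20, 40, 80, 160, 320, 600, 600]
```

After five nacks the dead-letter policy takes over (`maxDeliveryAttempts: 5`), so in practice only the head of this schedule is exercised before a message lands on `cdc-dlq`.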

Real-Time Aggregation Example

Use Case: Calculate rolling 5-minute work order creation rate per tenant

// dataflow/src/aggregations/wo-creation-rate.ts
// Beam-style sketch; see cdc-streaming-pipeline.ts for the illustrative-SDK caveat.
import { Pipeline, Combine, WindowInto } from '@google-cloud/dataflow';
import { PubSubIO, BigQueryIO } from '@google-cloud/dataflow/transforms';
import { CDCEvent } from '../pipelines/cdc-streaming-pipeline';

export class WorkOrderCreationRateAggregation {
  async run(): Promise<void> {
    const pipeline = Pipeline.create();

    pipeline
      .apply('ReadCDC', PubSubIO.read({
        subscription: 'projects/bio-qms-prod/subscriptions/cdc-events-dataflow',
      }))
      // Keep only inserts on work_orders (Debezium operation 'c' = create)
      .apply('FilterCreates', pipeline.filter((event: CDCEvent) =>
        event.sourceTable === 'work_orders' && event.operation === 'c'
      ))
      .apply('ExtractTenant', pipeline.map((event: CDCEvent) => ({
        tenantId: event.tenantId,
        timestamp: event.eventTimestamp,
      })))
      // 5-minute windows that slide every minute
      .apply('Window5Min', WindowInto.slidingWindows({
        size: 300,  // 5 minutes
        period: 60, // Slide every 1 minute
      }))
      .apply('CountPerTenant', Combine.perKey((events) => events.length))
      .apply('FormatOutput', pipeline.map(({ key, value, window }) => ({
        tenantId: key,
        creationRate: value / 5.0, // Events per minute over the 5-minute window
        windowStart: window.start,
        windowEnd: window.end,
      })))
      .apply('WriteToBQ', BigQueryIO.write({
        table: 'bio_qms_dwh_realtime.work_order_creation_rate',
        writeDisposition: 'WRITE_APPEND',
      }));

    await pipeline.run();
  }
}
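Sliding windows overlap: with a 300 s size and a 60 s period, every event is counted in five windows, which is what makes the published rate a rolling average. A sketch of the window-assignment arithmetic (window starts assumed period-aligned, as in Beam):

```python
def sliding_windows(ts: int, size: int = 300, period: int = 60):
    """Return the [start, end) sliding windows (epoch seconds) containing ts.

    Mirrors WindowInto.slidingWindows(size=300, period=60): starts are aligned
    to the period, so each event lands in size / period = 5 overlapping windows.
    """
    last_start = (ts // period) * period
    starts = range(last_start, ts - size, -period)  # all aligned starts > ts - size
    return sorted((s, s + size) for s in starts)

print(sliding_windows(630))  # event at t = 630 s
# [(360, 660), (420, 720), (480, 780), (540, 840), (600, 900)]
```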

L.2.4: Pipeline Orchestration (Cloud Composer / Airflow)

Airflow DAG Structure

# dags/daily_warehouse_refresh.py
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCheckOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator


# Helper callables are defined before the DAG body so the names exist when the
# operators below are instantiated at parse time.
def generate_data_lineage(execution_date: str):
    """Generate the data lineage graph for the execution date."""
    from data_lineage import LineageGenerator  # internal lineage module

    generator = LineageGenerator(project='bio_qms_dwh', date=execution_date)
    generator.extract_from_dbt()
    generator.extract_from_bigquery()
    generator.export_to_dataplex()


def send_slack_notification(message: str, channel: str):
    """Send a Slack notification."""
    from slack_sdk import WebClient

    client = WebClient(token=os.environ['SLACK_BOT_TOKEN'])
    client.chat_postMessage(channel=channel, text=message)


default_args = {
    'owner': 'data-engineering',
    'depends_on_past': True,
    'email': ['data-alerts@bio-qms.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'sla': timedelta(hours=4),  # Must complete within 4 hours
}

with DAG(
    'daily_warehouse_refresh',
    default_args=default_args,
    description='Daily ETL pipeline for data warehouse',
    schedule_interval='0 2 * * *',  # 2 AM UTC daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['production', 'etl', 'warehouse'],
) as dag:

    # Task 1: Check CDC event completeness. A check operator is used because a
    # plain query job never fails the task when rows come back, and grouping by
    # source_table cannot surface tables with zero events (they produce no
    # group at all). The expected table count below is illustrative.
    check_cdc_completeness = BigQueryCheckOperator(
        task_id='check_cdc_completeness',
        sql="""
            SELECT COUNT(DISTINCT source_table) >= 4  -- expected number of CDC source tables
            FROM `bio_qms_dwh_raw.cdc_events`
            WHERE DATE(event_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
        """,
        use_legacy_sql=False,
        location='US',
    )

    # Task 2: Run dbt staging models
    dbt_staging = DataflowStartFlexTemplateOperator(
        task_id='dbt_staging',
        body={
            'launchParameter': {
                'containerSpecGcsPath': 'gs://bio-qms-dataflow-templates/dbt-runner.json',
                'jobName': 'dbt-staging-{{ ds_nodash }}',
                'parameters': {
                    'project': 'bio_qms_dwh',
                    'models': 'staging.*',
                    'vars': '{"execution_date": "{{ ds }}"}',
                },
                'environment': {
                    'maxWorkers': '10',
                    'serviceAccountEmail': 'dataflow-runner@bio-qms-prod.iam.gserviceaccount.com',
                },
            }
        },
        project_id='bio-qms-prod',
        location='us-central1',
    )

    # Task 3: Run dbt intermediate models
    dbt_intermediate = DataflowStartFlexTemplateOperator(
        task_id='dbt_intermediate',
        body={
            'launchParameter': {
                'containerSpecGcsPath': 'gs://bio-qms-dataflow-templates/dbt-runner.json',
                'jobName': 'dbt-intermediate-{{ ds_nodash }}',
                'parameters': {
                    'project': 'bio_qms_dwh',
                    'models': 'intermediate.*',
                    'vars': '{"execution_date": "{{ ds }}"}',
                },
            }
        },
        project_id='bio-qms-prod',
        location='us-central1',
    )

    # Task 4: Run dbt mart models
    dbt_marts = DataflowStartFlexTemplateOperator(
        task_id='dbt_marts',
        body={
            'launchParameter': {
                'containerSpecGcsPath': 'gs://bio-qms-dataflow-templates/dbt-runner.json',
                'jobName': 'dbt-marts-{{ ds_nodash }}',
                'parameters': {
                    'project': 'bio_qms_dwh',
                    'models': 'marts.*',
                    'vars': '{"execution_date": "{{ ds }}"}',
                },
            }
        },
        project_id='bio-qms-prod',
        location='us-central1',
    )

    # Task 5: Run data quality tests
    dbt_test = DataflowStartFlexTemplateOperator(
        task_id='dbt_test',
        body={
            'launchParameter': {
                'containerSpecGcsPath': 'gs://bio-qms-dataflow-templates/dbt-runner.json',
                'jobName': 'dbt-test-{{ ds_nodash }}',
                'parameters': {
                    'project': 'bio_qms_dwh',
                    'command': 'test',
                    'vars': '{"execution_date": "{{ ds }}"}',
                },
            }
        },
        project_id='bio-qms-prod',
        location='us-central1',
    )

    # Task 6: Generate data lineage
    generate_lineage = PythonOperator(
        task_id='generate_lineage',
        python_callable=generate_data_lineage,
        op_kwargs={'execution_date': '{{ ds }}'},
    )

    # Task 7: Update Looker PDTs
    refresh_looker_pdts = BigQueryInsertJobOperator(
        task_id='refresh_looker_pdts',
        configuration={
            'query': {
                'query': 'CALL `bio_qms_dwh.refresh_all_pdts`()',
                'useLegacySql': False,
            }
        },
        location='US',
    )

    # Task 8: Send completion notification
    send_notification = PythonOperator(
        task_id='send_notification',
        python_callable=send_slack_notification,
        op_kwargs={
            'message': 'Daily warehouse refresh completed successfully',
            'channel': '#data-pipeline-alerts',
        },
    )

    # Define task dependencies
    check_cdc_completeness >> dbt_staging >> dbt_intermediate >> dbt_marts >> dbt_test
    dbt_test >> generate_lineage >> refresh_looker_pdts >> send_notification
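Several task parameters above (`jobName`, the dbt `vars` string, `op_kwargs`) rely on Airflow's Jinja templating: `{{ ds }}` and `{{ ds_nodash }}` are rendered per run at execution time, not at DAG parse time. A toy stand-in for that substitution, to show what the dbt runner actually receives:

```python
from datetime import date

def render_airflow_macros(text: str, logical_date: date) -> str:
    """Toy substitute for Airflow's Jinja rendering of ds / ds_nodash."""
    return (
        text
        .replace("{{ ds_nodash }}", logical_date.strftime("%Y%m%d"))
        .replace("{{ ds }}", logical_date.isoformat())
    )

print(render_airflow_macros("dbt-staging-{{ ds_nodash }}", date(2026, 2, 17)))
# dbt-staging-20260217
print(render_airflow_macros('{"execution_date": "{{ ds }}"}', date(2026, 2, 17)))
# {"execution_date": "2026-02-17"}
```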

SLA Monitoring and Alerting

# dags/sla_monitoring.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def check_pipeline_sla(**context):
    """Check whether the daily pipeline met its 4-hour SLA."""
    from airflow.models import DagRun
    from airflow.utils.state import State

    dag_id = 'daily_warehouse_refresh'
    execution_date = context['execution_date']

    dag_runs = DagRun.find(dag_id=dag_id, execution_date=execution_date)
    if not dag_runs:
        raise Exception(f'No run of {dag_id} found for {execution_date}')
    dag_run = dag_runs[0]

    if dag_run.state != State.SUCCESS:
        raise Exception(f'Pipeline failed or still running: {dag_run.state}')

    duration = (dag_run.end_date - dag_run.start_date).total_seconds() / 3600  # hours

    sla_hours = 4
    if duration > sla_hours:
        # send_pagerduty_alert is a shared alerting helper (defined elsewhere)
        send_pagerduty_alert(
            severity='warning',
            summary=f'Data pipeline SLA breach: {duration:.1f}h (SLA: {sla_hours}h)',
            details={
                'dag_id': dag_id,
                'execution_date': str(execution_date),
                'duration_hours': duration,
            },
        )


with DAG(
    'sla_monitoring',
    schedule_interval='0 7 * * *',  # 7 AM UTC, after the daily pipeline should have completed
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:

    check_sla = PythonOperator(
        task_id='check_pipeline_sla',
        python_callable=check_pipeline_sla,
        # provide_context is deprecated in Airflow 2.x; **context is passed automatically
    )
