
03 — Integration & ELT: Airbyte, dbt, Dagster & ERP Connectors

Domain: Data ingestion, transformation, orchestration, ERP/bank/payment connectivity
Dependencies: 01-Data Architecture (target schema for loading)
Outputs: Airbyte configs, dbt project structure, Dagster DAGs, connector specifications


ROLE

You are a Senior Data Engineer specializing in ELT pipelines for multi-tenant financial platforms. You design universal data ingestion systems that normalize heterogeneous ERP charts of accounts into a unified star schema, with full CDC support and data lineage.


OBJECTIVE

Design the complete data integration layer that ingests financial data from 10+ ERP systems (Brazilian and US), normalizes it through dbt transformations, and loads it into the PostgreSQL star schema defined in Sub-Prompt 01, with Dagster orchestrating every step.


DELIVERABLES

D1. Airbyte Connector Configuration

Brazilian ERP Connectors:

  • Omie (REST API) — GL entries, AP, AR, bank statements, NF-e
  • Conta Azul (REST API) — Transactions, contacts, categories, bank reconciliation
  • Tactus (file-based) — Accounting export (CSV/XML), trial balance, DRE
  • Domínio (file-based) — Accounting integration, fiscal books
  • SEFAZ NF-e (XML) — Electronic invoice import via certificate

US/Global ERP Connectors:

  • QuickBooks Online (OAuth2 REST) — Chart of accounts, journals, invoices, payments
  • Xero (OAuth2 REST) — Same scope as QBO
  • NetSuite (SuiteTalk REST/SOAP) — Full ERP: GL, AP, AR, inventory, projects
  • SAP Business One (Service Layer REST) — Journal entries, BP master, banking
  • SAP S/4HANA (OData) — Universal journals, cost centers, profit centers

Banking & Payment Connectors:

  • Open Finance Brasil (BACEN API) — Real-time account balances, transactions
  • Plaid (REST) — US bank account linking, transaction history
  • Stone/Cielo/Rede (REST) — Card receivables, settlement reports, MDR rates
  • Stripe (REST) — Charges, payouts, disputes, balance transactions

For each connector, specify:

  • Authentication method (OAuth2, API key, certificate, file upload)
  • Sync mode (Full Refresh vs. Incremental/CDC)
  • Normalization level (raw → staging)
  • Rate limiting strategy
  • Error handling and retry policy
  • Schema mapping to staging tables
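The rate-limiting and retry items above can be sketched as a token bucket that paces requests to a per-minute budget, plus capped exponential backoff with full jitter for retries. This is a minimal, stdlib-only sketch, not Airbyte CDK code; the class and function names are hypothetical, and the 300-second cap is an assumed value.

```python
import random
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class TokenBucket:
    """Hypothetical client-side limiter: up to `requests_per_minute` tokens, refilled continuously."""

    requests_per_minute: int
    clock: Callable[[], float] = time.monotonic  # injectable so tests can fake time
    tokens: float = field(init=False)
    last_refill: float = field(init=False)

    def __post_init__(self) -> None:
        self.tokens = float(self.requests_per_minute)  # start with a full bucket
        self.last_refill = self.clock()

    def _refill(self) -> None:
        now = self.clock()
        per_second = self.requests_per_minute / 60.0
        self.tokens = min(float(self.requests_per_minute),
                          self.tokens + (now - self.last_refill) * per_second)
        self.last_refill = now

    def try_acquire(self) -> bool:
        """Take one token if available; False means the caller should wait."""
        self._refill()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

    def seconds_until_available(self) -> float:
        """How long to sleep before the next token exists."""
        self._refill()
        return max(0.0, 1.0 - self.tokens) * 60.0 / self.requests_per_minute


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 300.0,
                  rng: Callable[[], float] = random.random) -> float:
    """Capped exponential backoff with full jitter: uniform in [0, min(cap, base * 2**attempt))."""
    return rng() * min(cap, base * 2 ** attempt)
```

A connector loop would call `try_acquire()` before each request and sleep for `seconds_until_available()` when it returns False. On an HTTP 429 or 5xx response it would sleep `backoff_delay(attempt)` and retry the same page without advancing its cursor, which is what keeps retries from skipping records.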

D2. dbt Project Structure

dbt_avivatec/
├── dbt_project.yml
├── packages.yml                        # dbt-utils, dbt-expectations
├── profiles.yml
├── models/
│   ├── staging/                        # 1:1 source mirrors
│   │   ├── stg_quickbooks/
│   │   ├── stg_omie/
│   │   ├── stg_conta_azul/
│   │   ├── stg_netsuite/
│   │   └── stg_bank_statements/
│   ├── intermediate/                   # Business logic transforms
│   │   ├── int_coa_unified.sql         # Chart of accounts normalization
│   │   ├── int_gl_standardized.sql     # GL entry standardization
│   │   ├── int_fx_conversion.sql       # Currency conversion
│   │   └── int_intercompany_elim.sql   # Intercompany elimination
│   └── marts/                          # Star schema facts/dimensions
│       ├── finance/
│       │   ├── fact_gl_transactions.sql
│       │   ├── fact_forecasts.sql
│       │   ├── dim_accounts.sql
│       │   ├── dim_entities.sql
│       │   └── dim_periods.sql
│       └── analytics/
│           ├── rpt_pl_by_entity.sql
│           ├── rpt_cash_flow.sql
│           └── rpt_aging_ar_ap.sql
├── macros/
│   ├── coa_normalization.sql           # COA mapping macros per ERP
│   ├── multi_tenant.sql                # RLS-aware model generation
│   ├── currency_convert.sql            # Point-in-time FX conversion
│   └── audit_columns.sql               # Standard audit column injection
├── seeds/
│   ├── coa_mapping_quickbooks.csv
│   ├── coa_mapping_omie.csv
│   ├── currency_codes.csv
│   └── fiscal_calendar_br.csv
├── tests/
│   ├── assert_gl_balanced.sql          # Debits = Credits per journal
│   ├── assert_tenant_isolation.sql     # No cross-tenant data leakage
│   └── assert_fx_accuracy.sql          # FX within tolerance
└── snapshots/
    ├── snap_budgets.sql                # Budget version history (SCD Type 2)
    └── snap_forecasts.sql              # Forecast version tracking
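The rule behind tests/assert_gl_balanced.sql (debits equal credits per journal) can be pinned down in Python before it is written as a dbt singular test, which fails whenever its query returns rows. This is a sketch only; the row shape (`tenant_id`, `journal_id`, `debit`, `credit`) is an assumption, not the target schema from 01. Grouping by tenant as well as journal keeps the check consistent with the tenant_id constraint.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Iterable, NamedTuple


class GLLine(NamedTuple):
    """Hypothetical GL line shape used only for this illustration."""
    tenant_id: str
    journal_id: str
    debit: Decimal
    credit: Decimal


def unbalanced_journals(lines: Iterable[GLLine]) -> list[tuple[str, str, Decimal]]:
    """Return (tenant_id, journal_id, debit - credit) for every journal that does not net to zero.

    Mirrors a dbt singular test: an empty result means the test passes.
    """
    totals: dict[tuple[str, str], Decimal] = defaultdict(Decimal)
    for line in lines:
        # Decimal avoids binary floating-point drift on currency amounts.
        totals[(line.tenant_id, line.journal_id)] += line.debit - line.credit
    return sorted(
        (tenant, journal, diff)
        for (tenant, journal), diff in totals.items()
        if diff != 0
    )
```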

Key Macros (production-ready SQL):

  • coa_normalize(source, mapping_seed) — Maps source COA codes to unified hierarchy
  • tenant_filter(tenant_id_col) — Injects RLS predicate into all models
  • fx_convert(amount, from_currency, to_currency, as_of_date) — Point-in-time conversion
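  • generate_surrogate_key(fields) — Deterministic hash key from the given fields (dbt_utils.generate_surrogate_key returns an MD5 hash string; cast it to UUID if the target schema requires UUID keys)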
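The intended semantics of two of these macros can be pinned down in Python before writing the Jinja/SQL: `fx_convert` as a point-in-time lookup (use the latest rate dated on or before `as_of_date`, never a later one), and a surrogate key as a deterministic hash of the key fields. The rate-table shape, the null placeholder, and the UUID formatting of the MD5 digest are assumptions for illustration; rounding policy is deliberately left out.

```python
import bisect
import hashlib
import uuid
from datetime import date
from decimal import Decimal

# Assumed shape: (from_ccy, to_ccy) -> [(rate_date, rate), ...] sorted by rate_date.
RateSeries = dict[tuple[str, str], list[tuple[date, Decimal]]]


def fx_convert(amount: Decimal, from_ccy: str, to_ccy: str, as_of: date, rates: RateSeries) -> Decimal:
    """Point-in-time conversion using the latest rate dated on or before `as_of`."""
    if from_ccy == to_ccy:
        return amount
    series = rates[(from_ccy, to_ccy)]
    dates = [d for d, _ in series]
    idx = bisect.bisect_right(dates, as_of) - 1  # index of last rate_date <= as_of
    if idx < 0:
        raise LookupError(f"no {from_ccy}->{to_ccy} rate on or before {as_of}")
    return amount * series[idx][1]


NULL_PLACEHOLDER = "_null_"  # hypothetical; dbt_utils uses its own placeholder string


def surrogate_key(*fields: object) -> uuid.UUID:
    """Deterministic key: MD5 of '-'-joined fields, laid out as a UUID.

    This is an MD5 digest in UUID layout, not an RFC 4122 version-3/5 UUID.
    As with any separator-based scheme, values containing '-' can collide.
    """
    raw = "-".join(NULL_PLACEHOLDER if f is None else str(f) for f in fields)
    return uuid.UUID(hex=hashlib.md5(raw.encode("utf-8")).hexdigest())
```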

D3. Dagster Asset-Centric Orchestration

Pipeline Architecture:

Airbyte Sync → dbt Staging → dbt Intermediate → dbt Marts → NeuralProphet Training → LangGraph Analysis → Apprise Alerts
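In Dagster this chain is declared as asset dependencies, and execution order follows from them. The dependency logic itself can be illustrated without Dagster using the standard library's `graphlib`: one function derives a valid materialization order, another finds every asset that must refresh when an upstream one changes. The raw_* names and the forecast/alert node names are hypothetical placeholders, and the LangGraph step is not modeled.

```python
from graphlib import TopologicalSorter

# Hypothetical asset graph: asset -> set of upstream assets it reads from.
ASSET_DEPS: dict[str, set[str]] = {
    "stg_quickbooks": {"raw_quickbooks"},
    "stg_omie": {"raw_omie"},
    "int_coa_unified": {"stg_quickbooks", "stg_omie"},
    "int_gl_standardized": {"stg_quickbooks", "stg_omie", "int_coa_unified"},
    "int_fx_conversion": {"int_gl_standardized"},
    "dim_accounts": {"int_coa_unified"},
    "fact_gl_transactions": {"int_fx_conversion", "dim_accounts"},
    "forecast_cash_flow": {"fact_gl_transactions"},
    "alert_cash_variance": {"forecast_cash_flow"},
}


def materialization_order(deps: dict[str, set[str]]) -> list[str]:
    """Return one valid execution order; raises graphlib.CycleError on cycles."""
    return list(TopologicalSorter(deps).static_order())


def downstream_of(asset: str, deps: dict[str, set[str]]) -> set[str]:
    """All transitive consumers of `asset`, i.e. what must refresh when it changes."""
    children: dict[str, set[str]] = {}
    for node, parents in deps.items():
        for parent in parents:
            children.setdefault(parent, set()).add(node)
    seen: set[str] = set()
    stack = [asset]
    while stack:
        for child in children.get(stack.pop(), ()):
            if child not in seen:
                seen.add(child)
                stack.append(child)
    return seen
```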

Assets to define:

  • raw_* assets (Airbyte output tables)
  • stg_* assets (dbt staging models)
  • int_* assets (dbt intermediate)
  • fact_* / dim_* assets (dbt marts)
  • forecast_* assets (NeuralProphet outputs)
  • alert_* assets (threshold-triggered notifications)

Schedules:

  • Near-real-time: CDC from ERPs via Airbyte (5-minute intervals where supported)
  • Hourly: Bank statement sync, payment processor reconciliation
  • Nightly: Full dbt run, model retraining, anomaly scan
  • Weekly: Multi-entity consolidation, FX rate history update
  • Monthly: Board report generation, compliance snapshot
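One way to make these cadences concrete is a table of cron expressions, the format Dagster's `ScheduleDefinition` accepts through its `cron_schedule` parameter. The schedule names and times below (nightly at 02:00, weekly on Sunday, monthly on the 1st) are assumptions, and the tiny matcher supports only `*`, `*/n`, and single integers, so it illustrates the table rather than implementing cron.

```python
from datetime import datetime

# Hypothetical cadence -> cron expression (minute hour day-of-month month day-of-week).
SCHEDULES: dict[str, str] = {
    "erp_cdc_sync": "*/5 * * * *",           # near-real-time, every 5 minutes
    "bank_and_psp_sync": "0 * * * *",        # hourly
    "nightly_dbt_and_retrain": "0 2 * * *",  # nightly at 02:00
    "weekly_consolidation_fx": "0 4 * * 0",  # Sundays at 04:00
    "monthly_board_pack": "0 6 1 * *",       # 1st of the month at 06:00
}


def _field_matches(expr: str, value: int) -> bool:
    if expr == "*":
        return True
    if expr.startswith("*/"):
        # value % n matches cron only for zero-based fields (minute, hour).
        return value % int(expr[2:]) == 0
    return value == int(expr)


def due_schedules(at: datetime) -> list[str]:
    """Names of schedules whose cron expression matches `at` (minute resolution).

    Simplification: all five fields are ANDed; real cron ORs day-of-month and
    day-of-week when both are restricted, which none of the entries above do.
    """
    cron_dow = (at.weekday() + 1) % 7  # cron: 0 = Sunday; Python: Monday = 0
    values = (at.minute, at.hour, at.day, at.month, cron_dow)
    return [
        name for name, cron in SCHEDULES.items()
        if all(_field_matches(f, v) for f, v in zip(cron.split(), values))
    ]
```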

D4. CDC Architecture

Design Change Data Capture for:

  • SQL Server → PostgreSQL migration (Phase 1 bridge)
  • ERP real-time sync (incremental actuals)
  • Debezium vs. Airbyte CDC comparison
  • Event ordering guarantees for financial transactions
  • Idempotency for replay safety
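Event ordering and idempotency can be handled together by keying each change on its primary key plus a comparable log position (for example a commit LSN and a sequence number within the transaction, as SQL Server CDC and PostgreSQL logical replication both expose LSNs), and applying an event only when its position is newer than the one stored for that key. Replaying a batch then leaves the target unchanged. This in-memory sketch assumes the event shape shown; deletes become flagged rows to respect the soft-delete constraint.

```python
from dataclasses import dataclass
from typing import Optional

Key = tuple[str, str]       # (tenant_id, record_id), hypothetical
Position = tuple[int, int]  # (commit LSN, sequence within the transaction), hypothetical


@dataclass
class ChangeEvent:
    key: Key
    position: Position
    op: str                        # "upsert" or "delete"
    payload: Optional[dict] = None


@dataclass
class TargetRow:
    position: Position
    payload: Optional[dict]
    is_deleted: bool


def apply_events(target: dict[Key, TargetRow], events: list[ChangeEvent]) -> int:
    """Apply CDC events idempotently; return how many events changed state.

    An event is applied only if its position is strictly newer than the stored
    one, so duplicates and stale redeliveries are ignored.
    """
    applied = 0
    for ev in sorted(events, key=lambda e: e.position):
        current = target.get(ev.key)
        if current is not None and ev.position <= current.position:
            continue  # already applied (replay) or older than what we hold
        if ev.op == "delete":
            # Soft delete: keep the last known payload and flag the row.
            last = current.payload if current else None
            target[ev.key] = TargetRow(ev.position, last, is_deleted=True)
        else:
            target[ev.key] = TargetRow(ev.position, ev.payload, is_deleted=False)
        applied += 1
    return applied
```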

D5. Data Quality Framework

  • dbt tests: unique, not_null, accepted_values, relationships
  • dbt-expectations: distribution checks, row count stability
  • Great Expectations integration for complex validation
  • Data freshness monitoring (dbt source freshness; alert if a source is staler than its threshold)
  • Reconciliation reports: source system totals vs. platform totals
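The freshness and reconciliation items reduce to two comparisons: a source is stale when its newest load is older than the allowed lag, and a reconciliation passes when the platform total matches the source-system total within a tolerance. In dbt, freshness is normally declared on sources (`loaded_at_field` plus `freshness` thresholds) and checked with `dbt source freshness`; the Python below only illustrates the logic, and the tolerance and data shapes are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from decimal import Decimal


def is_stale(last_loaded_at: datetime, max_lag: timedelta, now: datetime) -> bool:
    """True if the newest loaded record is older than the allowed lag."""
    return now - last_loaded_at > max_lag


@dataclass
class ReconResult:
    tenant_id: str
    source_total: Decimal
    platform_total: Decimal
    difference: Decimal
    ok: bool


def reconcile(source_totals: dict[str, Decimal], platform_totals: dict[str, Decimal],
              tolerance: Decimal = Decimal("0.01")) -> list[ReconResult]:
    """Compare per-tenant totals; a tenant missing on either side counts as zero there."""
    results = []
    for tenant in sorted(source_totals.keys() | platform_totals.keys()):
        src = source_totals.get(tenant, Decimal("0"))
        plat = platform_totals.get(tenant, Decimal("0"))
        diff = plat - src
        results.append(ReconResult(tenant, src, plat, diff, abs(diff) <= tolerance))
    return results
```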

CONSTRAINTS

  • Every dbt model MUST include tenant_id in SELECT and WHERE
  • All financial amounts must pass through the fx_convert macro for multi-currency reporting
  • Soft deletes only — dbt snapshots for SCD Type 2 history
  • Airbyte sync logs forwarded to immudb for audit
  • dbt must generate documentation (dbt docs) with lineage graphs
  • Zero data loss guarantee: failed syncs retry with backoff, never skip records
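The soft-delete and SCD Type 2 constraints combine into one versioning rule: a changed record closes its current version and opens a new one, and a record that disappears from the source has its current version closed instead of being removed. dbt snapshots implement this with `dbt_valid_from`/`dbt_valid_to` columns, and hard-delete invalidation is a snapshot configuration option. The Python below only illustrates the rule, with a simplified, hypothetical row shape.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class Version:
    key: str
    attrs: tuple                          # tracked column values (simplified)
    valid_from: datetime
    valid_to: Optional[datetime] = None   # None marks the current version


def snapshot(history: list[Version], source: dict[str, tuple], now: datetime) -> list[Version]:
    """Return the version history after comparing it with the current source rows."""
    current = {v.key: v for v in history if v.valid_to is None}
    result = [v for v in history if v.valid_to is not None]  # closed versions never change
    for key, version in current.items():
        if key not in source:
            result.append(replace(version, valid_to=now))  # soft delete: close, keep row
        elif source[key] != version.attrs:
            result.append(replace(version, valid_to=now))  # change: close old version
            result.append(Version(key, source[key], valid_from=now))
        else:
            result.append(version)  # unchanged
    for key, attrs in source.items():
        if key not in current:
            result.append(Version(key, attrs, valid_from=now))  # new or reappearing key
    return result
```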

RESEARCH QUESTIONS

  1. What is the best Airbyte sync strategy for ERPs with rate limits (QuickBooks: 500 req/min, Omie: 300 req/min)?
  2. How should COA normalization handle ERPs with flat vs. hierarchical chart of accounts?
  3. What is the optimal Dagster partition strategy for tenant-aware dbt runs?
  4. How to handle schema drift when ERPs update their APIs?
  5. What is the recommended approach for backfilling historical data (3-5 years) without impacting production sync?
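For research question 2, one possible approach is to reduce both shapes to a single code-to-parent form before mapping: hierarchical charts already supply parent references, while flat charts with structured codes can have parents derived from the code itself. A mapping seed can then map whole branches, with each leaf walking up until it hits a mapped ancestor. The dotted-code convention (e.g. `1.1.02`), the seed layout, and the function names are assumptions for illustration; real ERP code formats vary, and some flat charts have no derivable structure at all.

```python
from typing import Optional


def parent_from_code(code: str, sep: str = ".") -> Optional[str]:
    """Derive a parent for a flat chart whose codes encode depth, e.g. '1.1.02' -> '1.1'."""
    head, found, _ = code.rpartition(sep)
    return head if found else None


def build_hierarchy(codes: list[str],
                    explicit_parent: Optional[dict[str, str]] = None) -> dict[str, Optional[str]]:
    """Return code -> parent, preferring parents supplied by hierarchical ERPs."""
    explicit_parent = explicit_parent or {}
    return {c: explicit_parent.get(c, parent_from_code(c)) for c in codes}


def map_to_unified(code: str, hierarchy: dict[str, Optional[str]],
                   seed: dict[str, str]) -> Optional[str]:
    """Walk up the source hierarchy until a code appears in the mapping seed.

    Lets a seed map whole branches (e.g. '3.1' -> 'REVENUE_SERVICES') instead of every leaf.
    Returns None for unmapped codes so they can be surfaced instead of silently dropped.
    """
    seen: set[str] = set()  # guards against cycles in ERP-supplied parents
    node: Optional[str] = code
    while node is not None and node not in seen:
        if node in seed:
            return seed[node]
        seen.add(node)
        node = hierarchy.get(node, parent_from_code(node))
    return None
```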

ADRs TO PRODUCE

  • ADR-002: Airbyte over custom connectors (maintenance, coverage, CDC)
  • ADR-005: Dagster over Airflow (asset-centric, lineage, observability)
  • ADR-ELT-001: CDC strategy (Debezium vs. Airbyte vs. logical replication)
  • ADR-ELT-002: dbt incremental strategy (merge vs. delete+insert vs. append)
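The trade-off behind ADR-ELT-002 shows up on a toy table. In dbt, `append` inserts new rows without touching existing ones, `merge` upserts on `unique_key`, and `delete+insert` deletes target rows whose key appears in the new batch and then inserts the batch (adapter support for each strategy varies). The Python below only simulates those semantics on lists of dicts; it is not how dbt executes them, and the journal-line row shape is hypothetical.

```python
Row = dict[str, object]


def append(target: list[Row], batch: list[Row], key: str) -> list[Row]:
    """Insert the batch as-is (`key` unused; kept for a uniform signature)."""
    return target + batch  # a replayed batch is inserted twice


def merge(target: list[Row], batch: list[Row], key: str) -> list[Row]:
    """Upsert: replace rows whose key matches, insert the rest."""
    incoming = {r[key]: r for r in batch}
    if len(incoming) != len(batch):
        raise ValueError("merge requires a unique key within the batch")
    kept = [incoming.pop(r[key]) if r[key] in incoming else r for r in target]
    return kept + list(incoming.values())


def delete_insert(target: list[Row], batch: list[Row], key: str) -> list[Row]:
    """Delete every target row whose key appears in the batch, then insert the batch."""
    batch_keys = {r[key] for r in batch}
    return [r for r in target if r[key] not in batch_keys] + batch
```

With line-level keys, replaying a batch duplicates rows under `append` but is harmless under `merge` and `delete+insert`, which matters for the replay-safety requirement in D4. When an ERP restates a whole journal and drops a line, only `delete+insert` keyed on the journal removes the stale line; a line-keyed `merge` keeps it and leaves the journal unbalanced.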