03 — Integration & ELT: Airbyte, dbt, Dagster & ERP Connectors
Domain: Data ingestion, transformation, orchestration, ERP/bank/payment connectivity
Dependencies: 01-Data Architecture (target schema for loading)
Outputs: Airbyte configs, dbt project structure, Dagster DAGs, connector specifications
ROLE
You are a Senior Data Engineer specializing in ELT pipelines for multi-tenant financial platforms. You design universal data ingestion systems that normalize heterogeneous ERP chart-of-accounts into a unified star schema, with full CDC support and data lineage.
OBJECTIVE
Design the complete data integration layer that ingests financial data from 10+ ERP systems (Brazilian and US), normalizes it through dbt transformations, orchestrates via Dagster, and loads into the PostgreSQL star schema from Sub-Prompt 01.
DELIVERABLES
D1. Airbyte Connector Configuration
Brazilian ERP Connectors:
- Omie (REST API) — GL entries, AP, AR, bank statements, NF-e
- Conta Azul (REST API) — Transactions, contacts, categories, bank reconciliation
- Tactus (file-based) — Accounting export (CSV/XML), trial balance, DRE
- Domínio (file-based) — Accounting integration, fiscal books
- SEFAZ NF-e (XML) — Electronic invoice import via certificate
US/Global ERP Connectors:
- QuickBooks Online (OAuth2 REST) — Chart of accounts, journals, invoices, payments
- Xero (OAuth2 REST) — Same scope as QBO
- NetSuite (SuiteTalk REST/SOAP) — Full ERP: GL, AP, AR, inventory, projects
- SAP Business One (Service Layer REST) — Journal entries, BP master, banking
- SAP S/4HANA (OData) — Universal journals, cost centers, profit centers
Banking & Payment Connectors:
- Open Finance Brasil (BACEN API) — Real-time account balances, transactions
- Plaid (REST) — US bank account linking, transaction history
- Stone/Cielo/Rede (REST) — Card receivables, settlement reports, MDR rates
- Stripe (REST) — Charges, payouts, disputes, balance transactions
For each connector, specify:
- Authentication method (OAuth2, API key, certificate, file upload)
- Sync mode (Full Refresh vs. Incremental/CDC)
- Normalization level (raw → staging)
- Rate limiting strategy
- Error handling and retry policy
- Schema mapping to staging tables
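The retry and rate-limit requirements above can be sketched as exponential backoff with jitter. This is a minimal illustration, not an Airbyte internal: `fetch_page` and `RateLimitError` are hypothetical stand-ins for a connector's page fetch and its HTTP 429 handling.

```python
import time
import random

class RateLimitError(Exception):
    """Stand-in for an HTTP 429 from the ERP API (e.g., QuickBooks at 500 req/min)."""

def sync_with_backoff(fetch_page, max_retries=5, base_delay=1.0, max_delay=60.0):
    """Call fetch_page() with exponential backoff and jitter.

    The same page is retried until it succeeds or retries are exhausted,
    then the error surfaces to the orchestrator — records are never
    silently skipped, per the zero-data-loss constraint.
    """
    for attempt in range(max_retries):
        try:
            return fetch_page()
        except RateLimitError:
            if attempt == max_retries - 1:
                raise  # let the orchestrator retry the whole sync
            delay = min(max_delay, base_delay * 2 ** attempt)
            time.sleep(delay + random.uniform(0, delay * 0.1))  # jitter avoids bursts
```

Jitter matters when many tenant syncs share one API quota: without it, throttled workers all retry at the same instant and trip the limit again.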
D2. dbt Project Structure
dbt_avivatec/
├── dbt_project.yml
├── packages.yml # dbt-utils, dbt-expectations
├── profiles.yml
├── models/
│ ├── staging/ # 1:1 source mirrors
│ │ ├── stg_quickbooks/
│ │ ├── stg_omie/
│ │ ├── stg_conta_azul/
│ │ ├── stg_netsuite/
│ │ └── stg_bank_statements/
│ ├── intermediate/ # Business logic transforms
│ │ ├── int_coa_unified.sql # Chart of accounts normalization
│ │ ├── int_gl_standardized.sql # GL entry standardization
│ │ ├── int_fx_conversion.sql # Currency conversion
│ │ └── int_intercompany_elim.sql # Intercompany elimination
│ └── marts/ # Star schema facts/dimensions
│ ├── finance/
│ │ ├── fact_gl_transactions.sql
│ │ ├── fact_forecasts.sql
│ │ ├── dim_accounts.sql
│ │ ├── dim_entities.sql
│ │ └── dim_periods.sql
│ └── analytics/
│ ├── rpt_pl_by_entity.sql
│ ├── rpt_cash_flow.sql
│ └── rpt_aging_ar_ap.sql
├── macros/
│ ├── coa_normalization.sql # COA mapping macros per ERP
│ ├── multi_tenant.sql # RLS-aware model generation
│ ├── currency_convert.sql # Point-in-time FX conversion
│ └── audit_columns.sql # Standard audit column injection
├── seeds/
│ ├── coa_mapping_quickbooks.csv
│ ├── coa_mapping_omie.csv
│ ├── currency_codes.csv
│ └── fiscal_calendar_br.csv
├── tests/
│ ├── assert_gl_balanced.sql # Debits = Credits per journal
│ ├── assert_tenant_isolation.sql # No cross-tenant data leakage
│ └── assert_fx_accuracy.sql # FX within tolerance
└── snapshots/
├── snap_budgets.sql # Budget version history (SCD Type 2)
└── snap_forecasts.sql # Forecast version tracking
Key Macros (production-ready SQL):
- coa_normalize(source, mapping_seed) — Maps source COA codes to the unified hierarchy
- tenant_filter(tenant_id_col) — Injects the RLS predicate into all models
- fx_convert(amount, from_currency, to_currency, as_of_date) — Point-in-time conversion
- generate_surrogate_key(fields) — Deterministic UUID generation
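The surrogate-key and FX macros can be illustrated in plain Python. This is a sketch of the intended semantics, not the dbt macro code itself: the namespace UUID and the `rates` lookup shape are assumptions standing in for the macro's column hashing and the FX rate dimension table.

```python
import uuid
from datetime import date

# Hypothetical namespace; in dbt the macro would hash concatenated columns,
# and uuid5 gives the same determinism here: same inputs, same key.
KEY_NAMESPACE = uuid.UUID("00000000-0000-0000-0000-000000000000")

def generate_surrogate_key(*fields) -> str:
    """Deterministic UUID from business-key fields."""
    return str(uuid.uuid5(KEY_NAMESPACE, "|".join(str(f) for f in fields)))

def fx_convert(amount: float, from_ccy: str, to_ccy: str,
               as_of: date, rates: dict) -> float:
    """Point-in-time conversion via a (date, from, to) -> rate lookup.

    `rates` is an illustrative stand-in for the FX rate table; in dbt the
    macro would join against it on as_of_date rather than take a dict.
    """
    if from_ccy == to_ccy:
        return amount
    return round(amount * rates[(as_of, from_ccy, to_ccy)], 2)
```

Determinism is the point: re-running a model over the same source rows must produce identical keys, or incremental merges would duplicate facts.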
D3. Dagster Asset-Centric Orchestration
Pipeline Architecture:
Airbyte Sync → dbt Staging → dbt Intermediate → dbt Marts
→ NeuralProphet Training → LangGraph Analysis → Apprise Alerts
Assets to define:
- raw_* assets (Airbyte output tables)
- stg_* assets (dbt staging models)
- int_* assets (dbt intermediate models)
- fact_*/dim_* assets (dbt marts)
- forecast_* assets (NeuralProphet outputs)
- alert_* assets (threshold-triggered notifications)
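The asset-centric idea above can be sketched without Dagster itself: materialization order falls out of the dependency graph. The edges below are an illustrative subset (the names are examples, not a prescribed asset list); in Dagster, the same graph comes from each asset's declared upstream dependencies.

```python
from graphlib import TopologicalSorter

# Illustrative dependency edges mirroring the pipeline:
# Airbyte raw -> dbt staging -> intermediate -> marts -> forecast -> alert.
ASSET_DEPS = {
    "stg_quickbooks": {"raw_quickbooks"},
    "int_gl_standardized": {"stg_quickbooks"},
    "fact_gl_transactions": {"int_gl_standardized"},
    "forecast_cash_flow": {"fact_gl_transactions"},
    "alert_runway": {"forecast_cash_flow"},
}

def materialization_order(deps):
    """Return assets in an order where every upstream materializes first."""
    return list(TopologicalSorter(deps).static_order())
```

This is what makes asset-centric orchestration attractive over task-centric DAGs: the run order and the lineage graph are the same artifact.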
Schedules:
- Real-time: CDC from ERPs via Airbyte (5-minute intervals where supported)
- Hourly: Bank statement sync, payment processor reconciliation
- Nightly: Full dbt run, model retraining, anomaly scan
- Weekly: Multi-entity consolidation, FX rate history update
- Monthly: Board report generation, compliance snapshot
D4. CDC Architecture
Design Change Data Capture for:
- SQL Server → PostgreSQL migration (Phase 1 bridge)
- ERP real-time sync (incremental actuals)
- Debezium vs. Airbyte CDC comparison
- Event ordering guarantees for financial transactions
- Idempotency for replay safety
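The ordering and idempotency requirements can be sketched as an LSN-keyed apply loop. The event shape (`lsn`, `op`, `key`, `row`) is an assumption for illustration, not Debezium's actual envelope, and the in-memory dict stands in for the target table.

```python
def apply_cdc_events(events, table, applied_lsns):
    """Replay-safe CDC apply keyed by LSN (log sequence number).

    Events are applied in LSN order so a late-arriving earlier change
    cannot overwrite a newer one; re-delivered events are skipped, so
    replaying a batch after a crash leaves the table unchanged.
    """
    for event in sorted(events, key=lambda e: e["lsn"]):  # enforce ordering
        if event["lsn"] in applied_lsns:
            continue  # duplicate delivery — already applied, safe to skip
        if event["op"] == "delete":
            table.pop(event["key"], None)
        else:  # upsert
            table[event["key"]] = event["row"]
        applied_lsns.add(event["lsn"])
    return table
```

For financial journals this property is non-negotiable: at-least-once delivery from the log reader must still converge to exactly-once effects in the marts.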
D5. Data Quality Framework
- dbt tests: unique, not_null, accepted_values, relationships
- dbt-expectations: distribution checks, row count stability
- Great Expectations integration for complex validation
- Data freshness monitoring (alert if source stale > threshold)
- Reconciliation reports: source system totals vs. platform totals
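The reconciliation report in the last bullet reduces to a per-account diff with a tolerance. A minimal sketch, assuming totals arrive as account-to-amount mappings pulled from the source ERP and from the loaded marts:

```python
from decimal import Decimal

def reconcile_totals(source_totals, platform_totals, tolerance=Decimal("0.01")):
    """Compare per-account totals: source ERP vs. loaded marts.

    Returns (account, source, platform) tuples for discrepancies beyond
    the tolerance; an empty list means the load reconciles. Decimal avoids
    float rounding noise on financial amounts. Accounts missing on either
    side are treated as zero, so dropped accounts also surface.
    """
    mismatches = []
    for account in sorted(set(source_totals) | set(platform_totals)):
        src = source_totals.get(account, Decimal("0"))
        plat = platform_totals.get(account, Decimal("0"))
        if abs(src - plat) > tolerance:
            mismatches.append((account, src, plat))
    return mismatches
```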
CONSTRAINTS
- Every dbt model MUST include tenant_id in SELECT and WHERE
- All financial amounts pass through the fx_convert macro for multi-currency support
- Soft deletes only — dbt snapshots for SCD Type 2 history
- Airbyte sync logs forwarded to immudb for audit
- dbt must generate documentation (dbt docs) with lineage graphs
- Zero data loss guarantee: failed syncs retry with backoff, never skip records
RESEARCH QUESTIONS
- What is the best Airbyte sync strategy for ERPs with rate limits (QuickBooks: 500 req/min, Omie: 300 req/min)?
- How should COA normalization handle ERPs with flat vs. hierarchical chart of accounts?
- What is the optimal Dagster partition strategy for tenant-aware dbt runs?
- How to handle schema drift when ERPs update their APIs?
- What is the recommended approach for backfilling historical data (3-5 years) without impacting production sync?
ADRs TO PRODUCE
- ADR-002: Airbyte over custom connectors (maintenance, coverage, CDC)
- ADR-005: Dagster over Airflow (asset-centric, lineage, observability)
- ADR-ELT-001: CDC strategy (Debezium vs. Airbyte vs. logical replication)
- ADR-ELT-002: dbt incremental strategy (merge vs. delete+insert vs. append)