
Agent Skills Framework Extension

Data Engineering Patterns Skill

When to Use This Skill

Use this skill when implementing data engineering patterns in your codebase.

How to Use This Skill

  1. Review the patterns and examples below
  2. Apply the relevant patterns to your implementation
  3. Follow the best practices outlined in this skill

This skill covers production data pipelines: ETL/ELT, stream processing, and data warehouse patterns.

Core Capabilities

  1. ETL/ELT Pipelines - Batch and streaming data movement
  2. Data Warehousing - Star schema, dimensional modeling (see the schema sketch after this list)
  3. Stream Processing - Real-time data transformation
  4. Data Quality - Validation, profiling, monitoring
  5. Orchestration - Airflow DAGs, dependency management
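
The data warehousing capability (item 2) is not illustrated by the examples that follow, so here is a minimal star-schema sketch that matches the grain of the user_daily_stats table loaded by the Airflow DAG below. The dataset, table, and column names (fact_user_daily_stats, dim_user) are illustrative assumptions, not part of the original skill, and the DDL is run through the BigQuery Python client.

# warehouse/create_schema.py
# Minimal star-schema sketch: one fact table at user/day grain plus a user dimension.
# All names and types here are assumptions for illustration.
from google.cloud import bigquery

DDL_STATEMENTS = [
    # Fact table: one row per user per day (matches the DAG's user_daily_stats output)
    """
    CREATE TABLE IF NOT EXISTS analytics.fact_user_daily_stats (
        user_key     INT64,
        date_key     DATE,
        total_events INT64,
        first_event  TIMESTAMP,
        last_event   TIMESTAMP
    )
    PARTITION BY date_key
    """,
    # Dimension table: user attributes referenced by the fact table
    """
    CREATE TABLE IF NOT EXISTS analytics.dim_user (
        user_key   INT64,
        user_id    STRING,
        segment    STRING,
        tier       STRING,
        valid_from DATE,
        valid_to   DATE
    )
    """,
]

def create_schema(project: str = 'project') -> None:
    """Run each DDL statement as a BigQuery query job."""
    client = bigquery.Client(project=project)
    for ddl in DDL_STATEMENTS:
        client.query(ddl).result()  # wait for each DDL job to complete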

Airflow ETL Pipeline

# dags/daily_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'data-eng',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'daily_user_analytics_etl',
    default_args=default_args,
    description='Daily ETL for user analytics',
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

def extract_user_events(**context):
    """Extract user events from source database"""
    import sqlalchemy as sa

    engine = sa.create_engine('postgresql://...')

    execution_date = context['ds']

    query = f"""
        SELECT user_id, event_type, properties, timestamp
        FROM events
        WHERE DATE(timestamp) = '{execution_date}'
    """

    df = pd.read_sql(query, engine)

    # Save to temporary location
    df.to_parquet(f'/tmp/events_{execution_date}.parquet')

    return len(df)

def transform_events(**context):
    """Transform and enrich events"""
    execution_date = context['ds']

    df = pd.read_parquet(f'/tmp/events_{execution_date}.parquet')

    # Transformations
    df['event_date'] = pd.to_datetime(df['timestamp']).dt.date
    df['event_hour'] = pd.to_datetime(df['timestamp']).dt.hour

    # Aggregate metrics
    daily_stats = df.groupby(['user_id', 'event_date']).agg({
        'event_type': 'count',
        'timestamp': ['min', 'max']
    }).reset_index()

    daily_stats.columns = ['user_id', 'date', 'total_events', 'first_event', 'last_event']

    # Save transformed data
    daily_stats.to_parquet(f'/tmp/user_stats_{execution_date}.parquet')

    return len(daily_stats)

extract_task = PythonOperator(
    task_id='extract_user_events',
    python_callable=extract_user_events,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_events',
    python_callable=transform_events,
    dag=dag,
)

load_task = BigQueryInsertJobOperator(
    task_id='load_to_warehouse',
    configuration={
        'load': {
            'sourceUris': ['gs://bucket/user_stats_{{ ds }}.parquet'],
            'destinationTable': {
                'projectId': 'project',
                'datasetId': 'analytics',
                'tableId': 'user_daily_stats'
            },
            'sourceFormat': 'PARQUET',
            'writeDisposition': 'WRITE_APPEND',
        }
    },
    dag=dag,
)

extract_task >> transform_task >> load_task
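
Note that transform_events writes its parquet file to local /tmp while load_task reads from a gs:// URI, so a bridging upload step is required between them. A minimal sketch of that step, assuming the LocalFilesystemToGCSOperator from the Airflow Google provider and the same bucket name used in load_task:

# dags/daily_etl.py (addition): upload the transformed parquet to GCS before loading.
# The bucket name and object path are assumptions matching the sourceUris above.
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

upload_task = LocalFilesystemToGCSOperator(
    task_id='upload_user_stats',
    src='/tmp/user_stats_{{ ds }}.parquet',
    dst='user_stats_{{ ds }}.parquet',
    bucket='bucket',
    dag=dag,
)

# If this task is added, the dependency line above becomes:
# extract_task >> transform_task >> upload_task >> load_task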

Data Quality Validation

# data_quality/validator.py
from dataclasses import dataclass
from typing import List, Dict, Any
import pandas as pd

@dataclass
class ValidationResult:
    passed: bool
    total_checks: int
    failed_checks: List[str]
    metrics: Dict[str, Any]

class DataQualityValidator:
    """Data quality validation framework"""

    def __init__(self):
        self.validations = []

    def validate_dataset(self, df: pd.DataFrame) -> ValidationResult:
        """Run all validation checks"""
        failed = []

        # Check for nulls in required columns
        required_cols = ['user_id', 'timestamp', 'event_type']
        for col in required_cols:
            if df[col].isnull().any():
                failed.append(f"Null values in required column: {col}")

        # Check data types
        if df['timestamp'].dtype not in ['datetime64[ns]', 'object']:
            failed.append("Invalid timestamp data type")

        # Check value ranges (convert first in case timestamps arrived as strings)
        if (pd.to_datetime(df['timestamp']) > pd.Timestamp.now()).any():
            failed.append("Future timestamps detected")

        # Check duplicates
        duplicates = df.duplicated(['user_id', 'timestamp']).sum()
        if duplicates > 0:
            failed.append(f"Duplicate records: {duplicates}")

        # Statistical checks
        metrics = {
            'row_count': len(df),
            'null_count': df.isnull().sum().sum(),
            'duplicate_count': duplicates,
            'unique_users': df['user_id'].nunique()
        }

        return ValidationResult(
            passed=len(failed) == 0,
            total_checks=4,
            failed_checks=failed,
            metrics=metrics
        )
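
A usage sketch showing how the validator could gate a pipeline stage, for example between the transform and load steps of the DAG above. The import path and the decision to raise on failed checks are assumptions; the skill does not prescribe a failure policy.

# Example: gate a load on data quality results (import path assumed)
import pandas as pd

from data_quality.validator import DataQualityValidator

df = pd.read_parquet('/tmp/events_2024-01-01.parquet')

validator = DataQualityValidator()
result = validator.validate_dataset(df)
print(f"passed={result.passed} metrics={result.metrics}")

if not result.passed:
    # Fail loudly so bad data never reaches the warehouse (see Anti-Patterns below)
    raise ValueError(f"Data quality checks failed: {result.failed_checks}")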

Stream Processing (Kafka)

# streaming/kafka_processor.py
from kafka import KafkaConsumer, KafkaProducer
import json
from typing import Dict, Any

class EventStreamProcessor:
    """Real-time event processing"""

    def __init__(self, bootstrap_servers: str):
        self.consumer = KafkaConsumer(
            'raw-events',
            bootstrap_servers=[bootstrap_servers],
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            auto_offset_reset='earliest',
            enable_auto_commit=False
        )

        self.producer = KafkaProducer(
            bootstrap_servers=[bootstrap_servers],
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )

    def process_events(self):
        """Process incoming events"""
        for message in self.consumer:
            event = message.value

            # Transform event
            transformed = self.transform_event(event)

            # Enrich with additional data
            enriched = self.enrich_event(transformed)

            # Send to output topic
            self.producer.send('processed-events', enriched)

            # Commit offset
            self.consumer.commit()

    def transform_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Apply transformations"""
        return {
            'user_id': event['user_id'],
            'event_type': event['type'],
            'timestamp': event['timestamp'],
            'properties': self.normalize_properties(event.get('props', {}))
        }

    def enrich_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich with user metadata"""
        # Lookup user data from cache/database
        user_data = self.lookup_user(event['user_id'])

        event['user_segment'] = user_data.get('segment')
        event['user_tier'] = user_data.get('tier')

        return event

    def lookup_user(self, user_id: str) -> Dict[str, Any]:
        # Lookup from Redis cache or database
        return {'segment': 'premium', 'tier': 'gold'}

    def normalize_properties(self, props: Dict) -> Dict:
        """Normalize property names"""
        return {k.lower().replace(' ', '_'): v for k, v in props.items()}
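
A minimal runner sketch for the processor. The dead-letter topic and per-event error handling are assumptions added to illustrate the dead letter queue item in the checklist below; process_events() itself has no error handling as written.

# streaming/run_processor.py (import path and topic names are assumptions)
import json

from kafka import KafkaProducer
from streaming.kafka_processor import EventStreamProcessor

processor = EventStreamProcessor(bootstrap_servers='localhost:9092')

# Separate producer for events that fail transformation or enrichment
dlq_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)

for message in processor.consumer:
    try:
        enriched = processor.enrich_event(processor.transform_event(message.value))
        processor.producer.send('processed-events', enriched)
    except (KeyError, TypeError) as exc:
        # Route malformed events to a dead-letter topic instead of crashing the consumer
        dlq_producer.send('raw-events-dlq', {'error': str(exc), 'event': message.value})
    processor.consumer.commit()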

Usage Examples

ETL Pipeline

Apply the data-engineering-patterns skill to create an Airflow DAG for daily user analytics ETL

Data Quality

Apply the data-engineering-patterns skill to implement a data quality validation framework

Stream Processing

Apply the data-engineering-patterns skill to build real-time event processing with Kafka

Integration Points

  • database-design-patterns - Data storage
  • mlops-patterns - Feature engineering
  • cloud-infrastructure-patterns - Data warehouses

Success Output

When successful, this skill MUST output:

✅ SKILL COMPLETE: data-engineering-patterns

Completed:
- [x] ETL/ELT pipeline implemented (extract, transform, load stages)
- [x] Data quality validation framework deployed
- [x] Stream processing configured (Kafka consumer/producer)
- [x] Airflow DAG created with dependency management
- [x] Data warehouse schema designed (star/snowflake schema)
- [x] Pipeline monitoring and alerting configured

Outputs:
- dags/daily_etl.py (Airflow DAG definition)
- data_quality/validator.py (validation framework)
- streaming/kafka_processor.py (stream processing logic)
- warehouse/schema.sql (data warehouse DDL)
- pipeline-metrics-dashboard.json (Grafana dashboard config)

Pipeline Metrics:
- Daily ETL Runtime: XX minutes
- Data Quality Pass Rate: XX%
- Stream Processing Throughput: XXX events/second
- Data Warehouse Size: XX GB
- Pipeline Success Rate: XX% (last 30 days)

Completion Checklist

Before marking this skill as complete, verify:

  • Airflow DAG created with all ETL tasks (extract, transform, load)
  • Task dependencies correctly defined (extract >> transform >> load)
  • Data quality validation checks implemented (nulls, types, ranges, duplicates)
  • Kafka stream processing configured (consumer and producer)
  • Data enrichment logic implemented (lookup tables, joins)
  • Data warehouse schema designed (star schema with fact/dimension tables)
  • Pipeline retry logic configured (max retries, backoff strategy)
  • Error handling and dead letter queue implemented
  • Pipeline monitoring configured (Airflow alerts, metrics dashboard; see the configuration sketch after this list)
  • Data freshness checks implemented (SLA monitoring)
  • Pipeline documentation created (data lineage, transformation logic)
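
A minimal sketch of the retry, SLA, and alerting items from this checklist, expressed as additions to the default_args of the DAG above. The callback body, SLA threshold, and backoff settings are illustrative assumptions.

# dags/daily_etl.py (addition): retries with backoff, task SLAs, and failure alerting
from datetime import timedelta

def alert_on_failure(context):
    """Placeholder failure callback; wire this to Slack, PagerDuty, email, etc."""
    print(f"Task failed: {context['task_instance'].task_id} for {context['ds']}")

default_args = {
    'owner': 'data-eng',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,        # back off between retry attempts
    'sla': timedelta(hours=1),                # flag tasks that exceed the SLA
    'on_failure_callback': alert_on_failure,  # alert once retries are exhausted
}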

Failure Indicators

This skill has FAILED if:

  • ❌ Airflow DAG fails to parse (syntax errors)
  • ❌ ETL tasks fail repeatedly (no retry logic or exhausted retries)
  • ❌ Data quality validation fails (> 5% records rejected)
  • ❌ Kafka consumer cannot connect (configuration errors)
  • ❌ Data warehouse load fails (schema mismatch, permission errors)
  • ❌ Pipeline runs but produces no output (silent failures)
  • ❌ Duplicate data loaded (idempotency not implemented)
  • ❌ No monitoring/alerting configured (failures go unnoticed)

When NOT to Use

Do NOT use this skill when:

  • Working with small datasets (< 1GB) that fit in memory (use pandas directly)
  • One-time data migrations (use manual scripts instead of pipelines)
  • Real-time analytics requiring sub-second latency (use stream processing only)
  • Simple data transformations without orchestration needs
  • Simple scheduled batch jobs (use batch-processing-patterns instead)
  • Pure streaming use cases (use stream-processing-patterns instead)

Alternative skills for different data needs:

  • batch-processing-patterns - Simple scheduled batch jobs without DAG orchestration
  • stream-processing-patterns - Real-time streaming without batch components
  • data-warehousing-patterns - Data warehouse design without pipeline orchestration
  • mlops-patterns - ML feature pipelines with model training integration

Anti-Patterns (Avoid)

| Anti-Pattern | Problem | Solution |
| --- | --- | --- |
| No data quality validation | Bad data propagates to warehouse | Implement validation checks at each stage |
| Hardcoded SQL dates | Pipeline breaks on date changes | Use Airflow macros ({{ ds }}, {{ execution_date }}) |
| No idempotency | Re-runs create duplicates | Use WRITE_APPEND with deduplication or WRITE_TRUNCATE |
| Missing error handling | Silent failures, data loss | Add try/except, dead letter queue, retry logic |
| Tight coupling between stages | Changes break entire pipeline | Use intermediate storage (S3/GCS) between stages |
| No monitoring | Pipeline failures go unnoticed | Configure Airflow SLAs, alerts, and metrics dashboards |
| Unbounded stream processing | Memory leaks, crashes | Implement windowing, watermarks, and backpressure |
| No schema validation | Type mismatches, load failures | Validate schema before loading to warehouse |
| Processing all data every run | Inefficient, high costs | Implement incremental processing (WHERE timestamp > last_run) |
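
One way to address the "No idempotency" and "Processing all data every run" rows, sketched against the load_task from the DAG above: load each run into its own date partition with WRITE_TRUNCATE, so a re-run replaces that day's data instead of appending duplicates. This assumes user_daily_stats is partitioned by date; it is one option, not the skill's prescribed solution.

# Idempotent variant of load_task: overwrite only this run's date partition.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

load_task = BigQueryInsertJobOperator(
    task_id='load_to_warehouse',
    configuration={
        'load': {
            'sourceUris': ['gs://bucket/user_stats_{{ ds }}.parquet'],
            'destinationTable': {
                'projectId': 'project',
                'datasetId': 'analytics',
                # The "$YYYYMMDD" partition decorator targets a single partition
                'tableId': 'user_daily_stats${{ ds_nodash }}',
            },
            'sourceFormat': 'PARQUET',
            # With a partition decorator, WRITE_TRUNCATE replaces only that partition
            'writeDisposition': 'WRITE_TRUNCATE',
        }
    },
    dag=dag,
)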

Principles

This skill embodies CODITECT foundational principles:

#2 First Principles Thinking

  • ETL stages (extract, transform, load) are fundamental to data pipelines
  • Data quality validation prevents bad data from propagating
  • Idempotency ensures pipelines can be safely re-run

#4 Separation of Concerns

  • Extract logic separate from transform logic
  • Validation separate from transformation
  • Batch processing (Airflow) separate from stream processing (Kafka)

#5 Eliminate Ambiguity

  • Explicit task dependencies (extract >> transform >> load)
  • Clear data quality rules (nulls not allowed in required columns)
  • Documented transformation logic (no black box transformations)

#7 Measurable Outcomes

  • Data quality pass rate (percentage of valid records)
  • Pipeline runtime (end-to-end latency)
  • Stream processing throughput (events per second)
  • Pipeline success rate (percentage of successful runs)

#10 Automation First

  • Airflow DAG orchestration (no manual pipeline execution)
  • Automated retries on transient failures
  • Scheduled execution (no manual triggers)
  • Automated data quality validation (no manual checks)

Full Principles: CODITECT-STANDARD-AUTOMATION.md


Version: 1.1.0 | Updated: 2026-01-04 | Author: CODITECT Team