Agent Skills Framework Extension
Data Engineering Patterns Skill
When to Use This Skill
Use this skill when implementing data engineering patterns in your codebase.
How to Use This Skill
- Review the patterns and examples below
- Apply the relevant patterns to your implementation
- Follow the best practices outlined in this skill
Production data pipelines, ETL/ELT, streaming, and data warehouse patterns.
Core Capabilities
- ETL/ELT Pipelines - Batch and streaming data movement
- Data Warehousing - Star schema, dimensional modeling
- Stream Processing - Real-time data transformation
- Data Quality - Validation, profiling, monitoring
- Orchestration - Airflow DAGs, dependency management
Airflow ETL Pipeline
# dags/daily_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from datetime import datetime, timedelta
import pandas as pd
default_args = {
'owner': 'data-eng',
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'daily_user_analytics_etl',
default_args=default_args,
description='Daily ETL for user analytics',
    schedule='0 2 * * *',
start_date=datetime(2024, 1, 1),
catchup=False,
)
def extract_user_events(**context):
"""Extract user events from source database"""
import sqlalchemy as sa
engine = sa.create_engine('postgresql://...')
execution_date = context['ds']
query = f"""
SELECT user_id, event_type, properties, timestamp
FROM events
WHERE DATE(timestamp) = '{execution_date}'
"""
df = pd.read_sql(query, engine)
# Save to temporary location
df.to_parquet(f'/tmp/events_{execution_date}.parquet')
return len(df)
def transform_events(**context):
"""Transform and enrich events"""
execution_date = context['ds']
df = pd.read_parquet(f'/tmp/events_{execution_date}.parquet')
# Transformations
df['event_date'] = pd.to_datetime(df['timestamp']).dt.date
df['event_hour'] = pd.to_datetime(df['timestamp']).dt.hour
# Aggregate metrics
daily_stats = df.groupby(['user_id', 'event_date']).agg({
'event_type': 'count',
'timestamp': ['min', 'max']
}).reset_index()
daily_stats.columns = ['user_id', 'date', 'total_events', 'first_event', 'last_event']
    # Save transformed data to GCS so the downstream BigQuery load task can read it
    # (writing gs:// paths from pandas requires gcsfs)
    daily_stats.to_parquet(f'gs://bucket/user_stats_{execution_date}.parquet')
return len(daily_stats)
extract_task = PythonOperator(
task_id='extract_user_events',
python_callable=extract_user_events,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform_events',
python_callable=transform_events,
dag=dag,
)
load_task = BigQueryInsertJobOperator(
task_id='load_to_warehouse',
configuration={
'load': {
'sourceUris': ['gs://bucket/user_stats_{{ ds }}.parquet'],
'destinationTable': {
'projectId': 'project',
'datasetId': 'analytics',
'tableId': 'user_daily_stats'
},
'sourceFormat': 'PARQUET',
'writeDisposition': 'WRITE_APPEND',
}
},
dag=dag,
)
extract_task >> transform_task >> load_task
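The load task above uses WRITE_APPEND, so re-running the DAG for a day would duplicate that day's rows. One way to make re-runs idempotent is to truncate-load a single date partition instead. A minimal sketch, assuming `user_daily_stats` is date-partitioned and the file naming uses `ds_nodash` (both assumptions, not part of the DAG above):

```python
def build_partition_load_config(ds_nodash: str) -> dict:
    """Build a BigQuery load config that overwrites one date partition.

    Targeting `table$YYYYMMDD` with WRITE_TRUNCATE replaces only that
    partition, so re-running the DAG for a given day is idempotent.
    (Assumes a date-partitioned destination table; names are illustrative.)
    """
    return {
        'load': {
            'sourceUris': [f'gs://bucket/user_stats_{ds_nodash}.parquet'],
            'destinationTable': {
                'projectId': 'project',
                'datasetId': 'analytics',
                # Partition decorator: only this day's partition is replaced
                'tableId': f'user_daily_stats${ds_nodash}',
            },
            'sourceFormat': 'PARQUET',
            'writeDisposition': 'WRITE_TRUNCATE',
        }
    }

config = build_partition_load_config('20240101')
```

Passing this dict as the `configuration` of a `BigQueryInsertJobOperator` (with `ds_nodash` templated) keeps the rest of the DAG unchanged.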
Data Quality Validation
# data_quality/validator.py
from dataclasses import dataclass
from typing import List, Dict, Any
import pandas as pd
@dataclass
class ValidationResult:
passed: bool
total_checks: int
failed_checks: List[str]
metrics: Dict[str, Any]
class DataQualityValidator:
"""Data quality validation framework"""
def __init__(self):
self.validations = []
def validate_dataset(self, df: pd.DataFrame) -> ValidationResult:
"""Run all validation checks"""
failed = []
# Check for nulls in required columns
required_cols = ['user_id', 'timestamp', 'event_type']
for col in required_cols:
if df[col].isnull().any():
failed.append(f"Null values in required column: {col}")
# Check data types
if df['timestamp'].dtype not in ['datetime64[ns]', 'object']:
failed.append("Invalid timestamp data type")
# Check value ranges
        # Coerce first so the comparison also works when timestamp is object dtype
        if (pd.to_datetime(df['timestamp'], errors='coerce') > pd.Timestamp.now()).any():
            failed.append("Future timestamps detected")
# Check duplicates
duplicates = df.duplicated(['user_id', 'timestamp']).sum()
if duplicates > 0:
failed.append(f"Duplicate records: {duplicates}")
# Statistical checks
metrics = {
'row_count': len(df),
'null_count': df.isnull().sum().sum(),
'duplicate_count': duplicates,
'unique_users': df['user_id'].nunique()
}
return ValidationResult(
passed=len(failed) == 0,
total_checks=4,
failed_checks=failed,
metrics=metrics
)
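The duplicate check above counts every repeat of a `(user_id, timestamp)` pair after its first occurrence. A quick sketch of that semantics on a hypothetical three-row frame:

```python
import pandas as pd

# Hypothetical sample: rows 0 and 1 share the same (user_id, timestamp)
df = pd.DataFrame({
    'user_id':   ['u1', 'u1', 'u2'],
    'timestamp': pd.to_datetime(['2024-01-01 10:00',
                                 '2024-01-01 10:00',
                                 '2024-01-01 11:00']),
    'event_type': ['click', 'click', 'view'],
})

# duplicated() marks every repeat after the first occurrence, so one row here
dup_count = int(df.duplicated(['user_id', 'timestamp']).sum())
```

Fed through `validate_dataset`, this frame would fail with a "Duplicate records: 1" check while passing the null and future-timestamp checks.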
Stream Processing (Kafka)
# streaming/kafka_processor.py
from kafka import KafkaConsumer, KafkaProducer
import json
from typing import Dict, Any
class EventStreamProcessor:
"""Real-time event processing"""
def __init__(self, bootstrap_servers: str):
self.consumer = KafkaConsumer(
'raw-events',
bootstrap_servers=[bootstrap_servers],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=False
)
self.producer = KafkaProducer(
bootstrap_servers=[bootstrap_servers],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
def process_events(self):
"""Process incoming events"""
for message in self.consumer:
event = message.value
# Transform event
transformed = self.transform_event(event)
# Enrich with additional data
enriched = self.enrich_event(transformed)
# Send to output topic
self.producer.send('processed-events', enriched)
# Commit offset
self.consumer.commit()
def transform_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Apply transformations"""
return {
'user_id': event['user_id'],
'event_type': event['type'],
'timestamp': event['timestamp'],
'properties': self.normalize_properties(event.get('props', {}))
}
def enrich_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""Enrich with user metadata"""
# Lookup user data from cache/database
user_data = self.lookup_user(event['user_id'])
event['user_segment'] = user_data.get('segment')
event['user_tier'] = user_data.get('tier')
return event
def lookup_user(self, user_id: str) -> Dict[str, Any]:
# Lookup from Redis cache or database
return {'segment': 'premium', 'tier': 'gold'}
def normalize_properties(self, props: Dict) -> Dict:
"""Normalize property names"""
return {k.lower().replace(' ', '_'): v for k, v in props.items()}
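The completion checklist calls for dead-letter handling, which the processor above omits: a malformed message would crash the consumer loop. A minimal routing sketch under the assumption of a `dead-letter-events` topic (a hypothetical name, as is the required-field list):

```python
import json
from typing import Any, Dict, Tuple

# Assumed required fields for this sketch; adjust to the real event schema
REQUIRED_FIELDS = ('user_id', 'type', 'timestamp')

def route_event(raw: bytes) -> Tuple[str, Dict[str, Any]]:
    """Decide which topic a raw message belongs on.

    Malformed JSON or events missing required fields go to the dead-letter
    topic with the error attached, instead of crashing the consumer.
    """
    try:
        event = json.loads(raw.decode('utf-8'))
        missing = [f for f in REQUIRED_FIELDS if f not in event]
        if missing:
            raise ValueError(f'missing fields: {missing}')
        return 'processed-events', event
    except (ValueError, UnicodeDecodeError) as exc:
        # json.JSONDecodeError subclasses ValueError, so it is caught here too
        return 'dead-letter-events', {
            'raw': raw.decode('utf-8', 'replace'),
            'error': str(exc),
        }
```

Inside `process_events`, the producer would then send to whichever topic `route_event` returns rather than assuming every message transforms cleanly.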
Usage Examples
ETL Pipeline
Apply data-engineering-patterns skill to create Airflow DAG for daily user analytics ETL
Data Quality
Apply data-engineering-patterns skill to implement data quality validation framework
Stream Processing
Apply data-engineering-patterns skill to build real-time event processing with Kafka
Integration Points
- database-design-patterns - Data storage
- mlops-patterns - Feature engineering
- cloud-infrastructure-patterns - Data warehouses
Success Output
When successful, this skill MUST output:
✅ SKILL COMPLETE: data-engineering-patterns
Completed:
- [x] ETL/ELT pipeline implemented (extract, transform, load stages)
- [x] Data quality validation framework deployed
- [x] Stream processing configured (Kafka consumer/producer)
- [x] Airflow DAG created with dependency management
- [x] Data warehouse schema designed (star/snowflake schema)
- [x] Pipeline monitoring and alerting configured
Outputs:
- dags/daily_etl.py (Airflow DAG definition)
- data_quality/validator.py (validation framework)
- streaming/kafka_processor.py (stream processing logic)
- warehouse/schema.sql (data warehouse DDL)
- pipeline-metrics-dashboard.json (Grafana dashboard config)
Pipeline Metrics:
- Daily ETL Runtime: XX minutes
- Data Quality Pass Rate: XX%
- Stream Processing Throughput: XXX events/second
- Data Warehouse Size: XX GB
- Pipeline Success Rate: XX% (last 30 days)
Completion Checklist
Before marking this skill as complete, verify:
- Airflow DAG created with all ETL tasks (extract, transform, load)
- Task dependencies correctly defined (extract >> transform >> load)
- Data quality validation checks implemented (nulls, types, ranges, duplicates)
- Kafka stream processing configured (consumer and producer)
- Data enrichment logic implemented (lookup tables, joins)
- Data warehouse schema designed (star schema with fact/dimension tables)
- Pipeline retry logic configured (max retries, backoff strategy)
- Error handling and dead letter queue implemented
- Pipeline monitoring configured (Airflow alerts, metrics dashboard)
- Data freshness checks implemented (SLA monitoring)
- Pipeline documentation created (data lineage, transformation logic)
Failure Indicators
This skill has FAILED if:
- ❌ Airflow DAG fails to parse (syntax errors)
- ❌ ETL tasks fail repeatedly (no retry logic or exhausted retries)
- ❌ Data quality validation fails (> 5% records rejected)
- ❌ Kafka consumer cannot connect (configuration errors)
- ❌ Data warehouse load fails (schema mismatch, permission errors)
- ❌ Pipeline runs but produces no output (silent failures)
- ❌ Duplicate data loaded (idempotency not implemented)
- ❌ No monitoring/alerting configured (failures go unnoticed)
When NOT to Use
Do NOT use this skill when:
- Working with small datasets (< 1GB) that fit in memory (use pandas directly)
- One-time data migrations (use manual scripts instead of pipelines)
- Real-time analytics requiring sub-second latency (use stream processing only)
- Simple data transformations without orchestration needs
- Use batch-processing-patterns for simple scheduled jobs
- Use stream-processing-patterns for pure streaming use cases
Alternative skills for different data needs:
- batch-processing-patterns - Simple scheduled batch jobs without DAG orchestration
- stream-processing-patterns - Real-time streaming without batch components
- data-warehousing-patterns - Data warehouse design without pipeline orchestration
- mlops-patterns - ML feature pipelines with model training integration
Anti-Patterns (Avoid)
| Anti-Pattern | Problem | Solution |
|---|---|---|
| No data quality validation | Bad data propagates to warehouse | Implement validation checks at each stage |
| Hardcoded SQL dates | Pipeline breaks on date changes | Use Airflow macros ({{ ds }}, {{ execution_date }}) |
| No idempotency | Re-runs create duplicates | Use WRITE_APPEND with deduplication or WRITE_TRUNCATE |
| Missing error handling | Silent failures, data loss | Add try/except, dead letter queue, retry logic |
| Tight coupling between stages | Changes break entire pipeline | Use intermediate storage (S3/GCS) between stages |
| No monitoring | Pipeline failures go unnoticed | Configure Airflow SLAs, alerts, and metrics dashboards |
| Unbounded stream processing | Memory leaks, crashes | Implement windowing, watermarks, and backpressure |
| No schema validation | Type mismatches, load failures | Validate schema before loading to warehouse |
| Processing all data every run | Inefficient, high costs | Implement incremental processing (WHERE timestamp > last_run) |
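The last row above recommends incremental processing. A minimal sketch of building an incremental extraction query from a stored high-watermark (table and column names are illustrative, not from the DAG above):

```python
from datetime import datetime, timezone

def build_incremental_query(table: str, watermark_col: str,
                            last_run: datetime) -> str:
    """Build a query that reads only rows newer than the last successful run.

    The watermark should come from durable state (e.g. an Airflow Variable
    or a metadata table), not wall-clock time, so reruns stay correct.
    """
    return (
        f"SELECT * FROM {table} "
        f"WHERE {watermark_col} > '{last_run.isoformat()}' "
        f"ORDER BY {watermark_col}"
    )

query = build_incremental_query(
    'events', 'timestamp',
    datetime(2024, 1, 1, tzinfo=timezone.utc),
)
```

After a successful run, the pipeline persists the maximum `timestamp` it processed as the next run's watermark; failures leave the old watermark in place, so the failed window is re-read.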
Principles
This skill embodies CODITECT foundational principles:
#2 First Principles Thinking
- ETL stages (extract, transform, load) are fundamental to data pipelines
- Data quality validation prevents bad data from propagating
- Idempotency ensures pipelines can be safely re-run
#4 Separation of Concerns
- Extract logic separate from transform logic
- Validation separate from transformation
- Batch processing (Airflow) separate from stream processing (Kafka)
#5 Eliminate Ambiguity
- Explicit task dependencies (extract >> transform >> load)
- Clear data quality rules (nulls not allowed in required columns)
- Documented transformation logic (no black box transformations)
#7 Measurable Outcomes
- Data quality pass rate (percentage of valid records)
- Pipeline runtime (end-to-end latency)
- Stream processing throughput (events per second)
- Pipeline success rate (percentage of successful runs)
#10 Automation First
- Airflow DAG orchestration (no manual pipeline execution)
- Automated retries on transient failures
- Scheduled execution (no manual triggers)
- Automated data quality validation (no manual checks)
Full Principles: CODITECT-STANDARD-AUTOMATION.md
Version: 1.1.0 | Updated: 2026-01-04 | Author: CODITECT Team