Data Pipeline Architecture
Integrating Agentic AI with Data Engineering
Document ID: C7-DATA-PIPELINE-ARCHITECTURE
Version: 1.0
Category: Technical Deep Dive
Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ (Databases, APIs, Files, Streams, Events) │
└───────────────────────────┬─────────────────────────────────┘
│
┌───────────────────────────▼─────────────────────────────────┐
│ INGESTION LAYER │
│ (Batch, Streaming, CDC) │
└───────────────────────────┬─────────────────────────────────┘
│
┌───────────────────────────▼─────────────────────────────────┐
│ PROCESSING LAYER │
│ (Transform, Enrich, Validate, AI Augment) │
└───────────────────────────┬─────────────────────────────────┘
│
┌───────────────────────────▼─────────────────────────────────┐
│ STORAGE LAYER │
│ (Data Lake, Warehouse, Vector Store) │
└───────────────────────────┬─────────────────────────────────┘
│
┌───────────────────────────▼─────────────────────────────────┐
│ SERVING LAYER │
│ (Agents, Analytics, APIs, Applications) │
└─────────────────────────────────────────────────────────────┘
Batch Processing with Agentic AI
Document Processing Pipeline
from prefect import flow, task
from typing import List, Dict
@task
def extract_documents(source_path: str) -> List[Dict]:
    """Extract raw documents plus metadata from files under source_path.

    Args:
        source_path: Directory whose immediate children are read as documents
            (the glob is non-recursive).

    Returns:
        One dict per file with keys 'path', 'content' (raw bytes), and
        'metadata' (whatever extract_metadata derives from the path).
        Fix: the original annotated the return as List[bytes], but the
        function has always returned a list of dicts.
    """
    documents = []
    for file_path in glob.glob(f"{source_path}/*"):
        # Binary read: parsing/decoding is deferred to parse_documents.
        with open(file_path, 'rb') as f:
            documents.append({
                'path': file_path,
                'content': f.read(),
                'metadata': extract_metadata(file_path)
            })
    return documents
@task
def parse_documents(documents: List[Dict]) -> List[Dict]:
    """Run each document through the parser matching its metadata type.

    Adds 'text' (parsed plain text) and 'structure' (parser-specific
    structural info) to each document dict; all other keys pass through.
    """
    results = []
    for record in documents:
        # Parser selection is driven by the type recorded at extraction time.
        handler = get_parser(record['metadata']['type'])
        raw = record['content']
        enriched = dict(record)
        enriched['text'] = handler.parse(raw)
        enriched['structure'] = handler.extract_structure(raw)
        results.append(enriched)
    return results
@task
def ai_enrich_documents(documents: List[Dict]) -> List[Dict]:
    """Attach LLM-derived category, entities, and summary to each document.

    Issues one batched LLM call per document covering three sub-tasks;
    prompt text is truncated per task to bound token usage.
    """
    output = []
    for doc in documents:
        requests = [
            {
                'task': 'classify',
                'prompt': f"Classify this document: {doc['text'][:1000]}"
            },
            {
                'task': 'extract_entities',
                'prompt': f"Extract key entities: {doc['text'][:2000]}"
            },
            {
                'task': 'summarize',
                'prompt': f"Summarize in 2 sentences: {doc['text'][:3000]}"
            },
        ]
        answers = llm_client.batch_complete(requests)
        record = dict(doc)
        record['category'] = answers['classify']
        record['entities'] = answers['extract_entities']
        record['summary'] = answers['summarize']
        output.append(record)
    return output
@task
def generate_embeddings(documents: List[Dict]) -> List[Dict]:
    """Attach a vector embedding to each document dict, in place.

    Encodes all texts in one batched call, then writes each vector back
    onto its source dict under 'embedding' (as a plain list).
    """
    corpus = [entry['text'] for entry in documents]
    vectors = embedding_model.encode(corpus, batch_size=32)
    for entry, vector in zip(documents, vectors):
        entry['embedding'] = vector.tolist()
    return documents
@task
def load_to_stores(documents: List[Dict]):
    """Persist processed documents to the warehouse and the vector store."""
    # Structured fields go to the relational warehouse.
    rows = []
    for doc in documents:
        rows.append({
            'id': doc['metadata']['id'],
            'path': doc['path'],
            'category': doc['category'],
            'summary': doc['summary'],
            'entities': json.dumps(doc['entities']),
            'processed_at': datetime.utcnow()
        })
    warehouse.bulk_insert('documents', rows)

    # Embeddings plus (truncated) text go to the vector store.
    points = []
    for doc in documents:
        points.append({
            'id': doc['metadata']['id'],
            'embedding': doc['embedding'],
            'text': doc['text'][:10000],  # truncate to bound storage per record
            'metadata': doc['metadata']
        })
    vector_store.upsert(points)
@flow
def document_processing_pipeline(source_path: str):
    """End-to-end flow: extract -> parse -> AI-enrich -> embed -> load."""
    docs = extract_documents(source_path)
    docs = parse_documents(docs)
    docs = ai_enrich_documents(docs)
    docs = generate_embeddings(docs)
    load_to_stores(docs)
Streaming Architecture
Real-Time Event Processing
from kafka import KafkaConsumer, KafkaProducer
import asyncio
class StreamingAgentProcessor:
    """Consumes Kafka events, optionally enriches them with an AI agent,
    and republishes each event to a category-based output topic."""

    def __init__(self, kafka_config, agent_config):
        self.consumer = KafkaConsumer(
            'incoming_events',
            **kafka_config,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            **kafka_config,
            value_serializer=lambda m: json.dumps(m).encode('utf-8')
        )
        self.agent = create_agent(agent_config)

    async def process_stream(self):
        """Process events from the Kafka stream indefinitely.

        NOTE(review): iterating KafkaConsumer blocks the event loop between
        messages; if other coroutines must run concurrently, consider an
        async Kafka client.
        """
        for message in self.consumer:
            event = message.value
            # Only spend an LLM call on events that warrant it.
            if self.needs_ai_processing(event):
                result = await self.agent.process(event)
                enriched_event = {
                    **event,
                    'ai_analysis': result,
                    'processed_at': datetime.utcnow().isoformat()
                }
            else:
                enriched_event = event
            # Route to appropriate output topic.
            output_topic = self.route_event(enriched_event)
            self.producer.send(output_topic, enriched_event)

    def needs_ai_processing(self, event: Dict) -> bool:
        """Return True for high-priority, explicitly flagged, or long-text events.

        Fix: result is coerced with bool() — the original could return the
        raw value of event['requires_analysis'] (or None) despite the
        annotated bool return type.
        """
        return bool(
            event.get('priority') == 'high'
            or event.get('requires_analysis')
            or len(event.get('text', '')) > 500
        )

    def route_event(self, event: Dict) -> str:
        """Name the output topic after the event's AI-derived category
        (falls back to 'default' when no analysis is present)."""
        category = event.get('ai_analysis', {}).get('category', 'default')
        return f"processed_{category}"
Flink Integration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessFunction
class AIEnrichmentFunction(ProcessFunction):
    """Flink ProcessFunction that adds an LLM analysis to flagged elements."""

    def __init__(self, model_config):
        self.model_config = model_config
        # Client is created lazily in open() so it is initialized once per
        # task rather than serialized with the job graph.
        self.llm_client = None

    def open(self, runtime_context):
        """Initialize the LLM client (once per parallel task instance)."""
        self.llm_client = create_llm_client(self.model_config)

    def process_element(self, value, ctx):
        """Enrich the element with AI when flagged, then emit it downstream."""
        if value['needs_analysis']:
            value['ai_result'] = self.llm_client.analyze(value['content'])
        yield value
# Pipeline definition
# Wires the streaming job: Kafka source -> AI enrichment -> Kafka sink.
# NOTE(review): kafka_source, kafka_sink, and model_config are defined
# elsewhere — confirm their configuration before running.
env = StreamExecutionEnvironment.get_execution_environment()
source = env.add_source(kafka_source)
enriched = source.process(AIEnrichmentFunction(model_config))
enriched.add_sink(kafka_sink)
env.execute("AI Enrichment Pipeline")
ETL with AI Augmentation
dbt + LLM Integration
# dbt model with AI augmentation
# models/enriched_customers.sql
{{ config(materialized='incremental') }}
-- Incremental model: on incremental runs, only rows newer than the current
-- max(updated_at) already stored in this table are reprocessed.
WITH raw_customers AS (
SELECT * FROM {{ ref('stg_customers') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
),
-- AI-derived columns; each macro expands to a call into the AI service.
ai_enriched AS (
SELECT
customer_id,
{{ ai_classify('customer_description', 'segment') }} as predicted_segment,
{{ ai_extract('customer_notes', 'interests') }} as extracted_interests,
{{ ai_sentiment('feedback_text') }} as sentiment_score
FROM raw_customers
)
-- LEFT JOIN keeps every customer even when enrichment produced no row.
SELECT
r.*,
e.predicted_segment,
e.extracted_interests,
e.sentiment_score
FROM raw_customers r
LEFT JOIN ai_enriched e ON r.customer_id = e.customer_id
# dbt macro for AI classification
# macros/ai_classify.sql
-- Wraps the table-valued ai_service.classify call in a scalar subquery so
-- models can inline it as an expression: {{ ai_classify('col', 'segment') }}.
-- NOTE(review): assumes the warehouse supports TABLE(...) functions with
-- named arguments (input =>, type =>) — confirm against the target engine.
{% macro ai_classify(column, classification_type) %}
(
SELECT result
FROM TABLE(
ai_service.classify(
input => {{ column }},
type => '{{ classification_type }}'
)
)
)
{% endmacro %}
Data Quality with Agentic AI
Automated Data Validation
from great_expectations.core import ExpectationSuite
from typing import List, Dict
class AIDataValidator:
    """Profiles a dataframe with an LLM and turns the findings into a
    Great Expectations suite."""

    def __init__(self, llm_client):
        # llm_client must expose complete(prompt) -> str; validate_with_ai
        # expects the reply to be JSON text.
        self.llm = llm_client
    def validate_with_ai(self, dataframe, context: str) -> List[Dict]:
        """Use AI to identify data quality issues.

        Args:
            dataframe: tabular frame supporting describe()/sample()/to_dict()
                (pandas-style — assumed, confirm with callers).
            context: free-text description of the dataset, embedded in the
                prompt to guide the model.

        Returns:
            The model's reply parsed as JSON (a list of issue dicts per the
            prompt); json.loads raises ValueError if the reply is not JSON.
        """
        # Get statistical profile
        profile = dataframe.describe().to_dict()
        # Sample for AI analysis; min() guards frames smaller than 100 rows.
        sample = dataframe.sample(min(100, len(dataframe))).to_dict('records')
        prompt = f"""
Analyze this data for quality issues.
Context: {context}
Statistical Profile:
{json.dumps(profile, indent=2)}
Sample Records:
{json.dumps(sample[:10], indent=2)}
Identify:
1. Missing value patterns
2. Outliers and anomalies
3. Format inconsistencies
4. Logical errors
5. Referential integrity issues
Return as JSON with severity (high/medium/low) for each issue.
"""
        analysis = self.llm.complete(prompt)
        return json.loads(analysis)
    def generate_expectations(self, analysis: List[Dict]) -> ExpectationSuite:
        """Generate a Great Expectations suite from AI analysis.

        NOTE(review): add_expectation is called here with keyword args;
        current Great Expectations releases expect an
        ExpectationConfiguration object instead — confirm against the
        pinned GE version.
        """
        suite = ExpectationSuite(name="ai_generated")
        for issue in analysis:
            # Map each AI-reported issue type onto a GE expectation.
            if issue['type'] == 'missing_value':
                suite.add_expectation(
                    expectation_type="expect_column_values_to_not_be_null",
                    column=issue['column']
                )
            elif issue['type'] == 'outlier':
                suite.add_expectation(
                    expectation_type="expect_column_values_to_be_between",
                    column=issue['column'],
                    min_value=issue['expected_min'],
                    max_value=issue['expected_max']
                )
            # ... additional expectation types
        return suite
Architecture Patterns
Lambda Architecture with AI
┌─────────────────────────────────────┐
│ DATA SOURCES │
└─────────────────┬───────────────────┘
│
┌─────────────────▼───────────────────┐
│ MESSAGE QUEUE │
│ (Kafka) │
└─────────────────┬───────────────────┘
┌─────────┴─────────┐
│ │
┌──────────▼─────────┐ ┌──────▼──────────┐
│ BATCH LAYER │ │ SPEED LAYER │
│ (Deep AI Analysis) │ │(Fast AI Enrichm)│
│ Spark + LLM │ │ Flink + LLM │
└──────────┬─────────┘ └──────┬──────────┘
│ │
┌──────────▼─────────────────▼──────────┐
│ SERVING LAYER │
│ (Query across batch + speed) │
└───────────────────────────────────────┘
Medallion Architecture
BRONZE (Raw) SILVER (Cleaned) GOLD (Aggregated)
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Raw events │──────►│AI Validated│────────►│ AI Insights│
│ Raw docs │ Parse │AI Enriched │ Agg │ AI Summary │
│ Raw data │ Clean │AI Classified│ Join │ AI Predict │
└────────────┘ └────────────┘ └────────────┘
│ │ │
│ │ │
Append Upsert/Merge Overwrite
Only or Append
Performance Optimization
Batch Size Optimization
class OptimalBatchProcessor:
    """Groups work items into token-bounded batches for LLM batch calls."""

    def __init__(self, llm_client, max_tokens_per_batch=100000):
        # llm_client must expose batch_complete(batch) -> list of results.
        self.llm = llm_client
        self.max_tokens = max_tokens_per_batch

    def process_in_optimal_batches(self, items: List[Dict]):
        """Process items in optimally-sized batches.

        Returns the per-item results concatenated in input order.
        """
        results = []
        for batch in self.create_batches(items):
            results.extend(self.llm.batch_complete(batch))
        return results

    def create_batches(self, items: List[Dict]) -> List[List[Dict]]:
        """Split items into batches whose token totals stay under the cap.

        Fix over the original: an item whose estimate alone exceeds
        max_tokens no longer causes an empty batch to be emitted — it is
        placed alone in its own (possibly oversized) batch instead.
        """
        batches: List[List[Dict]] = []
        current_batch: List[Dict] = []
        current_tokens = 0
        for item in items:
            item_tokens = estimate_tokens(item)
            # Flush only a non-empty batch; an oversized first item stays
            # in its own batch rather than producing an empty one.
            if current_batch and current_tokens + item_tokens > self.max_tokens:
                batches.append(current_batch)
                current_batch = [item]
                current_tokens = item_tokens
            else:
                current_batch.append(item)
                current_tokens += item_tokens
        if current_batch:
            batches.append(current_batch)
        return batches
Quick Reference
| Pattern | Use Case | AI Role |
|---|---|---|
| Batch ETL | Document processing | Classification, enrichment |
| Streaming | Real-time events | Routing, anomaly detection |
| CDC | Database changes | Impact analysis |
| Data Quality | Validation | Issue detection, fixing |
| Lakehouse | Analytics | Summarization, insights |
Document maintained by CODITECT Data Engineering Team