ADR-032: Hybrid Processing Agent Skills, Commands, and Workflows
Status: PROPOSED
Date: 2026-01-15
Context
The Hybrid Document Processing Architecture (ADR-027) requires coordinating multiple specialized components. Users and agents need:
- Agent Skills: Reusable capabilities agents can invoke
- CLI Commands: User-facing operations
- Workflows: Orchestrated multi-step processes
- Scripts: Automation templates
This ADR defines the complete operational interface for the hybrid processing system.
Decision
Implement a comprehensive skill/command/workflow system for hybrid document processing.
Agent Skill Registry
# /coditect/skills/hybrid_processing/SKILL.md
name: hybrid-document-processing
version: 2.0.0
description: Enterprise document processing with compliance
skills:
# ==================== Pre-Processing Skills ====================
- name: parse_document
description: Extract text from various document formats
inputs:
- document_path: string
- ocr_enabled: boolean (default: true)
- preserve_tables: boolean (default: true)
outputs:
- text: string
- metadata: object
- warnings: array
- name: extract_entities
description: Named Entity Recognition on text
inputs:
- text: string
- entity_types: array (default: [PERSON, ORG, DATE])
outputs:
- entities: object (type -> list of entities)
- name: deduplicate_text
description: Remove duplicate content
inputs:
- text: string
- threshold: float (default: 0.9)
outputs:
- cleaned_text: string
- removed_count: integer
- name: chunk_document
description: Split text into semantic chunks
inputs:
- text: string
- chunk_size: integer (default: 2000)
- overlap: integer (default: 200)
outputs:
- chunks: array of Chunk objects
# ==================== Indexing Skills ====================
- name: generate_embeddings
description: Generate vector embeddings for chunks
inputs:
- chunks: array of Chunk objects
- model: string (default: text-embedding-3-large)
outputs:
- embeddings: array of vectors
- chunk_ids: array of strings
- name: build_knowledge_graph
description: Build entity relationship graph
inputs:
- chunks: array of Chunk objects
- entities: object
outputs:
- graph: KnowledgeGraph object
- node_count: integer
- edge_count: integer
- name: index_to_vector_store
description: Store embeddings in vector database
inputs:
- embeddings: array
- metadata: array
- namespace: string
outputs:
- indexed_count: integer
- index_id: string
# ==================== Map-Reduce Skills ====================
- name: create_map_task
description: Define a map task for parallel execution
inputs:
- document: Document object
- extraction_schema: object
- token_budget: integer
outputs:
- task: MapTask object
- name: execute_map
description: Execute map extraction on a document
inputs:
- task: MapTask object
outputs:
- result: MapResult object
- tokens_used: integer
- name: aggregate_results
description: Reduce/aggregate map results
inputs:
- results: array of MapResult
- aggregation_strategy: string (merge|dedupe|synthesize)
outputs:
- aggregated: AggregatedResult object
# ==================== Hierarchy Skills ====================
- name: build_chunk_summary
description: Create extractive summary from extractions
inputs:
- extractions: array
- method: string (textrank|tfidf)
- top_k: integer (default: 10)
outputs:
- summary: ChunkSummary object
- name: build_section_summary
description: Create section-level summary
inputs:
- chunk_summaries: array
- max_synthesis_ratio: float (default: 0.2)
outputs:
- summary: SectionSummary object
- name: build_corpus_summary
description: Create corpus-level summary with validation
inputs:
- section_summaries: array
- validation_required: boolean (default: true)
outputs:
- summary: CorpusSummary object
- validation_report: object
# ==================== RAG Skills ====================
- name: expand_query
description: Expand query for better retrieval
inputs:
- query: string
- expansion_method: string (synonyms|hypothetical|multi)
outputs:
- expanded_queries: array of strings
- name: hybrid_retrieve
description: Retrieve using BM25 + vector + graph
inputs:
- query: string
- corpus_id: string
- top_k: integer (default: 10)
- access_filter: object
outputs:
- results: array of RetrievalResult
- scores: object
- name: generate_cited_response
description: Generate response with mandatory citations
inputs:
- query: string
- context: array of RetrievalResult
- citation_required: boolean (default: true)
outputs:
- response: string
- citations: array
- confidence: float
- name: validate_response
description: Check response for hallucinations
inputs:
- response: string
- citations: array
- sources: array
outputs:
- is_valid: boolean
- hallucination_flags: array
- confidence: float
# ==================== Compliance Skills ====================
- name: log_audit_event
description: Create compliance audit record
inputs:
- event_type: string
- user_context: object
- operation_details: object
outputs:
- audit_id: string
- name: sign_document
description: Apply electronic signature
inputs:
- document_hash: string
- signer_id: string
- signature_meaning: string
outputs:
- signature: string
- timestamp: string
- name: generate_audit_report
description: Generate compliance audit report
inputs:
- start_date: string
- end_date: string
- report_type: string
outputs:
- report: AuditReport object
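The registry above is declarative; how skill names get bound to implementations is not specified by this ADR. A minimal dispatch sketch under assumed conventions (the registry class and the toy `deduplicate_text` body are illustrative, not the real skill):

```python
from typing import Any, Callable, Dict

class SkillRegistry:
    """Maps skill names from SKILL.md to callables invoked with keyword inputs."""

    def __init__(self) -> None:
        self._skills: Dict[str, Callable[..., Any]] = {}

    def register(self, name: str):
        def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
            self._skills[name] = fn
            return fn
        return decorator

    def invoke(self, name: str, **inputs: Any) -> Any:
        if name not in self._skills:
            raise KeyError(f"unknown skill: {name}")
        return self._skills[name](**inputs)

registry = SkillRegistry()

@registry.register("deduplicate_text")
def deduplicate_text(text: str, threshold: float = 0.9) -> Dict[str, Any]:
    # Toy stand-in: exact line-level dedup. The real skill presumably does
    # near-duplicate detection against the `threshold` similarity score.
    lines = text.splitlines()
    unique = list(dict.fromkeys(lines))
    return {"cleaned_text": "\n".join(unique),
            "removed_count": len(lines) - len(unique)}
```

Agents would then call `registry.invoke("deduplicate_text", text=...)` and receive the declared outputs as a dict.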
CLI Command Structure
# /coditect/cli/hybrid_processing.py
"""
Coditect Hybrid Document Processing CLI
Usage:
coditect corpus <command> [options]
coditect preprocess <command> [options]
coditect index <command> [options]
coditect mapreduce <command> [options]
coditect knowledge <command> [options]
coditect rag <command> [options]
coditect audit <command> [options]
"""
# ==================== Corpus Management ====================
@cli.group()
def corpus():
"""Manage document corpora"""
pass
@corpus.command()
@click.argument('name')
@click.option('--input', '-i', required=True, help='Input directory')
@click.option('--description', '-d', help='Corpus description')
@click.option('--classification', default='internal',
type=click.Choice(['public', 'internal', 'confidential', 'restricted']))
def create(name, input, description, classification):
"""Create a new corpus from documents"""
# Implementation
@corpus.command()
@click.argument('corpus_id')
@click.option('--documents', '-d', multiple=True, help='Document paths to add')
def add(corpus_id, documents):
"""Add documents to existing corpus"""
# Implementation
@corpus.command()
@click.option('--status', type=click.Choice(['active', 'archived', 'all']))
def list(status):
"""List all corpora"""
# Implementation
# ==================== Pre-Processing ====================
@cli.group()
def preprocess():
"""Document pre-processing operations"""
pass
@preprocess.command()
@click.option('--input', '-i', required=True, help='Input path')
@click.option('--output', '-o', required=True, help='Output directory')
@click.option('--config', '-c', help='Config file path')
@click.option('--enable-ocr/--no-ocr', default=True)
@click.option('--chunk-size', default=2000, type=int)
@click.option('--parallel', default=4, type=int, help='Parallel workers')
def run(input, output, config, enable_ocr, chunk_size, parallel):
"""Pre-process documents for analysis"""
# Implementation
@preprocess.command()
@click.option('--input', '-i', required=True)
@click.option('--show-entities/--no-entities', default=True)
@click.option('--show-terms/--no-terms', default=True)
def preview(input, show_entities, show_terms):
"""Preview pre-processing without writing"""
# Implementation
@preprocess.command()
@click.option('--input', '-i', required=True)
@click.option('--output', '-o', required=True)
def analyze(input, output):
"""Analyze token reduction potential"""
# Implementation
# ==================== Indexing ====================
@cli.group()
def index():
"""Vector and graph indexing operations"""
pass
@index.command()
@click.argument('corpus_id')
@click.option('--embedding-model', default='text-embedding-3-large')
@click.option('--build-graph/--no-graph', default=True)
@click.option('--force/--no-force', default=False, help='Rebuild existing index')
def build(corpus_id, embedding_model, build_graph, force):
"""Build vector index and knowledge graph"""
# Implementation
@index.command()
@click.argument('corpus_id')
def status(corpus_id):
"""Show indexing status"""
# Implementation
@index.command()
@click.argument('corpus_id')
@click.option('--confirm', is_flag=True, help='Confirm deletion')
def delete(corpus_id, confirm):
"""Delete corpus index"""
# Implementation
# ==================== Map-Reduce ====================
@cli.group()
def mapreduce():
"""Map-reduce batch processing"""
pass
@mapreduce.command()
@click.argument('corpus_id')
@click.option('--schema', '-s', required=True, help='Extraction schema file')
@click.option('--output', '-o', required=True, help='Output directory')
@click.option('--parallelism', default=20, type=int)
@click.option('--budget', default=500000, type=int, help='Total token budget')
@click.option('--checkpoint-interval', default=10, type=int)
def run(corpus_id, schema, output, parallelism, budget, checkpoint_interval):
"""Execute map-reduce extraction"""
# Implementation
@mapreduce.command()
@click.argument('job_id')
def resume(job_id):
"""Resume interrupted job from checkpoint"""
# Implementation
@mapreduce.command()
@click.argument('job_id')
@click.option('--watch', '-w', is_flag=True, help='Watch progress')
def status(job_id, watch):
"""Show job status"""
# Implementation
@mapreduce.command()
@click.option('--status', type=click.Choice(['running', 'completed', 'failed', 'all']))
@click.option('--limit', default=20, type=int)
def list(status, limit):
"""List recent jobs"""
# Implementation
# ==================== Knowledge Hierarchy ====================
@cli.group()
def knowledge():
"""Hierarchical knowledge management"""
pass
@knowledge.command()
@click.argument('corpus_id')
@click.option('--method', default='textrank',
type=click.Choice(['textrank', 'tfidf']))
@click.option('--max-synthesis', default=0.2, type=float)
def build(corpus_id, method, max_synthesis):
"""Build knowledge hierarchy from extractions"""
# Implementation
@knowledge.command()
@click.argument('corpus_id')
@click.option('--query', '-q', required=True)
@click.option('--tier', default='section',
type=click.Choice(['extraction', 'chunk', 'section', 'corpus']))
@click.option('--include-sources/--no-sources', default=True)
@click.option('--format', default='text', type=click.Choice(['text', 'json']))
def query(corpus_id, query, tier, include_sources, format):
"""Query knowledge at specified tier"""
# Implementation
@knowledge.command()
@click.argument('item_id')
@click.option('--format', default='text')
def drilldown(item_id, format):
"""Drill down from summary to sources"""
# Implementation
@knowledge.command()
@click.argument('corpus_id')
@click.option('--check-citations/--no-citations', default=True)
@click.option('--check-synthesis/--no-synthesis', default=True)
def validate(corpus_id, check_citations, check_synthesis):
"""Validate hierarchy integrity"""
# Implementation
# ==================== RAG Queries ====================
@cli.group()
def rag():
"""RAG query operations"""
pass
@rag.command()
@click.argument('corpus_id')
@click.option('--query', '-q', required=True)
@click.option('--user-role', default='analyst')
@click.option('--top-k', default=10, type=int)
@click.option('--include-citations/--no-citations', default=True)
@click.option('--format', default='text', type=click.Choice(['text', 'json']))
def query(corpus_id, query, user_role, top_k, include_citations, format):
"""Execute RAG query with compliance"""
# Implementation
@rag.command()
@click.argument('corpus_id')
def interactive(corpus_id):
"""Start interactive RAG session"""
# Implementation
# ==================== Audit ====================
@cli.group()
def audit():
"""Compliance audit operations"""
pass
@audit.command()
@click.option('--type', '-t', required=True,
type=click.Choice(['access_summary', 'document_access',
'user_activity', 'phi_access']))
@click.option('--start-date', required=True)
@click.option('--end-date', required=True)
@click.option('--output', '-o', required=True)
@click.option('--sign/--no-sign', default=False)
def report(type, start_date, end_date, output, sign):
"""Generate compliance audit report"""
# Implementation
@audit.command()
@click.option('--event-ids', '-e', required=True, multiple=True)
@click.option('--format', default='json', type=click.Choice(['json', 'csv']))
@click.option('--sign/--no-sign', default=False)
@click.option('--output', '-o', required=True)
def export(event_ids, format, sign, output):
"""Export audit trail"""
# Implementation
@audit.command()
@click.option('--start-date', required=True)
@click.option('--check-signatures/--no-signatures', default=True)
@click.option('--check-integrity/--no-integrity', default=True)
def validate(start_date, check_signatures, check_integrity):
"""Validate audit log integrity"""
# Implementation
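The command tree can be exercised without a shell via click's test runner. A condensed, self-contained sketch of the `corpus create` path (the echoed message is illustrative; the real command's output is not specified here):

```python
import click
from click.testing import CliRunner

@click.group()
def cli():
    """Toy subset of the coditect CLI."""

@cli.group()
def corpus():
    """Manage document corpora"""

@corpus.command()
@click.argument("name")
@click.option("--classification", default="internal",
              type=click.Choice(["public", "internal", "confidential", "restricted"]))
def create(name, classification):
    """Create a new corpus from documents"""
    click.echo(f"created corpus {name} ({classification})")

runner = CliRunner()
result = runner.invoke(cli, ["corpus", "create", "demo",
                             "--classification", "confidential"])
```

`click.Choice` gives the classification guard for free: an unknown value exits non-zero with a usage message before the command body runs.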
Workflow Definitions
# /coditect/workflows/full_corpus_analysis.yaml
name: full_corpus_analysis
description: Complete corpus analysis with hierarchy and RAG
version: 1.0.0
parameters:
corpus_name:
type: string
required: true
description: Name for the corpus
input_path:
type: string
required: true
description: Path to input documents
extraction_schema:
type: string
required: true
description: Path to extraction schema YAML
token_budget:
type: integer
default: 1000000
description: Total token budget for processing
parallelism:
type: integer
default: 20
description: Parallel agents for map phase
stages:
- name: create_corpus
command: coditect corpus create
args:
name: "{{ corpus_name }}"
input: "{{ input_path }}"
outputs:
corpus_id: "$.corpus_id"
- name: preprocess
command: coditect preprocess run
args:
input: "{{ input_path }}"
output: "./processed/{{ corpus_name }}"
enable_ocr: true
chunk_size: 2000
depends_on: [create_corpus]
outputs:
processed_count: "$.document_count"
token_reduction: "$.reduction_ratio"
- name: build_index
command: coditect index build
args:
corpus_id: "{{ stages.create_corpus.outputs.corpus_id }}"
build_graph: true
depends_on: [preprocess]
outputs:
index_id: "$.index_id"
- name: map_reduce_extraction
command: coditect mapreduce run
args:
corpus_id: "{{ stages.create_corpus.outputs.corpus_id }}"
schema: "{{ extraction_schema }}"
output: "./extractions/{{ corpus_name }}"
parallelism: "{{ parallelism }}"
budget: "{{ token_budget }}"
depends_on: [build_index]
outputs:
job_id: "$.job_id"
extracted_count: "$.document_count"
- name: build_hierarchy
command: coditect knowledge build
args:
corpus_id: "{{ stages.create_corpus.outputs.corpus_id }}"
method: textrank
max_synthesis: 0.2
depends_on: [map_reduce_extraction]
outputs:
tier_count: "$.tier_count"
- name: validate
command: coditect knowledge validate
args:
corpus_id: "{{ stages.create_corpus.outputs.corpus_id }}"
check_citations: true
check_synthesis: true
depends_on: [build_hierarchy]
outputs:
validation_result: "$.is_valid"
issues: "$.issues"
on_success:
- notify:
channel: slack
message: "Corpus {{ corpus_name }} analysis complete. {{ stages.validate.outputs.issues | length }} issues found."
on_failure:
- notify:
channel: slack
message: "Corpus {{ corpus_name }} analysis failed at stage {{ failed_stage }}."
- save_checkpoint:
path: "./checkpoints/{{ corpus_name }}_{{ timestamp }}.json"
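The `{{ … }}` references and `depends_on` edges above imply a small interpreter: resolve dotted placeholder paths against accumulated stage outputs, and execute stages in dependency order. A sketch under those assumptions (placeholder syntax and context shape inferred from the workflow above):

```python
import re
from typing import Any, Dict, List

def resolve(template: str, context: Dict[str, Any]) -> str:
    """Substitute {{ dotted.path }} placeholders from a nested context dict."""
    def lookup(match: "re.Match") -> str:
        value: Any = context
        for part in match.group(1).strip().split("."):
            value = value[part]
        return str(value)
    return re.sub(r"\{\{([^{}]+)\}\}", lookup, template)

def stage_order(stages: List[Dict[str, Any]]) -> List[str]:
    """Topologically order stages by their depends_on lists."""
    done: set = set()
    order: List[str] = []
    pending = {s["name"]: s for s in stages}
    while pending:
        ready = [name for name, s in pending.items()
                 if all(dep in done for dep in s.get("depends_on", []))]
        if not ready:
            raise ValueError("dependency cycle among stages")
        for name in ready:
            order.append(name)
            done.add(name)
            del pending[name]
    return order
```

A runner would resolve each stage's args just before launch, so earlier outputs like `stages.create_corpus.outputs.corpus_id` are available by then.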
---
# /coditect/workflows/incremental_update.yaml
name: incremental_update
description: Add new documents to existing corpus
version: 1.0.0
parameters:
corpus_id:
type: string
required: true
new_documents:
type: string
required: true
description: Path to new documents
stages:
- name: preprocess_new
command: coditect preprocess run
args:
input: "{{ new_documents }}"
output: "./processed/incremental_{{ timestamp }}"
- name: add_to_corpus
command: coditect corpus add
args:
corpus_id: "{{ corpus_id }}"
documents: "{{ stages.preprocess_new.outputs.processed_path }}"
depends_on: [preprocess_new]
- name: update_index
command: coditect index build
args:
corpus_id: "{{ corpus_id }}"
force: false # Only add new, don't rebuild
depends_on: [add_to_corpus]
- name: extract_new
command: coditect mapreduce run
args:
corpus_id: "{{ corpus_id }}"
schema: "./schemas/default_extraction.yaml"
output: "./extractions/incremental_{{ timestamp }}"
# Only process new documents
filter: "created_at > {{ last_update_time }}"
depends_on: [update_index]
- name: rebuild_hierarchy
command: coditect knowledge build
args:
corpus_id: "{{ corpus_id }}"
# Rebuild affected sections only
incremental: true
depends_on: [extract_new]
---
# /coditect/workflows/compliance_report.yaml
name: compliance_report
description: Generate weekly compliance audit report
version: 1.0.0
schedule: "0 9 * * MON" # Every Monday at 9 AM
parameters:
report_recipients:
type: array
default: ["compliance@company.com"]
stages:
- name: generate_access_report
command: coditect audit report
args:
type: access_summary
start_date: "{{ last_week_start }}"
end_date: "{{ last_week_end }}"
output: "./reports/access_{{ timestamp }}.pdf"
sign: true
- name: generate_phi_report
command: coditect audit report
args:
type: phi_access
start_date: "{{ last_week_start }}"
end_date: "{{ last_week_end }}"
output: "./reports/phi_{{ timestamp }}.pdf"
sign: true
- name: validate_audit_integrity
command: coditect audit validate
args:
start_date: "{{ last_week_start }}"
check_signatures: true
check_integrity: true
- name: send_reports
command: coditect notify email
args:
to: "{{ report_recipients }}"
subject: "Weekly Compliance Report - {{ last_week_start }} to {{ last_week_end }}"
attachments:
- "{{ stages.generate_access_report.outputs.report_path }}"
- "{{ stages.generate_phi_report.outputs.report_path }}"
depends_on: [generate_access_report, generate_phi_report, validate_audit_integrity]
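Both the on_failure save_checkpoint hook and `coditect mapreduce resume` presuppose durable job state. A minimal sketch of atomic checkpointing (the state-file shape is an assumption; this ADR does not fix it):

```python
import json
import os

def save_checkpoint(path: str, state: dict) -> None:
    """Write job state atomically so a crash never leaves a half-written file."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, path)  # atomic rename on POSIX and Windows

def load_checkpoint(path: str) -> dict:
    """Return saved state, or a fresh one when no checkpoint exists yet."""
    if not os.path.exists(path):
        return {"completed_chunks": [], "tokens_used": 0}
    with open(path) as f:
        return json.load(f)
```

The write-to-temp-then-rename pattern matters here: a resume that reads a truncated JSON file would otherwise fail harder than the original crash.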
Extraction Schema Templates
# /coditect/schemas/customer_voice.yaml
# Extract customer pain points from transcripts
name: customer_voice_extraction
version: 1.0.0
description: Extract customer language for marketing content
fields:
pain_points:
type: array
description: Customer-expressed problems and frustrations
items:
text:
type: string
description: Exact quote from customer
emotion:
type: enum
values: [frustration, fear, confusion, stress, anger]
intensity:
type: float
range: [0.0, 1.0]
extraction_hints:
- "Look for phrases like 'I hate when...', 'It's so frustrating...'"
- "Pay attention to emotional language and intensity"
questions_asked:
type: array
description: Direct questions from customers
items:
question:
type: string
context:
type: string
description: What prompted the question
answered:
type: boolean
concerns:
type: array
description: Hesitations and objections raised
items:
concern:
type: string
category:
type: enum
values: [price, quality, trust, timing, competition]
positive_language:
type: array
description: Positive phrases for testimonials
items:
text:
type: string
sentiment_score:
type: float
range: [0.0, 1.0]
output_format: json
citation_required: true
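One plausible way a validator could enforce the enum and range constraints declared above (the helper and the flattened spec-dict shape are illustrative; schema loading itself is out of scope here):

```python
from typing import Any, Dict, List

def validate_item(item: Dict[str, Any],
                  spec: Dict[str, Dict[str, Any]]) -> List[str]:
    """Check one extracted item against per-field rules; return error strings."""
    errors = []
    for field_name, rules in spec.items():
        if field_name not in item:
            errors.append(f"missing field: {field_name}")
            continue
        value = item[field_name]
        if rules.get("type") == "enum" and value not in rules["values"]:
            errors.append(f"{field_name}: {value!r} not in {rules['values']}")
        if "range" in rules:
            low, high = rules["range"]
            if not (low <= value <= high):
                errors.append(f"{field_name}: {value} outside [{low}, {high}]")
    return errors

# Mirrors the pain_points item definition in the schema above.
PAIN_POINT_SPEC = {
    "text": {"type": "string"},
    "emotion": {"type": "enum",
                "values": ["frustration", "fear", "confusion", "stress", "anger"]},
    "intensity": {"type": "float", "range": [0.0, 1.0]},
}
```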
---
# /coditect/schemas/faq_extraction.yaml
name: faq_extraction
version: 1.0.0
description: Extract FAQ content from customer interactions
fields:
questions:
type: array
items:
question_text:
type: string
category:
type: string
description: Topic category
frequency:
type: integer
description: Approximate occurrence count
answer_provided:
type: string
description: How the question was answered
answer_quality:
type: enum
values: [complete, partial, insufficient, incorrect]
follow_up_questions:
type: array
items:
type: string
knowledge_gaps:
type: array
description: Topics where documentation is lacking
items:
topic:
type: string
evidence:
type: string
description: Quote showing the gap
priority:
type: enum
values: [high, medium, low]
output_format: json
citation_required: true
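Both schemas set citation_required: true. A lightweight check that could back that flag, flagging extracted items with no supporting quote (the `citation` key is an assumed convention, not fixed by the schemas above):

```python
from typing import Any, Dict, List

def items_missing_citations(
        extraction: Dict[str, List[Dict[str, Any]]]) -> List[str]:
    """Return "field[index]" paths for items whose citation is absent or empty."""
    missing = []
    for field_name, items in extraction.items():
        for i, item in enumerate(items):
            if not item.get("citation"):
                missing.append(f"{field_name}[{i}]")
    return missing
```

A reduce-phase agent could reject or re-queue any map result for which this list is non-empty.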
Agent Task Templates
# /coditect/agents/tasks/hybrid_processing_tasks.py
import json
from dataclasses import dataclass
from typing import List, Dict, Any
@dataclass
class PreProcessTask:
    """Task for pre-processing agent"""
    document_path: str
    output_path: str
    config: Dict[str, Any]
    task_type: str = "preprocess"
def to_prompt(self) -> str:
return f"""
TASK: Pre-process document for analysis
DOCUMENT: {self.document_path}
OUTPUT: {self.output_path}
STEPS:
1. Detect and parse document format
2. Extract text with layout preservation
3. Remove noise (headers, footers, boilerplate)
4. Extract entities and key terms
5. Create semantic chunks
CONFIG:
{json.dumps(self.config, indent=2)}
Return structured output with:
- cleaned_text: Processed text
- chunks: Array of semantic chunks
- entities: Extracted named entities
- key_terms: TF-IDF top terms
- stats: Token counts and reduction ratio
"""
@dataclass
class MapExtractionTask:
    """Task for map-phase extraction agent"""
    chunk_id: str
    chunk_content: str
    extraction_schema: Dict[str, Any]
    token_budget: int
    task_type: str = "map_extract"
def to_prompt(self) -> str:
return f"""
TASK: Extract structured data from document chunk
CHUNK_ID: {self.chunk_id}
TOKEN_BUDGET: {self.token_budget}
EXTRACTION SCHEMA:
{json.dumps(self.extraction_schema, indent=2)}
CHUNK CONTENT:
{self.chunk_content}
REQUIREMENTS:
1. Extract all fields defined in schema
2. Include exact quotes as citations
3. Assign confidence scores (0.0-1.0)
4. Return null with explanation for missing fields
5. Stay within token budget
OUTPUT FORMAT: JSON matching schema with citations array
"""
@dataclass
class ReduceSynthesisTask:
    """Task for reduce-phase synthesis agent"""
    map_results: List[Dict[str, Any]]
    aggregation_strategy: str
    task_type: str = "reduce_synthesize"
def to_prompt(self) -> str:
results_json = json.dumps(self.map_results, indent=2)
return f"""
TASK: Aggregate and synthesize extraction results
STRATEGY: {self.aggregation_strategy}
MAP RESULTS:
{results_json}
REQUIREMENTS:
1. Merge findings across all results
2. Deduplicate identical items
3. Resolve conflicts with rationale
4. Preserve citation chains
5. Calculate aggregate statistics
OUTPUT FORMAT:
{{
"aggregated_data": {{ merged findings }},
"dedup_stats": {{ count of duplicates removed per field }},
"conflicts": [{{ resolved conflicts with rationale }}],
"statistics": {{ aggregate counts and metrics }}
}}
"""
@dataclass
class HierarchySummaryTask:
    """Task for hierarchy building agent"""
    tier: int
    source_items: List[Dict[str, Any]]
    max_synthesis_ratio: float
    task_type: str = "build_summary"
def to_prompt(self) -> str:
tier_instructions = {
2: "Create EXTRACTIVE summary only - select key sentences, do not generate new content",
3: "Create section summary with max 20% synthesis - mostly quotes with minimal transitions",
4: "Create corpus summary - synthesis allowed but every claim must cite sources"
}
return f"""
TASK: Build Tier {self.tier} summary
INSTRUCTION: {tier_instructions.get(self.tier, '')}
MAX_SYNTHESIS_RATIO: {self.max_synthesis_ratio}
SOURCE ITEMS:
{json.dumps(self.source_items, indent=2)}
REQUIREMENTS:
1. Follow tier-specific rules strictly
2. Preserve citation chains to original sources
3. Track synthesis ratio
4. Flag any content without source support
OUTPUT FORMAT:
{{
"summary_content": [{{ type: extractive|synthetic, content, source }}],
"synthesis_ratio": float,
"source_references": [{{ item_id, citation }}]
}}
"""
@dataclass
class RAGResponseTask:
    """Task for RAG response generation agent"""
    query: str
    retrieved_context: List[Dict[str, Any]]
    require_citations: bool = True
    task_type: str = "rag_response"
def to_prompt(self) -> str:
context_str = "\n\n".join([
f"[{ctx['chunk_id']}] {ctx['content']}"
for ctx in self.retrieved_context
])
return f"""
TASK: Generate response using retrieved context
QUERY: {self.query}
CITATION_REQUIRED: {self.require_citations}
RETRIEVED CONTEXT:
{context_str}
REQUIREMENTS:
1. Answer using ONLY information from provided context
2. Cite sources using [chunk_id] format
3. If information not in context, say "Not found in available documents"
4. Never make claims without citation
5. Be concise but complete
OUTPUT FORMAT:
{{
"response": "Answer text with [citations]",
"citations": [{{ chunk_id, quoted_text, relevance_score }}],
"confidence": float,
"unanswered_aspects": ["any parts of query not addressed"]
}}
"""
Consequences
Positive
- Complete operational interface for hybrid processing
- Reusable skills for agent composition
- Automated workflows for common patterns
- Consistent CLI across all operations
Negative
- Learning curve for full skill/workflow system
- Maintenance overhead for templates and schemas
- Version compatibility between skills and agents
Metrics
| Metric | Target | Measurement |
|---|---|---|
| Workflow success rate | >95% | Completed / initiated |
| Skill reuse | >80% | Workflows using shared skills |
| CLI command coverage | 100% | Operations accessible via CLI |
| Template usage | >70% | Jobs using standard templates |
Related ADRs
- ADR-027: Hybrid Document Processing Architecture (parent)
- ADR-028: Map-Reduce Agent Orchestration
- ADR-029: Hierarchical Knowledge Store
- ADR-030: Compliance-Aware RAG
- ADR-031: Document Pre-Processing Pipeline