#!/usr/bin/env python3
"""
title: "Paths"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Workflow to N8N Converter Converts markdown workflow definitions to N8N-compatible JSON files."
keywords: ['analysis', 'api', 'automation', 'convert', 'database']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "convert-workflows-to-n8n.py"
language: python
executable: true
usage: "python3 scripts/convert-workflows-to-n8n.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Workflow to N8N Converter

Converts markdown workflow definitions to N8N-compatible JSON files.

Usage: python3 scripts/convert-workflows-to-n8n.py [--dry-run] [--verbose]
"""
import hashlib
import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Paths — resolved relative to this script so the converter works from any CWD.
# fix: original read `Path(file)`, which raises NameError at import time.
SCRIPT_DIR = Path(__file__).parent
ROOT_DIR = SCRIPT_DIR.parent
DOCS_WORKFLOWS_DIR = ROOT_DIR / "docs" / "workflows"
OUTPUT_WORKFLOWS_DIR = ROOT_DIR / "workflows"
# N8N node type mappings based on common workflow patterns.
# NOTE: step_to_n8n_node() scans these keys in insertion order and uses the
# first keyword found in a step's description/agent, so order matters.
# The original literal duplicated the keys "email" and "schedule"; Python
# keeps a duplicated key at its FIRST position but with its LAST value, so
# the entries below reproduce that effective mapping exactly while removing
# the dead duplicates.
NODE_TYPE_MAPPINGS = {
    # Trigger patterns
    "webhook": "n8n-nodes-base.webhook",
    "schedule": "n8n-nodes-base.wait",  # later duplicate ("wait" section) won
    "manual": "n8n-nodes-base.manualTrigger",
    "file": "n8n-nodes-base.fileWatcher",
    "email": "n8n-nodes-base.emailSend",  # later duplicate (notifications) won
    # Action patterns
    "api": "n8n-nodes-base.httpRequest",
    "http": "n8n-nodes-base.httpRequest",
    "request": "n8n-nodes-base.httpRequest",
    "fetch": "n8n-nodes-base.httpRequest",
    # Data processing
    "transform": "n8n-nodes-base.function",
    "process": "n8n-nodes-base.function",
    "calculate": "n8n-nodes-base.function",
    "analyze": "n8n-nodes-base.function",
    "evaluate": "n8n-nodes-base.function",
    "validate": "n8n-nodes-base.function",
    "filter": "n8n-nodes-base.filter",
    "merge": "n8n-nodes-base.merge",
    "split": "n8n-nodes-base.splitInBatches",
    # Database
    "database": "n8n-nodes-base.postgres",
    "query": "n8n-nodes-base.postgres",
    "store": "n8n-nodes-base.postgres",
    "save": "n8n-nodes-base.postgres",
    "retrieve": "n8n-nodes-base.postgres",
    # Notifications
    "notify": "n8n-nodes-base.slack",
    "alert": "n8n-nodes-base.slack",
    "send": "n8n-nodes-base.emailSend",
    "message": "n8n-nodes-base.slack",
    # Files
    "read": "n8n-nodes-base.readFile",
    "write": "n8n-nodes-base.writeFile",
    "export": "n8n-nodes-base.writeFile",
    "generate": "n8n-nodes-base.function",
    "create": "n8n-nodes-base.function",
    # Conditionals
    "check": "n8n-nodes-base.if",
    "verify": "n8n-nodes-base.if",
    "decide": "n8n-nodes-base.switch",
    "route": "n8n-nodes-base.switch",
    "branch": "n8n-nodes-base.switch",
    # Wait/Delay
    "wait": "n8n-nodes-base.wait",
    "delay": "n8n-nodes-base.wait",
    # Integration specific
    "github": "n8n-nodes-base.github",
    "git": "n8n-nodes-base.executeCommand",
    "slack": "n8n-nodes-base.slack",
    "discord": "n8n-nodes-base.discord",
    "notion": "n8n-nodes-base.notion",
    "google": "n8n-nodes-base.googleSheets",
    "spreadsheet": "n8n-nodes-base.googleSheets",
    # AI/LLM
    "ai": "n8n-nodes-base.httpRequest",
    "llm": "n8n-nodes-base.httpRequest",
    "claude": "n8n-nodes-base.httpRequest",
    "openai": "n8n-nodes-base.openAi",
    # Default
    "default": "n8n-nodes-base.function",
}
# Category to subdirectory mapping.
# NOTE: determine_category() scans these keys in insertion order and returns
# on the first keyword contained in a workflow name/tag, so order matters.
# The original literal duplicated four keys ("compliance", "legal",
# "contract", "inventory"); Python keeps a duplicated key at its FIRST
# position but with its LAST value, so the entries below preserve that
# effective mapping exactly while removing the dead duplicates.
CATEGORY_MAPPINGS = {
    # Software Development
    "project": "project-lifecycle",
    "lifecycle": "project-lifecycle",
    "setup": "project-lifecycle",
    "init": "project-lifecycle",
    "developer": "developer-experience",
    "dev": "developer-experience",
    "coding": "developer-experience",
    "debug": "developer-experience",
    "qa": "quality-assurance",
    "test": "quality-assurance",
    "quality": "quality-assurance",
    "review": "quality-assurance",
    "validation": "quality-assurance",
    "deploy": "devops",
    "ci": "devops",
    "cd": "devops",
    "pipeline": "devops",
    "infrastructure": "devops",
    "container": "devops",
    "doc": "documentation",
    "documentation": "documentation",
    "readme": "documentation",
    "api-doc": "documentation",
    "collab": "collaboration",
    "team": "collaboration",
    "pr": "collaboration",
    "code-review": "collaboration",
    "security": "security",
    "audit": "security",
    # duplicated key; the later "professional/legal" value won over "security"
    "compliance": "professional/legal",
    "vulnerability": "security",
    "perf": "performance",
    "performance": "performance",
    "optimization": "performance",
    "profiling": "performance",
    "memory": "memory-context",
    "context": "memory-context",
    "session": "memory-context",
    "checkpoint": "memory-context",
    "prototype": "innovation",
    "experiment": "innovation",
    "spike": "innovation",
    "poc": "innovation",
    # Business domains - map to new subdirectories
    "sales": "business/sales",
    "marketing": "business/marketing",
    "customer": "business/customer-success",
    "lead": "business/sales",
    "hr": "professional/hr",
    "hiring": "professional/hr",
    "onboarding": "professional/hr",
    "employee": "professional/hr",
    "finance": "professional/finance",
    "accounting": "professional/finance",
    "invoice": "professional/finance",
    "budget": "professional/finance",
    "tax": "professional/finance",
    # duplicated key; the later "industry/legal-services" value won
    "legal": "industry/legal-services",
    # duplicated key; the later "industry/legal-services" value won
    "contract": "industry/legal-services",
    "operations": "operations/process",
    "process": "operations/process",
    "workflow": "operations/process",
    "supply": "operations/supply-chain",
    # duplicated key; the later "industry/retail-ecommerce" value won
    "inventory": "industry/retail-ecommerce",
    "procurement": "operations/supply-chain",
    "vendor": "operations/supply-chain",
    "maintenance": "operations/maintenance",
    "facility": "operations/facility",
    "asset": "operations/assets",
    "research": "research/intelligence",
    "intelligence": "research/intelligence",
    "analysis": "research/intelligence",
    "market": "research/market",
    "investment": "finance/investment",
    "portfolio": "finance/investment",
    "trading": "finance/trading",
    "stock": "finance/trading",
    "crypto": "finance/crypto",
    "defi": "finance/crypto",
    "nft": "finance/crypto",
    "real-estate": "industry/real-estate",
    "property": "industry/real-estate",
    "rental": "industry/real-estate",
    "education": "industry/education",
    "learning": "industry/education",
    "course": "industry/education",
    "curriculum": "industry/education",
    "healthcare": "industry/healthcare",
    "medical": "industry/healthcare",
    "clinical": "industry/healthcare",
    "patient": "industry/healthcare",
    "hospital": "industry/healthcare",
    "pharmacy": "industry/healthcare",
    "hipaa": "industry/healthcare",
    "manufacturing": "industry/manufacturing",
    "production": "industry/manufacturing",
    "quality-control": "industry/manufacturing",
    "factory": "industry/manufacturing",
    "agriculture": "industry/agriculture",
    "farm": "industry/agriculture",
    "crop": "industry/agriculture",
    # Financial Services
    "banking": "industry/financial-services",
    "bank": "industry/financial-services",
    "loan": "industry/financial-services",
    "mortgage": "industry/financial-services",
    "insurance": "industry/financial-services",
    "underwriting": "industry/financial-services",
    "claims": "industry/financial-services",
    "kyc": "industry/financial-services",
    "aml": "industry/financial-services",
    "fintech": "industry/financial-services",
    # Retail/E-commerce ("inventory" appears earlier at its original position)
    "retail": "industry/retail-ecommerce",
    "ecommerce": "industry/retail-ecommerce",
    "store": "industry/retail-ecommerce",
    "pos": "industry/retail-ecommerce",
    "merchandise": "industry/retail-ecommerce",
    "cart": "industry/retail-ecommerce",
    "checkout": "industry/retail-ecommerce",
    "fulfillment": "industry/retail-ecommerce",
    # Transportation/Logistics
    "transportation": "industry/transportation-logistics",
    "logistics": "industry/transportation-logistics",
    "shipping": "industry/transportation-logistics",
    "freight": "industry/transportation-logistics",
    "fleet": "industry/transportation-logistics",
    "warehouse": "industry/transportation-logistics",
    "carrier": "industry/transportation-logistics",
    "delivery": "industry/transportation-logistics",
    "customs": "industry/transportation-logistics",
    # Energy/Utilities
    "energy": "industry/energy-utilities",
    "utility": "industry/energy-utilities",
    "utilities": "industry/energy-utilities",
    "power": "industry/energy-utilities",
    "grid": "industry/energy-utilities",
    "renewable": "industry/energy-utilities",
    "solar": "industry/energy-utilities",
    "wind": "industry/energy-utilities",
    "meter": "industry/energy-utilities",
    "outage": "industry/energy-utilities",
    # Hospitality/Tourism
    "hospitality": "industry/hospitality-tourism",
    "hotel": "industry/hospitality-tourism",
    "restaurant": "industry/hospitality-tourism",
    "tourism": "industry/hospitality-tourism",
    "travel": "industry/hospitality-tourism",
    "booking": "industry/hospitality-tourism",
    "guest": "industry/hospitality-tourism",
    "reservation": "industry/hospitality-tourism",
    "catering": "industry/hospitality-tourism",
    # Legal Services ("legal" and "contract" appear earlier at their original
    # first positions)
    "law": "industry/legal-services",
    "attorney": "industry/legal-services",
    "litigation": "industry/legal-services",
    "court": "industry/legal-services",
    "case": "industry/legal-services",
    "deposition": "industry/legal-services",
    "discovery": "industry/legal-services",
    # Creative
    "creative": "creative/general",
    "design": "creative/design",
    "content": "creative/content",
    "freelance": "creative/freelance",
    "photography": "creative/photography",
    "video": "creative/video",
    "music": "creative/music",
    "writing": "creative/writing",
    "art": "creative/art",
    # Community
    "social": "community/social",
    "community": "community/general",
    "nonprofit": "community/nonprofit",
    "event": "community/events",
    # Personal
    "personal": "personal/productivity",
    "productivity": "personal/productivity",
    "gtd": "personal/productivity",
    "habit": "personal/habits",
    "health": "personal/health",
    "wellness": "personal/health",
    "family": "personal/family",
    "home": "personal/home",
}
def parse_workflow_file(filepath: Path) -> List[Dict]:
    """Parse a markdown workflow file and extract workflow definitions.

    Scans '## N. Section' headings first so each workflow found below a
    section inherits that section's category hint, then matches individual
    workflow headings in either slug or Title-Case format.

    Returns a list of workflow dicts as built by parse_workflow_metadata().
    """
    workflows = []

    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()

    # Section header to category mapping for 50-ESSENTIAL-WORKFLOWS.md
    section_category_mappings = {
        "project lifecycle": "project-lifecycle",
        "developer experience": "developer-experience",
        "quality assurance": "quality-assurance",
        "devops": "devops",
        "documentation": "documentation",
        "collaboration": "collaboration",
        "security": "security",
        "performance": "performance",
        "memory context": "memory-context",
        "innovation": "innovation",
    }

    # Parse section headers first to build a (position, category) map.
    section_positions = []
    section_pattern = re.compile(r'^## \d+\.\s+(.+?)$', re.MULTILINE)
    for match in section_pattern.finditer(content):
        section_name = match.group(1).strip().lower()
        # Map section name to category (substring match, first hit wins)
        category = None
        for key, cat in section_category_mappings.items():
            if key in section_name:
                category = cat
                break
        section_positions.append((match.start(), category))

    # Pattern to match workflow definitions - supports two formats:
    # Format 1: ### 1.1. workflow-slug-name (lowercase with hyphens)
    # Format 2: ### 1. Workflow Title Name Workflow (Title Case)

    # Pattern 1: Original format - lowercase slug names.
    # NOTE(review): re.IGNORECASE makes this "lowercase slug" pattern also
    # match Title-Case headings, which can shadow the title fallback below —
    # confirm this is intended.
    workflow_pattern_slug = re.compile(
        r'###\s+(?:\d+\.(?:\d+\.)?\s+)?([a-z0-9-]+)\s*\n'
        r'(.*?)(?=###\s+(?:\d+\.(?:\d+\.)?\s+)?[a-z0-9-]+\s*\n|## [A-Z]|\Z)',
        re.DOTALL | re.IGNORECASE
    )

    # Pattern 2: Title format - "### 1. Title Name Workflow"
    workflow_pattern_title = re.compile(
        r'###\s+\d+\.\s+([A-Z][A-Za-z0-9\s&/\-]+?Workflow)\s*\n'
        r'(.*?)(?=###\s+\d+\.\s+[A-Z]|\Z)',
        re.DOTALL
    )

    # Try slug pattern first
    matches = list(workflow_pattern_slug.finditer(content))

    # If no matches, try title pattern (used by newer industry workflows)
    if not matches:
        matches = list(workflow_pattern_title.finditer(content))

    for match in matches:
        raw_name = match.group(1).strip()
        # Convert to slug format:
        # "Vehicle Maintenance Scheduling Workflow" -> "vehicle-maintenance-scheduling-workflow"
        workflow_name = re.sub(r'[^a-z0-9]+', '-', raw_name.lower()).strip('-')
        workflow_content = match.group(2).strip()
        workflow_position = match.start()

        # Find which section this workflow belongs to: the nearest preceding
        # section heading that mapped to a category.
        section_category = None
        for pos, cat in reversed(section_positions):
            if workflow_position > pos and cat:
                section_category = cat
                break

        # Parse workflow metadata with section category hint
        workflow = parse_workflow_metadata(workflow_name, workflow_content, filepath, section_category)
        if workflow:
            workflows.append(workflow)

    return workflows
def parse_workflow_metadata(name: str, content: str, source_file: Path,
                            section_category: Optional[str] = None) -> Optional[Dict]:
    """Parse workflow metadata from a workflow's markdown content block.

    Extracts the bolded **Description:**/**Trigger:**/**Complexity:**/
    **Duration:**/**Tags:** fields (with defaults when absent), the numbered
    step list, and a category — preferring the caller-supplied
    section_category over keyword-based detection.
    """
    # Extract description
    desc_match = re.search(r'\*\*Description:\*\*\s*(.+?)(?:\n|$)', content)
    description = desc_match.group(1).strip() if desc_match else f"Workflow: {name}"

    # Extract trigger
    trigger_match = re.search(r'\*\*Trigger:\*\*\s*(.+?)(?:\n|$)', content)
    trigger = trigger_match.group(1).strip() if trigger_match else "manual"

    # Extract complexity (normalized to lowercase)
    complexity_match = re.search(r'\*\*Complexity:\*\*\s*(.+?)(?:\n|$)', content)
    complexity = complexity_match.group(1).strip().lower() if complexity_match else "moderate"

    # Extract duration
    duration_match = re.search(r'\*\*Duration:\*\*\s*(.+?)(?:\n|$)', content)
    duration = duration_match.group(1).strip() if duration_match else "15-30m"

    # Extract steps
    steps = parse_workflow_steps(content)

    # Extract tags: comma/whitespace separated, optional [bracket] wrapping
    tags_match = re.search(r'\*\*Tags:\*\*\s*(.+?)(?:\n|$)', content)
    tags = []
    if tags_match:
        tags_str = tags_match.group(1).strip()
        tags = [t.strip().strip('[]') for t in re.split(r'[,\s]+', tags_str) if t.strip()]

    # Use section category if provided, otherwise determine from source file and tags
    category = section_category if section_category else determine_category(name, tags, source_file)

    return {
        "name": name,
        "description": description,
        "trigger": trigger,
        "complexity": complexity,
        "duration": duration,
        "steps": steps,
        "tags": tags,
        "category": category,
        "source_file": str(source_file.name)
    }
def parse_workflow_steps(content: str) -> List[Dict]:
    """Parse numbered workflow steps from content.

    A step line looks like "1. Step description - agent - purpose", where
    the agent and purpose segments are optional.
    """
    steps = []

    # Match numbered steps like "1. Step description - agent - purpose".
    # The segment separator requires whitespace on BOTH sides of the dash
    # (r'\s+-\s+'): the original r'\s*-\s*' also split on hyphens inside
    # words, so e.g. "api-agent" was mis-parsed as agent "api".
    step_pattern = re.compile(
        r'^\s*(\d+)\.\s+(.+?)(?:\s+-\s+(.+?))?(?:\s+-\s+(.+?))?$',
        re.MULTILINE
    )

    for match in step_pattern.finditer(content):
        step_num = int(match.group(1))
        step_desc = match.group(2).strip() if match.group(2) else ""
        agent = match.group(3).strip() if match.group(3) else ""
        purpose = match.group(4).strip() if match.group(4) else ""

        if step_desc:
            steps.append({
                "number": step_num,
                "description": step_desc,
                "agent": agent,
                "purpose": purpose
            })

    return steps
def determine_category(name: str, tags: List[str], source_file: Path) -> str:
    """Determine the workflow category (output subdirectory).

    Precedence: source file name -> workflow name keywords -> tag keywords
    -> "general".
    """
    # Check source file name for category hints
    source_name = source_file.stem.lower()

    # Map source files to categories (substring match against the stem)
    source_mappings = {
        "50-essential-workflows": "developer-experience",
        "workflow-definitions-ai-ml-data": "devops",
        "business-sales-workflows": "business/sales",
        "business-operations-workflows": "operations/process",
        "operations-process-workflows": "operations/process",
        "hr-legal-finance-workflows": "professional/hr",
        "creative-professional-freelance-workflows": "creative/general",
        "product-creative-workflows": "creative/design",
        "research-intelligence-workflows": "research/intelligence",
        "investment-trading-crypto-workflows": "finance/investment",
        "education-healthcare-workflows": "industry/education",
        "workflow-definitions-real-estate-events-travel": "industry/real-estate",
        "manufacturing-agriculture-workflows": "industry/manufacturing",
        "social-community-nonprofit-workflows": "community/general",
        "workflows-personal-lifestyle": "personal/productivity",
        # New industry segments (Top 10 expansion)
        "healthcare-industry-workflows": "industry/healthcare",
        "financial-services-industry-workflows": "industry/financial-services",
        "retail-ecommerce-industry-workflows": "industry/retail-ecommerce",
        "transportation-logistics-industry-workflows": "industry/transportation-logistics",
        "energy-utilities-industry-workflows": "industry/energy-utilities",
        "hospitality-tourism-industry-workflows": "industry/hospitality-tourism",
        "legal-services-industry-workflows": "industry/legal-services",
    }

    for key, category in source_mappings.items():
        if key in source_name:
            return category

    # Check workflow name for category hints (first keyword hit wins)
    name_lower = name.lower()
    for keyword, category in CATEGORY_MAPPINGS.items():
        if keyword in name_lower:
            return category

    # Check tags for category hints
    for tag in tags:
        tag_lower = tag.lower()
        for keyword, category in CATEGORY_MAPPINGS.items():
            if keyword in tag_lower:
                return category

    # Default category
    return "general"
def step_to_n8n_node(step: Dict, index: int, workflow_name: str) -> Dict:
    """Convert a single workflow step to an N8N node dict.

    The node type is chosen by the first NODE_TYPE_MAPPINGS keyword found in
    the step's description or agent; unmatched steps become function nodes.
    """
    desc_lower = step["description"].lower()
    agent_lower = step.get("agent", "").lower()

    # Determine node type based on step description and agent
    node_type = "n8n-nodes-base.function"  # default
    for keyword, n8n_type in NODE_TYPE_MAPPINGS.items():
        if keyword in desc_lower or keyword in agent_lower:
            node_type = n8n_type
            break

    # Generate unique node ID (unique per workflow + step index)
    node_id = f"{workflow_name}-step-{index}"

    # Calculate position: 3-column grid, left-to-right then top-to-bottom
    position_x = 250 + (index % 3) * 250
    position_y = 150 + (index // 3) * 150

    node = {
        "parameters": {},
        "name": f"Step {step['number']}: {step['description'][:30]}",
        "type": node_type,
        "typeVersion": 1,
        "position": [position_x, position_y],
        "id": node_id,
        "notes": f"{step['description']}\nAgent: {step.get('agent', 'N/A')}\nPurpose: {step.get('purpose', 'N/A')}"
    }

    # Add type-specific placeholder parameters
    if node_type == "n8n-nodes-base.function":
        node["parameters"]["functionCode"] = f"// {step['description']}\n// Agent: {step.get('agent', 'N/A')}\n// Purpose: {step.get('purpose', 'N/A')}\n\nreturn items;"
    elif node_type == "n8n-nodes-base.httpRequest":
        node["parameters"]["url"] = "https://api.example.com/endpoint"
        node["parameters"]["method"] = "POST"
    elif node_type == "n8n-nodes-base.slack":
        node["parameters"]["channel"] = "#notifications"
        node["parameters"]["text"] = step["description"]
    elif node_type == "n8n-nodes-base.emailSend":
        node["parameters"]["subject"] = step["description"]

    return node
def workflow_to_n8n(workflow: Dict) -> Dict:
    """Convert a parsed workflow dict to an N8N-format workflow dict."""
    nodes = []
    connections = {}

    # Determine trigger node based on workflow trigger
    trigger = workflow.get("trigger", "manual").lower()

    if "/" in trigger:
        # Command trigger - use webhook
        trigger_node = {
            "parameters": {
                "path": workflow["name"],
                "httpMethod": "POST"
            },
            "name": f"Trigger: {trigger}",
            "type": "n8n-nodes-base.webhook",
            "typeVersion": 1,
            "position": [250, 50],
            "id": f"{workflow['name']}-trigger",
            "notes": f"Triggered by {trigger}"
        }
    elif "schedule" in trigger:
        trigger_node = {
            "parameters": {
                "rule": {"interval": [{"field": "hours", "hoursInterval": 1}]}
            },
            "name": "Schedule Trigger",
            "type": "n8n-nodes-base.scheduleTrigger",
            "typeVersion": 1,
            "position": [250, 50],
            "id": f"{workflow['name']}-trigger",
            "notes": "Scheduled execution"
        }
    else:
        trigger_node = {
            "parameters": {},
            "name": "Manual Trigger",
            "type": "n8n-nodes-base.manualTrigger",
            "typeVersion": 1,
            "position": [250, 50],
            "id": f"{workflow['name']}-trigger",
            "notes": "Manual execution"
        }

    nodes.append(trigger_node)

    # Convert steps to nodes, chaining each node to the previous one.
    # NOTE(review): connection SOURCES are keyed by node *id* while TARGETS
    # use node *name*; n8n keys connections by node name — confirm imported
    # workflows actually link before relying on these connections.
    prev_node_id = trigger_node["id"]
    for i, step in enumerate(workflow.get("steps", [])):
        node = step_to_n8n_node(step, i, workflow["name"])
        nodes.append(node)

        # Create connection from previous node
        if prev_node_id not in connections:
            connections[prev_node_id] = {"main": [[]]}
        connections[prev_node_id]["main"][0].append({
            "node": node["name"],
            "type": "main",
            "index": 0
        })
        prev_node_id = node["id"]

    # Build N8N workflow structure
    n8n_workflow = {
        "name": workflow["name"].replace("-", " ").title(),
        "nodes": nodes,
        "connections": connections,
        "active": False,
        "settings": {
            "executionOrder": "v1"
        },
        # Short name hash as a stable pseudo-version id (not cryptographic)
        "versionId": hashlib.md5(workflow["name"].encode()).hexdigest()[:8],
        "meta": {
            "instanceId": "coditect-generated",
            "templateCredsSetupCompleted": False
        },
        "tags": workflow.get("tags", []),
        "pinData": {},
        "staticData": None,
        "createdAt": datetime.now().isoformat(),
        "updatedAt": datetime.now().isoformat(),
        # CODITECT metadata
        "_coditect": {
            "source": workflow.get("source_file", "unknown"),
            "description": workflow.get("description", ""),
            "complexity": workflow.get("complexity", "moderate"),
            "duration": workflow.get("duration", "15-30m"),
            "category": workflow.get("category", "general"),
            "generated": datetime.now().isoformat(),
            "version": "1.0.0"
        }
    }

    return n8n_workflow
def save_n8n_workflow(workflow: Dict, n8n_data: Dict, output_dir: Path, dry_run: bool = False) -> Tuple[Path, bool]:
    """Save an N8N workflow JSON file under its category subdirectory.

    Returns (target path, True). In dry-run mode nothing is written and no
    directory is created, but the would-be path is still returned.
    """
    category = workflow.get("category", "general")
    category_dir = output_dir / category

    # Create directory if needed
    if not dry_run:
        category_dir.mkdir(parents=True, exist_ok=True)

    # Generate filename
    filename = f"{workflow['name']}.workflow.json"
    filepath = category_dir / filename

    if not dry_run:
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(n8n_data, f, indent=2, ensure_ascii=False)

    return filepath, True
def generate_index_file(workflows_by_category: Dict, output_dir: Path, dry_run: bool = False) -> Path:
    """Generate workflow-index.json summarizing all converted workflows.

    In dry-run mode nothing is written; the target path is still returned.
    """
    index = {
        "generated": datetime.now().isoformat(),
        "total_workflows": sum(len(w) for w in workflows_by_category.values()),
        "categories": len(workflows_by_category),
        "workflows_by_category": {}
    }

    for category, workflows in sorted(workflows_by_category.items()):
        index["workflows_by_category"][category] = {
            "count": len(workflows),
            "workflows": [
                {
                    "name": w["name"],
                    "file": f"{category}/{w['name']}.workflow.json",
                    "description": w.get("description", ""),
                    "complexity": w.get("complexity", "moderate")
                }
                for w in workflows
            ]
        }

    index_path = output_dir / "workflow-index.json"

    if not dry_run:
        with open(index_path, 'w', encoding='utf-8') as f:
            json.dump(index, f, indent=2, ensure_ascii=False)

    return index_path
def update_workflows_readme(workflows_by_category: Dict, output_dir: Path, dry_run: bool = False) -> Path:
    """Regenerate the workflows README.md with stats and per-category tables.

    In dry-run mode nothing is written; the target path is still returned.
    """
    readme_path = output_dir / "README.md"
    total_workflows = sum(len(w) for w in workflows_by_category.values())

    content = f"""# CODITECT Framework Automation Workflows

Version: 1.0.0
Status: Auto-Generated
Last Updated: {datetime.now().strftime('%Y-%m-%d')}
Total Workflows: {total_workflows}

## Overview

This directory contains N8N-compatible executable workflows auto-generated from the CODITECT workflow definition library.

Looking for workflow definitions? See docs/workflows/ for the source markdown workflow definitions.

## Quick Statistics

| Metric | Value |
|---|---|
| Total Workflows | {total_workflows} |
| Categories | {len(workflows_by_category)} |
| Format | N8N JSON |
| Auto-Generated | Yes |

## Directory Structure

```
workflows/
├── README.md              # This file
├── workflow-index.json    # Master index of all workflows
"""

    # Add category directories
    for category in sorted(workflows_by_category.keys()):
        count = len(workflows_by_category[category])
        content += f"├── {category}/ # {count} workflows\n"

    content += """```

---

## Categories

"""

    # Add category details
    for category in sorted(workflows_by_category.keys()):
        workflows = workflows_by_category[category]
        content += f"### {category.replace('/', ' / ').title()}\n\n"
        content += f"**Workflows:** {len(workflows)}\n\n"
        content += "| Workflow | Complexity | Description |\n"
        content += "|----------|------------|-------------|\n"
        for w in workflows[:10]:  # Limit to first 10
            name = w['name']
            complexity = w.get('complexity', 'moderate')
            desc = w.get('description', '')[:50]
            content += f"| {name} | {complexity} | {desc}... |\n"
        if len(workflows) > 10:
            content += f"| ... | ... | *{len(workflows) - 10} more workflows* |\n"
        content += "\n---\n\n"

    content += """## Using Workflows

### Import into N8N

```bash
# Via N8N CLI
n8n import:workflow --input=workflows/category/workflow-name.workflow.json
```

Or copy JSON content directly into N8N editor.

### View Workflow Index

```bash
cat workflows/workflow-index.json | jq '.workflows_by_category | keys'
```

## Related Documentation

- docs/workflows/ - Source workflow definitions (markdown)
- docs/02-user-guides/ - User guides and tutorials
- CODITECT-STANDARD-WORKFLOWS.md - Workflow standard

---

Generated by: scripts/convert-workflows-to-n8n.py
Source: CODITECT Workflow Definition Library
"""

    if not dry_run:
        with open(readme_path, 'w', encoding='utf-8') as f:
            f.write(content)

    return readme_path
def main() -> int:
    """Main conversion process.

    Finds workflow definition markdown files, parses them, converts each
    workflow to N8N JSON, then writes the index and README. Returns a
    process exit code: 0 on full success, 1 if any workflow failed.
    """
    dry_run = "--dry-run" in sys.argv
    verbose = "--verbose" in sys.argv

    print("=" * 60)
    print("CODITECT Workflow to N8N Converter")
    print("=" * 60)

    if dry_run:
        print("\n[DRY RUN MODE - No files will be written]\n")

    # Find all workflow definition files
    workflow_files = list(DOCS_WORKFLOWS_DIR.glob("*WORKFLOWS*.md"))
    workflow_files.extend(DOCS_WORKFLOWS_DIR.glob("*WORKFLOW-DEFINITIONS*.md"))

    # Remove duplicates and filter out index/readme files
    workflow_files = [
        f for f in set(workflow_files)
        if "INDEX" not in f.name.upper()
        and "README" not in f.name.upper()
        and "SUMMARY" not in f.name.upper()
        and "REPORT" not in f.name.upper()
    ]

    print(f"\nFound {len(workflow_files)} workflow definition files:")
    for f in workflow_files:
        print(f" - {f.name}")

    # Parse all workflows
    all_workflows = []
    workflows_by_category = {}

    print(f"\n{'=' * 60}")
    print("Parsing workflow definitions...")
    print("=" * 60)

    for filepath in workflow_files:
        print(f"\nParsing: {filepath.name}")
        workflows = parse_workflow_file(filepath)
        print(f" Found {len(workflows)} workflows")

        for w in workflows:
            all_workflows.append(w)
            category = w.get("category", "general")
            if category not in workflows_by_category:
                workflows_by_category[category] = []
            workflows_by_category[category].append(w)

            if verbose:
                print(f" - {w['name']} [{category}]")

    print(f"\n{'=' * 60}")
    print(f"Total workflows parsed: {len(all_workflows)}")
    print(f"Categories: {len(workflows_by_category)}")
    print("=" * 60)

    # Convert and save workflows
    print(f"\n{'=' * 60}")
    print("Converting to N8N format...")
    print("=" * 60)

    converted_count = 0
    errors = []

    for workflow in all_workflows:
        try:
            n8n_data = workflow_to_n8n(workflow)
            filepath, success = save_n8n_workflow(workflow, n8n_data, OUTPUT_WORKFLOWS_DIR, dry_run)
            if success:
                converted_count += 1
                if verbose:
                    print(f" Created: {filepath}")
        except Exception as e:
            # Keep converting remaining workflows; failures are reported in
            # the summary (and immediately when --verbose).
            errors.append((workflow.get("name", "unknown"), str(e)))
            if verbose:
                print(f" ERROR: {workflow.get('name', 'unknown')}: {e}")

    # Generate index file
    print(f"\n{'=' * 60}")
    print("Generating index and README...")
    print("=" * 60)

    index_path = generate_index_file(workflows_by_category, OUTPUT_WORKFLOWS_DIR, dry_run)
    print(f" Index: {index_path}")

    readme_path = update_workflows_readme(workflows_by_category, OUTPUT_WORKFLOWS_DIR, dry_run)
    print(f" README: {readme_path}")

    # Summary
    print(f"\n{'=' * 60}")
    print("CONVERSION COMPLETE")
    print("=" * 60)
    print(f"\nResults:")
    print(f" - Workflows converted: {converted_count}")
    print(f" - Categories created: {len(workflows_by_category)}")
    print(f" - Errors: {len(errors)}")

    if errors and verbose:
        print(f"\nErrors:")
        for name, error in errors:
            print(f" - {name}: {error}")

    print(f"\nCategory breakdown:")
    for category in sorted(workflows_by_category.keys()):
        print(f" - {category}: {len(workflows_by_category[category])} workflows")

    if dry_run:
        print("\n[DRY RUN - No files were written]")
    else:
        print(f"\nOutput directory: {OUTPUT_WORKFLOWS_DIR}")

    return 0 if not errors else 1


# fix: original guard read `if name == "main":`, which raises NameError and
# never runs; the standard dunder guard is required for script execution.
if __name__ == "__main__":
    sys.exit(main())