Skip to main content

scripts-claude-research-automation

#!/usr/bin/env python3
"""

title: "Claude Research Automation Script"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Claude Research Automation Script"
keywords: ['api', 'automation', 'claude', 'git', 'research']
tokens: ~500
created: 2025-12-22
updated: 2025-12-22
script_name: "claude-research-automation.py"
language: python
executable: true
usage: "python3 scripts/claude-research-automation.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: true
network_access: false
requires_auth: false

Claude Research Automation Script

Automated collection, processing, and organization of Claude Code and Anthropic documentation from official sources, community resources, and training materials.

Usage: python3 claude-research-automation.py [options]

Examples:
    # Process all files in new/ directory
    python3 claude-research-automation.py --batch --auto-commit

# Watch mode for continuous processing
python3 claude-research-automation.py --watch

# Process a specific file (the command-line parser accepts --file; there is no --url option)
python3 claude-research-automation.py --file docs/source-materials/new/installation.md

# Dry run to preview operations
python3 claude-research-automation.py --batch --dry-run --verbose

Author: coditect.ai
Version: 1.0.0
Created: 2025-11-29
"""

import os import sys import json import argparse import time import hashlib import re import shutil from pathlib import Path from datetime import datetime from typing import Dict, List, Tuple, Optional from urllib.parse import urlparse import subprocess

# Add parent directory to path for imports.
# `__file__` is resolved first so that a relative script path (whose `.parent`
# chain would collapse to ".") still yields the real parent directory.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

class ClaudeResearchAutomation:
    """Main automation class for Claude research processing.

    Reads source files (``.txt`` transcripts or ``.md`` documents) from the
    configured watch directory, converts transcripts to markdown, chooses a
    category, writes the result below the configured output directory, keeps
    the per-category and master ``README.md`` indexes up to date, and moves the
    source file into the archive directory.

    Attributes:
        base_dir: Grandparent of the directory containing this file; every
            configured directory is joined onto it.
        config: Effective settings (built-in defaults merged with overrides).
        stats: Counters accumulated while processing files.
    """

    # Hosts whose content is filed under ``official/...``.
    OFFICIAL_DOMAINS = ('docs.anthropic.com', 'support.claude.com', 'platform.claude.com', 'anthropic.com')
    # Hosts whose content is filed under ``community/...``.
    COMMUNITY_DOMAINS = ('github.com', 'reddit.com', 'dev.to', 'medium.com', 'hackernoon.com')
    # Community hosts that are always treated as blogs.
    BLOG_DOMAINS = ('medium.com', 'dev.to')

    def __init__(self, config_path: Optional[str] = None):
        """Initialize the automation system.

        Args:
            config_path: Optional path to a JSON settings file; see
                :meth:`load_config`.
        """
        # Resolve first: a relative ``__file__`` would make the ``.parent``
        # chain collapse to "." instead of climbing the directory tree.
        self.base_dir = Path(__file__).resolve().parent.parent.parent
        self.config = self.load_config(config_path)
        self.stats = {
            'processed': 0,
            'skipped': 0,
            'errors': 0,
            'duplicates': 0,
            'categories': {}
        }

    def load_config(self, config_path: Optional[str] = None) -> Dict:
        """Load configuration from settings.json or use defaults.

        Args:
            config_path: Path to a JSON file. Only its
                ``'claude-research-agent'`` object is used; its keys override
                the defaults.

        Returns:
            The merged configuration dictionary.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        default_config = {
            'watch_directory': 'docs/source-materials/new/',
            'archive_directory': 'docs/source-materials/archived/',
            'output_base': 'docs/research-library/',
            'auto_commit': False,
            'auto_push': False,
            'duplicate_detection': True,
            'quality_checks': True,
            'watch_interval': 5
        }

        if not config_path:
            return default_config

        if not Path(config_path).exists():
            # An explicitly requested file that is missing should not be
            # ignored silently; the defaults are still used.
            print(f"⚠️ Config file not found: {config_path} (using defaults)")
            return default_config

        with open(config_path, 'r', encoding='utf-8') as f:
            user_config = json.load(f)
        default_config.update(user_config.get('claude-research-agent', {}))
        return default_config

    @staticmethod
    def _source_matches(source: str, domains) -> bool:
        """Return True when *source* is hosted on one of *domains*.

        The URL's host name must equal a domain or be a sub-domain of it, so a
        domain that merely appears in the path does not count. When no host can
        be parsed (no scheme, free text) the loose substring test is used.
        """
        try:
            host = urlparse(source).hostname or ''
        except ValueError:
            host = ''
        if not host:
            return any(domain in source for domain in domains)
        return any(host == domain or host.endswith('.' + domain) for domain in domains)

    def categorize_content(self, content: str, metadata: Dict) -> str:
        """Determine appropriate category for content.

        Args:
            content: Document text (not inspected; the decision is based on
                ``metadata['source']`` and ``metadata['title']``).
            metadata: Mapping with optional ``'source'`` and ``'title'`` keys.

        Returns:
            A category path like ``'official/api'`` or ``'community/blogs'``.
        """
        source = (metadata.get('source') or '').lower()
        title = (metadata.get('title') or '').lower()

        # Official Anthropic sources
        if self._source_matches(source, self.OFFICIAL_DOMAINS):
            if 'api' in title or 'reference' in title:
                return 'official/api'
            if any(word in title for word in ('tutorial', 'guide', 'how to', 'step-by-step')):
                return 'official/tutorials'
            if any(word in title for word in ('best practice', 'tip', 'essential')):
                return 'official/best-practices'
            return 'official/tutorials'  # Default official category

        # Community content
        if self._source_matches(source, self.COMMUNITY_DOMAINS):
            if 'blog' in source or self._source_matches(source, self.BLOG_DOMAINS):
                return 'community/blogs'
            return 'community/discussions'

        # Training materials
        if any(word in title for word in ('course', 'training', 'lesson', 'beginner')):
            return 'training/courses'

        # Release notes
        if any(word in title for word in ('release', 'changelog', 'version', '2.0')):
            if '2.0' in title or 'version 2' in title:
                return 'releases/version-2.0'
            return 'releases'

        # Everything else (including workflow write-ups) defaults to community.
        return 'community/blogs'

    def extract_metadata(self, file_path: Path, content: str) -> Dict:
        """Extract metadata from file content and filename.

        Args:
            file_path: Path of the source file (name, stem and suffix are used).
            content: Text of the file.

        Returns:
            Dict with ``'filename'``, ``'source'`` (``''`` when no URL is
            found), ``'title'``, ``'date'`` (today, ``YYYY-MM-DD``) and
            ``'type'`` (``'transcript'``, ``'markdown'``, ``'pdf'`` or
            ``'unknown'``).
        """
        metadata = {
            'filename': file_path.name,
            'source': '',
            # Fallback title derived from the file name.
            'title': file_path.stem.replace('-', ' ').replace('_', ' '),
            'date': datetime.now().strftime('%Y-%m-%d'),
            'type': 'unknown'
        }

        # Prefer the first "# heading" in the content as title.
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        if title_match:
            metadata['title'] = title_match.group(1).strip()

        # Try to extract source URL from content; first matching pattern wins.
        url_patterns = [
            r'Source:\s*(https?://[^\s\)]+)',
            r'URL:\s*(https?://[^\s\)]+)',
            r'\[Source\]\((https?://[^\s\)]+)\)',
            r'(https?://(?:docs|support|platform)\.(?:anthropic|claude)\.com[^\s\)]*)'
        ]
        for pattern in url_patterns:
            match = re.search(pattern, content)
            if match:
                # Sentence punctuation directly after a URL is not part of it.
                metadata['source'] = match.group(1).rstrip('.,;')
                break

        # Determine content type from the exact (case-insensitive) suffix.
        suffix_types = {'.txt': 'transcript', '.md': 'markdown', '.pdf': 'pdf'}
        metadata['type'] = suffix_types.get(file_path.suffix.lower(), 'unknown')

        return metadata

    def convert_transcript_to_markdown(self, content: str, metadata: Dict) -> str:
        """Convert transcript text to properly formatted markdown.

        Consecutive non-empty lines are joined into paragraphs, short all-caps
        lines become ``##`` headings, numbered lines starting with a capital
        letter become ``###`` headings, and fenced code blocks are copied
        verbatim.

        Args:
            content: Raw transcript text.
            metadata: Must provide ``'title'``, ``'source'``, ``'date'`` and
                ``'type'``.

        Returns:
            The markdown document as a single string.
        """
        markdown_lines: List[str] = [
            f"# {metadata['title']}\n\n",
            f"**Source:** {metadata['source'] or 'Unknown'}\n",
            f"**Date:** {metadata['date']}\n",
            f"**Type:** {metadata['type'].capitalize()}\n",
            # Blank lines around the rule: "---" directly below a text line is
            # a setext heading underline in markdown, not a separator.
            "\n---\n\n",
        ]
        current_paragraph: List[str] = []
        in_code_block = False

        def flush(terminator: str) -> None:
            """Emit the pending paragraph, if any, followed by *terminator*."""
            if current_paragraph:
                markdown_lines.append(' '.join(current_paragraph) + terminator)
                current_paragraph.clear()

        for line in content.split('\n'):
            stripped = line.strip()

            # Fence lines toggle code-block mode and are copied as-is.
            if stripped.startswith('```'):
                flush('\n')
                markdown_lines.append(line + '\n')
                in_code_block = not in_code_block
                continue

            if in_code_block:
                markdown_lines.append(line + '\n')
                continue

            # Empty line ends paragraph
            if not stripped:
                flush('\n\n')
                continue

            # Headers: short all-caps lines
            if stripped.isupper() and len(stripped.split()) < 8:
                flush('\n\n')
                markdown_lines.append(f"## {stripped.title()}\n\n")
                continue

            # Numbered sections
            if re.match(r'^\d+[\.\)]\s+[A-Z]', stripped):
                flush('\n\n')
                markdown_lines.append(f"### {stripped}\n\n")
                continue

            # Regular content - accumulate into paragraphs
            current_paragraph.append(stripped)

        flush('\n')
        return ''.join(markdown_lines)

    def check_duplicate(self, content: str, target_dir: Path) -> Optional[Path]:
        """Check if similar content already exists in target directory.

        Args:
            content: Processed document text.
            target_dir: Category directory whose ``*.md`` files (except
                ``README.md``) are compared.

        Returns:
            Path of the first identical or more-than-80%-similar file, or
            ``None`` (also when duplicate detection is disabled in the config).
        """
        if not self.config['duplicate_detection'] or not target_dir.exists():
            return None

        for existing_file in sorted(target_dir.glob('*.md')):
            if existing_file.name == 'README.md':
                continue

            try:
                with open(existing_file, 'r', encoding='utf-8') as f:
                    existing_content = f.read()
            except (OSError, UnicodeDecodeError) as e:
                # An unreadable file must not abort the scan of the others.
                print(f"⚠️ Error checking duplicate in {existing_file}: {e}")
                continue

            # Exact match, or similar content (>80% similarity)
            if content == existing_content:
                return existing_file
            if self.calculate_similarity(content, existing_content) > 0.8:
                return existing_file

        return None

    def calculate_similarity(self, text1: str, text2: str) -> float:
        """Calculate similarity between two texts (0.0 to 1.0).

        Uses the ratio of shared to total distinct lower-cased,
        whitespace-separated words. Returns 0.0 when either text has no words.
        """
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())

        if not words1 or not words2:
            return 0.0

        return len(words1 & words2) / len(words1 | words2)

    def quality_check(self, content: str, metadata: Dict) -> Tuple[bool, List[str]]:
        """Perform quality checks on processed content.

        Returns:
            ``(passed, issues)`` where ``issues`` lists a message per failed
            check. Always ``(True, [])`` when quality checks are disabled in
            the config.
        """
        if not self.config['quality_checks']:
            return True, []

        issues = []

        # Check for proper markdown headers
        if not re.search(r'^#\s+.+$', content, re.MULTILINE):
            issues.append("Missing main header (# Title)")

        # Check for minimum content length
        if len(content.strip()) < 100:
            issues.append("Content too short (<100 characters)")

        # An odd number of fences means one code block was never closed.
        if content.count('```') % 2 != 0:
            issues.append("Unbalanced code blocks")

        # Check for metadata
        if not metadata.get('title'):
            issues.append("Missing title")

        # Check for broken links (basic check): "[text]()" with empty target
        broken_links = re.findall(r'\[([^\]]+)\]\(\s*\)', content)
        if broken_links:
            issues.append(f"Broken links found: {len(broken_links)}")

        return not issues, issues

    @staticmethod
    def _bump_index_header(index_content: str, date: str) -> str:
        """Set the ``**Last Updated:**`` date and add one to ``**Total Documents:**``.

        Callable replacements are used so *date* is inserted literally rather
        than being interpreted as a regex replacement template.
        """
        index_content = re.sub(
            r'\*\*Last Updated:\*\* \d{4}-\d{2}-\d{2}',
            lambda _match: f"**Last Updated:** {date}",
            index_content
        )
        return re.sub(
            r'\*\*Total Documents:\*\* (\d+)',
            lambda match: f"**Total Documents:** {int(match.group(1)) + 1}",
            index_content,
            count=1
        )

    def update_category_index(self, category_dir: Path, filename: str, metadata: Dict):
        """Update or create category README.md index.

        Args:
            category_dir: Existing category directory holding the index.
            filename: Name of the newly added document (used as link target).
            metadata: Must provide ``'title'`` and ``'date'``; ``'source'`` is
                optional.
        """
        index_path = category_dir / 'README.md'

        # Read existing index or create new
        if index_path.exists():
            with open(index_path, 'r', encoding='utf-8') as f:
                index_content = f.read()
        else:
            category_name = category_dir.name.replace('-', ' ').title()
            index_content = f"# {category_name}\n\n**Last Updated:** {metadata['date']}\n**Total Documents:** 0\n\n## Documents\n\n"

        index_content = self._bump_index_header(index_content, metadata['date'])

        # `source` is normally present but empty, so test truthiness rather
        # than relying on the dict default.
        doc_entry = f"- [{metadata['title']}]({filename}) - {metadata.get('source') or 'No source'}\n"

        # Insert directly below the first "## Documents" heading (newest
        # first), tolerating any number of blank lines after it.
        index_content, inserted = re.subn(
            r'^## Documents[ \t]*(?:\n|\Z)(?:[ \t]*\n)*',
            lambda _match: f"## Documents\n\n{doc_entry}",
            index_content,
            count=1,
            flags=re.MULTILINE
        )
        if not inserted:
            index_content += f"\n## Documents\n\n{doc_entry}"

        # Write updated index
        with open(index_path, 'w', encoding='utf-8') as f:
            f.write(index_content)

    def update_master_index(self, category: str, filename: str, metadata: Dict):
        """Update master research plan index.

        Creates ``<output_base>/README.md`` from a template when missing, then
        refreshes its last-updated date and increments its document total.

        Args:
            category: Category of the added document (currently not recorded).
            filename: Name of the added document (currently not recorded).
            metadata: Must provide ``'date'``.
        """
        master_index = self.base_dir / self.config['output_base'] / 'README.md'

        if master_index.exists():
            with open(master_index, 'r', encoding='utf-8') as f:
                master_content = f.read()
        else:
            # The header lines must carry the bold markers that
            # `_bump_index_header` searches for, otherwise the total would
            # never be incremented.
            master_content = """# Claude Code Research Plan

**Last Updated:** {date}
**Total Documents:** 0
**Categories:** 0

## Quick Navigation

### Official Documentation

### Community Content

### Training Materials

### Release Notes

## Statistics

- Most active category: N/A
- Most recent addition: {date}
- Growth rate: N/A
""".format(date=metadata['date'])

        master_content = self._bump_index_header(master_content, metadata['date'])

        # Write updated master index
        master_index.parent.mkdir(parents=True, exist_ok=True)
        with open(master_index, 'w', encoding='utf-8') as f:
            f.write(master_content)

    @staticmethod
    def _unique_path(path: Path) -> Path:
        """Return *path*, or a timestamped sibling when *path* already exists."""
        if not path.exists():
            return path
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        candidate = path.with_name(f"{path.stem}_{timestamp}{path.suffix}")
        counter = 1
        while candidate.exists():
            candidate = path.with_name(f"{path.stem}_{timestamp}_{counter}{path.suffix}")
            counter += 1
        return candidate

    def _display_path(self, path: Path) -> Path:
        """Return *path* relative to ``base_dir`` when possible, else unchanged."""
        try:
            return path.relative_to(self.base_dir)
        except ValueError:
            return path

    def process_file(self, file_path: Path, dry_run: bool = False, verbose: bool = False) -> bool:
        """Process a single file.

        Steps: read, extract metadata, convert transcripts, categorize, skip
        duplicates, report quality issues, write the document, update both
        indexes, archive the source file.

        Args:
            file_path: Source file to process.
            dry_run: When True, report the planned actions without writing.
            verbose: When True, print progress details.

        Returns:
            True when the file was processed (or would be, in a dry run);
            False when it was skipped as a duplicate or an error occurred.
        """
        try:
            if verbose:
                print(f"\n📄 Processing: {file_path.name}")

            # Undecodable bytes are dropped rather than aborting the file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()

            metadata = self.extract_metadata(file_path, content)
            if verbose:
                print(f" Title: {metadata['title']}")
                print(f" Source: {metadata['source'] or 'Unknown'}")

            # Convert to markdown if transcript
            if metadata['type'] == 'transcript':
                content = self.convert_transcript_to_markdown(content, metadata)
                if verbose:
                    print(" ✓ Converted transcript to markdown")

            category = self.categorize_content(content, metadata)
            if verbose:
                print(f" Category: {category}")

            target_dir = self.base_dir / self.config['output_base'] / category

            duplicate = self.check_duplicate(content, target_dir)
            if duplicate:
                print(f"⚠️ Duplicate detected: {duplicate.name}")
                print(f" Action: Skipped processing")
                self.stats['duplicates'] += 1
                self.stats['skipped'] += 1
                return False

            # A different document with the same stem must not overwrite the
            # existing one, so pick a free name (same scheme as the archive).
            target_file = self._unique_path(target_dir / f"{file_path.stem}.md")

            # Quality issues are reported but do not block processing.
            passed, issues = self.quality_check(content, metadata)
            if not passed:
                print(f"⚠️ Quality issues found:")
                for issue in issues:
                    print(f" - {issue}")
                if not dry_run:
                    print(f" Proceeding anyway...")

            if dry_run:
                print(f" [DRY RUN] Would save to: {target_file}")
                print(f" [DRY RUN] Would archive to: {self.config['archive_directory']}")
                return True

            target_dir.mkdir(parents=True, exist_ok=True)
            with open(target_file, 'w', encoding='utf-8') as f:
                f.write(content)
            if verbose:
                print(f" ✓ Saved to: {self._display_path(target_file)}")

            self.update_category_index(target_dir, target_file.name, metadata)
            self.update_master_index(category, target_file.name, metadata)

            # Archive source file, keeping earlier archives of the same name.
            archive_dir = self.base_dir / self.config['archive_directory']
            archive_dir.mkdir(parents=True, exist_ok=True)
            archive_file = self._unique_path(archive_dir / file_path.name)
            shutil.move(str(file_path), str(archive_file))
            if verbose:
                print(f" ✓ Archived to: {self._display_path(archive_file)}")

            self.stats['processed'] += 1
            self.stats['categories'][category] = self.stats['categories'].get(category, 0) + 1
            return True

        except Exception as e:
            # Per-file boundary: one bad file must not stop a batch or watch run.
            print(f"❌ Error processing {file_path.name}: {e}")
            self.stats['errors'] += 1
            return False

    def process_batch(self, dry_run: bool = False, verbose: bool = False):
        """Process all files in new/ directory.

        Handles every ``*.txt`` and ``*.md`` file in the configured watch
        directory (in sorted order) and prints a summary afterwards.
        """
        new_dir = self.base_dir / self.config['watch_directory']

        if not new_dir.exists():
            print(f"❌ Directory not found: {new_dir}")
            return

        # Find all processable files
        files = sorted(new_dir.glob('*.txt')) + sorted(new_dir.glob('*.md'))

        if not files:
            print("ℹ️ No files to process in new/ directory")
            return

        print(f"\n🔄 Processing {len(files)} files...\n")

        for file_path in files:
            self.process_file(file_path, dry_run, verbose)

        self.print_summary()

    def watch_directory(self, interval: int = 5, auto_commit: bool = False):
        """Watch new/ directory for new files and process automatically.

        Polls the configured watch directory every *interval* seconds until
        interrupted with Ctrl+C, then prints the summary.

        Args:
            interval: Seconds to sleep between polls.
            auto_commit: When True, commit after each successfully processed
                file.
        """
        new_dir = self.base_dir / self.config['watch_directory']
        new_dir.mkdir(parents=True, exist_ok=True)

        print(f"👁️  Watching: {new_dir}")
        print(f" Interval: {interval} seconds")
        print(f" Auto-commit: {auto_commit}")
        print("\nPress Ctrl+C to stop\n")

        # Files already attempted that are still sitting in the directory
        # (skipped duplicates or failures); they are not retried.
        processed_files = set()

        try:
            while True:
                current_files = set(new_dir.glob('*.txt')) | set(new_dir.glob('*.md'))
                # Forget files that have left the directory (archived), so a
                # later file with the same name is picked up again.
                processed_files &= current_files

                for file_path in sorted(current_files - processed_files):
                    print(f"\n📥 New file detected: {file_path.name}")
                    success = self.process_file(file_path, dry_run=False, verbose=True)

                    if success and auto_commit:
                        self.git_commit(f"docs: Add {file_path.stem}")

                    processed_files.add(file_path)

                time.sleep(interval)

        except KeyboardInterrupt:
            print("\n\n⏹️ Stopped watching")
            self.print_summary()

    def git_commit(self, message: str, auto_push: bool = False) -> bool:
        """Commit changes to git repository.

        Stages everything under ``base_dir``, commits with *message* plus an
        agent trailer, and optionally pushes.

        Args:
            message: Commit subject line.
            auto_push: When True, run ``git push`` after the commit.

        Returns:
            True when the commit (and push, if requested) succeeded; False
            when there was nothing to commit or a git command failed.
        """
        try:
            # Stage all changes
            subprocess.run(['git', 'add', '.'], cwd=self.base_dir, check=True)

            # `git diff --cached --quiet` exits 0 when nothing is staged;
            # committing then would fail and be reported as an error.
            staged = subprocess.run(['git', 'diff', '--cached', '--quiet'], cwd=self.base_dir)
            if staged.returncode == 0:
                print("ℹ️ Nothing to commit")
                return False

            # Commit with message
            full_message = f"{message}\n\nProcessed by claude-research-agent"
            subprocess.run(['git', 'commit', '-m', full_message], cwd=self.base_dir, check=True)
            print(f"✓ Git commit: {message}")

            # Push if requested
            if auto_push:
                subprocess.run(['git', 'push'], cwd=self.base_dir, check=True)
                print("✓ Git push: Success")

        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing git executable.
            print(f"❌ Git operation failed: {e}")
            return False

        return True

    def print_summary(self):
        """Print processing summary statistics."""
        print("\n" + "="*60)
        print("📊 Processing Summary")
        print("="*60)

        print(f"\nFiles Processed: {self.stats['processed']}")
        print(f"Files Skipped: {self.stats['skipped']}")
        print(f"Errors: {self.stats['errors']}")
        print(f"Duplicates Detected: {self.stats['duplicates']}")

        if self.stats['categories']:
            print("\nCategorization:")
            for category, count in sorted(self.stats['categories'].items()):
                print(f"├─ {category}: {count} files")

        print("\n" + "="*60)

def main():
    """Main entry point.

    Parses the command line and runs exactly one mode: ``--batch``,
    ``--watch`` or ``--file``; prints the help text when none is given.
    The ``--auto-commit``, ``--auto-push`` and ``--interval`` options fall back
    to the ``auto_commit``, ``auto_push`` and ``watch_interval`` settings of
    the loaded configuration when not given on the command line.
    """
    parser = argparse.ArgumentParser(
        description='Claude Research Automation - Automated documentation collection and organization',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process all files in NEW/ directory
  %(prog)s --batch --auto-commit

  # Watch mode for continuous processing
  %(prog)s --watch

  # Dry run to preview operations
  %(prog)s --batch --dry-run --verbose
        """
    )

    parser.add_argument('--batch', action='store_true',
                        help='Process all files in new/ directory')
    parser.add_argument('--watch', action='store_true',
                        help='Watch new/ directory for new files')
    parser.add_argument('--file', type=str,
                        help='Process specific file')
    parser.add_argument('--auto-commit', action='store_true',
                        help='Automatically git commit processed files')
    parser.add_argument('--auto-push', action='store_true',
                        help='Automatically git push after commit')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be done without executing')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Detailed output logging')
    parser.add_argument('--config', type=str,
                        help='Path to custom config file')
    # No argparse default, so an omitted option can fall back to the config.
    parser.add_argument('--interval', type=int, default=None,
                        help='Watch mode interval in seconds (default: 5)')

    args = parser.parse_args()

    # Initialize automation
    automation = ClaudeResearchAutomation(config_path=args.config)

    # Command-line flags win; otherwise use the loaded configuration.
    auto_commit = args.auto_commit or bool(automation.config.get('auto_commit', False))
    auto_push = args.auto_push or bool(automation.config.get('auto_push', False))
    interval = args.interval if args.interval is not None else automation.config.get('watch_interval', 5)

    # Execute requested operation
    if args.batch:
        automation.process_batch(dry_run=args.dry_run, verbose=args.verbose)
        count = automation.stats['processed']
        # Committing with nothing processed would only produce a git error.
        if auto_commit and not args.dry_run and count > 0:
            automation.git_commit(
                f"docs: Add {count} new research documents",
                auto_push=auto_push
            )

    elif args.watch:
        automation.watch_directory(
            interval=interval,
            auto_commit=auto_commit
        )

    elif args.file:
        file_path = Path(args.file)
        if not file_path.exists():
            print(f"❌ File not found: {file_path}")
            sys.exit(1)

        processed = automation.process_file(file_path, dry_run=args.dry_run, verbose=True)
        # Only commit when the file was actually processed (not skipped/failed).
        if processed and auto_commit and not args.dry_run:
            automation.git_commit(
                f"docs: Add {file_path.stem}",
                auto_push=auto_push
            )

    else:
        parser.print_help()

# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()