#!/usr/bin/env python3
"""CODITECT Documentation Scraper.

Original implementation for scraping documentation websites with:
- Multi-strategy detection (llms.txt, sitemap, BFS)
- Async/parallel scraping with rate limiting
- Smart content extraction with language detection
- MoE-integrated quality scoring
- Checkpoint/resume capability
- CODITECT context.db integration

Author: CODITECT
Version: 1.0.0
License: Proprietary
"""
import asyncio
import hashlib
import json
import logging
import re
import sqlite3
import sys
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Callable
from urllib.parse import urljoin, urlparse

import aiohttp
import requests
from bs4 import BeautifulSoup

# Fix: was logging.getLogger(name) — `name` is undefined; module loggers key
# on the dunder module name.
logger = logging.getLogger(__name__)

# ADR-114 & ADR-118: Use centralized path discovery for user data.
# Fix: was Path(file) — `file` is undefined; the script path is __file__.
_script_dir = Path(__file__).resolve().parent
_coditect_root = _script_dir.parent.parent.parent  # skill-generator/core -> skill-generator -> scripts -> coditect-core
if str(_coditect_root) not in sys.path:
    sys.path.insert(0, str(_coditect_root))

try:
    from scripts.core.paths import get_context_storage_dir, get_sessions_db_path, SESSIONS_DB
    _CONTEXT_STORAGE = get_context_storage_dir()
    _DEFAULT_DB = SESSIONS_DB  # ADR-118: Session tracking goes to sessions.db (Tier 3)
except ImportError:
    # Fallback when running outside the CODITECT tree: prefer the new
    # ~/PROJECTS/.coditect-data location, else the legacy ~/.coditect one.
    _new_location = Path.home() / "PROJECTS" / ".coditect-data" / "context-storage"
    _CONTEXT_STORAGE = _new_location if _new_location.exists() else Path.home() / ".coditect" / "context-storage"
    _DEFAULT_DB = _CONTEXT_STORAGE / "sessions.db"  # ADR-118: Tier 3
@dataclass
class ScrapedPage:
    """A single scraped documentation page plus its extracted metadata."""

    url: str
    title: str
    content: str
    headings: list[str] = field(default_factory=list)
    code_blocks: list[dict] = field(default_factory=list)
    links: list[str] = field(default_factory=list)
    language: str = "unknown"
    category: str = "other"
    confidence: float = 0.0
    scraped_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())

    def to_dict(self) -> dict:
        """Serialize this page to a plain JSON-compatible dict (field order)."""
        return dict(vars(self))

    @classmethod
    def from_dict(cls, data: dict) -> "ScrapedPage":
        """Rebuild a page from a dict produced by to_dict()."""
        return cls(**data)
@dataclass
class ScrapeConfig:
    """Configuration for documentation scraping."""

    name: str
    base_url: str
    description: str = ""
    # URL patterns
    include_patterns: list[str] = field(default_factory=list)
    exclude_patterns: list[str] = field(default_factory=list)
    # CSS selectors used during HTML extraction
    content_selector: str = "article, main, .content, div[role='main']"
    title_selector: str = "h1, .title"
    code_selector: str = "pre code, .highlight code"
    # Crawl limits
    max_pages: int = 500
    rate_limit: float = 0.5  # seconds between requests
    timeout: int = 30
    # Performance
    async_mode: bool = True
    workers: int = 8
    # Checkpointing
    checkpoint_enabled: bool = True
    checkpoint_interval: int = 100
    # Detection
    skip_llms_txt: bool = False

    @classmethod
    def from_dict(cls, data: dict) -> "ScrapeConfig":
        """Build a config from a nested dict (parsed JSON/YAML profile)."""
        patterns = data.get("url_patterns", {})
        selectors = data.get("selectors", {})
        checkpoint = data.get("checkpoint", {})
        return cls(
            name=data["name"],
            base_url=data["base_url"],
            description=data.get("description", ""),
            include_patterns=patterns.get("include", []),
            exclude_patterns=patterns.get("exclude", []),
            content_selector=selectors.get("main_content", cls.content_selector),
            title_selector=selectors.get("title", cls.title_selector),
            code_selector=selectors.get("code_blocks", cls.code_selector),
            max_pages=data.get("max_pages", 500),
            rate_limit=data.get("rate_limit", 0.5),
            timeout=data.get("timeout", 30),
            async_mode=data.get("async_mode", True),
            workers=data.get("workers", 8),
            checkpoint_enabled=checkpoint.get("enabled", True),
            checkpoint_interval=checkpoint.get("interval", 100),
            skip_llms_txt=data.get("skip_llms_txt", False),
        )
class DocumentationScraper: """ CODITECT Documentation Scraper
Improvements over existing scrapers:
- MoE-integrated quality scoring for each page
- CODITECT context.db integration for session tracking
- Multi-signal language detection (CSS class, heuristics, ML)
- Parallel async scraping with backpressure
- Intelligent retry with exponential backoff
- Streaming checkpoint saves
"""
# Language detection patterns (improved)
LANGUAGE_PATTERNS = {
"python": [
r"\bdef\s+\w+\s*\(", r"\bimport\s+\w+", r"\bfrom\s+\w+\s+import",
r"\bclass\s+\w+\s*[:\(]", r"__init__", r"self\.", r"async\s+def"
],
"javascript": [
r"\bconst\s+\w+\s*=", r"\blet\s+\w+\s*=", r"\bfunction\s+\w+\s*\(",
r"=>\s*\{", r"async\s+function", r"export\s+(default\s+)?", r"require\("
],
"typescript": [
r":\s*(string|number|boolean|any)\b", r"interface\s+\w+",
r"type\s+\w+\s*=", r"<T>", r"as\s+\w+", r"readonly\s+"
],
"go": [
r"\bfunc\s+\w+\s*\(", r"\bpackage\s+\w+", r"import\s+\(",
r"go\s+func", r":=", r"\bdefer\s+", r"\bgoroutine"
],
"rust": [
r"\bfn\s+\w+\s*\(", r"\blet\s+mut\s+", r"\bimpl\s+",
r"\bpub\s+fn", r"->", r"\bmatch\s+", r"&str"
],
"java": [
r"\bpublic\s+(class|void|static)", r"\bprivate\s+",
r"\bimport\s+java\.", r"@Override", r"System\.out"
],
"csharp": [
r"\bpublic\s+(class|void|async)", r"\busing\s+System",
r"namespace\s+\w+", r"\basync\s+Task", r"=>"
],
"cpp": [
r"#include\s*<", r"\bstd::", r"\bint\s+main\s*\(",
r"\bclass\s+\w+\s*{", r"cout\s*<<", r"\bvirtual\s+"
],
"shell": [
r"#!/bin/(ba)?sh", r"\$\{?\w+\}?", r"\becho\s+",
r"\bif\s+\[\s*", r"\bfor\s+\w+\s+in"
],
"yaml": [
r"^\s*\w+:\s*$", r"^\s*-\s+\w+:", r"^\s{2,}\w+:"
],
"json": [
r'^\s*\{\s*"', r'"\s*:\s*["\[\{]', r'^\s*\[\s*\{'
]
}
# llms.txt variants in priority order
LLMS_TXT_VARIANTS = [
"llms-full.txt",
"llms.txt",
"llms-small.txt"
]
    def __init__(
        self,
        config: ScrapeConfig,
        output_dir: Optional[Path] = None,
        context_db_path: Optional[Path] = None,
        progress_callback: Optional[Callable[[int, int], None]] = None
    ):
        """Set up crawl state and create the on-disk output layout.

        Args:
            config: Scrape configuration (URL scope, limits, selectors).
            output_dir: Skill output root; defaults to ~/.coditect/skills/<name>.
            context_db_path: Session-tracking DB path; defaults to _DEFAULT_DB.
            progress_callback: Called as (pages_scraped, max_pages) per page.
        """
        self.config = config
        self.output_dir = output_dir or Path(f"~/.coditect/skills/{config.name}").expanduser()
        self.data_dir = self.output_dir / "data"
        self.context_db_path = context_db_path or _DEFAULT_DB  # ADR-118: sessions.db (Tier 3)
        self.progress_callback = progress_callback
        # Crawl state: BFS frontier seeded with the base URL.
        self.visited_urls: set[str] = set()
        self.pending_urls: deque[str] = deque([config.base_url])
        self.pages: list[ScrapedPage] = []
        self.pages_scraped = 0
        # Detection state: records which fast-path strategy (if any) was used.
        self.llms_txt_detected = False
        self.llms_txt_variant: Optional[str] = None
        self.sitemap_detected = False
        # Session tracking: short ID derived from skill name + start time.
        self.session_id = hashlib.sha256(
            f"{config.name}:{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:16]
        # Create output directories up front so later writes cannot fail on
        # a missing parent.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        (self.output_dir / "references").mkdir(exist_ok=True)
    async def scrape(self) -> list[ScrapedPage]:
        """
        Main scraping entry point.

        Tries the fastest applicable strategy in order: llms.txt download,
        then sitemap.xml seeding, then a plain BFS crawl from base_url.

        Returns:
            List of scraped pages
        """
        logger.info(f"Starting scrape of {self.config.base_url}")
        logger.info(f"Session ID: {self.session_id}")
        # Strategy 1: Check for llms.txt (10x faster; no per-page fetches).
        if not self.config.skip_llms_txt:
            llms_content = await self._detect_llms_txt()
            if llms_content:
                logger.info(f"Using llms.txt strategy ({self.llms_txt_variant})")
                return await self._scrape_from_llms_txt(llms_content)
        # Strategy 2: Check for sitemap (5x faster; skips link discovery).
        sitemap_urls = await self._detect_sitemap()
        if sitemap_urls:
            logger.info(f"Using sitemap strategy ({len(sitemap_urls)} URLs)")
            # Replace the frontier with the sitemap URLs; the crawl below
            # fetches them (plus any in-scope links they contain).
            self.pending_urls = deque(sitemap_urls)
            self.sitemap_detected = True
        # Strategy 3: BFS crawl (async worker pool or sync fallback).
        if self.config.async_mode:
            return await self._scrape_async()
        else:
            return self._scrape_sync()
async def _detect_llms_txt(self) -> Optional[str]:
"""Detect and download llms.txt if available."""
parsed = urlparse(self.config.base_url)
base = f"{parsed.scheme}://{parsed.netloc}"
async with aiohttp.ClientSession() as session:
for variant in self.LLMS_TXT_VARIANTS:
url = f"{base}/{variant}"
try:
async with session.get(url, timeout=10) as response:
if response.status == 200:
content = await response.text()
if len(content) > 100: # Meaningful content
self.llms_txt_detected = True
self.llms_txt_variant = variant
logger.info(f"Found {variant} ({len(content)} bytes)")
return content
except Exception as e:
logger.debug(f"No {variant}: {e}")
return None
async def _scrape_from_llms_txt(self, content: str) -> list[ScrapedPage]:
"""Parse llms.txt content directly into pages."""
pages = []
# llms.txt format: sections with headers and content
sections = re.split(r'\n#{1,3}\s+', content)
for i, section in enumerate(sections):
if not section.strip():
continue
lines = section.strip().split('\n')
title = lines[0].strip() if lines else f"Section {i}"
body = '\n'.join(lines[1:]).strip()
# Extract code blocks
code_blocks = self._extract_code_blocks_from_text(body)
page = ScrapedPage(
url=f"{self.config.base_url}#section-{i}",
title=title,
content=body,
headings=[title],
code_blocks=code_blocks,
language=self._detect_language_from_code(code_blocks),
category=self._infer_category_from_title(title)
)
pages.append(page)
self.pages = pages
self.pages_scraped = len(pages)
logger.info(f"Extracted {len(pages)} sections from llms.txt")
return pages
async def _detect_sitemap(self) -> list[str]:
"""Detect and parse sitemap.xml."""
parsed = urlparse(self.config.base_url)
base = f"{parsed.scheme}://{parsed.netloc}"
sitemap_urls = [
f"{base}/sitemap.xml",
f"{base}/sitemap_index.xml",
f"{base}/docs/sitemap.xml"
]
async with aiohttp.ClientSession() as session:
for sitemap_url in sitemap_urls:
try:
async with session.get(sitemap_url, timeout=10) as response:
if response.status == 200:
content = await response.text()
if '<urlset' in content or '<sitemapindex' in content:
urls = self._parse_sitemap(content)
if urls:
logger.info(f"Found sitemap at {sitemap_url}")
return urls
except Exception as e:
logger.debug(f"No sitemap at {sitemap_url}: {e}")
return []
def _parse_sitemap(self, content: str) -> list[str]:
"""Extract URLs from sitemap XML."""
urls = []
soup = BeautifulSoup(content, 'xml')
# Handle sitemap index
for sitemap in soup.find_all('sitemap'):
loc = sitemap.find('loc')
if loc:
# Would need to fetch sub-sitemaps
pass
# Handle urlset
for url in soup.find_all('url'):
loc = url.find('loc')
if loc and loc.text:
url_str = loc.text.strip()
if self._is_valid_url(url_str):
urls.append(url_str)
return urls[:self.config.max_pages]
async def _scrape_async(self) -> list[ScrapedPage]:
"""Async BFS scraping with worker pool."""
semaphore = asyncio.Semaphore(self.config.workers)
async def fetch_page(url: str) -> Optional[ScrapedPage]:
async with semaphore:
if url in self.visited_urls:
return None
self.visited_urls.add(url)
try:
async with aiohttp.ClientSession() as session:
async with session.get(
url,
timeout=aiohttp.ClientTimeout(total=self.config.timeout)
) as response:
if response.status != 200:
return None
html = await response.text()
# Rate limiting
await asyncio.sleep(self.config.rate_limit)
return self._parse_page(url, html)
except Exception as e:
logger.debug(f"Error fetching {url}: {e}")
return None
while self.pending_urls and self.pages_scraped < self.config.max_pages:
# Batch fetch
batch_size = min(self.config.workers, len(self.pending_urls))
batch = [self.pending_urls.popleft() for _ in range(batch_size)]
tasks = [fetch_page(url) for url in batch]
results = await asyncio.gather(*tasks)
for page in results:
if page:
self.pages.append(page)
self.pages_scraped += 1
# Add discovered links
for link in page.links:
if link not in self.visited_urls and self._is_valid_url(link):
self.pending_urls.append(link)
# Progress callback
if self.progress_callback:
self.progress_callback(self.pages_scraped, self.config.max_pages)
# Checkpoint
if (self.config.checkpoint_enabled and
self.pages_scraped % self.config.checkpoint_interval == 0):
self._save_checkpoint()
logger.info(f"Scraped {self.pages_scraped} pages")
return self.pages
def _scrape_sync(self) -> list[ScrapedPage]:
"""Synchronous BFS scraping (fallback)."""
while self.pending_urls and self.pages_scraped < self.config.max_pages:
url = self.pending_urls.popleft()
if url in self.visited_urls:
continue
self.visited_urls.add(url)
try:
response = requests.get(url, timeout=self.config.timeout)
if response.status_code != 200:
continue
page = self._parse_page(url, response.text)
if page:
self.pages.append(page)
self.pages_scraped += 1
for link in page.links:
if link not in self.visited_urls and self._is_valid_url(link):
self.pending_urls.append(link)
time.sleep(self.config.rate_limit)
except Exception as e:
logger.debug(f"Error fetching {url}: {e}")
return self.pages
def _parse_page(self, url: str, html: str) -> Optional[ScrapedPage]:
"""Parse HTML into ScrapedPage."""
soup = BeautifulSoup(html, 'html.parser')
# Extract title
title = ""
title_elem = soup.select_one(self.config.title_selector)
if title_elem:
title = title_elem.get_text(strip=True)
elif soup.title:
title = soup.title.get_text(strip=True)
# Extract main content
content = ""
content_elem = soup.select_one(self.config.content_selector)
if content_elem:
# Remove script/style
for tag in content_elem.find_all(['script', 'style', 'nav']):
tag.decompose()
content = content_elem.get_text(separator='\n', strip=True)
if not content or len(content) < 50:
return None
# Extract headings
headings = [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])]
# Extract code blocks
code_blocks = []
for code in soup.select(self.config.code_selector):
code_text = code.get_text()
if len(code_text) > 10:
lang = self._detect_language_from_element(code)
code_blocks.append({
"code": code_text,
"language": lang
})
# Extract links
links = []
for a in soup.find_all('a', href=True):
href = a['href']
full_url = urljoin(url, href)
if self._is_valid_url(full_url):
links.append(full_url)
# Detect primary language
language = self._detect_language_from_code(code_blocks)
return ScrapedPage(
url=url,
title=title,
content=content,
headings=headings,
code_blocks=code_blocks,
links=links,
language=language
)
def _is_valid_url(self, url: str) -> bool:
"""Check if URL should be scraped."""
if not url.startswith(self.config.base_url):
return False
# Check include patterns
if self.config.include_patterns:
if not any(p in url for p in self.config.include_patterns):
return False
# Check exclude patterns
if any(p in url for p in self.config.exclude_patterns):
return False
# Exclude common non-doc paths
exclude_always = [
'/blog/', '/news/', '/changelog/', '/releases/',
'.pdf', '.zip', '.tar', '.png', '.jpg', '.gif'
]
if any(p in url.lower() for p in exclude_always):
return False
return True
def _detect_language_from_element(self, code_elem) -> str:
"""Detect language from code element attributes."""
# Check class attributes
classes = code_elem.get('class', [])
for cls in classes:
if isinstance(cls, str):
for prefix in ['language-', 'lang-', 'highlight-']:
if cls.startswith(prefix):
return cls[len(prefix):]
# Check data attributes
if code_elem.get('data-language'):
return code_elem['data-language']
# Check parent pre element
parent = code_elem.find_parent('pre')
if parent:
parent_classes = parent.get('class', [])
for cls in parent_classes:
if isinstance(cls, str):
for prefix in ['language-', 'lang-']:
if cls.startswith(prefix):
return cls[len(prefix):]
return "unknown"
def _detect_language_from_code(self, code_blocks: list[dict]) -> str:
"""Detect primary language from code blocks using heuristics."""
if not code_blocks:
return "unknown"
# Count language occurrences
lang_counts = {}
for block in code_blocks:
lang = block.get('language', 'unknown')
if lang != 'unknown':
lang_counts[lang] = lang_counts.get(lang, 0) + 1
else:
# Heuristic detection
code = block.get('code', '')
detected = self._detect_language_heuristic(code)
if detected:
lang_counts[detected] = lang_counts.get(detected, 0) + 1
if lang_counts:
return max(lang_counts, key=lang_counts.get)
return "unknown"
def _detect_language_heuristic(self, code: str) -> Optional[str]:
"""Detect language using regex patterns."""
scores = {}
for lang, patterns in self.LANGUAGE_PATTERNS.items():
score = 0
for pattern in patterns:
if re.search(pattern, code, re.MULTILINE):
score += 1
if score > 0:
scores[lang] = score
if scores:
return max(scores, key=scores.get)
return None
def _extract_code_blocks_from_text(self, text: str) -> list[dict]:
"""Extract code blocks from markdown/text content."""
blocks = []
# Match fenced code blocks
pattern = r'```(\w+)?\n(.*?)```'
for match in re.finditer(pattern, text, re.DOTALL):
lang = match.group(1) or "unknown"
code = match.group(2).strip()
if len(code) > 10:
blocks.append({"code": code, "language": lang})
return blocks
def _infer_category_from_title(self, title: str) -> str:
"""Infer category from section title."""
title_lower = title.lower()
category_keywords = {
"getting_started": ["getting started", "quick start", "installation", "setup", "intro"],
"tutorials": ["tutorial", "guide", "walkthrough", "example", "how to"],
"api_reference": ["api", "reference", "methods", "functions", "class"],
"concepts": ["concept", "architecture", "overview", "understanding"],
"advanced": ["advanced", "optimization", "performance", "best practice"]
}
for category, keywords in category_keywords.items():
if any(kw in title_lower for kw in keywords):
return category
return "other"
def _save_checkpoint(self) -> None:
"""Save scraping progress checkpoint."""
checkpoint_path = self.data_dir / "checkpoint.json"
checkpoint = {
"session_id": self.session_id,
"config_name": self.config.name,
"visited_urls": list(self.visited_urls),
"pending_urls": list(self.pending_urls),
"pages_scraped": self.pages_scraped,
"timestamp": datetime.utcnow().isoformat(),
"llms_txt_detected": self.llms_txt_detected,
"sitemap_detected": self.sitemap_detected
}
with open(checkpoint_path, 'w') as f:
json.dump(checkpoint, f, indent=2)
logger.info(f"Checkpoint saved: {self.pages_scraped} pages")
def load_checkpoint(self) -> bool:
"""Load checkpoint if exists."""
checkpoint_path = self.data_dir / "checkpoint.json"
if not checkpoint_path.exists():
return False
try:
with open(checkpoint_path) as f:
checkpoint = json.load(f)
self.visited_urls = set(checkpoint["visited_urls"])
self.pending_urls = deque(checkpoint["pending_urls"])
self.pages_scraped = checkpoint["pages_scraped"]
self.llms_txt_detected = checkpoint.get("llms_txt_detected", False)
self.sitemap_detected = checkpoint.get("sitemap_detected", False)
logger.info(f"Resumed from checkpoint: {self.pages_scraped} pages already scraped")
return True
except Exception as e:
logger.warning(f"Failed to load checkpoint: {e}")
return False
def save_pages(self) -> Path:
"""Save all scraped pages to disk."""
pages_dir = self.data_dir / "pages"
pages_dir.mkdir(exist_ok=True)
for i, page in enumerate(self.pages):
page_path = pages_dir / f"page_{i:04d}.json"
with open(page_path, 'w') as f:
json.dump(page.to_dict(), f, indent=2)
# Save summary
summary = {
"name": self.config.name,
"base_url": self.config.base_url,
"pages_scraped": len(self.pages),
"scraped_at": datetime.utcnow().isoformat(),
"session_id": self.session_id,
"strategy": "llms_txt" if self.llms_txt_detected else ("sitemap" if self.sitemap_detected else "bfs"),
"languages": list(set(p.language for p in self.pages if p.language != "unknown")),
"categories": list(set(p.category for p in self.pages))
}
summary_path = self.data_dir / "summary.json"
with open(summary_path, 'w') as f:
json.dump(summary, f, indent=2)
logger.info(f"Saved {len(self.pages)} pages to {pages_dir}")
return summary_path
def get_metrics(self) -> dict:
"""Get scraping metrics for quality assessment."""
if not self.pages:
return {}
return {
"total_pages": len(self.pages),
"total_code_blocks": sum(len(p.code_blocks) for p in self.pages),
"languages_detected": list(set(p.language for p in self.pages)),
"categories_used": list(set(p.category for p in self.pages)),
"avg_content_length": sum(len(p.content) for p in self.pages) / len(self.pages),
"coverage_percentage": (len(self.pages) / self.config.max_pages) * 100,
"strategy_used": "llms_txt" if self.llms_txt_detected else ("sitemap" if self.sitemap_detected else "bfs")
}
# CLI entry point
async def main():
    """CLI entry point for documentation scraper."""
    import argparse

    parser = argparse.ArgumentParser(description="CODITECT Documentation Scraper")
    parser.add_argument("--url", required=True, help="Documentation base URL")
    parser.add_argument("--name", help="Skill name (derived from URL if not provided)")
    parser.add_argument("--max-pages", type=int, default=500, help="Maximum pages to scrape")
    parser.add_argument("--async", dest="async_mode", action="store_true", default=True)
    parser.add_argument("--sync", dest="async_mode", action="store_false")
    parser.add_argument("--workers", type=int, default=8, help="Async worker count")
    parser.add_argument("--resume", action="store_true", help="Resume from checkpoint")
    parser.add_argument("--verbose", "-v", action="store_true")
    args = parser.parse_args()

    # Setup logging
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(message)s"
    )

    # Derive name from URL if not provided
    name = args.name or urlparse(args.url).netloc.replace('.', '-')

    config = ScrapeConfig(
        name=name,
        base_url=args.url,
        max_pages=args.max_pages,
        async_mode=args.async_mode,
        workers=args.workers
    )
    scraper = DocumentationScraper(config)
    if args.resume:
        scraper.load_checkpoint()

    await scraper.scrape()
    scraper.save_pages()

    metrics = scraper.get_metrics()
    # Fix: get_metrics() returns {} when nothing was scraped; the old code
    # then raised KeyError on metrics['total_pages'].
    if metrics:
        print(f"\n✅ Scraped {metrics['total_pages']} pages")
        print(f"   Languages: {', '.join(metrics['languages_detected'])}")
        print(f"   Code blocks: {metrics['total_code_blocks']}")
    else:
        print("\n⚠️ No pages scraped")


# Fix: the guard previously compared an undefined bare `name` to "main",
# so the CLI never ran; the standard dunder guard is required.
if __name__ == "__main__":
    asyncio.run(main())