#!/usr/bin/env python3
"""
CP-16: Document Node Extractor (ADR-213)

Scans all documentary content from coditect-documentation into kg_nodes:
- node_type: 'document'
- Subtypes: analysis, research, guide, reference, standard, workflow,
  template, session_log, sdd, tdd, diagram, suggestion, archive

Source: coditect-documentation/ (centralized SSOT per ADR-213)
        Session logs from ~/.coditect-data/session-logs/
Target: org.db kg_nodes table

Created: 2026-02-20
Track: J (Memory Intelligence)
Task: F.14
"""

import logging
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple

import yaml

from .base_extractor import BaseExtractor

# BUG FIX: was logging.getLogger(name) — `name` is undefined at module
# level; the module-logger convention is __name__.
logger = logging.getLogger(__name__)

# Document type classification by directory path and filename patterns.
# Each rule is (path_contains, filename_pattern, subtype); a None field
# is a wildcard for that dimension. Rules are evaluated in order.
DOCUMENT_TYPE_RULES = [
    # (path_contains, filename_pattern, subtype)
    ("adrs/", r"^ADR-\d+", "adr"),  # Skip — handled by ADRExtractor
    ("analysis/", None, "analysis"),
    ("research/", None, "research"),
    ("guides/", None, "guide"),
    ("reference/", None, "reference"),
    ("standards/", None, "standard"),
    ("workflows/", None, "workflow"),
    ("templates/", None, "template"),
    ("architecture/", None, "architecture"),
    ("contributor/", None, "contributor"),
    ("archive/", None, "archive"),
    ("project/plans/tracks/", r"^TRACK-", "track"),  # Skip — handled by TrackExtractor
    ("project/", None, "project"),
    ("session-logs/", r"^SESSION-LOG-", "session_log"),
    # Filename-based patterns (any directory)
    (None, r"(?i)SDD[-]", "sdd"),
    (None, r"(?i)TDD[-]", "tdd"),
    (None, r"(?i)mermaid|diagram", "diagram"),
    (None, r"(?i)suggestion", "suggestion"),
]

# Skip these — they have dedicated extractors.
SKIP_SUBTYPES = {"adr", "track"}
class DocumentExtractor(BaseExtractor):
    """
    Extract document entities from coditect-documentation and session logs
    into kg_nodes for full-text searchability across the stack.

    Indexes: analysis docs, research, guides, references, standards,
    workflows, templates, session logs, SDDs, TDDs, diagrams, suggestions,
    and any other markdown content in the documentation repo.
    """
def __init__(
self,
docs_dir: Path,
session_logs_dir: Optional[Path],
diagrams_dir: Optional[Path],
target_db_path: Path,
dry_run: bool = False,
tenant_id: Optional[str] = None,
project_id: Optional[str] = None,
):
super().__init__(target_db_path, dry_run, tenant_id, project_id)
self.docs_dir = docs_dir
self.session_logs_dir = session_logs_dir
self.diagrams_dir = diagrams_dir
@property
def node_type(self) -> str:
return "document"
def _classify_document(self, file_path: Path, relative_path: str) -> Optional[str]:
"""Classify a document file into a subtype."""
rel = relative_path.replace("\\", "/")
for path_contains, filename_pattern, subtype in DOCUMENT_TYPE_RULES:
path_match = path_contains is None or path_contains in rel
name_match = filename_pattern is None or re.search(filename_pattern, file_path.name)
if path_match and name_match:
if path_contains is not None and filename_pattern is not None:
# Both must match
return subtype
elif path_contains is not None and filename_pattern is None:
return subtype
elif path_contains is None and filename_pattern is not None:
return subtype
return "document" # Generic fallback
def _extract_frontmatter(self, content: str) -> Dict[str, Any]:
"""Extract YAML frontmatter from markdown content."""
if not content.startswith("---"):
return {}
try:
end = content.index("---", 3)
fm_text = content[3:end].strip()
return yaml.safe_load(fm_text) or {}
except (ValueError, yaml.YAMLError):
return {}
def _extract_title(self, content: str, frontmatter: Dict, filename: str) -> str:
"""Extract document title from frontmatter or first heading."""
if frontmatter.get("title"):
return str(frontmatter["title"])
# Look for first # heading
for line in content.split("\n")[:20]:
if line.startswith("# "):
return line[2:].strip()
# Fallback to filename
return filename.replace("-", " ").replace("_", " ").replace(".md", "").title()
def _extract_summary(self, content: str, frontmatter: Dict) -> str:
"""Extract summary from frontmatter or first paragraph."""
if frontmatter.get("summary"):
return str(frontmatter["summary"])[:500]
# Find first non-empty, non-heading paragraph
lines = content.split("\n")
in_frontmatter = False
for line in lines:
if line.strip() == "---":
in_frontmatter = not in_frontmatter
continue
if in_frontmatter:
continue
stripped = line.strip()
if stripped and not stripped.startswith("#") and not stripped.startswith("|"):
return stripped[:500]
return ""
def _scan_directory(self, scan_dir: Path, base_label: str) -> Generator[Tuple[Path, str], None, None]:
"""Recursively scan a directory for markdown files."""
if not scan_dir.exists():
logger.debug(f"Directory not found: {scan_dir}")
return
for md_file in scan_dir.rglob("*.md"):
if md_file.is_symlink():
continue
if any(part.startswith(".") for part in md_file.parts):
continue
if "__pycache__" in str(md_file) or "node_modules" in str(md_file):
continue
relative = str(md_file.relative_to(scan_dir))
yield md_file, f"{base_label}/{relative}"
def extract_entities(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
"""
Extract document entities from all documentation sources.
Yields:
Tuple of (node_id, name, subtype, properties, source_table, source_id)
"""
sources: List[Tuple[Path, str]] = []
# 1. coditect-documentation (primary SSOT)
if self.docs_dir and self.docs_dir.exists():
sources.append((self.docs_dir, "docs"))
# 2. Session logs
if self.session_logs_dir and self.session_logs_dir.exists():
sources.append((self.session_logs_dir, "session-logs"))
# 3. Diagrams (rollout-master/diagrams/)
if self.diagrams_dir and self.diagrams_dir.exists():
sources.append((self.diagrams_dir, "diagrams"))
total_found = 0
total_yielded = 0
for scan_dir, label in sources:
for md_file, relative_path in self._scan_directory(scan_dir, label):
total_found += 1
try:
result = self._parse_document(md_file, relative_path)
if result:
total_yielded += 1
yield result
except Exception as e:
logger.warning(f"Error parsing {md_file.name}: {e}")
continue
logger.info(f"Documents: {total_found} found, {total_yielded} indexed (skipped ADRs/tracks with dedicated extractors)")
def _parse_document(
self, file_path: Path, relative_path: str
) -> Optional[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]]]:
"""Parse a markdown document file."""
subtype = self._classify_document(file_path, relative_path)
# Skip subtypes handled by dedicated extractors
if subtype in SKIP_SUBTYPES:
return None
try:
content = file_path.read_text(encoding="utf-8", errors="replace")
except (IOError, OSError) as e:
logger.warning(f"Cannot read {file_path}: {e}")
return None
frontmatter = self._extract_frontmatter(content)
title = self._extract_title(content, frontmatter, file_path.name)
summary = self._extract_summary(content, frontmatter)
# Generate stable node_id from relative path
node_id = self.generate_node_id(relative_path)
name = title
# Build properties
properties: Dict[str, Any] = {
"title": title,
"file_path": str(file_path),
"relative_path": relative_path,
"file_name": file_path.name,
"file_size": file_path.stat().st_size,
"summary": summary,
}
# Add frontmatter metadata (convert dates/non-serializable to strings)
for key in ("status", "version", "audience", "component_type", "keywords",
"tags", "track", "task_id", "created", "updated", "type"):
if key in frontmatter:
val = frontmatter[key]
properties[key] = str(val) if not isinstance(val, (str, int, float, bool, list)) else val
# Detect content features
properties["has_mermaid"] = "```mermaid" in content
properties["has_code"] = "```" in content
properties["has_tables"] = "|" in content and "---" in content
properties["word_count"] = len(content.split())
return (node_id, name, subtype, properties, None, relative_path)