#!/usr/bin/env python3
# scripts-policy-extractor
"""J.3.4.10: Policy Node Extractor (ADR-151)

Scans governance sources to extract policy nodes for the knowledge graph:
- CLAUDE.md directives (Safety, Protected Installation, Task ID Protocol, etc.)
- Standards files (coditect-core-standards/*.md)
- Governance hooks (hooks/ with enforcement rules)

node_type: 'policy'
Subtypes: 'directive', 'standard', 'enforcement'
Source: CLAUDE.md, coditect-core-standards/*.md, hooks/*.py
Target: org.db kg_nodes table

Created: 2026-02-05
Author: Claude (Opus 4.6)
Track: J (Memory Intelligence)
Task: J.3.4.10
"""

import logging
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple

import yaml

from .base_extractor import BaseExtractor

# BUG FIX: was `logging.getLogger(name)` -- `name` is undefined at module
# scope; the conventional module logger key is `__name__`.
logger = logging.getLogger(__name__)
class PolicyExtractor(BaseExtractor):
    """Extract governance policy entities into kg_nodes.

    Scans three sources:
    1. CLAUDE.md - Framework directives (Safety, Protected Installation, etc.)
    2. coditect-core-standards/ - Standards documents
    3. hooks/ - Governance enforcement hooks
    """

    # Patterns identifying directive sections in CLAUDE.md.  Each entry maps a
    # heading regex to the policy metadata stamped on the extracted node.
    DIRECTIVE_PATTERNS = [
        {
            "heading_pattern": r"###\s+Safety Directive",
            "subtype": "directive",
            "scope": "all_agents",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Protected Installation Directive",
            "subtype": "directive",
            "scope": "file_operations",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Command Execution Policy",
            "subtype": "directive",
            "scope": "slash_commands",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Task ID Protocol",
            "subtype": "directive",
            "scope": "tool_calls",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Session Log Attribution",
            "subtype": "directive",
            "scope": "session_logs",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Governance Hooks",
            "subtype": "directive",
            "scope": "hooks",
            "enforcement_level": "mandatory",
        },
    ]

    # Filename fragments that mark a hook file as governance enforcement.
    GOVERNANCE_HOOK_PATTERNS = [
        "task_id_validator",
        "task-tracking-enforcer",
        "pre-commit",
    ]
def __init__(
    self,
    framework_dir: Path,
    target_db_path: Path,
    dry_run: bool = False,
    tenant_id: Optional[str] = None,
    project_id: Optional[str] = None,
):
    """Initialize the policy extractor.

    Args:
        framework_dir: Root directory expected to contain CLAUDE.md,
            coditect-core-standards/, and hooks/.
        target_db_path: Path to the database receiving kg_nodes rows
            (passed through to BaseExtractor).
        dry_run: Passed through to BaseExtractor; presumably suppresses
            writes -- TODO confirm against BaseExtractor.
        tenant_id: Optional tenant scoping, passed through to BaseExtractor.
        project_id: Optional project scoping, passed through to BaseExtractor.
    """
    super().__init__(target_db_path, dry_run, tenant_id, project_id)
    self.framework_dir = framework_dir
    # Pre-computed source locations used by the three extraction passes.
    self.claude_md_path = framework_dir / "CLAUDE.md"
    self.standards_dir = framework_dir / "coditect-core-standards"
    self.hooks_dir = framework_dir / "hooks"
@property
def node_type(self) -> str:
    """Node type recorded for every entity emitted by this extractor."""
    return "policy"
def extract_entities(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
    """Extract policy entities from all governance sources, in order.

    Yields:
        Tuple of (node_id, name, subtype, properties, source_table, source_id)
    """
    # Source 1: CLAUDE.md directives
    yield from self._extract_claude_md_directives()
    # Source 2: Standards files
    yield from self._extract_standards()
    # Source 3: Governance hooks
    yield from self._extract_governance_hooks()
def _extract_claude_md_directives(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
    """Extract directive policy nodes from CLAUDE.md.

    For each DIRECTIVE_PATTERNS heading found, captures the section text up
    to the next '##'/'###' heading and yields one 'directive' node.
    """
    if not self.claude_md_path.exists():
        logger.warning(f"CLAUDE.md not found: {self.claude_md_path}")
        return
    content = self.claude_md_path.read_text(encoding='utf-8')
    logger.info(f"Scanning CLAUDE.md for directives ({len(content)} chars)")
    extracted = 0
    for pattern_config in self.DIRECTIVE_PATTERNS:
        heading_re = pattern_config["heading_pattern"]
        match = re.search(heading_re, content)
        if not match:
            continue
        # The section runs from the matched heading to the next ## or ###
        # heading (or end of file).
        section_start = match.start()
        next_heading = re.search(r'\n###?\s', content[match.end():])
        if next_heading:
            section_end = match.end() + next_heading.start()
        else:
            section_end = len(content)
        section_text = content[section_start:section_end].strip()
        # The heading text (minus leading '#'s) becomes the node name.
        heading_text = match.group(0).strip().lstrip('#').strip()
        # Slugify the heading for a stable node ID.
        slug = re.sub(r'[^a-z0-9]+', '-', heading_text.lower()).strip('-')
        node_id = self.generate_node_id(f"claude-md:{slug}")
        # Prefer blockquoted rule text; fall back to the first paragraph.
        rule_text = self._extract_blockquote(section_text)
        if not rule_text:
            rule_text = self._extract_first_paragraph(section_text)
        properties = {
            "rule": rule_text[:1000] if rule_text else heading_text,
            "scope": pattern_config["scope"],
            "enforcement_level": pattern_config["enforcement_level"],
            "source_file": str(self.claude_md_path),
            "section": heading_text,
        }
        yield (
            node_id,
            f"Directive: {heading_text}",
            pattern_config["subtype"],
            properties,
            None,
            str(self.claude_md_path),
        )
        extracted += 1
    # BUG FIX: the original logged an f-string with no placeholders; report
    # the actual number of directives extracted.
    logger.info(f"Extracted {extracted} directives from CLAUDE.md")
def _extract_standards(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
    """Extract one 'standard' policy node per coditect-standard-*.md file."""
    if not self.standards_dir.exists():
        logger.warning(f"Standards directory not found: {self.standards_dir}")
        return
    standard_files = sorted(self.standards_dir.glob("coditect-standard-*.md"))
    logger.info(f"Found {len(standard_files)} standards files")
    for std_file in standard_files:
        try:
            result = self._parse_standard_file(std_file)
            if result:
                yield result
        except Exception as e:
            # One malformed file must not abort the whole scan.
            logger.warning(f"Error parsing {std_file.name}: {e}")
            continue
def _parse_standard_file(self, std_file: Path) -> Optional[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]]]:
"""Parse a standards file into a policy node."""
content = std_file.read_text(encoding='utf-8')
# Extract frontmatter if present
frontmatter = self._extract_frontmatter(content)
# Derive standard name from filename
# coditect-standard-automation.md -> Automation
name_slug = std_file.stem.replace("coditect-standard-", "").replace("CODITECT-STANDARD-", "")
standard_name = name_slug.replace("-", " ").replace("_", " ").title()
# Use frontmatter title if available
if frontmatter.get("title"):
standard_name = frontmatter["title"]
node_id = self.generate_node_id(f"standard:{name_slug}")
# Extract summary
summary = self._extract_summary(content)
# Extract principles/rules from the document
principles = self._extract_principles(content)
properties = {
"standard_name": standard_name,
"scope": frontmatter.get("scope", "framework"),
"enforcement_level": frontmatter.get("enforcement_level", "recommended"),
"source_file": str(std_file),
"file_name": std_file.name,
}
if summary:
properties["summary"] = summary[:500]
if principles:
properties["principles"] = principles[:10] # Cap at 10 principles
if frontmatter.get("version"):
properties["version"] = frontmatter["version"]
if frontmatter.get("status"):
properties["status"] = frontmatter["status"]
return (
node_id,
f"Standard: {standard_name}",
"standard",
properties,
None,
str(std_file),
)
def _extract_governance_hooks(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
    """Extract 'enforcement' policy nodes from governance hook files."""
    if not self.hooks_dir.exists():
        logger.warning(f"Hooks directory not found: {self.hooks_dir}")
        return
    hook_files = sorted(self.hooks_dir.glob("*.py"))
    logger.info(f"Scanning {len(hook_files)} hook files for governance hooks")
    governance_count = 0
    for hook_file in hook_files:
        # A hook is governance-related if its filename matches a known
        # pattern...
        is_governance = any(
            pattern in hook_file.stem
            for pattern in self.GOVERNANCE_HOOK_PATTERNS
        )
        if not is_governance:
            # ...or if its content mentions enforcement keywords.
            try:
                content = hook_file.read_text(encoding='utf-8')
                if any(kw in content.lower() for kw in ["enforce", "validate", "governance", "compliance", "reject"]):
                    is_governance = True
            except Exception:
                # Unreadable file: skip it entirely.
                continue
        if not is_governance:
            continue
        # Counted on selection, before parsing -- parse failures still count.
        governance_count += 1
        try:
            result = self._parse_hook_file(hook_file)
            if result:
                yield result
        except Exception as e:
            logger.warning(f"Error parsing hook {hook_file.name}: {e}")
            continue
    logger.info(f"Extracted {governance_count} governance hooks")
def _parse_hook_file(self, hook_file: Path) -> Optional[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]]]:
"""Parse a hook file into an enforcement policy node."""
content = hook_file.read_text(encoding='utf-8')
# Extract docstring as description
docstring = self._extract_docstring(content)
# Derive hook name
hook_name = hook_file.stem.replace("-", " ").replace("_", " ").title()
node_id = self.generate_node_id(f"hook:{hook_file.stem}")
# Detect hook type from content
hook_type = "unknown"
if "PreToolUse" in content:
hook_type = "pre_tool_use"
elif "PostToolUse" in content:
hook_type = "post_tool_use"
elif "SessionStart" in content or "session.start" in content:
hook_type = "session_start"
elif "pre-commit" in hook_file.stem or "pre_commit" in hook_file.stem:
hook_type = "pre_commit"
# Detect what the hook enforces
enforces = []
if "task_id" in content.lower() or "task id" in content.lower():
enforces.append("task_id_format")
if "track" in content.lower() and "nomenclature" in content.lower():
enforces.append("track_nomenclature")
if "delete" in content.lower() or "rm " in content.lower():
enforces.append("safe_deletion")
properties = {
"hook_name": hook_file.stem,
"hook_type": hook_type,
"enforcement_level": "mandatory",
"scope": "tool_calls",
"source_file": str(hook_file),
}
if docstring:
properties["description"] = docstring[:500]
if enforces:
properties["enforces"] = enforces
return (
node_id,
f"Enforcement: {hook_name}",
"enforcement",
properties,
None,
str(hook_file),
)
# --- Helper Methods ---
def _extract_frontmatter(self, content: str) -> Dict[str, Any]:
"""Extract YAML frontmatter from markdown content."""
frontmatter_match = re.match(r'^---\s*\n(.*?)\n---\s*\n', content, re.DOTALL)
if not frontmatter_match:
return {}
try:
return yaml.safe_load(frontmatter_match.group(1)) or {}
except yaml.YAMLError:
return {}
def _extract_blockquote(self, section_text: str) -> Optional[str]:
"""Extract blockquote content from a markdown section."""
lines = section_text.split('\n')
quote_lines = []
in_quote = False
for line in lines:
stripped = line.strip()
if stripped.startswith('>'):
in_quote = True
quote_lines.append(stripped.lstrip('>').strip())
elif in_quote and stripped:
# Continuation of blockquote without >
break
elif in_quote and not stripped:
break
return '\n'.join(quote_lines).strip() if quote_lines else None
def _extract_first_paragraph(self, section_text: str) -> Optional[str]:
"""Extract first non-heading paragraph from section."""
# Remove heading line
lines = section_text.split('\n')
content_lines = [l for l in lines if not l.strip().startswith('#')]
text = '\n'.join(content_lines).strip()
paragraphs = re.split(r'\n\s*\n', text)
for para in paragraphs:
para = para.strip()
if para and len(para) > 20:
return para
return None
def _extract_summary(self, content: str) -> Optional[str]:
"""Extract summary from a document."""
content = re.sub(r'^---\s*\n.*?\n---\s*\n', '', content, flags=re.DOTALL)
summary_match = re.search(
r'##\s*(?:Executive\s+)?Summary\s*\n+([^\n#]+(?:\n[^\n#]+)*)',
content,
re.IGNORECASE,
)
if summary_match:
return summary_match.group(1).strip()
paragraphs = re.split(r'\n\s*\n', content)
for para in paragraphs:
para = para.strip()
if para and not para.startswith('#') and len(para) > 50:
return para
return None
def _extract_principles(self, content: str) -> List[str]:
"""Extract numbered principles or rules from content."""
principles = []
# Look for numbered list items that look like principles/rules
pattern = re.compile(r'(?:^|\n)\s*(?:\d+\.|\*|-)\s*\*\*(.+?)\*\*', re.MULTILINE)
for match in pattern.finditer(content):
principle = match.group(1).strip()
if len(principle) > 5:
principles.append(principle)
# Also look for ### headings within the content as rule sections
heading_pattern = re.compile(r'^###\s+(.+)', re.MULTILINE)
for match in heading_pattern.finditer(content):
heading = match.group(1).strip()
if "principle" in heading.lower() or "rule" in heading.lower():
principles.append(heading)
return principles
def _extract_docstring(self, content: str) -> Optional[str]:
"""Extract Python docstring from file content."""
match = re.search(r'"""(.*?)"""', content, re.DOTALL)
if match:
docstring = match.group(1).strip()
# Take first paragraph only
paragraphs = docstring.split('\n\n')
return paragraphs[0].strip() if paragraphs else docstring
return None