#!/usr/bin/env python3
"""J.3.4.10: Policy Node Extractor (ADR-151).

Scans governance sources to extract policy nodes for the knowledge graph:

  * CLAUDE.md directives (Safety, Protected Installation, Task ID Protocol, etc.)
  * Standards files (coditect-core-standards/*.md)
  * Governance hooks (hooks/ with enforcement rules)

node_type: 'policy'
Subtypes: 'directive', 'standard', 'enforcement'

Source: CLAUDE.md, coditect-core-standards/*.md, hooks/*.py
Target: org.db kg_nodes table

Created: 2026-02-05
Author: Claude (Opus 4.6)
Track: J (Memory Intelligence)
Task: J.3.4.10
"""

import logging
import re
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Tuple

import yaml

from .base_extractor import BaseExtractor

# Bug fix: original called logging.getLogger(name) — `name` is undefined at
# module scope; the stdlib convention is the module's __name__.
logger = logging.getLogger(__name__)

class PolicyExtractor(BaseExtractor):
    """Extract governance policy entities into kg_nodes.

    Scans three sources:
    1. CLAUDE.md - Framework directives (Safety, Protected Installation, etc.)
    2. coditect-core-standards/ - Standards documents
    3. hooks/ - Governance enforcement hooks
    """

    # Patterns identifying directive sections in CLAUDE.md.  Each entry maps a
    # heading regex to the metadata stamped onto the extracted policy node.
    DIRECTIVE_PATTERNS = [
        {
            "heading_pattern": r"###\s+Safety Directive",
            "subtype": "directive",
            "scope": "all_agents",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Protected Installation Directive",
            "subtype": "directive",
            "scope": "file_operations",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Command Execution Policy",
            "subtype": "directive",
            "scope": "slash_commands",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Task ID Protocol",
            "subtype": "directive",
            "scope": "tool_calls",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Session Log Attribution",
            "subtype": "directive",
            "scope": "session_logs",
            "enforcement_level": "mandatory",
        },
        {
            "heading_pattern": r"###\s+Governance Hooks",
            "subtype": "directive",
            "scope": "hooks",
            "enforcement_level": "mandatory",
        },
    ]

    # Filename fragments that mark a hook file as governance enforcement.
    GOVERNANCE_HOOK_PATTERNS = [
        "task_id_validator",
        "task-tracking-enforcer",
        "pre-commit",
    ]

def __init__(
    self,
    framework_dir: Path,
    target_db_path: Path,
    dry_run: bool = False,
    tenant_id: Optional[str] = None,
    project_id: Optional[str] = None,
):
    """Initialize the extractor and resolve the three governance source paths.

    Args:
        framework_dir: Root directory containing CLAUDE.md,
            coditect-core-standards/, and hooks/.
        target_db_path: Path to the org.db database holding kg_nodes.
        dry_run: Passed through to BaseExtractor (presumably suppresses
            writes — confirm against BaseExtractor).
        tenant_id: Optional tenant scoping, forwarded to BaseExtractor.
        project_id: Optional project scoping, forwarded to BaseExtractor.
    """
    super().__init__(target_db_path, dry_run, tenant_id, project_id)
    self.framework_dir = framework_dir
    self.claude_md_path = framework_dir / "CLAUDE.md"
    self.standards_dir = framework_dir / "coditect-core-standards"
    self.hooks_dir = framework_dir / "hooks"

@property
def node_type(self) -> str:
    """kg_nodes node_type recorded for every entity this extractor emits."""
    return "policy"

def extract_entities(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
    """Extract policy entities from all governance sources, in order.

    Yields:
        Tuple of (node_id, name, subtype, properties, source_table, source_id).
    """
    # Source 1: CLAUDE.md directives
    yield from self._extract_claude_md_directives()

    # Source 2: Standards files
    yield from self._extract_standards()

    # Source 3: Governance hooks
    yield from self._extract_governance_hooks()

def _extract_claude_md_directives(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
    """Extract 'directive' policy nodes from CLAUDE.md.

    For each configured heading pattern, captures the section text up to the
    next ## / ### heading and yields one policy node per matched directive.
    """
    if not self.claude_md_path.exists():
        logger.warning(f"CLAUDE.md not found: {self.claude_md_path}")
        return

    content = self.claude_md_path.read_text(encoding='utf-8')
    logger.info(f"Scanning CLAUDE.md for directives ({len(content)} chars)")

    extracted = 0
    for pattern_config in self.DIRECTIVE_PATTERNS:
        match = re.search(pattern_config["heading_pattern"], content)
        if not match:
            continue

        # Section runs from this heading to the next ## or ### heading,
        # or to end-of-file when no further heading exists.
        next_heading = re.search(r'\n###?\s', content[match.end():])
        if next_heading:
            section_end = match.end() + next_heading.start()
        else:
            section_end = len(content)
        section_text = content[match.start():section_end].strip()

        # Heading text (sans leading '#'s) names the node.
        heading_text = match.group(0).strip().lstrip('#').strip()

        # Slugified heading gives a stable, human-readable node ID.
        slug = re.sub(r'[^a-z0-9]+', '-', heading_text.lower()).strip('-')
        node_id = self.generate_node_id(f"claude-md:{slug}")

        # Prefer a blockquoted rule; fall back to the first paragraph.
        rule_text = self._extract_blockquote(section_text)
        if not rule_text:
            rule_text = self._extract_first_paragraph(section_text)

        properties = {
            "rule": rule_text[:1000] if rule_text else heading_text,
            "scope": pattern_config["scope"],
            "enforcement_level": pattern_config["enforcement_level"],
            "source_file": str(self.claude_md_path),
            "section": heading_text,
        }

        extracted += 1
        yield (
            node_id,
            f"Directive: {heading_text}",
            pattern_config["subtype"],
            properties,
            None,
            str(self.claude_md_path),
        )

    # Bug fix: original logged an f-string with no placeholder; report the
    # actual number of directives found.
    logger.info(f"Extracted {extracted} directives from CLAUDE.md")

def _extract_standards(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
    """Yield one 'standard' policy node per coditect-standard-*.md file."""
    if not self.standards_dir.exists():
        logger.warning(f"Standards directory not found: {self.standards_dir}")
        return

    standard_files = sorted(self.standards_dir.glob("coditect-standard-*.md"))
    logger.info(f"Found {len(standard_files)} standards files")

    for std_file in standard_files:
        try:
            result = self._parse_standard_file(std_file)
            if result:
                yield result
        except Exception as e:
            # Best-effort scan: one malformed file must not abort the run.
            logger.warning(f"Error parsing {std_file.name}: {e}")
            continue

def _parse_standard_file(self, std_file: Path) -> Optional[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]]]:
    """Parse one standards markdown file into a 'standard' policy node tuple.

    Args:
        std_file: Path to a coditect-standard-*.md document.

    Returns:
        The kg_nodes entity tuple for this standard.
    """
    content = std_file.read_text(encoding='utf-8')

    # YAML frontmatter (may be empty) supplies title/scope/version/status.
    frontmatter = self._extract_frontmatter(content)

    # Derive the standard name from the filename:
    # coditect-standard-automation.md -> "Automation".
    name_slug = std_file.stem.replace("coditect-standard-", "").replace("CODITECT-STANDARD-", "")
    standard_name = name_slug.replace("-", " ").replace("_", " ").title()

    # A frontmatter title, when present, wins over the derived name.
    if frontmatter.get("title"):
        standard_name = frontmatter["title"]

    node_id = self.generate_node_id(f"standard:{name_slug}")

    summary = self._extract_summary(content)
    principles = self._extract_principles(content)

    properties = {
        "standard_name": standard_name,
        "scope": frontmatter.get("scope", "framework"),
        "enforcement_level": frontmatter.get("enforcement_level", "recommended"),
        "source_file": str(std_file),
        "file_name": std_file.name,
    }

    if summary:
        properties["summary"] = summary[:500]
    if principles:
        properties["principles"] = principles[:10]  # Cap at 10 principles
    if frontmatter.get("version"):
        properties["version"] = frontmatter["version"]
    if frontmatter.get("status"):
        properties["status"] = frontmatter["status"]

    return (
        node_id,
        f"Standard: {standard_name}",
        "standard",
        properties,
        None,
        str(std_file),
    )

def _extract_governance_hooks(self) -> Generator[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]], None, None]:
    """Yield 'enforcement' policy nodes for governance hooks in hooks/.

    A hook counts as governance if its filename contains one of
    GOVERNANCE_HOOK_PATTERNS, or (failing that) if its source contains an
    enforcement-related keyword.
    """
    if not self.hooks_dir.exists():
        logger.warning(f"Hooks directory not found: {self.hooks_dir}")
        return

    hook_files = sorted(self.hooks_dir.glob("*.py"))
    logger.info(f"Scanning {len(hook_files)} hook files for governance hooks")

    governance_count = 0
    for hook_file in hook_files:
        # Filename match is the primary governance signal.
        is_governance = any(
            pattern in hook_file.stem for pattern in self.GOVERNANCE_HOOK_PATTERNS
        )

        if not is_governance:
            # Fall back to scanning the source for enforcement keywords;
            # an unreadable file is skipped entirely.
            try:
                content = hook_file.read_text(encoding='utf-8')
                if any(kw in content.lower() for kw in ["enforce", "validate", "governance", "compliance", "reject"]):
                    is_governance = True
            except Exception:
                continue

        if not is_governance:
            continue

        governance_count += 1
        try:
            result = self._parse_hook_file(hook_file)
            if result:
                yield result
        except Exception as e:
            logger.warning(f"Error parsing hook {hook_file.name}: {e}")
            continue

    logger.info(f"Extracted {governance_count} governance hooks")

def _parse_hook_file(self, hook_file: Path) -> Optional[Tuple[str, str, Optional[str], Dict[str, Any], Optional[str], Optional[str]]]:
    """Parse one hook source file into an 'enforcement' policy node tuple.

    Args:
        hook_file: Path to a Python hook under hooks/.

    Returns:
        The kg_nodes entity tuple for this enforcement hook.
    """
    content = hook_file.read_text(encoding='utf-8')

    # Module docstring doubles as the node description.
    docstring = self._extract_docstring(content)

    # "task-id-validator" -> "Task Id Validator"
    hook_name = hook_file.stem.replace("-", " ").replace("_", " ").title()

    node_id = self.generate_node_id(f"hook:{hook_file.stem}")

    # Classify the hook by the lifecycle event it handles; first match wins.
    hook_type = "unknown"
    if "PreToolUse" in content:
        hook_type = "pre_tool_use"
    elif "PostToolUse" in content:
        hook_type = "post_tool_use"
    elif "SessionStart" in content or "session.start" in content:
        hook_type = "session_start"
    elif "pre-commit" in hook_file.stem or "pre_commit" in hook_file.stem:
        hook_type = "pre_commit"

    # Detect what the hook enforces (keyword heuristics on lowercased source;
    # lowered once instead of per check).
    lowered = content.lower()
    enforces = []
    if "task_id" in lowered or "task id" in lowered:
        enforces.append("task_id_format")
    if "track" in lowered and "nomenclature" in lowered:
        enforces.append("track_nomenclature")
    if "delete" in lowered or "rm " in lowered:
        enforces.append("safe_deletion")

    properties = {
        "hook_name": hook_file.stem,
        "hook_type": hook_type,
        "enforcement_level": "mandatory",
        "scope": "tool_calls",
        "source_file": str(hook_file),
    }

    if docstring:
        properties["description"] = docstring[:500]
    if enforces:
        properties["enforces"] = enforces

    return (
        node_id,
        f"Enforcement: {hook_name}",
        "enforcement",
        properties,
        None,
        str(hook_file),
    )

# --- Helper Methods ---

def _extract_frontmatter(self, content: str) -> Dict[str, Any]:
    """Extract YAML frontmatter (--- ... ---) from markdown content.

    Returns {} when there is no frontmatter, the YAML is invalid, or the
    document parses to a non-mapping — callers rely on dict.get().
    """
    frontmatter_match = re.match(r'^---\s*\n(.*?)\n---\s*\n', content, re.DOTALL)
    if not frontmatter_match:
        return {}
    try:
        data = yaml.safe_load(frontmatter_match.group(1))
    except yaml.YAMLError:
        return {}
    # Robustness fix: a scalar or list frontmatter document would crash
    # callers that do frontmatter.get(...); treat it as absent.
    return data if isinstance(data, dict) else {}

def _extract_blockquote(self, section_text: str) -> Optional[str]:
"""Extract blockquote content from a markdown section."""
lines = section_text.split('\n')
quote_lines = []
in_quote = False
for line in lines:
stripped = line.strip()
if stripped.startswith('>'):
in_quote = True
quote_lines.append(stripped.lstrip('>').strip())
elif in_quote and stripped:
# Continuation of blockquote without >
break
elif in_quote and not stripped:
break
return '\n'.join(quote_lines).strip() if quote_lines else None

def _extract_first_paragraph(self, section_text: str) -> Optional[str]:
"""Extract first non-heading paragraph from section."""
# Remove heading line
lines = section_text.split('\n')
content_lines = [l for l in lines if not l.strip().startswith('#')]
text = '\n'.join(content_lines).strip()

paragraphs = re.split(r'\n\s*\n', text)
for para in paragraphs:
para = para.strip()
if para and len(para) > 20:
return para
return None

def _extract_summary(self, content: str) -> Optional[str]:
"""Extract summary from a document."""
content = re.sub(r'^---\s*\n.*?\n---\s*\n', '', content, flags=re.DOTALL)

summary_match = re.search(
r'##\s*(?:Executive\s+)?Summary\s*\n+([^\n#]+(?:\n[^\n#]+)*)',
content,
re.IGNORECASE,
)
if summary_match:
return summary_match.group(1).strip()

paragraphs = re.split(r'\n\s*\n', content)
for para in paragraphs:
para = para.strip()
if para and not para.startswith('#') and len(para) > 50:
return para
return None

def _extract_principles(self, content: str) -> List[str]:
"""Extract numbered principles or rules from content."""
principles = []

# Look for numbered list items that look like principles/rules
pattern = re.compile(r'(?:^|\n)\s*(?:\d+\.|\*|-)\s*\*\*(.+?)\*\*', re.MULTILINE)
for match in pattern.finditer(content):
principle = match.group(1).strip()
if len(principle) > 5:
principles.append(principle)

# Also look for ### headings within the content as rule sections
heading_pattern = re.compile(r'^###\s+(.+)', re.MULTILINE)
for match in heading_pattern.finditer(content):
heading = match.group(1).strip()
if "principle" in heading.lower() or "rule" in heading.lower():
principles.append(heading)

return principles

def _extract_docstring(self, content: str) -> Optional[str]:
"""Extract Python docstring from file content."""
match = re.search(r'"""(.*?)"""', content, re.DOTALL)
if match:
docstring = match.group(1).strip()
# Take first paragraph only
paragraphs = docstring.split('\n\n')
return paragraphs[0].strip() if paragraphs else docstring
return None