#!/usr/bin/env python3
"""
OPT-5b: Knowledge Graph Contradiction Detector (ADR-151)

Detects contradictions and conflicts within the knowledge graph:
- Policy violations: decisions that conflict with mandatory policies
- Superseded conflicts: active decisions that contradict newer replacements
- Opposing stances: decisions of same type with opposing recommendations

Creates CONTRADICTS edges between conflicting nodes and outputs a report.

Usage:
    python3 scripts/knowledge_graph/contradiction_detector.py
    python3 scripts/knowledge_graph/contradiction_detector.py --dry-run
    python3 scripts/knowledge_graph/contradiction_detector.py --rules policy_violation,opposing_stance
    python3 scripts/knowledge_graph/contradiction_detector.py --report
    python3 scripts/knowledge_graph/contradiction_detector.py --json

Created: 2026-02-05
Author: Claude (Opus 4.6)
Track: J (Memory Intelligence)
Task: J.3.7.3
"""
import argparse
import json
import logging
import re
import sqlite3
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Set, Tuple

# Handle imports for both module and direct execution.
try:
    from scripts.core.paths import get_org_db_path
except ModuleNotFoundError:
    # Running as a standalone script: add the repo root (two directories up
    # from this file) to sys.path so `scripts.core` resolves.
    # Fix: original (mangled) source had `Path(file)`; the builtin is `__file__`.
    _script_dir = Path(__file__).resolve().parent
    _core_root = _script_dir.parent.parent
    if str(_core_root) not in sys.path:
        sys.path.insert(0, str(_core_root))
    from scripts.core.paths import get_org_db_path
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Fix: original (mangled) source had `logging.getLogger(name)`; the module
# dunder is `__name__`.
logger = logging.getLogger(__name__)

# Contradiction tuple: (from_node, to_node, severity, details)
Contradiction = Tuple[str, str, str, Dict[str, Any]]
Severity levels
SEVERITY_CRITICAL = "critical" # Policy violation, must resolve SEVERITY_HIGH = "high" # Active conflict, should resolve SEVERITY_MEDIUM = "medium" # Potential conflict, review recommended SEVERITY_LOW = "low" # Informational, may be intentional
Negation pairs for stance detection
AFFIRMATIVE_VERBS = { "use", "adopt", "implement", "enable", "choose", "select", "prefer", "require", "deploy", "install", "create", "add", "include", "allow", "accept", "start", "keep", "maintain", } NEGATIVE_VERBS = { "avoid", "remove", "disable", "reject", "deprecate", "delete", "exclude", "disallow", "deny", "stop", "drop", "abandon", "replace", "eliminate", "forbid", "prohibit", "prevent", } NEGATION_PREFIXES = {"don't", "do not", "never", "not", "no longer", "shouldn't"}
Technology keywords for domain matching
Values are canonical name → set of aliases
TECH_ALIASES = { "postgres": {"postgres", "postgresql"}, "kubernetes": {"kubernetes", "k8s"}, }
TECH_DOMAINS = { "database": {"sqlite", "postgres", "mysql", "mongodb", "redis", "dynamodb", "firestore", "cockroachdb", "foundationdb", "supabase"}, "cache": {"redis", "memcached", "varnish"}, "container": {"docker", "podman", "containerd", "buildah", "kaniko"}, "orchestration": {"kubernetes", "gke", "eks", "aks", "docker-compose", "nomad", "swarm", "cloud run"}, "backend_framework": {"django", "flask", "fastapi", "express", "nestjs"}, "frontend_framework": {"react", "nextjs", "angular", "vue", "svelte"}, "auth_protocol": {"oauth", "saml", "oidc", "firebase auth", "auth0", "keycloak"}, "auth_token": {"jwt", "session", "cookie"}, "messaging": {"rabbitmq", "kafka", "pubsub", "sqs", "nats"}, "search": {"elasticsearch", "opensearch", "algolia", "meilisearch", "typesense", "fts5"}, }
class ContradictionDetector:
    """
    Detects contradictions in the knowledge graph between decisions,
    policies, and ADRs. Creates CONTRADICTS edges for discovered conflicts.
    """

    # How many CONTRADICTS edges are written per commit batch.
    BATCH_SIZE = 500

    # Rule name -> human-readable description of what it detects.
    RULES = {
        "policy_violation": "Decisions that violate mandatory policy rules",
        "superseded_conflict": "Active decisions contradicted by newer replacements",
        "opposing_stance": "Same-type decisions with opposing recommendations",
        "tech_divergence": "Conflicting technology choices in same domain",
    }
def __init__(
self,
org_db_path: Path,
dry_run: bool = False,
):
self.org_db_path = org_db_path
self.dry_run = dry_run
self._conn: Optional[sqlite3.Connection] = None
self._existing_nodes: Optional[Set[str]] = None
self._existing_edges: Optional[Set[str]] = None
self.stats: Dict[str, Dict[str, int]] = {}
self.contradictions: List[Dict[str, Any]] = [] # For reporting
def connect(self) -> sqlite3.Connection:
if self._conn is None:
self._conn = sqlite3.connect(str(self.org_db_path))
self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA foreign_keys = ON;")
return self._conn
def close(self):
if self._conn:
self._conn.close()
self._conn = None
def load_existing_nodes(self) -> Set[str]:
if self._existing_nodes is not None:
return self._existing_nodes
conn = self.connect()
self._existing_nodes = {
row[0] for row in conn.execute("SELECT id FROM kg_nodes")
}
return self._existing_nodes
def load_existing_edges(self) -> Set[str]:
if self._existing_edges is not None:
return self._existing_edges
conn = self.connect()
self._existing_edges = {
f"{row[0]}:{row[1]}:{row[2]}"
for row in conn.execute(
"SELECT edge_type, from_node, to_node FROM kg_edges"
)
}
return self._existing_edges
def edge_exists(self, from_node: str, to_node: str) -> bool:
edges = self.load_existing_edges()
return f"CONTRADICTS:{from_node}:{to_node}" in edges
def upsert_contradiction_edges(
self,
conn: sqlite3.Connection,
items: List[Contradiction],
) -> Tuple[int, int]:
"""Batch upsert CONTRADICTS edges."""
now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
success = 0
errors = 0
for from_node, to_node, severity, details in items:
edge_id = f"CONTRADICTS:{from_node}:{to_node}"
properties = {
"contradiction_type": details.get("rule", "unknown"),
"severity": severity,
"description": details.get("description", ""),
"evidence": details.get("evidence", ""),
"inferred": True,
"inferred_at": now,
}
props_json = json.dumps(properties, ensure_ascii=False)
try:
conn.execute("""
INSERT INTO kg_edges (
id, edge_type, from_node, to_node, properties,
created_at
) VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(from_node, to_node, edge_type) DO UPDATE SET
properties = excluded.properties
""", (edge_id, "CONTRADICTS", from_node, to_node, props_json, now))
success += 1
except sqlite3.Error as e:
logger.debug(f"Error upserting contradiction edge {edge_id}: {e}")
errors += 1
return success, errors
def run(self, rules: Optional[List[str]] = None) -> Dict[str, Dict[str, int]]:
if rules is None:
rules = list(self.RULES.keys())
conn = self.connect()
nodes = self.load_existing_nodes()
self.load_existing_edges()
logger.info("=" * 60)
logger.info("ADR-151 Knowledge Graph Contradiction Detector")
logger.info(f"Mode: {'DRY RUN' if self.dry_run else 'EXECUTE'}")
logger.info(f"Rules: {', '.join(rules)}")
logger.info(f"Nodes: {len(nodes)}")
logger.info("=" * 60)
rule_methods = {
"policy_violation": self._detect_policy_violations,
"superseded_conflict": self._detect_superseded_conflicts,
"opposing_stance": self._detect_opposing_stances,
"tech_divergence": self._detect_tech_divergence,
}
for rule_name in rules:
if rule_name not in rule_methods:
logger.warning(f"Unknown rule: {rule_name}")
continue
logger.info(f"\n[{rule_name.upper()}] {self.RULES[rule_name]}")
logger.info("-" * 40)
rule_stats = {
"candidates": 0,
"contradictions": 0,
"skipped_exists": 0,
"skipped_missing": 0,
"errors": 0,
}
batch: List[Contradiction] = []
try:
for contradiction in rule_methods[rule_name]():
from_node, to_node, severity, details = contradiction
rule_stats["candidates"] += 1
# Validate endpoints
if from_node not in nodes or to_node not in nodes:
rule_stats["skipped_missing"] += 1
continue
# Skip if already exists (in either direction)
if self.edge_exists(from_node, to_node) or self.edge_exists(to_node, from_node):
rule_stats["skipped_exists"] += 1
continue
# Record for report
self.contradictions.append({
"from": from_node,
"to": to_node,
"severity": severity,
"rule": rule_name,
**details,
})
if self.dry_run:
rule_stats["contradictions"] += 1
if rule_stats["contradictions"] <= 5:
logger.info(
f" [{severity.upper()}] {from_node} <-> {to_node}"
)
if details.get("description"):
logger.info(f" {details['description'][:120]}")
continue
batch.append(contradiction)
# Track to avoid duplicates within run
key = f"CONTRADICTS:{from_node}:{to_node}"
self._existing_edges.add(key)
if len(batch) >= self.BATCH_SIZE:
ok, err = self.upsert_contradiction_edges(conn, batch)
conn.commit()
rule_stats["contradictions"] += ok
rule_stats["errors"] += err
batch = []
# Flush remaining
if batch and not self.dry_run:
ok, err = self.upsert_contradiction_edges(conn, batch)
conn.commit()
rule_stats["contradictions"] += ok
rule_stats["errors"] += err
except Exception as e:
logger.error(f"Rule {rule_name} failed: {e}", exc_info=True)
rule_stats["errors"] += 1
self.stats[rule_name] = rule_stats
logger.info(
f" Result: {rule_stats['contradictions']} contradictions, "
f"{rule_stats['skipped_exists']} existing, "
f"{rule_stats['skipped_missing']} missing nodes, "
f"{rule_stats['errors']} errors"
)
# Summary
total = sum(s["contradictions"] for s in self.stats.values())
by_severity = defaultdict(int)
for c in self.contradictions:
by_severity[c["severity"]] += 1
logger.info(f"\nTotal contradictions: {total}")
for sev in [SEVERITY_CRITICAL, SEVERITY_HIGH, SEVERITY_MEDIUM, SEVERITY_LOW]:
if by_severity[sev]:
logger.info(f" {sev}: {by_severity[sev]}")
self.close()
return self.stats
# ---- Detection Rules ----
def _detect_policy_violations(self) -> Generator[Contradiction, None, None]:
"""
Rule: Policy Violation
Compares decision text against mandatory policy rules to find
decisions that may violate established policies.
Extracts forbidden actions from policy rules and checks if
any decisions reference performing those actions.
"""
conn = self.connect()
# Load mandatory policies
policies = conn.execute("""
SELECT id, name, properties
FROM kg_nodes
WHERE node_type = 'policy'
AND properties IS NOT NULL
""").fetchall()
if not policies:
logger.info(" No policies found")
return
# Build forbidden-action patterns from policies
policy_patterns: List[Tuple[str, str, List[re.Pattern]]] = []
for policy in policies:
policy_id = policy['id']
try:
props = json.loads(policy['properties'])
except (json.JSONDecodeError, TypeError):
continue
rule_text = props.get('rule', '')
if not rule_text:
continue
# Extract forbidden actions from policy rule text
patterns = self._extract_forbidden_patterns(rule_text)
if patterns:
policy_patterns.append((policy_id, rule_text, patterns))
if not policy_patterns:
logger.info(" No actionable policy patterns extracted")
return
logger.info(f" Loaded {len(policy_patterns)} policies with {sum(len(p[2]) for p in policy_patterns)} patterns")
# Scan decisions for violations
cursor = conn.execute("""
SELECT id, name, properties
FROM kg_nodes
WHERE node_type = 'decision'
AND properties IS NOT NULL
""")
for decision in cursor:
decision_id = decision['id']
try:
props = json.loads(decision['properties'])
except (json.JSONDecodeError, TypeError):
continue
# Build searchable text from decision
text = " ".join(filter(None, [
decision['name'] or "",
props.get('decision_text', ''),
props.get('rationale', ''),
props.get('context', ''),
])).lower()
if not text.strip():
continue
# Check against each policy
for policy_id, rule_text, patterns in policy_patterns:
for pattern in patterns:
match = pattern.search(text)
if match:
matched_text = match.group(0)
yield (
decision_id,
policy_id,
SEVERITY_CRITICAL,
{
"rule": "policy_violation",
"description": f"Decision may violate policy: matched '{matched_text}'",
"evidence": f"Policy rule: {rule_text[:200]}",
"matched_pattern": matched_text,
"policy_name": policy_id,
},
)
break # One violation per policy per decision is enough
def _extract_forbidden_patterns(self, rule_text: str) -> List[re.Pattern]:
"""Extract regex patterns for forbidden actions from policy rule text."""
patterns = []
rule_lower = rule_text.lower()
# Pattern: "NEVER <action>" - extract the action
never_matches = re.findall(
r'never\s+(?:use\s+)?[`"\']*([a-z][a-z0-9_ -]+)[`"\']*',
rule_lower,
)
for action in never_matches:
action = action.strip().rstrip('.,;:')
if len(action) > 3: # Skip very short matches
try:
patterns.append(re.compile(
r'\b' + re.escape(action) + r'\b', re.IGNORECASE
))
except re.error:
pass
# Pattern: "DO NOT <action>"
donot_matches = re.findall(
r'do\s+not\s+(?:use\s+)?[`"\']*([a-z][a-z0-9_ -]+)[`"\']*',
rule_lower,
)
for action in donot_matches:
action = action.strip().rstrip('.,;:')
if len(action) > 3:
try:
patterns.append(re.compile(
r'\b' + re.escape(action) + r'\b', re.IGNORECASE
))
except re.error:
pass
# Pattern: "MUST be made in <location>" → violation = modifying elsewhere
# This is too complex for regex alone, skip
return patterns
def _detect_superseded_conflicts(self) -> Generator[Contradiction, None, None]:
"""
Rule: Superseded Conflict
Finds decision chains (via SUPERSEDES edges) where both the newer
and older decisions are still active and may give conflicting guidance.
This catches stale decisions that haven't been deprecated after
being superseded.
"""
conn = self.connect()
# Find all SUPERSEDES edges between decisions
cursor = conn.execute("""
SELECT
e.from_node as newer_id,
e.to_node as older_id,
n_new.name as newer_name,
n_new.properties as newer_props,
n_old.name as older_name,
n_old.properties as older_props,
e.properties as edge_props
FROM kg_edges e
JOIN kg_nodes n_new ON n_new.id = e.from_node AND n_new.node_type = 'decision'
JOIN kg_nodes n_old ON n_old.id = e.to_node AND n_old.node_type = 'decision'
WHERE e.edge_type = 'SUPERSEDES'
""")
for row in cursor:
newer_id = row['newer_id']
older_id = row['older_id']
try:
newer_props = json.loads(row['newer_props']) if row['newer_props'] else {}
older_props = json.loads(row['older_props']) if row['older_props'] else {}
edge_props = json.loads(row['edge_props']) if row['edge_props'] else {}
except (json.JSONDecodeError, TypeError):
continue
# Extract texts for comparison
newer_text = " ".join(filter(None, [
row['newer_name'] or "",
newer_props.get('decision_text', ''),
])).lower()
older_text = " ".join(filter(None, [
row['older_name'] or "",
older_props.get('decision_text', ''),
])).lower()
if not newer_text.strip() or not older_text.strip():
continue
# Check if the two decisions have opposing signals
newer_stance = self._extract_stance(newer_text)
older_stance = self._extract_stance(older_text)
if self._stances_conflict(newer_stance, older_stance):
dtype = edge_props.get('decision_type', 'general')
yield (
newer_id,
older_id,
SEVERITY_HIGH,
{
"rule": "superseded_conflict",
"description": (
f"Superseding decision appears to contradict the older one "
f"(type: {dtype})"
),
"evidence": (
f"Newer stance: {self._format_stance(newer_stance)[:100]}; "
f"Older stance: {self._format_stance(older_stance)[:100]}"
),
"decision_type": dtype,
},
)
def _detect_opposing_stances(self) -> Generator[Contradiction, None, None]:
"""
Rule: Opposing Stance
Within the same decision_type, find pairs of decisions that take
opposing stances on the same topic using verb-object analysis.
Only considers decisions with sufficient text content.
"""
conn = self.connect()
# Load decisions grouped by type
cursor = conn.execute("""
SELECT id, name, properties
FROM kg_nodes
WHERE node_type = 'decision'
AND properties IS NOT NULL
""")
type_decisions: Dict[str, List[Tuple[str, str, Dict]]] = defaultdict(list)
for row in cursor:
try:
props = json.loads(row['properties'])
except (json.JSONDecodeError, TypeError):
continue
dtype = props.get('decision_type', 'general')
text = " ".join(filter(None, [
row['name'] or "",
props.get('decision_text', ''),
props.get('rationale', ''),
])).lower()
# Skip very short texts
if len(text) < 20:
continue
type_decisions[dtype].append((row['id'], text, props))
# For each type, compare decisions pairwise
# Only process types with manageable numbers (skip 'general' if too large)
for dtype, decisions in type_decisions.items():
if dtype == 'general' and len(decisions) > 200:
# Sample for general type to avoid O(n^2) explosion
import random
decisions = random.sample(decisions, 200)
for i in range(len(decisions)):
stance_i = self._extract_stance(decisions[i][1])
if not stance_i["actions"]:
continue
for j in range(i + 1, len(decisions)):
stance_j = self._extract_stance(decisions[j][1])
if not stance_j["actions"]:
continue
conflict = self._find_specific_conflict(stance_i, stance_j)
if conflict:
yield (
decisions[i][0],
decisions[j][0],
SEVERITY_MEDIUM,
{
"rule": "opposing_stance",
"description": f"Opposing stances on '{conflict['topic']}' within {dtype} decisions",
"evidence": (
f"A: '{conflict['stance_a'][:80]}' vs "
f"B: '{conflict['stance_b'][:80]}'"
),
"decision_type": dtype,
"conflict_topic": conflict["topic"],
},
)
def _detect_tech_divergence(self) -> Generator[Contradiction, None, None]:
"""
Rule: Technology Divergence
Within architecture/technology/database decisions, detect when
multiple decisions recommend different technologies for the
same problem domain.
"""
conn = self.connect()
cursor = conn.execute("""
SELECT id, name, properties
FROM kg_nodes
WHERE node_type = 'decision'
AND properties IS NOT NULL
AND json_extract(properties, '$.decision_type') IN
('architecture', 'technology', 'database', 'security')
""")
# Map: domain → list of (decision_id, tech_name, text)
domain_techs: Dict[str, List[Tuple[str, str, str]]] = defaultdict(list)
for row in cursor:
try:
props = json.loads(row['properties'])
except (json.JSONDecodeError, TypeError):
continue
text = " ".join(filter(None, [
row['name'] or "",
props.get('decision_text', ''),
])).lower()
# Check each tech domain (canonicalize aliases)
for domain, techs in TECH_DOMAINS.items():
for tech in techs:
# Also search for aliases
search_terms = {tech}
for canonical, aliases in TECH_ALIASES.items():
if tech == canonical:
search_terms = aliases
break
if any(t in text for t in search_terms):
domain_techs[domain].append((row['id'], tech, text))
# Find conflicts within domains
for domain, entries in domain_techs.items():
if len(entries) < 2:
continue
# Group by decision (a decision might mention multiple techs)
decision_techs: Dict[str, Set[str]] = defaultdict(set)
decision_texts: Dict[str, str] = {}
for dec_id, tech, text in entries:
decision_techs[dec_id].add(tech)
decision_texts[dec_id] = text
# Find pairs that chose different primary techs
decision_list = list(decision_techs.items())
for i in range(len(decision_list)):
dec_i, techs_i = decision_list[i]
for j in range(i + 1, len(decision_list)):
dec_j, techs_j = decision_list[j]
# Conflict if no overlap and different choices
if not techs_i.intersection(techs_j):
# Check that the texts show affirmative usage
text_i = decision_texts[dec_i]
text_j = decision_texts[dec_j]
if (self._is_affirmative_tech_mention(text_i, techs_i) and
self._is_affirmative_tech_mention(text_j, techs_j)):
yield (
dec_i,
dec_j,
SEVERITY_MEDIUM,
{
"rule": "tech_divergence",
"description": (
f"Different {domain} technologies chosen: "
f"{', '.join(sorted(techs_i))} vs {', '.join(sorted(techs_j))}"
),
"evidence": f"Domain: {domain}",
"domain": domain,
"tech_a": sorted(techs_i),
"tech_b": sorted(techs_j),
},
)
# ---- Helper Methods ----
def _extract_stance(self, text: str) -> Dict[str, Any]:
"""
Extract stance information from text.
Returns dict with:
actions: list of (verb, object, polarity) tuples
negations: set of negated objects
affirmations: set of affirmed objects
"""
stance = {
"actions": [],
"negations": set(),
"affirmations": set(),
}
# Find negation + verb patterns: "don't use X", "never use X", "avoid X"
neg_patterns = [
r"(?:don'?t|do\s+not|never|shouldn'?t)\s+(?:use|adopt|implement|enable|deploy|install|create)\s+(\w[\w\s-]{2,20})",
r"(?:avoid|remove|disable|reject|deprecate|eliminate|forbid|prohibit)\s+(\w[\w\s-]{2,20})",
]
for pattern in neg_patterns:
for match in re.finditer(pattern, text, re.IGNORECASE):
obj = match.group(1).strip().rstrip('.,;:')
if len(obj) > 2:
stance["actions"].append(("negate", obj, -1))
stance["negations"].add(obj)
# Find affirmative verb patterns: "use X", "adopt X", "implement X"
aff_patterns = [
r"(?:use|adopt|implement|enable|deploy|install|choose|select|prefer|require)\s+(\w[\w\s-]{2,20})",
]
for pattern in aff_patterns:
for match in re.finditer(pattern, text, re.IGNORECASE):
obj = match.group(1).strip().rstrip('.,;:')
if len(obj) > 2:
# Verify it's not preceded by negation
start = max(0, match.start() - 20)
prefix = text[start:match.start()].lower()
is_negated = any(
neg in prefix
for neg in ["don't", "do not", "never", "not", "shouldn't"]
)
if not is_negated:
stance["actions"].append(("affirm", obj, 1))
stance["affirmations"].add(obj)
return stance
def _stances_conflict(
self,
stance_a: Dict[str, Any],
stance_b: Dict[str, Any],
) -> bool:
"""Check if two stances have a clear conflict."""
# Conflict: A affirms something B negates (or vice versa)
for obj_a in stance_a["affirmations"]:
for obj_b in stance_b["negations"]:
if self._objects_match(obj_a, obj_b):
return True
for obj_a in stance_a["negations"]:
for obj_b in stance_b["affirmations"]:
if self._objects_match(obj_a, obj_b):
return True
return False
def _find_specific_conflict(
self,
stance_a: Dict[str, Any],
stance_b: Dict[str, Any],
) -> Optional[Dict[str, str]]:
"""Find a specific conflicting topic between two stances."""
for obj_a in stance_a["affirmations"]:
for obj_b in stance_b["negations"]:
if self._objects_match(obj_a, obj_b):
return {
"topic": obj_a,
"stance_a": f"affirm: {obj_a}",
"stance_b": f"negate: {obj_b}",
}
for obj_a in stance_a["negations"]:
for obj_b in stance_b["affirmations"]:
if self._objects_match(obj_a, obj_b):
return {
"topic": obj_a,
"stance_a": f"negate: {obj_a}",
"stance_b": f"affirm: {obj_b}",
}
return None
def _objects_match(self, obj_a: str, obj_b: str) -> bool:
"""Check if two extracted objects refer to the same thing."""
a = obj_a.lower().strip()
b = obj_b.lower().strip()
# Exact match
if a == b:
return True
# One contains the other (for multi-word objects)
if len(a) > 4 and len(b) > 4:
if a in b or b in a:
return True
# First significant word match (for "use redis caching" vs "redis")
a_words = set(a.split())
b_words = set(b.split())
common = a_words.intersection(b_words)
# Only match if common words are significant (not stop words)
stop = {"the", "a", "an", "to", "for", "in", "on", "of", "and", "or", "is", "it", "as"}
significant_common = common - stop
if significant_common and len(significant_common) >= 1:
return True
return False
def _format_stance(self, stance: Dict[str, Any]) -> str:
"""Format stance for display."""
parts = []
for action_type, obj, _ in stance.get("actions", []):
parts.append(f"{action_type}({obj})")
return ", ".join(parts) if parts else "(no clear stance)"
def _is_affirmative_tech_mention(self, text: str, techs: Set[str]) -> bool:
"""Check if the text affirms using the given technologies."""
# Expand aliases
expanded_techs = set()
for tech in techs:
expanded_techs.add(tech)
for canonical, aliases in TECH_ALIASES.items():
if tech == canonical:
expanded_techs.update(aliases)
for tech in expanded_techs:
# Check for affirmative patterns around the tech mention
aff_patterns = [
rf'(?:use|adopt|deploy|implement|choose|select|prefer|require|install)\s+{re.escape(tech)}',
rf'{re.escape(tech)}\s+(?:is|was|will be)\s+(?:the|our|the best)',
rf'(?:chose|selected|decided on|went with|using)\s+{re.escape(tech)}',
]
for pattern in aff_patterns:
if re.search(pattern, text, re.IGNORECASE):
return True
# If no clear affirmative pattern, check the tech isn't negated
for tech in expanded_techs:
neg_patterns = [
rf'(?:avoid|remove|replace|deprecated?|don\'t use)\s+{re.escape(tech)}',
rf'{re.escape(tech)}\s+(?:is|was)\s+(?:deprecated|removed|replaced)',
]
for pattern in neg_patterns:
if re.search(pattern, text, re.IGNORECASE):
return False
# Default: tech mention without clear polarity is tentatively affirmative
return True
def get_report(self) -> Dict[str, Any]:
"""Generate contradiction report."""
report = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"mode": "dry_run" if self.dry_run else "live",
"stats": self.stats,
"total_contradictions": len(self.contradictions),
"by_severity": {},
"by_rule": {},
"contradictions": self.contradictions,
}
for c in self.contradictions:
sev = c["severity"]
rule = c["rule"]
report["by_severity"][sev] = report["by_severity"].get(sev, 0) + 1
report["by_rule"][rule] = report["by_rule"].get(rule, 0) + 1
return report
def get_contradiction_stats(org_db_path: Path) -> Dict[str, int]:
    """Get count of contradiction edges by severity.

    Returns an empty dict when the kg_edges table does not exist.
    """
    conn = sqlite3.connect(str(org_db_path))
    try:
        cursor = conn.execute("""
            SELECT json_extract(properties, '$.severity') as severity,
                   COUNT(*) as cnt
            FROM kg_edges
            WHERE edge_type = 'CONTRADICTS'
            GROUP BY severity
            ORDER BY cnt DESC
        """)
        return {row[0]: row[1] for row in cursor}
    except sqlite3.OperationalError:
        # Missing table (or json1 extension) — treat as "no edges".
        return {}
    finally:
        conn.close()
def print_report(detector: ContradictionDetector):
    """Print a formatted contradiction report for *detector* to stdout."""
    report = detector.get_report()

    print("\n" + "=" * 60)
    print("CONTRADICTION DETECTION REPORT")
    print("=" * 60)
    print(f"Generated: {report['generated_at']}")
    print(f"Mode: {report['mode']}")
    print(f"Total: {report['total_contradictions']}")
    print()

    if report['by_severity']:
        print("By Severity:")
        markers = {"critical": "!!!", "high": "!!", "medium": "!", "low": "."}
        for sev in [SEVERITY_CRITICAL, SEVERITY_HIGH, SEVERITY_MEDIUM, SEVERITY_LOW]:
            count = report['by_severity'].get(sev, 0)
            if count:
                print(f" {markers.get(sev, '.')} {sev:10} {count:>5}")
        print()

    if report['by_rule']:
        print("By Rule:")
        for rule, count in sorted(report['by_rule'].items(), key=lambda x: -x[1]):
            print(f" {rule:25} {count:>5}")
        print()

    # Only critical/high entries get a detailed listing (capped at 20).
    critical_high = [
        c for c in report['contradictions']
        if c['severity'] in (SEVERITY_CRITICAL, SEVERITY_HIGH)
    ]
    if critical_high:
        print(f"Critical/High Details ({len(critical_high)}):")
        print("-" * 60)
        for c in critical_high[:20]:
            print(f" [{c['severity'].upper()}] {c['rule']}")
            print(f" From: {c['from']}")
            print(f" To: {c['to']}")
            if c.get('description'):
                print(f" {c['description'][:100]}")
            if c.get('evidence'):
                print(f" Evidence: {c['evidence'][:100]}")
            print()
def main():
    """CLI entry point. Returns a process exit code (0 success, 1 usage error)."""
    parser = argparse.ArgumentParser(
        description="ADR-151 Knowledge Graph Contradiction Detector",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Rules:
  policy_violation      Decisions that violate mandatory policy rules
  superseded_conflict   Active decisions contradicted by newer replacements
  opposing_stance       Same-type decisions with opposing recommendations
  tech_divergence       Conflicting technology choices in same domain

Examples:
  # Run all rules
  python3 scripts/knowledge_graph/contradiction_detector.py

  # Dry run
  python3 scripts/knowledge_graph/contradiction_detector.py --dry-run

  # Run specific rules
  python3 scripts/knowledge_graph/contradiction_detector.py --rules policy_violation

  # Full report with details
  python3 scripts/knowledge_graph/contradiction_detector.py --report

  # JSON output for CI integration
  python3 scripts/knowledge_graph/contradiction_detector.py --json

  # Show existing contradiction stats
  python3 scripts/knowledge_graph/contradiction_detector.py --stats
"""
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview contradictions without writing edges"
    )
    parser.add_argument(
        "--rules", "-r",
        type=str,
        help="Comma-separated list of rules to run (default: all)"
    )
    parser.add_argument(
        "--report",
        action="store_true",
        help="Print detailed contradiction report"
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Show existing contradiction statistics and exit"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Enable verbose logging"
    )
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    org_db = get_org_db_path()

    # --stats is a read-only query; no detection runs.
    if args.stats:
        stats = get_contradiction_stats(org_db)
        print("\nContradiction Edge Statistics")
        print("=" * 40)
        if not stats:
            print("No contradiction edges found")
        else:
            total = 0
            for severity, count in stats.items():
                print(f" {severity or 'unknown':15} {count:>8,}")
                total += count
            print("-" * 40)
            print(f" {'TOTAL':15} {total:>8,}")
        return 0

    # Validate any user-supplied rule names before running.
    rules = None
    if args.rules:
        rules = [r.strip() for r in args.rules.split(",")]
        for r in rules:
            if r not in ContradictionDetector.RULES:
                print(f"Error: Unknown rule '{r}'")
                print(f"Available: {', '.join(ContradictionDetector.RULES.keys())}")
                return 1

    detector = ContradictionDetector(
        org_db_path=org_db,
        dry_run=args.dry_run,
    )
    detector.run(rules=rules)

    if args.json:
        print(json.dumps(detector.get_report(), indent=2, default=str))
    elif args.report:
        print_report(detector)
    else:
        # Default output: summary of edges now present in the graph.
        stats = {} if args.dry_run else get_contradiction_stats(org_db)
        if stats:
            print("\nContradiction Edges in Graph:")
            for sev, count in stats.items():
                print(f" {sev or 'unknown':15} {count:>5}")

    return 0
# Fix: original (mangled) source had `if name == "main"`; the standard script
# guard uses the module dunders.
if __name__ == "__main__":
    sys.exit(main())