#!/usr/bin/env python3 """ Project-Scoped Context Query Functions (ADR-156)
Provides functions for querying context data (decisions, learnings, messages) with project scoping. Supports both project-specific and cross-project queries.
Usage: from scripts.core.project_context import ( query_decisions, query_learnings, query_messages, record_decision, record_learning, get_data_scope, )
Created: 2026-02-04 ADR: ADR-156 Project-Scoped Context Databases Architecture """
import os import sqlite3 import sys from datetime import datetime, timezone from pathlib import Path from typing import List, Optional, Dict, Any
Add parent to path for imports
sys.path.insert(0, str(Path(file).parent.parent.parent))
try: from scripts.core.paths import ( get_org_db_path, get_sessions_db_path, discover_project, ) except ImportError: def get_org_db_path(): return Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "org.db"
def get_sessions_db_path():
return Path.home() / "PROJECTS" / ".coditect-data" / "context-storage" / "sessions.db"
def discover_project():
return os.environ.get('CODITECT_PROJECT')
def get_iso_timestamp() -> str: """Get ISO 8601 timestamp in UTC.""" return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def get_active_project() -> Optional[str]: """ Get the currently active project.
Resolution order:
1. $CODITECT_PROJECT environment variable
2. Auto-discovered from current working directory
"""
# Environment variable takes precedence
if env_project := os.environ.get('CODITECT_PROJECT'):
return env_project
# Auto-discover from cwd
return discover_project()
def get_data_scope(project_id: str) -> str: """ Determine data scope based on project scope.
Args:
project_id: The project ID to check
Returns:
str: 'global', 'project', or 'customer'
"""
if not project_id:
return 'global'
org_db = get_org_db_path()
if not org_db.exists():
return 'global'
conn = sqlite3.connect(str(org_db))
try:
cursor = conn.execute(
"SELECT scope FROM projects WHERE project_id = ?",
(project_id,)
)
result = cursor.fetchone()
if not result:
return 'global'
project_scope = result[0]
if project_scope == 'platform':
return 'global' # Platform decisions are global
elif project_scope == 'customer':
return 'customer' # Customer data is isolated
else:
return 'project' # Org and project are project-scoped
finally:
conn.close()
def query_decisions( project_id: str = None, include_global: bool = True, limit: int = 100, decision_type: str = None, ) -> List[Dict[str, Any]]: """ Query decisions for a project, optionally including global decisions.
Args:
project_id: Project to query (auto-detected if None)
include_global: Include global scope decisions
limit: Maximum number of results
decision_type: Filter by decision type
Returns:
List of decision dictionaries
"""
if project_id is None:
project_id = get_active_project()
org_db = get_org_db_path()
if not org_db.exists():
return []
conn = sqlite3.connect(str(org_db))
conn.row_factory = sqlite3.Row
try:
if project_id and include_global:
# Project-specific + global decisions
query = """
SELECT d.*, p.name AS project_name
FROM decisions d
LEFT JOIN projects p ON d.project_id = p.project_id
WHERE d.project_id = ? OR d.scope = 'global' OR d.project_id IS NULL
"""
params = [project_id]
elif project_id and not include_global:
# Project-specific only
query = """
SELECT d.*, p.name AS project_name
FROM decisions d
LEFT JOIN projects p ON d.project_id = p.project_id
WHERE d.project_id = ?
"""
params = [project_id]
else:
# All decisions (no project filter)
query = """
SELECT d.*, p.name AS project_name
FROM decisions d
LEFT JOIN projects p ON d.project_id = p.project_id
"""
params = []
if decision_type:
query += " AND d.decision_type = ?"
params.append(decision_type)
query += " ORDER BY d.created_at DESC LIMIT ?"
params.append(limit)
cursor = conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
finally:
conn.close()
def query_learnings( project_id: str = None, include_global: bool = True, limit: int = 100, skill_name: str = None, ) -> List[Dict[str, Any]]: """ Query skill learnings for a project.
Args:
project_id: Project to query (auto-detected if None)
include_global: Include global scope learnings
limit: Maximum number of results
skill_name: Filter by skill name
Returns:
List of learning dictionaries
"""
if project_id is None:
project_id = get_active_project()
org_db = get_org_db_path()
if not org_db.exists():
return []
conn = sqlite3.connect(str(org_db))
conn.row_factory = sqlite3.Row
try:
if project_id and include_global:
query = """
SELECT sl.*, p.name AS project_name
FROM skill_learnings sl
LEFT JOIN projects p ON sl.project_id = p.project_id
WHERE sl.project_id = ? OR sl.scope = 'global' OR sl.project_id IS NULL
"""
params = [project_id]
elif project_id and not include_global:
query = """
SELECT sl.*, p.name AS project_name
FROM skill_learnings sl
LEFT JOIN projects p ON sl.project_id = p.project_id
WHERE sl.project_id = ?
"""
params = [project_id]
else:
query = """
SELECT sl.*, p.name AS project_name
FROM skill_learnings sl
LEFT JOIN projects p ON sl.project_id = p.project_id
"""
params = []
if skill_name:
query += " AND sl.skill_name LIKE ?"
params.append(f"%{skill_name}%")
query += " ORDER BY sl.analyzed_at DESC LIMIT ?"
params.append(limit)
cursor = conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
finally:
conn.close()
def query_messages( project_id: str = None, limit: int = 100, role: str = None, search_text: str = None, ) -> List[Dict[str, Any]]: """ Query messages for a project.
Args:
project_id: Project to query (auto-detected if None)
limit: Maximum number of results
role: Filter by role (user, assistant)
search_text: Full-text search query
Returns:
List of message dictionaries
"""
if project_id is None:
project_id = get_active_project()
sessions_db = get_sessions_db_path()
if not sessions_db.exists():
return []
conn = sqlite3.connect(str(sessions_db))
conn.row_factory = sqlite3.Row
try:
if project_id:
query = """
SELECT m.*
FROM messages m
WHERE m.project_id = ?
"""
params = [project_id]
else:
query = """
SELECT m.*
FROM messages m
WHERE 1=1
"""
params = []
if role:
query += " AND m.role = ?"
params.append(role)
if search_text:
query += " AND m.content LIKE ?"
params.append(f"%{search_text}%")
query += " ORDER BY m.timestamp DESC LIMIT ?"
params.append(limit)
cursor = conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
finally:
conn.close()
def query_error_solutions( project_id: str = None, include_global: bool = True, limit: int = 50, error_type: str = None, ) -> List[Dict[str, Any]]: """ Query error solutions for a project.
Args:
project_id: Project to query (auto-detected if None)
include_global: Include global scope solutions
limit: Maximum number of results
error_type: Filter by error type
Returns:
List of error solution dictionaries
"""
if project_id is None:
project_id = get_active_project()
org_db = get_org_db_path()
if not org_db.exists():
return []
conn = sqlite3.connect(str(org_db))
conn.row_factory = sqlite3.Row
try:
if project_id and include_global:
query = """
SELECT es.*, p.name AS project_name
FROM error_solutions es
LEFT JOIN projects p ON es.project_id = p.project_id
WHERE es.project_id = ? OR es.scope = 'global' OR es.project_id IS NULL
"""
params = [project_id]
elif project_id and not include_global:
query = """
SELECT es.*, p.name AS project_name
FROM error_solutions es
LEFT JOIN projects p ON es.project_id = p.project_id
WHERE es.project_id = ?
"""
params = [project_id]
else:
query = """
SELECT es.*, p.name AS project_name
FROM error_solutions es
LEFT JOIN projects p ON es.project_id = p.project_id
"""
params = []
if error_type:
query += " AND es.error_type = ?"
params.append(error_type)
query += " ORDER BY es.last_used DESC LIMIT ?"
params.append(limit)
cursor = conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
finally:
conn.close()
def record_decision( decision: str, decision_type: str, rationale: str = None, project_id: str = None, scope: str = None, confidence: float = 0.7, tags: str = None, ) -> int: """ Record a decision with project attribution.
Args:
decision: The decision text
decision_type: Type of decision (architecture, implementation, etc.)
rationale: Reasoning behind the decision
project_id: Project to attribute (auto-detected if None)
scope: Data scope (auto-determined if None)
confidence: Confidence level (0.0 to 1.0)
tags: Comma-separated tags
Returns:
int: The ID of the inserted decision
"""
if project_id is None:
project_id = get_active_project()
if scope is None:
scope = get_data_scope(project_id) if project_id else 'global'
org_db = get_org_db_path()
if not org_db.exists():
raise FileNotFoundError(f"org.db not found at {org_db}")
conn = sqlite3.connect(str(org_db))
try:
cursor = conn.execute("""
INSERT INTO decisions (
decision, decision_type, rationale,
project_id, scope, confidence, tags, created_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
decision, decision_type, rationale,
project_id, scope, confidence, tags, get_iso_timestamp()
))
conn.commit()
return cursor.lastrowid
finally:
conn.close()
def record_learning( skill_name: str, session_id: str, outcome: str = None, effectiveness_score: int = None, errors: str = None, project_id: str = None, scope: str = None, ) -> int: """ Record a skill learning with project attribution.
Args:
skill_name: Name of the skill
session_id: Session where learning occurred
outcome: Outcome description
effectiveness_score: Score (0-100)
errors: Error descriptions if any
project_id: Project to attribute (auto-detected if None)
scope: Data scope (auto-determined if None)
Returns:
int: The ID of the inserted learning
"""
if project_id is None:
project_id = get_active_project()
if scope is None:
scope = get_data_scope(project_id) if project_id else 'global'
org_db = get_org_db_path()
if not org_db.exists():
raise FileNotFoundError(f"org.db not found at {org_db}")
conn = sqlite3.connect(str(org_db))
try:
cursor = conn.execute("""
INSERT OR REPLACE INTO skill_learnings (
skill_name, session_id, outcome,
effectiveness_score, errors, project_id, scope, analyzed_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
skill_name, session_id, outcome,
effectiveness_score, errors, project_id, scope, get_iso_timestamp()
))
conn.commit()
return cursor.lastrowid
finally:
conn.close()
def get_project_context_stats(project_id: str = None) -> Dict[str, Any]: """ Get context statistics for a project.
Args:
project_id: Project to query (auto-detected if None)
Returns:
dict: Statistics including counts of decisions, learnings, messages
"""
if project_id is None:
project_id = get_active_project()
stats = {
"project_id": project_id,
"decisions": 0,
"learnings": 0,
"error_solutions": 0,
"messages": 0,
}
org_db = get_org_db_path()
if org_db.exists():
conn = sqlite3.connect(str(org_db))
try:
if project_id:
cursor = conn.execute(
"SELECT COUNT(*) FROM decisions WHERE project_id = ?",
(project_id,)
)
stats["decisions"] = cursor.fetchone()[0]
cursor = conn.execute(
"SELECT COUNT(*) FROM skill_learnings WHERE project_id = ?",
(project_id,)
)
stats["learnings"] = cursor.fetchone()[0]
cursor = conn.execute(
"SELECT COUNT(*) FROM error_solutions WHERE project_id = ?",
(project_id,)
)
stats["error_solutions"] = cursor.fetchone()[0]
finally:
conn.close()
sessions_db = get_sessions_db_path()
if sessions_db.exists():
conn = sqlite3.connect(str(sessions_db))
try:
if project_id:
cursor = conn.execute(
"SELECT COUNT(*) FROM messages WHERE project_id = ?",
(project_id,)
)
stats["messages"] = cursor.fetchone()[0]
finally:
conn.close()
return stats
CLI for testing
if name == "main": import argparse import json
parser = argparse.ArgumentParser(description="Project Context Query (ADR-156)")
parser.add_argument("--project", "-p", help="Project ID")
parser.add_argument("--decisions", action="store_true", help="Query decisions")
parser.add_argument("--learnings", action="store_true", help="Query learnings")
parser.add_argument("--messages", action="store_true", help="Query messages")
parser.add_argument("--errors", action="store_true", help="Query error solutions")
parser.add_argument("--stats", action="store_true", help="Show project stats")
parser.add_argument("--limit", type=int, default=10, help="Result limit")
parser.add_argument("--json", action="store_true", help="Output as JSON")
args = parser.parse_args()
project_id = args.project or get_active_project()
print(f"Project: {project_id or '(none - global)'}")
print()
if args.stats:
stats = get_project_context_stats(project_id)
if args.json:
print(json.dumps(stats, indent=2))
else:
print("Context Statistics:")
print(f" Decisions: {stats['decisions']}")
print(f" Learnings: {stats['learnings']}")
print(f" Error Solutions: {stats['error_solutions']}")
print(f" Messages: {stats['messages']}")
elif args.decisions:
results = query_decisions(project_id, limit=args.limit)
if args.json:
print(json.dumps(results, indent=2, default=str))
else:
print(f"Decisions ({len(results)}):")
for d in results:
print(f" [{d.get('decision_type', 'unknown')}] {d.get('decision', '')[:80]}")
if d.get('project_name'):
print(f" Project: {d['project_name']}")
elif args.learnings:
results = query_learnings(project_id, limit=args.limit)
if args.json:
print(json.dumps(results, indent=2, default=str))
else:
print(f"Learnings ({len(results)}):")
for l in results:
print(f" [{l.get('skill_name', 'unknown')}] Score: {l.get('effectiveness_score', 'N/A')}")
elif args.messages:
results = query_messages(project_id, limit=args.limit)
if args.json:
print(json.dumps(results, indent=2, default=str))
else:
print(f"Messages ({len(results)}):")
for m in results:
content = m.get('content', '')[:100]
print(f" [{m.get('role', 'unknown')}] {content}...")
elif args.errors:
results = query_error_solutions(project_id, limit=args.limit)
if args.json:
print(json.dumps(results, indent=2, default=str))
else:
print(f"Error Solutions ({len(results)}):")
for e in results:
print(f" [{e.get('error_type', 'unknown')}] {e.get('error_signature', '')[:60]}")
else:
# Show stats by default
stats = get_project_context_stats(project_id)
print("Context Statistics:")
print(f" Decisions: {stats['decisions']}")
print(f" Learnings: {stats['learnings']}")
print(f" Error Solutions: {stats['error_solutions']}")
print(f" Messages: {stats['messages']}")