#!/usr/bin/env python3 """ CODITECT Scope Resolution Module (ADR-159)
Provides consistent multi-tenant scope resolution for all CODITECT commands. Implements the standard --project/--team/--tenant parameter pattern and config discovery cascade.
Usage: from scripts.core.scope import ( resolve_scope, get_project_config, add_scope_args, ScopeContext, )
# In command scripts:
parser = argparse.ArgumentParser()
add_scope_args(parser)
args = parser.parse_args()
scope = resolve_scope(project=args.project, team=args.team, tenant=args.tenant)
# Use scope for data routing
config = get_project_config(scope.project)
Created: 2026-02-06 ADR: ADR-159 (Multi-Tenant Command Architecture) """
import argparse import json import os import sys from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, Optional
Add parent to path for imports
sys.path.insert(0, str(Path(file).parent.parent.parent))
try: from scripts.core.paths import ( discover_project, get_current_project, get_project_scope, get_framework_dir, get_user_data_dir, ) except ImportError: # Fallback for standalone usage def discover_project(cwd=None): return os.environ.get('CODITECT_PROJECT')
def get_current_project():
return os.environ.get('CODITECT_PROJECT')
def get_project_scope(project_id=None):
if project_id and project_id.startswith('CUST-'):
return 'customer'
return 'global' if not project_id else 'project'
def get_framework_dir():
return Path.home() / "Library" / "Application Support" / "CODITECT" / "core"
def get_user_data_dir():
return Path.home() / "PROJECTS" / ".coditect-data"
---------------------------------------------------------------------------
ScopeContext dataclass
---------------------------------------------------------------------------
@dataclass class ScopeContext: """ Resolved scope context for a command invocation.
Attributes:
project: Project ID (e.g., 'PILOT', 'CUST-avivatec-fpa')
team: Team ID (e.g., 'engineering')
tenant: Tenant ID (e.g., 'avivatec')
scope_level: Data isolation level ('global', 'project', 'customer')
source: How the project was resolved ('flag', 'env', 'auto', 'none')
"""
project: Optional[str] = None
team: Optional[str] = None
tenant: Optional[str] = None
scope_level: str = 'global'
source: str = 'none'
@property
def is_scoped(self) -> bool:
"""Whether this context has any project/team/tenant scope."""
return self.project is not None or self.team is not None or self.tenant is not None
@property
def is_customer(self) -> bool:
"""Whether this is a customer-scoped context (CUST-* project)."""
return self.scope_level == 'customer'
@property
def tenant_id(self) -> Optional[str]:
"""
Get the tenant ID, inferring from project if not explicit.
CUST-avivatec-fpa -> tenant 'avivatec'
"""
if self.tenant:
return self.tenant
if self.project and self.project.startswith('CUST-'):
parts = self.project.split('-')
if len(parts) >= 2:
return parts[1]
return None
def to_dict(self) -> Dict[str, Any]:
"""Serialize to dict for logging/config."""
return {
'project': self.project,
'team': self.team,
'tenant': self.tenant,
'scope_level': self.scope_level,
'source': self.source,
}
---------------------------------------------------------------------------
Scope Resolution (ADR-159 Section 7)
---------------------------------------------------------------------------
def resolve_scope( project: Optional[str] = None, team: Optional[str] = None, tenant: Optional[str] = None, ) -> ScopeContext: """ Resolve scope from flags, env vars, or auto-detection.
Config discovery cascade (most specific wins):
1. Explicit CLI flag --project PILOT
2. Environment variable $CODITECT_PROJECT
3. Working directory detect discover_project()
4. Global scope (no project context)
Args:
project: Explicit project ID from CLI flag
team: Explicit team ID from CLI flag
tenant: Explicit tenant ID from CLI flag
Returns:
ScopeContext with resolved project, team, tenant, and scope level
"""
resolved_project = None
source = 'none'
# 1. Explicit flags take precedence
if project:
resolved_project = project
source = 'flag'
# 2. Environment variable
if not resolved_project:
env_project = os.environ.get("CODITECT_PROJECT")
if env_project:
resolved_project = env_project
source = 'env'
# 3. Auto-detect from working directory
if not resolved_project:
detected = discover_project()
if detected:
resolved_project = detected
source = 'auto'
# Determine scope level
scope_level = get_project_scope(resolved_project)
return ScopeContext(
project=resolved_project,
team=team,
tenant=tenant,
scope_level=scope_level,
source=source,
)
---------------------------------------------------------------------------
Config Discovery (ADR-159 Section 3)
---------------------------------------------------------------------------
def get_project_config(project_id: Optional[str]) -> Dict[str, Any]: """ Load project-specific config with tenant fallback.
Config discovery cascade:
1. config/projects/{id}/config.json
2. config/tenants/{tenant_id}/config.json (if CUST-* project)
3. config/config.json (global defaults)
4. Empty dict
Args:
project_id: The project ID to load config for
Returns:
Dict with merged configuration
"""
if not project_id:
return _load_global_config()
framework_dir = get_framework_dir()
# 1. Project-specific config
project_config_path = framework_dir / "config" / "projects" / project_id / "config.json"
if project_config_path.exists():
try:
with open(project_config_path) as f:
config = json.load(f)
# Merge with global defaults
global_config = _load_global_config()
global_config.update(config)
return global_config
except (json.JSONDecodeError, IOError):
pass
# 2. Infer tenant from project prefix (CUST-avivatec-fpa -> avivatec)
if project_id.startswith("CUST-"):
parts = project_id.split("-")
if len(parts) >= 2:
tenant_id = parts[1]
tenant_config_path = framework_dir / "config" / "tenants" / tenant_id / "config.json"
if tenant_config_path.exists():
try:
with open(tenant_config_path) as f:
config = json.load(f)
global_config = _load_global_config()
global_config.update(config)
return global_config
except (json.JSONDecodeError, IOError):
pass
# 3. Global config
return _load_global_config()
def get_routing_config(config_name: str) -> Dict[str, Any]: """ Load a routing config file (e.g., session-log-repos.json).
Args:
config_name: Config file name (e.g., 'session-log-repos.json')
Returns:
Dict with routing configuration
"""
framework_dir = get_framework_dir()
config_path = framework_dir / "config" / config_name
if config_path.exists():
try:
with open(config_path) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
pass
return {}
def resolve_route( routing_config: Dict[str, Any], scope: ScopeContext, key: str = "default", ) -> Optional[str]: """ Resolve a data route from routing config and scope.
Resolution order:
1. projects.{project_id} in routing config
2. tenants.{tenant_id} in routing config
3. default value
Args:
routing_config: The routing configuration dict
scope: The resolved scope context
key: The key to look up (default: 'default')
Returns:
The resolved route value, or None
"""
# 1. Project-specific route
if scope.project:
projects = routing_config.get("projects", {})
if scope.project in projects:
return projects[scope.project]
# 2. Tenant-specific route
tenant = scope.tenant_id
if tenant:
tenants = routing_config.get("tenants", {})
if tenant in tenants:
return tenants[tenant]
# 3. Default
return routing_config.get(key)
---------------------------------------------------------------------------
SQL Helpers for project-scoped queries
---------------------------------------------------------------------------
def project_where_clause(scope: ScopeContext, column: str = "project_id") -> str: """ Generate SQL WHERE clause fragment for project scoping.
Args:
scope: The resolved scope context
column: The column name to filter on
Returns:
SQL fragment (e.g., "AND project_id = 'PILOT'") or empty string
"""
if scope.project:
# Use parameterized queries in actual code; this is for simple cases
return f"AND {column} = '{scope.project}'"
return ""
def project_sql_params(scope: ScopeContext) -> Dict[str, Any]: """ Generate SQL parameters dict for project-scoped queries.
Returns:
Dict with project/team/tenant params (only non-None values)
"""
params = {}
if scope.project:
params['project_id'] = scope.project
if scope.team:
params['team_id'] = scope.team
if scope.tenant:
params['tenant_id'] = scope.tenant
return params
---------------------------------------------------------------------------
Argparse Helpers
---------------------------------------------------------------------------
def add_scope_args(parser: argparse.ArgumentParser) -> None: """ Add standard --project/--team/--tenant arguments to an argparse parser.
This ensures all scope-aware commands use consistent flag names.
Args:
parser: The argparse.ArgumentParser to add arguments to
"""
scope_group = parser.add_argument_group('scope', 'Project/team/tenant scope options')
scope_group.add_argument(
'--project',
type=str,
default=None,
help='Project scope (e.g., PILOT, CUST-avivatec-fpa). '
'Auto-detected from working directory if not specified.',
)
scope_group.add_argument(
'--team',
type=str,
default=None,
help='Team scope (e.g., engineering). Optional grouping.',
)
scope_group.add_argument(
'--tenant',
type=str,
default=None,
help='Tenant scope (e.g., avivatec). Implies all projects within tenant.',
)
def scope_from_args(args: argparse.Namespace) -> ScopeContext: """ Create a ScopeContext from parsed argparse arguments.
Args:
args: Parsed argparse namespace (must have project, team, tenant)
Returns:
ScopeContext with resolved scope
"""
return resolve_scope(
project=getattr(args, 'project', None),
team=getattr(args, 'team', None),
tenant=getattr(args, 'tenant', None),
)
---------------------------------------------------------------------------
Internal Helpers
---------------------------------------------------------------------------
def _load_global_config() -> Dict[str, Any]: """Load the global config.json.""" framework_dir = get_framework_dir() global_config_path = framework_dir / "config" / "config.json" if global_config_path.exists(): try: with open(global_config_path) as f: return json.load(f) except (json.JSONDecodeError, IOError): pass return {}
---------------------------------------------------------------------------
CLI for testing
---------------------------------------------------------------------------
def main(): """Test scope resolution from command line.""" parser = argparse.ArgumentParser(description="CODITECT Scope Resolution (ADR-159)") add_scope_args(parser) parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output') args = parser.parse_args()
scope = scope_from_args(args)
print(f"Resolved Scope:")
print(f" Project: {scope.project or '(none - global)'}")
print(f" Team: {scope.team or '(none)'}")
print(f" Tenant: {scope.tenant_id or '(none)'}")
print(f" Scope Level: {scope.scope_level}")
print(f" Source: {scope.source}")
print(f" Is Scoped: {scope.is_scoped}")
print(f" Is Customer: {scope.is_customer}")
if args.verbose and scope.project:
config = get_project_config(scope.project)
print(f"\n Project Config: {json.dumps(config, indent=2)}")
if name == "main": main()