Skip to main content

scripts-configuration-manager

#!/usr/bin/env python3
"""

title: "Configuration Manager"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "Multi-tenant cascading configuration for CODITECT"
keywords: ['config', 'tenant', 'team', 'project', 'user', 'hierarchy']
tokens: ~300
created: 2026-01-28
updated: 2026-01-28
script_name: "configuration_manager.py"
language: python
executable: true
usage: "from scripts.core.configuration_manager import ConfigurationManager"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false

Configuration Manager for CODITECT.

Implements cascading configuration resolution: platform → tenant → team → project → user

Higher levels override lower levels. User preferences have highest priority.

Track: J.13 (Memory - Generic Session Export)
Task: J.13.1.2
"""

from __future__ import annotations

import copy
import json
import os
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Generic, List, Optional, TypeVar

# Generic type variable; no use of it appears in the visible portion of this file.
T = TypeVar('T')

@dataclass
class ConfigSource:
    """Represents a configuration source (one JSON file at one hierarchy level)."""

    # Hierarchy level name: platform, tenant, team, project, or user.
    level: str
    # Location of the JSON file backing this source.
    path: Path
    # True once a load has been attempted, whether or not the file existed.
    loaded: bool = False
    # Parsed file content; stays empty when the file is missing or unreadable.
    data: Dict[str, Any] = field(default_factory=dict)
    # Timestamp of the most recent load attempt.
    load_time: Optional[datetime] = None
    # Human-readable description of the last load/save failure, if any.
    error: Optional[str] = None

class ConfigurationManager:
    """
    Multi-tenant cascading configuration manager.

    Configuration hierarchy (lowest to highest priority):
    1. Platform defaults (CODITECT-provided)
    2. Tenant overrides (organization-level)
    3. Team overrides (team-level)
    4. Project overrides (project-level)
    5. User preferences (highest priority)

    Every level is backed by an optional JSON file under ``config_root``;
    a missing file simply contributes nothing. The built-in ``DEFAULTS``
    sit beneath the platform level and are used when no file provides a key.
    """

    # Default configuration values
    DEFAULTS: Dict[str, Any] = {
        "llm_tools": {
            "enabled": ["claude", "codex", "gemini"],
            "default": "claude",
            "auto_detect": True
        },
        "export": {
            "default_format": "jsonl",
            "include_tool_results": True,
            "include_system_prompts": False,
            "include_thinking": True,
            "max_message_length": 100000,
            "truncate_tool_results": 50000
        },
        "session_discovery": {
            "max_age_days": 30,
            "exclude_patterns": ["*.tmp", "*.backup", "*.bak"],
            "include_hidden": False
        },
        "output": {
            "default_directory": "~/.coditect-data/exports/",
            "filename_template": "{llm}-{date}-{session_id}.{format}",
            "compression": False
        },
        "reconstruction": {
            "preserve_timestamps": True,
            "preserve_message_ids": True,
            "deduplicate": True
        }
    }

    # Hierarchy levels from lowest to highest priority (single source of truth
    # for every method that walks the cascade).
    _LEVELS = ("platform", "tenant", "team", "project", "user")

    def __init__(
        self,
        config_root: Optional[Path] = None,
        tenant_id: Optional[str] = None,
        team_id: Optional[str] = None,
        project_id: Optional[str] = None
    ):
        """
        Initialize ConfigurationManager.

        Args:
            config_root: Root directory for config files (``Path`` or path
                string). Defaults to ~/.coditect-data/config/
            tenant_id: Current tenant ID (for multi-tenant lookup)
            team_id: Current team ID
            project_id: Current project ID
        """
        if config_root is None:
            config_root = Path(os.path.expanduser("~/.coditect-data/config"))

        # Coerce so that a plain string root also supports the "/" operator.
        self.config_root = Path(config_root)
        self.tenant_id = tenant_id
        self.team_id = team_id
        self.project_id = project_id

        self._sources: Dict[str, ConfigSource] = {}
        self._cache: Dict[str, Any] = {}
        self._cache_valid = False

        self._init_sources()

    def _init_sources(self) -> None:
        """Register one ConfigSource per applicable level, lowest priority first.

        Tenant, team and project sources are only registered when the
        corresponding ID is set; platform and user sources always are.
        """
        root = self.config_root

        # Platform config (lowest priority)
        self._sources["platform"] = ConfigSource(
            level="platform",
            path=root / "platform.json"
        )

        # Scoped levels live in a sub-directory named after the level.
        scoped = (
            ("tenant", self.tenant_id),
            ("team", self.team_id),
            ("project", self.project_id),
        )
        for level, ident in scoped:
            if ident:
                self._sources[level] = ConfigSource(
                    level=level,
                    path=root / level / f"{ident}.json"
                )

        # User config (highest priority)
        self._sources["user"] = ConfigSource(
            level="user",
            path=root / "user.json"
        )

    def load_all(self, force_reload: bool = False) -> None:
        """
        Load all configuration sources.

        Args:
            force_reload: Force reload even if already loaded
        """
        reloaded = False
        for source in self._sources.values():
            if force_reload or not source.loaded:
                self._load_source(source)
                reloaded = True

        # Only drop cached lookups when source data may actually have changed;
        # invalidating unconditionally would make the cache useless.
        if reloaded:
            self._cache_valid = False

    def _load_source(self, source: "ConfigSource") -> None:
        """Load a single configuration source from disk.

        Never raises for a missing, unreadable or malformed file: the source
        ends up with empty ``data`` and, for real failures, a message in
        ``error``. A missing file is not treated as an error.
        """
        source.loaded = True
        source.load_time = datetime.now()
        # Reset state so a successful reload does not keep a stale error
        # (or stale data) from a previous attempt.
        source.error = None
        source.data = {}

        try:
            with open(source.path, 'r', encoding='utf-8') as f:
                parsed = json.load(f)
        except FileNotFoundError:
            return
        except json.JSONDecodeError as e:
            source.error = f"JSON parse error: {e}"
            return
        except UnicodeDecodeError as e:
            source.error = f"Encoding error: {e}"
            return
        except OSError as e:
            source.error = f"IO error: {e}"
            return

        # Every consumer treats source data as a mapping; reject anything else
        # here instead of crashing later during merge or lookup.
        if not isinstance(parsed, dict):
            source.error = (
                "Invalid config: expected a JSON object at top level, "
                f"got {type(parsed).__name__}"
            )
            return

        source.data = parsed

    def get(self, key: str, default: Any = None) -> Any:
        """
        Get configuration value with cascading resolution.

        Supports dot notation for nested keys: "export.default_format"

        The first source (highest priority first) holding a non-None value
        for the key wins; values are NOT merged across levels (use
        ``get_section`` for merged dict sections). ``DEFAULTS`` is consulted
        last.

        Args:
            key: Configuration key (supports dot notation)
            default: Default value if key not found

        Returns:
            Configuration value from highest-priority source
        """
        # Ensure sources are loaded
        self.load_all()

        # Drop stale entries once after any invalidation, then serve hits.
        if not self._cache_valid:
            self._cache.clear()
            self._cache_valid = True

        cache_key = f"get:{key}"
        if cache_key in self._cache:
            return self._cache[cache_key]

        # Parse key path
        key_parts = key.split('.')

        # Try each source in reverse priority order (user first)
        for level in reversed(self._LEVELS):
            source = self._sources.get(level)
            if not source or not source.data:
                continue

            value = self._get_nested(source.data, key_parts)
            if value is not None:
                self._cache[cache_key] = value
                return value

        # Check defaults
        value = self._get_nested(self.DEFAULTS, key_parts)
        if value is not None:
            self._cache[cache_key] = value
            return value

        return default

    def _get_nested(self, data: Dict[str, Any], keys: List[str]) -> Any:
        """Get nested value from dict using key path.

        Returns None when any path component is missing or when a
        non-dict value is reached before the path is exhausted.
        """
        current = data
        for key in keys:
            if not isinstance(current, dict) or key not in current:
                return None
            current = current[key]
        return current

    def get_section(self, section: str) -> Dict[str, Any]:
        """
        Get entire configuration section with merged values.

        Args:
            section: Section name (e.g., "export", "llm_tools")

        Returns:
            Merged configuration dict for section. The result is an
            independent copy; mutating it affects neither ``DEFAULTS`` nor
            any loaded source.
        """
        self.load_all()

        # Start with defaults
        defaults = self.DEFAULTS.get(section)
        result: Dict[str, Any] = (
            copy.deepcopy(defaults) if isinstance(defaults, dict) else {}
        )

        # Merge each level in order
        for level in self._LEVELS:
            source = self._sources.get(level)
            if not source or not source.data:
                continue
            override = source.data.get(section)
            # A non-dict value cannot be merged as a section; skip it.
            if isinstance(override, dict):
                self._deep_merge(result, override)

        return result

    def _deep_merge(self, base: Dict[str, Any], override: Dict[str, Any]) -> None:
        """Deep merge override into base (in-place).

        Dicts present on both sides are merged recursively; anything else
        in ``override`` replaces the value in ``base``. Replacing values are
        deep-copied so ``base`` never shares mutable objects with
        ``override`` (otherwise a later merge into ``base`` would modify the
        source the value came from).
        """
        for key, value in override.items():
            existing = base.get(key)
            if isinstance(existing, dict) and isinstance(value, dict):
                self._deep_merge(existing, value)
            else:
                base[key] = copy.deepcopy(value)

    def set_user(self, key: str, value: Any) -> None:
        """
        Set user-level configuration value.

        The value is stored in the user source and written to its file
        immediately; a write failure is recorded on the source's ``error``
        attribute rather than raised.

        Args:
            key: Configuration key (supports dot notation)
            value: Value to set
        """
        user_source = self._sources.get("user")
        if not user_source:
            return

        # Ensure user config is loaded
        if not user_source.loaded:
            self._load_source(user_source)

        # Parse key path and set value
        self._set_nested(user_source.data, key.split('.'), value)

        # Save user config
        self._save_source(user_source)

        # Invalidate cache
        self._cache_valid = False

    def _set_nested(self, data: Dict[str, Any], keys: List[str], value: Any) -> None:
        """Set nested value in dict using key path.

        Missing intermediate levels are created. An intermediate key that
        currently holds a non-dict value is replaced by a dict, because the
        requested path cannot exist beneath a scalar.
        """
        for key in keys[:-1]:
            child = data.get(key)
            if not isinstance(child, dict):
                child = {}
                data[key] = child
            data = child
        data[keys[-1]] = value

    def _save_source(self, source: "ConfigSource") -> None:
        """Save configuration source to disk.

        Raises:
            TypeError: If the source data is not JSON-serializable. This is
                raised before the file is opened, so the existing file is
                left untouched.
        """
        # Serialize first: failing inside json.dump after opening with "w"
        # would leave a truncated config file behind.
        payload = json.dumps(source.data, indent=2)

        try:
            source.path.parent.mkdir(parents=True, exist_ok=True)
            with open(source.path, 'w', encoding='utf-8') as f:
                f.write(payload)
        except OSError as e:
            source.error = f"Save error: {e}"
        else:
            # The file now holds exactly source.data, so any earlier
            # load/save error no longer describes its state.
            source.error = None

    def get_effective_config(self) -> Dict[str, Any]:
        """
        Get complete effective configuration with all merges applied.

        Returns:
            Full merged configuration dict. The result is an independent
            copy; ``DEFAULTS`` and the loaded sources are not modified.
        """
        self.load_all()

        # Deep copy: a shallow copy would let the merge below write into the
        # nested dicts of the class-level DEFAULTS.
        result: Dict[str, Any] = copy.deepcopy(self.DEFAULTS)

        for level in self._LEVELS:
            source = self._sources.get(level)
            if source and source.data:
                self._deep_merge(result, source.data)

        return result

    def get_source_info(self) -> List[Dict[str, Any]]:
        """Get information about all configuration sources.

        Returns:
            One dict per registered source with its level, path, whether the
            file currently exists, load status and time, last error, and the
            top-level keys it provides.
        """
        self.load_all()

        return [
            {
                "level": source.level,
                "path": str(source.path),
                "exists": source.path.exists(),
                "loaded": source.loaded,
                "load_time": source.load_time.isoformat() if source.load_time else None,
                "error": source.error,
                "keys": list(source.data.keys()) if source.data else []
            }
            for source in self._sources.values()
        ]

    @staticmethod
    def _section_or_empty(
        config: Dict[str, Any], name: str, errors: List[str]
    ) -> Dict[str, Any]:
        """Return config[name] if it is a dict; otherwise record an error and return {}."""
        section = config.get(name, {})
        if isinstance(section, dict):
            return section
        errors.append(
            f"Invalid {name} section: expected an object, got {type(section).__name__}"
        )
        return {}

    def validate(self) -> List[str]:
        """
        Validate configuration.

        Checks the effective configuration: the default LLM must be in the
        enabled list, the export format must be one of jsonl/json/sqlite, and
        max_age_days must be a non-negative integer. Sections overridden with
        a non-object value are reported instead of raising.

        Returns:
            List of validation errors (empty if valid)
        """
        errors: List[str] = []
        config = self.get_effective_config()

        # Validate llm_tools
        llm_tools = self._section_or_empty(config, "llm_tools", errors)
        enabled = llm_tools.get("enabled", [])
        default_llm = llm_tools.get("default")

        if not isinstance(enabled, (list, tuple)):
            errors.append(f"Invalid llm_tools.enabled: {enabled!r}")
        elif default_llm and default_llm not in enabled:
            errors.append(f"Default LLM '{default_llm}' not in enabled list")

        # Validate export settings
        export = self._section_or_empty(config, "export", errors)
        valid_formats = ["jsonl", "json", "sqlite"]
        default_format = export.get("default_format")
        if default_format and default_format not in valid_formats:
            errors.append(f"Invalid default_format: {default_format}")

        # Validate session_discovery
        discovery = self._section_or_empty(config, "session_discovery", errors)
        max_age = discovery.get("max_age_days")
        if max_age is not None:
            # bool is a subclass of int, so exclude it explicitly.
            is_int = isinstance(max_age, int) and not isinstance(max_age, bool)
            if not is_int or max_age < 0:
                errors.append(f"Invalid max_age_days: {max_age}")

        return errors

# Singleton instance

_config_manager: Optional["ConfigurationManager"] = None


def get_config_manager(
    tenant_id: Optional[str] = None,
    team_id: Optional[str] = None,
    project_id: Optional[str] = None
) -> "ConfigurationManager":
    """Return the module-wide ConfigurationManager, creating it on first use.

    The tenant/team/project IDs are only used when the instance is first
    created; once an instance exists it is returned as-is and the arguments
    of later calls are ignored.
    """
    global _config_manager

    # Fast path: an instance has already been built.
    if _config_manager is not None:
        return _config_manager

    _config_manager = ConfigurationManager(
        tenant_id=tenant_id,
        team_id=team_id,
        project_id=project_id
    )
    return _config_manager

def get_config(key: str, default: Any = None) -> Any:
    """Look up ``key`` on the shared configuration manager.

    Shorthand for ``get_config_manager().get(key, default)``.
    """
    manager = get_config_manager()
    return manager.get(key, default)

def _main() -> int:
    """Command-line entry point for inspecting the configuration.

    Exactly one action is performed, chosen in this order: --get, --section,
    --all, --sources, --validate; with none of them a short summary is
    printed.

    Returns:
        Process exit status: 1 when --validate finds errors, otherwise 0.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Configuration Manager")
    parser.add_argument("--get", help="Get configuration value")
    parser.add_argument("--section", help="Get configuration section")
    parser.add_argument("--all", action="store_true", help="Show all effective config")
    parser.add_argument("--sources", action="store_true", help="Show source info")
    parser.add_argument("--validate", action="store_true", help="Validate configuration")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    manager = ConfigurationManager()

    if args.get:
        value = manager.get(args.get)
        if args.json:
            print(json.dumps({"key": args.get, "value": value}))
        else:
            print(f"{args.get} = {value}")

    elif args.section:
        section = manager.get_section(args.section)
        print(json.dumps(section, indent=2))

    elif args.all:
        config = manager.get_effective_config()
        print(json.dumps(config, indent=2))

    elif args.sources:
        sources = manager.get_source_info()
        if args.json:
            print(json.dumps(sources, indent=2))
        else:
            for src in sources:
                status = "✓" if src["exists"] else "✗"
                print(f"{status} [{src['level']}] {src['path']}")
                if src["error"]:
                    print(f" Error: {src['error']}")

    elif args.validate:
        errors = manager.validate()
        if args.json:
            print(json.dumps({"valid": not errors, "errors": errors}, indent=2))
        elif errors:
            print("Validation errors:")
            for err in errors:
                print(f" - {err}")
        else:
            print("Configuration is valid")
        # Signal failure to the shell so the check is usable from scripts/CI.
        if errors:
            return 1

    else:
        # Show summary
        config = manager.get_effective_config()
        print("CODITECT Configuration Summary")
        print("=" * 40)
        print(f"LLM Tools: {config.get('llm_tools', {}).get('enabled', [])}")
        print(f"Default LLM: {config.get('llm_tools', {}).get('default', 'auto')}")
        print(f"Export Format: {config.get('export', {}).get('default_format', 'jsonl')}")

    return 0


if __name__ == "__main__":
    raise SystemExit(_main())