Technical Design Document
Palantir Pattern Implementation Guide
Document Type: Technical Reference
Version: 1.0
Audience: Engineering Teams
1. Implementation Patterns from Palantir
1.1 Ontology-Oriented Development
Palantir's "Ontology SDK" (OSDK) pattern enables application development using business objects rather than data tables.
Pattern: Business Object Abstraction
// TRADITIONAL APPROACH (Anti-pattern)
// Developers work with raw data structures
// TRADITIONAL APPROACH (Anti-pattern)
// The developer works directly against raw tables with hand-written SQL,
// so schema details leak into application code.
async function getFlightStatus(flightId: string) {
  const rows = await db.query(`
SELECT f.*, a.name as aircraft_name, d.code as departure_code
FROM flights f
JOIN aircraft a ON f.aircraft_id = a.id
JOIN airports d ON f.departure_airport_id = d.id
WHERE f.id = ?
`, [flightId]);

  // Single flight expected; return the first (and only) row.
  const [firstRow] = rows;
  return firstRow;
}
// ONTOLOGY-ORIENTED APPROACH (Palantir Pattern)
// Developers work with business objects
// Fetch a flight as a business object and expose its linked objects
// plus the actions it can currently accept.
async function getFlightStatus(flightId: string) {
  const flight = await ontology.Flight.get(flightId);

  // Resolve linked business objects via the ontology's typed links.
  const aircraft = await flight.aircraft;
  const departure = await flight.departureAirport;

  // Only surface the actions this flight is currently eligible for.
  const availableActions = [
    flight.canDelay() && 'delay',
    flight.canCancel() && 'cancel',
    flight.canReschedule() && 'reschedule'
  ].filter(Boolean);

  return {
    flight,
    aircraft,
    departure,
    status: flight.status,
    // Actions available
    availableActions
  };
}
CODITECT Application:
// CODITECT Healthcare Ontology Pattern
/**
 * CODITECT healthcare ontology object type for a patient.
 * Combines stored properties, typed links to related objects,
 * computed properties, and the actions callers may invoke.
 */
interface Patient {
  /** Stable unique identifier for this patient. */
  patientId: string;
  demographics: Demographics;
  // Links to related objects
  encounters: Encounter[];
  providers: Provider[];
  carePlans: CarePlan[];
  claims: Claim[];
  // Computed properties (functions)
  riskScore(): Promise<RiskScore>;
  careGaps(): Promise<CareGap[]>;
  // Available actions
  scheduleAppointment(provider: Provider, date: Date): Promise<Appointment>;
  submitPriorAuth(procedure: Procedure): Promise<PriorAuth>;
  createCarePlan(template: CarePlanTemplate): Promise<CarePlan>;
}
1.2 Agent Architecture Pattern
Pattern: Guardrail-Constrained Autonomous Agents
from dataclasses import dataclass, field
from typing import List, Optional, Callable
from enum import Enum
class GuardrailType(Enum):
    """Category of constraint a Guardrail enforces on agent plans."""

    REGULATORY = "regulatory"
    BUSINESS = "business"
    SECURITY = "security"
    RESOURCE = "resource"
@dataclass
class Guardrail:
    """
    Defines a constraint that agents must respect.

    Checked by AutonomousAgent.execute() before any plan step runs.
    """

    guardrail_type: GuardrailType
    name: str
    # Predicate applied to the agent's proposed plan; False blocks execution.
    validation_fn: Callable[[dict], bool]
    # Message surfaced to the caller when this guardrail blocks a plan.
    error_message: str
    # When True, a failed check escalates to a human checkpoint instead
    # of hard-blocking; human approval lets execution proceed.
    requires_human_override: bool = False
@dataclass
class AgentContext:
    """
    Context for agent execution.

    Bundles the collaborators AutonomousAgent needs: ontology access,
    the guardrails to enforce, an audit trail, and the escalation hook.
    """

    ontology: "OntologyClient"
    # Constraints validated against the plan before execution.
    guardrails: List[Guardrail]
    # Receives per-step start/complete/error log calls.
    audit_trail: "AuditTrail"
    # Invoked when a guardrail with requires_human_override fails.
    human_checkpoint_fn: Callable[["CheckpointRequest"], "CheckpointResponse"]
class AutonomousAgent:
    """
    Base class for Palantir-style autonomous agents.

    Lifecycle (see execute()): plan -> guardrail validation (with an
    optional human checkpoint) -> step-by-step execution under the
    audit trail. Subclasses implement _plan() and _execute_step().
    """

    def __init__(self, context: AgentContext):
        self.context = context
        # Scratch state for subclasses; unused by the base class itself.
        self.state = {}

    async def execute(self, trigger: dict) -> dict:
        """
        Main execution loop with guardrail checks.

        Returns a dict whose "status" is one of:
        - "blocked": a guardrail rejected the plan (and the human
          reviewer, if consulted, did not approve);
        - "error": a step raised; includes the step name and message;
        - "complete": all steps ran; includes results and the audit id.
        """
        # 1. Plan execution
        plan = await self._plan(trigger)
        # 2. Validate plan against guardrails
        # NOTE(review): Guardrail.validation_fn is typed
        # Callable[[dict], bool] but receives the ExecutionPlan object
        # here — confirm the intended argument type.
        for guardrail in self.context.guardrails:
            if not guardrail.validation_fn(plan):
                if guardrail.requires_human_override:
                    # Escalate; approval lets execution proceed past
                    # this guardrail.
                    response = await self._request_human_checkpoint(
                        guardrail, plan
                    )
                    if not response.approved:
                        return {"status": "blocked", "reason": guardrail.error_message}
                else:
                    return {"status": "blocked", "reason": guardrail.error_message}
        # 3. Execute with audit trail; abort on the first failing step.
        results = []
        for step in plan.steps:
            self.context.audit_trail.log_step_start(step)
            try:
                result = await self._execute_step(step)
                self.context.audit_trail.log_step_complete(step, result)
                results.append(result)
            except Exception as e:
                self.context.audit_trail.log_step_error(step, e)
                return {"status": "error", "step": step.name, "error": str(e)}
        # 4. Return with full audit trail
        return {
            "status": "complete",
            "results": results,
            "audit_trail_id": self.context.audit_trail.id
        }

    async def _plan(self, trigger: dict) -> "ExecutionPlan":
        """Override in subclass to define planning logic"""
        raise NotImplementedError

    async def _execute_step(self, step: "ExecutionStep") -> dict:
        """Override in subclass to define step execution"""
        raise NotImplementedError

    async def _request_human_checkpoint(
        self, guardrail: Guardrail, plan: "ExecutionPlan"
    ) -> "CheckpointResponse":
        # Delegate to the context-supplied checkpoint function with a
        # structured request describing the agent, guardrail, and plan.
        return await self.context.human_checkpoint_fn(
            CheckpointRequest(
                agent=self.__class__.__name__,
                guardrail=guardrail.name,
                context=plan.to_dict(),
                options=["approve", "reject", "modify"]
            )
        )
1.3 LLM Orchestration Pattern
Pattern: Multi-Model Routing
from enum import Enum
from typing import Dict, Any, Optional
from dataclasses import dataclass
class ModelTier(Enum):
    """Model size/cost tiers used by LLMOrchestrator routing."""

    HAIKU = "haiku"    # Fast, cheap, simple tasks
    SONNET = "sonnet"  # Balanced, most tasks
    OPUS = "opus"      # Complex reasoning, critical decisions
@dataclass
class TaskClassification:
    """Characteristics of a task that drive model-tier selection."""

    complexity: float  # 0.0 - 1.0
    requires_reasoning: bool
    # Either flag below forces routing to the Opus tier.
    is_regulatory: bool
    is_security_sensitive: bool
    # Used to size max_tokens on the completion call.
    estimated_tokens: int
class LLMOrchestrator:
    """
    Palantir-style model routing based on task characteristics.

    select_model() picks the cheapest tier that satisfies the task's
    sensitivity/complexity requirements; complete() executes the call
    and reports the model used plus an estimated cost.
    """

    # Per-tier configuration: backend model id, output-token cap, and
    # the price used for cost estimation.
    MODEL_CONFIGS = {
        ModelTier.HAIKU: {
            "model_id": "claude-haiku",
            "max_tokens": 4096,
            "cost_per_1k_tokens": 0.00025
        },
        ModelTier.SONNET: {
            "model_id": "claude-sonnet",
            "max_tokens": 8192,
            "cost_per_1k_tokens": 0.003
        },
        ModelTier.OPUS: {
            "model_id": "claude-opus",
            "max_tokens": 16384,
            "cost_per_1k_tokens": 0.015
        }
    }

    def select_model(self, task: TaskClassification) -> ModelTier:
        """
        Route to the appropriate model tier based on task characteristics.
        """
        # Always use Opus for regulatory or security-sensitive tasks.
        if task.is_regulatory or task.is_security_sensitive:
            return ModelTier.OPUS
        # Use Opus for genuinely complex reasoning.
        if task.requires_reasoning and task.complexity > 0.7:
            return ModelTier.OPUS
        # Use Sonnet for moderate complexity or any reasoning task.
        if task.complexity > 0.3 or task.requires_reasoning:
            return ModelTier.SONNET
        # Use Haiku for simple tasks.
        return ModelTier.HAIKU

    async def complete(
        self,
        prompt: str,
        task: TaskClassification,
        output_format: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Execute a completion with automatic model selection.

        Returns {"response", "model_used", "estimated_cost"}.
        """
        model_tier = self.select_model(task)
        config = self.MODEL_CONFIGS[model_tier]
        response = await self._call_model(
            model_id=config["model_id"],
            prompt=prompt,
            # Leave headroom over the estimate, capped at the tier limit.
            max_tokens=min(task.estimated_tokens * 2, config["max_tokens"]),
            output_format=output_format
        )
        return {
            "response": response,
            "model_used": model_tier.value,
            "estimated_cost": self._estimate_cost(response, config)
        }

    async def _call_model(
        self,
        model_id: str,
        prompt: str,
        max_tokens: int,
        output_format: Optional[str] = None
    ) -> Any:
        """
        Integration point for the actual LLM backend call.

        The original document referenced this method without defining
        it, so complete() raised AttributeError. Deployments must
        override this with a real client call.
        """
        raise NotImplementedError

    def _estimate_cost(self, response: Any, config: Dict[str, Any]) -> float:
        """
        Rough cost estimate for *response* under *config* pricing.

        Previously referenced by complete() but never defined
        (AttributeError). Uses the common ~4-characters-per-token
        heuristic; replace with the provider's reported token usage
        when available.
        """
        approx_tokens = len(str(response)) / 4
        return (approx_tokens / 1000) * config["cost_per_1k_tokens"]
1.4 Bootcamp Automation Pattern
Pattern: Rapid Value Demonstration
from dataclasses import dataclass
from typing import List, Dict, Any
from datetime import datetime, timedelta
@dataclass
class BootcampPhase:
    """One timed segment of a bootcamp engagement."""

    name: str
    duration_hours: int
    # Artifacts the phase must produce.
    deliverables: List[str]
    # Checks that gate whether the phase counts as successful.
    success_criteria: List[str]
@dataclass
class BootcampConfig:
    """
    Configuration for CODITECT "Compliance Bootcamp"
    Inspired by Palantir's 5-day AIP Bootcamp
    """

    name: str
    duration_days: int
    phases: List[BootcampPhase]
    # ROI multiple the engagement commits to demonstrating.
    target_roi_multiplier: float

    @classmethod
    def healthcare_compliance(cls) -> "BootcampConfig":
        """
        2-day healthcare compliance automation bootcamp.

        Four 4-hour phases: data connection -> ontology setup ->
        agent deployment -> ROI demonstration.
        """
        return cls(
            name="Healthcare Compliance Bootcamp",
            duration_days=2,
            target_roi_multiplier=2.0,  # 2x ROI demonstrated
            phases=[
                BootcampPhase(
                    name="Day 1 AM: Data Connection",
                    duration_hours=4,
                    deliverables=[
                        "EHR/claims data connected",
                        "Sample dataset mapped to ontology",
                        "Data quality assessment"
                    ],
                    success_criteria=[
                        "≥1000 records processed",
                        "All required fields mapped",
                        "Data quality score ≥80%"
                    ]
                ),
                BootcampPhase(
                    name="Day 1 PM: Ontology Setup",
                    duration_hours=4,
                    deliverables=[
                        "Patient object type configured",
                        "Provider object type configured",
                        "Encounter object type configured",
                        "Links established"
                    ],
                    success_criteria=[
                        "3+ object types defined",
                        "5+ links configured",
                        "Sample queries working"
                    ]
                ),
                BootcampPhase(
                    name="Day 2 AM: Agent Deployment",
                    duration_hours=4,
                    deliverables=[
                        "Compliance checking agent deployed",
                        "Documentation automation running",
                        "Alert system configured"
                    ],
                    success_criteria=[
                        "Agent processing live data",
                        "≥10 compliance checks passing",
                        "Alerts firing correctly"
                    ]
                ),
                BootcampPhase(
                    name="Day 2 PM: ROI Demonstration",
                    duration_hours=4,
                    deliverables=[
                        "ROI calculator populated",
                        "Time savings documented",
                        "Expansion roadmap presented"
                    ],
                    success_criteria=[
                        "ROI ≥2x documented",
                        "Customer signs LOI",
                        "Next steps agreed"
                    ]
                )
            ]
        )
class BootcampOrchestrator:
    """
    Orchestrates bootcamp execution.

    Runs each phase's deliverable automation, validates the phase's
    success criteria, and computes the ROI figures presented to the
    customer.
    """

    # Default assumed annual engagement cost (USD) used by
    # calculate_roi() when the caller does not supply one; preserves
    # the original hardcoded $50K assumption.
    DEFAULT_ANNUAL_COST_USD = 50_000.0

    def __init__(self, config: "BootcampConfig"):
        self.config = config
        # Per-phase progress, keyed by phase name.
        self.progress = {}

    async def execute_phase(
        self,
        phase: "BootcampPhase",
        customer_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Execute a single bootcamp phase.

        Produces every deliverable, then checks every success
        criterion; the phase succeeds only if all criteria are met.
        """
        results = {
            "phase": phase.name,
            "started_at": datetime.now().isoformat(),
            "deliverables": {},
            "criteria_met": {}
        }
        for deliverable in phase.deliverables:
            # Execute deliverable automation
            result = await self._execute_deliverable(deliverable, customer_data)
            results["deliverables"][deliverable] = result
        for criterion in phase.success_criteria:
            # Validate success criteria
            met = await self._check_criterion(criterion, results["deliverables"])
            results["criteria_met"][criterion] = met
        results["completed_at"] = datetime.now().isoformat()
        results["success"] = all(results["criteria_met"].values())
        return results

    async def _execute_deliverable(
        self, deliverable: str, customer_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Produce one deliverable.

        Extension point: the original called this without defining it
        (AttributeError at runtime); override with real automation.
        """
        raise NotImplementedError

    async def _check_criterion(
        self, criterion: str, deliverables: Dict[str, Any]
    ) -> bool:
        """
        Validate one success criterion against produced deliverables.

        Extension point: the original called this without defining it
        (AttributeError at runtime); override with real validation.
        """
        raise NotImplementedError

    def calculate_roi(
        self,
        hours_saved_per_week: float,
        hourly_rate: float,
        weeks_per_year: int = 50,
        annual_cost_usd: float = DEFAULT_ANNUAL_COST_USD
    ) -> Dict[str, float]:
        """
        Calculate ROI for customer presentation.

        annual_cost_usd generalizes the previously hardcoded $50K
        annual cost; the default preserves the old behavior. Zero (or
        negative) savings now yield roi_multiple 0 and an infinite
        payback period instead of raising ZeroDivisionError.
        """
        annual_savings = hours_saved_per_week * hourly_rate * weeks_per_year
        if annual_savings <= 0:
            roi_multiple = 0.0
            payback_period_days = float("inf")
        else:
            roi_multiple = annual_savings / annual_cost_usd
            payback_period_days = (annual_cost_usd / annual_savings) * 365
        return {
            "hours_saved_per_week": hours_saved_per_week,
            "hours_saved_per_year": hours_saved_per_week * weeks_per_year,
            "annual_savings_usd": annual_savings,
            "roi_multiple": roi_multiple,
            "payback_period_days": payback_period_days
        }
2. Data Model Patterns
2.1 Temporal Data Pattern
Pattern: Bi-Temporal Versioning
from dataclasses import dataclass
from datetime import datetime
from typing import Generic, List, Optional, TypeVar
T = TypeVar('T')

@dataclass
class BiTemporalRecord(Generic[T]):
    """
    Bi-temporal record for full history tracking.

    Tracks two independent time axes per record: transaction time
    (when this version was stored/superseded in the system) and valid
    time (when the fact held in the real world). An end time of None
    means the version is still current on that axis.
    """

    record_id: str
    data: T
    # Transaction time: when the record was stored
    transaction_time_start: datetime
    transaction_time_end: Optional[datetime]  # None = current
    # Valid time: when the fact was true in the real world
    valid_time_start: datetime
    valid_time_end: Optional[datetime]  # None = current
    # Audit: who wrote this version and which record replaced it.
    created_by: str
    superseded_by: Optional[str]
class BiTemporalStore:
    """
    Storage interface with bi-temporal querying.

    All methods are backend extension points. They now raise
    NotImplementedError (consistent with AutonomousAgent's abstract
    hooks) instead of silently returning None via `pass`, so an
    unimplemented backend fails loudly rather than masquerading as
    "record not found".
    """

    def get_current(self, record_id: str) -> "Optional[BiTemporalRecord]":
        """Get current view of record"""
        raise NotImplementedError

    def get_as_of(
        self,
        record_id: str,
        transaction_time: datetime
    ) -> "Optional[BiTemporalRecord]":
        """Get record as it was known at transaction time"""
        raise NotImplementedError

    def get_at_time(
        self,
        record_id: str,
        valid_time: datetime
    ) -> "Optional[BiTemporalRecord]":
        """Get record that was valid at specified time"""
        raise NotImplementedError

    def get_history(
        self,
        record_id: str
    ) -> "List[BiTemporalRecord]":
        """Get full history of record"""
        raise NotImplementedError
2.2 Link Pattern
Pattern: Typed Relationships
// Palantir-style typed links between objects
/**
 * Palantir-style typed link between two ontology objects.
 * Concrete link types refine `properties` with their own shape.
 */
interface Link<Source, Target> {
  /** Name of the relationship type (e.g. "patient_provider"). */
  linkType: string;
  source: Source;
  target: Target;
  /** Link-level attributes; narrowed by concrete link types. */
  properties: Record<string, unknown>;
  /** Validity window; open-ended when validTo is absent. */
  validFrom: Date;
  validTo?: Date;
}
// Example: Healthcare domain links
// Patient↔Provider relationship carrying care-role metadata.
type PatientProviderLink = Link<Patient, Provider> & {
  properties: {
    relationshipType: "primary" | "specialist" | "referred";
    startDate: Date;
    // Absent while the relationship is still active.
    endDate?: Date;
  };
};

// Encounter↔Claim relationship carrying billing details.
type EncounterClaimLink = Link<Encounter, Claim> & {
  properties: {
    claimType: "professional" | "institutional";
    lineItems: number;
    totalAmount: number;
  };
};
// Link traversal
// Resolve the patient's current care team by traversing active
// (no endDate) patient→provider links.
async function getPatientCareTeam(patient: Patient): Promise<Provider[]> {
  const activeLinks = await ontology.links
    .ofType<PatientProviderLink>("patient_provider")
    .from(patient)
    .where((link) => !link.properties.endDate)
    .execute();

  // Each link's target is the provider on the other end.
  return activeLinks.map((link) => link.target);
}
3. Security Patterns
3.1 Row-Level Security Pattern
Pattern: Ontology-Based Access Control
from typing import Set, Callable
from dataclasses import dataclass
@dataclass
class SecurityContext:
    """
    Identity and authorization attributes of the requesting user.
    """

    user_id: str
    # Coarse roles; e.g. "admin" grants blanket patient access
    # (see patient_access_policy).
    roles: Set[str]
    # Fine-grained grants; e.g. "phi_access" (see ssn_mask).
    permissions: Set[str]
    # Organizations the user belongs to.
    organization_ids: Set[str]
class OntologySecurityProvider:
    """
    Implements Palantir-style ontology-aware security.

    Two layers: object-level access policies (may the user see this
    object at all?) and property-level masks (redact individual
    fields on visible objects).

    Annotations referencing Dict/Any/List/OntologyObject are string
    literals because those names are not imported in this snippet;
    unquoted they would raise NameError when the class is defined.
    """

    def __init__(self):
        # object_type -> policy(context, obj) -> bool
        self.object_type_policies: "Dict[str, Callable]" = {}
        # object_type -> {property_name -> mask(context, value) -> value}
        self.property_masks: "Dict[str, Dict[str, Callable]]" = {}

    def register_policy(
        self,
        object_type: str,
        policy: "Callable[[SecurityContext, OntologyObject], bool]"
    ):
        """
        Register the access policy for an object type. Objects of a
        type with no registered policy are visible to everyone.
        """
        self.object_type_policies[object_type] = policy

    def register_property_mask(
        self,
        object_type: str,
        property_name: str,
        mask: "Callable[[SecurityContext, Any], Any]"
    ):
        """
        Register a property-level masking function.
        """
        if object_type not in self.property_masks:
            self.property_masks[object_type] = {}
        self.property_masks[object_type][property_name] = mask

    def filter_objects(
        self,
        objects: "List[OntologyObject]",
        context: "SecurityContext"
    ) -> "List[OntologyObject]":
        """
        Return only the objects *context* may see, with masks applied.

        NOTE(review): masks are applied by mutating the objects in
        place (see _apply_masks), so callers share the masked state —
        confirm this is intended.
        """
        result = []
        for obj in objects:
            policy = self.object_type_policies.get(obj.object_type)
            # No policy registered means the object type is unrestricted.
            if policy is None or policy(context, obj):
                masked_obj = self._apply_masks(obj, context)
                result.append(masked_obj)
        return result

    def _apply_masks(
        self,
        obj: "OntologyObject",
        context: "SecurityContext"
    ) -> "OntologyObject":
        """
        Apply every registered property mask for obj's type, in place.
        """
        masks = self.property_masks.get(obj.object_type, {})
        for prop_name, mask_fn in masks.items():
            if hasattr(obj, prop_name):
                original_value = getattr(obj, prop_name)
                masked_value = mask_fn(context, original_value)
                setattr(obj, prop_name, masked_value)
        return obj
# Example: Healthcare security policies
def patient_access_policy(context: SecurityContext, patient: "Patient") -> bool:
    """
    Decide whether the user in *context* may see *patient*.

    Access is granted when the user has the admin role, belongs to the
    patient's organization, or is one of the patient's providers.
    """
    if "admin" in context.roles or patient.organization_id in context.organization_ids:
        return True
    # Fall back to checking whether the user is a treating provider.
    return context.user_id in [p.provider_id for p in patient.providers]
def ssn_mask(context: "SecurityContext", ssn: str) -> "Optional[str]":
    """
    Mask an SSN unless the user has PHI access.

    Returns the full SSN for users holding the "phi_access"
    permission, the masked form ("XXX-XX-1234") otherwise, and None
    for empty/None input. The return annotation was `str`, but the
    code already returned None for falsy input — the annotation now
    matches the behavior. (Annotations are quoted because
    SecurityContext/Optional are defined/imported elsewhere in the
    document.)
    """
    if "phi_access" in context.permissions:
        return ssn
    # Keep only the last four digits; propagate empty input as None.
    return f"XXX-XX-{ssn[-4:]}" if ssn else None
3.2 Audit Trail Pattern
Pattern: Immutable Audit Log
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
import hashlib
import json
@dataclass
class AuditEntry:
    """
    Immutable audit entry with cryptographic integrity.

    entry_hash is a SHA-256 over the entry's identifying fields plus
    the previous entry's hash, chaining entries together. `context`
    is stored but excluded from the hash input (as in the original
    design).
    """

    entry_id: str
    timestamp: datetime
    actor_id: str
    action_type: str
    object_type: str
    object_id: str
    # Must be JSON-serializable: hashed via json.dumps.
    changes: Dict[str, Any]
    context: Dict[str, Any]
    previous_hash: str
    entry_hash: str

    @classmethod
    def create(
        cls,
        actor_id: str,
        action_type: str,
        object_type: str,
        object_id: str,
        changes: Dict[str, Any],
        context: Dict[str, Any],
        previous_hash: str
    ) -> "AuditEntry":
        """
        Build a new hashed entry chained onto *previous_hash*.
        """
        # Capture the clock exactly once so entry_id and timestamp
        # agree. The original called datetime.now() twice, letting the
        # two values diverge — which made the hash input irreproducible
        # from the stored entry.
        timestamp = datetime.now()
        entry_id = f"audit_{timestamp.timestamp()}"
        # NOTE(review): naive local time and a timestamp-based id are
        # collision-prone; consider UTC plus a uuid for production.
        # Create hash chain
        hash_input = json.dumps({
            "entry_id": entry_id,
            "timestamp": timestamp.isoformat(),
            "actor_id": actor_id,
            "action_type": action_type,
            "object_type": object_type,
            "object_id": object_id,
            "changes": changes,
            "previous_hash": previous_hash
        }, sort_keys=True)
        entry_hash = hashlib.sha256(hash_input.encode()).hexdigest()
        return cls(
            entry_id=entry_id,
            timestamp=timestamp,
            actor_id=actor_id,
            action_type=action_type,
            object_type=object_type,
            object_id=object_id,
            changes=changes,
            context=context,
            previous_hash=previous_hash,
            entry_hash=entry_hash
        )
class AuditTrail:
    """
    Append-only audit trail whose entries form a tamper-evident hash
    chain: each entry records the hash of its predecessor.
    """

    def __init__(self):
        self.entries: List[AuditEntry] = []
        # Hash of the most recent entry; seeds the chain as "genesis".
        self._latest_hash = "genesis"

    def log(
        self,
        actor_id: str,
        action_type: str,
        object_type: str,
        object_id: str,
        changes: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None
    ) -> AuditEntry:
        """
        Append a new entry chained onto the current chain head.
        """
        new_entry = AuditEntry.create(
            actor_id=actor_id,
            action_type=action_type,
            object_type=object_type,
            object_id=object_id,
            changes=changes,
            context=context or {},
            previous_hash=self._latest_hash
        )
        self.entries.append(new_entry)
        self._latest_hash = new_entry.entry_hash
        return new_entry

    def verify_integrity(self) -> bool:
        """
        Walk the chain and confirm each entry links to its predecessor.
        An empty trail is trivially valid.
        """
        expected = "genesis"
        for entry in self.entries:
            if entry.previous_hash != expected:
                return False
            expected = entry.entry_hash
        return True
4. Deployment Patterns
4.1 Multi-Environment Deployment
Pattern: Apollo-Style Deployment Abstraction
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Any, List
class DeploymentEnvironment(Enum):
    """Supported deployment environments (Apollo-style targets)."""

    CLOUD = "cloud"
    ON_PREMISE = "on_premise"
    EDGE = "edge"
    AIR_GAPPED = "air_gapped"
@dataclass
class DeploymentConfig:
    """
    Declarative description of what to deploy and where.
    """

    # Selects which DeploymentTarget handles this config.
    environment: DeploymentEnvironment
    # Names of the components to deploy.
    components: List[str]
    # Per-component replica counts.
    replicas: Dict[str, int]
    # Per-component resource settings.
    resources: Dict[str, Dict[str, Any]]
    network: Dict[str, Any]
    storage: Dict[str, Any]
class DeploymentTarget(ABC):
    """
    Abstract deployment target (Apollo pattern).

    Concrete targets must override all three methods; any left
    abstract makes the subclass uninstantiable (TypeError).
    """

    @abstractmethod
    async def deploy(self, config: DeploymentConfig) -> "DeploymentResult":
        # Deploy the configured components to this target.
        pass

    @abstractmethod
    async def health_check(self) -> "HealthStatus":
        # Report the target's current health.
        pass

    @abstractmethod
    async def rollback(self, version: str) -> "RollbackResult":
        # Roll the deployment back to the given version.
        pass
class CloudDeploymentTarget(DeploymentTarget):
    """
    Cloud deployment (AWS/GCP/Azure).

    health_check/rollback overrides were added: without them the class
    kept DeploymentTarget's abstract methods and could not be
    instantiated, so DeploymentOrchestrator.__init__ raised TypeError.
    """

    async def deploy(self, config: DeploymentConfig) -> "DeploymentResult":
        # Kubernetes deployment
        pass

    async def health_check(self) -> "HealthStatus":
        """Not yet implemented for cloud targets."""
        raise NotImplementedError

    async def rollback(self, version: str) -> "RollbackResult":
        """Not yet implemented for cloud targets."""
        raise NotImplementedError
class OnPremiseDeploymentTarget(DeploymentTarget):
    """
    On-premise deployment.

    health_check/rollback overrides were added: without them the class
    kept DeploymentTarget's abstract methods and could not be
    instantiated, so DeploymentOrchestrator.__init__ raised TypeError.
    """

    async def deploy(self, config: DeploymentConfig) -> "DeploymentResult":
        # Docker Compose or Nomad deployment
        pass

    async def health_check(self) -> "HealthStatus":
        """Not yet implemented for on-premise targets."""
        raise NotImplementedError

    async def rollback(self, version: str) -> "RollbackResult":
        """Not yet implemented for on-premise targets."""
        raise NotImplementedError
class EdgeDeploymentTarget(DeploymentTarget):
    """
    Edge deployment (IoT, manufacturing).

    health_check/rollback overrides were added: without them the class
    kept DeploymentTarget's abstract methods and could not be
    instantiated, so DeploymentOrchestrator.__init__ raised TypeError.
    """

    async def deploy(self, config: DeploymentConfig) -> "DeploymentResult":
        # Lightweight container deployment
        pass

    async def health_check(self) -> "HealthStatus":
        """Not yet implemented for edge targets."""
        raise NotImplementedError

    async def rollback(self, version: str) -> "RollbackResult":
        """Not yet implemented for edge targets."""
        raise NotImplementedError
class AirGappedDeploymentTarget(DeploymentTarget):
    """
    Air-gapped deployment (classified environments).

    health_check/rollback overrides were added: without them the class
    kept DeploymentTarget's abstract methods and could not be
    instantiated, so DeploymentOrchestrator.__init__ raised TypeError.
    """

    async def deploy(self, config: DeploymentConfig) -> "DeploymentResult":
        # Offline package installation
        pass

    async def health_check(self) -> "HealthStatus":
        """Not yet implemented for air-gapped targets."""
        raise NotImplementedError

    async def rollback(self, version: str) -> "RollbackResult":
        """Not yet implemented for air-gapped targets."""
        raise NotImplementedError
class DeploymentOrchestrator:
    """
    Orchestrates deployment across environments by dispatching each
    DeploymentConfig to the target registered for its environment.
    """

    def __init__(self):
        # One pre-built target per supported environment.
        # NOTE(review): each target class must override every abstract
        # method of DeploymentTarget (deploy, health_check, rollback);
        # if any remain abstract, instantiating it here raises
        # TypeError — verify the concrete targets are complete.
        self.targets: Dict[DeploymentEnvironment, DeploymentTarget] = {
            DeploymentEnvironment.CLOUD: CloudDeploymentTarget(),
            DeploymentEnvironment.ON_PREMISE: OnPremiseDeploymentTarget(),
            DeploymentEnvironment.EDGE: EdgeDeploymentTarget(),
            DeploymentEnvironment.AIR_GAPPED: AirGappedDeploymentTarget(),
        }

    async def deploy(self, config: DeploymentConfig) -> "DeploymentResult":
        # KeyError if config.environment has no registered target.
        target = self.targets[config.environment]
        return await target.deploy(config)
5. Integration Patterns
5.1 Data Connector Pattern
Pattern: Unified Data Integration
from abc import ABC, abstractmethod
from typing import AsyncIterator, Dict, Any
class DataConnector(ABC):
    """
    Abstract data connector (Foundry pattern).

    Concrete connectors (EHR, claims, ...) supply the transport and
    must override every method below.
    """

    @abstractmethod
    async def connect(self) -> None:
        """Establish the connection to the backing system."""

    @abstractmethod
    async def test_connection(self) -> bool:
        """Return True if the connection is usable."""

    @abstractmethod
    async def get_schema(self) -> Dict[str, Any]:
        """Describe the source's schema."""

    @abstractmethod
    async def read(
        self,
        query: Dict[str, Any]
    ) -> AsyncIterator[Dict[str, Any]]:
        """Stream records matching *query*."""

    @abstractmethod
    async def write(
        self,
        # Annotation quoted: `List` is not imported in this snippet,
        # and an unquoted annotation raised NameError at class creation.
        records: "List[Dict[str, Any]]"
    ) -> int:
        """Write *records*; return the number written."""
class EHRConnector(DataConnector):
    """
    Connector for Electronic Health Record systems.

    Overrides for test_connection/get_schema/write were added: without
    them the class kept abstract methods inherited from DataConnector
    and could not be instantiated at all.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # EHR vendor key; drives which API dialect is used.
        self.ehr_type = config.get("ehr_type")  # Epic, Cerner, etc.

    async def connect(self) -> None:
        # FHIR or proprietary API connection
        pass

    async def test_connection(self) -> bool:
        """Not yet implemented; must probe the EHR endpoint."""
        raise NotImplementedError

    async def get_schema(self) -> Dict[str, Any]:
        """Not yet implemented; must describe the EHR resources."""
        raise NotImplementedError

    async def write(self, records: "List[Dict[str, Any]]") -> int:
        """Not yet implemented; most EHR integrations are read-only."""
        raise NotImplementedError

    async def read(
        self,
        query: Dict[str, Any]
    ) -> AsyncIterator[Dict[str, Any]]:
        # Read from FHIR API
        # NOTE(review): as written this is a plain coroutine returning
        # None, not an async generator — a real implementation must
        # `yield` records to satisfy the AsyncIterator contract.
        pass
class ClaimsConnector(DataConnector):
    """
    Connector for claims/billing systems.

    Placeholder: overrides none of DataConnector's abstract methods,
    so it remains abstract and cannot be instantiated yet.
    """

    pass
class DataIntegrationPipeline:
    """
    Orchestrates data integration from multiple sources into the
    ontology.

    Annotations referencing sibling/unimported names are quoted so the
    class definition does not depend on them at runtime.
    """

    def __init__(self, connectors: "List[DataConnector]"):
        self.connectors = connectors

    async def sync(self) -> "Dict[str, Any]":
        """
        Sync data from all connectors to the ontology.

        Returns a mapping of connector class name -> number of records
        read. The original built `results` but never populated it, so
        it always returned {}. (Connectors of the same class overwrite
        each other's entry.)
        """
        results = {}
        for connector in self.connectors:
            await connector.connect()
            record_count = 0
            async for record in connector.read({}):
                # TODO: transform `record` and load it into the ontology.
                record_count += 1
            results[type(connector).__name__] = record_count
        return results
Technical Design Document v1.0 — February 2026