#!/usr/bin/env python3
"""
---
title: "CODITECT Onboarding Telemetry"
component_type: script
version: "1.0.0"
audience: contributor
status: stable
summary: "CODITECT Onboarding Telemetry"
keywords: ['analysis', 'onboarding', 'telemetry']
created: 2025-12-22
updated: 2025-12-22
script_name: "onboarding-telemetry.py"
language: python
executable: true
usage: "python3 scripts/onboarding-telemetry.py [options]"
python_version: "3.10+"
dependencies: []
modifies_files: false
network_access: false
requires_auth: false
---

CODITECT Onboarding Telemetry

Tracks completion rates, drop-off points, and learning patterns for
continuous improvement of the onboarding experience.

Usage:
    python3 scripts/onboarding-telemetry.py --dashboard
    python3 scripts/onboarding-telemetry.py --report
    python3 scripts/onboarding-telemetry.py --record-event phase_complete --data '{"phase": 3}'
    python3 scripts/onboarding-telemetry.py --analyze-dropoff
"""

import argparse
import json
import os
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

# Get script directory for path resolution (works from any cwd).
# Bug fix: the mangled source read Path(file); the special variable is __file__.
SCRIPT_DIR = Path(__file__).resolve().parent
CORE_ROOT = SCRIPT_DIR.parent

# Telemetry storage locations (all under the repository's .coditect directory).
TELEMETRY_DIR = CORE_ROOT / ".coditect" / "telemetry"
EVENTS_FILE = TELEMETRY_DIR / "onboarding_events.jsonl"      # append-only JSON Lines log
AGGREGATES_FILE = TELEMETRY_DIR / "onboarding_aggregates.json"  # computed metrics snapshot
PROGRESS_FILE = CORE_ROOT / ".coditect" / "onboarding-full-progress.json"  # session state

# Recognized telemetry event types and their human-readable descriptions.
# NOTE(review): record_event() does not validate against this mapping — it is
# documentation/reporting metadata only.
EVENT_TYPES = {
    "session_start": "User started onboarding",
    "session_resume": "User resumed onboarding",
    "phase_start": "User started a phase",
    "phase_complete": "User completed a phase",
    "phase_skip": "User skipped a phase",
    "exercise_complete": "User completed an exercise",
    "quiz_attempt": "User attempted a quiz",
    "badge_earned": "User earned a badge",
    "session_exit": "User exited onboarding",
    "session_complete": "User completed all onboarding",
    "error_encountered": "User encountered an error",
    "help_requested": "User requested help",
}

# Human-readable phase names for reporting (onboarding phases 0-10).
PHASE_NAMES = {
    0: "Welcome & Assessment",
    1: "Big Picture",
    2: "Commands",
    3: "Agents",
    4: "Skills",
    5: "Scripts",
    6: "Tools",
    7: "Workflows",
    8: "Cookbook",
    9: "Integration",
    10: "Final Check",
}

def get_timestamp() -> str:
    """Return the current UTC time as an ISO-8601 string (with +00:00 offset)."""
    return datetime.now(timezone.utc).isoformat()

def ensure_telemetry_dir() -> None:
    """Ensure the telemetry directory exists, creating parents as needed."""
    TELEMETRY_DIR.mkdir(parents=True, exist_ok=True)

def record_event(event_type: str, data: Optional[Dict] = None) -> Dict:
    """Record a telemetry event by appending it to the events file.

    Args:
        event_type: Event type name (see EVENT_TYPES; not validated here).
        data: Optional event payload; stored as an empty dict when None.

    Returns:
        The event dict that was written (timestamp, event_type, data,
        session_id).
    """
    ensure_telemetry_dir()

    event = {
        "timestamp": get_timestamp(),
        "event_type": event_type,
        "data": data or {},
        "session_id": get_session_id(),
    }

    # Append to events file (JSON Lines: one JSON object per line)
    with open(EVENTS_FILE, 'a') as f:
        f.write(json.dumps(event) + "\n")

    return event

def get_session_id() -> Optional[str]:
    """Return the current session ID from the progress file.

    The session's "started_at" timestamp doubles as the session ID. Falls
    back to "unknown" when the progress file is missing, unreadable, or
    contains invalid JSON.
    """
    if PROGRESS_FILE.exists():
        try:
            with open(PROGRESS_FILE) as f:
                progress = json.load(f)
            return progress.get("started_at", "unknown")
        except (OSError, json.JSONDecodeError):
            # Best-effort: a corrupt or unreadable progress file must not
            # break telemetry recording. (Was a bare except; narrowed.)
            pass
    return "unknown"

def load_all_events() -> List[Dict]:
    """Load all telemetry events from the JSONL events file.

    Malformed lines are skipped rather than aborting the load, so a single
    corrupt record cannot break reporting.
    """
    events = []
    if EVENTS_FILE.exists():
        with open(EVENTS_FILE) as f:
            for line in f:
                try:
                    events.append(json.loads(line.strip()))
                except json.JSONDecodeError:
                    continue
    return events

def calculate_aggregates() -> Dict[str, Any]:
    """Calculate aggregate metrics from all recorded events.

    Groups events by session, then derives: completion rate, per-phase
    completion/skip/dropoff counts, average time spent per phase, badge
    counts, and the mean quiz score. The result is also persisted to
    AGGREGATES_FILE as JSON.

    Returns:
        Aggregate metrics dict, or {"error": ...} when no events exist.
        NOTE: in-memory phase-count keys are ints; after a JSON round-trip
        through AGGREGATES_FILE they become strings.
    """
    events = load_all_events()

    if not events:
        return {"error": "No telemetry data available"}

    # Group events by session
    sessions = defaultdict(list)
    for event in events:
        sessions[event.get("session_id", "unknown")].append(event)

    # Metric accumulators
    total_sessions = len(sessions)
    completed_sessions = 0
    phase_completion_counts = defaultdict(int)
    phase_skip_counts = defaultdict(int)
    phase_dropoff_counts = defaultdict(int)
    time_per_phase = defaultdict(list)
    badge_counts = defaultdict(int)
    quiz_scores = []

    for session_id, session_events in sessions.items():
        # Sort by timestamp so phase_start/phase_complete pairs line up
        session_events.sort(key=lambda x: x.get("timestamp", ""))

        # Track phase progression within this session
        phases_completed = set()
        phases_started = set()
        last_phase_time = None

        for event in session_events:
            event_type = event.get("event_type")
            data = event.get("data", {})

            if event_type == "session_complete":
                completed_sessions += 1

            elif event_type == "phase_start":
                phase = data.get("phase")
                if phase is not None:
                    phases_started.add(phase)
                    last_phase_time = event.get("timestamp")

            elif event_type == "phase_complete":
                phase = data.get("phase")
                if phase is not None:
                    phases_completed.add(phase)
                    phase_completion_counts[phase] += 1

                    # Time spent = gap since the most recent phase_start.
                    # NOTE(review): last_phase_time is not reset after use, so
                    # a phase_complete without a fresh phase_start reuses the
                    # previous start time — confirm this is intended.
                    if last_phase_time:
                        try:
                            start = datetime.fromisoformat(last_phase_time.replace('Z', '+00:00'))
                            end = datetime.fromisoformat(event["timestamp"].replace('Z', '+00:00'))
                            duration = (end - start).total_seconds()
                            time_per_phase[phase].append(duration)
                        except (KeyError, ValueError, AttributeError, TypeError):
                            # Malformed or missing timestamps: skip this
                            # duration sample rather than aborting. (Was a
                            # bare except; narrowed.)
                            pass

            elif event_type == "phase_skip":
                phase = data.get("phase")
                if phase is not None:
                    phase_skip_counts[phase] += 1

            elif event_type == "badge_earned":
                badge = data.get("badge")
                if badge:
                    badge_counts[badge] += 1

            elif event_type == "quiz_attempt":
                score = data.get("score")
                if score is not None:
                    quiz_scores.append(score)

        # Dropoff = phases started in this session but never completed
        for phase in phases_started:
            if phase not in phases_completed:
                phase_dropoff_counts[phase] += 1

    # Average time per phase (seconds)
    avg_time_per_phase = {}
    for phase, times in time_per_phase.items():
        if times:
            avg_time_per_phase[phase] = sum(times) / len(times)

    # Overall completion rate (percent)
    completion_rate = (completed_sessions / total_sessions * 100) if total_sessions > 0 else 0

    # Average quiz score
    avg_quiz_score = sum(quiz_scores) / len(quiz_scores) if quiz_scores else 0

    aggregates = {
        "generated_at": get_timestamp(),
        "total_sessions": total_sessions,
        "completed_sessions": completed_sessions,
        "completion_rate_percent": round(completion_rate, 1),
        "phase_completion_counts": dict(phase_completion_counts),
        "phase_skip_counts": dict(phase_skip_counts),
        "phase_dropoff_counts": dict(phase_dropoff_counts),
        "avg_time_per_phase_seconds": {k: round(v, 1) for k, v in avg_time_per_phase.items()},
        "badge_counts": dict(badge_counts),
        "avg_quiz_score": round(avg_quiz_score, 1),
        "total_events": len(events),
    }

    # Persist the snapshot for later inspection
    with open(AGGREGATES_FILE, 'w') as f:
        json.dump(aggregates, f, indent=2)

    return aggregates

def analyze_dropoff() -> Dict[str, Any]:
    """Analyze where users drop off most frequently.

    Returns:
        Dict with per-phase dropoff stats sorted by dropoff rate (worst
        first) plus generated recommendations, or the aggregates' error
        dict when no telemetry data exists.
    """
    aggregates = calculate_aggregates()

    if "error" in aggregates:
        return aggregates

    dropoffs = aggregates.get("phase_dropoff_counts", {})
    completions = aggregates.get("phase_completion_counts", {})

    # Calculate dropoff rate per phase (phases 0-10)
    dropoff_analysis = []
    for phase_num in range(11):
        # Bug fix: in-memory aggregates key phases by int, while JSON
        # round-tripped aggregates key by str. Look up both forms so the
        # analysis works either way (previously only str keys were checked,
        # which always yielded 0 against in-memory data).
        phase_str = str(phase_num)
        dropped = dropoffs.get(phase_str, dropoffs.get(phase_num, 0))
        completed = completions.get(phase_str, completions.get(phase_num, 0))
        total = dropped + completed

        if total > 0:
            dropoff_rate = (dropped / total) * 100
            dropoff_analysis.append({
                "phase": phase_num,
                "name": PHASE_NAMES.get(phase_num, f"Phase {phase_num}"),
                "dropoff_count": dropped,
                "completion_count": completed,
                "dropoff_rate_percent": round(dropoff_rate, 1),
                "severity": "HIGH" if dropoff_rate > 30 else "MEDIUM" if dropoff_rate > 15 else "LOW"
            })

    # Sort by dropoff rate, worst first
    dropoff_analysis.sort(key=lambda x: x["dropoff_rate_percent"], reverse=True)

    return {
        "analysis_type": "dropoff",
        "generated_at": get_timestamp(),
        "phases_by_dropoff_rate": dropoff_analysis,
        "recommendations": generate_recommendations(dropoff_analysis),
    }

def generate_recommendations(dropoff_analysis: List[Dict]) -> List[str]:
    """Generate recommendations based on dropoff analysis.

    Args:
        dropoff_analysis: Per-phase dicts as produced by analyze_dropoff()
            (keys: "phase", "name", "dropoff_rate_percent", "severity").

    Returns:
        Human-readable recommendation strings; a single "all good" message
        when no HIGH-severity phase is present.
    """
    recommendations = []

    for item in dropoff_analysis:
        if item["severity"] == "HIGH":
            phase = item["phase"]
            name = item["name"]
            rate = item["dropoff_rate_percent"]

            recommendations.append(
                f"HIGH PRIORITY: Phase {phase} ({name}) has {rate}% dropoff rate. "
                f"Consider simplifying content or adding more engagement."
            )

            # Phase-specific advice for known trouble spots
            if phase == 3:  # Agents
                recommendations.append(
                    "  - Agents phase may be too complex. Consider adding more analogies "
                    "and breaking the Task() pattern into smaller steps."
                )
            elif phase == 7:  # Workflows
                recommendations.append(
                    "  - Workflows phase may feel abstract. Consider adding more "
                    "concrete, relatable examples."
                )

    if not recommendations:
        recommendations.append("No critical dropoff points detected. Onboarding flow is performing well.")

    return recommendations

def generate_dashboard() -> str:
    """Generate an ASCII dashboard of telemetry metrics.

    Returns:
        Multi-line box-drawn dashboard string, or an error message when no
        telemetry data exists.
    """
    aggregates = calculate_aggregates()

    if "error" in aggregates:
        return f"Error: {aggregates['error']}"

    # Build dashboard header.
    # NOTE(review): the box layout was mangled in the source paste; column
    # widths below are a best-effort reconstruction of an 80-column frame.
    dashboard = """
╔══════════════════════════════════════════════════════════════════════════════╗
║                    CODITECT ONBOARDING TELEMETRY DASHBOARD                   ║
╠══════════════════════════════════════════════════════════════════════════════╣
║                                                                              ║
║  OVERVIEW                                                                    ║
║  ────────                                                                    ║
"""

    total = aggregates.get("total_sessions", 0)
    completed = aggregates.get("completed_sessions", 0)
    rate = aggregates.get("completion_rate_percent", 0)

    # Progress bar for overall completion rate
    bar_width = 30
    filled = int(bar_width * rate / 100) if rate else 0
    bar = "█" * filled + "░" * (bar_width - filled)

    dashboard += f"║ Total Sessions: {total:<10} ║\n"
    dashboard += f"║ Completed Sessions: {completed:<10} ║\n"
    dashboard += f"║ Completion Rate: [{bar}] {rate}% ║\n"
    dashboard += f"║ Avg Quiz Score: {aggregates.get('avg_quiz_score', 0)}/7 ║\n"

    dashboard += """║                                                                              ║
║  PHASE COMPLETION                                                            ║
║  ────────────────                                                            ║
"""

    completions = aggregates.get("phase_completion_counts", {})
    dropoffs = aggregates.get("phase_dropoff_counts", {})

    for phase_num in range(11):
        # Bug fix: in-memory aggregates key phases by int, JSON-loaded ones
        # by str — look up both forms (previously only str was checked).
        phase_str = str(phase_num)
        completed = completions.get(phase_str, completions.get(phase_num, 0))
        dropped = dropoffs.get(phase_str, dropoffs.get(phase_num, 0))
        name = PHASE_NAMES.get(phase_num, f"Phase {phase_num}")[:20]

        # Mini per-phase success bar (10 cells)
        total_attempts = completed + dropped
        if total_attempts > 0:
            success_rate = completed / total_attempts
            mini_bar = "█" * int(10 * success_rate) + "░" * (10 - int(10 * success_rate))
        else:
            mini_bar = "░" * 10

        dashboard += f"║ {phase_num:2}. {name:<20} [{mini_bar}] {completed:3} completed, {dropped:3} dropped ║\n"

    dashboard += """║                                                                              ║
║  BADGES EARNED                                                               ║
║  ─────────────                                                               ║
"""

    badge_counts = aggregates.get("badge_counts", {})
    badge_icons = {
        "command_explorer": "💻",
        "agent_invoker": "🤖",
        "skill_spotter": "⚡",
        "script_runner": "📜",
        "tool_aware": "🔧",
        "workflow_designer": "🔄",
        "recipe_finder": "📖",
        "onboarding_complete": "🎓",
        "speed_learner": "⚡",
        "perfect_score": "💯",
    }

    # Top 8 badges by count, most-earned first
    badge_line = " "
    for badge, count in sorted(badge_counts.items(), key=lambda x: -x[1])[:8]:
        icon = badge_icons.get(badge, "🏆")
        badge_line += f"{icon}×{count} "

    dashboard += f"║{badge_line:<76}║\n"

    dashboard += """║                                                                              ║
╚══════════════════════════════════════════════════════════════════════════════╝
"""

    return dashboard

def generate_report() -> str:
    """Generate a detailed plain-text telemetry report.

    Combines the aggregate metrics with the dropoff analysis into a
    sectioned report (summary, per-phase stats, dropoff, recommendations).
    """
    aggregates = calculate_aggregates()
    dropoff = analyze_dropoff()

    # NOTE(review): the section layout was mangled in the source paste;
    # headings/separators below are a best-effort reconstruction.
    report = f"""
================================================================================
CODITECT ONBOARDING TELEMETRY REPORT
Generated: {get_timestamp()}
================================================================================

EXECUTIVE SUMMARY
-----------------
Total Onboarding Sessions: {aggregates.get('total_sessions', 0)}
Completion Rate: {aggregates.get('completion_rate_percent', 0)}%
Average Quiz Score: {aggregates.get('avg_quiz_score', 0)}/7

PHASE-BY-PHASE ANALYSIS
-----------------------
"""

    completions = aggregates.get("phase_completion_counts", {})
    dropoffs = aggregates.get("phase_dropoff_counts", {})
    times = aggregates.get("avg_time_per_phase_seconds", {})

    for phase_num in range(11):
        # Bug fix: in-memory aggregates key phases by int, JSON-loaded ones
        # by str — look up both forms (previously only str was checked).
        phase_str = str(phase_num)
        name = PHASE_NAMES.get(phase_num, f"Phase {phase_num}")
        completed = completions.get(phase_str, completions.get(phase_num, 0))
        dropped = dropoffs.get(phase_str, dropoffs.get(phase_num, 0))
        avg_time = times.get(phase_str, times.get(phase_num, 0))

        total = completed + dropped
        success_rate = (completed / total * 100) if total > 0 else 0

        report += f"""
Phase {phase_num}: {name}
  • Completions: {completed}
  • Dropoffs: {dropped}
  • Success Rate: {success_rate:.1f}%
  • Avg Time: {avg_time:.0f} seconds
"""

    report += """
DROPOFF ANALYSIS
----------------
"""

    # Worst five phases by dropoff rate
    for item in dropoff.get("phases_by_dropoff_rate", [])[:5]:
        report += f"""
{item['name']} (Phase {item['phase']}):
  • Dropoff Rate: {item['dropoff_rate_percent']}%
  • Severity: {item['severity']}
"""

    report += """
RECOMMENDATIONS
---------------
"""

    for rec in dropoff.get("recommendations", []):
        report += f"• {rec}\n"

    report += """
================================================================================
End of Report
"""

    return report

def main() -> None:
    """CLI entry point: parse arguments and dispatch to the requested action.

    With no action flag, the dashboard is shown by default.
    """
    parser = argparse.ArgumentParser(description="CODITECT Onboarding Telemetry")
    parser.add_argument("--dashboard", action="store_true", help="Show ASCII dashboard")
    parser.add_argument("--report", action="store_true", help="Generate detailed report")
    parser.add_argument("--record-event", type=str, help="Record an event")
    parser.add_argument("--data", type=str, help="JSON data for event")
    parser.add_argument("--analyze-dropoff", action="store_true", help="Analyze dropoff points")
    parser.add_argument("--json", action="store_true", help="Output as JSON")

    args = parser.parse_args()

    ensure_telemetry_dir()

    if args.record_event:
        data = None
        if args.data:
            try:
                data = json.loads(args.data)
            except json.JSONDecodeError:
                print("Error: Invalid JSON data")
                return

        event = record_event(args.record_event, data)
        print(f"✅ Event recorded: {args.record_event}")
        if args.json:
            print(json.dumps(event, indent=2))
        return

    if args.dashboard:
        print(generate_dashboard())
        return

    if args.report:
        print(generate_report())
        return

    if args.analyze_dropoff:
        analysis = analyze_dropoff()
        if args.json:
            print(json.dumps(analysis, indent=2))
        else:
            print("\nDROPOFF ANALYSIS")
            print("=" * 60)
            for item in analysis.get("phases_by_dropoff_rate", []):
                severity_icon = "🔴" if item["severity"] == "HIGH" else "🟡" if item["severity"] == "MEDIUM" else "🟢"
                print(f"{severity_icon} Phase {item['phase']}: {item['name']}")
                print(f" Dropoff Rate: {item['dropoff_rate_percent']}%")
                print(f" Completed: {item['completion_count']}, Dropped: {item['dropoff_count']}")
                print()

            print("\nRECOMMENDATIONS:")
            for rec in analysis.get("recommendations", []):
                print(f" • {rec}")
        return

    # Default: show dashboard
    print(generate_dashboard())

# Bug fix: the mangled source read `if name == "main"`; the standard script
# guard uses the dunder names __name__ and "__main__".
if __name__ == "__main__":
    main()