Technical Design Document (TDD)
Coditect Activity Dashboard
Document Version: 1.0
Date: November 27, 2025
Status: Draft
Author: Engineering Team
1. Overview
1.1 Purpose
This Technical Design Document provides implementation-level specifications for the Coditect Activity Dashboard, including detailed algorithms, data structures, and code patterns required for development.
1.2 Scope
- Task-to-Session/Commit linking algorithms
- Progress calculation logic
- Real-time update mechanisms
- Frontend component implementations
- Backend service implementations
2. Task Linking System
2.1 Commit-to-Task Linking Algorithm
from dataclasses import dataclass
from typing import List, Optional, Tuple
import re
from difflib import SequenceMatcher


@dataclass
class LinkResult:
    """A single task-link candidate produced by a linking strategy."""

    task_id: str
    confidence: float  # 0.0-1.0; explicit references always get 1.0
    link_type: str     # 'explicit' | 'inferred'
    evidence: str      # human-readable justification for the link


class CommitTaskLinker:
    """Links git commits to tasks using multiple strategies.

    Strategies, in priority order:
      1. Explicit references in the commit message (confidence 1.0).
      2. Title similarity between commit subject and task titles.
      3. File-path overlap with files previously linked to a task.
    """

    # Patterns for explicit task references. Each pattern contains exactly
    # one capture group holding the task reference.
    EXPLICIT_PATTERNS = [
        r'#TASK-(\d+)',         # #TASK-123
        r'closes?\s+#(\d+)',    # closes #123
        r'fixes?\s+#(\d+)',     # fixes #123
        r'resolves?\s+#(\d+)',  # resolves #123
        r'\[TASK-(\d+)\]',      # [TASK-123]
    ]

    TITLE_SIMILARITY_THRESHOLD = 0.7  # SequenceMatcher ratio cutoff
    FILE_OVERLAP_THRESHOLD = 0.3      # Jaccard index cutoff

    def __init__(self, task_repository):
        self.task_repo = task_repository

    async def link_commit(
        self,
        commit: "GitCommit",
        project_id: str
    ) -> List[LinkResult]:
        """Return link candidates for *commit* within *project_id*.

        Explicit references short-circuit the inferred strategies: if any
        are found they are returned alone, with confidence 1.0.
        """
        results = []

        # Strategy 1: explicit references.
        explicit_links = await self._find_explicit_references(
            commit.message,
            project_id
        )
        results.extend(explicit_links)

        # If we found explicit links, return early with high confidence.
        if explicit_links:
            return results

        # Strategy 2: title similarity.
        title_links = await self._find_by_title_similarity(
            commit.message,
            project_id
        )
        results.extend(title_links)

        # Strategy 3: file overlap (additive, not exclusive).
        file_links = await self._find_by_file_overlap(
            commit.files_changed,
            project_id
        )

        # Merge file links with existing results.
        results = self._merge_link_results(results, file_links)
        return results

    async def _find_explicit_references(
        self,
        message: str,
        project_id: str
    ) -> List[LinkResult]:
        """Find explicit task references in the commit message.

        De-duplicates by task id: a message such as "fixes #42, closes #42"
        matches two patterns but must yield a single link (the original
        emitted one LinkResult per pattern hit).
        """
        found = {}
        for pattern in self.EXPLICIT_PATTERNS:
            matches = re.findall(pattern, message, re.IGNORECASE)
            for match in matches:
                task = await self.task_repo.find_by_reference(
                    match,
                    project_id
                )
                if task and task.id not in found:
                    found[task.id] = LinkResult(
                        task_id=task.id,
                        confidence=1.0,
                        link_type='explicit',
                        evidence=f"Explicit reference: {match}"
                    )
        return list(found.values())

    async def _find_by_title_similarity(
        self,
        message: str,
        project_id: str
    ) -> List[LinkResult]:
        """Find tasks whose titles resemble the commit subject line."""
        results = []

        # First line of the commit message (conventional commit title).
        title = message.split('\n')[0]
        # Strip a conventional-commit prefix like "feat(auth): ".
        title = re.sub(r'^(feat|fix|chore|docs|refactor|test)\([^)]*\):\s*', '', title)

        tasks = await self.task_repo.get_by_project(project_id)
        for task in tasks:
            similarity = SequenceMatcher(
                None,
                title.lower(),
                task.title.lower()
            ).ratio()
            if similarity >= self.TITLE_SIMILARITY_THRESHOLD:
                results.append(LinkResult(
                    task_id=task.id,
                    confidence=similarity,
                    link_type='inferred',
                    evidence=f"Title similarity: {similarity:.2f}"
                ))

        # Only keep the three strongest candidates.
        return sorted(results, key=lambda x: x.confidence, reverse=True)[:3]

    async def _find_by_file_overlap(
        self,
        files_changed: List[str],
        project_id: str
    ) -> List[LinkResult]:
        """Find tasks whose previously-linked commits touched the same files.

        Uses the Jaccard index of the two file sets; confidence is capped at
        0.8 so file overlap never outranks an explicit reference.
        """
        results = []
        tasks_with_commits = await self.task_repo.get_with_commit_files(
            project_id
        )
        for task_id, task_files in tasks_with_commits.items():
            overlap = len(set(files_changed) & set(task_files))
            total = len(set(files_changed) | set(task_files))
            if total > 0:
                overlap_ratio = overlap / total
                if overlap_ratio >= self.FILE_OVERLAP_THRESHOLD:
                    results.append(LinkResult(
                        task_id=task_id,
                        confidence=min(0.8, overlap_ratio),
                        link_type='inferred',
                        evidence=f"File overlap: {overlap}/{total} files"
                    ))
        return results

    def _merge_link_results(
        self,
        existing: List[LinkResult],
        new: List[LinkResult]
    ) -> List[LinkResult]:
        """Merge link results, keeping the highest confidence per task."""
        result_map = {r.task_id: r for r in existing}
        for link in new:
            current = result_map.get(link.task_id)
            if current is None or link.confidence > current.confidence:
                result_map[link.task_id] = link
        return list(result_map.values())
2.2 Session-to-Task Linking Algorithm
from typing import List, Set
import spacy
from collections import Counter
class SessionTaskLinker:
    """Links LLM sessions to tasks using NLP analysis."""

    KEYWORD_MATCH_THRESHOLD = 3   # minimum match score to emit a link
    CONTEXT_WINDOW_SIZE = 500     # characters taken per user message

    def __init__(self, task_repository):
        self.task_repo = task_repository
        self.nlp = spacy.load("en_core_web_sm")

    async def link_session(
        self,
        session: "LLMSession",
        project_id: str
    ) -> List["LinkResult"]:
        """Link a session to tasks based on content analysis.

        Strategy:
          1. Extract keywords from the session text.
          2. Extract named entities (file paths, code identifiers).
          3. Score every project task against those terms.
        """
        results = []
        text = await self._get_session_text(session)
        keywords = self._extract_keywords(text)
        entities = self._extract_entities(text)

        tasks = await self.task_repo.get_by_project(project_id)
        for task in tasks:
            score = self._calculate_match_score(
                keywords,
                entities,
                task
            )
            if score >= self.KEYWORD_MATCH_THRESHOLD:
                # Cap below 1.0 so inferred links never outrank explicit ones.
                confidence = min(0.9, score / 10)
                results.append(LinkResult(
                    task_id=task.id,
                    confidence=confidence,
                    link_type='inferred',
                    evidence=f"Keyword match score: {score}"
                ))
        return sorted(results, key=lambda x: x.confidence, reverse=True)[:5]

    async def _get_session_text(self, session: "LLMSession") -> str:
        """Combine the session summary with the first user messages."""
        texts = []
        if session.summary:
            texts.append(session.summary)
        # Only user messages carry intent (not assistant responses);
        # limit to the first 20 and truncate each to the context window.
        for msg in session.messages[:20]:
            if msg.role == 'user':
                texts.append(msg.content[:self.CONTEXT_WINDOW_SIZE])
        return ' '.join(texts)

    def _extract_keywords(self, text: str) -> Set[str]:
        """Extract lemmatized nouns/proper nouns longer than 2 characters."""
        doc = self.nlp(text.lower())
        keywords = set()
        for token in doc:
            if token.pos_ in ('NOUN', 'PROPN') and len(token.text) > 2:
                keywords.add(token.lemma_)
        return keywords

    def _extract_entities(self, text: str) -> Set[str]:
        """Extract file paths and code identifiers via regex.

        The extension alternation must be a NON-capturing group: with a
        capturing group, re.findall returns only the group (e.g. 'py')
        instead of the whole path, which is what the original code did.
        """
        entities = set()
        # File paths, e.g. src/auth_service.py
        file_pattern = r'[\w/-]+\.(?:py|ts|js|tsx|jsx|go|rs|java)'
        entities.update(re.findall(file_pattern, text))
        # CamelCase identifiers, e.g. TokenManager
        camel_pattern = r'\b[A-Z][a-z]+(?:[A-Z][a-z]+)+\b'
        entities.update(re.findall(camel_pattern, text))
        # snake_case identifiers, e.g. refresh_token
        snake_pattern = r'\b[a-z]+(?:_[a-z]+)+\b'
        entities.update(re.findall(snake_pattern, text))
        return entities

    def _calculate_match_score(
        self,
        keywords: Set[str],
        entities: Set[str],
        task: "Task"
    ) -> float:
        """Score overlap between extracted terms and a task's title/description."""
        score = 0.0
        task_text = f"{task.title} {task.description or ''}".lower()
        task_keywords = self._extract_keywords(task_text)

        # Keyword overlap is the strongest inferred signal.
        keyword_overlap = len(keywords & task_keywords)
        score += keyword_overlap * 1.5

        # Direct hits in the task title.
        for keyword in keywords:
            if keyword in task.title.lower():
                score += 2.0

        # Identifier/path hits anywhere in the task text.
        for entity in entities:
            if entity.lower() in task_text:
                score += 1.0
        return score
3. Progress Calculation
3.1 Project Progress Calculator
from dataclasses import dataclass
# datetime/timedelta and Tuple are used by ProgressCalculator below but were
# missing from the original listing, which would raise NameError at runtime.
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, List, Tuple


@dataclass
class ProgressMetrics:
    """Snapshot of task-completion progress for one project."""

    project_id: str
    total_tasks: int
    completed_tasks: int
    progress_pct: Decimal            # 0-100, quantized to one decimal place
    tasks_by_status: Dict[str, int]  # status -> task count
    velocity_week: int               # Tasks completed this week
    velocity_trend: str              # 'up' | 'down' | 'stable' vs prior week
class ProgressCalculator:
    """Calculates progress metrics from task checkbox state."""

    def __init__(self, task_repository, commit_repository):
        self.task_repo = task_repository
        # Kept for API symmetry; not used by the current calculations.
        self.commit_repo = commit_repository

    async def calculate_project_progress(
        self,
        project_id: str
    ) -> "ProgressMetrics":
        """
        Calculate progress based on checkbox state.

        Progress = checked_tasks / total_tasks * 100
        """
        tasks = await self.task_repo.get_by_project(project_id)
        total = len(tasks)
        completed = sum(1 for t in tasks if t.checked)

        # Count tasks by status.
        status_counts: Dict[str, int] = {}
        for task in tasks:
            status_counts[task.status] = status_counts.get(task.status, 0) + 1

        # Velocity: this week's completions vs last week's. Capture "now"
        # once so both window boundaries share the same baseline (the
        # original called datetime.now() twice, so the two windows could be
        # misaligned by the elapsed time between the calls).
        now = datetime.now()
        week_ago = now - timedelta(days=7)
        two_weeks_ago = now - timedelta(days=14)
        completed_this_week = await self.task_repo.count_completed_since(
            project_id, week_ago
        )
        completed_last_week = await self.task_repo.count_completed_between(
            project_id, two_weeks_ago, week_ago
        )
        if completed_this_week > completed_last_week:
            trend = 'up'
        elif completed_this_week < completed_last_week:
            trend = 'down'
        else:
            trend = 'stable'

        # Guard against division by zero for projects with no tasks yet.
        progress_pct = Decimal(completed) / Decimal(total) * 100 if total > 0 else Decimal(0)
        return ProgressMetrics(
            project_id=project_id,
            total_tasks=total,
            completed_tasks=completed,
            progress_pct=progress_pct.quantize(Decimal('0.1')),
            tasks_by_status=status_counts,
            velocity_week=completed_this_week,
            velocity_trend=trend
        )

    async def calculate_task_list_progress(
        self,
        list_id: str
    ) -> Tuple[int, int, Decimal]:
        """Return (completed, total, percent) for a single task list."""
        tasks = await self.task_repo.get_by_list(list_id)
        total = len(tasks)
        completed = sum(1 for t in tasks if t.checked)
        pct = Decimal(completed) / Decimal(total) * 100 if total > 0 else Decimal(0)
        return completed, total, pct.quantize(Decimal('0.1'))
3.2 Work Distribution Calculator
@dataclass
class WorkDistributionEntry:
    """Share of total activity attributed to one project for a time range."""

    project_id: str
    project_name: str
    # Share of the portfolio-wide weighted score, 0-100 with one decimal
    # place. Filled in by the calculator only after all projects' scores
    # are known; constructed with Decimal(0) as a placeholder.
    percentage: Decimal
    commit_count: int
    session_count: int
    tasks_completed: int
    # commits*COMMIT_WEIGHT + sessions*SESSION_WEIGHT + tasks*TASK_WEIGHT
    weighted_score: int
class WorkDistributionCalculator:
"""Calculates work distribution across projects"""
# Weights for different activity types
COMMIT_WEIGHT = 3
SESSION_WEIGHT = 1
TASK_WEIGHT = 5
def __init__(self, repos):
self.project_repo = repos['project']
self.commit_repo = repos['commit']
self.session_repo = repos['session']
self.task_repo = repos['task']
async def calculate_distribution(
self,
time_range: str = 'week'
) -> List[WorkDistributionEntry]:
"""
Calculate work distribution by project.
Weighted score = commits*3 + sessions*1 + tasks_completed*5
"""
since = self._get_since_date(time_range)
projects = await self.project_repo.get_all_active()
entries = []
total_score = 0
for project in projects:
commit_count = await self.commit_repo.count_since(
project.id, since
)
session_count = await self.session_repo.count_since(
project.id, since
)
tasks_completed = await self.task_repo.count_completed_since(
project.id, since
)
weighted_score = (
commit_count * self.COMMIT_WEIGHT +
session_count * self.SESSION_WEIGHT +
tasks_completed * self.TASK_WEIGHT
)
total_score += weighted_score
entries.append(WorkDistributionEntry(
project_id=project.id,
project_name=project.name,
percentage=Decimal(0), # Calculated after
commit_count=commit_count,
session_count=session_count,
tasks_completed=tasks_completed,
weighted_score=weighted_score
))
# Calculate percentages
for entry in entries:
if total_score > 0:
entry.percentage = (
Decimal(entry.weighted_score) /
Decimal(total_score) * 100
).quantize(Decimal('0.1'))
return sorted(entries, key=lambda x: x.percentage, reverse=True)
def _get_since_date(self, time_range: str) -> datetime:
now = datetime.now()
if time_range == 'day':
return now - timedelta(days=1)
elif time_range == 'week':
return now - timedelta(days=7)
elif time_range == 'month':
return now - timedelta(days=30)
else:
return now - timedelta(days=7)
4. Activity Feed System
4.1 Activity Aggregator
from enum import Enum
from heapq import nlargest
class ActivityType(Enum):
    """Kinds of events that can appear in the activity feed."""

    TASK_COMPLETED = 'task_completed'
    # NOTE(review): value is 'blocked', not 'task_blocked' — inconsistent
    # with TASK_COMPLETED's naming. The value is exposed via the API
    # (ActivityHighlight.type), so confirm no consumer depends on it before
    # normalizing.
    TASK_BLOCKED = 'blocked'
    COMMIT = 'commit'
    SESSION = 'session'
    STATUS_CHANGE = 'status_change'
@dataclass
class Activity:
    """Internal feed item, converted to ActivityHighlight for API responses."""

    id: str                  # e.g. "task-<id>", "blocked-<id>", "commit-<sha>"
    type: ActivityType
    title: str               # display line, may include a leading glyph
    subtitle: Optional[str]  # secondary line; None when nothing useful
    project_id: str
    project_name: str
    task_id: Optional[str]   # set only for task-derived activities
    timestamp: datetime
    priority: int  # Higher = more important; drives feed ordering
class ActivityAggregator:
    """Aggregates and prioritizes activity highlights."""

    # Priority weights by activity type (higher = shown first).
    PRIORITY_WEIGHTS = {
        ActivityType.TASK_COMPLETED: 100,
        ActivityType.TASK_BLOCKED: 90,
        ActivityType.COMMIT: 50,
        ActivityType.SESSION: 30,
        ActivityType.STATUS_CHANGE: 40,
    }

    def __init__(self, repos):
        self.task_repo = repos['task']
        self.commit_repo = repos['commit']
        self.session_repo = repos['session']
        self.project_repo = repos['project']

    async def get_recent_activity(
        self,
        limit: int = 5,
        project_id: Optional[str] = None,
        since: Optional[datetime] = None
    ) -> List["ActivityHighlight"]:
        """
        Get top N activity highlights, prioritized by importance.

        Priority order:
          1. Task completions (checked=true)
          2. Tasks blocked
          3. Significant commits
          4. Status changes
          5. Sessions

        NOTE(review): STATUS_CHANGE activities have a weight but no
        collector here — confirm whether status changes should be sourced.
        """
        if since is None:
            since = datetime.now() - timedelta(days=7)

        activities = []
        # Collect activities from all sources.
        activities.extend(
            await self._get_task_completions(project_id, since)
        )
        activities.extend(
            await self._get_blocked_tasks(project_id, since)
        )
        activities.extend(
            await self._get_recent_commits(project_id, since)
        )
        activities.extend(
            await self._get_recent_sessions(project_id, since)
        )

        # Sort by priority (descending) then timestamp (descending).
        activities.sort(
            key=lambda x: (x.priority, x.timestamp),
            reverse=True
        )
        return [self._to_highlight(a) for a in activities[:limit]]

    async def _get_task_completions(
        self,
        project_id: Optional[str],
        since: datetime
    ) -> List["Activity"]:
        """Get recently completed tasks, with linked-evidence subtitles."""
        tasks = await self.task_repo.get_completed_since(project_id, since)
        activities = []
        for task in tasks:
            project = await self.project_repo.get(task.project_id)
            # Linked evidence counts drive the "N commits, M sessions" line.
            commit_count = await self.task_repo.count_linked_commits(task.id)
            session_count = await self.task_repo.count_linked_sessions(task.id)
            subtitle = None
            if commit_count or session_count:
                parts = []
                if commit_count:
                    parts.append(f"{commit_count} commit{'s' if commit_count > 1 else ''}")
                if session_count:
                    parts.append(f"{session_count} session{'s' if session_count > 1 else ''}")
                subtitle = ', '.join(parts)
            activities.append(Activity(
                id=f"task-{task.id}",
                type=ActivityType.TASK_COMPLETED,
                title=f"☑ {task.title}",
                subtitle=subtitle,
                project_id=project.id,
                project_name=project.name,
                task_id=task.id,
                timestamp=task.updated_at,
                priority=self.PRIORITY_WEIGHTS[ActivityType.TASK_COMPLETED]
            ))
        return activities

    async def _get_blocked_tasks(
        self,
        project_id: Optional[str],
        since: datetime
    ) -> List["Activity"]:
        """Get recently blocked tasks."""
        tasks = await self.task_repo.get_blocked_since(project_id, since)
        activities = []
        for task in tasks:
            project = await self.project_repo.get(task.project_id)
            activities.append(Activity(
                id=f"blocked-{task.id}",
                type=ActivityType.TASK_BLOCKED,
                title=f"⚠️ {task.title}",
                # Truncate the reason to keep the feed line short.
                subtitle=task.blocked_reason[:50] if task.blocked_reason else "Blocked",
                project_id=project.id,
                project_name=project.name,
                task_id=task.id,
                timestamp=task.updated_at,
                priority=self.PRIORITY_WEIGHTS[ActivityType.TASK_BLOCKED]
            ))
        return activities

    async def _get_recent_commits(
        self,
        project_id: Optional[str],
        since: datetime
    ) -> List["Activity"]:
        """Get significant recent commits."""
        commits = await self.commit_repo.get_since(project_id, since)
        activities = []
        for commit in commits[:10]:  # Limit to most recent
            project = await self.project_repo.get_by_repo(commit.repo_id)
            # First line of the message, truncated for display.
            title = commit.message.split('\n')[0][:60]
            activities.append(Activity(
                id=f"commit-{commit.sha}",
                type=ActivityType.COMMIT,
                title=f"{commit.sha[:7]}: {title}",
                subtitle=f"+{commit.additions}/-{commit.deletions}",
                project_id=project.id,
                project_name=project.name,
                task_id=None,
                timestamp=commit.timestamp,
                priority=self.PRIORITY_WEIGHTS[ActivityType.COMMIT]
            ))
        return activities

    async def _get_recent_sessions(
        self,
        project_id: Optional[str],
        since: datetime
    ) -> List["Activity"]:
        """Get recent LLM sessions.

        This method was called by get_recent_activity but never defined in
        the original design (AttributeError at runtime); implemented here to
        mirror _get_recent_commits.
        """
        sessions = await self.session_repo.get_since(project_id, since)
        activities = []
        for session in sessions[:10]:  # Limit to most recent, as with commits
            project = await self.project_repo.get(session.project_id)
            # NOTE(review): assumes sessions expose id, project_id, summary
            # and ended_at — confirm against the session repository model.
            title = (session.summary or 'LLM session').split('\n')[0][:60]
            activities.append(Activity(
                id=f"session-{session.id}",
                type=ActivityType.SESSION,
                title=f"💬 {title}",
                subtitle=None,
                project_id=project.id,
                project_name=project.name,
                task_id=None,
                timestamp=session.ended_at,
                priority=self.PRIORITY_WEIGHTS[ActivityType.SESSION]
            ))
        return activities

    def _to_highlight(self, activity: "Activity") -> "ActivityHighlight":
        """Convert internal Activity to the API response type."""
        icon_map = {
            ActivityType.TASK_COMPLETED: 'check',
            ActivityType.TASK_BLOCKED: 'alert',
            ActivityType.COMMIT: 'git',
            ActivityType.SESSION: 'chat',
            ActivityType.STATUS_CHANGE: 'check',
        }
        return ActivityHighlight(
            id=activity.id,
            type=activity.type.value,
            title=activity.title,
            subtitle=activity.subtitle,
            project_id=activity.project_id,
            project_name=activity.project_name,
            task_id=activity.task_id,
            timestamp=activity.timestamp.isoformat(),
            icon=icon_map[activity.type]
        )
5. Real-Time Updates
5.1 WebSocket Manager
from fastapi import WebSocket
from typing import Dict, Set
import asyncio
import json
class DashboardWebSocketManager:
    """Manages WebSocket connections for real-time dashboard updates."""

    def __init__(self):
        # connection_id -> WebSocket
        self.connections: Dict[str, "WebSocket"] = {}
        # connection_id -> subscribed project IDs
        self.subscriptions: Dict[str, Set[str]] = {}
        # project_id -> connection IDs
        self.project_subscribers: Dict[str, Set[str]] = {}

    async def connect(
        self,
        websocket: "WebSocket",
        connection_id: str
    ):
        """Accept a new WebSocket connection and register it."""
        await websocket.accept()
        self.connections[connection_id] = websocket
        self.subscriptions[connection_id] = set()

    async def disconnect(self, connection_id: str):
        """Remove a connection and all of its project subscriptions."""
        if connection_id in self.subscriptions:
            for project_id in self.subscriptions[connection_id]:
                if project_id in self.project_subscribers:
                    self.project_subscribers[project_id].discard(connection_id)
            del self.subscriptions[connection_id]
        if connection_id in self.connections:
            del self.connections[connection_id]

    async def subscribe(
        self,
        connection_id: str,
        project_ids: List[str]
    ):
        """Subscribe a connection to project updates.

        Assumes connect() has already registered connection_id; raises
        KeyError otherwise.
        """
        for project_id in project_ids:
            self.subscriptions[connection_id].add(project_id)
            if project_id not in self.project_subscribers:
                self.project_subscribers[project_id] = set()
            self.project_subscribers[project_id].add(connection_id)

    async def broadcast_to_project(
        self,
        project_id: str,
        event_type: str,
        data: dict
    ):
        """Broadcast an event to all subscribers of a project.

        Connections that fail to send are cleaned up afterwards.
        """
        if project_id not in self.project_subscribers:
            return
        message = json.dumps({
            'type': event_type,
            'project_id': project_id,
            'data': data,
            'timestamp': datetime.now().isoformat()
        })
        disconnected = []
        # Iterate over a snapshot: send_text() awaits, so another coroutine
        # can subscribe/disconnect and mutate the live set mid-iteration,
        # which would raise "Set changed size during iteration".
        for connection_id in list(self.project_subscribers[project_id]):
            if connection_id in self.connections:
                try:
                    await self.connections[connection_id].send_text(message)
                except Exception:
                    disconnected.append(connection_id)
        # Clean up failed connections only after the send loop.
        for conn_id in disconnected:
            await self.disconnect(conn_id)

    async def broadcast_task_update(
        self,
        task: "Task",
        project_id: str,
        change_type: str
    ):
        """Broadcast a task change as 'task.<change_type>'."""
        await self.broadcast_to_project(
            project_id,
            f'task.{change_type}',
            {
                'task_id': task.id,
                'title': task.title,
                'checked': task.checked,
                'status': task.status
            }
        )

    async def broadcast_progress_update(
        self,
        project_id: str,
        progress: "ProgressMetrics"
    ):
        """Broadcast recalculated progress as 'progress.changed'.

        project_id is included in the payload itself (not only the message
        envelope) because the frontend handler receives only message.data
        and reads data.project_id; the original omitted it, so the portfolio
        summary was never updated on the client.
        """
        await self.broadcast_to_project(
            project_id,
            'progress.changed',
            {
                'project_id': project_id,
                'progress_pct': float(progress.progress_pct),
                'completed': progress.completed_tasks,
                'total': progress.total_tasks
            }
        )
5.2 Event Handlers
class DashboardEventHandler:
    """Handles domain events and fans out dashboard updates."""

    def __init__(
        self,
        ws_manager: "DashboardWebSocketManager",
        progress_calc: "ProgressCalculator",
        activity_agg: "ActivityAggregator",
        cache: "RedisCache",
        task_repo=None
    ):
        # task_repo is required by the linkers built in on_commit_received /
        # on_session_ended; the original never stored it, so self.task_repo
        # raised AttributeError at runtime. Optional (default None) to stay
        # backward-compatible with existing construction sites.
        self.ws = ws_manager
        self.progress = progress_calc
        self.activity = activity_agg
        self.cache = cache
        self.task_repo = task_repo

    async def on_task_updated(self, task: "Task", old_task: "Task"):
        """Broadcast a task change; recompute progress when completion flips."""
        # NOTE(review): _get_project_id is referenced but not defined in this
        # design — it must resolve a list_id to its project. TODO confirm.
        project_id = await self._get_project_id(task.list_id)

        # Classify the change for the event name.
        if task.checked and not old_task.checked:
            change_type = 'completed'
        elif task.status == 'blocked' and old_task.status != 'blocked':
            change_type = 'blocked'
        else:
            change_type = 'updated'

        await self.ws.broadcast_task_update(task, project_id, change_type)

        # Progress only moves when the checkbox state changes.
        if task.checked != old_task.checked:
            progress = await self.progress.calculate_project_progress(project_id)
            await self.ws.broadcast_progress_update(project_id, progress)

        # Invalidate cached views affected by the change.
        await self.cache.delete(f"dashboard:{project_id}")
        await self.cache.delete("portfolio:summary")

    async def on_commit_received(self, commit: "GitCommit"):
        """Link a new commit to tasks and broadcast it."""
        # NOTE(review): _get_project_id_from_repo is referenced but not
        # defined in this design. TODO confirm.
        project_id = await self._get_project_id_from_repo(commit.repo_id)

        linker = CommitTaskLinker(self.task_repo)
        links = await linker.link_commit(commit, project_id)

        await self.ws.broadcast_to_project(
            project_id,
            'commit.new',
            {
                'sha': commit.sha,
                'message': commit.message[:100],
                'linked_tasks': [l.task_id for l in links]
            }
        )
        await self.cache.delete(f"activity:{project_id}")
        # Plain string: the original used an f-string with no placeholder.
        await self.cache.delete("distribution:week")

    async def on_session_ended(self, session: "LLMSession"):
        """Link a finished session to tasks and broadcast its summary."""
        linker = SessionTaskLinker(self.task_repo)
        links = await linker.link_session(session, session.project_id)

        await self.ws.broadcast_to_project(
            session.project_id,
            'session.ended',
            {
                'session_id': session.id,
                'summary': session.summary,
                'linked_tasks': [l.task_id for l in links]
            }
        )
6. Frontend Implementation
6.1 State Management
// stores/dashboardStore.ts
import { create } from 'zustand';
import { subscribeWithSelector } from 'zustand/middleware';
interface DashboardState {
  // Data
  portfolioSummary: ProjectSummary[];
  selectedProjectId: string | null;
  kanban: KanbanBoard | null;
  recentActivity: ActivityHighlight[];
  blockedTasks: BlockedTask[];
  workDistribution: WorkDistribution[];

  // UI state
  isLoading: boolean;
  error: string | null;
  timeRange: 'day' | 'week' | 'month';

  // Actions
  fetchDashboard: () => Promise<void>;
  selectProject: (projectId: string | null) => void;
  setTimeRange: (range: 'day' | 'week' | 'month') => void;
  updateTask: (task: Partial<Task> & { id: string }) => void;

  // WebSocket handlers
  handleTaskUpdate: (data: TaskUpdateEvent) => void;
  handleProgressUpdate: (data: ProgressUpdateEvent) => void;
}

// Catch variables are `unknown` under strict TS, so `error.message` is not
// safe to read directly; narrow first.
const toErrorMessage = (error: unknown): string =>
  error instanceof Error ? error.message : String(error);

export const useDashboardStore = create<DashboardState>()(
  subscribeWithSelector((set, get) => ({
    // Initial state
    portfolioSummary: [],
    selectedProjectId: null,
    kanban: null,
    recentActivity: [],
    blockedTasks: [],
    workDistribution: [],
    isLoading: false,
    error: null,
    timeRange: 'week',

    // Fetches the full dashboard payload for the current selection/range.
    fetchDashboard: async () => {
      set({ isLoading: true, error: null });
      try {
        const { selectedProjectId, timeRange } = get();
        const response = await api.getDashboard({
          projectId: selectedProjectId,
          timeRange
        });
        set({
          portfolioSummary: response.portfolio_summary,
          kanban: response.selected_project?.kanban || null,
          recentActivity: response.recent_activity,
          blockedTasks: response.blocked_tasks,
          workDistribution: response.work_distribution,
          isLoading: false
        });
      } catch (error) {
        set({ error: toErrorMessage(error), isLoading: false });
      }
    },

    selectProject: (projectId) => {
      set({ selectedProjectId: projectId });
      get().fetchDashboard();
    },

    setTimeRange: (range) => {
      set({ timeRange: range });
      get().fetchDashboard();
    },

    // Optimistic update for task changes; reverts to the pre-update board
    // snapshot if the API call fails.
    updateTask: async (taskUpdate) => {
      const { kanban } = get();
      if (!kanban) return;
      const newKanban = updateTaskInKanban(kanban, taskUpdate);
      set({ kanban: newKanban });
      try {
        await api.updateTask(taskUpdate.id, taskUpdate);
      } catch (error) {
        set({ kanban, error: toErrorMessage(error) });
      }
    },

    // WebSocket: apply a server-pushed task change to the board.
    handleTaskUpdate: (data) => {
      const { kanban } = get();
      if (kanban) {
        const newKanban = updateTaskInKanban(kanban, {
          id: data.task_id,
          checked: data.checked,
          status: data.status
        });
        set({ kanban: newKanban });
      }
    },

    // WebSocket: patch the matching project's progress in the summary.
    handleProgressUpdate: (data) => {
      const { portfolioSummary } = get();
      const newSummary = portfolioSummary.map(p =>
        p.project_id === data.project_id
          ? { ...p, progress_pct: data.progress_pct }
          : p
      );
      set({ portfolioSummary: newSummary });
    }
  }))
);
6.2 WebSocket Hook
// hooks/useDashboardWebSocket.ts
import { useEffect, useRef, useCallback } from 'react';
import { useDashboardStore } from '../stores/dashboardStore';
export function useDashboardWebSocket(projectIds: string[]) {
  const wsRef = useRef<WebSocket | null>(null);
  const reconnectTimeoutRef = useRef<NodeJS.Timeout>();
  // Guards the reconnect-after-unmount leak: the original scheduled a new
  // connection from onclose even when the close was triggered by cleanup
  // (cleanup cleared the pending timeout, then close() fired onclose, which
  // set a fresh timeout after unmount).
  const shouldReconnectRef = useRef(true);
  const handleTaskUpdate = useDashboardStore(s => s.handleTaskUpdate);
  const handleProgressUpdate = useDashboardStore(s => s.handleProgressUpdate);

  const connect = useCallback(() => {
    const ws = new WebSocket(`${WS_BASE_URL}/ws/dashboard`);

    ws.onopen = () => {
      console.log('Dashboard WebSocket connected');
      ws.send(JSON.stringify({
        type: 'subscribe',
        project_ids: projectIds
      }));
    };

    ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      switch (message.type) {
        case 'task.updated':
        case 'task.completed':
        case 'task.blocked':
          handleTaskUpdate(message.data);
          break;
        case 'progress.changed':
          handleProgressUpdate(message.data);
          break;
      }
    };

    ws.onclose = () => {
      console.log('Dashboard WebSocket disconnected');
      // Only reconnect on unexpected closes, not on unmount cleanup.
      if (shouldReconnectRef.current) {
        reconnectTimeoutRef.current = setTimeout(connect, 3000);
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      ws.close();
    };

    wsRef.current = ws;
    // NOTE(review): projectIds is an array; if callers pass a new array
    // identity each render this callback (and the socket) is recreated.
    // Consider memoizing at the call site.
  }, [projectIds, handleTaskUpdate, handleProgressUpdate]);

  useEffect(() => {
    shouldReconnectRef.current = true;
    connect();
    return () => {
      shouldReconnectRef.current = false;
      if (reconnectTimeoutRef.current) {
        clearTimeout(reconnectTimeoutRef.current);
      }
      if (wsRef.current) {
        wsRef.current.close();
      }
    };
  }, [connect]);

  return wsRef.current;
}
7. Performance Optimizations
7.1 Query Optimization
-- Materialized view for dashboard summary
CREATE MATERIALIZED VIEW project_dashboard_summary AS
SELECT
    p.id AS project_id,
    p.name AS project_name,
    COUNT(t.id) AS total_tasks,
    COUNT(t.id) FILTER (WHERE t.checked = true) AS completed_tasks,
    ROUND(
        COUNT(t.id) FILTER (WHERE t.checked = true)::numeric /
        NULLIF(COUNT(t.id), 0) * 100, 1
    ) AS progress_pct,
    COUNT(t.id) FILTER (WHERE t.status = 'blocked') AS blocked_count,
    (
        SELECT COUNT(*)
        FROM git_commits gc
        JOIN git_repos gr ON gc.repo_id = gr.id
        WHERE gr.project_id = p.id
          AND gc.timestamp > NOW() - INTERVAL '7 days'
    ) AS commits_this_week
FROM projects p
LEFT JOIN project_plans pp ON pp.project_id = p.id
LEFT JOIN task_lists tl ON tl.plan_id = pp.id
LEFT JOIN tasks t ON t.list_id = tl.id
WHERE p.status = 'active'
GROUP BY p.id, p.name;

-- REFRESH ... CONCURRENTLY requires at least one unique index on the
-- materialized view; without this, the trigger below errors at runtime.
CREATE UNIQUE INDEX project_dashboard_summary_project_id_idx
    ON project_dashboard_summary (project_id);

-- Refresh on schedule or trigger
-- NOTE(review): a statement-level full refresh on every task write is
-- expensive on busy systems; consider debouncing (pg_cron, or a
-- NOTIFY/LISTEN worker) if task churn is high.
CREATE OR REPLACE FUNCTION refresh_dashboard_summary()
RETURNS trigger AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY project_dashboard_summary;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER refresh_dashboard_on_task_change
AFTER INSERT OR UPDATE OR DELETE ON tasks
FOR EACH STATEMENT
EXECUTE FUNCTION refresh_dashboard_summary();
7.2 Caching Strategy
class DashboardCache:
    """Redis-based caching for dashboard data."""

    # TTLs in seconds, mirrored in config/dashboard.yaml.
    TTL_PORTFOLIO = 60      # 1 minute
    TTL_KANBAN = 30         # 30 seconds
    TTL_ACTIVITY = 60       # 1 minute
    TTL_DISTRIBUTION = 300  # 5 minutes

    def __init__(self, redis_client):
        self.redis = redis_client

    async def get_portfolio_summary(self) -> Optional[List["ProjectSummary"]]:
        """Return the cached portfolio summary, or None on a cache miss."""
        data = await self.redis.get("portfolio:summary")
        if data:
            return [ProjectSummary(**p) for p in json.loads(data)]
        return None

    async def set_portfolio_summary(self, summary: List["ProjectSummary"]):
        """Cache the portfolio summary as JSON with TTL_PORTFOLIO."""
        data = json.dumps([asdict(s) for s in summary])
        await self.redis.setex("portfolio:summary", self.TTL_PORTFOLIO, data)

    async def invalidate_project(self, project_id: str):
        """Invalidate all caches related to a project."""
        keys = [
            "portfolio:summary",
            f"kanban:{project_id}",
            f"activity:{project_id}",
            # Redis DEL does not glob — the original's "distribution:*" only
            # deleted a literal key of that name. Enumerate the known
            # time-range keys instead (see WorkDistributionCalculator).
            "distribution:day",
            "distribution:week",
            "distribution:month",
        ]
        for key in keys:
            await self.redis.delete(key)
8. Testing Strategy
8.1 Unit Tests
# tests/test_linking.py
import pytest
from unittest.mock import AsyncMock, MagicMock
from linking import CommitTaskLinker, SessionTaskLinker
class TestCommitTaskLinker:
    """Unit tests for CommitTaskLinker strategies."""

    @pytest.fixture
    def task_repo(self):
        repo = AsyncMock()
        repo.find_by_reference.return_value = MagicMock(id='task-123')
        # Without this, the file-overlap strategy calls .items() on the
        # MagicMock that AsyncMock returns by default and fails with
        # TypeError when the result is iterated.
        repo.get_with_commit_files.return_value = {}
        return repo

    @pytest.fixture
    def linker(self, task_repo):
        return CommitTaskLinker(task_repo)

    @pytest.mark.asyncio
    async def test_explicit_reference_found(self, linker, task_repo):
        """Test that explicit task references are found"""
        commit = MagicMock(
            message="feat(auth): Add JWT support #TASK-123",
            files_changed=[]
        )
        results = await linker.link_commit(commit, 'project-1')
        assert len(results) == 1
        assert results[0].task_id == 'task-123'
        assert results[0].confidence == 1.0
        assert results[0].link_type == 'explicit'

    @pytest.mark.asyncio
    async def test_title_similarity_matching(self, linker, task_repo):
        """Test fuzzy matching by title similarity"""
        task_repo.find_by_reference.return_value = None
        task_repo.get_by_project.return_value = [
            MagicMock(id='task-1', title='Implement JWT authentication'),
            MagicMock(id='task-2', title='Add unit tests'),
        ]
        # The original message "Add JWT authentication support" only reaches
        # a SequenceMatcher ratio of ~0.66 against 'Implement JWT
        # authentication' — below the 0.7 threshold, so the assertion could
        # never pass. Use a conventional-commit message whose stripped title
        # matches the task title exactly.
        commit = MagicMock(
            message="feat(auth): Implement JWT authentication",
            files_changed=[]
        )
        results = await linker.link_commit(commit, 'project-1')
        assert len(results) >= 1
        assert results[0].task_id == 'task-1'
        assert results[0].link_type == 'inferred'
8.2 Integration Tests
# tests/integration/test_dashboard_api.py
import pytest
from httpx import AsyncClient
class TestDashboardAPI:
    """Integration tests for the aggregated dashboard endpoints."""

    @pytest.mark.asyncio
    async def test_get_dashboard_returns_all_components(
        self,
        client: AsyncClient,
        seed_data
    ):
        """Test dashboard endpoint returns complete response"""
        response = await client.get("/api/v1/dashboard")
        assert response.status_code == 200
        data = response.json()
        # All four top-level dashboard components must be present.
        assert 'portfolio_summary' in data
        assert 'recent_activity' in data
        assert 'blocked_tasks' in data
        assert 'work_distribution' in data

    @pytest.mark.asyncio
    async def test_activity_feed_limited_to_5(
        self,
        client: AsyncClient,
        seed_many_activities
    ):
        """Test activity feed returns max 5 items"""
        # seed_many_activities presumably seeds more than 5 activities
        # (fixture defined elsewhere — confirm); the endpoint must still
        # cap the feed at 5 per the dashboard config.
        response = await client.get("/api/v1/dashboard/activity")
        assert response.status_code == 200
        data = response.json()
        assert len(data) <= 5
9. Appendices
9.1 Configuration
# config/dashboard.yaml
dashboard:
  activity:
    max_items: 5       # highlights returned by the feed (ActivityAggregator limit)
    lookback_days: 7   # default activity window
  linking:
    title_similarity_threshold: 0.7  # SequenceMatcher ratio cutoff (CommitTaskLinker)
    file_overlap_threshold: 0.3      # Jaccard overlap cutoff (CommitTaskLinker)
    keyword_match_threshold: 3       # minimum score for session links (SessionTaskLinker)
  cache:  # TTLs in seconds; mirror DashboardCache constants
    portfolio_ttl: 60
    kanban_ttl: 30
    activity_ttl: 60
    distribution_ttl: 300
  websocket:
    heartbeat_interval: 30  # seconds; presumably server-side ping — confirm, not shown in code
    reconnect_delay: 3000   # milliseconds; matches the client hook's setTimeout
9.2 Document History
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2025-11-27 | Engineering Team | Initial release |