Skip to main content

C4 Workflow Diagram - Level 4: Code Execution Flow

CODITECT Context Intelligence Platform

Diagram Level: 4 (Code)
Focus: Method-level execution flow with code examples
Audience: Developers implementing features
Purpose: Understand detailed implementation logic


Workflow 1: SearchService.hybrid_search() Execution

# core/services/search_service.py

async def hybrid_search(
    self,
    organization_id: UUID,
    query: str,
    limit: int = 20,
    alpha: float = 0.5
) -> List[Conversation]:
    """
    Run keyword and semantic search concurrently and merge the two
    ranked lists with Reciprocal Rank Fusion.

    Args:
        organization_id: Tenant whose conversations are searched.
        query: Free-text search string.
        limit: Maximum number of fused results to return.
        alpha: Weight of the keyword ranking in the fusion;
            (1 - alpha) weights the semantic ranking.

    Returns:
        The top ``limit`` conversations ordered by fused relevance.
    """
    # Over-fetch from each backend so the fusion step has enough
    # candidates (and overlap) to work with before truncating.
    candidate_count = limit * 2

    # STEP 1+2: Query both backends concurrently — total latency is
    # the slower of the two calls rather than their sum (~80ms).
    keyword_hits, semantic_hits = await asyncio.gather(
        self.keyword_search(organization_id, query, candidate_count),
        self.semantic_search(organization_id, query, candidate_count),
    )

    # STEP 3: Merge the two rankings in memory (~5ms, cheap vs. I/O).
    merged = self._rrf_fusion(keyword_hits, semantic_hits, alpha)

    # STEP 4: Truncate to the caller's requested page size.
    return merged[:limit]

Execution Timeline (80ms total):

0ms:   Start parallel tasks
├─ keyword_search() starts → PostgreSQL query
└─ semantic_search() starts → Weaviate query

40ms: Keyword results return (PostgreSQL FTS)
60ms: Semantic results return (Weaviate HNSW)
65ms: RRF fusion completes
80ms: Return fused results to API

Workflow 2: _rrf_fusion() Algorithm Implementation

def _rrf_fusion(
self,
keyword_results: List[Conversation],
semantic_results: List[Conversation],
alpha: float
) -> List[Conversation]:
"""
Reciprocal Rank Fusion algorithm.

Formula: score(c) = Σ 1 / (k + rank(c))
"""
# STEP 1: Initialize score dictionary
scores: Dict[UUID, float] = {}

# STEP 2: Calculate keyword scores
for rank, conversation in enumerate(keyword_results, start=1):
keyword_score = self._calculate_rrf_score(rank, self.rrf_k)
# Example: rank=1, k=60 → score = 1/61 ≈ 0.0164
scores[conversation.id] = alpha * keyword_score

# STEP 3: Calculate semantic scores
for rank, conversation in enumerate(semantic_results, start=1):
semantic_score = self._calculate_rrf_score(rank, self.rrf_k)

# If conversation appears in both lists, add scores
if conversation.id in scores:
scores[conversation.id] += (1 - alpha) * semantic_score
else:
scores[conversation.id] = (1 - alpha) * semantic_score

# STEP 4: Combine all unique conversations
all_conversations = {c.id: c for c in keyword_results}
all_conversations.update({c.id: c for c in semantic_results})

# STEP 5: Attach scores
for conv_id, score in scores.items():
all_conversations[conv_id].relevance_score = score

# STEP 6: Sort by score (descending)
sorted_conversations = sorted(
all_conversations.values(),
key=lambda c: c.relevance_score,
reverse=True
)

return sorted_conversations

Example Execution (alpha=0.5, k=60):

Input:
keyword_results = [Conv A (rank 1), Conv B (rank 2)]
semantic_results = [Conv B (rank 1), Conv C (rank 2)]

Step 2 - Keyword Scores:
Conv A: 0.5 * (1/61) = 0.00820
Conv B: 0.5 * (1/62) = 0.00806

Step 3 - Semantic Scores:
Conv B: 0.00806 + 0.5 * (1/61) = 0.01626 (appears in both!)
Conv C: 0.5 * (1/62) = 0.00806

Step 6 - Sorted Output:
1. Conv B (score: 0.01626)
2. Conv A (score: 0.00820)
3. Conv C (score: 0.00806)

Time Complexity: O(n log n), where n = number of unique conversations
Space Complexity: O(n) for the scores dictionary


Workflow 3: ConversationRepository.search_by_keyword() SQL Execution

# core/repositories/postgresql_conversation_repository.py

async def search_by_keyword(
    self,
    organization_id: UUID,
    query: str,
    limit: int = 20
) -> List[Conversation]:
    """
    PostgreSQL full-text search with row-level tenant scoping.

    Args:
        organization_id: Tenant id; the WHERE clause filters on it so
            one organization can never read another's rows (RLS).
        query: Free-text search string; whitespace-separated words are
            AND-ed together into a tsquery.
        limit: Maximum number of rows to return.

    Returns:
        Conversations ordered by descending ts_rank relevance; empty
        list for an empty/whitespace-only query.
    """
    # STEP 1: Convert query to tsquery format.
    # Example: "authentication bug" -> "authentication & bug"
    words = query.split()
    if not words:
        # FIX: an empty/whitespace query produced the empty tsquery
        # string '', which makes to_tsquery() raise in PostgreSQL.
        return []
    tsquery = ' & '.join(words)
    # NOTE(review): words containing tsquery syntax (&, |, !, :, quotes)
    # pass through verbatim and will make to_tsquery() error out —
    # sanitize tokens (or use plainto_tsquery) if input is untrusted.

    # STEP 2: Build SQL with tsvector matching. Values are bound as
    # parameters (:org_id, :tsquery, :limit), never interpolated.
    sql = """
        SELECT
            id, organization_id, user_id, title, created_at,
            ts_rank(
                to_tsvector('english', title || ' ' || content),
                to_tsquery('english', :tsquery)
            ) AS relevance_score
        FROM conversations
        WHERE
            organization_id = :org_id  -- RLS ENFORCEMENT (CRITICAL!)
            AND to_tsvector('english', title || ' ' || content)
                @@ to_tsquery('english', :tsquery)
        ORDER BY relevance_score DESC
        LIMIT :limit
    """

    # STEP 3: Execute query (served by the GIN FTS index).
    result = await self.db.execute(
        text(sql),
        {
            "org_id": organization_id,
            "tsquery": tsquery,
            "limit": limit
        }
    )

    # STEP 4: Convert rows to domain objects.
    # NOTE(review): dict(row) assumes mapping-like rows; SQLAlchemy 2.x
    # Row objects require dict(row._mapping) — confirm driver version.
    rows = result.fetchall()
    return [Conversation.from_dict(dict(row)) for row in rows]

SQL Execution Plan (with GIN index):

EXPLAIN ANALYZE
SELECT ...
FROM conversations
WHERE organization_id = 'uuid-123'
AND to_tsvector('english', title || ' ' || content)
@@ to_tsquery('english', 'authentication & bug');

-- Output:
Bitmap Heap Scan on conversations (cost=12.50..45.30 rows=10 width=256) (actual time=0.045..0.080 rows=8 loops=1)
Recheck Cond: (to_tsvector('english', ...) @@ to_tsquery(...))
Filter: (organization_id = 'uuid-123')
Heap Blocks: exact=5
-> Bitmap Index Scan on idx_conversations_fts (cost=0.00..12.50 rows=20 width=0) (actual time=0.030..0.030 rows=10 loops=1)
Index Cond: (to_tsvector('english', ...) @@ to_tsquery(...))
Planning Time: 0.125 ms
Execution Time: 0.105 ms

Performance: <50ms p95 (GIN index scan)


Workflow 4: CorrelationService Score Calculation

# core/services/correlation_service.py

async def correlate_commit(self, commit: Commit) -> List[Tuple[Conversation, float]]:
    """
    Find conversations that likely led to this commit.

    Candidates are conversations created within ±6 hours of the commit.
    Each candidate's score blends three signals:

      * temporal proximity (weight 0.6, linear decay over 6 hours),
      * semantic similarity of commit message vs. conversation text
        (weight 0.3),
      * an explicit manual link, if present (weight 0.1).

    Returns:
        (conversation, score) pairs with score > 0.7, best first.
    """
    import asyncio  # snippet-local import: these doc examples carry no shared header

    # STEP 1: Find temporal candidates (±6 hours around the commit).
    window = timedelta(hours=6)
    conversations = await self.conversation_repo.list_by_date_range(
        organization_id=commit.organization_id,
        start=commit.timestamp - window,
        end=commit.timestamp + window
    )
    if not conversations:
        return []  # No candidates

    # STEP 2: Score semantic similarity for all candidates in parallel.
    # IMPROVEMENT: the similarity call is awaited I/O, so gather turns
    # n sequential round-trips into one concurrent batch (same results).
    semantic_scores = await asyncio.gather(*(
        self._calculate_semantic_similarity(
            commit.message,
            conversation.title + " " + conversation.get_full_text()
        )
        for conversation in conversations
    ))

    # STEP 3: Blend temporal, semantic and explicit-link evidence.
    correlations = []
    for conversation, semantic_score in zip(conversations, semantic_scores):
        # Linear decay: 1.0 at the same instant, 0.0 at 6 hours apart.
        hours_apart = abs(
            (commit.timestamp - conversation.created_at).total_seconds() / 3600
        )
        temporal_score = max(0, 1 - (hours_apart / 6))

        # Manual links are binary evidence with a small weight.
        explicit_score = 1.0 if self._has_manual_link(commit, conversation) else 0.0

        final_score = (
            0.6 * temporal_score +
            0.3 * semantic_score +
            0.1 * explicit_score
        )
        correlations.append((conversation, final_score))

    # STEP 4: Keep only high-confidence matches, best first.
    high_confidence = [
        (conv, score) for conv, score in correlations
        if score > 0.7
    ]
    return sorted(high_confidence, key=lambda pair: pair[1], reverse=True)

Example Calculation:

Commit: "fix: Resolve JWT expiration bug" @ 10:30 AM
Conversation: "Authentication issues discussion" @ 10:00 AM

Temporal Score:
time_diff = 0.5 hours
temporal_score = 1 - (0.5 / 6) = 0.917

Semantic Score:
cosine_similarity("fix JWT expiration", "authentication issues") = 0.85

Explicit Score:
No manual link = 0.0

Final Score:
0.6 * 0.917 + 0.3 * 0.85 + 0.1 * 0.0 = 0.805

Result: HIGH CONFIDENCE (>0.7) → Create link

Workflow 5: Error Handling with Try-Catch

# api/endpoints/conversation_endpoints.py

@router.get("/conversations/search")
async def search_conversations(
    q: str,
    alpha: float = 0.5,
    limit: int = 20,
    current_user: User = Depends(get_current_user),
    search_service: SearchService = Depends(get_search_service),
    request: Request = None
):
    """Search conversations with comprehensive error handling.

    Query parameters:
        q: Free-text search string passed through to hybrid search.
        alpha: Keyword-vs-semantic fusion weight; must lie in [0.0, 1.0].
        limit: Maximum number of results; capped at 100.

    Returns:
        JSON object echoing query/alpha plus a result count and the
        serialized conversation items.

    Raises:
        HTTPException: 400 for invalid parameters, 403 for permission
            failures, 500 (problem-details body with a trace_id) for
            database or unexpected errors.
    """
    # NOTE(review): `request` is accepted but never used in this body —
    # confirm whether a dependency/middleware needs it before removing.
    try:
        # STEP 1: Validate parameters before touching any backend.
        if not 0.0 <= alpha <= 1.0:
            raise ValidationError("Alpha must be between 0.0 and 1.0")

        if limit > 100:
            raise ValidationError("Limit cannot exceed 100")

        # STEP 2: Execute search, scoped to the caller's organization so
        # tenants cannot read each other's conversations.
        results = await search_service.hybrid_search(
            organization_id=current_user.organization_id,
            query=q,
            alpha=alpha,
            limit=limit
        )

        # STEP 3: Return results as a plain JSON-serializable payload.
        return {
            "query": q,
            "alpha": alpha,
            "count": len(results),
            "items": [conv.to_dict() for conv in results]
        }

    # Handler order matters: specific exception types must precede the
    # final catch-all below.
    except ValidationError as e:
        # User input error (400 Bad Request)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e)
        )

    except PermissionError as e:
        # Access denied (403 Forbidden)
        # NOTE(review): this catches the *builtin* PermissionError —
        # confirm the service layer raises it rather than a domain error.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to perform this search"
        )

    except DatabaseError as e:
        # Database connection issue (500 Internal Server Error).
        # A fresh trace_id ties the client-visible error to server logs.
        logger.error(f"Database error during search: {e}", exc_info=True)
        trace_id = str(uuid4())

        # Alert on-call engineer
        alert_pagerduty(
            severity="high",
            message=f"Database error in search API: {e}",
            trace_id=trace_id
        )

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "type": "https://api.context-intelligence.com/errors/database-error",
                "title": "Database Error",
                "status": 500,
                "detail": "An unexpected error occurred. Please try again later.",
                "trace_id": trace_id
            }
        )

    except Exception as e:
        # Unknown error (500): logged with stack trace and a trace_id,
        # but internal details are never leaked to the client.
        logger.error(f"Unexpected error in search: {e}", exc_info=True)
        trace_id = str(uuid4())

        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "type": "https://api.context-intelligence.com/errors/internal-error",
                "title": "Internal Server Error",
                "status": 500,
                "detail": "An unexpected error occurred. Please try again later.",
                "trace_id": trace_id
            }
        )

Workflow 6: Async/Await Execution Flow

# Demonstration of async execution

async def example_workflow():
    """
    Demonstrate sequential vs. parallel awaiting with wall-clock timing.

    Runs the same two coroutines twice — first one after the other,
    then concurrently via asyncio.gather — and prints both durations
    in milliseconds plus the resulting speedup.
    """
    import asyncio
    # perf_counter is monotonic and higher-resolution than time.time(),
    # so it is the right clock for measuring short durations.
    from time import perf_counter

    # Sequential execution (slow): the second await does not start
    # until the first completes, so the durations add up.
    start = perf_counter()
    result1 = await slow_operation_1()  # ~100ms
    result2 = await slow_operation_2()  # ~100ms
    # BUG FIX: perf_counter()/time() return seconds; convert to ms so
    # the "...ms" output below is correct (previously printed "0ms").
    duration_sequential = (perf_counter() - start) * 1000
    # Duration: ~200ms (100 + 100)

    # Parallel execution (fast): both coroutines run concurrently, so
    # the total is roughly the slower of the two, not the sum.
    start = perf_counter()
    result1, result2 = await asyncio.gather(
        slow_operation_1(),  # ~100ms
        slow_operation_2()   # ~100ms
    )
    duration_parallel = (perf_counter() - start) * 1000
    # Duration: ~100ms (both execute concurrently)

    print(f"Sequential: {duration_sequential:.0f}ms")
    print(f"Parallel: {duration_parallel:.0f}ms")
    print(f"Speedup: {duration_sequential / duration_parallel:.1f}x")

Output:

Sequential: 200ms
Parallel: 100ms
Speedup: 2.0x

Performance Profiling Output

# Using cProfile to measure SearchService.hybrid_search()

# Profile one hybrid_search call with the stdlib deterministic profiler.
import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()  # start recording every Python-level call from here on

# NOTE: a bare `await` only works in an async context (e.g. the
# `python -m asyncio` REPL or an async test) — this snippet is
# illustrative, not a runnable module.
await search_service.hybrid_search(org_id, "authentication", alpha=0.5, limit=20)

profiler.disable()  # stop recording before building the report
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')  # rank by cumulative (inclusive) time
stats.print_stats(10)  # print only the top 10 entries

Output:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
1 0.000 0.000 0.082 0.082 search_service.py:45(hybrid_search)
2 0.001 0.000 0.075 0.038 asyncio/tasks.py:650(gather)
1 0.003 0.003 0.040 0.040 conversation_repo.py:120(search_by_keyword)
1 0.002 0.002 0.035 0.035 weaviate_client.py:80(search_semantic)
1 0.005 0.005 0.005 0.005 search_service.py:95(_rrf_fusion)
234 0.001 0.000 0.001 0.000 {method 'execute' of 'asyncpg.protocol'}
40 0.000 0.000 0.000 0.000 conversation.py:15(from_dict)

Analysis:

  • Total execution: 82ms
  • Parallel tasks (keyword + semantic): 75ms (91% of total)
  • RRF fusion: 5ms (6% of total)
  • Overhead: 2ms (3%)

Code-Level Optimization Example

Before (slow):

# Sequential processing (anti-pattern): one embedding request and one
# vector-store write per conversation, each awaited before the next
# starts, so total latency grows linearly with the number of items.
for conversation in conversations:
    embedding = await openai_client.generate_embedding(conversation.text)
    await weaviate_client.upsert(conversation.id, embedding)
# Duration: n * 500ms = 5 seconds for 10 conversations

After (fast):

# Batch processing: one embeddings API call covers every text at once.
texts = [conv.text for conv in conversations]
embeddings = await openai_client.generate_embeddings_batch(texts)  # 500ms for all

# Parallel upserts: all vector-store writes run concurrently via
# asyncio.gather, so the total is roughly one write's latency rather
# than the sum of n writes.
await asyncio.gather(*[
    weaviate_client.upsert(conv.id, emb)
    for conv, emb in zip(conversations, embeddings)
])
# Duration: 500ms + 100ms = 600ms for 10 conversations (8.3x faster!)

Diagram Maintained By: Engineering Team
Last Updated: 2025-11-26
Related Documents: