Skip to main content

Sequence Diagram: Usage-Based Metering Flow

Purpose: Enterprise tier usage-based billing with event sourcing, periodic aggregation, and Stripe Usage Records API integration.

Actors:

  • CODITECT Client (generating usage events)
  • License API (usage tracking)
  • PostgreSQL (event sourcing)
  • Celery Worker (aggregation tasks)
  • Stripe (usage-based billing)

Flow: Usage events → Event sourcing → Hourly aggregation → Daily Stripe reporting


Mermaid Sequence Diagram


Step-by-Step Breakdown

1. Usage Event Recording (Steps 1-3)

Client-side: Generate usage events:

# Client-side: Usage tracking
import requests
from datetime import datetime

class UsageTracker:
    """
    Tracks CODITECT usage for Enterprise tier.

    Events tracked:
    - Agent invocations
    - Command executions
    - Storage usage (GB-hours)

    Delivery is best-effort: a failed send is logged and never raised to the
    caller.
    """

    def __init__(self, jwt_token: str, license_key: str):
        """
        Args:
            jwt_token: Token sent as the Bearer credential on every request.
            license_key: License key included in every event payload.
        """
        self.jwt_token = jwt_token
        self.license_key = license_key
        self.api_url = 'https://api.coditect.ai/api/v1/usage/track'

    def track_agent_invocation(
        self,
        agent_type: str,
        duration_seconds: float
    ):
        """
        Track agent invocation.

        Billed at: $0.10 per invocation
        """
        self._send_event(
            event_type='agent_invocation',
            quantity=1,
            metadata={
                'agent_type': agent_type,
                'duration_seconds': duration_seconds
            }
        )

    def track_command_execution(
        self,
        command_name: str
    ):
        """
        Track command execution.

        Billed at: $0.02 per command
        """
        self._send_event(
            event_type='command_execution',
            quantity=1,
            metadata={
                'command': command_name
            }
        )

    def track_storage_usage(
        self,
        storage_gb: float,
        duration_hours: float
    ):
        """
        Track storage usage in GB-hours.

        Billed at: $0.05 per GB-hour
        """
        gb_hours = storage_gb * duration_hours

        self._send_event(
            event_type='storage_usage',
            quantity=gb_hours,
            metadata={
                'storage_gb': storage_gb,
                'duration_hours': duration_hours
            }
        )

    def _build_payload(
        self,
        event_type: str,
        quantity: float,
        metadata: dict
    ) -> dict:
        """
        Build the JSON body for one usage event.

        The timestamp is timezone-aware UTC, so its ISO-8601 form carries an
        explicit "+00:00" offset. A naive timestamp serializes without any
        offset and leaves the receiver to guess which zone was meant.
        """
        from datetime import timezone

        return {
            'license_key': self.license_key,
            'event_type': event_type,
            'quantity': quantity,
            'metadata': metadata,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }

    def _send_event(
        self,
        event_type: str,
        quantity: float,
        metadata: dict
    ):
        """
        Send usage event to API.

        Best-effort: any failure (network error, timeout, HTTP error status)
        is logged as a warning and swallowed so usage tracking never breaks
        the calling code. The request uses a 5-second timeout.
        """
        try:
            response = requests.post(
                self.api_url,
                headers={
                    'Authorization': f'Bearer {self.jwt_token}'
                },
                json=self._build_payload(event_type, quantity, metadata),
                timeout=5
            )
            # Turn HTTP 4xx/5xx into an exception so a rejected event is at
            # least logged below instead of being dropped unnoticed.
            response.raise_for_status()

        except Exception as e:
            # Deliberately broad: tracking must never block execution.
            import logging

            logging.getLogger(__name__).warning(f"Failed to track usage: {e}")

Server-side: Usage tracking endpoint:

# Server-side: Usage tracking endpoint
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status, serializers
from datetime import datetime

from apps.licenses.models import License, UsageEvent as UsageEventModel

# Request Serializer
class UsageEventSerializer(serializers.Serializer):
    """
    Validates the body of a usage-tracking request.

    All five fields are required. `event_type` is limited to the three
    billable event kinds.
    """

    license_key = serializers.CharField(max_length=255)
    event_type = serializers.ChoiceField(choices=['agent_invocation', 'command_execution', 'storage_usage'])
    # Usage feeds billing: reject negative quantities so a client cannot
    # submit events that offset usage already recorded.
    quantity = serializers.FloatField(min_value=0)
    metadata = serializers.JSONField()
    timestamp = serializers.DateTimeField()

@api_view(['POST'])
@permission_classes([IsAuthenticated])
def track_usage(request):
    """
    Append one usage event to the event log (event sourcing pattern).

    Responses:
        400 - request body failed UsageEventSerializer validation
        404 - no License matches the submitted license_key
        200 - {'status': 'ok', 'tracked': <bool>}; `tracked` is False when
              the license is not Enterprise tier and the event was ignored
    """
    payload = UsageEventSerializer(data=request.data)
    if not payload.is_valid():
        return Response(
            {"detail": "Invalid request data", "errors": payload.errors},
            status=status.HTTP_400_BAD_REQUEST
        )
    fields = payload.validated_data

    # NOTE(review): the license is looked up by the submitted key alone;
    # nothing here ties it to request.user - confirm that is intended.
    try:
        license_obj = License.objects.get(license_key=fields['license_key'])
    except License.DoesNotExist:
        return Response(
            {"detail": "License not found"},
            status=status.HTTP_404_NOT_FOUND
        )

    # Only Enterprise has usage-based billing; other tiers are acknowledged
    # but nothing is stored.
    is_enterprise = license_obj.tier == 'enterprise'

    if is_enterprise:
        # New rows start unprocessed; hourly aggregation picks them up later.
        UsageEventModel.objects.create(
            license=license_obj,
            tenant=license_obj.tenant,
            event_type=fields['event_type'],
            quantity=fields['quantity'],
            metadata=fields['metadata'],
            timestamp=fields['timestamp'],
            processed=False
        )

    return Response(
        {'status': 'ok', 'tracked': is_enterprise},
        status=status.HTTP_200_OK
    )

2. Hourly Aggregation (Steps 4-6)

Server-side: Celery aggregation task:

# Server-side: Hourly usage aggregation
from celery import shared_task
from datetime import datetime, timedelta
from django.db import transaction

@shared_task(name='usage.tasks.aggregate_usage_events')
def aggregate_usage_events():
    """
    Aggregate the previous clock hour's unprocessed usage events.

    Events are summed per (license_id, event_type) into one UsageAggregate
    row for the hour, then marked processed and linked to that row.

    Schedule: Every hour at :05 (e.g., 14:05, 15:05)

    Returns:
        dict: {'aggregated': 0} when there is nothing to aggregate, otherwise
        {'aggregated': <event count>, 'aggregates_created': <row count>}.

    NOTE(review): only events whose timestamp falls inside the previous hour
    are selected. An event stamped in an earlier hour that arrives after that
    hour was aggregated is never picked up by this window - confirm whether a
    catch-up pass is needed.
    """
    import logging
    from collections import defaultdict

    from django.utils import timezone

    from .models import UsageEvent, UsageAggregate

    logger = logging.getLogger(__name__)

    # Define aggregation period (last hour). timezone.now() yields a value
    # that is directly comparable with stored DateTimeField values; a naive
    # utcnow() would be reinterpreted in the project's default time zone.
    period_end = timezone.now().replace(minute=0, second=0, microsecond=0)
    period_start = period_end - timedelta(hours=1)

    # Load the unprocessed events once. Only license_id is read below, so no
    # join to the license table is needed.
    events = list(
        UsageEvent.objects.filter(
            timestamp__gte=period_start,
            timestamp__lt=period_end,
            processed=False
        )
    )

    if not events:
        logger.info(f"No usage events to aggregate for {period_start}")
        return {'aggregated': 0}

    # Group events by (license_id, event_type)
    groups = defaultdict(lambda: {'quantity': 0, 'event_ids': []})

    for event in events:
        key = (event.license_id, event.event_type)
        groups[key]['quantity'] += event.quantity
        groups[key]['event_ids'].append(event.id)

    # Create aggregates
    aggregates_created = 0

    for (license_id, event_type), data in groups.items():
        # One transaction per group: the aggregate row and the processed
        # flags on its events are committed together or not at all.
        with transaction.atomic():
            aggregate = UsageAggregate.objects.create(
                license_id=license_id,
                event_type=event_type,
                quantity=data['quantity'],
                period_start=period_start,
                period_end=period_end,
                reported_to_stripe=False
            )

            # Mark exactly the events that were summed, by id.
            UsageEvent.objects.filter(
                id__in=data['event_ids']
            ).update(
                processed=True,
                aggregate_id=aggregate.id
            )

        aggregates_created += 1

    logger.info(
        f"Aggregated {len(events)} events into {aggregates_created} aggregates "
        f"for period {period_start} - {period_end}"
    )

    return {
        'aggregated': len(events),
        'aggregates_created': aggregates_created
    }


# Celery Beat schedule
from celery.schedules import crontab

# Runs the hourly aggregation task five minutes past every hour.
CELERY_BEAT_SCHEDULE = {
    'aggregate-usage-events': {
        'task': 'usage.tasks.aggregate_usage_events',
        'schedule': crontab(minute=5),  # Every hour at :05
    },
}

3. Daily Stripe Reporting (Steps 7-10)

Server-side: Report usage to Stripe:

# Server-side: Daily Stripe usage reporting
from celery import shared_task
import stripe
from datetime import datetime, timedelta

@shared_task(name='usage.tasks.report_usage_to_stripe')
def report_usage_to_stripe():
    """
    Report usage to Stripe daily.

    Sums yesterday's unreported UsageAggregate rows per event type for every
    active Enterprise license that has a Stripe subscription, and creates one
    Stripe usage record per event type.
    Stripe will include in next monthly invoice.

    A Stripe error for one license is logged and counted; the remaining
    licenses are still processed. Aggregates that were not reported keep
    reported_to_stripe=False.

    Schedule: Daily at 1:00 AM UTC

    Returns:
        dict: 'licenses_processed' (licenses examined),
        'usage_records_reported' (Stripe usage records created) and
        'licenses_failed' (licenses skipped because of a Stripe error).
    """
    import logging
    from collections import defaultdict

    from django.utils import timezone

    from .models import License, UsageAggregate

    logger = logging.getLogger(__name__)

    # Define reporting period (yesterday). timezone.now() is used instead of
    # a naive utcnow(): calling .timestamp() on a naive datetime interprets
    # it as local time, which shifts the Stripe timestamp on any host whose
    # clock is not set to UTC.
    today = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday = today - timedelta(days=1)

    # Active Enterprise licenses that have a Stripe subscription
    licenses = License.objects.filter(
        tier='enterprise',
        stripe_subscription_id__isnull=False,
        is_active=True
    ).distinct()

    total_reported = 0
    licenses_failed = 0

    for license_obj in licenses:
        # Get yesterday's aggregates (materialized once so the rows summed
        # are exactly the rows marked as reported below)
        aggregates = list(
            UsageAggregate.objects.filter(
                license=license_obj,
                period_start__gte=yesterday,
                period_start__lt=today,
                reported_to_stripe=False
            )
        )

        if not aggregates:
            continue  # No usage yesterday

        # Group by event type, remembering which rows fed each total
        usage_by_type = defaultdict(lambda: {'quantity': 0, 'aggregate_ids': []})

        for agg in aggregates:
            bucket = usage_by_type[agg.event_type]
            bucket['quantity'] += agg.quantity
            bucket['aggregate_ids'].append(agg.id)

        try:
            # Get Stripe subscription items for this license
            subscription_items = get_subscription_items(license_obj.stripe_subscription_id)

            # Report each usage type to Stripe
            for event_type, usage in usage_by_type.items():
                quantity = usage['quantity']
                subscription_item_id = subscription_items.get(event_type)

                if not subscription_item_id:
                    logger.warning(
                        f"No subscription item for {event_type} "
                        f"(license: {license_obj.license_key})"
                    )
                    continue

                # NOTE(review): int() truncates, so fractional quantities
                # (e.g. GB-hours) lose their fractional part on every report
                # - confirm the intended rounding policy.
                usage_record = stripe.SubscriptionItem.create_usage_record(
                    subscription_item_id,
                    quantity=int(quantity),  # Stripe requires integer
                    timestamp=int(yesterday.timestamp()),
                    action='increment'
                )

                # Mark aggregates as reported
                UsageAggregate.objects.filter(
                    id__in=usage['aggregate_ids']
                ).update(
                    reported_to_stripe=True,
                    stripe_usage_record_id=usage_record.id
                )

                total_reported += 1

                logger.info(
                    f"Reported {quantity} {event_type} to Stripe "
                    f"(license: {license_obj.license_key}, record: {usage_record.id})"
                )

        except stripe.error.StripeError:
            # Isolate the failure to this license so one bad subscription
            # does not leave every later license unreported.
            licenses_failed += 1
            logger.exception(
                f"Stripe usage reporting failed (license: {license_obj.license_key})"
            )

    return {
        'licenses_processed': len(licenses),
        'usage_records_reported': total_reported,
        'licenses_failed': licenses_failed
    }


def get_subscription_items(subscription_id: str) -> dict:
    """
    Look up the usage-billing subscription items of a Stripe subscription.

    Args:
        subscription_id: ID of the Stripe subscription to retrieve.

    Returns:
        dict mapping event_type -> subscription_item_id. Only items whose
        price ID is one of the three known usage prices are included.
    """
    # NOTE(review): these look like placeholder price IDs - confirm them
    # against the prices actually configured in Stripe.
    event_type_by_price = {
        'price_agent_invocation': 'agent_invocation',
        'price_command_execution': 'command_execution',
        'price_storage_usage': 'storage_usage',
    }

    subscription = stripe.Subscription.retrieve(subscription_id)

    items = {}

    for item in subscription['items']['data']:
        event_type = event_type_by_price.get(item['price']['id'])

        if event_type is not None:
            items[event_type] = item['id']

    return items


# Celery Beat schedule
from celery.schedules import crontab

# A settings module holds a single CELERY_BEAT_SCHEDULE dict; assigning it a
# second time replaces the earlier value. Both usage tasks are therefore
# listed together so the hourly aggregation entry is not dropped.
CELERY_BEAT_SCHEDULE = {
    'aggregate-usage-events': {
        'task': 'usage.tasks.aggregate_usage_events',
        'schedule': crontab(minute=5),  # Every hour at :05
    },
    'report-usage-to-stripe': {
        'task': 'usage.tasks.report_usage_to_stripe',
        'schedule': crontab(hour=1, minute=0),  # Daily at 1:00 AM UTC
    },
}

Database Models

Event sourcing models:

# Server-side: Usage event sourcing models
from django.db import models
from django.utils import timezone

class UsageEvent(models.Model):
    """
    One recorded usage event (event sourcing pattern).

    Rows are appended as events arrive. The event data itself (type,
    quantity, timestamp, metadata) is written once; `processed` and
    `aggregate` are bookkeeping fields that the hourly aggregation task sets
    afterwards.
    """

    class EventType(models.TextChoices):
        AGENT_INVOCATION = 'agent_invocation', 'Agent Invocation'
        COMMAND_EXECUTION = 'command_execution', 'Command Execution'
        STORAGE_USAGE = 'storage_usage', 'Storage Usage'

    # Ownership
    license = models.ForeignKey('License', on_delete=models.CASCADE)
    tenant = models.ForeignKey('Tenant', on_delete=models.CASCADE)

    # What happened, how much, and when
    event_type = models.CharField(max_length=50, choices=EventType.choices)
    quantity = models.DecimalField(max_digits=10, decimal_places=4)
    timestamp = models.DateTimeField(default=timezone.now)

    # Free-form event details (JSON)
    metadata = models.JSONField(default=dict, blank=True)

    # Aggregation bookkeeping: whether the event has been rolled up, and the
    # aggregate row it was rolled into (kept as NULL if that row is deleted)
    processed = models.BooleanField(default=False)
    aggregate = models.ForeignKey(
        'UsageAggregate',
        null=True,
        blank=True,
        on_delete=models.SET_NULL
    )

    class Meta:
        db_table = 'usage_events'
        indexes = [
            models.Index(fields=['license', 'timestamp']),
            models.Index(fields=['processed', 'timestamp']),
            models.Index(fields=['event_type', 'timestamp']),
        ]

    def __str__(self):
        return "{}: {} @ {}".format(self.event_type, self.quantity, self.timestamp)


class UsageAggregate(models.Model):
    """
    Summed usage for one license, event type and period.

    Each row rolls many UsageEvent rows into a single quantity and records
    whether that quantity has been reported to Stripe.
    """

    # What is being summed
    license = models.ForeignKey('License', on_delete=models.CASCADE)
    event_type = models.CharField(max_length=50)

    # Total quantity over the period
    quantity = models.DecimalField(max_digits=10, decimal_places=4)

    # Period covered by this row
    period_start = models.DateTimeField()
    period_end = models.DateTimeField()

    # Stripe reporting status and the ID of the resulting usage record
    reported_to_stripe = models.BooleanField(default=False)
    stripe_usage_record_id = models.CharField(max_length=100, null=True, blank=True)

    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'usage_aggregates'
        indexes = [
            models.Index(fields=['license', 'period_start']),
            models.Index(fields=['reported_to_stripe', 'period_start']),
        ]
        # At most one row per license, event type and period start
        unique_together = [['license', 'event_type', 'period_start']]

    def __str__(self):
        return "{}: {} ({})".format(self.event_type, self.quantity, self.period_start)

Stripe Pricing Configuration

Create usage-based prices in Stripe:

# Stripe Dashboard or API: Create usage-based prices

# NOTE: unit_amount_decimal is expressed in the currency's smallest unit
# (cents for USD), so $0.10 is '10' - '0.10' would mean a tenth of a cent.

# Agent invocations: $0.10 per invocation
stripe.Price.create(
    product='prod_enterprise_coditect',
    currency='usd',
    unit_amount_decimal='10',  # 10 cents = $0.10
    billing_scheme='per_unit',
    recurring={
        'interval': 'month',
        'usage_type': 'metered',
        'aggregate_usage': 'sum'
    },
    nickname='Agent Invocations'
)

# Command executions: $0.02 per command
stripe.Price.create(
    product='prod_enterprise_coditect',
    currency='usd',
    unit_amount_decimal='2',  # 2 cents = $0.02
    billing_scheme='per_unit',
    recurring={
        'interval': 'month',
        'usage_type': 'metered',
        'aggregate_usage': 'sum'
    },
    nickname='Command Executions'
)

# Storage: $0.05 per GB-hour
stripe.Price.create(
    product='prod_enterprise_coditect',
    currency='usd',
    unit_amount_decimal='5',  # 5 cents = $0.05
    billing_scheme='per_unit',
    recurring={
        'interval': 'month',
        'usage_type': 'metered',
        'aggregate_usage': 'sum'
    },
    nickname='Storage (GB-hours)'
)

Analytics Queries

-- Top usage by license (last 30 days)
-- One row per (license, event type): summed quantity and number of events,
-- highest totals first, limited to 20 rows.
SELECT
    lic.license_key,
    ev.event_type,
    SUM(ev.quantity) AS total_usage,
    COUNT(*) AS event_count
FROM licenses AS lic
INNER JOIN usage_events AS ev
    ON ev.license_id = lic.id
WHERE ev.timestamp >= NOW() - INTERVAL '30 days'
GROUP BY lic.license_key, ev.event_type
ORDER BY total_usage DESC
LIMIT 20;

-- Average usage per Enterprise customer
-- Step 1 (per_day): total quantity per license, event type and calendar day
-- over the last 30 days. Step 2: average and maximum of those daily totals
-- per event type.
WITH per_day AS (
    SELECT
        license_id,
        event_type,
        DATE(period_start) AS usage_date,
        SUM(quantity) AS daily_usage
    FROM usage_aggregates
    WHERE period_start >= NOW() - INTERVAL '30 days'
    GROUP BY license_id, event_type, DATE(period_start)
)
SELECT
    event_type,
    AVG(daily_usage) AS avg_daily_usage,
    MAX(daily_usage) AS max_daily_usage
FROM per_day
GROUP BY event_type;

-- Estimated monthly revenue from usage
-- Per-license revenue for the current calendar month, priced at
-- $0.10 / agent invocation, $0.02 / command execution, $0.05 / GB-hour.
WITH revenue AS (
    SELECT
        l.license_key,
        SUM(CASE WHEN ua.event_type = 'agent_invocation' THEN ua.quantity * 0.10 ELSE 0 END) AS agent_revenue,
        SUM(CASE WHEN ua.event_type = 'command_execution' THEN ua.quantity * 0.02 ELSE 0 END) AS command_revenue,
        SUM(CASE WHEN ua.event_type = 'storage_usage' THEN ua.quantity * 0.05 ELSE 0 END) AS storage_revenue
    FROM usage_aggregates ua
    JOIN licenses l ON ua.license_id = l.id
    WHERE ua.period_start >= DATE_TRUNC('month', NOW())
    GROUP BY l.license_key
)
SELECT
    license_key,
    agent_revenue,
    command_revenue,
    storage_revenue,
    agent_revenue + command_revenue + storage_revenue AS total_usage_revenue
FROM revenue
ORDER BY total_usage_revenue DESC;

  • ADR-015: Usage-Based Metering
  • ADR-013: Stripe Integration for Billing
  • 08-license-renewal-flow.md: Subscription renewal with usage charges

Last Updated: 2025-11-30 | Diagram Type: Sequence (Mermaid) | Scope: Billing - Usage-based metering (Enterprise)