Skip to main content

Version Control Specification

Document ID: 000.13 Version: 1.0.0 Status: Draft Last Updated: December 19, 2025 Owner: CODITECT Document Management Team


1. Executive Summary

This specification defines the version control system for the CODITECT Document Management System, including semantic versioning, check-in/check-out patterns, document locking, branching, and conflict resolution.

1.1 Key Capabilities

CapabilityDescriptionPriority
Semantic VersioningMAJOR.MINOR.PATCH formatP0
Check-In/Check-OutConcurrency controlP0
Document LockingExclusive/shared locksP0
Version HistoryComplete audit trailP0
BranchingParallel document versionsP1
Conflict ResolutionMerge and diff toolsP1

1.2 Business Impact

  • 90% reduction in version conflicts
  • 70% reduction in data loss from concurrent edits
  • 100% traceability of document changes

2. Semantic Versioning

2.1 Version Format

MAJOR.MINOR.PATCH[-PRERELEASE][+BUILD]

Examples:
1.0.0 - Initial release
1.1.0 - Added new section
1.1.1 - Fixed typo
2.0.0 - Major restructure
2.0.0-beta.1 - Beta version
2.0.0+20251219 - Build metadata

2.2 Version Increment Rules

class SemanticVersion:
"""Semantic versioning for documents."""

def __init__(self, major: int = 1, minor: int = 0, patch: int = 0):
self.major = major
self.minor = minor
self.patch = patch
self.prerelease: Optional[str] = None
self.build_metadata: Optional[str] = None

@classmethod
def parse(cls, version_string: str) -> "SemanticVersion":
"""Parse version string into components."""
# Remove build metadata
if "+" in version_string:
version_string, build = version_string.split("+", 1)
else:
build = None

# Remove prerelease
if "-" in version_string:
version_string, prerelease = version_string.split("-", 1)
else:
prerelease = None

# Parse major.minor.patch
parts = version_string.split(".")
version = cls(
major=int(parts[0]),
minor=int(parts[1]) if len(parts) > 1 else 0,
patch=int(parts[2]) if len(parts) > 2 else 0
)
version.prerelease = prerelease
version.build_metadata = build

return version

def bump_major(self) -> "SemanticVersion":
"""Increment major version (breaking changes)."""
return SemanticVersion(self.major + 1, 0, 0)

def bump_minor(self) -> "SemanticVersion":
"""Increment minor version (new features)."""
return SemanticVersion(self.major, self.minor + 1, 0)

def bump_patch(self) -> "SemanticVersion":
"""Increment patch version (bug fixes)."""
return SemanticVersion(self.major, self.minor, self.patch + 1)

def __str__(self) -> str:
result = f"{self.major}.{self.minor}.{self.patch}"
if self.prerelease:
result += f"-{self.prerelease}"
if self.build_metadata:
result += f"+{self.build_metadata}"
return result

def __lt__(self, other: "SemanticVersion") -> bool:
"""Compare versions for sorting."""
return (self.major, self.minor, self.patch) < \
(other.major, other.minor, other.patch)

2.3 Version Change Categories

Change TypeVersion BumpExamples
MAJORX.0.0Complete restructure, policy reversal, incompatible changes
MINORx.Y.0New sections, expanded content, added appendices
PATCHx.y.ZTypo fixes, formatting, clarifications

2.4 Automatic Version Suggestion

class VersionSuggestionEngine:
"""Suggest appropriate version bump based on changes."""

async def suggest_version(
self,
document_id: UUID,
changes: DocumentChanges
) -> VersionSuggestion:
"""Analyze changes and suggest version bump."""

# Calculate change magnitude
content_diff = await self.calculate_diff(
changes.previous_content,
changes.new_content
)

# Thresholds for version bumps
MAJOR_THRESHOLD = 0.50 # 50%+ content changed
MINOR_THRESHOLD = 0.10 # 10%+ content changed

change_ratio = content_diff.changed_ratio

if change_ratio >= MAJOR_THRESHOLD:
suggested_bump = "major"
reason = f"{change_ratio:.0%} of content changed (major restructure)"
elif change_ratio >= MINOR_THRESHOLD:
suggested_bump = "minor"
reason = f"{change_ratio:.0%} of content changed (significant additions)"
else:
suggested_bump = "patch"
reason = f"{change_ratio:.0%} of content changed (minor corrections)"

current_version = SemanticVersion.parse(changes.current_version)

return VersionSuggestion(
current_version=str(current_version),
suggested_bump=suggested_bump,
suggested_version=str(getattr(current_version, f"bump_{suggested_bump}")()),
reason=reason,
change_summary=content_diff.summary
)

3. Check-In/Check-Out System

3.1 Check-Out Process

class CheckOutService:
"""Manage document check-out operations."""

async def check_out(
self,
document_id: UUID,
user_id: UUID,
lock_type: Literal["exclusive", "shared", "advisory"] = "exclusive",
reason: Optional[str] = None
) -> CheckOutResult:
"""Check out a document for editing."""

document = await self.document_service.get(document_id)

# Check existing locks
existing_lock = await self.get_active_lock(document_id)

if existing_lock:
if existing_lock.lock_type == "exclusive":
raise DocumentLockedError(
f"Document is exclusively locked by {existing_lock.user.email}",
locked_by=existing_lock.user_id,
locked_at=existing_lock.locked_at
)
elif lock_type == "exclusive" and existing_lock.lock_type == "shared":
raise DocumentLockedError(
"Cannot acquire exclusive lock - document has shared locks"
)

# Create lock
lock = DocumentLock(
id=uuid4(),
document_id=document_id,
document_version=document.version,
user_id=user_id,
lock_type=lock_type,
locked_at=datetime.utcnow(),
expires_at=datetime.utcnow() + timedelta(hours=24),
reason=reason
)

await self.save_lock(lock)

# Create working copy
working_copy = await self.create_working_copy(document, user_id)

# Log event
await self.audit_service.log(
event_type="document_checked_out",
document_id=document_id,
user_id=user_id,
details={"lock_type": lock_type, "reason": reason}
)

return CheckOutResult(
lock_id=lock.id,
document_id=document_id,
working_copy_id=working_copy.id,
lock_type=lock_type,
expires_at=lock.expires_at
)

async def check_out_with_download(
self,
document_id: UUID,
user_id: UUID,
format: Literal["original", "pdf", "docx"] = "original"
) -> CheckOutWithContentResult:
"""Check out and download document content."""

checkout = await self.check_out(document_id, user_id)
document = await self.document_service.get(document_id)

if format == "original":
content = document.content
mime_type = document.mime_type
elif format == "pdf":
content = await self.export_service.to_pdf(document)
mime_type = "application/pdf"
elif format == "docx":
content = await self.export_service.to_docx(document)
mime_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

return CheckOutWithContentResult(
**checkout.__dict__,
content=content,
mime_type=mime_type,
filename=f"{document.name}.{format}"
)

3.2 Check-In Process

class CheckInService:
"""Manage document check-in operations."""

async def check_in(
self,
document_id: UUID,
user_id: UUID,
new_content: bytes,
change_summary: str,
version_bump: Literal["major", "minor", "patch"] = "patch"
) -> CheckInResult:
"""Check in an edited document."""

# Validate lock ownership
lock = await self.get_active_lock(document_id)

if not lock:
raise NoActiveLockError("Document is not checked out")

if lock.user_id != user_id:
raise LockOwnershipError(
f"Document is locked by another user: {lock.user.email}"
)

document = await self.document_service.get(document_id)

# Pre-check-in validations
validations = await self.run_validations(document, new_content)

if not validations.passed:
return CheckInResult(
success=False,
validation_errors=validations.errors
)

# Calculate new version
current_version = SemanticVersion.parse(document.version)
new_version = getattr(current_version, f"bump_{version_bump}")()

# Create version record
version_record = DocumentVersion(
id=uuid4(),
document_id=document_id,
version=str(new_version),
content=new_content,
content_hash=hashlib.sha256(new_content).hexdigest(),
previous_version=document.version,
created_by=user_id,
created_at=datetime.utcnow(),
change_summary=change_summary
)

# Update document
document.content = new_content
document.version = str(new_version)
document.modified_by = user_id
document.modified_at = datetime.utcnow()

# Save atomically
async with self.db.transaction():
await self.save_version_record(version_record)
await self.document_service.save(document)
await self.release_lock(lock.id)
await self.delete_working_copy(document_id, user_id)

# Log event
await self.audit_service.log(
event_type="document_checked_in",
document_id=document_id,
user_id=user_id,
details={
"previous_version": str(current_version),
"new_version": str(new_version),
"change_summary": change_summary
}
)

return CheckInResult(
success=True,
document_id=document_id,
new_version=str(new_version),
version_record_id=version_record.id
)

async def run_validations(
self,
document: Document,
new_content: bytes
) -> ValidationResult:
"""Run pre-check-in validations."""

errors = []

# Virus scan
scan_result = await self.virus_scanner.scan(new_content)
if not scan_result.clean:
errors.append(ValidationError(
code="virus_detected",
message=f"Malware detected: {scan_result.threat_name}"
))

# Format validation
if document.required_format:
format_valid = await self.format_validator.validate(
new_content,
document.required_format
)
if not format_valid:
errors.append(ValidationError(
code="invalid_format",
message=f"Document must be in {document.required_format} format"
))

# Size limit
if len(new_content) > document.max_size_bytes:
errors.append(ValidationError(
code="size_exceeded",
message=f"Document exceeds maximum size of {document.max_size_bytes} bytes"
))

return ValidationResult(
passed=len(errors) == 0,
errors=errors
)

3.3 Lock Management

class LockManagementService:
"""Manage document locks."""

async def extend_lock(
self,
lock_id: UUID,
user_id: UUID,
extension_hours: int = 24
) -> DocumentLock:
"""Extend an existing lock."""

lock = await self.get_lock(lock_id)

if lock.user_id != user_id:
raise LockOwnershipError("Cannot extend another user's lock")

lock.expires_at = datetime.utcnow() + timedelta(hours=extension_hours)
lock.extension_count += 1

await self.save_lock(lock)

return lock

async def force_release_lock(
self,
lock_id: UUID,
admin_user_id: UUID,
reason: str
) -> bool:
"""Admin force release of a lock."""

lock = await self.get_lock(lock_id)

# Validate admin permissions
admin = await self.user_service.get(admin_user_id)
if "admin" not in admin.roles:
raise PermissionDeniedError("Only admins can force release locks")

# Notify original lock holder
await self.notification_service.send(
recipient=lock.user_id,
template="lock_force_released",
data={
"document_name": lock.document.name,
"released_by": admin.email,
"reason": reason
}
)

# Release lock
lock.status = "force_released"
lock.released_at = datetime.utcnow()
lock.released_by = admin_user_id
lock.release_reason = reason

await self.save_lock(lock)

# Log event
await self.audit_service.log(
event_type="lock_force_released",
document_id=lock.document_id,
user_id=admin_user_id,
details={"original_holder": lock.user_id, "reason": reason}
)

return True

async def cleanup_expired_locks(self) -> int:
"""Cleanup expired locks (scheduled job)."""

expired_locks = await self.get_expired_locks()
count = 0

for lock in expired_locks:
# Notify lock holder
await self.notification_service.send(
recipient=lock.user_id,
template="lock_expired",
data={
"document_name": lock.document.name,
"locked_at": lock.locked_at.isoformat()
}
)

# Release lock
lock.status = "expired"
lock.released_at = datetime.utcnow()
await self.save_lock(lock)

count += 1

return count

4. Version History and Rollback

4.1 Version History Service

class VersionHistoryService:
"""Manage document version history."""

async def get_version_history(
self,
document_id: UUID,
page: int = 1,
page_size: int = 20
) -> VersionHistoryResult:
"""Get paginated version history."""

versions = await self.db.fetch(
"""
SELECT v.*, u.email as author_email
FROM document_versions v
JOIN users u ON v.created_by = u.id
WHERE v.document_id = $1
ORDER BY v.created_at DESC
LIMIT $2 OFFSET $3
""",
document_id, page_size, (page - 1) * page_size
)

total = await self.db.fetchval(
"SELECT COUNT(*) FROM document_versions WHERE document_id = $1",
document_id
)

return VersionHistoryResult(
versions=[DocumentVersion(**v) for v in versions],
total=total,
page=page,
page_size=page_size
)

async def get_version(
self,
document_id: UUID,
version: str
) -> DocumentVersion:
"""Get a specific version of a document."""

version_record = await self.db.fetchrow(
"""
SELECT * FROM document_versions
WHERE document_id = $1 AND version = $2
""",
document_id, version
)

if not version_record:
raise VersionNotFoundError(f"Version {version} not found")

return DocumentVersion(**version_record)

async def compare_versions(
self,
document_id: UUID,
version_a: str,
version_b: str
) -> VersionComparison:
"""Compare two versions of a document."""

v_a = await self.get_version(document_id, version_a)
v_b = await self.get_version(document_id, version_b)

# Generate diff
diff = await self.diff_service.generate_diff(
v_a.content,
v_b.content
)

return VersionComparison(
version_a=version_a,
version_b=version_b,
diff=diff,
additions=diff.additions_count,
deletions=diff.deletions_count,
changes=diff.changes_count
)

4.2 Rollback Service

class RollbackService:
"""Handle document rollback operations."""

async def rollback_to_version(
self,
document_id: UUID,
target_version: str,
user_id: UUID,
reason: str
) -> RollbackResult:
"""Rollback document to a previous version."""

document = await self.document_service.get(document_id)
target = await self.version_service.get_version(document_id, target_version)

# Check permissions
if not await self.can_rollback(document, user_id):
raise PermissionDeniedError("User cannot rollback this document")

# Current version becomes history
current_version = SemanticVersion.parse(document.version)
new_version = current_version.bump_minor() # Rollbacks are minor bumps

# Create rollback version record
rollback_record = DocumentVersion(
id=uuid4(),
document_id=document_id,
version=str(new_version),
content=target.content,
content_hash=target.content_hash,
previous_version=document.version,
created_by=user_id,
created_at=datetime.utcnow(),
change_summary=f"Rollback to version {target_version}: {reason}",
is_rollback=True,
rollback_from_version=document.version,
rollback_to_version=target_version
)

# Update document
document.content = target.content
document.version = str(new_version)
document.modified_by = user_id
document.modified_at = datetime.utcnow()

async with self.db.transaction():
await self.save_version_record(rollback_record)
await self.document_service.save(document)

# Log event
await self.audit_service.log(
event_type="document_rollback",
document_id=document_id,
user_id=user_id,
details={
"from_version": str(current_version),
"to_version": target_version,
"new_version": str(new_version),
"reason": reason
}
)

return RollbackResult(
success=True,
document_id=document_id,
new_version=str(new_version),
rolled_back_to=target_version
)

5. Document Branching

5.1 Branch Management

class BranchService:
"""Manage document branches for parallel development."""

async def create_branch(
self,
document_id: UUID,
branch_name: str,
user_id: UUID,
base_version: Optional[str] = None
) -> DocumentBranch:
"""Create a new branch from a document."""

document = await self.document_service.get(document_id)
base = base_version or document.version

# Validate branch name
if not self.is_valid_branch_name(branch_name):
raise InvalidBranchNameError(
"Branch names must be alphanumeric with hyphens/underscores"
)

# Check for existing branch
existing = await self.get_branch(document_id, branch_name)
if existing:
raise BranchExistsError(f"Branch '{branch_name}' already exists")

# Create branch
branch = DocumentBranch(
id=uuid4(),
document_id=document_id,
name=branch_name,
base_version=base,
current_version=base,
created_by=user_id,
created_at=datetime.utcnow(),
status="active"
)

# Copy content to branch
base_content = await self.get_version_content(document_id, base)
branch_version = DocumentVersion(
id=uuid4(),
document_id=document_id,
version=f"{base}-{branch_name}.1",
content=base_content,
created_by=user_id,
created_at=datetime.utcnow(),
change_summary=f"Created branch '{branch_name}' from {base}",
branch_name=branch_name
)

await self.save_branch(branch)
await self.save_version_record(branch_version)

return branch

async def merge_branch(
self,
document_id: UUID,
source_branch: str,
target_branch: str,
user_id: UUID,
strategy: Literal["overwrite", "merge", "manual"] = "merge"
) -> MergeResult:
"""Merge a branch into another."""

source = await self.get_branch(document_id, source_branch)
target = await self.get_branch(document_id, target_branch)

# Get content
source_content = await self.get_branch_content(source)
target_content = await self.get_branch_content(target)

if strategy == "overwrite":
# Source completely replaces target
merged_content = source_content
conflicts = []

elif strategy == "merge":
# Three-way merge
base_content = await self.get_version_content(
document_id,
source.base_version
)

merge_result = await self.three_way_merge(
base_content,
source_content,
target_content
)

merged_content = merge_result.content
conflicts = merge_result.conflicts

if conflicts:
return MergeResult(
success=False,
conflicts=conflicts,
requires_manual_resolution=True
)

else: # manual
return MergeResult(
success=False,
requires_manual_resolution=True,
diff=await self.diff_service.generate_diff(
target_content,
source_content
)
)

# Apply merge
new_version = await self.create_merge_version(
document_id,
target_branch,
merged_content,
user_id,
f"Merged '{source_branch}' into '{target_branch}'"
)

return MergeResult(
success=True,
new_version=new_version,
source_branch=source_branch,
target_branch=target_branch
)

5.2 Conflict Resolution

class ConflictResolutionService:
"""Handle merge conflicts."""

async def three_way_merge(
self,
base: bytes,
source: bytes,
target: bytes
) -> ThreeWayMergeResult:
"""Perform three-way merge."""

# Convert to text for comparison
base_text = base.decode('utf-8')
source_text = source.decode('utf-8')
target_text = target.decode('utf-8')

# Split into lines
base_lines = base_text.splitlines(keepends=True)
source_lines = source_text.splitlines(keepends=True)
target_lines = target_text.splitlines(keepends=True)

# Compute diffs
source_diff = list(difflib.unified_diff(base_lines, source_lines))
target_diff = list(difflib.unified_diff(base_lines, target_lines))

# Detect conflicts
conflicts = []
merged_lines = []

# Simplified merge algorithm
source_changes = self.parse_diff(source_diff)
target_changes = self.parse_diff(target_diff)

for line_num, base_line in enumerate(base_lines):
source_change = source_changes.get(line_num)
target_change = target_changes.get(line_num)

if source_change and target_change:
if source_change == target_change:
# Same change, no conflict
merged_lines.append(source_change)
else:
# Conflict!
conflicts.append(Conflict(
line_number=line_num,
base_content=base_line,
source_content=source_change,
target_content=target_change
))
merged_lines.append(self.create_conflict_marker(
base_line, source_change, target_change
))
elif source_change:
merged_lines.append(source_change)
elif target_change:
merged_lines.append(target_change)
else:
merged_lines.append(base_line)

return ThreeWayMergeResult(
content=''.join(merged_lines).encode('utf-8'),
conflicts=conflicts,
has_conflicts=len(conflicts) > 0
)

def create_conflict_marker(
self,
base: str,
source: str,
target: str
) -> str:
"""Create Git-style conflict markers."""
return f"""<<<<<<< SOURCE
{source}=======
{target}>>>>>>> TARGET
"""

6. API Endpoints

# Version Information
GET /api/v1/documents/{id}/version
GET /api/v1/documents/{id}/versions
GET /api/v1/documents/{id}/versions/{version}
GET /api/v1/documents/{id}/versions/compare?a={v1}&b={v2}

# Check-Out/Check-In
POST /api/v1/documents/{id}/checkout
Body: { "lock_type": "exclusive", "reason": "Updating policy" }

POST /api/v1/documents/{id}/checkout-download
Body: { "lock_type": "exclusive", "format": "docx" }

POST /api/v1/documents/{id}/checkin
Body: { "content_base64": "...", "change_summary": "...", "version_bump": "minor" }

POST /api/v1/documents/{id}/cancel-checkout

# Lock Management
GET /api/v1/documents/{id}/lock
POST /api/v1/documents/{id}/lock/extend
DELETE /api/v1/documents/{id}/lock (admin force release)
GET /api/v1/locks/my-locks

# Rollback
POST /api/v1/documents/{id}/rollback
Body: { "target_version": "1.2.0", "reason": "Reverting incorrect changes" }

# Branches
GET /api/v1/documents/{id}/branches
POST /api/v1/documents/{id}/branches
Body: { "name": "gdpr-updates", "base_version": "2.0.0" }

GET /api/v1/documents/{id}/branches/{name}
DELETE /api/v1/documents/{id}/branches/{name}

POST /api/v1/documents/{id}/branches/{name}/merge
Body: { "target_branch": "main", "strategy": "merge" }

# Conflict Resolution
GET /api/v1/documents/{id}/branches/{name}/conflicts
POST /api/v1/documents/{id}/branches/{name}/resolve
Body: { "resolutions": [...] }

7. Database Schema

-- Document versions
CREATE TABLE document_versions (
id UUID PRIMARY KEY,
document_id UUID NOT NULL REFERENCES documents(id),
version VARCHAR(50) NOT NULL,
content BYTEA NOT NULL,
content_hash VARCHAR(64) NOT NULL,
previous_version VARCHAR(50),
created_by UUID REFERENCES users(id),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
change_summary TEXT,
branch_name VARCHAR(100),
is_rollback BOOLEAN DEFAULT FALSE,
rollback_from_version VARCHAR(50),
rollback_to_version VARCHAR(50),
UNIQUE(document_id, version)
);

CREATE INDEX idx_versions_document ON document_versions(document_id);
CREATE INDEX idx_versions_created ON document_versions(created_at DESC);

-- Document locks
CREATE TABLE document_locks (
id UUID PRIMARY KEY,
document_id UUID NOT NULL REFERENCES documents(id),
document_version VARCHAR(50) NOT NULL,
user_id UUID NOT NULL REFERENCES users(id),
lock_type VARCHAR(20) NOT NULL, -- exclusive, shared, advisory
status VARCHAR(20) NOT NULL DEFAULT 'active',
locked_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
extension_count INTEGER DEFAULT 0,
reason TEXT,
released_at TIMESTAMP WITH TIME ZONE,
released_by UUID REFERENCES users(id),
release_reason TEXT
);

CREATE INDEX idx_locks_document ON document_locks(document_id) WHERE status = 'active';
CREATE INDEX idx_locks_user ON document_locks(user_id) WHERE status = 'active';
CREATE INDEX idx_locks_expires ON document_locks(expires_at) WHERE status = 'active';

-- Document branches
CREATE TABLE document_branches (
id UUID PRIMARY KEY,
document_id UUID NOT NULL REFERENCES documents(id),
name VARCHAR(100) NOT NULL,
base_version VARCHAR(50) NOT NULL,
current_version VARCHAR(50) NOT NULL,
created_by UUID REFERENCES users(id),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
status VARCHAR(20) DEFAULT 'active',
merged_at TIMESTAMP WITH TIME ZONE,
merged_by UUID REFERENCES users(id),
merged_into VARCHAR(100),
UNIQUE(document_id, name)
);

CREATE INDEX idx_branches_document ON document_branches(document_id);

-- Working copies
CREATE TABLE working_copies (
id UUID PRIMARY KEY,
document_id UUID NOT NULL REFERENCES documents(id),
user_id UUID NOT NULL REFERENCES users(id),
lock_id UUID REFERENCES document_locks(id),
content BYTEA,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_saved_at TIMESTAMP WITH TIME ZONE,
UNIQUE(document_id, user_id)
);

8. Implementation Checklist

Phase 1: Core Version Control (Weeks 1-2)

  • Implement semantic versioning
  • Build version history storage
  • Create version comparison/diff service
  • Implement version API endpoints

Phase 2: Check-In/Check-Out (Weeks 3-4)

  • Implement document locking
  • Build check-out workflow
  • Create check-in with validation
  • Implement lock management (extend, force release)

Phase 3: Advanced Features (Weeks 5-6)

  • Build rollback functionality
  • Implement document branching
  • Create three-way merge
  • Build conflict resolution UI

Document Version: 1.0.0 Effective Date: Upon Approval Review Date: Quarterly Owner: Engineering Team