Version Control Specification
Document ID: 000.13 Version: 1.0.0 Status: Draft Last Updated: December 19, 2025 Owner: CODITECT Document Management Team
1. Executive Summary
This specification defines the version control system for the CODITECT Document Management System, including semantic versioning, check-in/check-out patterns, document locking, branching, and conflict resolution.
1.1 Key Capabilities
| Capability | Description | Priority |
|---|---|---|
| Semantic Versioning | MAJOR.MINOR.PATCH format | P0 |
| Check-In/Check-Out | Concurrency control | P0 |
| Document Locking | Exclusive/shared locks | P0 |
| Version History | Complete audit trail | P0 |
| Branching | Parallel document versions | P1 |
| Conflict Resolution | Merge and diff tools | P1 |
1.2 Business Impact
- 90% reduction in version conflicts
- 70% reduction in data loss from concurrent edits
- 100% traceability of document changes
2. Semantic Versioning
2.1 Version Format
MAJOR.MINOR.PATCH[-PRERELEASE][+BUILD]
Examples:
1.0.0 - Initial release
1.1.0 - Added new section
1.1.1 - Fixed typo
2.0.0 - Major restructure
2.0.0-beta.1 - Beta version
2.0.0+20251219 - Build metadata
2.2 Version Increment Rules
class SemanticVersion:
"""Semantic versioning for documents."""
def __init__(self, major: int = 1, minor: int = 0, patch: int = 0):
self.major = major
self.minor = minor
self.patch = patch
self.prerelease: Optional[str] = None
self.build_metadata: Optional[str] = None
@classmethod
def parse(cls, version_string: str) -> "SemanticVersion":
"""Parse version string into components."""
# Remove build metadata
if "+" in version_string:
version_string, build = version_string.split("+", 1)
else:
build = None
# Remove prerelease
if "-" in version_string:
version_string, prerelease = version_string.split("-", 1)
else:
prerelease = None
# Parse major.minor.patch
parts = version_string.split(".")
version = cls(
major=int(parts[0]),
minor=int(parts[1]) if len(parts) > 1 else 0,
patch=int(parts[2]) if len(parts) > 2 else 0
)
version.prerelease = prerelease
version.build_metadata = build
return version
def bump_major(self) -> "SemanticVersion":
"""Increment major version (breaking changes)."""
return SemanticVersion(self.major + 1, 0, 0)
def bump_minor(self) -> "SemanticVersion":
"""Increment minor version (new features)."""
return SemanticVersion(self.major, self.minor + 1, 0)
def bump_patch(self) -> "SemanticVersion":
"""Increment patch version (bug fixes)."""
return SemanticVersion(self.major, self.minor, self.patch + 1)
def __str__(self) -> str:
result = f"{self.major}.{self.minor}.{self.patch}"
if self.prerelease:
result += f"-{self.prerelease}"
if self.build_metadata:
result += f"+{self.build_metadata}"
return result
def __lt__(self, other: "SemanticVersion") -> bool:
"""Compare versions for sorting."""
return (self.major, self.minor, self.patch) < \
(other.major, other.minor, other.patch)
2.3 Version Change Categories
| Change Type | Version Bump | Examples |
|---|---|---|
| MAJOR | X.0.0 | Complete restructure, policy reversal, incompatible changes |
| MINOR | x.Y.0 | New sections, expanded content, added appendices |
| PATCH | x.y.Z | Typo fixes, formatting, clarifications |
2.4 Automatic Version Suggestion
class VersionSuggestionEngine:
"""Suggest appropriate version bump based on changes."""
async def suggest_version(
self,
document_id: UUID,
changes: DocumentChanges
) -> VersionSuggestion:
"""Analyze changes and suggest version bump."""
# Calculate change magnitude
content_diff = await self.calculate_diff(
changes.previous_content,
changes.new_content
)
# Thresholds for version bumps
MAJOR_THRESHOLD = 0.50 # 50%+ content changed
MINOR_THRESHOLD = 0.10 # 10%+ content changed
change_ratio = content_diff.changed_ratio
if change_ratio >= MAJOR_THRESHOLD:
suggested_bump = "major"
reason = f"{change_ratio:.0%} of content changed (major restructure)"
elif change_ratio >= MINOR_THRESHOLD:
suggested_bump = "minor"
reason = f"{change_ratio:.0%} of content changed (significant additions)"
else:
suggested_bump = "patch"
reason = f"{change_ratio:.0%} of content changed (minor corrections)"
current_version = SemanticVersion.parse(changes.current_version)
return VersionSuggestion(
current_version=str(current_version),
suggested_bump=suggested_bump,
suggested_version=str(getattr(current_version, f"bump_{suggested_bump}")()),
reason=reason,
change_summary=content_diff.summary
)
3. Check-In/Check-Out System
3.1 Check-Out Process
class CheckOutService:
"""Manage document check-out operations."""
async def check_out(
self,
document_id: UUID,
user_id: UUID,
lock_type: Literal["exclusive", "shared", "advisory"] = "exclusive",
reason: Optional[str] = None
) -> CheckOutResult:
"""Check out a document for editing."""
document = await self.document_service.get(document_id)
# Check existing locks
existing_lock = await self.get_active_lock(document_id)
if existing_lock:
if existing_lock.lock_type == "exclusive":
raise DocumentLockedError(
f"Document is exclusively locked by {existing_lock.user.email}",
locked_by=existing_lock.user_id,
locked_at=existing_lock.locked_at
)
elif lock_type == "exclusive" and existing_lock.lock_type == "shared":
raise DocumentLockedError(
"Cannot acquire exclusive lock - document has shared locks"
)
# Create lock
lock = DocumentLock(
id=uuid4(),
document_id=document_id,
document_version=document.version,
user_id=user_id,
lock_type=lock_type,
locked_at=datetime.utcnow(),
expires_at=datetime.utcnow() + timedelta(hours=24),
reason=reason
)
await self.save_lock(lock)
# Create working copy
working_copy = await self.create_working_copy(document, user_id)
# Log event
await self.audit_service.log(
event_type="document_checked_out",
document_id=document_id,
user_id=user_id,
details={"lock_type": lock_type, "reason": reason}
)
return CheckOutResult(
lock_id=lock.id,
document_id=document_id,
working_copy_id=working_copy.id,
lock_type=lock_type,
expires_at=lock.expires_at
)
async def check_out_with_download(
self,
document_id: UUID,
user_id: UUID,
format: Literal["original", "pdf", "docx"] = "original"
) -> CheckOutWithContentResult:
"""Check out and download document content."""
checkout = await self.check_out(document_id, user_id)
document = await self.document_service.get(document_id)
if format == "original":
content = document.content
mime_type = document.mime_type
elif format == "pdf":
content = await self.export_service.to_pdf(document)
mime_type = "application/pdf"
elif format == "docx":
content = await self.export_service.to_docx(document)
mime_type = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
return CheckOutWithContentResult(
**checkout.__dict__,
content=content,
mime_type=mime_type,
filename=f"{document.name}.{format}"
)
3.2 Check-In Process
class CheckInService:
"""Manage document check-in operations."""
async def check_in(
self,
document_id: UUID,
user_id: UUID,
new_content: bytes,
change_summary: str,
version_bump: Literal["major", "minor", "patch"] = "patch"
) -> CheckInResult:
"""Check in an edited document."""
# Validate lock ownership
lock = await self.get_active_lock(document_id)
if not lock:
raise NoActiveLockError("Document is not checked out")
if lock.user_id != user_id:
raise LockOwnershipError(
f"Document is locked by another user: {lock.user.email}"
)
document = await self.document_service.get(document_id)
# Pre-check-in validations
validations = await self.run_validations(document, new_content)
if not validations.passed:
return CheckInResult(
success=False,
validation_errors=validations.errors
)
# Calculate new version
current_version = SemanticVersion.parse(document.version)
new_version = getattr(current_version, f"bump_{version_bump}")()
# Create version record
version_record = DocumentVersion(
id=uuid4(),
document_id=document_id,
version=str(new_version),
content=new_content,
content_hash=hashlib.sha256(new_content).hexdigest(),
previous_version=document.version,
created_by=user_id,
created_at=datetime.utcnow(),
change_summary=change_summary
)
# Update document
document.content = new_content
document.version = str(new_version)
document.modified_by = user_id
document.modified_at = datetime.utcnow()
# Save atomically
async with self.db.transaction():
await self.save_version_record(version_record)
await self.document_service.save(document)
await self.release_lock(lock.id)
await self.delete_working_copy(document_id, user_id)
# Log event
await self.audit_service.log(
event_type="document_checked_in",
document_id=document_id,
user_id=user_id,
details={
"previous_version": str(current_version),
"new_version": str(new_version),
"change_summary": change_summary
}
)
return CheckInResult(
success=True,
document_id=document_id,
new_version=str(new_version),
version_record_id=version_record.id
)
async def run_validations(
self,
document: Document,
new_content: bytes
) -> ValidationResult:
"""Run pre-check-in validations."""
errors = []
# Virus scan
scan_result = await self.virus_scanner.scan(new_content)
if not scan_result.clean:
errors.append(ValidationError(
code="virus_detected",
message=f"Malware detected: {scan_result.threat_name}"
))
# Format validation
if document.required_format:
format_valid = await self.format_validator.validate(
new_content,
document.required_format
)
if not format_valid:
errors.append(ValidationError(
code="invalid_format",
message=f"Document must be in {document.required_format} format"
))
# Size limit
if len(new_content) > document.max_size_bytes:
errors.append(ValidationError(
code="size_exceeded",
message=f"Document exceeds maximum size of {document.max_size_bytes} bytes"
))
return ValidationResult(
passed=len(errors) == 0,
errors=errors
)
3.3 Lock Management
class LockManagementService:
"""Manage document locks."""
async def extend_lock(
self,
lock_id: UUID,
user_id: UUID,
extension_hours: int = 24
) -> DocumentLock:
"""Extend an existing lock."""
lock = await self.get_lock(lock_id)
if lock.user_id != user_id:
raise LockOwnershipError("Cannot extend another user's lock")
lock.expires_at = datetime.utcnow() + timedelta(hours=extension_hours)
lock.extension_count += 1
await self.save_lock(lock)
return lock
async def force_release_lock(
self,
lock_id: UUID,
admin_user_id: UUID,
reason: str
) -> bool:
"""Admin force release of a lock."""
lock = await self.get_lock(lock_id)
# Validate admin permissions
admin = await self.user_service.get(admin_user_id)
if "admin" not in admin.roles:
raise PermissionDeniedError("Only admins can force release locks")
# Notify original lock holder
await self.notification_service.send(
recipient=lock.user_id,
template="lock_force_released",
data={
"document_name": lock.document.name,
"released_by": admin.email,
"reason": reason
}
)
# Release lock
lock.status = "force_released"
lock.released_at = datetime.utcnow()
lock.released_by = admin_user_id
lock.release_reason = reason
await self.save_lock(lock)
# Log event
await self.audit_service.log(
event_type="lock_force_released",
document_id=lock.document_id,
user_id=admin_user_id,
details={"original_holder": lock.user_id, "reason": reason}
)
return True
async def cleanup_expired_locks(self) -> int:
"""Cleanup expired locks (scheduled job)."""
expired_locks = await self.get_expired_locks()
count = 0
for lock in expired_locks:
# Notify lock holder
await self.notification_service.send(
recipient=lock.user_id,
template="lock_expired",
data={
"document_name": lock.document.name,
"locked_at": lock.locked_at.isoformat()
}
)
# Release lock
lock.status = "expired"
lock.released_at = datetime.utcnow()
await self.save_lock(lock)
count += 1
return count
4. Version History and Rollback
4.1 Version History Service
class VersionHistoryService:
"""Manage document version history."""
async def get_version_history(
self,
document_id: UUID,
page: int = 1,
page_size: int = 20
) -> VersionHistoryResult:
"""Get paginated version history."""
versions = await self.db.fetch(
"""
SELECT v.*, u.email as author_email
FROM document_versions v
JOIN users u ON v.created_by = u.id
WHERE v.document_id = $1
ORDER BY v.created_at DESC
LIMIT $2 OFFSET $3
""",
document_id, page_size, (page - 1) * page_size
)
total = await self.db.fetchval(
"SELECT COUNT(*) FROM document_versions WHERE document_id = $1",
document_id
)
return VersionHistoryResult(
versions=[DocumentVersion(**v) for v in versions],
total=total,
page=page,
page_size=page_size
)
async def get_version(
self,
document_id: UUID,
version: str
) -> DocumentVersion:
"""Get a specific version of a document."""
version_record = await self.db.fetchrow(
"""
SELECT * FROM document_versions
WHERE document_id = $1 AND version = $2
""",
document_id, version
)
if not version_record:
raise VersionNotFoundError(f"Version {version} not found")
return DocumentVersion(**version_record)
async def compare_versions(
self,
document_id: UUID,
version_a: str,
version_b: str
) -> VersionComparison:
"""Compare two versions of a document."""
v_a = await self.get_version(document_id, version_a)
v_b = await self.get_version(document_id, version_b)
# Generate diff
diff = await self.diff_service.generate_diff(
v_a.content,
v_b.content
)
return VersionComparison(
version_a=version_a,
version_b=version_b,
diff=diff,
additions=diff.additions_count,
deletions=diff.deletions_count,
changes=diff.changes_count
)
4.2 Rollback Service
class RollbackService:
"""Handle document rollback operations."""
async def rollback_to_version(
self,
document_id: UUID,
target_version: str,
user_id: UUID,
reason: str
) -> RollbackResult:
"""Rollback document to a previous version."""
document = await self.document_service.get(document_id)
target = await self.version_service.get_version(document_id, target_version)
# Check permissions
if not await self.can_rollback(document, user_id):
raise PermissionDeniedError("User cannot rollback this document")
# Current version becomes history
current_version = SemanticVersion.parse(document.version)
new_version = current_version.bump_minor() # Rollbacks are minor bumps
# Create rollback version record
rollback_record = DocumentVersion(
id=uuid4(),
document_id=document_id,
version=str(new_version),
content=target.content,
content_hash=target.content_hash,
previous_version=document.version,
created_by=user_id,
created_at=datetime.utcnow(),
change_summary=f"Rollback to version {target_version}: {reason}",
is_rollback=True,
rollback_from_version=document.version,
rollback_to_version=target_version
)
# Update document
document.content = target.content
document.version = str(new_version)
document.modified_by = user_id
document.modified_at = datetime.utcnow()
async with self.db.transaction():
await self.save_version_record(rollback_record)
await self.document_service.save(document)
# Log event
await self.audit_service.log(
event_type="document_rollback",
document_id=document_id,
user_id=user_id,
details={
"from_version": str(current_version),
"to_version": target_version,
"new_version": str(new_version),
"reason": reason
}
)
return RollbackResult(
success=True,
document_id=document_id,
new_version=str(new_version),
rolled_back_to=target_version
)
5. Document Branching
5.1 Branch Management
class BranchService:
"""Manage document branches for parallel development."""
async def create_branch(
self,
document_id: UUID,
branch_name: str,
user_id: UUID,
base_version: Optional[str] = None
) -> DocumentBranch:
"""Create a new branch from a document."""
document = await self.document_service.get(document_id)
base = base_version or document.version
# Validate branch name
if not self.is_valid_branch_name(branch_name):
raise InvalidBranchNameError(
"Branch names must be alphanumeric with hyphens/underscores"
)
# Check for existing branch
existing = await self.get_branch(document_id, branch_name)
if existing:
raise BranchExistsError(f"Branch '{branch_name}' already exists")
# Create branch
branch = DocumentBranch(
id=uuid4(),
document_id=document_id,
name=branch_name,
base_version=base,
current_version=base,
created_by=user_id,
created_at=datetime.utcnow(),
status="active"
)
# Copy content to branch
base_content = await self.get_version_content(document_id, base)
branch_version = DocumentVersion(
id=uuid4(),
document_id=document_id,
version=f"{base}-{branch_name}.1",
content=base_content,
created_by=user_id,
created_at=datetime.utcnow(),
change_summary=f"Created branch '{branch_name}' from {base}",
branch_name=branch_name
)
await self.save_branch(branch)
await self.save_version_record(branch_version)
return branch
async def merge_branch(
self,
document_id: UUID,
source_branch: str,
target_branch: str,
user_id: UUID,
strategy: Literal["overwrite", "merge", "manual"] = "merge"
) -> MergeResult:
"""Merge a branch into another."""
source = await self.get_branch(document_id, source_branch)
target = await self.get_branch(document_id, target_branch)
# Get content
source_content = await self.get_branch_content(source)
target_content = await self.get_branch_content(target)
if strategy == "overwrite":
# Source completely replaces target
merged_content = source_content
conflicts = []
elif strategy == "merge":
# Three-way merge
base_content = await self.get_version_content(
document_id,
source.base_version
)
merge_result = await self.three_way_merge(
base_content,
source_content,
target_content
)
merged_content = merge_result.content
conflicts = merge_result.conflicts
if conflicts:
return MergeResult(
success=False,
conflicts=conflicts,
requires_manual_resolution=True
)
else: # manual
return MergeResult(
success=False,
requires_manual_resolution=True,
diff=await self.diff_service.generate_diff(
target_content,
source_content
)
)
# Apply merge
new_version = await self.create_merge_version(
document_id,
target_branch,
merged_content,
user_id,
f"Merged '{source_branch}' into '{target_branch}'"
)
return MergeResult(
success=True,
new_version=new_version,
source_branch=source_branch,
target_branch=target_branch
)
5.2 Conflict Resolution
class ConflictResolutionService:
"""Handle merge conflicts."""
async def three_way_merge(
self,
base: bytes,
source: bytes,
target: bytes
) -> ThreeWayMergeResult:
"""Perform three-way merge."""
# Convert to text for comparison
base_text = base.decode('utf-8')
source_text = source.decode('utf-8')
target_text = target.decode('utf-8')
# Split into lines
base_lines = base_text.splitlines(keepends=True)
source_lines = source_text.splitlines(keepends=True)
target_lines = target_text.splitlines(keepends=True)
# Compute diffs
source_diff = list(difflib.unified_diff(base_lines, source_lines))
target_diff = list(difflib.unified_diff(base_lines, target_lines))
# Detect conflicts
conflicts = []
merged_lines = []
# Simplified merge algorithm
source_changes = self.parse_diff(source_diff)
target_changes = self.parse_diff(target_diff)
for line_num, base_line in enumerate(base_lines):
source_change = source_changes.get(line_num)
target_change = target_changes.get(line_num)
if source_change and target_change:
if source_change == target_change:
# Same change, no conflict
merged_lines.append(source_change)
else:
# Conflict!
conflicts.append(Conflict(
line_number=line_num,
base_content=base_line,
source_content=source_change,
target_content=target_change
))
merged_lines.append(self.create_conflict_marker(
base_line, source_change, target_change
))
elif source_change:
merged_lines.append(source_change)
elif target_change:
merged_lines.append(target_change)
else:
merged_lines.append(base_line)
return ThreeWayMergeResult(
content=''.join(merged_lines).encode('utf-8'),
conflicts=conflicts,
has_conflicts=len(conflicts) > 0
)
def create_conflict_marker(
self,
base: str,
source: str,
target: str
) -> str:
"""Create Git-style conflict markers."""
return f"""<<<<<<< SOURCE
{source}=======
{target}>>>>>>> TARGET
"""
6. API Endpoints
# Version Information
GET /api/v1/documents/{id}/version
GET /api/v1/documents/{id}/versions
GET /api/v1/documents/{id}/versions/{version}
GET /api/v1/documents/{id}/versions/compare?a={v1}&b={v2}
# Check-Out/Check-In
POST /api/v1/documents/{id}/checkout
Body: { "lock_type": "exclusive", "reason": "Updating policy" }
POST /api/v1/documents/{id}/checkout-download
Body: { "lock_type": "exclusive", "format": "docx" }
POST /api/v1/documents/{id}/checkin
Body: { "content_base64": "...", "change_summary": "...", "version_bump": "minor" }
POST /api/v1/documents/{id}/cancel-checkout
# Lock Management
GET /api/v1/documents/{id}/lock
POST /api/v1/documents/{id}/lock/extend
DELETE /api/v1/documents/{id}/lock (admin force release)
GET /api/v1/locks/my-locks
# Rollback
POST /api/v1/documents/{id}/rollback
Body: { "target_version": "1.2.0", "reason": "Reverting incorrect changes" }
# Branches
GET /api/v1/documents/{id}/branches
POST /api/v1/documents/{id}/branches
Body: { "name": "gdpr-updates", "base_version": "2.0.0" }
GET /api/v1/documents/{id}/branches/{name}
DELETE /api/v1/documents/{id}/branches/{name}
POST /api/v1/documents/{id}/branches/{name}/merge
Body: { "target_branch": "main", "strategy": "merge" }
# Conflict Resolution
GET /api/v1/documents/{id}/branches/{name}/conflicts
POST /api/v1/documents/{id}/branches/{name}/resolve
Body: { "resolutions": [...] }
7. Database Schema
-- Document versions
CREATE TABLE document_versions (
id UUID PRIMARY KEY,
document_id UUID NOT NULL REFERENCES documents(id),
version VARCHAR(50) NOT NULL,
content BYTEA NOT NULL,
content_hash VARCHAR(64) NOT NULL,
previous_version VARCHAR(50),
created_by UUID REFERENCES users(id),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
change_summary TEXT,
branch_name VARCHAR(100),
is_rollback BOOLEAN DEFAULT FALSE,
rollback_from_version VARCHAR(50),
rollback_to_version VARCHAR(50),
UNIQUE(document_id, version)
);
CREATE INDEX idx_versions_document ON document_versions(document_id);
CREATE INDEX idx_versions_created ON document_versions(created_at DESC);
-- Document locks
CREATE TABLE document_locks (
id UUID PRIMARY KEY,
document_id UUID NOT NULL REFERENCES documents(id),
document_version VARCHAR(50) NOT NULL,
user_id UUID NOT NULL REFERENCES users(id),
lock_type VARCHAR(20) NOT NULL, -- exclusive, shared, advisory
status VARCHAR(20) NOT NULL DEFAULT 'active',
locked_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
extension_count INTEGER DEFAULT 0,
reason TEXT,
released_at TIMESTAMP WITH TIME ZONE,
released_by UUID REFERENCES users(id),
release_reason TEXT
);
CREATE INDEX idx_locks_document ON document_locks(document_id) WHERE status = 'active';
CREATE INDEX idx_locks_user ON document_locks(user_id) WHERE status = 'active';
CREATE INDEX idx_locks_expires ON document_locks(expires_at) WHERE status = 'active';
-- Document branches
CREATE TABLE document_branches (
id UUID PRIMARY KEY,
document_id UUID NOT NULL REFERENCES documents(id),
name VARCHAR(100) NOT NULL,
base_version VARCHAR(50) NOT NULL,
current_version VARCHAR(50) NOT NULL,
created_by UUID REFERENCES users(id),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
status VARCHAR(20) DEFAULT 'active',
merged_at TIMESTAMP WITH TIME ZONE,
merged_by UUID REFERENCES users(id),
merged_into VARCHAR(100),
UNIQUE(document_id, name)
);
CREATE INDEX idx_branches_document ON document_branches(document_id);
-- Working copies
CREATE TABLE working_copies (
id UUID PRIMARY KEY,
document_id UUID NOT NULL REFERENCES documents(id),
user_id UUID NOT NULL REFERENCES users(id),
lock_id UUID REFERENCES document_locks(id),
content BYTEA,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_saved_at TIMESTAMP WITH TIME ZONE,
UNIQUE(document_id, user_id)
);
8. Implementation Checklist
Phase 1: Core Version Control (Weeks 1-2)
- Implement semantic versioning
- Build version history storage
- Create version comparison/diff service
- Implement version API endpoints
Phase 2: Check-In/Check-Out (Weeks 3-4)
- Implement document locking
- Build check-out workflow
- Create check-in with validation
- Implement lock management (extend, force release)
Phase 3: Advanced Features (Weeks 5-6)
- Build rollback functionality
- Implement document branching
- Create three-way merge
- Build conflict resolution UI
Document Version: 1.0.0 Effective Date: Upon Approval Review Date: Quarterly Owner: Engineering Team