Digital Signature Integration Specification

Document ID: CODITECT-DMS-SPEC-014
Version: 1.0.0
Status: Draft
Created: December 19, 2025
Last Updated: December 19, 2025
Author: CODITECT Architecture Team
Classification: Internal - Technical Specification


Executive Summary

This specification defines digital signature integration for the CODITECT Document Management System. It covers electronic signature standards (eIDAS, ESIGN, UETA), third-party provider integration (DocuSign, Adobe Sign), multi-signer workflows, certificate management, and regulatory compliance, including FDA 21 CFR Part 11.

Key Objectives:

  • Legal validity across US, EU, and international jurisdictions
  • Seamless integration with DocuSign and Adobe Sign APIs
  • Multi-signer workflows with routing and delegation
  • Complete audit trail for regulatory compliance
  • FDA 21 CFR Part 11 compliant electronic signatures for life sciences

1. Electronic Signature Standards

1.1 Regulatory Framework

1.1.1 eIDAS (EU Regulation 910/2014)

Three Signature Levels:

| Level | Legal Effect | Requirements |
|---|---|---|
| Simple Electronic Signature (SES) | Admissible as evidence | Electronic data attached to/associated with other data |
| Advanced Electronic Signature (AES) | Presumption of integrity | Uniquely linked to signatory, capable of identifying signatory, created under sole control |
| Qualified Electronic Signature (QES) | Equivalent to handwritten | Created by QSCD, based on qualified certificate from trust service provider |

# eIDAS Signature Level Configuration
from enum import Enum
from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime

class EidasSignatureLevel(Enum):
    SES = "simple_electronic_signature"
    AES = "advanced_electronic_signature"
    QES = "qualified_electronic_signature"

@dataclass
class EidasRequirements:
    level: EidasSignatureLevel
    requires_identity_verification: bool
    requires_qualified_certificate: bool
    requires_qscd: bool  # Qualified Signature Creation Device
    legal_effect: str

    @classmethod
    def get_requirements(cls, level: EidasSignatureLevel) -> "EidasRequirements":
        requirements = {
            EidasSignatureLevel.SES: cls(
                level=level,
                requires_identity_verification=False,
                requires_qualified_certificate=False,
                requires_qscd=False,
                legal_effect="admissible_as_evidence"
            ),
            EidasSignatureLevel.AES: cls(
                level=level,
                requires_identity_verification=True,
                requires_qualified_certificate=False,
                requires_qscd=False,
                legal_effect="presumption_of_integrity"
            ),
            EidasSignatureLevel.QES: cls(
                level=level,
                requires_identity_verification=True,
                requires_qualified_certificate=True,
                requires_qscd=True,
                legal_effect="equivalent_to_handwritten"
            ),
        }
        return requirements[level]
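
Because the three levels form a hierarchy (SES, then AES, then QES), a signature at a higher level also satisfies a requirement stated at a lower one. The sketch below shows one way to express that check. The `LEVEL_RANK` table and the `meets_required_level` helper are illustrative names that do not appear elsewhere in this specification, and the enum is repeated only so the example runs on its own.

```python
from enum import Enum


class EidasSignatureLevel(Enum):
    SES = "simple_electronic_signature"
    AES = "advanced_electronic_signature"
    QES = "qualified_electronic_signature"


# Hypothetical ranking table: a larger number means a stronger assurance level.
LEVEL_RANK = {
    EidasSignatureLevel.SES: 1,
    EidasSignatureLevel.AES: 2,
    EidasSignatureLevel.QES: 3,
}


def meets_required_level(actual: EidasSignatureLevel, required: EidasSignatureLevel) -> bool:
    """Return True when the actual signature level is at least the required one."""
    return LEVEL_RANK[actual] >= LEVEL_RANK[required]
```

A document type that demands AES would accept a QES signature under this rule but reject an SES one.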

1.1.2 ESIGN Act (US Federal)

Electronic Signatures in Global and National Commerce Act (2000):

@dataclass
class EsignCompliance:
    """US ESIGN Act compliance requirements."""

    # Consumer consent requirements
    consumer_consent_required: bool = True
    consent_must_be_affirmative: bool = True
    consent_withdrawal_allowed: bool = True

    # Record retention
    electronic_records_valid: bool = True
    records_must_be_accessible: bool = True
    records_must_be_accurate: bool = True

    # Disclosure requirements
    hardware_software_requirements_disclosed: bool = True
    right_to_paper_copy_disclosed: bool = True

    def validate_transaction(self, transaction: dict) -> bool:
        """Validate transaction meets ESIGN requirements."""
        checks = [
            transaction.get("consumer_consent_obtained"),
            transaction.get("consent_affirmative"),
            transaction.get("disclosures_provided"),
            transaction.get("record_accessible"),
        ]
        return all(checks)
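
A boolean result says that a transaction failed but not why. For audit purposes it may be more useful to report which ESIGN conditions are missing. The following is a minimal sketch under that assumption; `REQUIRED_ESIGN_FLAGS` and `missing_esign_requirements` are hypothetical names, and the flag keys are the ones read by `validate_transaction` above.

```python
from typing import Dict, List

# Keys taken from validate_transaction(); the tuple name itself is illustrative.
REQUIRED_ESIGN_FLAGS = (
    "consumer_consent_obtained",
    "consent_affirmative",
    "disclosures_provided",
    "record_accessible",
)


def missing_esign_requirements(transaction: Dict[str, object]) -> List[str]:
    """Return the ESIGN flags that are absent or falsy, in a stable order."""
    return [flag for flag in REQUIRED_ESIGN_FLAGS if not transaction.get(flag)]
```

An empty list corresponds to `validate_transaction` returning `True`.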

1.1.3 UETA (State-Level US)

Uniform Electronic Transactions Act adoption by state:

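class UETACompliance:
    """UETA compliance by jurisdiction."""

    # States with full UETA adoption (Alaska added; it was missing from the list)
    FULL_ADOPTION = [
        "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FL",
        "GA", "HI", "ID", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
        "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
        "NM", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD",
        "TN", "TX", "UT", "VT", "VA", "WV", "WI", "WY"
    ]

    # States with modified UETA.
    # Note: New York has not adopted UETA and relies on its own Electronic
    # Signatures and Records Act (ESRA). Illinois and Washington enacted UETA
    # more recently (2021 and 2020); confirm the current classification with counsel.
    MODIFIED_ADOPTION = ["IL", "NY", "WA"]

    @classmethod
    def _get_state_modifications(cls, state: str) -> dict:
        """Placeholder: state-specific rules are not defined in this specification."""
        return {}

    @classmethod
    def get_requirements(cls, state: str) -> dict:
        """Get state-specific UETA requirements."""
        state = state.strip().upper()
        if state in cls.FULL_ADOPTION:
            return {
                "electronic_signature_valid": True,
                "electronic_record_valid": True,
                "attribution_required": True,
                "intent_to_sign_required": True,
            }
        elif state in cls.MODIFIED_ADOPTION:
            return {
                "electronic_signature_valid": True,
                "electronic_record_valid": True,
                "attribution_required": True,
                "intent_to_sign_required": True,
                "additional_requirements": cls._get_state_modifications(state)
            }
        # Unrecognized jurisdiction code: no UETA basis is recorded here.
        return {"electronic_signature_valid": False}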

1.2 FDA 21 CFR Part 11 Electronic Signatures

Life Sciences / Pharmaceutical Compliance:

from dataclasses import dataclass
from datetime import date, datetime, time, timezone
from enum import Enum
from typing import List, Optional
import hashlib

# AuthenticationError, Part11ViolationError, and User are assumed to be defined elsewhere.

class Part11SignatureType(Enum):
    IDENTIFICATION = "identification"  # Username component
    AUTHENTICATION = "authentication"  # Password/biometric component

class Part11SignatureMeaning(Enum):
    APPROVAL = "approval"
    REVIEW = "review"
    RESPONSIBILITY = "responsibility"
    AUTHORSHIP = "authorship"

@dataclass
class Part11ElectronicSignature:
    """FDA 21 CFR Part 11 compliant electronic signature."""

    # Required components (§11.50)
    signer_printed_name: str
    signature_date: date
    signature_time: time
    signature_meaning: Part11SignatureMeaning

    # Identification components (§11.100, §11.200)
    identification_code: str  # Unique to individual
    password_used: bool = True
    biometric_used: bool = False

    # Signature/record linking (§11.70)
    signature_hash: str = ""
    document_hash: str = ""
    linked_to_record: bool = True

    def __post_init__(self):
        self.signature_hash = self._generate_signature_hash()

    def _generate_signature_hash(self) -> str:
        """Generate hash linking signature to record content."""
        # The document hash is part of the input so that the signature
        # cannot be copied onto a different record without detection.
        signature_data = "|".join([
            self.signer_printed_name,
            self.identification_code,
            str(self.signature_date),
            str(self.signature_time),
            self.signature_meaning.value,
            self.document_hash,
        ])
        return hashlib.sha256(signature_data.encode()).hexdigest()

    def validate_part11_compliance(self) -> List[str]:
        """Validate signature meets Part 11 requirements."""
        violations = []

        # §11.50 - Signature manifestations
        if not self.signer_printed_name:
            violations.append("Missing printed name of signer")
        if self.signature_date is None:
            violations.append("Missing signature date")
        if self.signature_time is None:
            violations.append("Missing signature time")
        if self.signature_meaning is None:
            violations.append("Missing signature meaning")

        # §11.200(a)(1) - Two distinct identification components
        if not self.identification_code:
            violations.append("Missing identification code")
        if not (self.password_used or self.biometric_used):
            violations.append("Missing authentication component (password or biometric)")

        # §11.70 - Signature/record linking
        if not self.linked_to_record or not self.document_hash:
            violations.append("Signature not linked to electronic record")

        return violations

class Part11SignatureService:
    """Service for managing Part 11 compliant signatures."""

    async def create_signature(
        self,
        document_id: str,
        user: "User",
        meaning: Part11SignatureMeaning,
        password: str,
        biometric_data: Optional[bytes] = None
    ) -> Part11ElectronicSignature:
        """Create Part 11 compliant electronic signature."""

        # Verify user identity (§11.200)
        if not await self._verify_password(user, password):
            raise AuthenticationError("Invalid password")

        if biometric_data and not await self._verify_biometric(user, biometric_data):
            raise AuthenticationError("Invalid biometric")

        # Get document hash for linking
        document = await self._get_document(document_id)
        document_hash = self._calculate_document_hash(document)

        # Capture the clock once so date and time cannot straddle midnight
        now = datetime.now(timezone.utc)

        signature = Part11ElectronicSignature(
            signer_printed_name=f"{user.first_name} {user.last_name}",
            signature_date=now.date(),
            signature_time=now.time(),
            signature_meaning=meaning,
            identification_code=user.unique_id,
            password_used=True,
            biometric_used=biometric_data is not None,
            document_hash=document_hash,
        )

        # Validate compliance
        violations = signature.validate_part11_compliance()
        if violations:
            raise Part11ViolationError(violations)

        # Store signature with audit trail
        await self._store_signature(document_id, signature)
        await self._create_audit_record(document_id, signature, user)

        return signature

    def _calculate_document_hash(self, document: bytes) -> str:
        """Calculate SHA-256 hash of document content."""
        return hashlib.sha256(document).hexdigest()
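
Signature/record linking under §11.70 only works if the stored hash covers the record content as well as the signer details. The sketch below illustrates that idea with standard-library hashing. The function names `link_hash` and `verify_link` and the field order are assumptions made for illustration. A plain hash of this kind detects a changed record but does not stop someone who can rewrite both the record and the stored hash; a keyed MAC or a certificate-based signature would be needed for that.

```python
import hashlib
import hmac


def link_hash(signer_id: str, meaning: str, signed_at_iso: str, document: bytes) -> str:
    """Hash the signature manifest together with the document's own hash."""
    document_hash = hashlib.sha256(document).hexdigest()
    manifest = "|".join([signer_id, meaning, signed_at_iso, document_hash])
    return hashlib.sha256(manifest.encode("utf-8")).hexdigest()


def verify_link(stored_hash: str, signer_id: str, meaning: str,
                signed_at_iso: str, document: bytes) -> bool:
    """Recompute the link hash and compare it in constant time."""
    expected = link_hash(signer_id, meaning, signed_at_iso, document)
    return hmac.compare_digest(stored_hash, expected)
```

Verification fails as soon as either the document bytes or any manifest field differs from what was signed.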

2. Signature Provider Integration

2.1 Provider Abstraction Layer

from abc import ABC, abstractmethod
from typing import List, Dict, Optional, AsyncIterator
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class SignatureProvider(Enum):
    DOCUSIGN = "docusign"
    ADOBE_SIGN = "adobe_sign"
    INTERNAL = "internal"

@dataclass
class SignatureRequest:
    """Request for document signature."""
    document_id: str
    document_name: str
    document_content: bytes
    signers: List["Signer"]
    message: str
    subject: str
    expiration_days: int = 30
    reminder_frequency_days: int = 3
    workflow_type: str = "sequential"  # sequential, parallel, custom

@dataclass
class Signer:
    """Signature recipient."""
    email: str
    name: str
    role: str
    routing_order: int
    authentication_method: str = "email"  # email, sms, knowledge_based
    signature_tabs: List["SignatureTab"] = field(default_factory=list)
    phone: Optional[str] = None  # Required when authentication_method is "sms"
    delegated_from: Optional[str] = None  # Original signer's email when delegated

@dataclass
class SignatureTab:
    """Location for signature on document."""
    page_number: int
    x_position: int
    y_position: int
    tab_type: str = "signature"  # signature, initials, date_signed, text
    required: bool = True

@dataclass
class SignatureStatus:
    """Current status of signature request."""
    envelope_id: str
    status: str
    signers_status: List[Dict]
    created_at: datetime
    updated_at: datetime
    completed_at: Optional[datetime] = None

class SignatureProviderInterface(ABC):
    """Abstract interface for signature providers.

    Concrete providers must implement every method below; a subclass that
    omits one cannot be instantiated.
    """

    @abstractmethod
    async def create_envelope(self, request: SignatureRequest) -> str:
        """Create signature envelope and return envelope ID."""
        pass

    @abstractmethod
    async def get_envelope_status(self, envelope_id: str) -> SignatureStatus:
        """Get current status of signature envelope."""
        pass

    @abstractmethod
    async def void_envelope(self, envelope_id: str, reason: str) -> bool:
        """Void/cancel signature envelope."""
        pass

    @abstractmethod
    async def download_signed_document(self, envelope_id: str) -> bytes:
        """Download completed signed document."""
        pass

    @abstractmethod
    async def resend_envelope(self, envelope_id: str, signer_email: str) -> bool:
        """Resend signature request to specific signer."""
        pass

    @abstractmethod
    async def get_audit_trail(self, envelope_id: str) -> List[Dict]:
        """Get complete audit trail for envelope."""
        pass
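
An abstraction layer like this is easier to exercise when a provider exists that makes no network calls. The following is a rough sketch of such a test double. `InMemorySignatureProvider` and its `mark_signed` helper are hypothetical, and the method signatures are deliberately simplified (plain strings instead of `SignatureRequest` and `SignatureStatus`) so the example stays self-contained.

```python
import asyncio
import uuid
from typing import Dict, List


class InMemorySignatureProvider:
    """Hypothetical test double that keeps envelopes in a dict instead of calling an API."""

    def __init__(self) -> None:
        self._envelopes: Dict[str, Dict] = {}

    async def create_envelope(self, document_name: str, signer_emails: List[str]) -> str:
        """Register a new envelope and return its generated ID."""
        envelope_id = str(uuid.uuid4())
        self._envelopes[envelope_id] = {
            "document_name": document_name,
            "status": "sent",
            "signers": {email: "sent" for email in signer_emails},
        }
        return envelope_id

    async def mark_signed(self, envelope_id: str, signer_email: str) -> None:
        """Simulate a signer completing; the envelope completes when all have signed."""
        envelope = self._envelopes[envelope_id]
        envelope["signers"][signer_email] = "completed"
        if all(state == "completed" for state in envelope["signers"].values()):
            envelope["status"] = "completed"

    async def get_envelope_status(self, envelope_id: str) -> str:
        """Return the normalized envelope status string."""
        return self._envelopes[envelope_id]["status"]

    async def void_envelope(self, envelope_id: str, reason: str) -> bool:
        """Void the envelope unless it has already completed."""
        envelope = self._envelopes[envelope_id]
        if envelope["status"] == "completed":
            return False
        envelope["status"] = "voided"
        envelope["void_reason"] = reason
        return True
```

A production version would subclass `SignatureProviderInterface` and accept the full request and status types.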

2.2 DocuSign Integration

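import aiohttp
import base64
import json
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional

import jwt  # PyJWT; RS256 signing also needs the `cryptography` package

class DocuSignService(SignatureProviderInterface):
    """DocuSign eSignature API integration."""

    # Production defaults. The host is region-specific (na4 here); the base URI
    # for a given account is reported by the OAuth userinfo endpoint.
    BASE_URL = "https://na4.docusign.net/restapi/v2.1"
    AUTH_URL = "https://account.docusign.com/oauth"

    def __init__(
        self,
        integration_key: str,
        user_id: str,
        account_id: str,
        private_key: str,
        environment: str = "production"  # production, demo
    ):
        self.integration_key = integration_key
        self.user_id = user_id
        self.account_id = account_id
        self.private_key = private_key
        self.environment = environment
        self._access_token: Optional[str] = None
        self._token_expires_at: Optional[datetime] = None

        # Honor the environment setting instead of always using production hosts
        self._auth_host = "account.docusign.com"
        if environment == "demo":
            self._auth_host = "account-d.docusign.com"
            self.BASE_URL = "https://demo.docusign.net/restapi/v2.1"
            self.AUTH_URL = f"https://{self._auth_host}/oauth"

    async def _get_access_token(self) -> str:
        """Get JWT access token using server-to-server auth."""
        # Timezone-aware "now": calling .timestamp() on a naive utcnow() value
        # is interpreted as local time and skews iat/exp on non-UTC hosts.
        now = datetime.now(timezone.utc)
        if self._access_token and self._token_expires_at and self._token_expires_at > now:
            return self._access_token

        # Create JWT assertion
        claims = {
            "iss": self.integration_key,
            "sub": self.user_id,
            "aud": self._auth_host,
            "iat": int(now.timestamp()),
            "exp": int((now + timedelta(hours=1)).timestamp()),
            "scope": "signature impersonation"
        }

        assertion = jwt.encode(claims, self.private_key, algorithm="RS256")

        # Exchange for access token
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.AUTH_URL}/token",
                data={
                    "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
                    "assertion": assertion
                }
            ) as response:
                if response.status != 200:
                    error = await response.text()
                    raise DocuSignError(f"Failed to obtain access token: {error}")
                data = await response.json()
                self._access_token = data["access_token"]
                self._token_expires_at = now + timedelta(seconds=data["expires_in"])
                return self._access_token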

async def create_envelope(self, request: SignatureRequest) -> str:
"""Create DocuSign envelope for signing."""
token = await self._get_access_token()

# Build envelope definition
envelope_definition = {
"emailSubject": request.subject,
"emailBlurb": request.message,
"status": "sent",
"documents": [{
"documentId": "1",
"name": request.document_name,
"documentBase64": base64.b64encode(request.document_content).decode(),
"fileExtension": request.document_name.split(".")[-1]
}],
"recipients": self._build_recipients(request.signers),
"notification": {
"useAccountDefaults": False,
"reminders": {
"reminderEnabled": True,
"reminderDelay": str(request.reminder_frequency_days),
"reminderFrequency": str(request.reminder_frequency_days)
},
"expirations": {
"expireEnabled": True,
"expireAfter": str(request.expiration_days),
"expireWarn": "3"
}
}
}

async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.BASE_URL}/accounts/{self.account_id}/envelopes",
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
},
json=envelope_definition
) as response:
if response.status != 201:
error = await response.text()
raise DocuSignError(f"Failed to create envelope: {error}")

data = await response.json()
return data["envelopeId"]

    def _build_recipients(self, signers: List[Signer]) -> Dict:
        """Build DocuSign recipients structure."""
        docusign_signers = []

        # recipientId must be unique per recipient; routing_order can repeat
        # when several signers sign in parallel, so use a running index instead.
        for index, signer in enumerate(signers, start=1):
            ds_signer = {
                "email": signer.email,
                "name": signer.name,
                "recipientId": str(index),
                "routingOrder": str(signer.routing_order),
                "roleName": signer.role,
            }

            # Add signature tabs
            if signer.signature_tabs:
                ds_signer["tabs"] = {"signHereTabs": []}
                for tab in signer.signature_tabs:
                    ds_signer["tabs"]["signHereTabs"].append({
                        "documentId": "1",
                        "pageNumber": str(tab.page_number),
                        "xPosition": str(tab.x_position),
                        "yPosition": str(tab.y_position),
                    })

            # Add authentication
            if signer.authentication_method == "sms":
                phone = getattr(signer, "phone", None)
                if not phone:
                    raise ValueError(f"SMS authentication requires a phone number for {signer.email}")
                ds_signer["idCheckConfigurationName"] = "SMS Auth"
                ds_signer["smsAuthentication"] = {
                    "senderProvidedNumbers": [phone]
                }
            elif signer.authentication_method == "knowledge_based":
                ds_signer["idCheckConfigurationName"] = "ID Check"

            docusign_signers.append(ds_signer)

        return {"signers": docusign_signers}

    async def get_envelope_status(self, envelope_id: str) -> SignatureStatus:
        """Get current envelope status."""
        token = await self._get_access_token()
        headers = {"Authorization": f"Bearer {token}"}
        envelope_url = f"{self.BASE_URL}/accounts/{self.account_id}/envelopes/{envelope_id}"

        # Both requests share one session, so the second must stay inside it
        async with aiohttp.ClientSession() as session:
            async with session.get(envelope_url, headers=headers) as response:
                data = await response.json()

            # Get recipient status
            async with session.get(f"{envelope_url}/recipients", headers=headers) as recip_response:
                recipients = await recip_response.json()

        # Note: fromisoformat() only accepts more than six fractional-second
        # digits on Python 3.11+; the results here are naive UTC datetimes.
        return SignatureStatus(
            envelope_id=envelope_id,
            status=data["status"],
            signers_status=[
                {
                    "email": s["email"],
                    "name": s["name"],
                    "status": s["status"],
                    "signed_at": s.get("signedDateTime")
                }
                for s in recipients.get("signers", [])
            ],
            created_at=datetime.fromisoformat(data["createdDateTime"].rstrip("Z")),
            updated_at=datetime.fromisoformat(data["statusChangedDateTime"].rstrip("Z")),
            completed_at=datetime.fromisoformat(data["completedDateTime"].rstrip("Z")) if data.get("completedDateTime") else None
        )

async def download_signed_document(self, envelope_id: str) -> bytes:
"""Download completed signed document."""
token = await self._get_access_token()

async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.BASE_URL}/accounts/{self.account_id}/envelopes/{envelope_id}/documents/combined",
headers={"Authorization": f"Bearer {token}"}
) as response:
if response.status != 200:
raise DocuSignError("Failed to download signed document")
return await response.read()

async def get_audit_trail(self, envelope_id: str) -> List[Dict]:
"""Get complete audit trail for envelope."""
token = await self._get_access_token()

async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.BASE_URL}/accounts/{self.account_id}/envelopes/{envelope_id}/audit_events",
headers={"Authorization": f"Bearer {token}"}
) as response:
data = await response.json()
return data.get("auditEvents", [])
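
The status code above parses provider timestamps by stripping a trailing `Z` and calling `datetime.fromisoformat`, which yields naive datetimes and, before Python 3.11, rejects values with more than six fractional-second digits. A more defensive parser might look like the sketch below. `parse_provider_timestamp` is a hypothetical helper, and the seven-digit sample value is an assumed example rather than a documented provider format.

```python
import re
from datetime import datetime, timezone

# Matches the fractional-seconds part, e.g. ".1234567"
_FRACTION = re.compile(r"\.(\d+)")


def parse_provider_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp into a timezone-aware UTC-based datetime."""
    text = value.strip()
    if text.endswith(("Z", "z")):
        # Older fromisoformat() versions do not accept the "Z" suffix
        text = text[:-1] + "+00:00"
    # Normalize fractional seconds to exactly six digits (microseconds)
    text = _FRACTION.sub(lambda m: "." + m.group(1)[:6].ljust(6, "0"), text, count=1)
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are already in UTC
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
```

Returning aware datetimes also avoids accidental comparisons between naive and aware values elsewhere in the service.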

2.3 Adobe Sign Integration

class AdobeSignService(SignatureProviderInterface):
"""Adobe Sign (Acrobat Sign) API integration."""

BASE_URL = "https://api.na1.adobesign.com/api/rest/v6"

def __init__(
self,
client_id: str,
client_secret: str,
refresh_token: str,
api_access_point: str = "https://api.na1.adobesign.com"
):
self.client_id = client_id
self.client_secret = client_secret
self.refresh_token = refresh_token
self.api_access_point = api_access_point
self._access_token: Optional[str] = None
self._token_expires_at: Optional[datetime] = None

async def _get_access_token(self) -> str:
"""Refresh OAuth access token."""
if self._access_token and self._token_expires_at > datetime.utcnow():
return self._access_token

async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.api_access_point}/oauth/v2/refresh",
data={
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token
}
) as response:
data = await response.json()
self._access_token = data["access_token"]
self._token_expires_at = datetime.utcnow() + timedelta(seconds=data["expires_in"])
return self._access_token

async def create_envelope(self, request: SignatureRequest) -> str:
"""Create Adobe Sign agreement."""
token = await self._get_access_token()

# First upload document as transient document
transient_doc_id = await self._upload_transient_document(
token, request.document_name, request.document_content
)

# Build agreement definition
agreement = {
"name": request.subject,
"message": request.message,
"signatureType": "ESIGN",
"state": "IN_PROCESS",
"fileInfos": [{
"transientDocumentId": transient_doc_id
}],
"participantSetsInfo": self._build_participant_sets(request.signers, request.workflow_type),
"expirationTime": (datetime.utcnow() + timedelta(days=request.expiration_days)).isoformat() + "Z",
"reminderFrequency": "DAILY_UNTIL_SIGNED" if request.reminder_frequency_days == 1 else "WEEKLY_UNTIL_SIGNED"
}

async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.api_access_point}/api/rest/v6/agreements",
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
},
json=agreement
) as response:
if response.status != 201:
error = await response.text()
raise AdobeSignError(f"Failed to create agreement: {error}")

data = await response.json()
return data["id"]

async def _upload_transient_document(self, token: str, filename: str, content: bytes) -> str:
"""Upload document as transient document."""
async with aiohttp.ClientSession() as session:
form_data = aiohttp.FormData()
form_data.add_field(
"File",
content,
filename=filename,
content_type="application/pdf"
)

async with session.post(
f"{self.api_access_point}/api/rest/v6/transientDocuments",
headers={"Authorization": f"Bearer {token}"},
data=form_data
) as response:
data = await response.json()
return data["transientDocumentId"]

def _build_participant_sets(self, signers: List[Signer], workflow_type: str) -> List[Dict]:
"""Build Adobe Sign participant sets."""
if workflow_type == "parallel":
# All signers in one set, sign in any order
return [{
"order": 1,
"role": "SIGNER",
"memberInfos": [
{"email": s.email, "name": s.name}
for s in signers
]
}]
else:
# Sequential signing
return [
{
"order": signer.routing_order,
"role": "SIGNER",
"memberInfos": [{"email": signer.email, "name": signer.name}]
}
for signer in sorted(signers, key=lambda s: s.routing_order)
]

    async def get_envelope_status(self, envelope_id: str) -> SignatureStatus:
        """Get agreement status."""
        token = await self._get_access_token()
        headers = {"Authorization": f"Bearer {token}"}
        agreement_url = f"{self.api_access_point}/api/rest/v6/agreements/{envelope_id}"

        async with aiohttp.ClientSession() as session:
            async with session.get(agreement_url, headers=headers) as response:
                data = await response.json()

            # Get participant info
            async with session.get(f"{agreement_url}/members", headers=headers) as members_response:
                members = await members_response.json()

        return SignatureStatus(
            envelope_id=envelope_id,
            status=self._map_status(data["status"]),
            signers_status=[
                {
                    "email": m["email"],
                    "name": m.get("name", ""),
                    "status": m["status"],
                    "signed_at": None  # Adobe doesn't provide in this endpoint
                }
                # Sequential workflows create one participant set per signer,
                # so collect members from every set, not only the first.
                for participant_set in members.get("participantSets", [])
                for m in participant_set.get("memberInfos", [])
            ],
            created_at=datetime.fromisoformat(data["createdDate"].rstrip("Z")),
            updated_at=datetime.fromisoformat(data["lastEventDate"].rstrip("Z")),
            completed_at=None
        )

def _map_status(self, adobe_status: str) -> str:
"""Map Adobe Sign status to normalized status."""
status_map = {
"OUT_FOR_SIGNATURE": "sent",
"SIGNED": "completed",
"APPROVED": "completed",
"CANCELLED": "voided",
"EXPIRED": "expired",
"WAITING_FOR_AUTHORING": "created",
"DRAFT": "created",
}
return status_map.get(adobe_status, adobe_status.lower())

3. Multi-Signer Workflows

3.1 Workflow Engine

from enum import Enum
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import asyncio
import uuid

class WorkflowType(Enum):
SEQUENTIAL = "sequential" # One after another
PARALLEL = "parallel" # All at once
SERIAL_PARALLEL = "serial_parallel" # Groups in sequence, parallel within group
CUSTOM = "custom" # Custom routing rules

class SignerRole(Enum):
APPROVER = "approver"
SIGNER = "signer"
REVIEWER = "reviewer"
CARBON_COPY = "carbon_copy"
CERTIFIED_DELIVERY = "certified_delivery"

@dataclass
class WorkflowStep:
    """Single step in signature workflow."""
    step_id: str
    step_order: int
    signers: List[Signer]
    execution_type: str = "all"  # all, any, majority
    required_completions: int = 0  # 0 means all
    deadline_hours: int = 72
    reminder_hours: int = 24
    escalation_contact: Optional[str] = None
    envelope_id: Optional[str] = None  # Set once the step has been sent

@dataclass
class SignatureWorkflow:
"""Complete signature workflow definition."""
workflow_id: str
workflow_name: str
workflow_type: WorkflowType
steps: List[WorkflowStep]
document_id: str
created_by: str
created_at: datetime = field(default_factory=datetime.utcnow)

# Workflow state
current_step: int = 0
status: str = "pending" # pending, in_progress, completed, cancelled, expired
completed_signatures: List[Dict] = field(default_factory=list)

class SignatureWorkflowEngine:
"""Engine for managing multi-signer workflows."""

def __init__(
self,
signature_service: SignatureProviderInterface,
notification_service: "NotificationService",
audit_service: "AuditService"
):
self.signature_service = signature_service
self.notification_service = notification_service
self.audit_service = audit_service

async def create_workflow(
self,
document_id: str,
document_content: bytes,
workflow_type: WorkflowType,
signers: List[Signer],
options: Dict
) -> SignatureWorkflow:
"""Create new signature workflow."""

# Build workflow steps based on type
steps = self._build_workflow_steps(workflow_type, signers, options)

workflow = SignatureWorkflow(
workflow_id=str(uuid.uuid4()),
workflow_name=options.get("name", "Signature Request"),
workflow_type=workflow_type,
steps=steps,
document_id=document_id,
created_by=options.get("created_by", "system")
)

# Store workflow
await self._store_workflow(workflow)

# Start first step
await self._execute_step(workflow, workflow.steps[0], document_content)

# Log audit event
await self.audit_service.log(
event_type="workflow_created",
resource_id=workflow.workflow_id,
details={"signers": [s.email for s in signers], "type": workflow_type.value}
)

return workflow

def _build_workflow_steps(
self,
workflow_type: WorkflowType,
signers: List[Signer],
options: Dict
) -> List[WorkflowStep]:
"""Build workflow steps based on type."""

if workflow_type == WorkflowType.SEQUENTIAL:
# Each signer is a separate step
return [
WorkflowStep(
step_id=str(uuid.uuid4()),
step_order=idx + 1,
signers=[signer],
execution_type="all",
deadline_hours=options.get("deadline_hours", 72),
reminder_hours=options.get("reminder_hours", 24)
)
for idx, signer in enumerate(sorted(signers, key=lambda s: s.routing_order))
]

elif workflow_type == WorkflowType.PARALLEL:
# All signers in one step
return [
WorkflowStep(
step_id=str(uuid.uuid4()),
step_order=1,
signers=signers,
execution_type="all",
deadline_hours=options.get("deadline_hours", 72),
reminder_hours=options.get("reminder_hours", 24)
)
]

elif workflow_type == WorkflowType.SERIAL_PARALLEL:
# Group by routing_order
groups = {}
for signer in signers:
groups.setdefault(signer.routing_order, []).append(signer)

return [
WorkflowStep(
step_id=str(uuid.uuid4()),
step_order=order,
signers=group_signers,
execution_type=options.get("group_execution", "all"),
deadline_hours=options.get("deadline_hours", 72),
reminder_hours=options.get("reminder_hours", 24)
)
for order, group_signers in sorted(groups.items())
]

return []

    async def _execute_step(
        self,
        workflow: SignatureWorkflow,
        step: WorkflowStep,
        document_content: bytes
    ):
        """Execute a workflow step by sending signature requests."""

        # Create signature request for this step's signers
        request = SignatureRequest(
            document_id=workflow.document_id,
            document_name=f"Document_{workflow.document_id}.pdf",
            document_content=document_content,
            signers=step.signers,
            subject=f"Signature Required: {workflow.workflow_name}",
            message=f"Please sign by {datetime.utcnow() + timedelta(hours=step.deadline_hours)}",
            # Integer division yields 0 for values under 24 hours, so clamp to one day
            expiration_days=max(1, step.deadline_hours // 24),
            reminder_frequency_days=max(1, step.reminder_hours // 24),
            # Signers grouped into one step sign in any order
            workflow_type="parallel" if len(step.signers) > 1 else "sequential"
        )

        envelope_id = await self.signature_service.create_envelope(request)

        # Update step with envelope ID and mark the workflow as running
        step.envelope_id = envelope_id
        workflow.status = "in_progress"

        # Send notifications
        for signer in step.signers:
            await self.notification_service.send(
                to=signer.email,
                template="signature_request",
                data={"workflow_name": workflow.workflow_name, "deadline": step.deadline_hours}
            )

        # Schedule deadline check
        asyncio.create_task(
            self._schedule_deadline_check(workflow, step)
        )

async def handle_signature_complete(
self,
envelope_id: str,
signer_email: str,
signed_at: datetime
):
"""Handle completion of a signature."""

# Find workflow and step
workflow = await self._find_workflow_by_envelope(envelope_id)
current_step = workflow.steps[workflow.current_step]

# Record completion
workflow.completed_signatures.append({
"envelope_id": envelope_id,
"signer_email": signer_email,
"signed_at": signed_at.isoformat(),
"step_id": current_step.step_id
})

# Check if step is complete
step_complete = await self._is_step_complete(current_step, workflow.completed_signatures)

if step_complete:
# Move to next step or complete workflow
if workflow.current_step + 1 < len(workflow.steps):
workflow.current_step += 1
next_step = workflow.steps[workflow.current_step]

# Get signed document for next step
signed_doc = await self.signature_service.download_signed_document(envelope_id)
await self._execute_step(workflow, next_step, signed_doc)
else:
# Workflow complete
workflow.status = "completed"
await self._complete_workflow(workflow)

await self._store_workflow(workflow)

    async def _is_step_complete(
        self,
        step: WorkflowStep,
        completed_signatures: List[Dict]
    ) -> bool:
        """Check if workflow step is complete."""

        # Count distinct signers so a duplicated completion event is not counted twice
        step_completions = {
            sig["signer_email"] for sig in completed_signatures
            if sig["step_id"] == step.step_id
        }

        # An explicit count takes precedence; previously this branch was
        # unreachable because execution_type always matched first.
        if step.required_completions > 0:
            return len(step_completions) >= step.required_completions
        elif step.execution_type == "all":
            return len(step_completions) >= len(step.signers)
        elif step.execution_type == "any":
            return len(step_completions) >= 1
        elif step.execution_type == "majority":
            return len(step_completions) > len(step.signers) / 2

        return False
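
The completion rule can be stated as a single threshold: how many distinct signers must finish before the step advances. The sketch below works that out as pure functions so the rule can be tested without the engine. `required_signature_count` and `is_step_complete` are illustrative names, and giving an explicit `required_completions` priority over `execution_type` is an assumption about the intended behavior.

```python
from typing import Iterable


def required_signature_count(total_signers: int, execution_type: str = "all",
                             required_completions: int = 0) -> int:
    """Return how many distinct signers must sign before the step is complete."""
    if required_completions > 0:
        # An explicit count wins, but can never exceed the number of signers
        return min(required_completions, total_signers)
    if execution_type == "any":
        return 1
    if execution_type == "majority":
        # Strictly more than half, e.g. 3 of 4 and 3 of 5
        return total_signers // 2 + 1
    return total_signers  # "all"


def is_step_complete(signed_emails: Iterable[str], total_signers: int,
                     execution_type: str = "all", required_completions: int = 0) -> bool:
    """Compare the number of distinct signers against the required threshold."""
    needed = required_signature_count(total_signers, execution_type, required_completions)
    return len(set(signed_emails)) >= needed
```

For example, a five-signer step with `execution_type="majority"` advances after the third distinct signature.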

3.2 Delegation Service

@dataclass
class DelegationRule:
"""Rule for signature delegation."""
delegator_email: str
delegate_email: str
delegate_name: str
start_date: datetime
end_date: datetime
scope: str = "all" # all, document_type, workflow_type
scope_filter: Optional[str] = None
reason: str = ""
approved_by: Optional[str] = None
approved_at: Optional[datetime] = None

class SignatureDelegationService:
"""Service for managing signature delegation."""

async def create_delegation(
self,
delegator: "User",
delegate_email: str,
delegate_name: str,
start_date: datetime,
end_date: datetime,
scope: str = "all",
scope_filter: Optional[str] = None,
reason: str = "",
requires_approval: bool = True
) -> DelegationRule:
"""Create new delegation rule."""

# Validate delegate exists and can sign
delegate = await self._validate_delegate(delegate_email)

rule = DelegationRule(
delegator_email=delegator.email,
delegate_email=delegate_email,
delegate_name=delegate_name,
start_date=start_date,
end_date=end_date,
scope=scope,
scope_filter=scope_filter,
reason=reason
)

if requires_approval:
# Submit for manager approval
await self._submit_for_approval(rule, delegator)
else:
rule.approved_at = datetime.utcnow()

await self._store_delegation(rule)

return rule

async def get_active_delegate(
self,
original_signer_email: str,
document_type: Optional[str] = None,
workflow_type: Optional[str] = None
) -> Optional[str]:
"""Get active delegate for a signer."""

rules = await self._get_active_delegations(original_signer_email)

for rule in rules:
if rule.scope == "all":
return rule.delegate_email
elif rule.scope == "document_type" and rule.scope_filter == document_type:
return rule.delegate_email
elif rule.scope == "workflow_type" and rule.scope_filter == workflow_type:
return rule.delegate_email

return None

    async def reroute_signature_request(
        self,
        workflow_id: str,
        original_signer_email: str,
        delegate_email: str
    ):
        """Reroute pending signature request to delegate."""

        workflow = await self._get_workflow(workflow_id)

        # Update signer in current step
        for step in workflow.steps:
            for signer in step.signers:
                if signer.email == original_signer_email:
                    signer.email = delegate_email
                    signer.delegated_from = original_signer_email

        # If step already sent, resend to delegate
        if workflow.status == "in_progress":
            current_step = workflow.steps[workflow.current_step]
            # A step that has not been sent has no envelope ID (missing or None)
            envelope_id = getattr(current_step, "envelope_id", None)
            if envelope_id:
                await self.signature_service.resend_envelope(
                    envelope_id,
                    delegate_email
                )

        await self._store_workflow(workflow)
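
Whether a delegation rule applies depends on three things: approval, the date window, and the scope filter. The sketch below combines them into one predicate. `Rule` is a trimmed stand-in for `DelegationRule`, `rule_applies` is a hypothetical helper, and two behaviors are assumptions: unapproved rules are treated as inactive, and the date window is inclusive at both ends.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Rule:
    """Trimmed stand-in for DelegationRule with only the fields the check needs."""
    delegate_email: str
    start_date: datetime
    end_date: datetime
    scope: str = "all"  # all, document_type, workflow_type
    scope_filter: Optional[str] = None
    approved_at: Optional[datetime] = None


def rule_applies(rule: Rule, now: datetime,
                 document_type: Optional[str] = None,
                 workflow_type: Optional[str] = None) -> bool:
    """Return True when the rule is approved, in its date window, and in scope."""
    if rule.approved_at is None:
        return False  # Assumption: pending rules never reroute signatures
    if not (rule.start_date <= now <= rule.end_date):
        return False
    if rule.scope == "all":
        return True
    if rule.scope == "document_type":
        return rule.scope_filter == document_type
    if rule.scope == "workflow_type":
        return rule.scope_filter == workflow_type
    return False
```

`get_active_delegate` could then return the delegate of the first rule for which this predicate holds.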

4. Digital Certificate Management

4.1 Certificate Store

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID, ExtensionOID
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, List, Tuple
import uuid

@dataclass
class CertificateInfo:
"""Digital certificate information."""
serial_number: str
subject_name: str
issuer_name: str
not_valid_before: datetime
not_valid_after: datetime
public_key_algorithm: str
signature_algorithm: str
key_usage: List[str]
is_valid: bool
revocation_status: str
thumbprint_sha256: str

class CertificateStore:
"""Secure storage for digital certificates."""

def __init__(self, kms_client: "KMSClient", database: "Database"):
self.kms = kms_client
self.db = database

async def store_certificate(
self,
user_id: str,
certificate_pem: bytes,
private_key_pem: bytes,
passphrase: Optional[bytes] = None
) -> str:
"""Store certificate and encrypted private key."""

# Parse and validate certificate
cert = x509.load_pem_x509_certificate(certificate_pem)
cert_info = self._extract_cert_info(cert)

# Validate certificate
if not cert_info.is_valid:
raise CertificateError("Certificate is expired or not yet valid")

# Encrypt private key with KMS
encrypted_key = await self.kms.encrypt(private_key_pem)

# Store in database. issuer_name is NOT NULL in the certificates table
# (section 7.1), so it is written here; tenant_id is also NOT NULL there
# and still has to be supplied by the caller.
cert_id = str(uuid.uuid4())
await self.db.execute(
"""
INSERT INTO certificates (
id, user_id, certificate_pem, encrypted_private_key,
serial_number, subject_name, issuer_name, not_valid_before, not_valid_after,
thumbprint_sha256, created_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
""",
cert_id, user_id, certificate_pem, encrypted_key,
cert_info.serial_number, cert_info.subject_name, cert_info.issuer_name,
cert_info.not_valid_before, cert_info.not_valid_after,
cert_info.thumbprint_sha256, datetime.utcnow()
)

return cert_id

async def get_signing_certificate(
self,
user_id: str
) -> Tuple[bytes, bytes]:
"""Get certificate and decrypted private key for signing."""

row = await self.db.fetchrow(
"""
SELECT certificate_pem, encrypted_private_key
FROM certificates
WHERE user_id = $1
AND not_valid_after > NOW()
AND revoked = FALSE
ORDER BY not_valid_after DESC
LIMIT 1
""",
user_id
)

if not row:
raise CertificateError("No valid certificate found for user")

# Decrypt private key
private_key_pem = await self.kms.decrypt(row["encrypted_private_key"])

return row["certificate_pem"], private_key_pem

def _extract_cert_info(self, cert: x509.Certificate) -> CertificateInfo:
"""Extract certificate information."""

# Get key usage
key_usage = []
try:
ku = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
if ku.value.digital_signature:
key_usage.append("digital_signature")
if ku.value.content_commitment:
key_usage.append("non_repudiation")
if ku.value.key_encipherment:
key_usage.append("key_encipherment")
except x509.ExtensionNotFound:
pass

now = datetime.utcnow()
is_valid = cert.not_valid_before <= now <= cert.not_valid_after

return CertificateInfo(
serial_number=str(cert.serial_number),
subject_name=cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
issuer_name=cert.issuer.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
not_valid_before=cert.not_valid_before,
not_valid_after=cert.not_valid_after,
public_key_algorithm=cert.public_key().__class__.__name__,
signature_algorithm=cert.signature_algorithm_oid._name,
key_usage=key_usage,
is_valid=is_valid,
revocation_status="unknown",
thumbprint_sha256=cert.fingerprint(hashes.SHA256()).hex()
)
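
The key usage flags and validity window collected by `_extract_cert_info` can be combined into a single pre-signing check. The following is a minimal sketch, not part of the specification: `CertSummary` is a hypothetical, trimmed stand-in for `CertificateInfo`, `is_usable_for_signing` is an illustrative helper name, and the `"revoked"` status string is an assumed value.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class CertSummary:
    """Hypothetical, trimmed stand-in for CertificateInfo."""
    not_valid_before: datetime
    not_valid_after: datetime
    key_usage: List[str] = field(default_factory=list)
    revocation_status: str = "unknown"


def is_usable_for_signing(cert: CertSummary, now: Optional[datetime] = None) -> bool:
    """Return True only if the certificate is in date, not known to be
    revoked, and carries a signing-related key usage flag."""
    now = now or datetime.now(timezone.utc)
    in_window = cert.not_valid_before <= now <= cert.not_valid_after
    can_sign = bool({"digital_signature", "non_repudiation"} & set(cert.key_usage))
    return in_window and can_sign and cert.revocation_status != "revoked"


# Example certificates evaluated at a fixed reference time
now = datetime(2025, 12, 19, tzinfo=timezone.utc)
good = CertSummary(now - timedelta(days=1), now + timedelta(days=365), ["digital_signature"])
encrypt_only = CertSummary(now - timedelta(days=1), now + timedelta(days=365), ["key_encipherment"])
expired = CertSummary(now - timedelta(days=400), now - timedelta(days=35), ["digital_signature"])

for label, cert in [("good", good), ("encrypt_only", encrypt_only), ("expired", expired)]:
    print(label, is_usable_for_signing(cert, now))
```

A check of this kind would sit between `get_signing_certificate` and the signing call, so that a certificate issued only for key encipherment is rejected before any signature is produced.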

4.2 Document Signing with Certificates

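from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.x509.oid import NameOID
from datetime import datetime
from typing import Dict, List
from PyPDF2 import PdfReader, PdfWriter
import io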

class DocumentSigningService:
"""Service for signing documents with digital certificates."""

def __init__(
self,
certificate_store: CertificateStore,
timestamp_authority: "TimestampAuthority"
):
self.cert_store = certificate_store
self.tsa = timestamp_authority

async def sign_pdf(
self,
document_content: bytes,
user_id: str,
signature_reason: str,
signature_location: str = ""
) -> bytes:
"""Sign PDF document with user's certificate."""

# Get certificate and private key
cert_pem, private_key_pem = await self.cert_store.get_signing_certificate(user_id)

# Load certificate and key
cert = x509.load_pem_x509_certificate(cert_pem)
private_key = serialization.load_pem_private_key(private_key_pem, password=None)

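# Calculate document hash (sent to the TSA below)
document_hash = hashes.Hash(hashes.SHA256())
document_hash.update(document_content)
digest = document_hash.finalize()

# Sign with private key. sign() hashes its input with SHA-256 itself, so it
# receives the document content; passing the digest would hash it twice and
# the result would not verify against the document.
signature = private_key.sign(
document_content,
padding.PKCS1v15(),
hashes.SHA256()
)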

# Get timestamp from TSA
timestamp = await self.tsa.get_timestamp(digest)

# Embed signature in PDF
signed_pdf = await self._embed_signature_in_pdf(
document_content,
signature,
cert,
timestamp,
signature_reason,
signature_location
)

return signed_pdf

async def verify_signature(
self,
signed_document: bytes
) -> List[Dict]:
"""Verify all signatures in a signed document."""

# Extract signatures from PDF
signatures = self._extract_signatures(signed_document)

results = []
for sig in signatures:
# Verify signature
try:
cert = x509.load_der_x509_certificate(sig["certificate"])
public_key = cert.public_key()

public_key.verify(
sig["signature_value"],
sig["signed_data"],
padding.PKCS1v15(),
hashes.SHA256()
)

# Check certificate validity
now = datetime.utcnow()
cert_valid = cert.not_valid_before <= now <= cert.not_valid_after

# Check revocation status
revocation_status = await self._check_revocation(cert)

# Verify timestamp if present
timestamp_valid = True
if sig.get("timestamp"):
timestamp_valid = await self._verify_timestamp(
sig["timestamp"],
sig["signed_data"]
)

results.append({
"signer": cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value,
"signature_valid": True,
"certificate_valid": cert_valid,
"revocation_status": revocation_status,
"timestamp_valid": timestamp_valid,
"signing_time": sig.get("signing_time"),
"reason": sig.get("reason"),
"location": sig.get("location")
})

except Exception as e:
results.append({
"signature_valid": False,
"error": str(e)
})

return results
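
`verify_signature` returns one result dictionary per signature, which callers still have to reduce to a document-level decision. A minimal sketch of that reduction follows; `summarize_verification` is a hypothetical helper, and the `"revoked"` status value and the failure labels are assumptions rather than values defined by this specification.

```python
from typing import Dict, List


def summarize_verification(results: List[Dict]) -> Dict:
    """Collapse per-signature results into one document-level verdict."""
    failures = []
    for index, result in enumerate(results):
        if not result.get("signature_valid", False):
            failures.append((index, result.get("error", "signature_invalid")))
        elif not result.get("certificate_valid", False):
            failures.append((index, "certificate_expired_or_not_yet_valid"))
        elif result.get("revocation_status") == "revoked":
            failures.append((index, "certificate_revoked"))
        elif not result.get("timestamp_valid", True):
            failures.append((index, "timestamp_invalid"))
    return {
        # A document with no signatures at all is not treated as valid
        "document_valid": bool(results) and not failures,
        "signature_count": len(results),
        "failures": failures,
    }


report = summarize_verification([
    {"signature_valid": True, "certificate_valid": True,
     "revocation_status": "good", "timestamp_valid": True},
    {"signature_valid": False, "error": "Invalid signature"},
])
print(report)
```

Reporting the first failing check per signature keeps the summary short while still pointing at the signature that needs attention.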

5. Signature Audit Trail

5.1 Comprehensive Audit Logging

from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional
from datetime import datetime
import hashlib
import json
import uuid

class SignatureAuditEventType(Enum):
# Workflow events
WORKFLOW_CREATED = "workflow_created"
WORKFLOW_STARTED = "workflow_started"
WORKFLOW_COMPLETED = "workflow_completed"
WORKFLOW_CANCELLED = "workflow_cancelled"
WORKFLOW_EXPIRED = "workflow_expired"

# Signature events
SIGNATURE_REQUESTED = "signature_requested"
SIGNATURE_VIEWED = "signature_viewed"
SIGNATURE_SIGNED = "signature_signed"
SIGNATURE_DECLINED = "signature_declined"
SIGNATURE_DELEGATED = "signature_delegated"

# Document events
DOCUMENT_UPLOADED = "document_uploaded"
DOCUMENT_DOWNLOADED = "document_downloaded"
DOCUMENT_MODIFIED = "document_modified"

# Authentication events
AUTHENTICATION_SUCCESS = "authentication_success"
AUTHENTICATION_FAILED = "authentication_failed"
MFA_COMPLETED = "mfa_completed"

# Certificate events
CERTIFICATE_USED = "certificate_used"
CERTIFICATE_VERIFIED = "certificate_verified"

@dataclass
class SignatureAuditEvent:
"""Immutable audit event for signature operations."""
event_id: str
event_type: SignatureAuditEventType
timestamp: datetime

# Actor information
actor_id: str
actor_email: str
actor_name: str
actor_ip_address: str
actor_user_agent: str

# Resource information
resource_type: str # workflow, document, signature
resource_id: str

# Event details
details: Dict

# Integrity
previous_event_hash: str
event_hash: str = ""

def __post_init__(self):
if not self.event_hash:
self.event_hash = self._calculate_hash()

def _calculate_hash(self) -> str:
"""Calculate hash for event integrity."""
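# Normalize to naive UTC and plain strings so the hash is identical before
# and after a database round trip (TIMESTAMPTZ columns come back as aware
# datetimes, UUID columns as UUID objects).
ts = self.timestamp
if ts.tzinfo is not None:
ts = (ts - ts.utcoffset()).replace(tzinfo=None)
data = {
"event_id": str(self.event_id),
"event_type": self.event_type.value,
"timestamp": ts.isoformat(),
"actor_id": str(self.actor_id),
"resource_id": str(self.resource_id),
"details": self.details,
"previous_hash": self.previous_event_hash
}
return hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()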

class SignatureAuditService:
"""Service for managing signature audit trails."""

def __init__(self, database: "Database", blockchain_anchor: Optional["BlockchainAnchor"] = None):
self.db = database
self.blockchain = blockchain_anchor
self._last_event_hash = ""

async def log_event(
self,
event_type: SignatureAuditEventType,
actor: "User",
resource_type: str,
resource_id: str,
details: Dict,
request: Optional["Request"] = None
) -> SignatureAuditEvent:
"""Log immutable audit event."""

# Get previous event hash for chain
previous_hash = await self._get_last_event_hash(resource_id)

event = SignatureAuditEvent(
event_id=str(uuid.uuid4()),
event_type=event_type,
timestamp=datetime.utcnow(),
actor_id=actor.id,
actor_email=actor.email,
actor_name=f"{actor.first_name} {actor.last_name}",
actor_ip_address=request.client.host if request and request.client else None,  # INET column: NULL for system events
actor_user_agent=request.headers.get("user-agent", "") if request else "system",
resource_type=resource_type,
resource_id=resource_id,
details=details,
previous_event_hash=previous_hash
)

# Store event
await self.db.execute(
"""
INSERT INTO signature_audit_events (
event_id, event_type, timestamp,
actor_id, actor_email, actor_name, actor_ip_address, actor_user_agent,
resource_type, resource_id, details,
previous_event_hash, event_hash
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
""",
event.event_id, event.event_type.value, event.timestamp,
event.actor_id, event.actor_email, event.actor_name,
event.actor_ip_address, event.actor_user_agent,
event.resource_type, event.resource_id, json.dumps(event.details),
event.previous_event_hash, event.event_hash
)

# Anchor to blockchain periodically
if self.blockchain and await self._should_anchor():
await self._anchor_to_blockchain(event)

return event

async def get_audit_trail(
self,
resource_id: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> List[SignatureAuditEvent]:
"""Get complete audit trail for a resource."""

query = """
SELECT * FROM signature_audit_events
WHERE resource_id = $1
"""
params = [resource_id]

if start_date:
query += " AND timestamp >= $2"
params.append(start_date)
if end_date:
query += f" AND timestamp <= ${len(params) + 1}"
params.append(end_date)

query += " ORDER BY timestamp ASC"

rows = await self.db.fetch(query, *params)

events = []
for row in rows:
events.append(SignatureAuditEvent(
event_id=row["event_id"],
event_type=SignatureAuditEventType(row["event_type"]),
timestamp=row["timestamp"],
actor_id=row["actor_id"],
actor_email=row["actor_email"],
actor_name=row["actor_name"],
actor_ip_address=row["actor_ip_address"],
actor_user_agent=row["actor_user_agent"],
resource_type=row["resource_type"],
resource_id=row["resource_id"],
details=json.loads(row["details"]),
previous_event_hash=row["previous_event_hash"],
event_hash=row["event_hash"]
))

return events

async def verify_audit_trail_integrity(self, resource_id: str) -> Dict:
"""Verify integrity of audit trail hash chain."""

events = await self.get_audit_trail(resource_id)

if not events:
return {"valid": True, "events_verified": 0, "violations": []}

violations = []
expected_previous_hash = ""

for event in events:
# Verify hash chain
if event.previous_event_hash != expected_previous_hash:
violations.append({
"event_id": event.event_id,
"issue": "previous_hash_mismatch",
"expected": expected_previous_hash,
"actual": event.previous_event_hash
})

# Verify event hash
calculated_hash = event._calculate_hash()
if event.event_hash != calculated_hash:
violations.append({
"event_id": event.event_id,
"issue": "event_hash_mismatch",
"expected": calculated_hash,
"actual": event.event_hash
})

expected_previous_hash = event.event_hash

return {
"valid": len(violations) == 0,
"events_verified": len(events),
"violations": violations
}

async def generate_audit_report(
self,
resource_id: str,
format: str = "pdf"
) -> bytes:
"""Generate formatted audit report for regulatory submission."""

events = await self.get_audit_trail(resource_id)
integrity = await self.verify_audit_trail_integrity(resource_id)

report_data = {
"resource_id": resource_id,
"generated_at": datetime.utcnow().isoformat(),
"integrity_verified": integrity["valid"],
"total_events": len(events),
"events": [
{
"timestamp": e.timestamp.isoformat(),
"event_type": e.event_type.value,
"actor": e.actor_name,
"actor_email": e.actor_email,
"ip_address": e.actor_ip_address,
"details": e.details
}
for e in events
]
}

if format == "pdf":
return await self._render_pdf_report(report_data)
elif format == "json":
return json.dumps(report_data, indent=2).encode()
elif format == "csv":
return await self._render_csv_report(report_data)

raise ValueError(f"Unsupported format: {format}")
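
The hash chain used by `SignatureAuditEvent` and `verify_audit_trail_integrity` can be illustrated without a database. The sketch below uses plain dictionaries and hypothetical helper names (`event_hash`, `build_chain`, `first_violation`); it follows the same idea of hashing each event together with the previous event's hash, but it is a simplified illustration rather than the service's implementation.

```python
import hashlib
import json
from typing import Dict, List


def event_hash(event: Dict, previous_hash: str) -> str:
    """SHA-256 over the event fields plus the previous event's hash."""
    body = {**event, "previous_hash": previous_hash}
    return hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()


def build_chain(events: List[Dict]) -> List[Dict]:
    """Attach previous_event_hash and event_hash to each event in order."""
    chain, previous = [], ""
    for event in events:
        digest = event_hash(event, previous)
        chain.append({**event, "previous_event_hash": previous, "event_hash": digest})
        previous = digest
    return chain


def first_violation(chain: List[Dict]) -> int:
    """Return the index of the first broken link, or -1 if the chain is intact."""
    previous = ""
    for index, record in enumerate(chain):
        event = {k: v for k, v in record.items()
                 if k not in ("previous_event_hash", "event_hash")}
        if record["previous_event_hash"] != previous:
            return index
        if record["event_hash"] != event_hash(event, previous):
            return index
        previous = record["event_hash"]
    return -1


chain = build_chain([
    {"event_id": "e1", "event_type": "signature_requested"},
    {"event_id": "e2", "event_type": "signature_viewed"},
    {"event_id": "e3", "event_type": "signature_signed"},
])

# Simulate an after-the-fact edit of the second event
tampered = [dict(record) for record in chain]
tampered[1]["event_type"] = "signature_declined"

print(first_violation(chain), first_violation(tampered))
```

Because each hash covers the previous one, changing any stored field invalidates that event's hash, and replacing the hash as well would break the link to the next event.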

6. Webhook and Event Integration

6.1 Signature Event Webhooks

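from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, List, Optional
import asyncio
import hashlib
import hmac
import json
import uuid
import aiohttp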

@dataclass
class WebhookConfig:
"""Webhook configuration."""
webhook_id: str
url: str
secret: str
events: List[str] # List of event types to receive
active: bool = True
retry_count: int = 3
timeout_seconds: int = 30

class SignatureWebhookService:
"""Service for managing signature event webhooks."""

def __init__(self, database: "Database"):
self.db = database

async def register_webhook(
self,
tenant_id: str,
url: str,
events: List[str],
secret: Optional[str] = None
) -> WebhookConfig:
"""Register new webhook endpoint."""

if not secret:
import secrets
secret = secrets.token_urlsafe(32)

config = WebhookConfig(
webhook_id=str(uuid.uuid4()),
url=url,
secret=secret,
events=events
)

await self.db.execute(
"""
INSERT INTO signature_webhooks (
webhook_id, tenant_id, url, secret, events, active
) VALUES ($1, $2, $3, $4, $5, $6)
""",
config.webhook_id, tenant_id, url, secret, events, True
)

return config

async def dispatch_event(
self,
tenant_id: str,
event_type: str,
payload: Dict
):
"""Dispatch event to registered webhooks."""

webhooks = await self._get_webhooks_for_event(tenant_id, event_type)

for webhook in webhooks:
await self._send_webhook(webhook, event_type, payload)

async def _send_webhook(
self,
webhook: WebhookConfig,
event_type: str,
payload: Dict
):
"""Send webhook with signature."""

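payload_json = json.dumps(payload)
# Epoch seconds. A naive utcnow() would be read as local time by timestamp(),
# which skews the value on hosts that are not set to UTC.
timestamp = str(int(datetime.now().timestamp()))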

# Calculate HMAC signature
signature_payload = f"{timestamp}.{payload_json}"
signature = hmac.new(
webhook.secret.encode(),
signature_payload.encode(),
hashlib.sha256
).hexdigest()

headers = {
"Content-Type": "application/json",
"X-Signature-Event": event_type,
"X-Signature-Timestamp": timestamp,
"X-Signature-Signature": f"sha256={signature}"
}

for attempt in range(webhook.retry_count):
try:
async with aiohttp.ClientSession() as session:
async with session.post(
webhook.url,
data=payload_json,  # send exactly the bytes that were signed
headers=headers,
timeout=aiohttp.ClientTimeout(total=webhook.timeout_seconds)
) as response:
if response.status < 400:
return True

# Log failure but don't raise for 4xx
if response.status < 500:
await self._log_webhook_failure(
webhook, event_type, response.status, await response.text()
)
return False

except Exception as e:
if attempt == webhook.retry_count - 1:
await self._log_webhook_failure(webhook, event_type, 0, str(e))
return False

# Exponential backoff
await asyncio.sleep(2 ** attempt)

return False
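
On the receiving side, a consumer can recompute the HMAC from the `X-Signature-Timestamp` header and the raw request body and compare it with `X-Signature-Signature`. The sketch below mirrors the `{timestamp}.{payload}` scheme used by `_send_webhook`; the function name `verify_webhook`, the example secret, and the 300-second replay tolerance are assumptions for illustration.

```python
import hashlib
import hmac
import time
from typing import Optional


def verify_webhook(secret: str, body: bytes, timestamp: str, signature_header: str,
                   tolerance_seconds: int = 300, now: Optional[float] = None) -> bool:
    """Check the sha256=<hex> signature and reject stale deliveries."""
    now = time.time() if now is None else now
    try:
        sent_at = int(timestamp)
    except ValueError:
        return False
    # Reject deliveries outside the replay window (assumed tolerance)
    if abs(now - sent_at) > tolerance_seconds:
        return False
    expected = hmac.new(
        secret.encode(),
        timestamp.encode() + b"." + body,
        hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(f"sha256={expected}".encode(), signature_header.encode())


secret = "example-secret"  # hypothetical value
body = b'{"workflow_id": "wf-1"}'
sent_at = "1766102400"
header = "sha256=" + hmac.new(
    secret.encode(), sent_at.encode() + b"." + body, hashlib.sha256
).hexdigest()

print(verify_webhook(secret, body, sent_at, header, now=1766102410))
```

The check has to run against the raw body bytes as received; re-serializing parsed JSON can change whitespace or key order and make a valid signature fail.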

6.2 Provider Webhook Handlers

from datetime import datetime
from fastapi import APIRouter, Request, HTTPException
import hmac

router = APIRouter(prefix="/webhooks/signatures")

@router.post("/docusign")
async def handle_docusign_webhook(request: Request):
"""Handle DocuSign Connect webhook events."""

# Verify HMAC signature
signature = request.headers.get("X-DocuSign-Signature-1")
if not signature:
raise HTTPException(status_code=401, detail="Missing signature")

body = await request.body()

if not verify_docusign_signature(body, signature):
raise HTTPException(status_code=401, detail="Invalid signature")

payload = await request.json()

# Extract envelope information
envelope_id = payload.get("envelopeId")
status = payload.get("status")

if status == "completed":
# Handle completed signature
for recipient in payload.get("recipients", {}).get("signers", []):
await signature_workflow_engine.handle_signature_complete(
envelope_id=envelope_id,
signer_email=recipient["email"],
signed_at=datetime.fromisoformat(recipient["signedDateTime"].replace("Z", "+00:00"))  # keep the UTC offset instead of dropping it
)

elif status == "declined":
await signature_workflow_engine.handle_signature_declined(
envelope_id=envelope_id,
reason=payload.get("declinedReason", "")
)

elif status == "voided":
await signature_workflow_engine.handle_envelope_voided(
envelope_id=envelope_id,
reason=payload.get("voidedReason", "")
)

return {"status": "processed"}

@router.post("/adobe-sign")
async def handle_adobe_sign_webhook(request: Request):
"""Handle Adobe Sign webhook events."""

# Verify client ID
client_id = request.headers.get("X-AdobeSign-ClientId")
if not verify_adobe_client(client_id):
raise HTTPException(status_code=401, detail="Invalid client")

payload = await request.json()

event_type = payload.get("event")
agreement_id = payload.get("agreementId")

if event_type == "AGREEMENT_ALL_SIGNED":
# All signatures collected
await signature_workflow_engine.handle_workflow_complete(
envelope_id=agreement_id
)

elif event_type == "AGREEMENT_PARTICIPANT_SIGNED":
participant = payload.get("participantEmail")
await signature_workflow_engine.handle_signature_complete(
envelope_id=agreement_id,
signer_email=participant,
signed_at=datetime.utcnow()
)

elif event_type == "AGREEMENT_EXPIRED":
await signature_workflow_engine.handle_workflow_expired(
envelope_id=agreement_id
)

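# Adobe Sign expects the client ID to be echoed back in the response
return {"status": "processed", "xAdobeSignClientId": client_id}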
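
The DocuSign handler above calls `verify_docusign_signature`, which is not defined in this section. A minimal sketch follows, assuming that DocuSign Connect HMAC signatures are the Base64-encoded HMAC-SHA256 of the raw request body keyed with the Connect secret; that assumption should be confirmed against the DocuSign Connect documentation. Unlike the handler's two-argument call, this version takes the secret explicitly; in the service it would presumably come from `DocuSignConfig.webhook_secret`.

```python
import base64
import hashlib
import hmac


def verify_docusign_signature(body: bytes, signature: str, secret: str) -> bool:
    """Compare the received header value with a locally computed HMAC.

    Assumes the header carries base64(HMAC-SHA256(secret, raw_body)).
    """
    digest = hmac.new(secret.encode(), body, hashlib.sha256).digest()
    expected = base64.b64encode(digest)
    # Constant-time comparison on bytes
    return hmac.compare_digest(expected, signature.encode())


# Example with a hypothetical secret and payload
example_secret = "connect-secret"
example_body = b'{"envelopeId": "abc", "status": "completed"}'
example_header = base64.b64encode(
    hmac.new(example_secret.encode(), example_body, hashlib.sha256).digest()
).decode()

print(verify_docusign_signature(example_body, example_header, example_secret))
```

As with the outbound webhooks, the comparison has to use the bytes returned by `request.body()`, not a re-serialized copy of the parsed JSON.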

7. Database Schema

7.1 Signature Tables

-- Signature Workflows
CREATE TABLE signature_workflows (
workflow_id UUID PRIMARY KEY,
tenant_id UUID NOT NULL REFERENCES tenants(id),
document_id UUID NOT NULL REFERENCES documents(id),
workflow_name VARCHAR(255) NOT NULL,
workflow_type VARCHAR(50) NOT NULL,
status VARCHAR(50) NOT NULL DEFAULT 'pending',
current_step INT NOT NULL DEFAULT 0,
created_by UUID NOT NULL REFERENCES users(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,

CONSTRAINT valid_workflow_type CHECK (
workflow_type IN ('sequential', 'parallel', 'serial_parallel', 'custom')
),
CONSTRAINT valid_status CHECK (
status IN ('pending', 'in_progress', 'completed', 'cancelled', 'expired')
)
);

-- Workflow Steps
CREATE TABLE workflow_steps (
step_id UUID PRIMARY KEY,
workflow_id UUID NOT NULL REFERENCES signature_workflows(workflow_id),
step_order INT NOT NULL,
execution_type VARCHAR(50) NOT NULL DEFAULT 'all',
required_completions INT NOT NULL DEFAULT 0,
deadline_hours INT NOT NULL DEFAULT 72,
reminder_hours INT NOT NULL DEFAULT 24,
escalation_contact VARCHAR(255),
envelope_id VARCHAR(255),
status VARCHAR(50) NOT NULL DEFAULT 'pending',
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,

CONSTRAINT valid_execution_type CHECK (
execution_type IN ('all', 'any', 'majority')
)
);

-- Step Signers
CREATE TABLE step_signers (
id UUID PRIMARY KEY,
step_id UUID NOT NULL REFERENCES workflow_steps(step_id),
email VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
role VARCHAR(50) NOT NULL,
routing_order INT NOT NULL,
authentication_method VARCHAR(50) NOT NULL DEFAULT 'email',
delegated_from VARCHAR(255),
status VARCHAR(50) NOT NULL DEFAULT 'pending',
signed_at TIMESTAMPTZ,
declined_at TIMESTAMPTZ,
decline_reason TEXT
);

-- Digital Certificates
CREATE TABLE certificates (
id UUID PRIMARY KEY,
user_id UUID NOT NULL REFERENCES users(id),
tenant_id UUID NOT NULL REFERENCES tenants(id),
certificate_pem BYTEA NOT NULL,
encrypted_private_key BYTEA NOT NULL,
serial_number VARCHAR(255) NOT NULL,
subject_name VARCHAR(255) NOT NULL,
issuer_name VARCHAR(255) NOT NULL,
not_valid_before TIMESTAMPTZ NOT NULL,
not_valid_after TIMESTAMPTZ NOT NULL,
thumbprint_sha256 VARCHAR(64) NOT NULL,
revoked BOOLEAN NOT NULL DEFAULT FALSE,
revoked_at TIMESTAMPTZ,
revocation_reason TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

CONSTRAINT unique_serial UNIQUE (serial_number, issuer_name)
);

-- Delegation Rules
CREATE TABLE signature_delegations (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL REFERENCES tenants(id),
delegator_email VARCHAR(255) NOT NULL,
delegate_email VARCHAR(255) NOT NULL,
delegate_name VARCHAR(255) NOT NULL,
start_date TIMESTAMPTZ NOT NULL,
end_date TIMESTAMPTZ NOT NULL,
scope VARCHAR(50) NOT NULL DEFAULT 'all',
scope_filter VARCHAR(255),
reason TEXT,
approved_by UUID REFERENCES users(id),
approved_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

CONSTRAINT valid_dates CHECK (end_date > start_date)
);

-- Signature Audit Events (Append-only, immutable)
CREATE TABLE signature_audit_events (
event_id UUID PRIMARY KEY,
event_type VARCHAR(50) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
actor_id UUID NOT NULL,
actor_email VARCHAR(255) NOT NULL,
actor_name VARCHAR(255) NOT NULL,
actor_ip_address INET,
actor_user_agent TEXT,
resource_type VARCHAR(50) NOT NULL,
resource_id UUID NOT NULL,
details JSONB NOT NULL,
previous_event_hash VARCHAR(64) NOT NULL,
event_hash VARCHAR(64) NOT NULL
) WITH (fillfactor = 100);

-- Create trigger to prevent updates/deletes
CREATE OR REPLACE FUNCTION prevent_audit_modification()
RETURNS TRIGGER AS $$
BEGIN
RAISE EXCEPTION 'Audit events cannot be modified or deleted';
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER prevent_audit_updates
BEFORE UPDATE OR DELETE ON signature_audit_events
FOR EACH ROW EXECUTE FUNCTION prevent_audit_modification();

-- Webhooks
CREATE TABLE signature_webhooks (
webhook_id UUID PRIMARY KEY,
tenant_id UUID NOT NULL REFERENCES tenants(id),
url VARCHAR(2048) NOT NULL,
secret VARCHAR(255) NOT NULL,
events TEXT[] NOT NULL,
active BOOLEAN NOT NULL DEFAULT TRUE,
retry_count INT NOT NULL DEFAULT 3,
timeout_seconds INT NOT NULL DEFAULT 30,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes
CREATE INDEX idx_workflows_tenant ON signature_workflows(tenant_id);
CREATE INDEX idx_workflows_document ON signature_workflows(document_id);
CREATE INDEX idx_workflows_status ON signature_workflows(status);
CREATE INDEX idx_steps_workflow ON workflow_steps(workflow_id);
CREATE INDEX idx_signers_step ON step_signers(step_id);
CREATE INDEX idx_signers_email ON step_signers(email);
CREATE INDEX idx_certs_user ON certificates(user_id);
CREATE INDEX idx_certs_thumbprint ON certificates(thumbprint_sha256);
CREATE INDEX idx_delegations_delegator ON signature_delegations(delegator_email);
CREATE INDEX idx_delegations_active ON signature_delegations(delegator_email, start_date, end_date);
CREATE INDEX idx_audit_resource ON signature_audit_events(resource_id);
CREATE INDEX idx_audit_timestamp ON signature_audit_events(timestamp);
CREATE INDEX idx_webhooks_tenant ON signature_webhooks(tenant_id);

8. API Endpoints

8.1 Signature API

from fastapi import APIRouter, Depends, HTTPException, Response, UploadFile, File
from typing import Dict, List, Optional

router = APIRouter(prefix="/api/v1/signatures", tags=["signatures"])

@router.post("/workflows")
async def create_signature_workflow(
request: CreateWorkflowRequest,
current_user: User = Depends(get_current_user)
) -> WorkflowResponse:
"""Create new signature workflow."""

# Validate document access
document = await document_service.get(request.document_id)
if not await permission_service.can_initiate_signature(current_user, document):
raise HTTPException(status_code=403, detail="Insufficient permissions")

# Apply delegation rules
signers = []
for signer in request.signers:
delegate = await delegation_service.get_active_delegate(
signer.email,
document.document_type
)
if delegate:
# Capture the original address before it is overwritten
original_email = signer.email
signer = signer.copy()
signer.email = delegate
signer.delegated_from = original_email
signers.append(signer)

# Create workflow
workflow = await workflow_engine.create_workflow(
document_id=request.document_id,
document_content=document.content,
workflow_type=request.workflow_type,
signers=signers,
options=request.options
)

return WorkflowResponse.from_workflow(workflow)

@router.get("/workflows/{workflow_id}")
async def get_workflow_status(
workflow_id: str,
current_user: User = Depends(get_current_user)
) -> WorkflowStatusResponse:
"""Get signature workflow status."""

workflow = await workflow_engine.get_workflow(workflow_id)

if not await permission_service.can_view_workflow(current_user, workflow):
raise HTTPException(status_code=403, detail="Insufficient permissions")

return WorkflowStatusResponse(
workflow_id=workflow.workflow_id,
status=workflow.status,
current_step=workflow.current_step,
total_steps=len(workflow.steps),
signers=[
SignerStatus(
email=s.email,
name=s.name,
status=s.status,
signed_at=s.signed_at
)
for step in workflow.steps
for s in step.signers
],
completed_at=workflow.completed_at
)

@router.post("/workflows/{workflow_id}/cancel")
async def cancel_workflow(
workflow_id: str,
reason: str,
current_user: User = Depends(get_current_user)
) -> Dict:
"""Cancel signature workflow."""

workflow = await workflow_engine.get_workflow(workflow_id)

if not await permission_service.can_cancel_workflow(current_user, workflow):
raise HTTPException(status_code=403, detail="Insufficient permissions")

await workflow_engine.cancel_workflow(workflow_id, reason, current_user)

return {"status": "cancelled", "workflow_id": workflow_id}

@router.get("/workflows/{workflow_id}/audit-trail")
async def get_audit_trail(
workflow_id: str,
format: str = "json",
current_user: User = Depends(get_current_user)
) -> Response:
"""Get complete audit trail for workflow."""

workflow = await workflow_engine.get_workflow(workflow_id)

if not await permission_service.can_view_audit_trail(current_user, workflow):
raise HTTPException(status_code=403, detail="Insufficient permissions")

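media_types = {
"pdf": "application/pdf",
"json": "application/json",
"csv": "text/csv"
}
if format not in media_types:
raise HTTPException(status_code=400, detail=f"Unsupported format: {format}")

report = await audit_service.generate_audit_report(workflow_id, format)

# PDF and CSV reports are returned as downloads
headers = {}
if format != "json":
headers["Content-Disposition"] = f"attachment; filename=audit-trail-{workflow_id}.{format}"

return Response(content=report, media_type=media_types[format], headers=headers)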

@router.post("/delegations")
async def create_delegation(
request: CreateDelegationRequest,
current_user: User = Depends(get_current_user)
) -> DelegationResponse:
"""Create signature delegation."""

rule = await delegation_service.create_delegation(
delegator=current_user,
delegate_email=request.delegate_email,
delegate_name=request.delegate_name,
start_date=request.start_date,
end_date=request.end_date,
scope=request.scope,
scope_filter=request.scope_filter,
reason=request.reason
)

return DelegationResponse.from_rule(rule)

@router.get("/delegations")
async def list_delegations(
active_only: bool = True,
current_user: User = Depends(get_current_user)
) -> List[DelegationResponse]:
"""List delegation rules for current user."""

rules = await delegation_service.get_user_delegations(
current_user.email,
active_only=active_only
)

return [DelegationResponse.from_rule(r) for r in rules]
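
The delegation lookup used in `create_signature_workflow` (`delegation_service.get_active_delegate`) is not shown in this section. The sketch below illustrates one plausible resolution rule over the columns of `signature_delegations`: the rule must belong to the signer, be approved, be inside its date window, and match the document type when scoped. The `DelegationRule` class, the `"document_type"` scope value, and the approval requirement are assumptions for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class DelegationRule:
    """Hypothetical in-memory view of a signature_delegations row."""
    delegator_email: str
    delegate_email: str
    start_date: datetime
    end_date: datetime
    scope: str = "all"                  # "all" or "document_type" (assumed values)
    scope_filter: Optional[str] = None
    approved: bool = True


def get_active_delegate(rules: List[DelegationRule], signer_email: str,
                        document_type: str, now: Optional[datetime] = None) -> Optional[str]:
    """Return the delegate's email for the first matching active rule, else None."""
    now = now or datetime.now(timezone.utc)
    for rule in rules:
        if rule.delegator_email.lower() != signer_email.lower():
            continue
        if not rule.approved:
            continue
        if not (rule.start_date <= now <= rule.end_date):
            continue
        if rule.scope == "all":
            return rule.delegate_email
        if rule.scope == "document_type" and rule.scope_filter == document_type:
            return rule.delegate_email
    return None


rules = [
    DelegationRule("cfo@example.com", "controller@example.com",
                   datetime(2025, 12, 20, tzinfo=timezone.utc),
                   datetime(2026, 1, 5, tzinfo=timezone.utc),
                   scope="document_type", scope_filter="contract"),
]
during = datetime(2025, 12, 24, tzinfo=timezone.utc)
after = datetime(2026, 2, 1, tzinfo=timezone.utc)

print(get_active_delegate(rules, "CFO@example.com", "contract", during))
```

Returning only the delegate's email matches how the endpoint assigns the result to `signer.email`; a richer return type would be needed if the delegate's name should also replace the signer's name.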

9. Configuration

9.1 Provider Configuration

# config/signature_providers.py

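# pydantic v1 import; with pydantic v2 this class is provided by the separate
# pydantic-settings package (from pydantic_settings import BaseSettings)
from pydantic import BaseSettings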

class DocuSignConfig(BaseSettings):
"""DocuSign configuration."""
enabled: bool = True
integration_key: str
user_id: str
account_id: str
private_key_path: str
environment: str = "production" # production, demo

# Webhook configuration
webhook_url: str
webhook_secret: str

class Config:
env_prefix = "DOCUSIGN_"

class AdobeSignConfig(BaseSettings):
"""Adobe Sign configuration."""
enabled: bool = True
client_id: str
client_secret: str
refresh_token: str
api_access_point: str = "https://api.na1.adobesign.com"

# Webhook configuration
webhook_url: str

class Config:
env_prefix = "ADOBESIGN_"

class SignatureConfig(BaseSettings):
"""Master signature configuration."""

# Default provider
default_provider: str = "docusign" # docusign, adobe_sign, internal

# Workflow defaults
default_expiration_days: int = 30
default_reminder_frequency_days: int = 3

# Part 11 compliance
require_part11_signatures: bool = False
require_mfa_for_signatures: bool = True

# Certificate settings
allow_self_signed_certs: bool = False
certificate_key_size: int = 2048

# Audit settings
audit_retention_days: int = 2555 # 7 years for SOX
blockchain_anchoring_enabled: bool = False

class Config:
env_prefix = "SIGNATURE_"

10. Compliance Matrix

10.1 Standard Compliance Mapping

| Requirement | eIDAS | ESIGN | FDA Part 11 | Implementation |
|---|---|---|---|---|
| Electronic signature validity | SES/AES/QES | Yes | Yes | Multi-provider integration |
| Signer identity verification | AES/QES | Consent | 2-factor | MFA + provider auth |
| Non-repudiation | AES/QES | Yes | §11.70 | Hash-chained audit trail |
| Timestamp | Required | Optional | Required | TSA integration |
| Audit trail | Required | Required | §11.10(e) | Immutable audit events |
| Certificate management | QES | N/A | §11.100 | Secure certificate store |
| Signature meaning | AES/QES | N/A | §11.50 | Captured in metadata |

11. Integration Checklist

11.1 Implementation Phases

Phase 1: Core Integration (Week 1-2)

  • DocuSign API integration
  • Adobe Sign API integration
  • Basic workflow engine
  • Webhook handlers

Phase 2: Multi-Signer (Week 3-4)

  • Sequential workflows
  • Parallel workflows
  • Delegation service
  • Escalation handling

Phase 3: Compliance (Week 5-6)

  • FDA Part 11 signatures
  • Certificate management
  • Immutable audit trail
  • Blockchain anchoring (optional)

Phase 4: Advanced (Week 7-8)

  • Custom routing rules
  • Conditional workflows
  • Bulk signing
  • Mobile signing optimization

Document Control

| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0.0 | 2025-12-19 | CODITECT Architecture Team | Initial specification |

Classification: Internal - Technical Specification
Review Cycle: Quarterly
Next Review: March 2026
Approval Required: CTO, Head of Compliance