Google Docs Integration - Technical Design Document
Project: CODITECT Google Docs Integration | Version: 1.0.0 | Status: Planning | Last Updated: December 17, 2025
1. Overview
This document provides technical implementation details for the Google Docs integration, including code patterns, database schema, and integration points.
2. Core Implementation
2.1 AsyncDocsClient - Async API Client
"""Async Google Docs API client with batching."""
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
import asyncio
from typing import Optional, List, Dict, Any
class AsyncDocsClient:
    """Async wrapper for Google Docs API v1.

    Requests built by ``googleapiclient`` are blocking, so every request is
    executed in the event loop's default executor. The Docs (v1) and Drive
    (v3) service objects are built lazily on first access and then cached.
    """

    SCOPES = [
        'https://www.googleapis.com/auth/documents',
        'https://www.googleapis.com/auth/drive.file'
    ]

    def __init__(self, credentials: "Credentials"):
        """Store the credentials; service objects are created on demand."""
        self.credentials = credentials
        self._service = None
        self._drive_service = None

    @property
    def service(self):
        """Docs v1 service object, built on first access."""
        if self._service is None:
            self._service = build('docs', 'v1', credentials=self.credentials)
        return self._service

    @property
    def drive_service(self):
        """Drive v3 service object, built on first access."""
        if self._drive_service is None:
            self._drive_service = build('drive', 'v3', credentials=self.credentials)
        return self._drive_service

    async def _execute(self, request):
        """Execute API request in thread pool."""
        # Inside a coroutine a loop is always running; get_running_loop()
        # is the supported way to obtain it.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, request.execute)

    @staticmethod
    def _insertion_location(
        index: Optional[int],
        end_of_segment: bool,
        segment_id: str = ""
    ) -> dict:
        """Build the insertion-location field(s) of an insert request.

        The Docs API takes ``location`` and ``endOfSegmentLocation`` as
        alternative sibling fields of the request, so the returned dict is
        meant to be merged into the request body.

        Raises:
            ValueError: if ``end_of_segment`` is false and ``index`` is None.
        """
        if end_of_segment:
            return {'endOfSegmentLocation': {'segmentId': segment_id}}
        if index is None:
            raise ValueError("index is required unless end_of_segment is True")
        location = {'index': index}
        if segment_id:
            # An empty segment ID is omitted, as before.
            location['segmentId'] = segment_id
        return {'location': location}

    # Document Operations
    async def create(self, title: str) -> dict:
        """Create a new empty document and return the API response."""
        request = self.service.documents().create(body={'title': title})
        return await self._execute(request)

    async def get(
        self,
        document_id: str,
        fields: Optional[str] = None,
        suggestions_view_mode: str = "DEFAULT_FOR_CURRENT_ACCESS"
    ) -> dict:
        """Get document content and metadata.

        Args:
            document_id: ID of the document to fetch.
            fields: optional field mask; only sent when truthy.
            suggestions_view_mode: forwarded as ``suggestionsViewMode``.
        """
        kwargs = {
            'documentId': document_id,
            'suggestionsViewMode': suggestions_view_mode
        }
        if fields:
            kwargs['fields'] = fields
        request = self.service.documents().get(**kwargs)
        return await self._execute(request)

    async def batch_update(
        self,
        document_id: str,
        requests: List[dict]
    ) -> dict:
        """Apply batch updates to document and return the API response."""
        body = {'requests': requests}
        request = self.service.documents().batchUpdate(
            documentId=document_id,
            body=body
        )
        return await self._execute(request)

    # Text Operations
    async def insert_text(
        self,
        document_id: str,
        text: str,
        index: Optional[int] = None,
        end_of_segment: bool = False,
        segment_id: str = ""
    ) -> dict:
        """Insert text at specified location.

        Args:
            document_id: target document.
            text: text to insert.
            index: insertion index; required unless ``end_of_segment``.
            end_of_segment: insert at the end of the segment instead.
            segment_id: segment to insert into ("" is sent as-is for
                end-of-segment inserts and omitted for index inserts).

        Raises:
            ValueError: if neither ``index`` nor ``end_of_segment`` is given.
        """
        requests = [{
            'insertText': {
                'text': text,
                **self._insertion_location(index, end_of_segment, segment_id)
            }
        }]
        return await self.batch_update(document_id, requests)

    async def delete_content(
        self,
        document_id: str,
        start_index: int,
        end_index: int,
        segment_id: str = ""
    ) -> dict:
        """Delete content in the range ``start_index``..``end_index``."""
        requests = [{
            'deleteContentRange': {
                'range': {
                    'startIndex': start_index,
                    'endIndex': end_index,
                    'segmentId': segment_id
                }
            }
        }]
        return await self.batch_update(document_id, requests)

    async def replace_all_text(
        self,
        document_id: str,
        find_text: str,
        replace_text: str,
        match_case: bool = False
    ) -> dict:
        """Find and replace all occurrences of ``find_text``."""
        requests = [{
            'replaceAllText': {
                'containsText': {
                    'text': find_text,
                    'matchCase': match_case
                },
                'replaceText': replace_text
            }
        }]
        return await self.batch_update(document_id, requests)

    # Formatting Operations
    async def update_text_style(
        self,
        document_id: str,
        start_index: int,
        end_index: int,
        text_style: dict,
        fields: str
    ) -> dict:
        """Apply text formatting to a range.

        ``fields`` is forwarded unchanged as the request's field mask.
        """
        requests = [{
            'updateTextStyle': {
                'range': {
                    'startIndex': start_index,
                    'endIndex': end_index
                },
                'textStyle': text_style,
                'fields': fields
            }
        }]
        return await self.batch_update(document_id, requests)

    async def update_paragraph_style(
        self,
        document_id: str,
        start_index: int,
        end_index: int,
        paragraph_style: dict,
        fields: str
    ) -> dict:
        """Apply paragraph formatting to a range.

        ``fields`` is forwarded unchanged as the request's field mask.
        """
        requests = [{
            'updateParagraphStyle': {
                'range': {
                    'startIndex': start_index,
                    'endIndex': end_index
                },
                'paragraphStyle': paragraph_style,
                'fields': fields
            }
        }]
        return await self.batch_update(document_id, requests)

    # Table Operations
    async def insert_table(
        self,
        document_id: str,
        rows: int,
        columns: int,
        index: Optional[int] = None,
        end_of_segment: bool = False
    ) -> dict:
        """Insert table at location.

        Raises:
            ValueError: if neither ``index`` nor ``end_of_segment`` is given.
        """
        requests = [{
            'insertTable': {
                'rows': rows,
                'columns': columns,
                **self._insertion_location(index, end_of_segment)
            }
        }]
        return await self.batch_update(document_id, requests)

    async def insert_table_row(
        self,
        document_id: str,
        table_start_location: int,
        row_index: int,
        insert_below: bool = True
    ) -> dict:
        """Insert row in table, relative to ``row_index`` (column 0 is used as the reference cell)."""
        requests = [{
            'insertTableRow': {
                'tableCellLocation': {
                    'tableStartLocation': {'index': table_start_location},
                    'rowIndex': row_index,
                    'columnIndex': 0
                },
                'insertBelow': insert_below
            }
        }]
        return await self.batch_update(document_id, requests)

    # Named Range Operations
    async def create_named_range(
        self,
        document_id: str,
        name: str,
        start_index: int,
        end_index: int
    ) -> dict:
        """Create named range (bookmark)."""
        requests = [{
            'createNamedRange': {
                'name': name,
                'range': {
                    'startIndex': start_index,
                    'endIndex': end_index
                }
            }
        }]
        return await self.batch_update(document_id, requests)

    async def delete_named_range(
        self,
        document_id: str,
        named_range_id: Optional[str] = None,
        name: Optional[str] = None
    ) -> dict:
        """Delete named range by ID or by name (the ID wins when both are given).

        Raises:
            ValueError: if neither ``named_range_id`` nor ``name`` is given.
        """
        if named_range_id:
            delete_request = {'namedRangeId': named_range_id}
        elif name:
            delete_request = {'name': name}
        else:
            # An empty deleteNamedRange request identifies nothing; fail
            # before making the API call.
            raise ValueError("named_range_id or name is required")
        requests = [{'deleteNamedRange': delete_request}]
        return await self.batch_update(document_id, requests)

    # Export Operations (via Drive API)
    async def export(
        self,
        document_id: str,
        mime_type: str
    ) -> bytes:
        """Export document to specified format via the Drive service."""
        request = self.drive_service.files().export(
            fileId=document_id,
            mimeType=mime_type
        )
        return await self._execute(request)

    async def copy(
        self,
        document_id: str,
        title: Optional[str] = None,
        parent_id: Optional[str] = None
    ) -> dict:
        """Copy document via the Drive service.

        Args:
            document_id: document to copy.
            title: name for the copy, when given.
            parent_id: folder to place the copy in, when given.

        Returns:
            The Drive response restricted to ``id``, ``name`` and
            ``webViewLink``.
        """
        body = {}
        if title:
            body['name'] = title
        if parent_id:
            body['parents'] = [parent_id]
        request = self.drive_service.files().copy(
            fileId=document_id,
            body=body or None,
            fields='id, name, webViewLink'
        )
        return await self._execute(request)
2.2 DocumentManager - High-Level Operations
"""High-level document management operations."""
from dataclasses import dataclass
from typing import Optional, List, Dict
from datetime import datetime
@dataclass
class DocumentInfo:
    """Identity, link and timestamps of a single document."""

    id: str  # document ID
    title: str
    web_link: str  # URL of the document
    created_time: datetime
    modified_time: datetime
@dataclass
class CreateFromTemplateResult:
    """Outcome of creating a document from a template.

    On failure ``success`` is False, ``document`` is None and ``error``
    carries the error message.
    """

    # Optional because the failure path constructs the result with None.
    document: Optional["DocumentInfo"]
    replacements_made: int
    success: bool
    error: Optional[str] = None
class DocumentManager:
    """High-level document management with CODITECT conventions.

    Combines the Docs client, the template store and the Drive client to
    create, read, append to and export documents.
    """

    EXPORT_MIMETYPES = {
        'pdf': 'application/pdf',
        'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'txt': 'text/plain',
        'html': 'text/html',
        'rtf': 'application/rtf',
        'odt': 'application/vnd.oasis.opendocument.text'
    }

    # Markdown prefix for each heading paragraph style.
    _HEADING_PREFIXES = {
        'HEADING_1': '# ',
        'HEADING_2': '## ',
        'HEADING_3': '### ',
        'HEADING_4': '#### ',
        'HEADING_5': '##### ',
        'HEADING_6': '###### '
    }

    def __init__(
        self,
        docs_client: "AsyncDocsClient",
        template_store: "TemplateStore",
        drive_client: "AsyncDriveClient"
    ):
        self.docs = docs_client
        self.templates = template_store
        self.drive = drive_client

    @staticmethod
    def _parse_time(value: str) -> datetime:
        """Parse an API timestamp, accepting a trailing ``Z`` for UTC."""
        return datetime.fromisoformat(value.replace('Z', '+00:00'))

    async def create(self, title: str, parent_folder_id: Optional[str] = None) -> "DocumentInfo":
        """Create new empty document.

        Args:
            title: title of the new document.
            parent_folder_id: when given, the folder is added as a parent
                of the new file through the Drive client.

        Returns:
            DocumentInfo with timestamps taken from the Drive file metadata.
        """
        result = await self.docs.create(title)
        doc_id = result['documentId']
        # Move to folder if specified
        if parent_folder_id:
            await self.drive.update_file(
                file_id=doc_id,
                add_parents=[parent_folder_id]
            )
        # Get full metadata
        file_info = await self.drive.get_file(doc_id)
        return DocumentInfo(
            id=doc_id,
            title=result['title'],
            web_link=f"https://docs.google.com/document/d/{doc_id}/edit",
            created_time=self._parse_time(file_info['createdTime']),
            modified_time=self._parse_time(file_info['modifiedTime'])
        )

    async def create_from_template(
        self,
        template_id: str,
        title: str,
        placeholders: Dict[str, str],
        parent_folder_id: Optional[str] = None
    ) -> "CreateFromTemplateResult":
        """Create document from template with placeholder replacement.

        The template document is copied, then every key of ``placeholders``
        is replaced (case-sensitively) by its value in the copy.

        Returns:
            A result whose ``replacements_made`` is the total number of
            occurrences the API reports as changed. Any exception is caught
            and reported as ``success=False`` with ``error`` set.
        """
        # Local import: this module only imports ``datetime`` itself.
        from datetime import timezone

        try:
            # Get template info
            template = await self.templates.get(template_id)
            # Copy template document
            copy_result = await self.docs.copy(
                document_id=template.document_id,
                title=title,
                parent_id=parent_folder_id
            )
            doc_id = copy_result['id']
            # Replace placeholders
            requests = [
                {
                    'replaceAllText': {
                        'containsText': {
                            'text': placeholder,
                            'matchCase': True
                        },
                        'replaceText': value
                    }
                }
                for placeholder, value in placeholders.items()
            ]
            replacement_count = 0
            if requests:
                response = await self.docs.batch_update(doc_id, requests)
                # Count what was actually replaced rather than how many
                # requests were sent; a reply without occurrencesChanged
                # counts as zero.
                replacement_count = sum(
                    reply.get('replaceAllText', {}).get('occurrencesChanged', 0)
                    for reply in (response or {}).get('replies', [])
                )
            # Timezone-aware, matching the aware timestamps create() returns.
            now = datetime.now(timezone.utc)
            return CreateFromTemplateResult(
                document=DocumentInfo(
                    id=doc_id,
                    title=title,
                    web_link=copy_result['webViewLink'],
                    created_time=now,
                    modified_time=now
                ),
                replacements_made=replacement_count,
                success=True
            )
        except Exception as e:
            # Deliberate best-effort: failures are reported in the result.
            return CreateFromTemplateResult(
                document=None,
                replacements_made=0,
                success=False,
                error=str(e)
            )

    async def get_content(self, document_id: str) -> str:
        """Get document as plain text (paragraph text runs only)."""
        doc = await self.docs.get(document_id)
        return self._extract_text(doc.get('body', {}))

    async def append_content(
        self,
        document_id: str,
        content: str,
        add_newline: bool = True
    ) -> None:
        """Append content to end of document.

        When ``add_newline`` is true a newline is inserted before the
        content.
        """
        if add_newline:
            content = f"\n{content}"
        await self.docs.insert_text(document_id, content, end_of_segment=True)

    async def export(
        self,
        document_id: str,
        format: str
    ) -> bytes:
        """Export document to specified format.

        Raises:
            ValueError: if ``format`` is not a key of EXPORT_MIMETYPES.
        """
        if format not in self.EXPORT_MIMETYPES:
            raise ValueError(f"Unsupported format: {format}")
        mime_type = self.EXPORT_MIMETYPES[format]
        return await self.docs.export(document_id, mime_type)

    async def export_to_markdown(self, document_id: str) -> str:
        """Export document to Markdown."""
        doc = await self.docs.get(document_id)
        return self._convert_to_markdown(doc.get('body', {}))

    def _extract_text(self, body: dict) -> str:
        """Extract plain text from the paragraph text runs of a body."""
        return ''.join(
            elem['textRun']['content']
            for element in body.get('content', [])
            if 'paragraph' in element
            for elem in element['paragraph'].get('elements', [])
            if 'textRun' in elem
        )

    @staticmethod
    def _format_run(text: str, text_style: dict) -> str:
        """Wrap one text run in Markdown emphasis and link markers.

        Leading and trailing whitespace (including the newline that ends a
        paragraph) stays outside the markers, and whitespace-only runs are
        returned untouched so no empty ``****`` markers are produced.
        """
        core = text.strip()
        if not core:
            return text
        lead = text[:len(text) - len(text.lstrip())]
        trail = text[len(text.rstrip()):]
        if text_style.get('bold'):
            core = f"**{core}**"
        if text_style.get('italic'):
            core = f"*{core}*"
        link = text_style.get('link')
        if link and link.get('url'):
            # Links without a URL are left as plain text.
            core = f"[{core}]({link['url']})"
        return f"{lead}{core}{trail}"

    def _convert_to_markdown(self, body: dict) -> str:
        """Convert document body to Markdown.

        Handles paragraphs (headings, bullets, bold/italic/link runs) and
        tables; other structural elements are skipped.
        """
        md_parts = []
        for element in body.get('content', []):
            if 'paragraph' in element:
                para = element['paragraph']
                style = para.get('paragraphStyle', {})
                named_style = style.get('namedStyleType', 'NORMAL_TEXT')
                # Handle headings
                prefix = self._HEADING_PREFIXES.get(named_style, '')
                # Handle bullet points (takes precedence over a heading prefix)
                if para.get('bullet'):
                    prefix = '- '
                # Extract text with formatting
                para_text = ''.join(
                    self._format_run(
                        elem['textRun']['content'],
                        elem['textRun'].get('textStyle', {})
                    )
                    for elem in para.get('elements', [])
                    if 'textRun' in elem
                )
                md_parts.append(f"{prefix}{para_text}")
            elif 'table' in element:
                md_parts.append(self._table_to_markdown(element['table']))
        return '\n'.join(md_parts)

    def _table_to_markdown(self, table: dict) -> str:
        """Convert table to Markdown, treating the first row as the header.

        Newlines inside a cell become spaces and ``|`` is escaped so that
        cell content cannot break the table layout.
        """
        rows = table.get('tableRows', [])
        if not rows:
            return ''
        md_rows = []
        for i, row in enumerate(rows):
            cells = []
            for cell in row.get('tableCells', []):
                cell_text = self._extract_text({'content': cell.get('content', [])})
                cells.append(
                    cell_text.strip().replace('\n', ' ').replace('|', '\\|')
                )
            md_rows.append('| ' + ' | '.join(cells) + ' |')
            # Add header separator after first row
            if i == 0:
                md_rows.append('| ' + ' | '.join(['---'] * len(cells)) + ' |')
        return '\n'.join(md_rows)
2.3 TemplateManager
"""Template management for document generation."""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict
@dataclass
class Template:
    """A document registered for use as a template."""

    id: str
    name: str
    description: str
    document_id: str  # Google Docs document ID
    category: str
    # Each instance gets its own list.
    placeholders: List[str] = field(default_factory=list)
    # NOTE(review): datetime.utcnow returns naive datetimes - confirm that
    # callers do not mix these with timezone-aware values.
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
class TemplateManager:
    """Manage document templates.

    Templates are records in the template store that point at a Google
    Docs document; the documents themselves are never modified here.
    """

    CATEGORIES = [
        'meeting_notes',
        'project_docs',
        'reports',
        'proposals',
        'other'
    ]

    def __init__(self, template_store: "TemplateStore", docs_client: "AsyncDocsClient"):
        self.store = template_store
        self.docs = docs_client

    async def create(
        self,
        name: str,
        document_id: str,
        description: str = "",
        category: str = "other"
    ) -> "Template":
        """Register a document as a template.

        The document is fetched (which also verifies that it can be read)
        and scanned for ``{{PLACEHOLDER}}`` tokens; the template ID is
        derived from the first eight characters of ``document_id``.
        """
        # Verify document exists
        doc = await self.docs.get(document_id, fields='title,body')
        # Extract placeholders (pattern: {{PLACEHOLDER}})
        content = self._extract_text(doc.get('body', {}))
        placeholders = self._find_placeholders(content)
        template = Template(
            id=f"tmpl_{document_id[:8]}",
            name=name,
            description=description,
            document_id=document_id,
            category=category,
            placeholders=placeholders
        )
        await self.store.save(template)
        return template

    async def get(self, template_id: str) -> "Template":
        """Get template by ID."""
        return await self.store.get(template_id)

    async def list(
        self,
        category: Optional[str] = None
    ) -> List["Template"]:
        """List templates, optionally filtered by category."""
        templates = await self.store.list()
        if category:
            templates = [t for t in templates if t.category == category]
        return templates

    async def update_placeholders(self, template_id: str) -> "Template":
        """Re-scan template document for placeholders and save the template."""
        template = await self.store.get(template_id)
        doc = await self.docs.get(template.document_id, fields='body')
        content = self._extract_text(doc.get('body', {}))
        template.placeholders = self._find_placeholders(content)
        template.updated_at = datetime.utcnow()
        await self.store.save(template)
        return template

    async def delete(self, template_id: str) -> bool:
        """Delete template (not the document)."""
        return await self.store.delete(template_id)

    def _extract_text(self, body: dict) -> str:
        """Extract all text from the paragraph text runs of a body."""
        return ''.join(
            elem['textRun']['content']
            for element in body.get('content', [])
            if 'paragraph' in element
            for elem in element['paragraph'].get('elements', [])
            if 'textRun' in elem
        )

    def _find_placeholders(self, content: str) -> List[str]:
        """Find all {{PLACEHOLDER}} patterns in content.

        Only names made of upper-case letters and underscores match. Each
        placeholder is returned once, in order of first appearance, so the
        result is deterministic.
        """
        import re
        pattern = r'\{\{([A-Z_]+)\}\}'
        matches = re.findall(pattern, content)
        # dict.fromkeys removes duplicates while keeping first-seen order.
        return [f"{{{{{m}}}}}" for m in dict.fromkeys(matches)]
2.4 CommentService
"""Comment management for documents."""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List
@dataclass
class CommentAuthor:
    """Author of a comment or of a reply."""

    email: str
    name: str
    photo_url: Optional[str] = None  # omitted when no photo link is known
@dataclass
class CommentReply:
    """One reply in a comment thread."""

    id: str
    content: str
    author: "CommentAuthor"
    created_time: datetime
@dataclass
class Comment:
    """A comment on a document, together with its replies."""

    id: str
    content: str
    author: "CommentAuthor"
    created_time: datetime
    modified_time: datetime
    resolved: bool
    replies: List["CommentReply"]
    quoted_text: Optional[str] = None  # text the comment quotes, if any
class CommentService:
    """Manage document comments via Drive API.

    Raw API dictionaries returned by the Drive client are converted into
    ``Comment`` / ``CommentReply`` / ``CommentAuthor`` objects by the
    ``_parse_*`` helpers, which are shared by all methods.
    """

    def __init__(self, drive_client: "AsyncDriveClient"):
        self.drive = drive_client

    @staticmethod
    def _parse_time(value: str) -> datetime:
        """Parse an API timestamp, accepting a trailing ``Z`` for UTC."""
        return datetime.fromisoformat(value.replace('Z', '+00:00'))

    @staticmethod
    def _parse_author(raw: dict) -> "CommentAuthor":
        """Build a CommentAuthor; missing name/e-mail become empty strings."""
        return CommentAuthor(
            email=raw.get('emailAddress', ''),
            name=raw.get('displayName', ''),
            photo_url=raw.get('photoLink')
        )

    def _parse_reply(self, raw: dict) -> "CommentReply":
        """Build a CommentReply from a raw reply dictionary."""
        return CommentReply(
            id=raw['id'],
            content=raw['content'],
            author=self._parse_author(raw.get('author', {})),
            created_time=self._parse_time(raw['createdTime'])
        )

    def _parse_comment(self, raw: dict) -> "Comment":
        """Build a Comment (including its replies) from a raw dictionary.

        ``id``, ``content``, ``createdTime`` and ``modifiedTime`` are
        required keys; everything else is optional.
        """
        return Comment(
            id=raw['id'],
            content=raw['content'],
            author=self._parse_author(raw.get('author', {})),
            created_time=self._parse_time(raw['createdTime']),
            modified_time=self._parse_time(raw['modifiedTime']),
            resolved=raw.get('resolved', False),
            replies=[self._parse_reply(r) for r in raw.get('replies', [])],
            quoted_text=raw.get('quotedFileContent', {}).get('value')
        )

    async def list(
        self,
        document_id: str,
        include_resolved: bool = False
    ) -> List["Comment"]:
        """List comments on document.

        Resolved comments are skipped unless ``include_resolved`` is true.
        """
        # Drive Comments API
        result = await self.drive.list_comments(
            file_id=document_id,
            include_deleted=False,
            fields='comments(id,content,author,createdTime,modifiedTime,resolved,quotedFileContent,replies)'
        )
        return [
            self._parse_comment(c)
            for c in result.get('comments', [])
            if include_resolved or not c.get('resolved')
        ]

    async def add(
        self,
        document_id: str,
        content: str,
        anchor_start: Optional[int] = None,
        anchor_end: Optional[int] = None
    ) -> "Comment":
        """Add comment to document.

        An anchor is attached only when both ``anchor_start`` and
        ``anchor_end`` are given.
        """
        body = {'content': content}
        if anchor_start is not None and anchor_end is not None:
            body['anchor'] = f"start={anchor_start}&end={anchor_end}"
        result = await self.drive.create_comment(document_id, body)
        # _parse_comment is a plain (synchronous) helper; nothing to await.
        return self._parse_comment(result)

    async def reply(
        self,
        document_id: str,
        comment_id: str,
        content: str
    ) -> "CommentReply":
        """Reply to a comment."""
        result = await self.drive.create_reply(
            document_id,
            comment_id,
            {'content': content}
        )
        return self._parse_reply(result)

    async def resolve(self, document_id: str, comment_id: str) -> bool:
        """Resolve a comment; returns True once the update call returns."""
        await self.drive.update_comment(
            document_id,
            comment_id,
            {'resolved': True}
        )
        return True

    async def delete(self, document_id: str, comment_id: str) -> bool:
        """Delete a comment; returns True once the delete call returns."""
        await self.drive.delete_comment(document_id, comment_id)
        return True
3. Database Schema
-- Document templates: one row per registered template.
-- document_id is the Google Docs document ID of the template document;
-- placeholders is a JSONB value that defaults to an empty array.
CREATE TABLE doc_templates (
id VARCHAR(50) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
document_id VARCHAR(255) NOT NULL,
category VARCHAR(50) NOT NULL,
placeholders JSONB DEFAULT '[]',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Index for lookups restricted to a single category.
CREATE INDEX idx_templates_category ON doc_templates(category);
-- Documents linked to CODITECT entities
-- Each row ties one document to one (entity_type, entity_id) pair.
-- template_id is optional; it is set to NULL when the referenced
-- doc_templates row is deleted (ON DELETE SET NULL).
CREATE TABLE linked_documents (
id SERIAL PRIMARY KEY,
document_id VARCHAR(255) NOT NULL,
title VARCHAR(500),
entity_type VARCHAR(50) NOT NULL, -- meeting, project, etc.
entity_id VARCHAR(255) NOT NULL,
template_id VARCHAR(50),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
web_link TEXT,
FOREIGN KEY (template_id) REFERENCES doc_templates(id) ON DELETE SET NULL
);
-- Index for finding the documents of a given entity.
CREATE INDEX idx_linked_docs_entity ON linked_documents(entity_type, entity_id);
-- Index for finding the links of a given document.
CREATE INDEX idx_linked_docs_document ON linked_documents(document_id);
-- Document exports: one row per export of a document.
-- file_path and drive_file_id are both nullable.
-- NOTE(review): presumably one of them records where the exported file was
-- stored - confirm against the code that writes this table.
CREATE TABLE document_exports (
id SERIAL PRIMARY KEY,
document_id VARCHAR(255) NOT NULL,
format VARCHAR(20) NOT NULL,
file_path TEXT,
drive_file_id VARCHAR(255),
size_bytes BIGINT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Index for finding the exports of a given document.
CREATE INDEX idx_exports_document ON document_exports(document_id);
4. API Endpoints
"""FastAPI endpoints for Google Docs integration."""
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional, List, Dict
router = APIRouter(prefix="/api/v1/docs", tags=["docs"])
class CreateDocumentRequest(BaseModel):
    """Request body for creating a document, optionally from a template."""

    title: str
    # When set, the document is created from this template.
    template_id: Optional[str] = None
    # Placeholder text -> replacement text; used with a template.
    placeholders: Optional[Dict[str, str]] = None
    parent_folder_id: Optional[str] = None
    # E-mail addresses the new document is shared with.
    share_with: Optional[List[str]] = None
class ExportRequest(BaseModel):
    """Request body for exporting a document."""

    # 'markdown' or a key of DocumentManager.EXPORT_MIMETYPES
    # (pdf, docx, txt, html, rtf, odt).
    format: str
    # NOTE(review): the export endpoint in this module does not read this
    # flag - confirm whether it is meant to be implemented.
    include_comments: bool = False
class AddCommentRequest(BaseModel):
    """Request body for adding a comment to a document."""

    content: str
    # The comment is anchored only when both bounds are provided.
    anchor_start: Optional[int] = None
    anchor_end: Optional[int] = None
@router.post("")
async def create_document(
    request: CreateDocumentRequest,
    doc_manager: DocumentManager = Depends(get_doc_manager)
):
    """Create a document, from a template when ``template_id`` is set.

    A failed template creation is reported as HTTP 400. Afterwards the
    document is shared, as writer, with every address in ``share_with``.
    """
    if not request.template_id:
        # Plain empty document.
        doc = await doc_manager.create(
            title=request.title,
            parent_folder_id=request.parent_folder_id
        )
    else:
        outcome = await doc_manager.create_from_template(
            template_id=request.template_id,
            title=request.title,
            placeholders=request.placeholders or {},
            parent_folder_id=request.parent_folder_id
        )
        if not outcome.success:
            raise HTTPException(400, outcome.error)
        doc = outcome.document

    # A missing or empty share list means nothing to share.
    for address in request.share_with or []:
        await doc_manager.drive.share_with_user(
            doc.id, address, role='writer'
        )
    return doc
@router.get("/{document_id}")
async def get_document(
    document_id: str,
    docs_client: AsyncDocsClient = Depends(get_docs_client)
):
    """Return the document as delivered by the Docs client."""
    document = await docs_client.get(document_id)
    return document
@router.post("/{document_id}/export")
async def export_document(
    document_id: str,
    request: ExportRequest,
    doc_manager: DocumentManager = Depends(get_doc_manager)
):
    """Export document to specified format.

    ``markdown`` returns the converted text; any other supported format
    returns the size of the exported content.

    Raises:
        HTTPException: 400 when the format is neither ``markdown`` nor a
            key of ``doc_manager.EXPORT_MIMETYPES``.
    """
    if request.format == 'markdown':
        content = await doc_manager.export_to_markdown(document_id)
        return {"format": "markdown", "content": content}

    # Reject unknown formats as a client error instead of letting the
    # manager's ValueError surface as a server error.
    if request.format not in doc_manager.EXPORT_MIMETYPES:
        raise HTTPException(400, f"Unsupported format: {request.format}")

    content = await doc_manager.export(document_id, request.format)
    # Return download URL or base64
    return {"format": request.format, "size": len(content)}
# Template endpoints
# NOTE(review): GET "/{document_id}" is registered earlier on this router.
# FastAPI matches routes in registration order, so GET "/templates" is
# handled by that route with document_id="templates" and never reaches this
# handler. Register this route before the parameterised one.
@router.get("/templates")
async def list_templates(
    category: Optional[str] = None,
    template_manager: TemplateManager = Depends(get_template_manager)
):
    """Return the available templates, optionally limited to one category."""
    templates = await template_manager.list(category=category)
    return templates
@router.post("/templates/{template_id}/apply")
async def apply_template(
    template_id: str,
    request: CreateDocumentRequest,
    doc_manager: DocumentManager = Depends(get_doc_manager)
):
    """Create document from template.

    The template is taken from the path; ``request.template_id`` is not
    used. ``parent_folder_id`` and ``share_with`` are honoured in the same
    way as by the create-document endpoint.

    Raises:
        HTTPException: 400 when the template creation fails.
    """
    result = await doc_manager.create_from_template(
        template_id=template_id,
        title=request.title,
        placeholders=request.placeholders or {},
        parent_folder_id=request.parent_folder_id
    )
    if not result.success:
        raise HTTPException(400, result.error)
    # Share with users if specified
    for email in request.share_with or []:
        await doc_manager.drive.share_with_user(
            result.document.id, email, role='writer'
        )
    return result
# Comment endpoints
@router.get("/{document_id}/comments")
async def list_comments(
    document_id: str,
    include_resolved: bool = False,
    comment_service: CommentService = Depends(get_comment_service)
):
    """Return the document's comments; resolved ones only on request."""
    comments = await comment_service.list(document_id, include_resolved)
    return comments
@router.post("/{document_id}/comments")
async def add_comment(
    document_id: str,
    request: AddCommentRequest,
    comment_service: CommentService = Depends(get_comment_service)
):
    """Create a comment on the document and return it."""
    # Argument order: document, text, anchor start, anchor end.
    comment_args = (
        document_id,
        request.content,
        request.anchor_start,
        request.anchor_end,
    )
    created = await comment_service.add(*comment_args)
    return created
@router.post("/{document_id}/comments/{comment_id}/resolve")
async def resolve_comment(
    document_id: str,
    comment_id: str,
    comment_service: CommentService = Depends(get_comment_service)
):
    """Mark the comment as resolved and confirm with a status payload."""
    # The service's return value is not part of the response.
    await comment_service.resolve(document_id, comment_id)
    response = {"status": "resolved"}
    return response
5. Configuration
"""Google Docs configuration."""
from pydantic_settings import BaseSettings
from typing import List
class DocsConfig(BaseSettings):
    """Settings for the Google Docs integration, loaded by pydantic-settings."""

    # --- OAuth credentials (shared with Drive) ---
    GOOGLE_CLIENT_ID: str
    GOOGLE_CLIENT_SECRET: str
    GOOGLE_REDIRECT_URI: str = "http://localhost:8000/oauth/callback"

    # --- API settings ---
    DOCS_API_VERSION: str = "v1"
    DOCS_SCOPES: List[str] = [
        "https://www.googleapis.com/auth/documents",
        "https://www.googleapis.com/auth/drive.file"
    ]

    # --- Template settings ---
    DEFAULT_TEMPLATE_FOLDER: str = "CODITECT/Templates"

    # --- Rate limiting (requests per minute) ---
    RATE_LIMIT_READ: int = 300
    RATE_LIMIT_WRITE: int = 60

    class Config:
        # NOTE(review): the prefix is prepended to every field name, so e.g.
        # DOCS_API_VERSION would be read from DOCS_DOCS_API_VERSION and
        # GOOGLE_CLIENT_ID from DOCS_GOOGLE_CLIENT_ID - confirm this is the
        # intended environment variable naming.
        env_prefix = "DOCS_"
        env_file = ".env"
6. Test Plan Overview
6.1 Unit Tests (Target: 90%+ coverage)
| Test File | Tests | Description |
|---|---|---|
| test_docs_client.py | 25+ | API client operations |
| test_document_manager.py | 15+ | Document management |
| test_template_manager.py | 10+ | Template operations |
| test_comment_service.py | 10+ | Comment operations |
| test_export_service.py | 8+ | Export operations |
6.2 Integration Tests
| Test File | Tests | Description |
|---|---|---|
| test_docs_api_integration.py | 15+ | Real API calls |
| test_database_integration.py | 8+ | Database operations |
6.3 E2E Tests
| Test File | Tests | Description |
|---|---|---|
| test_meeting_notes_e2e.py | 5+ | Meeting notes workflow |
| test_template_workflow_e2e.py | 5+ | Template creation and usage |
| test_export_workflow_e2e.py | 5+ | Export to various formats |
Document Control:
- Created: December 17, 2025
- Owner: CODITECT Engineering Team