# scripts/document-translator
#!/usr/bin/env python3
"""CODITECT Document Translator.

High-fidelity document translation with format preservation and quality
verification. Supports DOCX and PPTX files.

Usage:
    python3 document_translator.py <input_file> [options]

Options:
    --from         Source language (default: pt)
    --to           Target language (default: en)
    --verify       Run back-translation verification on 10% sample
    --verify-full  Run back-translation on 100%
    --batch-size   Batch size for translation (default: 20)
    --output       Custom output path
    --resume       Resume from checkpoint
    --dry-run      Preview without translating
"""

import argparse
import json
import os
import random
import re
import shutil
import sys
import time
from datetime import datetime
from difflib import SequenceMatcher
from pathlib import Path

# Third-party dependencies: fail fast with an install hint instead of a
# bare traceback when they are missing.
try:
    from docx import Document
    from deep_translator import GoogleTranslator
    from langdetect import detect
except ImportError as e:
    print(f"Missing dependency: {e}")
    print("Install with: pip install python-docx deep-translator langdetect")
    sys.exit(1)

class DocumentTranslator:
    """Format-preserving document translator with QC."""

    # (merged, spaced) pairs for artifacts where the translation API glues
    # two words together; applied case-insensitively by fix_merged_words().
    # Order matters: longer patterns must come before their substrings
    # (e.g. 'objectivetheof' before 'theof').
    MERGED_WORD_FIXES = [
        ('objectivetheof', 'objective of'),
        ('theof', 'the of'), ('ofthe', 'of the'), ('inthe', 'in the'),
        ('tothe', 'to the'), ('forthe', 'for the'), ('andthe', 'and the'),
        ('withthe', 'with the'), ('fromthe', 'from the'), ('onthe', 'on the'),
        ('atthe', 'at the'), ('bythe', 'by the'), ('basedon', 'based on'),
        ('creationbased', 'creation based'), ('isthe', 'is the'),
    ]

def __init__(self, source_lang='pt', target_lang='en', batch_size=20):
    """Set up a translator for one language pair.

    source_lang / target_lang are Google Translate language codes;
    batch_size is stored for callers that chunk their own work.
    """
    self.source_lang = source_lang
    self.target_lang = target_lang
    self.batch_size = batch_size
    self.translator = GoogleTranslator(source=source_lang, target=target_lang)
    # Running counters, summarised later by generate_report().
    self.stats = dict.fromkeys(
        (
            'total_units',
            'translated',
            'errors',
            'warnings',
            'verification_passed',
            'verification_failed',
        ),
        0,
    )

def extract_docx(self, docx_path):
    """Walk a DOCX file and collect every translatable text unit.

    Returns a list of dicts — one per non-empty paragraph, table cell,
    header paragraph and footer paragraph — each carrying a stable id,
    location metadata and a 'pending' status. Also records the total
    unit count in self.stats.
    """
    document = Document(docx_path)
    units = []

    # Body paragraphs, keeping run-level formatting metadata so the
    # rebuild step can redistribute text over the same runs.
    for para_idx, paragraph in enumerate(document.paragraphs):
        if not paragraph.text.strip():
            continue

        run_meta = [
            {
                'index': run_idx,
                'text': run.text,
                'bold': run.bold,
                'italic': run.italic,
                'underline': run.underline is not None,
            }
            for run_idx, run in enumerate(paragraph.runs)
            if run.text
        ]

        units.append({
            'id': f'p_{para_idx}',
            'type': 'paragraph',
            'index': para_idx,
            'text': paragraph.text,
            'style': paragraph.style.name if paragraph.style else 'Normal',
            'runs': run_meta,
            'status': 'pending',
        })

    # Table cells (flat text only; per-run formatting is not tracked here).
    for tbl_idx, table in enumerate(document.tables):
        for row_idx, row in enumerate(table.rows):
            for col_idx, cell in enumerate(row.cells):
                cell_text = cell.text.strip()
                if not cell_text:
                    continue
                units.append({
                    'id': f't_{tbl_idx}_{row_idx}_{col_idx}',
                    'type': 'table_cell',
                    'table_index': tbl_idx,
                    'row_index': row_idx,
                    'col_index': col_idx,
                    'text': cell_text,
                    'status': 'pending',
                })

    # Section headers, then footers, for each section in order.
    for sec_idx, section in enumerate(document.sections):
        for kind, prefix, container in (
            ('header', 'h', section.header),
            ('footer', 'f', section.footer),
        ):
            for para_idx, paragraph in enumerate(container.paragraphs):
                if not paragraph.text.strip():
                    continue
                units.append({
                    'id': f'{prefix}_{sec_idx}_{para_idx}',
                    'type': kind,
                    'section': sec_idx,
                    'para_index': para_idx,
                    'text': paragraph.text,
                    'status': 'pending',
                })

    self.stats['total_units'] = len(units)
    return units

def fix_merged_words(self, text):
    """Repair word pairs the translation API sometimes glues together.

    Applies the MERGED_WORD_FIXES substitutions case-insensitively,
    anywhere in the string (including inside longer tokens). Falsy input
    (None, '') is returned unchanged.
    """
    if not text:
        return text
    for merged, spaced in self.MERGED_WORD_FIXES:
        pattern = re.compile(re.escape(merged), re.IGNORECASE)
        text = pattern.sub(spaced, text)
    return text

def translate_unit(self, text, max_retries=3):
    """Translate one text unit, retrying transient API failures.

    Returns the post-processed translation, the input unchanged for
    content that should not be translated (blank text, URLs/handles,
    ticket-style codes like E-001), or None when every retry failed.
    """
    if not text or not text.strip():
        return text

    # Leave URLs, handles and ID codes untouched.
    untranslatable_prefixes = ('http://', 'https://', 'www.', '@')
    if text.startswith(untranslatable_prefixes):
        return text
    if re.match(r'^[A-Z]+-\d+', text.strip()):
        return text

    for attempt in range(1, max_retries + 1):
        try:
            result = self.translator.translate(text)
        except Exception:
            if attempt == max_retries:
                return None  # give up; caller marks the unit as errored
            time.sleep(0.5 * attempt)
        else:
            if result:
                return self.fix_merged_words(result)
            # Empty response: brief backoff, then retry.
            time.sleep(0.2 * attempt)
    return None

def translate_batch(self, units, progress_callback=None):
    """Translate pending units in order, running a light QC pass on each.

    Units whose status is not 'pending' pass through untouched. A failed
    translation marks the unit 'error'; a token-count ratio outside
    [0.2, 5.0] attaches a qc_warning. progress_callback(done, total) is
    invoked after every 10th processed unit.
    """
    processed = []
    total = len(units)

    for pos, unit in enumerate(units, start=1):
        if unit['status'] != 'pending':
            processed.append(unit)
            continue

        source_text = unit['text']
        result = self.translate_unit(source_text)

        if result is None:
            unit['status'] = 'error'
            unit['error'] = 'Translation failed after retries'
            self.stats['errors'] += 1
        else:
            # Micro-QC: a wildly different word count suggests a bad translation.
            src_words = len(source_text.split())
            dst_words = len(result.split())
            ratio = (dst_words / src_words) if src_words else 1
            if not 0.2 <= ratio <= 5.0:
                unit['qc_warning'] = f'Token ratio {ratio:.2f} outside normal range'
                self.stats['warnings'] += 1
            unit['translated'] = result
            unit['status'] = 'translated'
            self.stats['translated'] += 1

        processed.append(unit)
        time.sleep(0.1)  # crude rate limiting between API calls

        if progress_callback and pos % 10 == 0:
            progress_callback(pos, total)

    return processed

def verify_sample(self, units, sample_rate=0.10):
    """Back-translate a random sample and score similarity to the source.

    Picks sample_rate of the translated units (at least one), translates
    each back towards the source language, and compares with
    SequenceMatcher. A ratio >= 0.60 counts as 'pass'; lower scores are
    flagged 'review'. Returns per-unit result dicts with 100-char
    previews; API failures become {'status': 'error'} entries.
    """
    done = [u for u in units if u['status'] == 'translated']
    if not done:
        return []

    sample_size = max(1, int(len(done) * sample_rate))
    picked = random.sample(done, min(sample_size, len(done)))

    # Reverse direction: target language back to source language.
    reverse_translator = GoogleTranslator(source=self.target_lang, target=self.source_lang)
    outcomes = []

    for unit in picked:
        try:
            round_trip = reverse_translator.translate(unit['translated'])
            time.sleep(0.1)

            score = SequenceMatcher(
                None,
                unit['text'].lower(),
                (round_trip or '').lower(),
            ).ratio()

            verdict = 'pass' if score >= 0.60 else 'review'
            if verdict == 'pass':
                self.stats['verification_passed'] += 1
            else:
                self.stats['verification_failed'] += 1

            outcomes.append({
                'id': unit['id'],
                'original': unit['text'][:100],
                'translated': unit['translated'][:100],
                'back_translated': (round_trip or '')[:100],
                'similarity': round(score, 3),
                'status': verdict,
            })
        except Exception as exc:
            outcomes.append({
                'id': unit['id'],
                'status': 'error',
                'error': str(exc),
            })

    return outcomes

def rebuild_docx(self, source_path, translated_map, output_path):
    """Write a translated copy of the source document.

    Re-opens the original file and swaps in translated text unit by
    unit, preserving run-level formatting. Units that failed translation
    keep their original text. Returns output_path.
    """
    doc = Document(source_path)
    lookup = {u['id']: u for u in translated_map if u['status'] == 'translated'}

    def replace_paragraph(para, new_text):
        # Single run: simple swap. Multiple runs: spread the text across
        # them so bold/italic spans survive approximately in place.
        if not para.runs:
            return
        if len(para.runs) == 1:
            para.runs[0].text = new_text
        else:
            self._distribute_text_to_runs(para.runs, new_text)

    # Body paragraphs.
    for p_idx, para in enumerate(doc.paragraphs):
        unit = lookup.get(f'p_{p_idx}')
        if unit:
            replace_paragraph(para, unit['translated'])

    # Table cells.
    for t_idx, table in enumerate(doc.tables):
        for r_idx, row in enumerate(table.rows):
            for c_idx, cell in enumerate(row.cells):
                unit = lookup.get(f't_{t_idx}_{r_idx}_{c_idx}')
                if unit:
                    self._set_cell_text(cell, unit['translated'])

    # Headers and footers. BUGFIX: the original only replaced single-run
    # header/footer paragraphs; multi-run ones silently kept the source
    # text. They now use the same distribution logic as body paragraphs.
    for s_idx, section in enumerate(doc.sections):
        for prefix, container in (('h', section.header), ('f', section.footer)):
            for i, para in enumerate(container.paragraphs):
                unit = lookup.get(f'{prefix}_{s_idx}_{i}')
                if unit:
                    replace_paragraph(para, unit['translated'])

    doc.save(output_path)
    return output_path

def _distribute_text_to_runs(self, runs, text):
"""Distribute translated text across runs preserving formatting."""
original_lengths = [len(run.text) for run in runs]
total = sum(original_lengths)

if total == 0:
if runs:
runs[0].text = text
return

words = text.split()
if not words:
for run in runs:
run.text = ''
return

current_word = 0
for i, run in enumerate(runs):
proportion = original_lengths[i] / total if total > 0 else 1 / len(runs)
word_count = max(1, int(len(words) * proportion))
end_word = min(current_word + word_count, len(words))

run.text = ' '.join(words[current_word:end_word])
if i < len(runs) - 1 and end_word < len(words):
run.text += ' '
current_word = end_word

# Handle remaining words
if current_word < len(words):
runs[-1].text += ' ' + ' '.join(words[current_word:])

def _set_cell_text(self, cell, text):
"""Set cell text preserving basic formatting."""
if cell.paragraphs:
para = cell.paragraphs[0]
for run in para.runs:
run.text = ''
if para.runs:
para.runs[0].text = text
else:
para.add_run(text)

def generate_report(self, workspace, verification_results=None):
    """Build the markdown QC report from accumulated stats.

    Reconstructed from a garbled template: the markdown tables and the
    review-item loop were mangled in the source. Also guards against
    ZeroDivisionError when no units were extracted.
    """
    total = self.stats['total_units']
    # Avoid ZeroDivisionError on an empty document.
    rate = (self.stats['translated'] / total * 100) if total else 0.0

    report = f"""# Translation Quality Control Report

**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Workspace:** {workspace}

## Statistics

| Metric | Value |
|--------|-------|
| Total Units | {self.stats['total_units']} |
| Translated | {self.stats['translated']} |
| Errors | {self.stats['errors']} |
| Warnings | {self.stats['warnings']} |

## Coverage

Translation Rate: {rate:.1f}%
"""

    if verification_results:
        report += f"""
## Verification Results

| Metric | Value |
|--------|-------|
| Sample Size | {len(verification_results)} |
| Passed | {self.stats['verification_passed']} |
| Needs Review | {self.stats['verification_failed']} |

## Items Needing Review

"""
        for r in verification_results:
            if r.get('status') == 'review':
                report += f"- {r['id']} (similarity: {r['similarity']})\n"
                report += f"  - Original: {r['original']}\n"
                report += f"  - Translated: {r['translated']}\n\n"

    report += """
## Quality Checklist

- [ ] All units translated
- [ ] No critical errors
- [ ] Verification passed
- [ ] Manual spot check completed
"""
    return report

def main():
    """CLI entry point: extract, translate, optionally verify, rebuild, report."""
    parser = argparse.ArgumentParser(description='CODITECT Document Translator')
    parser.add_argument('input', help='Input document path')
    parser.add_argument('--from', dest='source_lang', default='pt', help='Source language')
    parser.add_argument('--to', dest='target_lang', default='en', help='Target language')
    # BUGFIX: argparse %-formats help text, so a bare '%' raises
    # "ValueError: unsupported format character" when --help renders;
    # it must be escaped as '%%'.
    parser.add_argument('--verify', action='store_true',
                        help='Run back-translation verification (10%%)')
    parser.add_argument('--verify-full', action='store_true',
                        help='Run full verification (100%%)')
    parser.add_argument('--batch-size', type=int, default=20, help='Batch size')
    parser.add_argument('--output', help='Output path')
    parser.add_argument('--resume', help='Resume from checkpoint')
    parser.add_argument('--dry-run', action='store_true', help='Preview without translating')
    args = parser.parse_args()

    if not os.path.exists(args.input):
        print(f"Error: File not found: {args.input}")
        sys.exit(1)

    # Create a timestamped workspace with one folder per pipeline phase.
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    workspace = f"translation-{timestamp}"
    for sub in ('', '00_backup', '01_extraction', '02_translation',
                '03_verification', '04_final'):
        os.makedirs(os.path.join(workspace, sub), exist_ok=True)

    # Back up the original before touching anything.
    import shutil
    shutil.copy(args.input, f"{workspace}/00_backup/")

    translator = DocumentTranslator(
        source_lang=args.source_lang,
        target_lang=args.target_lang,
        batch_size=args.batch_size,
    )

    # Phase 1: Extract (or resume from a saved content map).
    print("Phase 1: Extracting content...")
    if args.resume:
        with open(args.resume, encoding='utf-8') as f:
            content_map = json.load(f)
        print(f" Resumed from checkpoint: {len(content_map)} units")
    else:
        content_map = translator.extract_docx(args.input)
        with open(f"{workspace}/01_extraction/content_map.json", 'w', encoding='utf-8') as f:
            json.dump(content_map, f, indent=2, ensure_ascii=False)
        print(f" Extracted {len(content_map)} units")

    if args.dry_run:
        print("\nDry run complete. Would translate:")
        pending = sum(1 for u in content_map if u['status'] == 'pending')
        print(f" {pending} pending units")
        sys.exit(0)

    # Phase 2: Translate.
    print("\nPhase 2: Translating...")

    def progress(current, total):
        pct = current / total * 100
        print(f" Progress: {current}/{total} ({pct:.1f}%)")

    translated = translator.translate_batch(content_map, progress_callback=progress)
    with open(f"{workspace}/02_translation/translated_map.json", 'w', encoding='utf-8') as f:
        json.dump(translated, f, indent=2, ensure_ascii=False)
    print(f" Translated: {translator.stats['translated']}/{translator.stats['total_units']}")
    if translator.stats['errors'] > 0:
        print(f" Errors: {translator.stats['errors']}")

    # Phase 3: Verify (optional back-translation check).
    verification_results = None
    if args.verify or args.verify_full:
        print("\nPhase 3: Verifying...")
        sample_rate = 1.0 if args.verify_full else 0.10
        verification_results = translator.verify_sample(translated, sample_rate)
        with open(f"{workspace}/03_verification/verification.json", 'w', encoding='utf-8') as f:
            json.dump(verification_results, f, indent=2, ensure_ascii=False)
        print(f" Verified: {len(verification_results)} units")
        print(f" Passed: {translator.stats['verification_passed']}")
        print(f" Needs Review: {translator.stats['verification_failed']}")

    # Phase 4: Rebuild the document with translations in place.
    print("\nPhase 4: Rebuilding document...")
    input_name = Path(args.input).stem
    output_name = f"{input_name}-{args.target_lang.upper()}.docx"
    output_path = args.output or f"{workspace}/04_final/{output_name}"
    translator.rebuild_docx(args.input, translated, output_path)
    print(f" Output: {output_path}")

    # Final QC report.
    report = translator.generate_report(workspace, verification_results)
    with open(f"{workspace}/04_final/QC_REPORT.md", 'w', encoding='utf-8') as f:
        f.write(report)
    print(f" Report: {workspace}/04_final/QC_REPORT.md")

    print(f"\n{'='*60}")
    print("TRANSLATION COMPLETE")
    print(f"{'='*60}")
    print(f"Output: {output_path}")
    print(f"Workspace: {workspace}/")

if name == 'main': main()