#!/usr/bin/env python3
# scripts-transcript-normalize
"""Transcript normalization script.

- Optional filename normalization (lowercase kebab-case, no spaces)
- Punctuation spacing repair
- Sentence splitting with abbreviation handling
- Paragraphing with topic/demos/risk/list cues
- Markdown output with speaker line
"""
# BUG FIX: the future import must use dunder names; `from future import
# annotations` raises ImportError at startup.
from __future__ import annotations

import argparse
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Transition/cue words that suggest a new paragraph should start.
CUE_RE = re.compile(
    r'^(Now|So|But|Also|Another|Next|Finally|One|Two|Three|First|Second|Third|Last|Then|However|'
    r'In fact|By the way|For example|For instance|That said|Meanwhile|On the other hand|'
    r'Additionally|Moreover|Further|In addition|At the same time|Still|Yet|As a result|'
    r'Therefore|Thus|Consequently|In summary|Overall|In short|To be clear|To recap|Quick note|'
    r'Side note|Note|Anyway|Moving on|Let me show|Let me tell|Here\b|There\b|This\b|That\b)\b',
    re.I,
)
# Phrases that introduce a demo/walkthrough section.
# BUG FIX: the original used single-quoted raw strings containing unescaped
# apostrophes ("Here\s*'s", "Now\s+I'll"), which is a SyntaxError; use
# double-quoted raw strings so the apostrophes are literal.
DEMO_RE = re.compile(
    r"^(Here\s+is|Here\s+are|Here\s*'s|Now\s+I'll\s+show|Now\s+let\s+me\s+show|"
    r"So\s+I\s+asked|Then\s+it\s+did|Let\s+me\s+show|Let\s+me\s+tell)\b",
    re.I,
)
# Phrases that flag risks, costs, or limitations.
RISK_RE = re.compile(
    r'^(Not\s+perfect|There\s+is\s+a\s+security\s+risk|Risk|Risks|Security|Cost|Costs|Crashes|'
    r'Limitations|It\s+is\s+far\s+from\s+perfect|You\s+really\s+have\s+to\s+be\s+careful)',
    re.I,
)
# Ordinal/enumeration openers ("First, ...", "Two, ...").
ENUM_RE = re.compile(r'^(First|Second|Third|Fourth|Fifth|One|Two|Three)\b', re.I)
# Lower-cased abbreviations that end with a period but do NOT end a sentence.
ABBREVIATIONS = {
    'e.g.', 'i.e.', 'mr.', 'mrs.', 'dr.', 'vs.', 'etc.', 'jr.', 'sr.',
    'u.s.', 'u.k.', 'p.s.',
}
# Split on whitespace preceded by an ellipsis ("...") or a terminator [.!?].
# BUG FIX: the original lookbehind was (?<=.{3}) — an unescaped dot matches
# ANY three characters, so the pattern split at nearly every space.
SENTENCE_SPLIT_RE = re.compile(r'(?:(?<=\.{3})|(?<=[.!?]))\s+')
# Leading "MM:SS" or "HH:MM:SS" timestamp at the start of a line.
TIMESTAMP_RE = re.compile(r'^\s*(\d{1,2}:\d{2}(?::\d{2})?)\s+')
# "Name: spoken text" speaker-labelled line.
SPEAKER_LINE_RE = re.compile(r'^\s*([A-Za-z][\w .-]{0,40})\s*:\s*(.+)$')
# Filler words removed under --remove-fillers.
FILLER_RE = re.compile(r'\b(um+|uh+|er+|ah+|like|you know)\b', re.I)
def to_kebab_lower(name: str) -> str: p = Path(name) stem = p.stem suffix = p.suffix.lower() stem = stem.replace("'", '') stem = re.sub(r'[\s_]+', '-', stem) stem = re.sub(r'-+', '-', stem) stem = stem.strip('-').lower() return stem + suffix
def insert_missing_spaces(text: str) -> str:
    """Repair missing spaces after sentence punctuation.

    Adds a space after an ellipsis or [.!?] that is jammed against the next
    word, while preserving decimals/version numbers ("4.7") and dotted
    acronyms ("U.S.").
    """
    # Insert space after ellipses when immediately followed by a letter/number.
    # BUG FIX: the original pattern was (.{3}) — an unescaped dot matches any
    # three characters, which sprayed spaces through ordinary text.
    text = re.sub(r'(\.{3})([A-Za-z0-9])', r'\1 \2', text)

    def _fix(m: re.Match) -> str:
        prev = m.group(1)
        punct = m.group(2)
        nxt = m.group(3)
        # Keep decimals / version numbers intact ("4.7").
        if punct == '.' and prev.isdigit() and nxt.isdigit():
            return prev + punct + nxt
        # Keep dotted acronyms intact ("U.S.").
        if punct == '.' and prev.isupper() and nxt.isupper():
            return prev + punct + nxt
        return prev + punct + ' ' + nxt

    # Insert space between [alnum][punct][alnum]
    text = re.sub(r'([A-Za-z0-9])([.!?])([A-Za-z0-9])', _fix, text)
    # Fix split decimals or version numbers (e.g., "4. 7" -> "4.7")
    text = re.sub(r'(\d)\.\s+(\d)', r'\1.\2', text)
    return text
def split_sentences(text: str) -> List[str]:
    """Split *text* into sentences, re-joining false splits after abbreviations.

    A fragment ending in a known abbreviation ("e.g.", "Dr.") or a single
    capital initial ("J.") is merged with the following fragment(s).
    Returns [] for empty/whitespace-only input.
    """
    raw = [s.strip() for s in SENTENCE_SPLIT_RE.split(text) if s.strip()]
    if not raw:
        return []
    # Hoisted out of the loop: the original rebuilt tuple(ABBREVIATIONS) on
    # every iteration.
    abbrev_endings = tuple(ABBREVIATIONS)
    merged: List[str] = []
    i = 0
    while i < len(raw):
        s = raw[i]
        i += 1
        # BUG FIX: keep merging while the accumulated sentence still ends in
        # an abbreviation or initial; the original merged at most one
        # following fragment, so chains like "e.g. Mr. Smith" stayed broken.
        # (The original's third condition, (?:\b[A-Z]\.){2,}$, is subsumed by
        # \b[A-Z]\.$ and is dropped.)
        while i < len(raw) and (
            s.lower().endswith(abbrev_endings)
            or re.search(r'\b[A-Z]\.$', s)
        ):
            s = s + ' ' + raw[i]
            i += 1
        merged.append(s)
    return merged
def paragraphize(sentences: List[str]) -> List[str]:
    """Group sentences into paragraphs.

    A new paragraph starts when a sentence opens with a risk/demo/cue/
    enumeration marker, or after four buffered sentences; each resulting
    paragraph is then hard-capped at six sentences.
    """
    paragraphs: List[str] = []
    buf: List[str] = []
    starters = (RISK_RE, DEMO_RE, CUE_RE, ENUM_RE)
    for sentence in sentences:
        if buf and any(rx.match(sentence) for rx in starters):
            paragraphs.append(' '.join(buf))
            buf = [sentence]
            continue
        buf.append(sentence)
        if len(buf) >= 4:
            paragraphs.append(' '.join(buf))
            buf = []
    if buf:
        paragraphs.append(' '.join(buf))
    # Hard cap 6 sentences per paragraph.
    capped: List[str] = []
    for para in paragraphs:
        sents = split_sentences(para)
        capped.extend(' '.join(sents[k:k + 6]) for k in range(0, len(sents), 6))
    return capped
def detect_speaker(lines: List[str]) -> Tuple[Optional[str], int]: for i, ln in enumerate(lines): if ln.strip(): m = re.match(r'^(.*?)(\d{1,2}:\d{2})$', ln.strip()) if m: speaker = m.group(1).strip() or 'Unknown speaker' return speaker, i + 1 return None, i return None, 0
def dehyphenate_lines(lines: List[str]) -> List[str]: if not lines: return lines out: List[str] = [] i = 0 while i < len(lines): line = lines[i].rstrip() if line.endswith('-') and i + 1 < len(lines): next_line = lines[i + 1].lstrip() if line[:-1] and next_line: out.append(line[:-1] + next_line) i += 2 continue out.append(line) i += 1 return out
def extract_speaker_blocks(lines: List[str]) -> Optional[List[Tuple[str, str]]]: blocks: List[Tuple[str, str]] = [] for ln in lines: m = SPEAKER_LINE_RE.match(ln) if not m: continue speaker = m.group(1).strip() text = m.group(2).strip() if speaker and text: blocks.append((speaker, text)) return blocks if len(blocks) >= 2 else None
def normalize_transcript(
    text: str,
    keep_timestamps: bool,
    dehyphenate: bool,
    remove_fillers: bool,
    speaker_labels: bool,
) -> Tuple[Optional[str], List[str]]:
    """Turn raw transcript text into (detected speaker, markdown paragraphs).

    Pipeline: optional dehyphenation -> speaker-header detection ->
    either per-speaker-block formatting (when *speaker_labels* and at least
    two "Name: text" lines exist) or a single flattened stream with optional
    timestamp preservation.
    """
    lines = [ln.rstrip() for ln in text.splitlines()]
    if dehyphenate:
        lines = dehyphenate_lines(lines)
    # Speaker detection from leading timestamp line
    speaker, start_idx = detect_speaker([ln.strip() for ln in lines])
    # Content starts after the header line (if any); blank lines dropped.
    content_lines = [ln.strip() for ln in lines[start_idx:] if ln.strip()]
    # Optional speaker labels within the transcript
    speaker_blocks = extract_speaker_blocks(content_lines) if speaker_labels else None
    if speaker_blocks:
        # Each labelled block is normalized independently and tagged with
        # its speaker name in the output.
        paragraphs: List[str] = []
        for spk, content in speaker_blocks:
            stream = content
            if remove_fillers:
                stream = FILLER_RE.sub('', stream)
            # Collapse whitespace runs (filler removal leaves double spaces).
            stream = re.sub(r'\s+', ' ', stream).strip()
            stream = insert_missing_spaces(stream)
            sentences = split_sentences(stream)
            for p in paragraphize(sentences):
                paragraphs.append(f'**Speaker:** {spk} — {p}')
        return speaker, paragraphs
    # Timestamp preservation
    timestamps: List[Tuple[str, str]] = []
    if keep_timestamps:
        for ln in content_lines:
            m = TIMESTAMP_RE.match(ln)
            if m:
                ts = m.group(1)
                rest = ln[m.end():].strip()
                timestamps.append((ts, rest))
            else:
                # No leading timestamp on this line: keep text, empty ts.
                timestamps.append(('', ln))
    else:
        timestamps = [('', ln) for ln in content_lines]
    # Flatten the remaining text into one stream for sentence splitting.
    stream = ' '.join(t for _, t in timestamps if t)
    stream = re.sub(r'\s+', ' ', stream).strip()
    if remove_fillers:
        stream = FILLER_RE.sub('', stream)
        stream = re.sub(r'\s+', ' ', stream).strip()
    stream = insert_missing_spaces(stream)
    sentences = split_sentences(stream)
    paragraphs = paragraphize(sentences)
    if keep_timestamps and timestamps:
        # Apply first timestamp to first paragraph only (conservative)
        first_ts = next((ts for ts, _ in timestamps if ts), '')
        if first_ts and paragraphs:
            paragraphs[0] = f'[{first_ts}] {paragraphs[0]}'
    return speaker, paragraphs
def title_from_filename(path: Path) -> str: return path.stem.replace('-', ' ')
def process_file(
    src: Path,
    dst_dir: Path,
    dry_run: bool,
    keep_timestamps: bool,
    dehyphenate: bool,
    remove_fillers: bool,
    speaker_labels: bool,
) -> Dict[str, int]:
    """Normalize one .txt transcript and write `<stem>.md` into *dst_dir*.

    Returns per-file stats (sentence/paragraph counts, speaker flag).
    In dry-run mode the stats are computed but nothing is written.
    """
    raw_text = src.read_text(encoding='utf-8', errors='ignore')
    speaker, paragraphs = normalize_transcript(
        raw_text,
        keep_timestamps=keep_timestamps,
        dehyphenate=dehyphenate,
        remove_fillers=remove_fillers,
        speaker_labels=speaker_labels,
    )
    # Assemble the markdown document: title, optional speaker line, paragraphs.
    pieces: List[str] = [f'# {title_from_filename(src)}', '']
    if speaker:
        pieces.append(f'**Speaker:** {speaker}')
        pieces.append('')
    for para in paragraphs:
        pieces.append(para)
        pieces.append('')
    rendered = '\n'.join(pieces).strip() + '\n'
    stats = {
        'sentences': sum(len(split_sentences(para)) for para in paragraphs),
        'paragraphs': len(paragraphs),
        'speaker_detected': 1 if speaker else 0,
    }
    if not dry_run:
        (dst_dir / (src.stem + '.md')).write_text(rendered, encoding='utf-8')
    return stats
def rename_inputs(paths: List[Path], dry_run: bool) -> Tuple[List[Path], int]: renamed: List[Path] = [] count = 0 for p in paths: new_name = to_kebab_lower(p.name) new_path = p.with_name(new_name) if new_path == p: renamed.append(p) continue if dry_run: renamed.append(new_path) count += 1 continue p.rename(new_path) renamed.append(new_path) count += 1 return renamed, count
def main() -> None:
    """CLI entry point: normalize every .txt under --input into --output."""
    parser = argparse.ArgumentParser(description='Normalize transcript TXT files to Markdown.')
    parser.add_argument('--input', required=True, help='Input directory containing .txt files')
    parser.add_argument('--output', required=True, help='Output directory for .md files')
    parser.add_argument('--rename', action='store_true', help='Rename input files to lowercase kebab-case')
    parser.add_argument('--dry-run', action='store_true', help='Do not write files')
    parser.add_argument('--keep-timestamps', action='store_true', help='Preserve leading timestamps')
    parser.add_argument('--dehyphenate', action='store_true', help='Merge hyphenated line breaks')
    parser.add_argument('--remove-fillers', action='store_true', help='Remove filler words (um, uh, you know)')
    parser.add_argument('--speaker-labels', action='store_true', help='Detect and label speaker lines (Name: ...)')
    parser.add_argument('--report', help='Write JSON summary report to this path')
    args = parser.parse_args()
    in_dir = Path(args.input)
    out_dir = Path(args.output)
    if not in_dir.exists() or not in_dir.is_dir():
        raise SystemExit(f'Input directory not found: {in_dir}')
    # Only create the output directory when we will actually write to it.
    if not args.dry_run:
        out_dir.mkdir(parents=True, exist_ok=True)
    txt_files = sorted(in_dir.glob('*.txt'))
    renamed_count = 0
    if args.rename:
        txt_files, renamed_count = rename_inputs(txt_files, args.dry_run)
    report: Dict[str, Dict[str, int]] = {}
    for src in txt_files:
        stats = process_file(
            src,
            out_dir,
            args.dry_run,
            keep_timestamps=args.keep_timestamps,
            dehyphenate=args.dehyphenate,
            remove_fillers=args.remove_fillers,
            speaker_labels=args.speaker_labels,
        )
        report[src.name] = stats
    # Report is skipped in dry-run mode (nothing was written to verify).
    if args.report and not args.dry_run:
        import json  # local import: only needed when a report is requested
        payload = {
            'files_processed': len(txt_files),
            'files_renamed': renamed_count,
            'stats': report,
        }
        Path(args.report).write_text(json.dumps(payload, indent=2), encoding='utf-8')
if name == 'main': main()