scripts-code-extractor

#!/usr/bin/env python3
"""CODITECT Code Extractor.

Extracts and categorizes code blocks from various sources:

  * Documentation pages (HTML/Markdown)
  * Source files (Python, JS, Go, Rust, etc.)
  * README files
  * Jupyter notebooks

Features:

  * Language detection with confidence scoring
  * Code quality filtering (min/max length, syntax validation)
  * Deduplication via content hashing
  * Context preservation (surrounding text)

Author: CODITECT
Version: 1.0.0
"""

import hashlib
import re
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Optional

class CodeQuality(Enum):
    """Quality level assigned to an extracted code block."""

    HIGH = "high"      # Compilable, documented, complete
    MEDIUM = "medium"  # Runnable but incomplete
    LOW = "low"        # Snippet, partial, errors

@dataclass
class ExtractionConfig:
    """Configuration knobs for code extraction."""

    min_length: int = 20        # Minimum code length in characters to keep
    max_length: int = 5000      # Maximum code length in characters to keep
    include_comments: bool = True
    validate_syntax: bool = True    # Run a Python syntax check during quality scoring
    deduplicate: bool = True        # Drop blocks whose content hash was already seen
    preserve_context: bool = True   # Capture surrounding text for Markdown blocks
    context_lines: int = 3          # Lines of surrounding text captured on each side

@dataclass
class ExtractedCode:
    """Extracted code block with metadata.

    Holds the code text plus detection metadata (language, confidence),
    provenance (source, line_number), an optional quality rating, the
    surrounding context, and a short content hash used for deduplication.
    """

    code: str
    language: str
    source: str
    line_number: Optional[int] = None
    quality: CodeQuality = CodeQuality.MEDIUM
    confidence: float = 0.8
    context_before: str = ""
    context_after: str = ""
    hash: str = ""
    tags: list[str] = field(default_factory=list)

    def __post_init__(self):
        # Derive a short content hash when the caller did not supply one.
        # MD5 is acceptable here: the hash is a dedup key, not a security token.
        if not self.hash:
            self.hash = hashlib.md5(self.code.encode()).hexdigest()[:12]

    def to_dict(self) -> dict:
        """Serialize to a plain dict (context fields are intentionally omitted)."""
        return {
            "code": self.code,
            "language": self.language,
            "source": self.source,
            "line_number": self.line_number,
            "quality": self.quality.value,
            "confidence": self.confidence,
            "hash": self.hash,
            "tags": self.tags
        }

class CodeExtractor:
    """CODITECT Code Extractor.

    Extracts code blocks from multiple source types with:
    - Intelligent language detection
    - Quality scoring
    - Deduplication
    - Context preservation

    Supported sources:
    - HTML documentation (BeautifulSoup optional)
    - Markdown files
    - Source code files
    - Jupyter notebooks
    """

    # Language detection heuristics. For each language: shebang regexes,
    # structural line patterns, indicative keywords, and an overall weight
    # that scales the raw score (more distinctive syntax => higher weight).
    LANGUAGE_PATTERNS = {
        "python": {
            "shebangs": [r"#!/.*python"],
            "patterns": [
                r"^def\s+\w+\s*\(",
                r"^class\s+\w+",
                r"^import\s+\w+",
                r"^from\s+\w+\s+import",
                r"self\.\w+",
                r"__\w+__",
                r"@\w+decorator"
            ],
            "keywords": ["def", "class", "import", "from", "self", "None", "True", "False"],
            "weight": 0.9
        },
        "javascript": {
            "shebangs": [r"#!/.*node"],
            "patterns": [
                r"^const\s+\w+\s*=",
                r"^let\s+\w+\s*=",
                r"^function\s+\w+\s*\(",
                r"=>\s*\{",
                r"^export\s+(default\s+)?",
                r"require\(['\"]",
                r"console\.(log|error|warn)"
            ],
            "keywords": ["const", "let", "function", "export", "require", "async", "await"],
            "weight": 0.85
        },
        "typescript": {
            "shebangs": [],
            "patterns": [
                r":\s*(string|number|boolean|any)\b",
                r"interface\s+\w+",
                r"type\s+\w+\s*=",
                r"<\w+>",
                r"as\s+\w+",
                r":\s*\w+\[\]"
            ],
            "keywords": ["interface", "type", "implements", "extends", "enum"],
            "weight": 0.9
        },
        "go": {
            "shebangs": [],
            "patterns": [
                r"^package\s+\w+",
                r"^func\s+\w+",
                r"^type\s+\w+\s+struct",
                r":=",
                r"fmt\.\w+",
                r"go\s+\w+\(",
                r"defer\s+"
            ],
            "keywords": ["package", "func", "type", "struct", "interface", "chan", "go", "defer"],
            "weight": 0.95
        },
        "rust": {
            "shebangs": [],
            "patterns": [
                r"^fn\s+\w+",
                r"^struct\s+\w+",
                r"^impl\s+\w+",
                r"^use\s+\w+",
                r"let\s+mut\s+",
                r"->",
                r"&\w+",
                r"\.unwrap\(\)"
            ],
            "keywords": ["fn", "struct", "impl", "use", "let", "mut", "pub", "mod"],
            "weight": 0.95
        },
        "java": {
            "shebangs": [],
            "patterns": [
                r"^public\s+(class|interface)",
                r"^private\s+\w+",
                r"^protected\s+\w+",
                r"System\.out\.",
                r"@\w+\s*\n\s*public",
                r"new\s+\w+\("
            ],
            "keywords": ["public", "private", "protected", "class", "interface", "extends", "implements"],
            "weight": 0.85
        },
        "csharp": {
            "shebangs": [],
            "patterns": [
                r"^using\s+\w+;",
                r"^namespace\s+\w+",
                r"^public\s+class",
                r"Console\.Write",
                r"\[\w+\]",
                r"var\s+\w+\s*="
            ],
            "keywords": ["using", "namespace", "public", "private", "class", "async", "await", "var"],
            "weight": 0.85
        },
        "bash": {
            "shebangs": [r"#!/.*bash", r"#!/.*sh"],
            "patterns": [
                r"^\$\s*",
                r"^#!",
                r"echo\s+",
                r"\|\s*\w+",
                r"\$\{\w+\}",
                r"if\s*\[\s*"
            ],
            "keywords": ["echo", "if", "fi", "then", "else", "for", "done", "export"],
            "weight": 0.8
        },
        "sql": {
            "shebangs": [],
            "patterns": [
                r"^SELECT\s+",
                r"^INSERT\s+INTO",
                r"^UPDATE\s+\w+",
                r"^CREATE\s+TABLE",
                r"^FROM\s+\w+",
                r"^WHERE\s+"
            ],
            "keywords": ["SELECT", "FROM", "WHERE", "INSERT", "UPDATE", "DELETE", "CREATE", "TABLE"],
            "weight": 0.9
        },
        "yaml": {
            "shebangs": [],
            "patterns": [
                r"^\w+:\s*$",
                r"^\s+-\s+\w+",
                r"^\s+\w+:\s+\w+"
            ],
            "keywords": [],
            "weight": 0.7
        },
        "json": {
            "shebangs": [],
            "patterns": [
                r'^\s*\{',
                r'^\s*\[',
                r'"\w+":\s*',
            ],
            "keywords": [],
            "weight": 0.7
        }
    }

    # Markdown fenced code block: ```lang\n ... ``` (language tag optional).
    MD_CODE_PATTERN = re.compile(
        r'```(\w*)\n(.*?)```',
        re.DOTALL | re.MULTILINE
    )

    # HTML code block patterns, most specific first. Patterns with two groups
    # capture (language, code); single-group patterns capture only the code.
    HTML_CODE_PATTERNS = [
        re.compile(r'<pre[^>]*><code[^>]*class="[^"]*language-(\w+)[^"]*"[^>]*>(.*?)</code></pre>', re.DOTALL),
        re.compile(r'<pre[^>]*><code[^>]*>(.*?)</code></pre>', re.DOTALL),
        re.compile(r'<code[^>]*class="[^"]*language-(\w+)[^"]*"[^>]*>(.*?)</code>', re.DOTALL),
    ]

def __init__(self, config: Optional[ExtractionConfig] = None):
self.config = config or ExtractionConfig()
self._seen_hashes: set[str] = set()

def extract_from_markdown(self, content: str, source: str = "markdown") -> list[ExtractedCode]:
"""Extract code blocks from Markdown content."""
blocks = []

for match in self.MD_CODE_PATTERN.finditer(content):
lang_hint = match.group(1).lower() if match.group(1) else ""
code = match.group(2).strip()

if not self._valid_length(code):
continue

# Detect language if not specified
language, confidence = self._detect_language(code, lang_hint)

# Get context
context_before, context_after = "", ""
if self.config.preserve_context:
context_before, context_after = self._get_context(
content, match.start(), match.end()
)

block = ExtractedCode(
code=code,
language=language,
source=source,
confidence=confidence,
quality=self._assess_quality(code, language),
context_before=context_before,
context_after=context_after
)

if self._should_include(block):
blocks.append(block)

return blocks

def extract_from_html(self, html: str, source: str = "html") -> list[ExtractedCode]:
"""Extract code blocks from HTML content."""
blocks = []

# Try to use BeautifulSoup if available
try:
from bs4 import BeautifulSoup
return self._extract_html_bs4(html, source)
except ImportError:
pass

# Fallback to regex
for pattern in self.HTML_CODE_PATTERNS:
for match in pattern.finditer(html):
groups = match.groups()

if len(groups) == 2:
lang_hint, code = groups
else:
lang_hint, code = "", groups[0]

# Unescape HTML entities
code = self._unescape_html(code)

if not self._valid_length(code):
continue

language, confidence = self._detect_language(code, lang_hint or "")

block = ExtractedCode(
code=code,
language=language,
source=source,
confidence=confidence,
quality=self._assess_quality(code, language)
)

if self._should_include(block):
blocks.append(block)

return blocks

def _extract_html_bs4(self, html: str, source: str) -> list[ExtractedCode]:
"""Extract using BeautifulSoup for better parsing."""
from bs4 import BeautifulSoup

blocks = []
soup = BeautifulSoup(html, 'html.parser')

# Find all code elements
for code_elem in soup.find_all(['code', 'pre']):
code = code_elem.get_text()

if not self._valid_length(code):
continue

# Try to get language from class
lang_hint = ""
classes = code_elem.get('class', [])
for cls in classes:
if cls.startswith('language-'):
lang_hint = cls.replace('language-', '')
break
elif cls in self.LANGUAGE_PATTERNS:
lang_hint = cls
break

language, confidence = self._detect_language(code, lang_hint)

block = ExtractedCode(
code=code,
language=language,
source=source,
confidence=confidence,
quality=self._assess_quality(code, language)
)

if self._should_include(block):
blocks.append(block)

return blocks

def extract_from_file(self, file_path: Path) -> list[ExtractedCode]:
"""Extract code from a source file."""
try:
content = file_path.read_text()
except Exception:
return []

# Determine language from extension
ext = file_path.suffix.lower()
ext_map = {
".py": "python",
".js": "javascript",
".ts": "typescript",
".tsx": "typescript",
".go": "go",
".rs": "rust",
".java": "java",
".cs": "csharp",
".sh": "bash",
".sql": "sql",
".yaml": "yaml",
".yml": "yaml",
".json": "json"
}

language = ext_map.get(ext, "unknown")

# Extract meaningful blocks (functions, classes)
blocks = []

if language == "python":
blocks.extend(self._extract_python_blocks(content, str(file_path)))
elif language in ["javascript", "typescript"]:
blocks.extend(self._extract_js_blocks(content, str(file_path), language))
else:
# For other languages, extract the whole file if reasonable
if self._valid_length(content):
blocks.append(ExtractedCode(
code=content,
language=language,
source=str(file_path),
quality=self._assess_quality(content, language)
))

return blocks

def _extract_python_blocks(self, content: str, source: str) -> list[ExtractedCode]:
"""Extract Python functions and classes."""
import ast
blocks = []

try:
tree = ast.parse(content)
except SyntaxError:
return blocks

lines = content.split('\n')

for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
start = node.lineno - 1
end = getattr(node, 'end_lineno', start + 20)

code = '\n'.join(lines[start:end])

if self._valid_length(code):
blocks.append(ExtractedCode(
code=code,
language="python",
source=source,
line_number=node.lineno,
quality=CodeQuality.HIGH,
tags=[type(node).__name__.lower()]
))

return blocks

def _extract_js_blocks(self, content: str, source: str, language: str) -> list[ExtractedCode]:
"""Extract JavaScript/TypeScript functions."""
blocks = []

# Function patterns
patterns = [
# Arrow functions with export
r'(export\s+(?:default\s+)?(?:const|let)\s+\w+\s*=\s*(?:async\s+)?\([^)]*\)\s*=>\s*\{[^}]+(?:\{[^}]*\}[^}]*)*\})',
# Regular functions
r'((?:export\s+)?(?:async\s+)?function\s+\w+\s*\([^)]*\)\s*\{[^}]+(?:\{[^}]*\}[^}]*)*\})',
# Class definitions
r'((?:export\s+)?class\s+\w+\s*(?:extends\s+\w+)?\s*\{[^}]+(?:\{[^}]*\}[^}]*)*\})'
]

for pattern in patterns:
for match in re.finditer(pattern, content, re.MULTILINE | re.DOTALL):
code = match.group(1)
if self._valid_length(code):
blocks.append(ExtractedCode(
code=code,
language=language,
source=source,
quality=CodeQuality.HIGH
))

return blocks

def extract_from_notebook(self, notebook_path: Path) -> list[ExtractedCode]:
"""Extract code from Jupyter notebook."""
import json

blocks = []

try:
with open(notebook_path) as f:
notebook = json.load(f)
except (json.JSONDecodeError, OSError):
return blocks

cells = notebook.get('cells', [])

for i, cell in enumerate(cells):
if cell.get('cell_type') != 'code':
continue

source = cell.get('source', [])
if isinstance(source, list):
code = ''.join(source)
else:
code = source

if not self._valid_length(code):
continue

language, confidence = self._detect_language(code, "python")

blocks.append(ExtractedCode(
code=code,
language=language,
source=f"{notebook_path}:cell_{i}",
confidence=confidence,
quality=self._assess_quality(code, language),
tags=["notebook"]
))

return blocks

def _detect_language(self, code: str, hint: str = "") -> tuple[str, float]:
"""Detect programming language with confidence score."""
# If hint matches a known language, use it
if hint and hint.lower() in self.LANGUAGE_PATTERNS:
return hint.lower(), 0.95

# Normalize hint
hint_map = {
"py": "python",
"js": "javascript",
"ts": "typescript",
"rb": "ruby",
"sh": "bash",
"shell": "bash"
}
if hint.lower() in hint_map:
return hint_map[hint.lower()], 0.9

# Score each language
scores: dict[str, float] = {}

for lang, patterns in self.LANGUAGE_PATTERNS.items():
score = 0.0

# Check shebangs
for shebang in patterns.get("shebangs", []):
if re.search(shebang, code[:100]):
score += 0.5

# Check patterns
for pattern in patterns.get("patterns", []):
matches = len(re.findall(pattern, code, re.MULTILINE))
score += min(matches * 0.1, 0.3)

# Check keywords
keywords = patterns.get("keywords", [])
keyword_count = sum(1 for kw in keywords if re.search(rf'\b{kw}\b', code))
score += min(keyword_count * 0.05, 0.2)

# Apply language weight
score *= patterns.get("weight", 1.0)

scores[lang] = score

if not scores:
return "unknown", 0.0

best_lang = max(scores, key=scores.get)
best_score = scores[best_lang]

# Normalize confidence
confidence = min(best_score / 0.8, 1.0)

return best_lang, confidence

def _assess_quality(self, code: str, language: str) -> CodeQuality:
"""Assess code quality."""
score = 0

# Length scoring
lines = code.count('\n') + 1
if 10 <= lines <= 100:
score += 2
elif 5 <= lines <= 200:
score += 1

# Has comments/docstrings
if re.search(r'(#|//|/\*|\"\"\"|\'\'\')', code):
score += 1

# Has proper structure (functions, classes)
if re.search(r'(def |function |class |fn |func )', code):
score += 1

# No obvious errors
error_patterns = [r'TODO', r'FIXME', r'XXX', r'HACK', r'\.\.\.']
if not any(re.search(p, code) for p in error_patterns):
score += 1

# Validate syntax for Python
if language == "python" and self.config.validate_syntax:
try:
compile(code, '<string>', 'exec')
score += 2
except SyntaxError:
score -= 1

if score >= 5:
return CodeQuality.HIGH
elif score >= 2:
return CodeQuality.MEDIUM
else:
return CodeQuality.LOW

def _valid_length(self, code: str) -> bool:
"""Check if code length is within bounds."""
length = len(code)
return self.config.min_length <= length <= self.config.max_length

def _should_include(self, block: ExtractedCode) -> bool:
"""Check if block should be included (deduplication)."""
if not self.config.deduplicate:
return True

if block.hash in self._seen_hashes:
return False

self._seen_hashes.add(block.hash)
return True

def _get_context(
self,
content: str,
start: int,
end: int
) -> tuple[str, str]:
"""Get surrounding context for a code block."""
lines = content.split('\n')
code_start_line = content[:start].count('\n')
code_end_line = content[:end].count('\n')

before_start = max(0, code_start_line - self.config.context_lines)
after_end = min(len(lines), code_end_line + self.config.context_lines + 1)

context_before = '\n'.join(lines[before_start:code_start_line])
context_after = '\n'.join(lines[code_end_line + 1:after_end])

return context_before.strip(), context_after.strip()

def _unescape_html(self, text: str) -> str:
"""Unescape HTML entities."""
replacements = [
('&lt;', '<'),
('&gt;', '>'),
('&amp;', '&'),
('&quot;', '"'),
('&#39;', "'"),
('&nbsp;', ' ')
]
for old, new in replacements:
text = text.replace(old, new)
return text

def reset_deduplication(self):
"""Reset deduplication cache."""
self._seen_hashes.clear()