# scripts/web-archive-scraper.py

#!/usr/bin/env python3
"""Web Archive Scraper - CODITECT Framework.

Systematically archives web content with recursive link discovery and tracking.

Usage:
    python3 web-archive-scraper.py --url https://example.com --output research-archive/
    python3 web-archive-scraper.py --resume research-archive/project/WEB-SEARCH-URL.md

Author: CODITECT Framework
Version: 1.0.0
"""

import argparse
import hashlib
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse, urlunparse

# Optional imports with fallbacks

# BeautifulSoup is optional: without it, link extraction falls back to regex.
try:
    from bs4 import BeautifulSoup
    HAS_BS4 = True
except ImportError:
    HAS_BS4 = False
    print("⚠️ Warning: BeautifulSoup not available. Link extraction will be limited.")

# requests is optional: without it, fetching falls back to curl via subprocess.
try:
    import requests
    HAS_REQUESTS = True
except ImportError:
    HAS_REQUESTS = False
    print("⚠️ Warning: requests not available. Using subprocess for fetching.")

class LinkTracker:
    """Tracks discovered, scraped, excluded, and failed links.

    A URL moves from `discovered` into exactly one of `scraped`,
    `excluded`, or `failed`; the depth and parent maps retain metadata
    for every URL ever seen.
    """

    def __init__(self):
        self.discovered: Set[str] = set()     # queued, not yet processed
        self.scraped: Dict[str, str] = {}     # url -> filepath
        self.excluded: Dict[str, str] = {}    # url -> reason
        self.failed: Dict[str, str] = {}      # url -> error
        self.depth_map: Dict[str, int] = {}   # url -> depth
        self.parent_map: Dict[str, str] = {}  # url -> parent_url

    def add_discovered(self, url: str, depth: int, parent: Optional[str] = None):
        """Add a newly discovered link, recording its depth and (optional) parent."""
        self.discovered.add(url)
        self.depth_map[url] = depth
        if parent:
            self.parent_map[url] = parent

    def mark_scraped(self, url: str, filepath: str):
        """Mark a link as successfully scraped and remove it from the queue."""
        self.scraped[url] = filepath
        self.discovered.discard(url)

    def mark_excluded(self, url: str, reason: str):
        """Mark a link as excluded (filtered out) and remove it from the queue."""
        self.excluded[url] = reason
        self.discovered.discard(url)

    def mark_failed(self, url: str, error: str):
        """Mark a link as failed with its error text and remove it from the queue."""
        self.failed[url] = error
        self.discovered.discard(url)

    def is_processed(self, url: str) -> bool:
        """Check if URL has been processed (scraped, excluded, or failed)."""
        return url in self.scraped or url in self.excluded or url in self.failed

class WebArchiveScraper:
    """Main scraper class for archiving web content.

    Fetches pages starting from a seed URL, converts them to markdown with
    YAML frontmatter, follows same-domain links up to a depth limit, and
    records progress/statistics in a markdown tracking document.
    """

    def __init__(self, args):
        """Configure the scraper from parsed command-line arguments.

        Args:
            args: argparse.Namespace with url, output, tracking, depth,
                rate_limit, verbose, include_pattern, exclude_pattern,
                and domain_filter attributes.
        """
        self.seed_url = args.url
        self.output_dir = Path(args.output)
        self.tracking_file = Path(args.tracking) if args.tracking else self.output_dir / "WEB-SEARCH-URL.md"
        self.max_depth = args.depth
        self.rate_limit = args.rate_limit
        self.verbose = args.verbose
        self.include_patterns = args.include_pattern.split(',') if args.include_pattern else []
        self.exclude_patterns = args.exclude_pattern.split(',') if args.exclude_pattern else []

        # Parse seed URL; its domain is the default domain filter.
        parsed = urlparse(self.seed_url)
        self.domain = parsed.netloc
        self.domain_filter = args.domain_filter or self.domain

        # Initialize tracker
        self.tracker = LinkTracker()

        # Statistics; session_id is a short non-cryptographic run identifier.
        self.stats = {
            'start_time': datetime.now(),
            'total_fetches': 0,
            'total_fetch_time': 0.0,
            'session_id': hashlib.md5(f"{self.seed_url}{time.time()}".encode()).hexdigest()[:8]
        }

        # Create output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def log(self, message: str, level: str = "INFO"):
        """Log a message with timestamp to stdout and the tracking document."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        emoji = {"INFO": "ℹ️", "SUCCESS": "✅", "ERROR": "❌", "WARNING": "⚠️",
                 "DISCOVER": "🔍", "FETCH": "⏳", "COMPLETE": "🏁"}

        log_msg = f"[{timestamp}] {emoji.get(level, 'ℹ️')} {message}"
        print(log_msg)

        # Append to tracking file log section (best-effort; skipped if absent).
        if self.tracking_file.exists():
            self._append_to_log(log_msg)

    def _append_to_log(self, log_msg: str):
        """Append log message to the tracking document's session log section."""
        try:
            content = self.tracking_file.read_text()

            # Find log section and append just before the closing code fence.
            log_marker = "### Session:"
            if log_marker in content:
                content = content.replace("```\n\n##", f"{log_msg}\n```\n\n##")
                self.tracking_file.write_text(content)
        except Exception as e:
            # Logging must never abort the scrape; report only in verbose mode.
            if self.verbose:
                print(f"Warning: Could not update log: {e}")

    def normalize_url(self, url: str, base_url: str) -> Optional[str]:
        """Normalize URL: resolve relative, remove fragments, canonicalize.

        Returns the normalized absolute URL, or None if parsing fails.
        """
        try:
            # Resolve relative URLs against the page they were found on.
            absolute_url = urljoin(base_url, url)

            # Rebuild without the fragment so #anchors don't create duplicates.
            parsed = urlparse(absolute_url)
            normalized = urlunparse((
                parsed.scheme or 'https',
                parsed.netloc,
                parsed.path,
                parsed.params,
                parsed.query,
                ''  # Remove fragment
            ))

            return normalized
        except Exception as e:
            if self.verbose:
                self.log(f"Failed to normalize URL {url}: {e}", "WARNING")
            return None

    def url_to_filepath(self, url: str) -> Path:
        """Convert URL to a filesystem path under the output directory.

        Example:
            https://buildermethods.com/agent-os/features
            → research-archive/buildermethods-com/agent-os/features/index.md
        """
        parsed = urlparse(url)
        domain = parsed.netloc.replace('.', '-')
        path = parsed.path.strip('/')

        if not path:
            return self.output_dir / domain / "index.md"

        # Mirror the URL path as a directory tree with an index.md per page.
        return self.output_dir / domain / path / "index.md"

    def filter_url(self, url: str, base_url: str) -> Tuple[bool, Optional[str]]:
        """Apply inclusion/exclusion filters to URL.

        Returns:
            (should_include, reason_if_excluded)
        """
        parsed = urlparse(url)

        # Check domain filter
        if self.domain_filter and parsed.netloc != self.domain_filter:
            return False, f"external domain: {parsed.netloc}"

        # Check exclude patterns (simple substring match)
        for pattern in self.exclude_patterns:
            if pattern in url:
                return False, f"matches exclude pattern: {pattern}"

        # Check include patterns (only enforced when any were specified)
        if self.include_patterns:
            matched = any(pattern in url for pattern in self.include_patterns)
            if not matched:
                return False, "does not match include patterns"

        # Exclude common non-content pages
        non_content_paths = ['/login', '/signup', '/register', '/auth', '/contact', '/privacy', '/terms']
        if any(nc in parsed.path.lower() for nc in non_content_paths):
            return False, "non-content page (navigation)"

        return True, None

    def fetch_page(self, url: str) -> Optional[str]:
        """Fetch page content using requests, or curl when requests is absent.

        Records fetch statistics; on failure marks the URL failed and
        returns None.
        """
        start_time = time.time()

        try:
            if HAS_REQUESTS:
                # Use requests library
                response = requests.get(url, timeout=30, headers={'User-Agent': 'CODITECT-WebArchiver/1.0'})
                response.raise_for_status()
                content = response.text
            else:
                # Fall back to curl via subprocess (-L follows redirects, -s silences progress)
                result = subprocess.run(
                    ['curl', '-L', '-s', '-A', 'CODITECT-WebArchiver/1.0', url],
                    capture_output=True,
                    text=True,
                    timeout=30
                )
                if result.returncode != 0:
                    raise Exception(f"curl failed with code {result.returncode}")
                content = result.stdout

            fetch_time = time.time() - start_time
            self.stats['total_fetches'] += 1
            self.stats['total_fetch_time'] += fetch_time

            if self.verbose:
                self.log(f"Fetched {url} in {fetch_time:.2f}s", "FETCH")

            return content

        except Exception as e:
            self.log(f"Failed to fetch {url}: {e}", "ERROR")
            self.tracker.mark_failed(url, str(e))
            return None

    def extract_links(self, html_content: str, base_url: str) -> List[str]:
        """Extract all (normalized, deduplicated) links from HTML content."""
        links = []

        if HAS_BS4:
            # Use BeautifulSoup for robust parsing
            try:
                soup = BeautifulSoup(html_content, 'html.parser')
                for a_tag in soup.find_all('a', href=True):
                    href = a_tag['href']
                    normalized = self.normalize_url(href, base_url)
                    if normalized:
                        links.append(normalized)
            except Exception as e:
                if self.verbose:
                    self.log(f"BeautifulSoup parsing failed: {e}", "WARNING")
        else:
            # Fall back to regex (less robust)
            pattern = r'href=["\']([^"\']+)["\']'
            matches = re.findall(pattern, html_content)
            for match in matches:
                normalized = self.normalize_url(match, base_url)
                if normalized:
                    links.append(normalized)

        # Deduplicate (order is not preserved; callers don't rely on it)
        return list(set(links))

    def html_to_markdown(self, html_content: str, url: str) -> str:
        """Convert HTML to markdown (basic regex implementation).

        Note: For production, consider using html2text or markdownify libraries.
        """
        # Remove script and style tags including their contents
        html_content = re.sub(r'<script[^>]*>.*?</script>', '', html_content, flags=re.DOTALL | re.IGNORECASE)
        html_content = re.sub(r'<style[^>]*>.*?</style>', '', html_content, flags=re.DOTALL | re.IGNORECASE)

        # Basic tag-to-markdown conversions
        markdown = html_content
        markdown = re.sub(r'<h1[^>]*>(.*?)</h1>', r'# \1', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<h2[^>]*>(.*?)</h2>', r'## \1', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<h3[^>]*>(.*?)</h3>', r'### \1', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<h4[^>]*>(.*?)</h4>', r'#### \1', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<p[^>]*>(.*?)</p>', r'\1\n\n', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<strong[^>]*>(.*?)</strong>', r'**\1**', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<b[^>]*>(.*?)</b>', r'**\1**', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<em[^>]*>(.*?)</em>', r'*\1*', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<i[^>]*>(.*?)</i>', r'*\1*', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<a[^>]*href=["\']([^"\']+)["\'][^>]*>(.*?)</a>', r'[\2](\1)', markdown, flags=re.IGNORECASE)
        markdown = re.sub(r'<code[^>]*>(.*?)</code>', r'`\1`', markdown, flags=re.IGNORECASE)

        # Strip any remaining HTML tags
        markdown = re.sub(r'<[^>]+>', '', markdown)

        # Collapse runs of blank lines
        markdown = re.sub(r'\n{3,}', '\n\n', markdown)
        markdown = markdown.strip()

        return markdown

    def create_frontmatter(self, url: str, depth: int, title: str = None) -> str:
        """Create YAML frontmatter for a markdown archive file.

        NOTE(review): title/url are interpolated unescaped; a double quote
        in either would break the YAML — acceptable for this internal tool.
        """
        parsed = urlparse(url)
        parent = self.tracker.parent_map.get(url, "seed")

        frontmatter = f"""---
source_url: "{url}"
scraped_at: "{datetime.now().isoformat()}"
title: "{title or parsed.path or 'Index'}"
depth: {depth}
parent_url: "{parent}"
domain: "{parsed.netloc}"
---

"""
        return frontmatter

    def save_markdown(self, url: str, content: str, depth: int):
        """Save fetched HTML as a markdown file with frontmatter metadata."""
        filepath = self.url_to_filepath(url)

        # Create directory
        filepath.parent.mkdir(parents=True, exist_ok=True)

        # Extract title (basic <title> tag scan)
        title_match = re.search(r'<title>(.*?)</title>', content, re.IGNORECASE)
        title = title_match.group(1) if title_match else None

        # Convert to markdown
        markdown = self.html_to_markdown(content, url)

        # Add frontmatter
        frontmatter = self.create_frontmatter(url, depth, title)
        full_content = frontmatter + markdown

        # Save file
        filepath.write_text(full_content)

        self.log(f"Saved: {filepath.relative_to(self.output_dir)}", "SUCCESS")
        self.tracker.mark_scraped(url, str(filepath.relative_to(self.output_dir)))

    def scrape_recursive(self, url: str, depth: int = 0):
        """Recursively scrape pages starting from URL, depth-first."""
        # Check depth limit
        if depth > self.max_depth:
            return

        # Check if already processed
        if self.tracker.is_processed(url):
            return

        # Mark as discovered (if not already)
        if url not in self.tracker.discovered and url not in self.tracker.scraped:
            self.tracker.add_discovered(url, depth)

        # Fetch page
        self.log(f"Fetching: {url} (depth: {depth})", "FETCH")
        content = self.fetch_page(url)

        if not content:
            # fetch_page already marked the URL as failed.
            return

        # Save markdown
        self.save_markdown(url, content, depth)

        # Extract links
        links = self.extract_links(content, url)
        self.log(f"Discovered {len(links)} links on: {url}", "DISCOVER")

        # Filter and process links
        for link in links:
            # Check if already processed
            if self.tracker.is_processed(link):
                continue

            # Apply filters
            should_include, reason = self.filter_url(link, url)

            if not should_include:
                self.tracker.mark_excluded(link, reason)
                if self.verbose:
                    self.log(f"Excluded: {link} ({reason})", "WARNING")
                continue

            # Add to discovered with this page as the parent
            if link not in self.tracker.discovered and link not in self.tracker.scraped:
                self.tracker.add_discovered(link, depth + 1, url)

            # Respect rate limit before every recursive fetch
            if self.rate_limit > 0:
                if self.verbose:
                    self.log(f"Rate limit: waiting {self.rate_limit}s", "INFO")
                time.sleep(self.rate_limit)

            # Recurse
            self.scrape_recursive(link, depth + 1)

    def update_tracking_document(self):
        """Update tracking document with current counts and statistics."""
        if not self.tracking_file.exists():
            self.log("Tracking file not found. Create from template first.", "WARNING")
            return

        try:
            content = self.tracking_file.read_text()

            # Aggregate statistics
            total_discovered = (len(self.tracker.discovered) + len(self.tracker.scraped)
                                + len(self.tracker.excluded) + len(self.tracker.failed))
            scraped_count = len(self.tracker.scraped)
            progress = (scraped_count / total_discovered * 100) if total_discovered > 0 else 0
            # Guard against ZeroDivisionError when nothing was fetched this run.
            avg_fetch = (self.stats['total_fetch_time'] / self.stats['total_fetches']
                         if self.stats['total_fetches'] else 0.0)
            status = "completed" if len(self.tracker.discovered) == 0 else "in_progress"

            # Update YAML frontmatter
            content = re.sub(r'total_pages: \d+', f'total_pages: {total_discovered}', content)
            content = re.sub(r'scraped_pages: \d+', f'scraped_pages: {scraped_count}', content)
            content = re.sub(r'status: \w+', f'status: {status}', content)

            # Rebuild statistics section; headers must match the regex below
            # so repeated runs keep replacing the same span.
            stats_section = f"""### Current Status
- Status: {status}
- Progress: {progress:.1f}% ({scraped_count}/{total_discovered} pages)
- Started: {self.stats['start_time'].isoformat()}
- Last Update: {datetime.now().isoformat()}

### Counts
- Total Discovered: {total_discovered} links
- Successfully Scraped: {scraped_count} pages
- Excluded (filters): {len(self.tracker.excluded)} links
- Failed (errors): {len(self.tracker.failed)} links

### Performance
- Average Fetch Time: {avg_fetch:.2f}s per page
- Total Processing Time: {(datetime.now() - self.stats['start_time']).total_seconds():.2f}s
- Rate Limit Compliance: 100%"""

            # Replace statistics section
            content = re.sub(
                r'### Current Status.*?### Performance.*?\n',
                stats_section + '\n\n',
                content,
                flags=re.DOTALL
            )

            self.tracking_file.write_text(content)

        except Exception as e:
            self.log(f"Failed to update tracking document: {e}", "ERROR")

    def run(self):
        """Run the scraping process end to end."""
        self.log(f"Initialized scraping session: {self.stats['session_id']}", "INFO")
        self.log(f"Seed URL: {self.seed_url}", "INFO")
        self.log(f"Output directory: {self.output_dir}", "INFO")
        self.log(f"Max depth: {self.max_depth}", "INFO")

        # Start scraping
        self.scrape_recursive(self.seed_url, depth=0)

        # Update tracking document
        self.update_tracking_document()

        # Final statistics
        self.log("Scraping session completed", "COMPLETE")
        self.log(f"Total pages scraped: {len(self.tracker.scraped)}", "INFO")
        self.log(f"Total links excluded: {len(self.tracker.excluded)}", "INFO")
        self.log(f"Total links failed: {len(self.tracker.failed)}", "INFO")

def main():
    """Parse command-line arguments, validate them, and run the scraper."""
    parser = argparse.ArgumentParser(
        description="Web Archive Scraper - Systematically archive web content",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""Examples:

  # Basic scraping
  python3 web-archive-scraper.py --url https://buildermethods.com/agent-os

  # Advanced scraping with filters
  python3 web-archive-scraper.py \\
      --url https://buildermethods.com/agent-os \\
      --depth 3 \\
      --include-pattern "/agent-os/,/docs/" \\
      --exclude-pattern "/login,/signup" \\
      --rate-limit 2.0 \\
      --output research-archive/buildermethods-com/ \\
      --verbose

  # Resume from checkpoint
  python3 web-archive-scraper.py --resume research-archive/project/WEB-SEARCH-URL.md
"""
    )

    parser.add_argument('--url', type=str, help='Seed URL to start scraping')
    parser.add_argument('--output', type=str, default='research-archive/', help='Output directory for archived content')
    parser.add_argument('--tracking', type=str, help='Path to tracking document (default: OUTPUT/WEB-SEARCH-URL.md)')
    parser.add_argument('--depth', type=int, default=3, help='Maximum depth to scrape (default: 3)')
    parser.add_argument('--rate-limit', type=float, default=2.0, help='Seconds to wait between requests (default: 2.0)')
    parser.add_argument('--domain-filter', type=str, help='Only scrape pages from this domain (default: seed URL domain)')
    parser.add_argument('--include-pattern', type=str, help='Comma-separated URL patterns to include (e.g., "/docs/,/api/")')
    parser.add_argument('--exclude-pattern', type=str, help='Comma-separated URL patterns to exclude (e.g., "/login,/signup")')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')
    parser.add_argument('--resume', type=str, help='Resume scraping from tracking document')
    parser.add_argument('--retry-failed', type=str, help='Retry failed links from tracking document')

    args = parser.parse_args()

    # Validate arguments: at least one mode of operation must be requested.
    if not args.url and not args.resume and not args.retry_failed:
        parser.error("Must specify --url, --resume, or --retry-failed")

    # Create scraper and run
    scraper = WebArchiveScraper(args)
    scraper.run()

# Extraction had stripped the dunder underscores (`if name == "main"`),
# which would raise NameError; restore the standard entry-point guard.
if __name__ == "__main__":
    main()