Skip to main content

#!/usr/bin/env python3
"""
CODITECT Playwright MCP Bridge (H.8.2.4)

Connects QAAgentBrowserTools to the @playwright/mcp server via JSON-RPC over
stdio, enabling real browser automation in autonomous loops.

Architecture:
    QAAgentBrowserTools --> PlaywrightMCPBridge --> @playwright/mcp (subprocess)
                                |
                                +-- JSON-RPC stdio transport
                                +-- Tool call mapping
                                +-- Session lifecycle management
                                +-- Screenshot capture/storage

Usage:
    bridge = PlaywrightMCPBridge(config=PlaywrightMCPConfig(headless=True))
    await bridge.start()

    result = await bridge.navigate("https://example.com")
    screenshot = await bridge.screenshot()
    await bridge.click("button#submit")

    await bridge.stop()

Author: CODITECT Framework
Version: 1.0.0
Created: February 17, 2026
ADR Reference: ADR-109-qa-agent-browser-automation.md
"""

import asyncio
import contextlib
import json
import logging
import shutil
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

# Module-level logger named after this module, so that handlers and filters
# can target it through the standard logging hierarchy.
logger = logging.getLogger(__name__)

# =============================================================================
# CONFIGURATION
# =============================================================================

@dataclass
class PlaywrightMCPConfig:
    """Configuration for the Playwright MCP server.

    Most fields map onto a command-line flag of ``@playwright/mcp`` (see
    :meth:`to_args`). ``npx_path`` and ``node_path`` are not turned into
    flags by this class.
    """

    browser: str = "chromium"
    headless: bool = True
    viewport_width: int = 1280
    viewport_height: int = 720
    timeout_action_ms: int = 5000
    timeout_navigation_ms: int = 30000
    caps: List[str] = field(default_factory=lambda: ["vision"])
    allowed_origins: Optional[str] = None
    blocked_origins: Optional[str] = None
    output_dir: Optional[str] = None
    npx_path: Optional[str] = None
    node_path: Optional[str] = None
    save_trace: bool = False
    save_video: Optional[str] = None  # e.g. "1280x720"
    console_level: str = "error"
    image_responses: str = "allow"
    isolated: bool = True  # Keep browser profile in memory

    def to_args(self) -> List[str]:
        """Convert config to CLI arguments for @playwright/mcp.

        Returns:
            The flag list in a fixed order: optional ``--browser`` and
            ``--headless``, then the always-present viewport, timeout,
            console-level and image-response flags, then the remaining
            optional flags for every field that is set.
        """
        args: List[str] = []

        # "chromium" is treated as the default here, so the flag is only
        # emitted for any other browser.
        if self.browser != "chromium":
            args.extend(["--browser", self.browser])
        if self.headless:
            args.append("--headless")

        # These flags are always passed, whatever their values.
        args.extend([
            "--viewport-size", f"{self.viewport_width}x{self.viewport_height}",
            "--timeout-action", str(self.timeout_action_ms),
            "--timeout-navigation", str(self.timeout_navigation_ms),
            "--console-level", self.console_level,
            "--image-responses", self.image_responses,
        ])

        # An empty capability list suppresses the flag entirely.
        if self.caps:
            args.extend(["--caps", ",".join(self.caps)])
        if self.allowed_origins:
            args.extend(["--allowed-origins", self.allowed_origins])
        if self.blocked_origins:
            args.extend(["--blocked-origins", self.blocked_origins])
        if self.output_dir:
            args.extend(["--output-dir", self.output_dir])
        if self.save_trace:
            args.append("--save-trace")
        if self.save_video:
            args.extend(["--save-video", self.save_video])
        if self.isolated:
            args.append("--isolated")
        return args

    def to_dict(self) -> Dict[str, Any]:
        """Return every field of this config as a plain dictionary."""
        return asdict(self)

# =============================================================================
# MCP JSON-RPC TRANSPORT
# =============================================================================

class MCPTransportError(Exception):
    """Error communicating with MCP server."""

class MCPToolCallError(Exception):
    """Error returned from an MCP tool call."""

class MCPStdioTransport:
    """
    JSON-RPC over stdio transport for MCP servers.

    Handles the low-level protocol: sending requests, reading responses,
    managing request IDs, and parsing JSON-RPC messages.

    Outgoing messages are written as newline-delimited JSON, which is the
    framing the MCP stdio transport specifies. The reader additionally
    accepts ``Content-Length``-framed messages so that a server using that
    framing is still understood.
    """

    # Seconds to wait for the response to a single request.
    _REQUEST_TIMEOUT_S = 60.0
    # StreamReader buffer limit. One message is one line, and the asyncio
    # default of 64 KiB is too small for base64-encoded image payloads.
    _STREAM_LIMIT_BYTES = 64 * 1024 * 1024

    def __init__(self):
        self._process: Optional[asyncio.subprocess.Process] = None
        self._request_id = 0
        self._pending: Dict[int, asyncio.Future] = {}
        self._reader_task: Optional[asyncio.Task] = None
        self._stderr_task: Optional[asyncio.Task] = None
        self._initialized = False

    async def start(self, command: List[str]) -> None:
        """Start the MCP server subprocess and its background reader tasks.

        Args:
            command: Executable followed by its arguments.
        """
        self._process = await asyncio.create_subprocess_exec(
            *command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            limit=self._STREAM_LIMIT_BYTES,
        )
        self._reader_task = asyncio.create_task(self._read_loop())
        # stderr is a pipe: if nobody reads it the child blocks once the
        # pipe buffer fills up, so it is drained in the background.
        self._stderr_task = asyncio.create_task(self._drain_stderr())
        logger.info(f"MCP server started: PID {self._process.pid}")

    async def stop(self) -> None:
        """Stop the MCP server subprocess and fail any in-flight requests."""
        for task in (self._reader_task, self._stderr_task):
            if task is not None:
                task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await task
        self._reader_task = None
        self._stderr_task = None

        if self._process:
            if self._process.returncode is None:
                # The child may exit between the check and the signal.
                with contextlib.suppress(ProcessLookupError):
                    self._process.terminate()
                try:
                    await asyncio.wait_for(self._process.wait(), timeout=5.0)
                except asyncio.TimeoutError:
                    with contextlib.suppress(ProcessLookupError):
                        self._process.kill()
                    await self._process.wait()
            logger.info("MCP server stopped")

        self._fail_pending("Transport closed")

    async def send_request(self, method: str, params: Optional[Dict] = None) -> Any:
        """Send a JSON-RPC request and wait for the response.

        Args:
            method: JSON-RPC method name.
            params: Optional parameters object; omitted from the message
                when ``None``.

        Returns:
            The ``result`` member of the matching response.

        Raises:
            MCPTransportError: The server is not running, the message could
                not be written, the reader has stopped, or no response
                arrived within the request timeout.
            MCPToolCallError: The server answered with a JSON-RPC error.
        """
        if not self._process or self._process.returncode is not None:
            raise MCPTransportError("MCP server not running")
        if self._reader_task is not None and self._reader_task.done():
            # Nobody is left to deliver the response; fail fast instead of
            # waiting for the timeout.
            raise MCPTransportError("MCP reader stopped")

        self._request_id += 1
        req_id = self._request_id

        message: Dict[str, Any] = {
            "jsonrpc": "2.0",
            "id": req_id,
            "method": method,
        }
        if params is not None:
            message["params"] = params

        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[req_id] = future

        try:
            await self._write_message(message)
            return await asyncio.wait_for(future, timeout=self._REQUEST_TIMEOUT_S)
        except asyncio.TimeoutError:
            raise MCPTransportError(f"Request {method} timed out") from None
        except (BrokenPipeError, ConnectionResetError) as exc:
            raise MCPTransportError(f"Failed to send {method}: {exc}") from exc
        finally:
            # Always drop the bookkeeping entry, whatever the outcome.
            self._pending.pop(req_id, None)

    async def send_notification(self, method: str, params: Optional[Dict] = None) -> None:
        """Send a JSON-RPC notification (no response expected).

        Raises:
            MCPTransportError: The server is not running.
        """
        if not self._process or self._process.returncode is not None:
            raise MCPTransportError("MCP server not running")

        message: Dict[str, Any] = {
            "jsonrpc": "2.0",
            "method": method,
        }
        if params is not None:
            message["params"] = params

        await self._write_message(message)

    async def _write_message(self, message: Dict[str, Any]) -> None:
        """Serialize one message as a single JSON line and flush it."""
        # json.dumps without ``indent`` never emits a raw newline (newlines
        # inside strings are escaped), so one message is exactly one line.
        payload = json.dumps(message).encode("utf-8") + b"\n"
        self._process.stdin.write(payload)
        await self._process.stdin.drain()

    async def _read_loop(self) -> None:
        """Read JSON-RPC messages from the server stdout until EOF."""
        exit_reason = "MCP server closed its output stream"
        try:
            stdout = self._process.stdout
            while True:
                line = await stdout.readline()
                if not line:
                    break

                text = line.decode("utf-8", errors="replace").strip()
                if not text:
                    continue

                if text.lower().startswith("content-length:"):
                    text = await self._read_framed_body(stdout, text)
                    if text is None:
                        continue

                try:
                    message = json.loads(text)
                except json.JSONDecodeError:
                    # One unparseable line must not take the reader down.
                    logger.debug("Ignoring non-JSON output from MCP server: %r", text[:200])
                    continue

                if isinstance(message, dict):
                    self._dispatch(message)

        except asyncio.CancelledError:
            # Normal shutdown path: stop() cancels this task.
            pass
        except Exception as e:
            logger.error(f"MCP reader error: {e}")
            exit_reason = f"MCP reader failed: {e}"
        finally:
            # Whatever ended the loop, nobody will answer in-flight requests
            # any more; release their callers immediately.
            self._fail_pending(exit_reason)

    async def _read_framed_body(self, stdout: Any, header_line: str) -> Optional[str]:
        """Read the body of a Content-Length framed message.

        Returns the decoded body, or ``None`` when the header is malformed.
        """
        try:
            content_length = int(header_line.split(":", 1)[1].strip())
        except ValueError:
            return None

        # Skip any further header lines up to the blank separator line.
        while True:
            separator = await stdout.readline()
            if not separator or not separator.strip():
                break

        body = await stdout.readexactly(content_length)
        return body.decode("utf-8", errors="replace")

    def _dispatch(self, message: Dict[str, Any]) -> None:
        """Route a response message to the future waiting for it."""
        # A message carrying "method" is a request or notification coming
        # from the server, not a response; its id is in the server's own id
        # space and must not resolve one of our futures.
        if "id" not in message or "method" in message:
            return

        future = self._pending.pop(message["id"], None)
        if future is None or future.done():
            return

        error = message.get("error")
        if error is not None:
            if isinstance(error, dict):
                detail = error.get("message", "Unknown error")
            else:
                detail = str(error)
            future.set_exception(MCPToolCallError(detail))
        else:
            future.set_result(message.get("result"))

    def _fail_pending(self, reason: str) -> None:
        """Fail every unresolved request with a transport error."""
        for future in self._pending.values():
            if not future.done():
                future.set_exception(MCPTransportError(reason))
        self._pending.clear()

    async def _drain_stderr(self) -> None:
        """Consume the server's stderr so the child never blocks on it."""
        stderr = self._process.stderr
        if stderr is None:
            return
        try:
            while True:
                chunk = await stderr.read(4096)
                if not chunk:
                    break
                logger.debug(
                    "MCP server stderr: %s",
                    chunk.decode("utf-8", errors="replace").rstrip(),
                )
        except asyncio.CancelledError:
            # Normal shutdown path: stop() cancels this task.
            pass
        except Exception as e:
            logger.debug("MCP stderr reader stopped: %s", e)

    @property
    def is_running(self) -> bool:
        """True while a subprocess exists and has not exited."""
        return (
            self._process is not None
            and self._process.returncode is None
        )

# =============================================================================
# PLAYWRIGHT MCP BRIDGE
# =============================================================================

class PlaywrightMCPBridge: """ Bridge between QAAgentBrowserTools and the @playwright/mcp server.

Manages the MCP server lifecycle and provides typed Python methods
for browser automation operations.
"""

def __init__(
self,
config: Optional[PlaywrightMCPConfig] = None,
storage_path: Optional[Path] = None,
):
self.config = config or PlaywrightMCPConfig()
_user_data = Path.home() / "PROJECTS" / ".coditect-data"
default_path = (
_user_data / "browser-automation"
if _user_data.exists()
else Path.home() / ".coditect" / "browser-automation"
)
self.storage_path = storage_path or default_path
self.storage_path.mkdir(parents=True, exist_ok=True)

self._transport = MCPStdioTransport()
self._tools: Dict[str, Dict[str, Any]] = {}
self._started = False
self._server_info: Dict[str, Any] = {}

async def start(self) -> None:
"""Start the Playwright MCP server and initialize the session."""
npx = self.config.npx_path or shutil.which("npx")
if not npx:
raise MCPTransportError(
"npx not found. Install Node.js to use Playwright MCP bridge."
)

command = [npx, "@playwright/mcp"] + self.config.to_args()
logger.info(f"Starting Playwright MCP: {' '.join(command)}")

await self._transport.start(command)

# MCP initialize handshake
result = await self._transport.send_request("initialize", {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {
"name": "coditect-ralph-wiggum",
"version": "1.0.0",
},
})
self._server_info = result or {}
logger.info(f"MCP initialized: {self._server_info.get('serverInfo', {})}")

# Send initialized notification
await self._transport.send_notification("notifications/initialized")

# Discover available tools
tools_result = await self._transport.send_request("tools/list")
if tools_result and "tools" in tools_result:
for tool in tools_result["tools"]:
self._tools[tool["name"]] = tool
logger.info(f"Discovered {len(self._tools)} Playwright tools")

self._started = True

async def stop(self) -> None:
"""Stop the Playwright MCP server."""
self._started = False
await self._transport.stop()

@property
def is_running(self) -> bool:
return self._started and self._transport.is_running

@property
def available_tools(self) -> List[str]:
return list(self._tools.keys())

async def call_tool(self, name: str, arguments: Optional[Dict] = None) -> Any:
"""Call a Playwright MCP tool by name."""
if not self.is_running:
raise MCPTransportError("Bridge not started")
if name not in self._tools:
raise MCPToolCallError(f"Tool '{name}' not available")

result = await self._transport.send_request("tools/call", {
"name": name,
"arguments": arguments or {},
})
return result

# -------------------------------------------------------------------------
# HIGH-LEVEL BROWSER OPERATIONS
# -------------------------------------------------------------------------

async def navigate(self, url: str) -> Dict[str, Any]:
"""Navigate to a URL."""
return await self.call_tool("browser_navigate", {"url": url})

async def click(self, element: str, ref: Optional[str] = None) -> Dict[str, Any]:
"""Click an element by description or ref."""
args = {"element": element}
if ref:
args["ref"] = ref
return await self.call_tool("browser_click", args)

async def fill(self, element: str, value: str, ref: Optional[str] = None) -> Dict[str, Any]:
"""Fill a form field."""
args = {"element": element, "value": value}
if ref:
args["ref"] = ref
return await self.call_tool("browser_type", args)

async def select(self, element: str, values: List[str], ref: Optional[str] = None) -> Dict[str, Any]:
"""Select option(s) in a select element."""
args = {"element": element, "values": values}
if ref:
args["ref"] = ref
return await self.call_tool("browser_select_option", args)

async def screenshot(self, raw: bool = False) -> Dict[str, Any]:
"""Take a screenshot of the current page."""
return await self.call_tool("browser_screenshot", {"raw": raw})

async def snapshot(self) -> Dict[str, Any]:
"""Take an accessibility snapshot of the current page."""
return await self.call_tool("browser_snapshot", {})

async def wait(self, time_ms: int = 1000) -> Dict[str, Any]:
"""Wait for a specified time."""
return await self.call_tool("browser_wait", {"time": time_ms})

async def go_back(self) -> Dict[str, Any]:
"""Navigate back."""
return await self.call_tool("browser_go_back", {})

async def go_forward(self) -> Dict[str, Any]:
"""Navigate forward."""
return await self.call_tool("browser_go_forward", {})

async def press_key(self, key: str) -> Dict[str, Any]:
"""Press a keyboard key."""
return await self.call_tool("browser_press_key", {"key": key})

async def hover(self, element: str, ref: Optional[str] = None) -> Dict[str, Any]:
"""Hover over an element."""
args = {"element": element}
if ref:
args["ref"] = ref
return await self.call_tool("browser_hover", args)

async def drag(
self,
start_element: str,
end_element: str,
start_ref: Optional[str] = None,
end_ref: Optional[str] = None,
) -> Dict[str, Any]:
"""Drag from one element to another."""
args = {"startElement": start_element, "endElement": end_element}
if start_ref:
args["startRef"] = start_ref
if end_ref:
args["endRef"] = end_ref
return await self.call_tool("browser_drag", args)

async def close_tab(self) -> Dict[str, Any]:
"""Close the current tab."""
return await self.call_tool("browser_tab_close", {})

async def new_tab(self, url: Optional[str] = None) -> Dict[str, Any]:
"""Open a new tab."""
args = {}
if url:
args["url"] = url
return await self.call_tool("browser_tab_new", args)

async def list_tabs(self) -> Dict[str, Any]:
"""List open tabs."""
return await self.call_tool("browser_tab_list", {})

async def select_tab(self, index: int) -> Dict[str, Any]:
"""Select a tab by index."""
return await self.call_tool("browser_tab_select", {"index": index})

async def console_messages(self) -> Dict[str, Any]:
"""Get console messages."""
return await self.call_tool("browser_console_messages", {})

# -------------------------------------------------------------------------
# SCREENSHOT STORAGE
# -------------------------------------------------------------------------

def save_screenshot_data(
self,
base64_data: str,
name: str,
task_id: Optional[str] = None,
) -> Path:
"""Save base64 screenshot data to disk."""
import base64 as b64

subdir = self.storage_path / "screenshots"
if task_id:
subdir = subdir / task_id
subdir.mkdir(parents=True, exist_ok=True)

timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
filename = f"{name}_{timestamp}.png"
filepath = subdir / filename

raw_data = b64.b64decode(base64_data)
filepath.write_bytes(raw_data)
logger.info(f"Screenshot saved: {filepath}")
return filepath

# -------------------------------------------------------------------------
# STATUS
# -------------------------------------------------------------------------

def get_status(self) -> Dict[str, Any]:
"""Get bridge status."""
return {
"started": self._started,
"running": self.is_running,
"server_info": self._server_info,
"tools_count": len(self._tools),
"tools": list(self._tools.keys()),
"config": self.config.to_dict(),
"storage_path": str(self.storage_path),
}

def get_mcp_config_template(self) -> Dict[str, Any]:
"""
Generate an MCP server configuration template for Claude Code settings.

Returns a dict suitable for inclusion in ~/.claude/settings.json
or .claude/settings.local.json under mcpServers.
"""
npx = self.config.npx_path or shutil.which("npx") or "npx"
return {
"playwright": {
"command": npx,
"args": ["@playwright/mcp"] + self.config.to_args(),
}
}

# =============================================================================
# EXPORTS
# =============================================================================

# Public API of this module, honoured by ``from <module> import *``.
__all__ = [
    "PlaywrightMCPConfig",
    "MCPStdioTransport",
    "MCPTransportError",
    "MCPToolCallError",
    "PlaywrightMCPBridge",
]