From b167a0a9f4d604d638aacaa38f8b1a456ca21f2c Mon Sep 17 00:00:00 2001 From: Deeman Date: Sat, 21 Feb 2026 15:44:02 +0100 Subject: [PATCH] Add scout MCP server for browser recon + msgspec workspace dep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - tools/scout/: browser automation MCP server using Pydoll (CDP, no WebDriver) - scout_visit, scout_elements (text-first), scout_click, scout_fill, scout_select - scout_scroll, scout_text, scout_screenshot (opt-in) - scout_har_start / scout_har_stop (asyncio task holds recording context open) - scout_analyze: HAR parsing with HarEntry/HarSummary msgspec structs - Standalone project (not workspace member — websockets conflict with prefect) - Runs via: uv run --directory tools/scout scout-server - .mcp.json: registers scout as Claude Code MCP server (project scope) - msgspec>=0.19 added to root project deps (workspace-wide struct/validation) - coding_philosophy.md: document msgspec as approved dep, usage rules Co-Authored-By: Claude Sonnet 4.6 --- .mcp.json | 9 + coding_philosophy.md | 7 + pyproject.toml | 2 +- tools/scout/pyproject.toml | 20 + tools/scout/src/scout/__init__.py | 0 tools/scout/src/scout/analyze.py | 190 +++++ tools/scout/src/scout/browser.py | 396 +++++++++++ tools/scout/src/scout/server.py | 170 +++++ tools/scout/uv.lock | 1107 +++++++++++++++++++++++++++++ uv.lock | 34 + 10 files changed, 1934 insertions(+), 1 deletion(-) create mode 100644 .mcp.json create mode 100644 tools/scout/pyproject.toml create mode 100644 tools/scout/src/scout/__init__.py create mode 100644 tools/scout/src/scout/analyze.py create mode 100644 tools/scout/src/scout/browser.py create mode 100644 tools/scout/src/scout/server.py create mode 100644 tools/scout/uv.lock diff --git a/.mcp.json b/.mcp.json new file mode 100644 index 0000000..8d35486 --- /dev/null +++ b/.mcp.json @@ -0,0 +1,9 @@ +{ + "mcpServers": { + "scout": { + "type": "stdio", + "command": "uv", + "args": ["run", "--directory", "tools/scout", "scout-server"] + } + } +} diff --git a/coding_philosophy.md b/coding_philosophy.md index 4d54e67..a8b83c8 100644 --- a/coding_philosophy.md +++ b/coding_philosophy.md @@ -201,6 +201,13 @@ active_users = [u for u in users if u.is_active()] - Small, focused libraries - Direct solutions - Understanding what code does + +**Approved dependencies (earn their place):** +- `msgspec` — struct types and validation at system boundaries (external APIs, user input, + inter-process data). Use `msgspec.Struct` instead of dataclasses when you need: fast + encode/decode, built-in validation, or typed containers for boundary data. + **Rule:** use Structs at boundaries (API responses, HAR entries, MCP tool I/O) — + keep internal plumbing as plain dicts/tuples. diff --git a/pyproject.toml b/pyproject.toml index e09e923..46561c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "niquests>=3.15.2", "hcloud>=2.8.0", "prefect>=3.6.15", + "msgspec>=0.19", ] [project.scripts] @@ -43,7 +44,6 @@ sqlmesh_materia = {workspace = true } cftc_cot = {workspace = true } coffee_prices = {workspace = true } ice_stocks = {workspace = true } - [tool.uv.workspace] members = [ "extract/*", diff --git a/tools/scout/pyproject.toml b/tools/scout/pyproject.toml new file mode 100644 index 0000000..37ce2c3 --- /dev/null +++ b/tools/scout/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "scout" +version = "0.1.0" +description = "Browser recon MCP server — discover API endpoints via HAR recording" +requires-python = ">=3.13" +dependencies = [ + "pydoll-python>=1.5", + "mcp[cli]>=1.0", + "msgspec>=0.19", +] + +[project.scripts] +scout-server = "scout.server:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/scout"] diff --git a/tools/scout/src/scout/__init__.py b/tools/scout/src/scout/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/scout/src/scout/analyze.py b/tools/scout/src/scout/analyze.py new file mode 100644 index 0000000..0221553 --- /dev/null +++ b/tools/scout/src/scout/analyze.py @@ -0,0 +1,190 @@ +"""HAR file analysis — filter static assets, surface API endpoints and downloads. + +Parses HAR 1.2 JSON files produced by Pydoll's network recorder. Filters out +static assets (JS, CSS, images, fonts) and returns a structured summary of: + - API calls (JSON responses, any POST request) + - Data downloads (CSV, PDF, Excel) + +Typical call: + summary = analyze_har_file("data/scout/recording.har") + print(format_summary(summary)) +""" + +import json +import pathlib + +import msgspec + +STATIC_EXTENSIONS = frozenset( + {".js", ".css", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", + ".woff", ".woff2", ".ttf", ".eot", ".map", ".webp", ".avif", ".apng"} +) + +STATIC_CONTENT_TYPES = frozenset( + {"text/html", "text/javascript", "application/javascript", + "text/css", "image/", "font/", "audio/", "video/"} +) + +DOWNLOAD_CONTENT_TYPES = ( + "text/csv", + "application/pdf", + "application/vnd.ms-excel", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/octet-stream", + "text/plain", +) + +POST_BODY_MAX_CHARS = 500 + + +class HarEntry(msgspec.Struct): + """A single interesting HTTP request/response from a HAR file.""" + + method: str + url: str + status: int + content_type: str + size_bytes: int + post_body: str = "" + + +class HarSummary(msgspec.Struct): + """Analysis result: static assets filtered out, interesting entries categorized.""" + + api_calls: list[HarEntry] + downloads: list[HarEntry] + other_interesting: list[HarEntry] + total_entries: int + filtered_static: int + + +def _is_static(url: str, content_type: str) -> bool: + """Return True if this entry looks like a static asset.""" + path = url.split("?")[0].lower() + ext = pathlib.PurePosixPath(path).suffix + if ext in STATIC_EXTENSIONS: + return True + ct = content_type.lower().split(";")[0].strip() + return any(ct.startswith(s) for s in STATIC_CONTENT_TYPES) + + +def _extract_entry(raw: dict) -> HarEntry | None: + """Parse a raw HAR entry dict into a typed HarEntry. Returns None for static assets.""" + request = raw.get("request", {}) + response = raw.get("response", {}) + + url = request.get("url", "") + method = request.get("method", "").upper() + status = response.get("status", 0) + + content = response.get("content", {}) + content_type = content.get("mimeType", "").lower().split(";")[0].strip() + size_bytes = max(content.get("size", 0), 0) + + if _is_static(url, content_type): + return None + + # Extract POST body from postData + post_body = "" + post_data = request.get("postData", {}) + if post_data: + text = post_data.get("text", "") + params = post_data.get("params", []) + if text: + post_body = text[:POST_BODY_MAX_CHARS] + elif params: + post_body = "&".join( + f"{p['name']}={p.get('value', '')}" for p in params + )[:POST_BODY_MAX_CHARS] + + return HarEntry( + method=method, + url=url, + status=status, + content_type=content_type, + size_bytes=size_bytes, + post_body=post_body, + ) + + +def analyze_har_file(har_path: str) -> HarSummary: + """Parse HAR JSON, filter static assets, categorize interesting entries.""" + data = json.loads(pathlib.Path(har_path).read_bytes()) + raw_entries = data.get("log", {}).get("entries", []) + + assert raw_entries, f"No entries found in HAR file: {har_path}" + + total = len(raw_entries) + filtered_static = 0 + api_calls: list[HarEntry] = [] + downloads: list[HarEntry] = [] + other_interesting: list[HarEntry] = [] + + for raw in raw_entries: + entry = _extract_entry(raw) + if entry is None: + filtered_static += 1 + continue + + ct = entry.content_type + is_download = any(ct.startswith(t) for t in DOWNLOAD_CONTENT_TYPES) + is_api = ct == "application/json" or ct == "application/xml" or entry.method == "POST" + + if is_download: + downloads.append(entry) + elif is_api: + api_calls.append(entry) + else: + other_interesting.append(entry) + + return HarSummary( + api_calls=api_calls, + downloads=downloads, + other_interesting=other_interesting, + total_entries=total, + filtered_static=filtered_static, + ) + + +def format_summary(summary: HarSummary) -> str: + """Format HarSummary as human-readable text for MCP tool response.""" + parts = [ + f"HAR Analysis: {summary.total_entries} total entries, " + f"{summary.filtered_static} static assets filtered\n" + f"Found: {len(summary.api_calls)} API calls, " + f"{len(summary.downloads)} downloads, " + f"{len(summary.other_interesting)} other\n", + ] + + if summary.api_calls: + parts.append("API Calls:") + for e in summary.api_calls: + parts.append( + f" {e.method:<6} {e.url}" + f" [{e.status}, {e.content_type}, {e.size_bytes:,}B]" + ) + if e.post_body: + parts.append(f" Body: {e.post_body}") + parts.append("") + + if summary.downloads: + parts.append("Downloads:") + for e in summary.downloads: + parts.append( + f" {e.method:<6} {e.url}" + f" [{e.status}, {e.content_type}, {e.size_bytes:,}B]" + ) + parts.append("") + + if summary.other_interesting: + parts.append("Other (non-static, non-JSON, non-download):") + for e in summary.other_interesting[:10]: # cap output + parts.append(f" {e.method:<6} {e.url} [{e.status}, {e.content_type}]") + if len(summary.other_interesting) > 10: + parts.append(f" ... and {len(summary.other_interesting) - 10} more") + parts.append("") + + if not summary.api_calls and not summary.downloads: + parts.append("No API calls or downloads found after filtering static assets.") + + return "\n".join(parts) diff --git a/tools/scout/src/scout/browser.py b/tools/scout/src/scout/browser.py new file mode 100644 index 0000000..55cb404 --- /dev/null +++ b/tools/scout/src/scout/browser.py @@ -0,0 +1,396 @@ +"""Pydoll browser session management for the scout MCP server. + +Manages a single long-lived Chrome instance across multiple MCP tool calls. +The browser starts on the first scout_visit and stays alive until scout_close. + +State is module-level (lives for the duration of the MCP server process). +HAR recording is managed via an asyncio.Task that holds the Pydoll context +manager open between scout_har_start and scout_har_stop calls. + +Bot evasion: +- CDP-based (no chromedriver, navigator.webdriver stays false) +- Humanized mouse movement (Bezier curves) on all clicks +- Headed browser by default (no headless detection vectors) +""" + +import asyncio +import logging +import pathlib +from datetime import datetime + +import msgspec +from pydoll.browser.chromium import Chrome + +logger = logging.getLogger("scout.browser") + +# Module-level browser state — lives for the MCP server process lifetime. +# Using a plain dict so all fields are in one place and easy to reset. +_state: dict = { + "browser": None, # Chrome instance + "tab": None, # Active tab + "har_task": None, # asyncio.Task holding the recording context manager + "har_stop_event": None, # asyncio.Event signalled to stop recording + "har_result": None, # asyncio.Future resolving to HAR file path +} + +OUTPUT_DIR = pathlib.Path("data/scout") +CLICK_TIMEOUT_SECONDS = 10 +NAVIGATION_WAIT_SECONDS = 2 +ELEMENT_CAP = 60 # max elements per category to avoid huge responses + + +class PageElement(msgspec.Struct): + """An interactive element found on the current page.""" + + kind: str # "link", "button", "form", "select", "input" + text: str # visible text or label (truncated) + selector: str # usable CSS selector or description + href: str = "" # for links + action: str = "" # for forms (action URL) + method: str = "" # for forms (GET/POST) + options: list[str] = [] # for selects (option texts) + + +class PageInfo(msgspec.Struct): + """Result of a page visit or navigation action.""" + + title: str + url: str + element_count: int + + +async def _ensure_browser() -> None: + """Launch Chrome if not already running. Idempotent.""" + if _state["tab"] is not None: + return + browser = Chrome() + tab = await browser.start() + _state["browser"] = browser + _state["tab"] = tab + OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + logger.info("Chrome launched") + + +async def visit(url: str) -> PageInfo: + """Navigate to url. Opens browser on first call.""" + await _ensure_browser() + tab = _state["tab"] + + await tab.go_to(url) + await asyncio.sleep(1) # let dynamic content settle + + title = await tab.title + links = await tab.query("a", find_all=True) + element_count = len(links) if links else 0 + + return PageInfo(title=title, url=url, element_count=element_count) + + +async def get_elements(filter_type: str = "") -> list[PageElement]: + """Enumerate interactive elements on the current page. + + filter_type: "", "links", "buttons", "forms", "selects", "inputs" + Returns typed PageElement structs (not screenshots). + """ + assert _state["tab"] is not None, "No browser open — call scout_visit first" + tab = _state["tab"] + elements: list[PageElement] = [] + + # Links + if not filter_type or filter_type == "links": + nodes = await tab.query("a[href]", find_all=True) or [] + for node in nodes[:ELEMENT_CAP]: + try: + text = (await node.text or "").strip()[:100] + href = (await node.get_attribute("href") or "").strip() + if text or href: + elements.append(PageElement( + kind="link", + text=text, + selector=f'a[href="{href}"]' if href else "a", + href=href, + )) + except Exception: + continue + + # Buttons + if not filter_type or filter_type == "buttons": + nodes = await tab.query( + "button, input[type=submit], input[type=button]", find_all=True + ) or [] + for node in nodes[:20]: + try: + text = (await node.text or "").strip() + if not text: + text = await node.get_attribute("value") or "" + text = text[:100] + cls = (await node.get_attribute("class") or "").strip() + sel = f"button.{cls.split()[0]}" if cls else "button" + elements.append(PageElement(kind="button", text=text, selector=sel)) + except Exception: + continue + + # Selects + if not filter_type or filter_type == "selects": + nodes = await tab.query("select", find_all=True) or [] + for node in nodes[:10]: + try: + name = ( + await node.get_attribute("name") + or await node.get_attribute("id") + or "" + ).strip() + option_nodes = await node.query("option", find_all=True) or [] + opts = [] + for opt in option_nodes[:15]: + opt_text = (await opt.text or "").strip() + if opt_text: + opts.append(opt_text) + sel = f"select[name='{name}']" if name else "select" + elements.append(PageElement( + kind="select", text=name, selector=sel, options=opts + )) + except Exception: + continue + + # Forms + if not filter_type or filter_type == "forms": + nodes = await tab.query("form", find_all=True) or [] + for node in nodes[:10]: + try: + action = (await node.get_attribute("action") or "").strip() + method = (await node.get_attribute("method") or "GET").upper() + elements.append(PageElement( + kind="form", + text=f"{method} {action}", + selector="form", + action=action, + method=method, + )) + except Exception: + continue + + # Inputs + if filter_type == "inputs": + nodes = await tab.query( + "input:not([type=hidden]):not([type=submit]):not([type=button])", + find_all=True, + ) or [] + for node in nodes[:20]: + try: + name = (await node.get_attribute("name") or "").strip() + input_type = (await node.get_attribute("type") or "text").strip() + placeholder = (await node.get_attribute("placeholder") or "").strip() + label = name or placeholder or input_type + sel = f"input[name='{name}']" if name else f"input[type='{input_type}']" + elements.append(PageElement(kind="input", text=label, selector=sel)) + except Exception: + continue + + return elements + + +def format_elements(elements: list[PageElement]) -> str: + """Format a list of PageElement structs as human-readable text.""" + if not elements: + return "No interactive elements found." + + # Group by kind + groups: dict[str, list[PageElement]] = {} + for e in elements: + groups.setdefault(e.kind, []).append(e) + + lines: list[str] = [f"Elements ({len(elements)} total):"] + kind_labels = { + "link": "Links", "button": "Buttons", + "form": "Forms", "select": "Selects", "input": "Inputs", + } + + for kind in ["link", "button", "select", "form", "input"]: + group = groups.get(kind, []) + if not group: + continue + lines.append(f"\n{kind_labels.get(kind, kind.capitalize())} ({len(group)}):") + for i, e in enumerate(group): + if kind == "link": + lines.append(f" [{i}] {e.text!r:<40} → {e.href}") + elif kind == "select": + opts = ", ".join(e.options[:5]) + if len(e.options) > 5: + opts += f", ... (+{len(e.options) - 5} more)" + lines.append(f" [{i}] {e.text!r} selector: {e.selector}") + lines.append(f" options: {opts}") + elif kind == "form": + lines.append(f" [{i}] {e.text} selector: {e.selector}") + else: + lines.append(f" [{i}] {e.text!r:<40} selector: {e.selector}") + + return "\n".join(lines) + + +async def click(selector: str) -> PageInfo: + """Click an element. Use 'text=Foo' to click by visible text, else CSS selector.""" + assert _state["tab"] is not None, "No browser open — call scout_visit first" + tab = _state["tab"] + + if selector.startswith("text="): + element = await tab.find(text=selector[5:], timeout=CLICK_TIMEOUT_SECONDS) + else: + element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS) + + assert element is not None, f"Element not found: {selector!r}" + await element.click() + await asyncio.sleep(NAVIGATION_WAIT_SECONDS) + + title = await tab.title + url = await tab.current_url if hasattr(tab, "current_url") else "" + links = await tab.query("a", find_all=True) or [] + + return PageInfo(title=title, url=url or "", element_count=len(links)) + + +async def fill(selector: str, value: str) -> str: + """Type a value into a form field.""" + assert _state["tab"] is not None, "No browser open — call scout_visit first" + tab = _state["tab"] + + if selector.startswith("text="): + element = await tab.find(text=selector[5:], timeout=CLICK_TIMEOUT_SECONDS) + else: + element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS) + + assert element is not None, f"Element not found: {selector!r}" + # insert_text is instant (no keystroke simulation) + await element.insert_text(value) + return f"Filled {selector!r} with {value!r}" + + +async def select_option(selector: str, value: str) -> str: + """Select an option in a dropdown. + + Args: + selector: CSS selector for the