scout: extract to standalone repo at Projects/scout

Move scout MCP server out of tools/scout/ into its own repo at
/var/home/Deeman/Projects/scout. Update .mcp.json to use absolute path
so any project can reference it.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-21 17:58:03 +01:00
parent 079c189e0a
commit ff39d65dc6
7 changed files with 1 additions and 2075 deletions

View File

@@ -3,7 +3,7 @@
"scout": {
"type": "stdio",
"command": "uv",
"args": ["run", "--directory", "tools/scout", "scout-server"]
"args": ["run", "--directory", "/var/home/Deeman/Projects/scout", "scout-server"]
}
}
}

View File

@@ -1,20 +0,0 @@
[project]
name = "scout"
version = "0.1.0"
description = "Browser recon MCP server — discover API endpoints via HAR recording"
requires-python = ">=3.13"
dependencies = [
"pydoll-python>=1.5",
"mcp[cli]>=1.0",
"msgspec>=0.19",
]
[project.scripts]
scout-server = "scout.server:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/scout"]

View File

@@ -1,190 +0,0 @@
"""HAR file analysis — filter static assets, surface API endpoints and downloads.
Parses HAR 1.2 JSON files produced by Pydoll's network recorder. Filters out
static assets (JS, CSS, images, fonts) and returns a structured summary of:
- API calls (JSON responses, any POST request)
- Data downloads (CSV, PDF, Excel)
Typical call:
summary = analyze_har_file("data/scout/recording.har")
print(format_summary(summary))
"""
import json
import pathlib
import msgspec
STATIC_EXTENSIONS = frozenset(
{".js", ".css", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico",
".woff", ".woff2", ".ttf", ".eot", ".map", ".webp", ".avif", ".apng"}
)
STATIC_CONTENT_TYPES = frozenset(
{"text/html", "text/javascript", "application/javascript",
"text/css", "image/", "font/", "audio/", "video/"}
)
DOWNLOAD_CONTENT_TYPES = (
"text/csv",
"application/pdf",
"application/vnd.ms-excel",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/octet-stream",
"text/plain",
)
POST_BODY_MAX_CHARS = 500
class HarEntry(msgspec.Struct):
"""A single interesting HTTP request/response from a HAR file."""
method: str
url: str
status: int
content_type: str
size_bytes: int
post_body: str = ""
class HarSummary(msgspec.Struct):
"""Analysis result: static assets filtered out, interesting entries categorized."""
api_calls: list[HarEntry]
downloads: list[HarEntry]
other_interesting: list[HarEntry]
total_entries: int
filtered_static: int
def _is_static(url: str, content_type: str) -> bool:
"""Return True if this entry looks like a static asset."""
path = url.split("?")[0].lower()
ext = pathlib.PurePosixPath(path).suffix
if ext in STATIC_EXTENSIONS:
return True
ct = content_type.lower().split(";")[0].strip()
return any(ct.startswith(s) for s in STATIC_CONTENT_TYPES)
def _extract_entry(raw: dict) -> HarEntry | None:
"""Parse a raw HAR entry dict into a typed HarEntry. Returns None for static assets."""
request = raw.get("request", {})
response = raw.get("response", {})
url = request.get("url", "")
method = request.get("method", "").upper()
status = response.get("status", 0)
content = response.get("content", {})
content_type = content.get("mimeType", "").lower().split(";")[0].strip()
size_bytes = max(content.get("size", 0), 0)
if _is_static(url, content_type):
return None
# Extract POST body from postData
post_body = ""
post_data = request.get("postData", {})
if post_data:
text = post_data.get("text", "")
params = post_data.get("params", [])
if text:
post_body = text[:POST_BODY_MAX_CHARS]
elif params:
post_body = "&".join(
f"{p['name']}={p.get('value', '')}" for p in params
)[:POST_BODY_MAX_CHARS]
return HarEntry(
method=method,
url=url,
status=status,
content_type=content_type,
size_bytes=size_bytes,
post_body=post_body,
)
def analyze_har_file(har_path: str) -> HarSummary:
"""Parse HAR JSON, filter static assets, categorize interesting entries."""
data = json.loads(pathlib.Path(har_path).read_bytes())
raw_entries = data.get("log", {}).get("entries", [])
assert raw_entries, f"No entries found in HAR file: {har_path}"
total = len(raw_entries)
filtered_static = 0
api_calls: list[HarEntry] = []
downloads: list[HarEntry] = []
other_interesting: list[HarEntry] = []
for raw in raw_entries:
entry = _extract_entry(raw)
if entry is None:
filtered_static += 1
continue
ct = entry.content_type
is_download = any(ct.startswith(t) for t in DOWNLOAD_CONTENT_TYPES)
is_api = ct == "application/json" or ct == "application/xml" or entry.method == "POST"
if is_download:
downloads.append(entry)
elif is_api:
api_calls.append(entry)
else:
other_interesting.append(entry)
return HarSummary(
api_calls=api_calls,
downloads=downloads,
other_interesting=other_interesting,
total_entries=total,
filtered_static=filtered_static,
)
def format_summary(summary: HarSummary) -> str:
"""Format HarSummary as human-readable text for MCP tool response."""
parts = [
f"HAR Analysis: {summary.total_entries} total entries, "
f"{summary.filtered_static} static assets filtered\n"
f"Found: {len(summary.api_calls)} API calls, "
f"{len(summary.downloads)} downloads, "
f"{len(summary.other_interesting)} other\n",
]
if summary.api_calls:
parts.append("API Calls:")
for e in summary.api_calls:
parts.append(
f" {e.method:<6} {e.url}"
f" [{e.status}, {e.content_type}, {e.size_bytes:,}B]"
)
if e.post_body:
parts.append(f" Body: {e.post_body}")
parts.append("")
if summary.downloads:
parts.append("Downloads:")
for e in summary.downloads:
parts.append(
f" {e.method:<6} {e.url}"
f" [{e.status}, {e.content_type}, {e.size_bytes:,}B]"
)
parts.append("")
if summary.other_interesting:
parts.append("Other (non-static, non-JSON, non-download):")
for e in summary.other_interesting[:10]: # cap output
parts.append(f" {e.method:<6} {e.url} [{e.status}, {e.content_type}]")
if len(summary.other_interesting) > 10:
parts.append(f" ... and {len(summary.other_interesting) - 10} more")
parts.append("")
if not summary.api_calls and not summary.downloads:
parts.append("No API calls or downloads found after filtering static assets.")
return "\n".join(parts)

View File

@@ -1,550 +0,0 @@
"""Pydoll browser session management for the scout MCP server.
Manages a single long-lived Chrome instance across multiple MCP tool calls.
The browser starts on the first scout_visit and stays alive until scout_close.
State is module-level (lives for the duration of the MCP server process).
HAR recording is managed via an asyncio.Task that holds the Pydoll context
manager open between scout_har_start and scout_har_stop calls.
Bot evasion:
- CDP-based (no chromedriver, navigator.webdriver stays false)
- Humanized mouse movement (Bezier curves) on all clicks
- Headed browser by default (no headless detection vectors)
"""
import asyncio
import logging
import pathlib
from datetime import datetime
import msgspec
from pydoll.browser.chromium import Chrome
from pydoll.browser.options import ChromiumOptions
# Chrome binary search order — covers native installs and Flatpak
_CHROME_PATHS = [
"/usr/bin/google-chrome",
"/usr/bin/google-chrome-stable",
"/usr/bin/chromium",
"/usr/bin/chromium-browser",
# Flatpak (system install)
"/var/lib/flatpak/app/com.google.Chrome/current/active/export/bin/com.google.Chrome",
# Flatpak (user install)
str(pathlib.Path.home() / ".local/share/flatpak/app/com.google.Chrome/current/active/export/bin/com.google.Chrome"),
]
def _find_chrome() -> str | None:
for p in _CHROME_PATHS:
if pathlib.Path(p).exists():
return p
return None
logger = logging.getLogger("scout.browser")
# Module-level browser state — lives for the MCP server process lifetime.
# Using a plain dict so all fields are in one place and easy to reset.
_state: dict = {
"browser": None, # Chrome instance
"tab": None, # Active tab
"har_task": None, # asyncio.Task holding the recording context manager
"har_stop_event": None, # asyncio.Event signalled to stop recording
"har_result": None, # asyncio.Future resolving to HAR file path
}
OUTPUT_DIR = pathlib.Path("data/scout")
CLICK_TIMEOUT_SECONDS = 10
NAVIGATION_WAIT_SECONDS = 2
ELEMENT_CAP = 60 # max elements per category to avoid huge responses
class PageElement(msgspec.Struct):
"""An interactive element found on the current page."""
kind: str # "link", "button", "form", "select", "input"
text: str # visible text or label (truncated)
selector: str # usable CSS selector or description
href: str = "" # for links
action: str = "" # for forms (action URL)
method: str = "" # for forms (GET/POST)
options: list[str] = [] # for selects (option texts)
class PageInfo(msgspec.Struct):
"""Result of a page visit or navigation action."""
title: str
url: str
element_count: int
async def _ensure_browser() -> None:
"""Launch Chrome if not already running. Idempotent."""
if _state["tab"] is not None:
return
chrome_path = _find_chrome()
assert chrome_path is not None, (
"No Chrome/Chromium binary found. Install via: "
"sudo dnf install chromium OR flatpak install com.google.Chrome"
)
logger.info("Using Chrome at: %s", chrome_path)
options = ChromiumOptions()
options.binary_location = chrome_path
browser = Chrome(options=options)
tab = await browser.start()
_state["browser"] = browser
_state["tab"] = tab
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
logger.info("Chrome launched")
# Cookie consent selectors — ordered by specificity (vendor-specific first, generic last)
_COOKIE_SELECTORS = [
# OneTrust (very common on financial/data sites incl. ICE)
"#onetrust-accept-btn-handler",
".onetrust-accept-btn-handler",
# Cookiebot
"#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
# CookieYes / CookieLaw
".cky-btn-accept",
".cookie-law-accept",
# Termly
"#termly-code-snippet-support",
# Consentmanager (common in Germany)
"#cmpwrapper .cmpboxbtnyes",
"#cmpbox .cmptxt_btn_yes",
# Generic accept buttons (text-based fallback)
"button[id*='accept']",
"button[class*='accept']",
"button[id*='cookie']",
"button[class*='cookie']",
"button[id*='consent']",
"button[class*='consent']",
]
# Text patterns for cookie accept buttons — English + German (DSGVO)
_COOKIE_ACCEPT_TEXTS = [
# English
"Accept All", "Accept all", "Accept all cookies",
"Accept Cookies", "Accept cookies",
"I Accept", "I accept", "Accept",
"Allow All", "Allow all",
"Agree", "I Agree", "OK", "Got it",
"Continue", "Dismiss",
# German (DSGVO)
"Alle akzeptieren", "Akzeptieren", "Zustimmen",
"Alle zustimmen", "Einverstanden",
"Alle Cookies akzeptieren", "Cookies akzeptieren",
"Akzeptieren und weiter", "Weiter",
"Ich stimme zu", "OK, verstanden",
]
# Usercentrics shadow DOM JS — common in German publishers (Bild, Spiegel, etc.)
_USERCENTRICS_JS = (
"var host = document.querySelector('#usercentrics-root');"
"if (host && host.shadowRoot) {"
" var btn = host.shadowRoot.querySelector('[data-testid=\"uc-accept-all-button\"]');"
" if (btn) { btn.click(); true; } else { false; }"
"} else { false; }"
)
async def _click_via_js(tab, selector: str) -> bool:
"""Click an element via JS injection — bypasses pointer-events/z-index issues."""
try:
# Escape selector for JS string
escaped = selector.replace("'", "\\'")
result = await tab.execute_script(
f"var el = document.querySelector('{escaped}'); "
f"if (el) {{ el.click(); true; }} else {{ false; }}"
)
return bool(result)
except Exception:
return False
async def _dismiss_cookie_banner(tab) -> bool:
"""Try to find and click a cookie consent accept button. Returns True if dismissed.
Uses JS injection (click_js / execute_script) as primary method since cookie
banners often have z-index/pointer-events issues that block Pydoll's mouse simulation.
Covers: OneTrust, Cookiebot, CookieYes, Consentmanager, Usercentrics (shadow DOM),
and generic text patterns in English + German (DSGVO).
"""
# Usercentrics (shadow DOM) — common in German publishers, requires special handling
try:
result = await tab.execute_script(_USERCENTRICS_JS)
if result:
await asyncio.sleep(0.8)
logger.info("Cookie banner dismissed via Usercentrics shadow DOM")
return True
except Exception:
pass
# Try CSS selectors via JS click (bypasses visibility/pointer-events issues)
for selector in _COOKIE_SELECTORS:
try:
# Check if element exists in DOM first
exists = await tab.execute_script(
f"!!document.querySelector('{selector.replace(chr(39), chr(92)+chr(39))}')"
)
if exists:
clicked = await _click_via_js(tab, selector)
if clicked:
await asyncio.sleep(0.8)
logger.info("Cookie banner dismissed via JS selector: %s", selector)
return True
except Exception:
continue
# Fallback: find buttons by text content via JS
for text in _COOKIE_ACCEPT_TEXTS:
try:
escaped_text = text.replace("'", "\\'")
result = await tab.execute_script(
f"var buttons = Array.from(document.querySelectorAll('button, a[role=button]'));"
f"var btn = buttons.find(b => b.textContent.trim().includes('{escaped_text}'));"
f"if (btn) {{ btn.click(); true; }} else {{ false; }}"
)
if result:
await asyncio.sleep(0.8)
logger.info("Cookie banner dismissed via JS text: %r", text)
return True
except Exception:
continue
# Note: cross-origin full-page iframe banners (Sourcepoint on German publishers)
# are not dismissed here — coordinate clicks are too brittle across screen sizes.
# HAR recording captures network traffic regardless of banner visibility,
# so dismissal is only needed when we must click navigation elements.
# For those cases, use scout_click_coords manually.
return False
async def visit(url: str) -> PageInfo:
"""Navigate to url. Opens browser on first call."""
await _ensure_browser()
tab = _state["tab"]
await tab.go_to(url)
await asyncio.sleep(1) # let dynamic content settle
# Auto-dismiss cookie consent banners before anything else
await _dismiss_cookie_banner(tab)
title = await tab.title
links = await tab.query("a", find_all=True)
element_count = len(links) if links else 0
return PageInfo(title=title, url=url, element_count=element_count)
async def get_elements(filter_type: str = "") -> list[PageElement]:
"""Enumerate interactive elements on the current page.
filter_type: "", "links", "buttons", "forms", "selects", "inputs"
Returns typed PageElement structs (not screenshots).
"""
assert _state["tab"] is not None, "No browser open — call scout_visit first"
tab = _state["tab"]
elements: list[PageElement] = []
# Links
if not filter_type or filter_type == "links":
nodes = await tab.query("a[href]", find_all=True) or []
for node in nodes[:ELEMENT_CAP]:
try:
text = (await node.text or "").strip()[:100]
href = (await node.get_attribute("href") or "").strip()
if text or href:
elements.append(PageElement(
kind="link",
text=text,
selector=f'a[href="{href}"]' if href else "a",
href=href,
))
except Exception:
continue
# Buttons
if not filter_type or filter_type == "buttons":
nodes = await tab.query(
"button, input[type=submit], input[type=button]", find_all=True
) or []
for node in nodes[:20]:
try:
text = (await node.text or "").strip()
if not text:
text = await node.get_attribute("value") or ""
text = text[:100]
cls = (await node.get_attribute("class") or "").strip()
sel = f"button.{cls.split()[0]}" if cls else "button"
elements.append(PageElement(kind="button", text=text, selector=sel))
except Exception:
continue
# Selects
if not filter_type or filter_type == "selects":
nodes = await tab.query("select", find_all=True) or []
for node in nodes[:10]:
try:
name = (
await node.get_attribute("name")
or await node.get_attribute("id")
or ""
).strip()
option_nodes = await node.query("option", find_all=True) or []
opts = []
for opt in option_nodes[:15]:
opt_text = (await opt.text or "").strip()
if opt_text:
opts.append(opt_text)
sel = f"select[name='{name}']" if name else "select"
elements.append(PageElement(
kind="select", text=name, selector=sel, options=opts
))
except Exception:
continue
# Forms
if not filter_type or filter_type == "forms":
nodes = await tab.query("form", find_all=True) or []
for node in nodes[:10]:
try:
action = (await node.get_attribute("action") or "").strip()
method = (await node.get_attribute("method") or "GET").upper()
elements.append(PageElement(
kind="form",
text=f"{method} {action}",
selector="form",
action=action,
method=method,
))
except Exception:
continue
# Inputs
if filter_type == "inputs":
nodes = await tab.query(
"input:not([type=hidden]):not([type=submit]):not([type=button])",
find_all=True,
) or []
for node in nodes[:20]:
try:
name = (await node.get_attribute("name") or "").strip()
input_type = (await node.get_attribute("type") or "text").strip()
placeholder = (await node.get_attribute("placeholder") or "").strip()
label = name or placeholder or input_type
sel = f"input[name='{name}']" if name else f"input[type='{input_type}']"
elements.append(PageElement(kind="input", text=label, selector=sel))
except Exception:
continue
return elements
def format_elements(elements: list[PageElement]) -> str:
"""Format a list of PageElement structs as human-readable text."""
if not elements:
return "No interactive elements found."
# Group by kind
groups: dict[str, list[PageElement]] = {}
for e in elements:
groups.setdefault(e.kind, []).append(e)
lines: list[str] = [f"Elements ({len(elements)} total):"]
kind_labels = {
"link": "Links", "button": "Buttons",
"form": "Forms", "select": "Selects", "input": "Inputs",
}
for kind in ["link", "button", "select", "form", "input"]:
group = groups.get(kind, [])
if not group:
continue
lines.append(f"\n{kind_labels.get(kind, kind.capitalize())} ({len(group)}):")
for i, e in enumerate(group):
if kind == "link":
lines.append(f" [{i}] {e.text!r:<40}{e.href}")
elif kind == "select":
opts = ", ".join(e.options[:5])
if len(e.options) > 5:
opts += f", ... (+{len(e.options) - 5} more)"
lines.append(f" [{i}] {e.text!r} selector: {e.selector}")
lines.append(f" options: {opts}")
elif kind == "form":
lines.append(f" [{i}] {e.text} selector: {e.selector}")
else:
lines.append(f" [{i}] {e.text!r:<40} selector: {e.selector}")
return "\n".join(lines)
async def click(selector: str) -> PageInfo:
"""Click an element. Use 'text=Foo' to click by visible text, else CSS selector."""
assert _state["tab"] is not None, "No browser open — call scout_visit first"
tab = _state["tab"]
if selector.startswith("text="):
element = await tab.find(text=selector[5:], timeout=CLICK_TIMEOUT_SECONDS)
else:
element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS)
assert element is not None, f"Element not found: {selector!r}"
await element.click()
await asyncio.sleep(NAVIGATION_WAIT_SECONDS)
title = await tab.title
url = await tab.current_url if hasattr(tab, "current_url") else ""
links = await tab.query("a", find_all=True) or []
return PageInfo(title=title, url=url or "", element_count=len(links))
async def fill(selector: str, value: str) -> str:
"""Type a value into a form field."""
assert _state["tab"] is not None, "No browser open — call scout_visit first"
tab = _state["tab"]
if selector.startswith("text="):
element = await tab.find(text=selector[5:], timeout=CLICK_TIMEOUT_SECONDS)
else:
element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS)
assert element is not None, f"Element not found: {selector!r}"
# insert_text is instant (no keystroke simulation)
await element.insert_text(value)
return f"Filled {selector!r} with {value!r}"
async def select_option(selector: str, value: str) -> str:
"""Select an option in a <select> element."""
assert _state["tab"] is not None, "No browser open — call scout_visit first"
tab = _state["tab"]
element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS)
assert element is not None, f"Select element not found: {selector!r}"
await element.select_option(value)
return f"Selected {value!r} in {selector!r}"
async def scroll(direction: str, amount_px: int = 400) -> str:
"""Scroll the page up or down."""
assert _state["tab"] is not None, "No browser open — call scout_visit first"
tab = _state["tab"]
# Execute scroll via JS — simple and reliable
direction_sign = 1 if direction == "down" else -1
await tab.execute_script(f"window.scrollBy(0, {direction_sign * amount_px})")
return f"Scrolled {direction} {amount_px}px"
async def get_text(selector: str = "") -> str:
"""Get visible text from the page or a specific element."""
assert _state["tab"] is not None, "No browser open — call scout_visit first"
tab = _state["tab"]
if selector:
element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS)
assert element is not None, f"Element not found: {selector!r}"
text = await element.text or ""
else:
# Get body text content
body = await tab.query("body", timeout=5)
text = await body.text if body else ""
# Truncate very long text to avoid overwhelming the response
return text[:3000] if text else "(no text content)"
async def screenshot(label: str = "") -> str:
"""Take a screenshot and save to data/scout/. Returns the file path."""
assert _state["tab"] is not None, "No browser open — call scout_visit first"
tab = _state["tab"]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
name = f"{label}_{timestamp}" if label else timestamp
path = OUTPUT_DIR / f"screenshot_{name}.png"
await tab.take_screenshot(str(path), beyond_viewport=False)
assert path.exists(), f"Screenshot was not written: {path}"
return str(path)
# --- HAR recording (asyncio Task holds context manager open) ---
async def _har_recording_task(tab, har_path: pathlib.Path, stop_event: asyncio.Event, result_future: asyncio.Future) -> None:
"""Background task: enters recording context, waits for stop, saves HAR."""
try:
async with tab.request.record() as capture:
await stop_event.wait()
# Save while still inside context manager (capture is valid here)
capture.save(str(har_path))
result_future.set_result(str(har_path))
except Exception as e:
result_future.set_exception(e)
async def har_start() -> str:
"""Start recording all network traffic. Use scout_har_stop to save."""
assert _state["tab"] is not None, "No browser open — call scout_visit first"
assert _state["har_task"] is None, "HAR recording already in progress"
tab = _state["tab"]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
har_path = OUTPUT_DIR / f"har_{timestamp}.har"
stop_event = asyncio.Event()
result_future: asyncio.Future = asyncio.get_event_loop().create_future()
_state["har_stop_event"] = stop_event
_state["har_result"] = result_future
_state["har_task"] = asyncio.create_task(
_har_recording_task(tab, har_path, stop_event, result_future)
)
# Give the task time to enter the context manager before returning
await asyncio.sleep(0.2)
return f"Recording started — will save to {har_path}"
async def har_stop() -> str:
"""Stop recording and save HAR file. Returns the file path."""
assert _state["har_stop_event"] is not None, "No HAR recording in progress"
_state["har_stop_event"].set()
har_path = await asyncio.wait_for(_state["har_result"], timeout=15.0)
_state["har_task"] = None
_state["har_stop_event"] = None
_state["har_result"] = None
assert pathlib.Path(har_path).exists(), f"HAR file not written: {har_path}"
size_kb = pathlib.Path(har_path).stat().st_size // 1024
return f"HAR saved: {har_path} ({size_kb}KB)"
async def close() -> str:
"""Close the browser and clean up all state."""
# Stop any active HAR recording first
if _state["har_stop_event"] is not None:
try:
await har_stop()
except Exception:
pass
if _state["browser"] is not None:
try:
await _state["browser"].stop()
except Exception:
pass
_state["browser"] = None
_state["tab"] = None
_state["har_task"] = None
_state["har_stop_event"] = None
_state["har_result"] = None
return "Browser closed"

View File

@@ -1,207 +0,0 @@
"""Scout MCP server — browser recon tools for Claude Code.
Exposes browser automation as MCP tools. The server runs as a Claude Code
child process (stdio transport) — starts when Claude Code starts, dies when
Claude Code exits. No daemon, no port, no systemd.
The browser session is stateful across tool calls: scout_visit opens Chrome,
subsequent tools operate on the same tab, scout_close shuts down.
Text-first: tools return structured text (element lists, page titles, HAR
summaries). Screenshots are an explicit opt-in via scout_screenshot.
Usage (via .mcp.json):
uv run --package scout scout-server
"""
import asyncio
import logging
import sys
from mcp.server.fastmcp import FastMCP
from scout import analyze, browser
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
handlers=[logging.StreamHandler(sys.stderr)],
)
mcp = FastMCP("scout")
@mcp.tool()
async def scout_visit(url: str) -> str:
"""Visit a URL in the browser. Opens Chrome on the first call.
Returns: page title, URL, and element count.
"""
info = await browser.visit(url)
return f"Title: {info.title}\nURL: {info.url}\nElements detected: {info.element_count}"
@mcp.tool()
async def scout_elements(filter: str = "") -> str:
"""List interactive elements on the current page.
Args:
filter: Optional category — "links", "buttons", "forms", "selects",
"inputs", or "" for all.
Returns: structured text list with selectors for use in scout_click.
"""
elements = await browser.get_elements(filter)
return browser.format_elements(elements)
@mcp.tool()
async def scout_click(selector: str) -> str:
"""Click an element on the current page.
Args:
selector: "text=Foo" to click by visible text, or a CSS selector
like "a[href*=COFFEE]" or "button.download-btn".
Returns: new page title and URL if navigation occurred.
"""
info = await browser.click(selector)
return f"Clicked {selector!r}\nTitle: {info.title}\nURL: {info.url}\nElements: {info.element_count}"
@mcp.tool()
async def scout_fill(selector: str, value: str) -> str:
"""Type a value into a form field.
Args:
selector: CSS selector or "text=Label" for the input field.
value: The text to type.
"""
return await browser.fill(selector, value)
@mcp.tool()
async def scout_select(selector: str, value: str) -> str:
"""Select an option in a <select> dropdown.
Args:
selector: CSS selector for the <select> element.
value: The option value or text to select.
"""
return await browser.select_option(selector, value)
@mcp.tool()
async def scout_scroll(direction: str = "down", amount_px: int = 400) -> str:
"""Scroll the page up or down.
Args:
direction: "down" or "up".
amount_px: Number of pixels to scroll (default 400).
"""
assert direction in ("down", "up"), f"direction must be 'down' or 'up', got {direction!r}"
return await browser.scroll(direction, amount_px)
@mcp.tool()
async def scout_text(selector: str = "") -> str:
"""Get visible text from the page or a specific element.
Args:
selector: CSS selector for a specific element, or "" for full page body text.
Text is truncated to 3000 chars.
"""
return await browser.get_text(selector)
@mcp.tool()
async def scout_screenshot(label: str = "") -> str:
"""Take a screenshot and save to data/scout/. Use Read tool to view it.
Args:
label: Optional label included in the filename.
Returns: file path to the saved PNG.
"""
path = await browser.screenshot(label)
return f"Screenshot saved: {path}\nUse the Read tool to view it."
@mcp.tool()
async def scout_har_start() -> str:
"""Start recording all network traffic (HAR format).
Call scout_har_stop when done navigating. HAR spans all tool calls
between start and stop — visit, click, fill, etc.
"""
return await browser.har_start()
@mcp.tool()
async def scout_har_stop() -> str:
"""Stop network recording and save the HAR file.
Returns the HAR file path. Pass to scout_analyze to extract API endpoints.
"""
return await browser.har_stop()
@mcp.tool()
async def scout_analyze(har_path: str) -> str:
"""Analyze a HAR file for API endpoints, POST requests, and data downloads.
Filters out static assets (JS, CSS, images, fonts) and summarizes:
- API calls (JSON responses, POST requests)
- Downloads (CSV, PDF, Excel)
Args:
har_path: Path to the HAR file returned by scout_har_stop.
"""
summary = analyze.analyze_har_file(har_path)
return analyze.format_summary(summary)
@mcp.tool()
async def scout_click_coords(x: int, y: int) -> str:
"""Click at specific viewport coordinates. Useful for cross-origin iframes
where CSS selectors can't reach (e.g. Sourcepoint cookie banners on German sites).
Args:
x: Horizontal pixel position from left edge.
y: Vertical pixel position from top edge.
"""
assert browser._state["tab"] is not None, "No browser open — call scout_visit first"
await browser._state["tab"].mouse.click(x, y)
await asyncio.sleep(1.0)
title = await browser._state["tab"].title
return f"Clicked at ({x}, {y})\nCurrent title: {title}"
@mcp.tool()
async def scout_js(script: str) -> str:
"""Execute JavaScript on the current page and return the result.
Useful for interacting with elements that are hard to reach via CSS selectors
(shadow DOM, z-index overlays, pointer-events:none, cookie banners, etc.).
Args:
script: JavaScript to evaluate. Return value is stringified.
Examples:
"document.title"
"document.querySelector('#onetrust-accept-btn-handler').click(); 'clicked'"
"Array.from(document.querySelectorAll('button')).map(b=>b.textContent.trim()).join('|')"
"""
assert browser._state["tab"] is not None, "No browser open — call scout_visit first"
result = await browser._state["tab"].execute_script(script)
return str(result) if result is not None else "(no return value)"
@mcp.tool()
async def scout_close() -> str:
"""Close the browser and clean up. Stops any active HAR recording first."""
return await browser.close()
def main() -> None:
mcp.run()

1107
tools/scout/uv.lock generated

File diff suppressed because it is too large Load Diff