diff --git a/.mcp.json b/.mcp.json index 8d35486..a87aeb6 100644 --- a/.mcp.json +++ b/.mcp.json @@ -3,7 +3,7 @@ "scout": { "type": "stdio", "command": "uv", - "args": ["run", "--directory", "tools/scout", "scout-server"] + "args": ["run", "--directory", "/var/home/Deeman/Projects/scout", "scout-server"] } } } diff --git a/tools/scout/pyproject.toml b/tools/scout/pyproject.toml deleted file mode 100644 index 37ce2c3..0000000 --- a/tools/scout/pyproject.toml +++ /dev/null @@ -1,20 +0,0 @@ -[project] -name = "scout" -version = "0.1.0" -description = "Browser recon MCP server — discover API endpoints via HAR recording" -requires-python = ">=3.13" -dependencies = [ - "pydoll-python>=1.5", - "mcp[cli]>=1.0", - "msgspec>=0.19", -] - -[project.scripts] -scout-server = "scout.server:main" - -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.hatch.build.targets.wheel] -packages = ["src/scout"] diff --git a/tools/scout/src/scout/__init__.py b/tools/scout/src/scout/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tools/scout/src/scout/analyze.py b/tools/scout/src/scout/analyze.py deleted file mode 100644 index 0221553..0000000 --- a/tools/scout/src/scout/analyze.py +++ /dev/null @@ -1,190 +0,0 @@ -"""HAR file analysis — filter static assets, surface API endpoints and downloads. - -Parses HAR 1.2 JSON files produced by Pydoll's network recorder. Filters out -static assets (JS, CSS, images, fonts) and returns a structured summary of: - - API calls (JSON responses, any POST request) - - Data downloads (CSV, PDF, Excel) - -Typical call: - summary = analyze_har_file("data/scout/recording.har") - print(format_summary(summary)) -""" - -import json -import pathlib - -import msgspec - -STATIC_EXTENSIONS = frozenset( - {".js", ".css", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", - ".woff", ".woff2", ".ttf", ".eot", ".map", ".webp", ".avif", ".apng"} -) - -STATIC_CONTENT_TYPES = frozenset( - {"text/html", "text/javascript", "application/javascript", - "text/css", "image/", "font/", "audio/", "video/"} -) - -DOWNLOAD_CONTENT_TYPES = ( - "text/csv", - "application/pdf", - "application/vnd.ms-excel", - "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", - "application/octet-stream", - "text/plain", -) - -POST_BODY_MAX_CHARS = 500 - - -class HarEntry(msgspec.Struct): - """A single interesting HTTP request/response from a HAR file.""" - - method: str - url: str - status: int - content_type: str - size_bytes: int - post_body: str = "" - - -class HarSummary(msgspec.Struct): - """Analysis result: static assets filtered out, interesting entries categorized.""" - - api_calls: list[HarEntry] - downloads: list[HarEntry] - other_interesting: list[HarEntry] - total_entries: int - filtered_static: int - - -def _is_static(url: str, content_type: str) -> bool: - """Return True if this entry looks like a static asset.""" - path = url.split("?")[0].lower() - ext = pathlib.PurePosixPath(path).suffix - if ext in STATIC_EXTENSIONS: - return True - ct = content_type.lower().split(";")[0].strip() - return any(ct.startswith(s) for s in STATIC_CONTENT_TYPES) - - -def _extract_entry(raw: dict) -> HarEntry | None: - """Parse a raw HAR entry dict into a typed HarEntry. Returns None for static assets.""" - request = raw.get("request", {}) - response = raw.get("response", {}) - - url = request.get("url", "") - method = request.get("method", "").upper() - status = response.get("status", 0) - - content = response.get("content", {}) - content_type = content.get("mimeType", "").lower().split(";")[0].strip() - size_bytes = max(content.get("size", 0), 0) - - if _is_static(url, content_type): - return None - - # Extract POST body from postData - post_body = "" - post_data = request.get("postData", {}) - if post_data: - text = post_data.get("text", "") - params = post_data.get("params", []) - if text: - post_body = text[:POST_BODY_MAX_CHARS] - elif params: - post_body = "&".join( - f"{p['name']}={p.get('value', '')}" for p in params - )[:POST_BODY_MAX_CHARS] - - return HarEntry( - method=method, - url=url, - status=status, - content_type=content_type, - size_bytes=size_bytes, - post_body=post_body, - ) - - -def analyze_har_file(har_path: str) -> HarSummary: - """Parse HAR JSON, filter static assets, categorize interesting entries.""" - data = json.loads(pathlib.Path(har_path).read_bytes()) - raw_entries = data.get("log", {}).get("entries", []) - - assert raw_entries, f"No entries found in HAR file: {har_path}" - - total = len(raw_entries) - filtered_static = 0 - api_calls: list[HarEntry] = [] - downloads: list[HarEntry] = [] - other_interesting: list[HarEntry] = [] - - for raw in raw_entries: - entry = _extract_entry(raw) - if entry is None: - filtered_static += 1 - continue - - ct = entry.content_type - is_download = any(ct.startswith(t) for t in DOWNLOAD_CONTENT_TYPES) - is_api = ct == "application/json" or ct == "application/xml" or entry.method == "POST" - - if is_download: - downloads.append(entry) - elif is_api: - api_calls.append(entry) - else: - other_interesting.append(entry) - - return HarSummary( - api_calls=api_calls, - downloads=downloads, - other_interesting=other_interesting, - total_entries=total, - filtered_static=filtered_static, - ) - - -def format_summary(summary: HarSummary) -> str: - """Format HarSummary as human-readable text for MCP tool response.""" - parts = [ - f"HAR Analysis: {summary.total_entries} total entries, " - f"{summary.filtered_static} static assets filtered\n" - f"Found: {len(summary.api_calls)} API calls, " - f"{len(summary.downloads)} downloads, " - f"{len(summary.other_interesting)} other\n", - ] - - if summary.api_calls: - parts.append("API Calls:") - for e in summary.api_calls: - parts.append( - f" {e.method:<6} {e.url}" - f" [{e.status}, {e.content_type}, {e.size_bytes:,}B]" - ) - if e.post_body: - parts.append(f" Body: {e.post_body}") - parts.append("") - - if summary.downloads: - parts.append("Downloads:") - for e in summary.downloads: - parts.append( - f" {e.method:<6} {e.url}" - f" [{e.status}, {e.content_type}, {e.size_bytes:,}B]" - ) - parts.append("") - - if summary.other_interesting: - parts.append("Other (non-static, non-JSON, non-download):") - for e in summary.other_interesting[:10]: # cap output - parts.append(f" {e.method:<6} {e.url} [{e.status}, {e.content_type}]") - if len(summary.other_interesting) > 10: - parts.append(f" ... and {len(summary.other_interesting) - 10} more") - parts.append("") - - if not summary.api_calls and not summary.downloads: - parts.append("No API calls or downloads found after filtering static assets.") - - return "\n".join(parts) diff --git a/tools/scout/src/scout/browser.py b/tools/scout/src/scout/browser.py deleted file mode 100644 index f58d323..0000000 --- a/tools/scout/src/scout/browser.py +++ /dev/null @@ -1,550 +0,0 @@ -"""Pydoll browser session management for the scout MCP server. - -Manages a single long-lived Chrome instance across multiple MCP tool calls. -The browser starts on the first scout_visit and stays alive until scout_close. - -State is module-level (lives for the duration of the MCP server process). -HAR recording is managed via an asyncio.Task that holds the Pydoll context -manager open between scout_har_start and scout_har_stop calls. - -Bot evasion: -- CDP-based (no chromedriver, navigator.webdriver stays false) -- Humanized mouse movement (Bezier curves) on all clicks -- Headed browser by default (no headless detection vectors) -""" - -import asyncio -import logging -import pathlib -from datetime import datetime - -import msgspec -from pydoll.browser.chromium import Chrome -from pydoll.browser.options import ChromiumOptions - -# Chrome binary search order — covers native installs and Flatpak -_CHROME_PATHS = [ - "/usr/bin/google-chrome", - "/usr/bin/google-chrome-stable", - "/usr/bin/chromium", - "/usr/bin/chromium-browser", - # Flatpak (system install) - "/var/lib/flatpak/app/com.google.Chrome/current/active/export/bin/com.google.Chrome", - # Flatpak (user install) - str(pathlib.Path.home() / ".local/share/flatpak/app/com.google.Chrome/current/active/export/bin/com.google.Chrome"), -] - - -def _find_chrome() -> str | None: - for p in _CHROME_PATHS: - if pathlib.Path(p).exists(): - return p - return None - -logger = logging.getLogger("scout.browser") - -# Module-level browser state — lives for the MCP server process lifetime. -# Using a plain dict so all fields are in one place and easy to reset. -_state: dict = { - "browser": None, # Chrome instance - "tab": None, # Active tab - "har_task": None, # asyncio.Task holding the recording context manager - "har_stop_event": None, # asyncio.Event signalled to stop recording - "har_result": None, # asyncio.Future resolving to HAR file path -} - -OUTPUT_DIR = pathlib.Path("data/scout") -CLICK_TIMEOUT_SECONDS = 10 -NAVIGATION_WAIT_SECONDS = 2 -ELEMENT_CAP = 60 # max elements per category to avoid huge responses - - -class PageElement(msgspec.Struct): - """An interactive element found on the current page.""" - - kind: str # "link", "button", "form", "select", "input" - text: str # visible text or label (truncated) - selector: str # usable CSS selector or description - href: str = "" # for links - action: str = "" # for forms (action URL) - method: str = "" # for forms (GET/POST) - options: list[str] = [] # for selects (option texts) - - -class PageInfo(msgspec.Struct): - """Result of a page visit or navigation action.""" - - title: str - url: str - element_count: int - - -async def _ensure_browser() -> None: - """Launch Chrome if not already running. Idempotent.""" - if _state["tab"] is not None: - return - chrome_path = _find_chrome() - assert chrome_path is not None, ( - "No Chrome/Chromium binary found. Install via: " - "sudo dnf install chromium OR flatpak install com.google.Chrome" - ) - logger.info("Using Chrome at: %s", chrome_path) - options = ChromiumOptions() - options.binary_location = chrome_path - browser = Chrome(options=options) - tab = await browser.start() - _state["browser"] = browser - _state["tab"] = tab - OUTPUT_DIR.mkdir(parents=True, exist_ok=True) - logger.info("Chrome launched") - - -# Cookie consent selectors — ordered by specificity (vendor-specific first, generic last) -_COOKIE_SELECTORS = [ - # OneTrust (very common on financial/data sites incl. ICE) - "#onetrust-accept-btn-handler", - ".onetrust-accept-btn-handler", - # Cookiebot - "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll", - # CookieYes / CookieLaw - ".cky-btn-accept", - ".cookie-law-accept", - # Termly - "#termly-code-snippet-support", - # Consentmanager (common in Germany) - "#cmpwrapper .cmpboxbtnyes", - "#cmpbox .cmptxt_btn_yes", - # Generic accept buttons (text-based fallback) - "button[id*='accept']", - "button[class*='accept']", - "button[id*='cookie']", - "button[class*='cookie']", - "button[id*='consent']", - "button[class*='consent']", -] - -# Text patterns for cookie accept buttons — English + German (DSGVO) -_COOKIE_ACCEPT_TEXTS = [ - # English - "Accept All", "Accept all", "Accept all cookies", - "Accept Cookies", "Accept cookies", - "I Accept", "I accept", "Accept", - "Allow All", "Allow all", - "Agree", "I Agree", "OK", "Got it", - "Continue", "Dismiss", - # German (DSGVO) - "Alle akzeptieren", "Akzeptieren", "Zustimmen", - "Alle zustimmen", "Einverstanden", - "Alle Cookies akzeptieren", "Cookies akzeptieren", - "Akzeptieren und weiter", "Weiter", - "Ich stimme zu", "OK, verstanden", -] - -# Usercentrics shadow DOM JS — common in German publishers (Bild, Spiegel, etc.) -_USERCENTRICS_JS = ( - "var host = document.querySelector('#usercentrics-root');" - "if (host && host.shadowRoot) {" - " var btn = host.shadowRoot.querySelector('[data-testid=\"uc-accept-all-button\"]');" - " if (btn) { btn.click(); true; } else { false; }" - "} else { false; }" -) - - -async def _click_via_js(tab, selector: str) -> bool: - """Click an element via JS injection — bypasses pointer-events/z-index issues.""" - try: - # Escape selector for JS string - escaped = selector.replace("'", "\\'") - result = await tab.execute_script( - f"var el = document.querySelector('{escaped}'); " - f"if (el) {{ el.click(); true; }} else {{ false; }}" - ) - return bool(result) - except Exception: - return False - - -async def _dismiss_cookie_banner(tab) -> bool: - """Try to find and click a cookie consent accept button. Returns True if dismissed. - - Uses JS injection (click_js / execute_script) as primary method since cookie - banners often have z-index/pointer-events issues that block Pydoll's mouse simulation. - Covers: OneTrust, Cookiebot, CookieYes, Consentmanager, Usercentrics (shadow DOM), - and generic text patterns in English + German (DSGVO). - """ - # Usercentrics (shadow DOM) — common in German publishers, requires special handling - try: - result = await tab.execute_script(_USERCENTRICS_JS) - if result: - await asyncio.sleep(0.8) - logger.info("Cookie banner dismissed via Usercentrics shadow DOM") - return True - except Exception: - pass - - # Try CSS selectors via JS click (bypasses visibility/pointer-events issues) - for selector in _COOKIE_SELECTORS: - try: - # Check if element exists in DOM first - exists = await tab.execute_script( - f"!!document.querySelector('{selector.replace(chr(39), chr(92)+chr(39))}')" - ) - if exists: - clicked = await _click_via_js(tab, selector) - if clicked: - await asyncio.sleep(0.8) - logger.info("Cookie banner dismissed via JS selector: %s", selector) - return True - except Exception: - continue - - # Fallback: find buttons by text content via JS - for text in _COOKIE_ACCEPT_TEXTS: - try: - escaped_text = text.replace("'", "\\'") - result = await tab.execute_script( - f"var buttons = Array.from(document.querySelectorAll('button, a[role=button]'));" - f"var btn = buttons.find(b => b.textContent.trim().includes('{escaped_text}'));" - f"if (btn) {{ btn.click(); true; }} else {{ false; }}" - ) - if result: - await asyncio.sleep(0.8) - logger.info("Cookie banner dismissed via JS text: %r", text) - return True - except Exception: - continue - - # Note: cross-origin full-page iframe banners (Sourcepoint on German publishers) - # are not dismissed here — coordinate clicks are too brittle across screen sizes. - # HAR recording captures network traffic regardless of banner visibility, - # so dismissal is only needed when we must click navigation elements. - # For those cases, use scout_click_coords manually. - return False - - -async def visit(url: str) -> PageInfo: - """Navigate to url. Opens browser on first call.""" - await _ensure_browser() - tab = _state["tab"] - - await tab.go_to(url) - await asyncio.sleep(1) # let dynamic content settle - - # Auto-dismiss cookie consent banners before anything else - await _dismiss_cookie_banner(tab) - - title = await tab.title - links = await tab.query("a", find_all=True) - element_count = len(links) if links else 0 - - return PageInfo(title=title, url=url, element_count=element_count) - - -async def get_elements(filter_type: str = "") -> list[PageElement]: - """Enumerate interactive elements on the current page. - - filter_type: "", "links", "buttons", "forms", "selects", "inputs" - Returns typed PageElement structs (not screenshots). - """ - assert _state["tab"] is not None, "No browser open — call scout_visit first" - tab = _state["tab"] - elements: list[PageElement] = [] - - # Links - if not filter_type or filter_type == "links": - nodes = await tab.query("a[href]", find_all=True) or [] - for node in nodes[:ELEMENT_CAP]: - try: - text = (await node.text or "").strip()[:100] - href = (await node.get_attribute("href") or "").strip() - if text or href: - elements.append(PageElement( - kind="link", - text=text, - selector=f'a[href="{href}"]' if href else "a", - href=href, - )) - except Exception: - continue - - # Buttons - if not filter_type or filter_type == "buttons": - nodes = await tab.query( - "button, input[type=submit], input[type=button]", find_all=True - ) or [] - for node in nodes[:20]: - try: - text = (await node.text or "").strip() - if not text: - text = await node.get_attribute("value") or "" - text = text[:100] - cls = (await node.get_attribute("class") or "").strip() - sel = f"button.{cls.split()[0]}" if cls else "button" - elements.append(PageElement(kind="button", text=text, selector=sel)) - except Exception: - continue - - # Selects - if not filter_type or filter_type == "selects": - nodes = await tab.query("select", find_all=True) or [] - for node in nodes[:10]: - try: - name = ( - await node.get_attribute("name") - or await node.get_attribute("id") - or "" - ).strip() - option_nodes = await node.query("option", find_all=True) or [] - opts = [] - for opt in option_nodes[:15]: - opt_text = (await opt.text or "").strip() - if opt_text: - opts.append(opt_text) - sel = f"select[name='{name}']" if name else "select" - elements.append(PageElement( - kind="select", text=name, selector=sel, options=opts - )) - except Exception: - continue - - # Forms - if not filter_type or filter_type == "forms": - nodes = await tab.query("form", find_all=True) or [] - for node in nodes[:10]: - try: - action = (await node.get_attribute("action") or "").strip() - method = (await node.get_attribute("method") or "GET").upper() - elements.append(PageElement( - kind="form", - text=f"{method} {action}", - selector="form", - action=action, - method=method, - )) - except Exception: - continue - - # Inputs - if filter_type == "inputs": - nodes = await tab.query( - "input:not([type=hidden]):not([type=submit]):not([type=button])", - find_all=True, - ) or [] - for node in nodes[:20]: - try: - name = (await node.get_attribute("name") or "").strip() - input_type = (await node.get_attribute("type") or "text").strip() - placeholder = (await node.get_attribute("placeholder") or "").strip() - label = name or placeholder or input_type - sel = f"input[name='{name}']" if name else f"input[type='{input_type}']" - elements.append(PageElement(kind="input", text=label, selector=sel)) - except Exception: - continue - - return elements - - -def format_elements(elements: list[PageElement]) -> str: - """Format a list of PageElement structs as human-readable text.""" - if not elements: - return "No interactive elements found." - - # Group by kind - groups: dict[str, list[PageElement]] = {} - for e in elements: - groups.setdefault(e.kind, []).append(e) - - lines: list[str] = [f"Elements ({len(elements)} total):"] - kind_labels = { - "link": "Links", "button": "Buttons", - "form": "Forms", "select": "Selects", "input": "Inputs", - } - - for kind in ["link", "button", "select", "form", "input"]: - group = groups.get(kind, []) - if not group: - continue - lines.append(f"\n{kind_labels.get(kind, kind.capitalize())} ({len(group)}):") - for i, e in enumerate(group): - if kind == "link": - lines.append(f" [{i}] {e.text!r:<40} → {e.href}") - elif kind == "select": - opts = ", ".join(e.options[:5]) - if len(e.options) > 5: - opts += f", ... (+{len(e.options) - 5} more)" - lines.append(f" [{i}] {e.text!r} selector: {e.selector}") - lines.append(f" options: {opts}") - elif kind == "form": - lines.append(f" [{i}] {e.text} selector: {e.selector}") - else: - lines.append(f" [{i}] {e.text!r:<40} selector: {e.selector}") - - return "\n".join(lines) - - -async def click(selector: str) -> PageInfo: - """Click an element. Use 'text=Foo' to click by visible text, else CSS selector.""" - assert _state["tab"] is not None, "No browser open — call scout_visit first" - tab = _state["tab"] - - if selector.startswith("text="): - element = await tab.find(text=selector[5:], timeout=CLICK_TIMEOUT_SECONDS) - else: - element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS) - - assert element is not None, f"Element not found: {selector!r}" - await element.click() - await asyncio.sleep(NAVIGATION_WAIT_SECONDS) - - title = await tab.title - url = await tab.current_url if hasattr(tab, "current_url") else "" - links = await tab.query("a", find_all=True) or [] - - return PageInfo(title=title, url=url or "", element_count=len(links)) - - -async def fill(selector: str, value: str) -> str: - """Type a value into a form field.""" - assert _state["tab"] is not None, "No browser open — call scout_visit first" - tab = _state["tab"] - - if selector.startswith("text="): - element = await tab.find(text=selector[5:], timeout=CLICK_TIMEOUT_SECONDS) - else: - element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS) - - assert element is not None, f"Element not found: {selector!r}" - # insert_text is instant (no keystroke simulation) - await element.insert_text(value) - return f"Filled {selector!r} with {value!r}" - - -async def select_option(selector: str, value: str) -> str: - """Select an option in a dropdown. - - Args: - selector: CSS selector for the