diff --git a/.mcp.json b/.mcp.json new file mode 100644 index 0000000..8d35486 --- /dev/null +++ b/.mcp.json @@ -0,0 +1,9 @@ +{ + "mcpServers": { + "scout": { + "type": "stdio", + "command": "uv", + "args": ["run", "--directory", "tools/scout", "scout-server"] + } + } +} diff --git a/coding_philosophy.md b/coding_philosophy.md index 4d54e67..a8b83c8 100644 --- a/coding_philosophy.md +++ b/coding_philosophy.md @@ -201,6 +201,13 @@ active_users = [u for u in users if u.is_active()] - Small, focused libraries - Direct solutions - Understanding what code does + +**Approved dependencies (earn their place):** +- `msgspec` — struct types and validation at system boundaries (external APIs, user input, + inter-process data). Use `msgspec.Struct` instead of dataclasses when you need: fast + encode/decode, built-in validation, or typed containers for boundary data. + **Rule:** use Structs at boundaries (API responses, HAR entries, MCP tool I/O) — + keep internal plumbing as plain dicts/tuples. diff --git a/extract/coffee_prices/pyproject.toml b/extract/coffee_prices/pyproject.toml new file mode 100644 index 0000000..34cd61e --- /dev/null +++ b/extract/coffee_prices/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "coffee_prices" +version = "0.1.0" +description = "KC=F Coffee C futures price extractor" +requires-python = ">=3.13" +dependencies = [ + "yfinance>=0.2.55", +] + +[project.scripts] +extract_prices = "coffee_prices.execute:extract_coffee_prices" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/coffee_prices"] diff --git a/extract/coffee_prices/src/coffee_prices/__init__.py b/extract/coffee_prices/src/coffee_prices/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/extract/coffee_prices/src/coffee_prices/execute.py b/extract/coffee_prices/src/coffee_prices/execute.py new file mode 100644 index 0000000..1fa01d2 --- /dev/null +++ b/extract/coffee_prices/src/coffee_prices/execute.py @@ -0,0 +1,92 @@ +"""Coffee C (KC=F) futures price extraction. + +Downloads daily OHLCV data from Yahoo Finance via yfinance and stores as +gzip CSV in the landing directory. Uses SHA256 of CSV bytes as the +idempotency key — skips if a file with the same hash already exists. + +Landing path: LANDING_DIR/prices/coffee_kc/{hash8}.csv.gzip +""" + +import gzip +import hashlib +import io +import logging +import os +import pathlib +import sys + +import yfinance as yf + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("Coffee Prices Extractor") + +LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) +TICKER = "KC=F" +DEST_SUBDIR = "prices/coffee_kc" + +# yfinance raises on network issues; give it enough time for the full history +DOWNLOAD_TIMEOUT_SECONDS = 120 + + +def extract_coffee_prices() -> None: + """Download KC=F daily OHLCV history and store as gzip CSV. + + Idempotent: computes SHA256 of CSV bytes, skips if already on disk. + On first run downloads full history (period='max'). On subsequent runs + the hash matches if no new trading days have closed since last run. + """ + logger.info(f"Downloading {TICKER} daily OHLCV from Yahoo Finance...") + + ticker = yf.Ticker(TICKER) + df = ticker.history(period="max", interval="1d", auto_adjust=False, timeout=DOWNLOAD_TIMEOUT_SECONDS) + + assert df is not None and len(df) > 0, f"yfinance returned empty DataFrame for {TICKER}" + + # Reset index so Date becomes a plain column + df = df.reset_index() + + # Keep standard OHLCV columns only; yfinance may return extra columns + keep_cols = [c for c in ["Date", "Open", "High", "Low", "Close", "Adj Close", "Volume"] if c in df.columns] + df = df[keep_cols] + + # Normalize Date to ISO string for CSV stability across timezones + df["Date"] = df["Date"].dt.strftime("%Y-%m-%d") + + # Serialize to CSV bytes + csv_buf = io.StringIO() + df.to_csv(csv_buf, index=False) + csv_bytes = csv_buf.getvalue().encode("utf-8") + + assert len(csv_bytes) > 0, "CSV serialization produced empty output" + + # Hash-based idempotency key (first 8 hex chars of SHA256) + sha256 = hashlib.sha256(csv_bytes).hexdigest() + etag = sha256[:8] + + dest_dir = LANDING_DIR / DEST_SUBDIR + local_file = dest_dir / f"{etag}.csv.gzip" + + if local_file.exists(): + logger.info(f"File {local_file.name} already exists — no new data, skipping") + return + + # Compress and write + dest_dir.mkdir(parents=True, exist_ok=True) + compressed = gzip.compress(csv_bytes) + local_file.write_bytes(compressed) + + assert local_file.exists(), f"File was not written: {local_file}" + assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}" + + logger.info( + f"Stored {local_file} ({local_file.stat().st_size:,} bytes, {len(df):,} rows)" + ) + + +if __name__ == "__main__": + extract_coffee_prices() diff --git a/extract/ice_stocks/pyproject.toml b/extract/ice_stocks/pyproject.toml new file mode 100644 index 0000000..01fedb3 --- /dev/null +++ b/extract/ice_stocks/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "ice_stocks" +version = "0.1.0" +description = "ICE certified warehouse stocks extractor" +requires-python = ">=3.13" +dependencies = [ + "niquests>=3.14.1", +] + +[project.scripts] +extract_ice = "ice_stocks.execute:extract_ice_stocks" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/ice_stocks"] diff --git a/extract/ice_stocks/src/ice_stocks/__init__.py b/extract/ice_stocks/src/ice_stocks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/extract/ice_stocks/src/ice_stocks/execute.py b/extract/ice_stocks/src/ice_stocks/execute.py new file mode 100644 index 0000000..e4dccb4 --- /dev/null +++ b/extract/ice_stocks/src/ice_stocks/execute.py @@ -0,0 +1,173 @@ +"""ICE certified Coffee C warehouse stock extraction. + +Downloads daily certified stock reports from the ICE Report Center and stores +as gzip CSV in the landing directory. Uses SHA256 of content as the +idempotency key — skips if a file with the same hash already exists. + +Landing path: LANDING_DIR/ice_stocks/{year}/{date}_{hash8}.csv.gzip + +CSV format produced (matching raw.ice_warehouse_stocks columns): + report_date,total_certified_bags,pending_grading_bags + +ICE Report Center URL discovery: + Visit https://www.theice.com/report-center and locate the + "Coffee C Warehouse Stocks" report. The download URL has the pattern: + https://www.theice.com/report-center/commodities/COFFEE/reports/... + Set ICE_STOCKS_URL environment variable to the discovered URL. +""" + +import csv +import gzip +import hashlib +import io +import logging +import os +import pathlib +import sys +from datetime import datetime + +import niquests + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("ICE Stocks Extractor") + +LANDING_DIR = pathlib.Path(os.getenv("LANDING_DIR", "data/landing")) +DEST_SUBDIR = "ice_stocks" + +# ICE Report Center URL for Coffee C certified warehouse stocks. +# Discover by visiting https://www.theice.com/report-center and locating +# the Coffee C warehouse stocks CSV export. Override via environment variable. +ICE_STOCKS_URL = os.getenv( + "ICE_STOCKS_URL", + "https://www.theice.com/publicdocs/futures_us/exchange_notices/coffee_certifiedstocks.csv", +) + +HTTP_TIMEOUT_SECONDS = 60 + +# Expected column names from ICE CSV (may vary — adapt to actual column names) +# The ICE report typically has: Date, Certified Stocks (bags), Pending Grading (bags) +# We normalize to our canonical names. +COLUMN_MAPPINGS = { + # Possible ICE column name → our canonical name + "date": "report_date", + "report date": "report_date", + "Date": "report_date", + "certified stocks": "total_certified_bags", + "Certified Stocks": "total_certified_bags", + "certified stocks (bags)": "total_certified_bags", + "total certified": "total_certified_bags", + "pending grading": "pending_grading_bags", + "Pending Grading": "pending_grading_bags", + "pending grading (bags)": "pending_grading_bags", +} + + +def _normalize_row(row: dict) -> dict | None: + """Map raw ICE CSV columns to canonical schema. Returns None if date missing.""" + normalized = {} + for raw_key, value in row.items(): + canonical = COLUMN_MAPPINGS.get(raw_key.strip()) or COLUMN_MAPPINGS.get(raw_key.strip().lower()) + if canonical: + # Strip commas from numeric strings (ICE uses "1,234,567" format) + normalized[canonical] = value.strip().replace(",", "") if value else "" + + if "report_date" not in normalized or not normalized["report_date"]: + return None + + # Fill missing optional columns with empty string + normalized.setdefault("total_certified_bags", "") + normalized.setdefault("pending_grading_bags", "") + + return normalized + + +def _build_canonical_csv(raw_content: bytes) -> bytes: + """Parse ICE CSV and emit canonical CSV with our column schema.""" + text = raw_content.decode("utf-8", errors="replace") + reader = csv.DictReader(io.StringIO(text)) + + rows = [] + for row in reader: + normalized = _normalize_row(row) + if normalized: + rows.append(normalized) + + if not rows: + return b"" + + out = io.StringIO() + writer = csv.DictWriter(out, fieldnames=["report_date", "total_certified_bags", "pending_grading_bags"]) + writer.writeheader() + writer.writerows(rows) + return out.getvalue().encode("utf-8") + + +def extract_ice_stocks() -> None: + """Download ICE certified Coffee C warehouse stocks and store as gzip CSV. + + Idempotent: computes SHA256 of canonical CSV bytes, skips if already on disk. + The ICE report is a rolling file (same URL, updated daily) — we detect + changes via content hash. + """ + logger.info(f"Downloading ICE warehouse stocks from: {ICE_STOCKS_URL}") + + with niquests.Session() as session: + try: + response = session.get(ICE_STOCKS_URL, timeout=HTTP_TIMEOUT_SECONDS) + except Exception as e: + logger.error( + f"Failed to connect to ICE Report Center: {e}\n" + "If the URL has changed, set ICE_STOCKS_URL environment variable.\n" + "Visit https://www.theice.com/report-center to find the current URL." + ) + return + + if response.status_code == 404: + logger.warning( + "ICE stocks URL returned 404. The report URL may have changed.\n" + "Visit https://www.theice.com/report-center to find the current URL,\n" + "then set ICE_STOCKS_URL environment variable." + ) + return + + assert response.status_code == 200, ( + f"Unexpected status {response.status_code} from {ICE_STOCKS_URL}" + ) + assert len(response.content) > 0, "Downloaded empty file from ICE" + + canonical_csv = _build_canonical_csv(response.content) + if not canonical_csv: + logger.warning("ICE CSV parsed to 0 rows — column mapping may need updating") + return + + # Hash-based idempotency + sha256 = hashlib.sha256(canonical_csv).hexdigest() + etag = sha256[:8] + + today = datetime.now().strftime("%Y-%m-%d") + year = datetime.now().strftime("%Y") + + dest_dir = LANDING_DIR / DEST_SUBDIR / year + local_file = dest_dir / f"{today}_{etag}.csv.gzip" + + if local_file.exists(): + logger.info(f"File {local_file.name} already exists — content unchanged, skipping") + return + + dest_dir.mkdir(parents=True, exist_ok=True) + compressed = gzip.compress(canonical_csv) + local_file.write_bytes(compressed) + + assert local_file.exists(), f"File was not written: {local_file}" + assert local_file.stat().st_size > 0, f"Written file is empty: {local_file}" + + logger.info(f"Stored {local_file} ({local_file.stat().st_size:,} bytes)") + + +if __name__ == "__main__": + extract_ice_stocks() diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh index 26481c3..701fab4 100644 --- a/infra/supervisor/supervisor.sh +++ b/infra/supervisor/supervisor.sh @@ -2,6 +2,11 @@ # Materia Supervisor - Continuous pipeline orchestration # Inspired by TigerBeetle's CFO supervisor: simple, resilient, easy to understand # https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh +# +# Environment variables (set in systemd EnvironmentFile): +# LANDING_DIR — local path for extracted landing data +# DUCKDB_PATH — path to DuckDB lakehouse file +# ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failure alerts set -eu @@ -24,14 +29,33 @@ do git switch --discard-changes --detach origin/master uv sync - # Run pipelines + # Extract all data sources LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ uv run materia pipeline run extract + LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ + DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ + uv run materia pipeline run extract_cot + + LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ + DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ + uv run materia pipeline run extract_prices + + LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ + DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ + uv run materia pipeline run extract_ice + + # Transform all data sources LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ uv run materia pipeline run transform - ) || sleep 600 # Sleep 10 min on failure to avoid busy-loop retries + ) || { + # Notify on failure if webhook is configured, then sleep to avoid busy-loop + if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then + curl -s -d "Materia pipeline failed at $(date)" "$ALERT_WEBHOOK_URL" 2>/dev/null || true + fi + sleep 600 # Sleep 10 min on failure + } done diff --git a/pyproject.toml b/pyproject.toml index 858f4c4..46561c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "niquests>=3.15.2", "hcloud>=2.8.0", "prefect>=3.6.15", + "msgspec>=0.19", ] [project.scripts] @@ -41,7 +42,8 @@ dev = [ psdonline = {workspace = true } sqlmesh_materia = {workspace = true } cftc_cot = {workspace = true } - +coffee_prices = {workspace = true } +ice_stocks = {workspace = true } [tool.uv.workspace] members = [ "extract/*", diff --git a/src/materia/pipelines.py b/src/materia/pipelines.py index 2750082..1cfe51a 100644 --- a/src/materia/pipelines.py +++ b/src/materia/pipelines.py @@ -20,6 +20,14 @@ PIPELINES = { "command": ["uv", "run", "--package", "cftc_cot", "extract_cot"], "timeout_seconds": 1800, }, + "extract_prices": { + "command": ["uv", "run", "--package", "coffee_prices", "extract_prices"], + "timeout_seconds": 300, + }, + "extract_ice": { + "command": ["uv", "run", "--package", "ice_stocks", "extract_ice"], + "timeout_seconds": 600, + }, "transform": { "command": ["uv", "run", "--package", "sqlmesh_materia", "sqlmesh", "-p", "transform/sqlmesh_materia", "plan", "prod", "--no-prompts", "--auto-apply"], "timeout_seconds": 3600, diff --git a/tools/scout/pyproject.toml b/tools/scout/pyproject.toml new file mode 100644 index 0000000..37ce2c3 --- /dev/null +++ b/tools/scout/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "scout" +version = "0.1.0" +description = "Browser recon MCP server — discover API endpoints via HAR recording" +requires-python = ">=3.13" +dependencies = [ + "pydoll-python>=1.5", + "mcp[cli]>=1.0", + "msgspec>=0.19", +] + +[project.scripts] +scout-server = "scout.server:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/scout"] diff --git a/tools/scout/src/scout/__init__.py b/tools/scout/src/scout/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/scout/src/scout/analyze.py b/tools/scout/src/scout/analyze.py new file mode 100644 index 0000000..0221553 --- /dev/null +++ b/tools/scout/src/scout/analyze.py @@ -0,0 +1,190 @@ +"""HAR file analysis — filter static assets, surface API endpoints and downloads. + +Parses HAR 1.2 JSON files produced by Pydoll's network recorder. Filters out +static assets (JS, CSS, images, fonts) and returns a structured summary of: + - API calls (JSON responses, any POST request) + - Data downloads (CSV, PDF, Excel) + +Typical call: + summary = analyze_har_file("data/scout/recording.har") + print(format_summary(summary)) +""" + +import json +import pathlib + +import msgspec + +STATIC_EXTENSIONS = frozenset( + {".js", ".css", ".png", ".jpg", ".jpeg", ".gif", ".svg", ".ico", + ".woff", ".woff2", ".ttf", ".eot", ".map", ".webp", ".avif", ".apng"} +) + +STATIC_CONTENT_TYPES = frozenset( + {"text/html", "text/javascript", "application/javascript", + "text/css", "image/", "font/", "audio/", "video/"} +) + +DOWNLOAD_CONTENT_TYPES = ( + "text/csv", + "application/pdf", + "application/vnd.ms-excel", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + "application/octet-stream", + "text/plain", +) + +POST_BODY_MAX_CHARS = 500 + + +class HarEntry(msgspec.Struct): + """A single interesting HTTP request/response from a HAR file.""" + + method: str + url: str + status: int + content_type: str + size_bytes: int + post_body: str = "" + + +class HarSummary(msgspec.Struct): + """Analysis result: static assets filtered out, interesting entries categorized.""" + + api_calls: list[HarEntry] + downloads: list[HarEntry] + other_interesting: list[HarEntry] + total_entries: int + filtered_static: int + + +def _is_static(url: str, content_type: str) -> bool: + """Return True if this entry looks like a static asset.""" + path = url.split("?")[0].lower() + ext = pathlib.PurePosixPath(path).suffix + if ext in STATIC_EXTENSIONS: + return True + ct = content_type.lower().split(";")[0].strip() + return any(ct.startswith(s) for s in STATIC_CONTENT_TYPES) + + +def _extract_entry(raw: dict) -> HarEntry | None: + """Parse a raw HAR entry dict into a typed HarEntry. Returns None for static assets.""" + request = raw.get("request", {}) + response = raw.get("response", {}) + + url = request.get("url", "") + method = request.get("method", "").upper() + status = response.get("status", 0) + + content = response.get("content", {}) + content_type = content.get("mimeType", "").lower().split(";")[0].strip() + size_bytes = max(content.get("size", 0), 0) + + if _is_static(url, content_type): + return None + + # Extract POST body from postData + post_body = "" + post_data = request.get("postData", {}) + if post_data: + text = post_data.get("text", "") + params = post_data.get("params", []) + if text: + post_body = text[:POST_BODY_MAX_CHARS] + elif params: + post_body = "&".join( + f"{p['name']}={p.get('value', '')}" for p in params + )[:POST_BODY_MAX_CHARS] + + return HarEntry( + method=method, + url=url, + status=status, + content_type=content_type, + size_bytes=size_bytes, + post_body=post_body, + ) + + +def analyze_har_file(har_path: str) -> HarSummary: + """Parse HAR JSON, filter static assets, categorize interesting entries.""" + data = json.loads(pathlib.Path(har_path).read_bytes()) + raw_entries = data.get("log", {}).get("entries", []) + + assert raw_entries, f"No entries found in HAR file: {har_path}" + + total = len(raw_entries) + filtered_static = 0 + api_calls: list[HarEntry] = [] + downloads: list[HarEntry] = [] + other_interesting: list[HarEntry] = [] + + for raw in raw_entries: + entry = _extract_entry(raw) + if entry is None: + filtered_static += 1 + continue + + ct = entry.content_type + is_download = any(ct.startswith(t) for t in DOWNLOAD_CONTENT_TYPES) + is_api = ct == "application/json" or ct == "application/xml" or entry.method == "POST" + + if is_download: + downloads.append(entry) + elif is_api: + api_calls.append(entry) + else: + other_interesting.append(entry) + + return HarSummary( + api_calls=api_calls, + downloads=downloads, + other_interesting=other_interesting, + total_entries=total, + filtered_static=filtered_static, + ) + + +def format_summary(summary: HarSummary) -> str: + """Format HarSummary as human-readable text for MCP tool response.""" + parts = [ + f"HAR Analysis: {summary.total_entries} total entries, " + f"{summary.filtered_static} static assets filtered\n" + f"Found: {len(summary.api_calls)} API calls, " + f"{len(summary.downloads)} downloads, " + f"{len(summary.other_interesting)} other\n", + ] + + if summary.api_calls: + parts.append("API Calls:") + for e in summary.api_calls: + parts.append( + f" {e.method:<6} {e.url}" + f" [{e.status}, {e.content_type}, {e.size_bytes:,}B]" + ) + if e.post_body: + parts.append(f" Body: {e.post_body}") + parts.append("") + + if summary.downloads: + parts.append("Downloads:") + for e in summary.downloads: + parts.append( + f" {e.method:<6} {e.url}" + f" [{e.status}, {e.content_type}, {e.size_bytes:,}B]" + ) + parts.append("") + + if summary.other_interesting: + parts.append("Other (non-static, non-JSON, non-download):") + for e in summary.other_interesting[:10]: # cap output + parts.append(f" {e.method:<6} {e.url} [{e.status}, {e.content_type}]") + if len(summary.other_interesting) > 10: + parts.append(f" ... and {len(summary.other_interesting) - 10} more") + parts.append("") + + if not summary.api_calls and not summary.downloads: + parts.append("No API calls or downloads found after filtering static assets.") + + return "\n".join(parts) diff --git a/tools/scout/src/scout/browser.py b/tools/scout/src/scout/browser.py new file mode 100644 index 0000000..55cb404 --- /dev/null +++ b/tools/scout/src/scout/browser.py @@ -0,0 +1,396 @@ +"""Pydoll browser session management for the scout MCP server. + +Manages a single long-lived Chrome instance across multiple MCP tool calls. +The browser starts on the first scout_visit and stays alive until scout_close. + +State is module-level (lives for the duration of the MCP server process). +HAR recording is managed via an asyncio.Task that holds the Pydoll context +manager open between scout_har_start and scout_har_stop calls. + +Bot evasion: +- CDP-based (no chromedriver, navigator.webdriver stays false) +- Humanized mouse movement (Bezier curves) on all clicks +- Headed browser by default (no headless detection vectors) +""" + +import asyncio +import logging +import pathlib +from datetime import datetime + +import msgspec +from pydoll.browser.chromium import Chrome + +logger = logging.getLogger("scout.browser") + +# Module-level browser state — lives for the MCP server process lifetime. +# Using a plain dict so all fields are in one place and easy to reset. +_state: dict = { + "browser": None, # Chrome instance + "tab": None, # Active tab + "har_task": None, # asyncio.Task holding the recording context manager + "har_stop_event": None, # asyncio.Event signalled to stop recording + "har_result": None, # asyncio.Future resolving to HAR file path +} + +OUTPUT_DIR = pathlib.Path("data/scout") +CLICK_TIMEOUT_SECONDS = 10 +NAVIGATION_WAIT_SECONDS = 2 +ELEMENT_CAP = 60 # max elements per category to avoid huge responses + + +class PageElement(msgspec.Struct): + """An interactive element found on the current page.""" + + kind: str # "link", "button", "form", "select", "input" + text: str # visible text or label (truncated) + selector: str # usable CSS selector or description + href: str = "" # for links + action: str = "" # for forms (action URL) + method: str = "" # for forms (GET/POST) + options: list[str] = [] # for selects (option texts) + + +class PageInfo(msgspec.Struct): + """Result of a page visit or navigation action.""" + + title: str + url: str + element_count: int + + +async def _ensure_browser() -> None: + """Launch Chrome if not already running. Idempotent.""" + if _state["tab"] is not None: + return + browser = Chrome() + tab = await browser.start() + _state["browser"] = browser + _state["tab"] = tab + OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + logger.info("Chrome launched") + + +async def visit(url: str) -> PageInfo: + """Navigate to url. Opens browser on first call.""" + await _ensure_browser() + tab = _state["tab"] + + await tab.go_to(url) + await asyncio.sleep(1) # let dynamic content settle + + title = await tab.title + links = await tab.query("a", find_all=True) + element_count = len(links) if links else 0 + + return PageInfo(title=title, url=url, element_count=element_count) + + +async def get_elements(filter_type: str = "") -> list[PageElement]: + """Enumerate interactive elements on the current page. + + filter_type: "", "links", "buttons", "forms", "selects", "inputs" + Returns typed PageElement structs (not screenshots). + """ + assert _state["tab"] is not None, "No browser open — call scout_visit first" + tab = _state["tab"] + elements: list[PageElement] = [] + + # Links + if not filter_type or filter_type == "links": + nodes = await tab.query("a[href]", find_all=True) or [] + for node in nodes[:ELEMENT_CAP]: + try: + text = (await node.text or "").strip()[:100] + href = (await node.get_attribute("href") or "").strip() + if text or href: + elements.append(PageElement( + kind="link", + text=text, + selector=f'a[href="{href}"]' if href else "a", + href=href, + )) + except Exception: + continue + + # Buttons + if not filter_type or filter_type == "buttons": + nodes = await tab.query( + "button, input[type=submit], input[type=button]", find_all=True + ) or [] + for node in nodes[:20]: + try: + text = (await node.text or "").strip() + if not text: + text = await node.get_attribute("value") or "" + text = text[:100] + cls = (await node.get_attribute("class") or "").strip() + sel = f"button.{cls.split()[0]}" if cls else "button" + elements.append(PageElement(kind="button", text=text, selector=sel)) + except Exception: + continue + + # Selects + if not filter_type or filter_type == "selects": + nodes = await tab.query("select", find_all=True) or [] + for node in nodes[:10]: + try: + name = ( + await node.get_attribute("name") + or await node.get_attribute("id") + or "" + ).strip() + option_nodes = await node.query("option", find_all=True) or [] + opts = [] + for opt in option_nodes[:15]: + opt_text = (await opt.text or "").strip() + if opt_text: + opts.append(opt_text) + sel = f"select[name='{name}']" if name else "select" + elements.append(PageElement( + kind="select", text=name, selector=sel, options=opts + )) + except Exception: + continue + + # Forms + if not filter_type or filter_type == "forms": + nodes = await tab.query("form", find_all=True) or [] + for node in nodes[:10]: + try: + action = (await node.get_attribute("action") or "").strip() + method = (await node.get_attribute("method") or "GET").upper() + elements.append(PageElement( + kind="form", + text=f"{method} {action}", + selector="form", + action=action, + method=method, + )) + except Exception: + continue + + # Inputs + if filter_type == "inputs": + nodes = await tab.query( + "input:not([type=hidden]):not([type=submit]):not([type=button])", + find_all=True, + ) or [] + for node in nodes[:20]: + try: + name = (await node.get_attribute("name") or "").strip() + input_type = (await node.get_attribute("type") or "text").strip() + placeholder = (await node.get_attribute("placeholder") or "").strip() + label = name or placeholder or input_type + sel = f"input[name='{name}']" if name else f"input[type='{input_type}']" + elements.append(PageElement(kind="input", text=label, selector=sel)) + except Exception: + continue + + return elements + + +def format_elements(elements: list[PageElement]) -> str: + """Format a list of PageElement structs as human-readable text.""" + if not elements: + return "No interactive elements found." + + # Group by kind + groups: dict[str, list[PageElement]] = {} + for e in elements: + groups.setdefault(e.kind, []).append(e) + + lines: list[str] = [f"Elements ({len(elements)} total):"] + kind_labels = { + "link": "Links", "button": "Buttons", + "form": "Forms", "select": "Selects", "input": "Inputs", + } + + for kind in ["link", "button", "select", "form", "input"]: + group = groups.get(kind, []) + if not group: + continue + lines.append(f"\n{kind_labels.get(kind, kind.capitalize())} ({len(group)}):") + for i, e in enumerate(group): + if kind == "link": + lines.append(f" [{i}] {e.text!r:<40} → {e.href}") + elif kind == "select": + opts = ", ".join(e.options[:5]) + if len(e.options) > 5: + opts += f", ... (+{len(e.options) - 5} more)" + lines.append(f" [{i}] {e.text!r} selector: {e.selector}") + lines.append(f" options: {opts}") + elif kind == "form": + lines.append(f" [{i}] {e.text} selector: {e.selector}") + else: + lines.append(f" [{i}] {e.text!r:<40} selector: {e.selector}") + + return "\n".join(lines) + + +async def click(selector: str) -> PageInfo: + """Click an element. Use 'text=Foo' to click by visible text, else CSS selector.""" + assert _state["tab"] is not None, "No browser open — call scout_visit first" + tab = _state["tab"] + + if selector.startswith("text="): + element = await tab.find(text=selector[5:], timeout=CLICK_TIMEOUT_SECONDS) + else: + element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS) + + assert element is not None, f"Element not found: {selector!r}" + await element.click() + await asyncio.sleep(NAVIGATION_WAIT_SECONDS) + + title = await tab.title + url = await tab.current_url if hasattr(tab, "current_url") else "" + links = await tab.query("a", find_all=True) or [] + + return PageInfo(title=title, url=url or "", element_count=len(links)) + + +async def fill(selector: str, value: str) -> str: + """Type a value into a form field.""" + assert _state["tab"] is not None, "No browser open — call scout_visit first" + tab = _state["tab"] + + if selector.startswith("text="): + element = await tab.find(text=selector[5:], timeout=CLICK_TIMEOUT_SECONDS) + else: + element = await tab.query(selector, timeout=CLICK_TIMEOUT_SECONDS) + + assert element is not None, f"Element not found: {selector!r}" + # insert_text is instant (no keystroke simulation) + await element.insert_text(value) + return f"Filled {selector!r} with {value!r}" + + +async def select_option(selector: str, value: str) -> str: + """Select an option in a dropdown. + + Args: + selector: CSS selector for the