From 060cb9b32ed56352bbf9719928f97d5fe172208a Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 12:44:03 +0100 Subject: [PATCH] feat(pipeline): scaffold Pipeline Console blueprint + sidebar + app registration - New pipeline_routes.py blueprint (url_prefix=/admin/pipeline) with: - All 9 routes (dashboard, overview, extractions, catalog, query editor) - Data access functions: state DB (sync+to_thread), serving meta, landing FS, workflows.toml - execute_user_query() added to analytics.py (columns+rows+error+elapsed_ms) - Query security: blocklist regex, 10k char limit, 1000 row cap, 10s timeout - Add 'Pipeline' sidebar section to base_admin.html (between Analytics and System) - Register pipeline_bp in app.py - Add run_extraction task handler to worker.py Subtask 1 of 6 Co-Authored-By: Claude Sonnet 4.6 --- web/src/padelnomics/admin/pipeline_routes.py | 699 ++++++++++++++++++ .../admin/templates/admin/base_admin.html | 6 + web/src/padelnomics/analytics.py | 50 +- web/src/padelnomics/app.py | 2 + web/src/padelnomics/worker.py | 26 + 5 files changed, 782 insertions(+), 1 deletion(-) create mode 100644 web/src/padelnomics/admin/pipeline_routes.py diff --git a/web/src/padelnomics/admin/pipeline_routes.py b/web/src/padelnomics/admin/pipeline_routes.py new file mode 100644 index 0000000..542e2e6 --- /dev/null +++ b/web/src/padelnomics/admin/pipeline_routes.py @@ -0,0 +1,699 @@ +""" +Pipeline Console admin blueprint. + +Operational visibility for the data extraction and transformation pipeline: + /admin/pipeline/ → dashboard (health stats + tab container) + /admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats + /admin/pipeline/extractions → HTMX tab: filterable extraction run history + /admin/pipeline/extractions//mark-stale → POST: mark stuck "running" row as failed + /admin/pipeline/extract/trigger → POST: enqueue full extraction run + /admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data) + /admin/pipeline/catalog/ → HTMX partial: table detail (columns + sample) + /admin/pipeline/query → HTMX tab: SQL query editor + /admin/pipeline/query/execute → POST: run user SQL, return results table + +Data sources: + - data/landing/.state.sqlite (extraction run history — stdlib sqlite3, sync via to_thread) + - SERVING_DUCKDB_PATH/../_serving_meta.json (export timestamp + per-table row counts) + - analytics.duckdb (DuckDB read-only via analytics.execute_user_query) + - LANDING_DIR/ (filesystem scan for file sizes + dates) + - infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib) +""" +import asyncio +import json +import logging +import os +import re +import sqlite3 +import sys +import time +from datetime import UTC, datetime, timedelta +from pathlib import Path + +from quart import Blueprint, flash, redirect, render_template, request, url_for + +from ..auth.routes import role_required +from ..core import csrf_protect + +logger = logging.getLogger(__name__) + +bp = Blueprint( + "pipeline", + __name__, + template_folder=str(Path(__file__).parent / "templates"), + url_prefix="/admin/pipeline", +) + +# ── Config ──────────────────────────────────────────────────────────────────── + +_LANDING_DIR = os.environ.get("LANDING_DIR", "data/landing") +_SERVING_DUCKDB_PATH = os.environ.get("SERVING_DUCKDB_PATH", "data/analytics.duckdb") + +# Repo root: web/src/padelnomics/admin/ → up 4 levels +_REPO_ROOT = Path(__file__).resolve().parents[5] +_WORKFLOWS_TOML = _REPO_ROOT / "infra" / "supervisor" / "workflows.toml" + +# A "running" row older than this is considered stale/crashed. +_STALE_THRESHOLD_HOURS = 2 + +# Query editor limits +_QUERY_MAX_CHARS = 10_000 +_QUERY_MAX_ROWS = 1_000 +_QUERY_TIMEOUT_SECONDS = 10 + +# Blocked SQL keywords (read-only connection enforces engine-level, this adds belt+suspenders) +_BLOCKED_SQL_RE = re.compile( + r"\b(ATTACH|COPY|EXPORT|INSTALL|LOAD|CREATE|DROP|ALTER|INSERT|UPDATE|DELETE|GRANT|REVOKE|PRAGMA)\b", + re.IGNORECASE, +) + + +# ── Sidebar data injection (same pattern as pseo_routes.py) ────────────────── + + +@bp.before_request +async def _inject_sidebar_data(): + """Load unread inbox count for the admin sidebar badge.""" + from quart import g + + from ..core import fetch_one + + try: + row = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0") + g.admin_unread_count = row["cnt"] if row else 0 + except Exception: + g.admin_unread_count = 0 + + +@bp.context_processor +def _admin_context(): + from quart import g + + return {"unread_count": getattr(g, "admin_unread_count", 0)} + + +# ── Data access: state DB (sync, called via to_thread) ──────────────────────── + + +def _state_db_path() -> Path: + return Path(_LANDING_DIR) / ".state.sqlite" + + +def _fetch_extraction_summary_sync() -> dict: + """Aggregate stats from extraction_runs: total, success, failed, stale counts.""" + db_path = _state_db_path() + if not db_path.exists(): + return {"total": 0, "success": 0, "failed": 0, "running": 0, "stale": 0} + + cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + row = conn.execute( + """ + SELECT + COUNT(*) AS total, + SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success, + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed, + SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running, + SUM(CASE WHEN status = 'running' AND started_at < ? THEN 1 ELSE 0 END) AS stale + FROM extraction_runs + """, + (cutoff,), + ).fetchone() + return { + "total": row["total"] or 0, + "success": row["success"] or 0, + "failed": row["failed"] or 0, + "running": row["running"] or 0, + "stale": row["stale"] or 0, + } + finally: + conn.close() + + +def _fetch_latest_per_extractor_sync() -> list[dict]: + """Return most recent run for each extractor name.""" + db_path = _state_db_path() + if not db_path.exists(): + return [] + + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + """ + SELECT r.* + FROM extraction_runs r + INNER JOIN ( + SELECT extractor, MAX(run_id) AS max_id + FROM extraction_runs + GROUP BY extractor + ) latest ON r.run_id = latest.max_id + ORDER BY r.extractor + """ + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + +def _fetch_extraction_runs_sync( + *, + extractor: str = "", + status: str = "", + limit: int = 50, + offset: int = 0, +) -> tuple[list[dict], int]: + """Return (rows, total_count) for the filtered run history.""" + assert 1 <= limit <= 200, f"limit must be 1–200, got {limit}" + assert offset >= 0, f"offset must be >= 0, got {offset}" + + db_path = _state_db_path() + if not db_path.exists(): + return [], 0 + + where_clauses = [] + params: list = [] + if extractor: + where_clauses.append("extractor = ?") + params.append(extractor) + if status: + where_clauses.append("status = ?") + params.append(status) + + where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else "" + + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + total = conn.execute( + f"SELECT COUNT(*) FROM extraction_runs {where_sql}", params + ).fetchone()[0] + + rows = conn.execute( + f""" + SELECT run_id, extractor, started_at, finished_at, status, + files_written, files_skipped, bytes_written, + cursor_value, error_message + FROM extraction_runs {where_sql} + ORDER BY run_id DESC + LIMIT ? OFFSET ? + """, + params + [limit, offset], + ).fetchall() + return [dict(r) for r in rows], total + finally: + conn.close() + + +def _fetch_distinct_extractors_sync() -> list[str]: + """Return distinct extractor names for filter dropdowns.""" + db_path = _state_db_path() + if not db_path.exists(): + return [] + conn = sqlite3.connect(str(db_path)) + try: + rows = conn.execute( + "SELECT DISTINCT extractor FROM extraction_runs ORDER BY extractor" + ).fetchall() + return [r[0] for r in rows] + finally: + conn.close() + + +def _mark_run_failed_sync(run_id: int) -> bool: + """Mark a stuck 'running' row as 'failed'. Returns True if row was updated.""" + assert run_id > 0, f"run_id must be positive, got {run_id}" + db_path = _state_db_path() + if not db_path.exists(): + return False + conn = sqlite3.connect(str(db_path)) + try: + cur = conn.execute( + """ + UPDATE extraction_runs + SET status = 'failed', + finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), + error_message = 'Marked failed manually (admin — process appeared stuck)' + WHERE run_id = ? AND status = 'running' + """, + (run_id,), + ) + conn.commit() + return cur.rowcount > 0 + finally: + conn.close() + + +# ── Data access: serving meta ───────────────────────────────────────────────── + + +def _load_serving_meta() -> dict | None: + """Read _serving_meta.json alongside analytics.duckdb. Returns None if absent.""" + meta_path = Path(_SERVING_DUCKDB_PATH).parent / "_serving_meta.json" + if not meta_path.exists(): + return None + try: + return json.loads(meta_path.read_text()) + except Exception: + logger.warning("Failed to read _serving_meta.json", exc_info=True) + return None + + +# ── Data access: landing zone filesystem ───────────────────────────────────── + + +def _get_landing_zone_stats_sync() -> list[dict]: + """Scan LANDING_DIR and return per-source file counts + total bytes.""" + landing = Path(_LANDING_DIR) + if not landing.exists(): + return [] + + sources = [] + for source_dir in sorted(landing.iterdir()): + if not source_dir.is_dir() or source_dir.name.startswith("."): + continue + files = list(source_dir.rglob("*.gz")) + list(source_dir.rglob("*.jsonl")) + total_bytes = sum(f.stat().st_size for f in files) + latest_mtime = max((f.stat().st_mtime for f in files), default=None) + sources.append({ + "name": source_dir.name, + "file_count": len(files), + "total_bytes": total_bytes, + "latest_mtime": ( + datetime.fromtimestamp(latest_mtime, tz=UTC).strftime("%Y-%m-%d %H:%M") + if latest_mtime + else None + ), + }) + return sources + + +# ── Data access: workflows.toml ─────────────────────────────────────────────── + +_SCHEDULE_LABELS = { + "hourly": "Every hour", + "daily": "Daily", + "weekly": "Weekly", + "monthly": "Monthly", +} + + +def _load_workflows() -> list[dict]: + """Parse workflows.toml and return workflow definitions with human schedule labels.""" + if not _WORKFLOWS_TOML.exists(): + return [] + + if sys.version_info >= (3, 11): + import tomllib + + data = tomllib.loads(_WORKFLOWS_TOML.read_text()) + else: + # Fallback for older Python (shouldn't happen — project requires 3.11+) + try: + import tomli as tomllib # type: ignore[no-redef] + + data = tomllib.loads(_WORKFLOWS_TOML.read_text()) + except ImportError: + return [] + + workflows = [] + for name, config in data.items(): + schedule = config.get("schedule", "") + schedule_label = _SCHEDULE_LABELS.get(schedule, schedule) + workflows.append({ + "name": name, + "module": config.get("module", ""), + "schedule": schedule, + "schedule_label": schedule_label, + "depends_on": config.get("depends_on", []), + }) + return workflows + + +# ── Route helpers ───────────────────────────────────────────────────────────── + + +def _format_bytes(n: int) -> str: + """Human-readable byte count.""" + if n < 1024: + return f"{n} B" + if n < 1024 * 1024: + return f"{n / 1024:.1f} KB" + return f"{n / 1024 / 1024:.1f} MB" + + +def _duration_str(started_at: str | None, finished_at: str | None) -> str: + """Return human-readable duration, or '' if unavailable.""" + if not started_at or not finished_at: + return "" + try: + fmt = "%Y-%m-%dT%H:%M:%SZ" + start = datetime.strptime(started_at, fmt) + end = datetime.strptime(finished_at, fmt) + delta = int((end - start).total_seconds()) + if delta < 60: + return f"{delta}s" + return f"{delta // 60}m {delta % 60}s" + except ValueError: + return "" + + +def _is_stale(run: dict) -> bool: + """True if a 'running' row has been stuck longer than the stale threshold.""" + if run.get("status") != "running": + return False + started = run.get("started_at", "") + if not started: + return True + try: + fmt = "%Y-%m-%dT%H:%M:%SZ" + start = datetime.strptime(started, fmt).replace(tzinfo=UTC) + return (datetime.now(UTC) - start) > timedelta(hours=_STALE_THRESHOLD_HOURS) + except ValueError: + return False + + +# ── Dashboard ───────────────────────────────────────────────────────────────── + + +@bp.route("/") +@role_required("admin") +async def pipeline_dashboard(): + """Main page: health stat cards + tab container.""" + summary = await asyncio.to_thread(_fetch_extraction_summary_sync) + serving_meta = await asyncio.to_thread(_load_serving_meta) + + total_serving_tables = len(serving_meta["tables"]) if serving_meta else 0 + last_export = serving_meta.get("exported_at_utc", "")[:19].replace("T", " ") if serving_meta else "—" + + success_rate = 0 + if summary["total"] > 0: + success_rate = round(100 * summary["success"] / summary["total"]) + + return await render_template( + "admin/pipeline.html", + summary=summary, + success_rate=success_rate, + total_serving_tables=total_serving_tables, + last_export=last_export, + admin_page="pipeline", + ) + + +# ── Overview tab ───────────────────────────────────────────────────────────── + + +@bp.route("/overview") +@role_required("admin") +async def pipeline_overview(): + """HTMX tab: extraction status per source, serving freshness, landing zone.""" + latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather( + asyncio.to_thread(_fetch_latest_per_extractor_sync), + asyncio.to_thread(_get_landing_zone_stats_sync), + asyncio.to_thread(_load_workflows), + asyncio.to_thread(_load_serving_meta), + ) + + # Build a lookup: extractor name → latest run + latest_by_name = {r["extractor"]: r for r in latest_runs} + + # Enrich each workflow with its latest run data + cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + workflow_rows = [] + for wf in workflows: + run = latest_by_name.get(wf["name"]) + workflow_rows.append({ + "workflow": wf, + "run": run, + "stale": _is_stale(run) if run else False, + }) + + # Compute landing zone totals + total_landing_bytes = sum(s["total_bytes"] for s in landing_stats) + + return await render_template( + "admin/partials/pipeline_overview.html", + workflow_rows=workflow_rows, + landing_stats=landing_stats, + total_landing_bytes=total_landing_bytes, + serving_meta=serving_meta, + format_bytes=_format_bytes, + ) + + +# ── Extractions tab ──────────────────────────────────────────────────────────── + + +@bp.route("/extractions") +@role_required("admin") +async def pipeline_extractions(): + """HTMX tab: paginated + filtered extraction run history.""" + extractor_filter = request.args.get("extractor", "") + status_filter = request.args.get("status", "") + page = max(1, int(request.args.get("page", 1))) + per_page = 30 + + (runs, total), extractors = await asyncio.gather( + asyncio.to_thread( + _fetch_extraction_runs_sync, + extractor=extractor_filter, + status=status_filter, + limit=per_page, + offset=(page - 1) * per_page, + ), + asyncio.to_thread(_fetch_distinct_extractors_sync), + ) + + # Enrich rows with computed fields + for run in runs: + run["duration"] = _duration_str(run.get("started_at"), run.get("finished_at")) + run["bytes_label"] = _format_bytes(run.get("bytes_written") or 0) + run["is_stale"] = _is_stale(run) + + total_pages = max(1, (total + per_page - 1) // per_page) + + return await render_template( + "admin/partials/pipeline_extractions.html", + runs=runs, + total=total, + page=page, + per_page=per_page, + total_pages=total_pages, + extractors=extractors, + extractor_filter=extractor_filter, + status_filter=status_filter, + ) + + +@bp.route("/extractions//mark-stale", methods=["POST"]) +@role_required("admin") +@csrf_protect +async def pipeline_mark_stale(run_id: int): + """Mark a stuck 'running' extraction row as 'failed'.""" + updated = await asyncio.to_thread(_mark_run_failed_sync, run_id) + if updated: + await flash(f"Run #{run_id} marked as failed.", "success") + else: + await flash(f"Run #{run_id} could not be updated (not in 'running' state).", "warning") + return redirect(url_for("pipeline.pipeline_extractions")) + + +# ── Trigger extraction ──────────────────────────────────────────────────────── + + +@bp.route("/extract/trigger", methods=["POST"]) +@role_required("admin") +@csrf_protect +async def pipeline_trigger_extract(): + """Enqueue a full pipeline extraction run.""" + from ..worker import enqueue + + await enqueue("run_extraction") + await flash("Extraction run queued. Check the task queue for progress.", "success") + return redirect(url_for("pipeline.pipeline_dashboard")) + + +# ── Catalog tab ─────────────────────────────────────────────────────────────── + + +@bp.route("/catalog") +@role_required("admin") +async def pipeline_catalog(): + """HTMX tab: list serving tables with row counts + column counts.""" + from ..analytics import fetch_analytics + + schema_rows = await fetch_analytics( + """ + SELECT table_name, column_name, data_type, ordinal_position + FROM information_schema.columns + WHERE table_schema = 'serving' + ORDER BY table_name, ordinal_position + """ + ) + + # Group by table + tables: dict[str, dict] = {} + for row in schema_rows: + tname = row["table_name"] + if tname not in tables: + tables[tname] = {"name": tname, "columns": [], "column_count": 0} + tables[tname]["columns"].append({ + "name": row["column_name"], + "type": row["data_type"], + }) + tables[tname]["column_count"] += 1 + + # Enrich with row counts from serving meta + serving_meta = await asyncio.to_thread(_load_serving_meta) + meta_counts = serving_meta.get("tables", {}) if serving_meta else {} + for tname, tdata in tables.items(): + tdata["row_count"] = meta_counts.get(tname, {}).get("row_count") + + return await render_template( + "admin/partials/pipeline_catalog.html", + tables=list(tables.values()), + serving_meta=serving_meta, + ) + + +@bp.route("/catalog/") +@role_required("admin") +async def pipeline_table_detail(table_name: str): + """HTMX partial: column list + 10-row sample for a serving table.""" + from ..analytics import fetch_analytics + + # Validate table name before using in SQL + if not re.match(r"^[a-z_][a-z0-9_]*$", table_name): + return "Invalid table name", 400 + + # Confirm table exists in serving schema + exists = await fetch_analytics( + "SELECT 1 FROM information_schema.tables" + " WHERE table_schema = 'serving' AND table_name = ?", + [table_name], + ) + if not exists: + return f"Table serving.{table_name} not found", 404 + + columns, sample = await asyncio.gather( + fetch_analytics( + "SELECT column_name, data_type, ordinal_position" + " FROM information_schema.columns" + " WHERE table_schema = 'serving' AND table_name = ?" + " ORDER BY ordinal_position", + [table_name], + ), + fetch_analytics( + f"SELECT * FROM serving.{table_name} LIMIT 10" # noqa: S608 + ), + ) + + return await render_template( + "admin/partials/pipeline_table_detail.html", + table_name=table_name, + columns=columns, + sample=sample, + ) + + +# ── Query editor ────────────────────────────────────────────────────────────── + + +@bp.route("/query") +@role_required("admin") +async def pipeline_query_editor(): + """HTMX tab: SQL query editor with schema sidebar.""" + from ..analytics import fetch_analytics + + schema_rows = await fetch_analytics( + """ + SELECT table_name, column_name, data_type + FROM information_schema.columns + WHERE table_schema = 'serving' + ORDER BY table_name, ordinal_position + """ + ) + + # Group by table for the schema sidebar + schema: dict[str, list] = {} + for row in schema_rows: + tname = row["table_name"] + if tname not in schema: + schema[tname] = [] + schema[tname].append({"name": row["column_name"], "type": row["data_type"]}) + + return await render_template( + "admin/partials/pipeline_query.html", + schema=schema, + max_rows=_QUERY_MAX_ROWS, + timeout_seconds=_QUERY_TIMEOUT_SECONDS, + ) + + +@bp.route("/query/execute", methods=["POST"]) +@role_required("admin") +@csrf_protect +async def pipeline_query_execute(): + """Run user-submitted SQL and return a results table partial.""" + from ..analytics import execute_user_query + + form = await request.form + sql = (form.get("sql") or "").strip() + + # Input validation + if not sql: + return await render_template( + "admin/partials/pipeline_query_results.html", + error="SQL query is empty.", + columns=[], + rows=[], + row_count=0, + elapsed_ms=0, + truncated=False, + ) + + if len(sql) > _QUERY_MAX_CHARS: + return await render_template( + "admin/partials/pipeline_query_results.html", + error=f"Query too long ({len(sql):,} chars). Maximum is {_QUERY_MAX_CHARS:,} characters.", + columns=[], + rows=[], + row_count=0, + elapsed_ms=0, + truncated=False, + ) + + if _BLOCKED_SQL_RE.search(sql): + return await render_template( + "admin/partials/pipeline_query_results.html", + error="Query contains a blocked keyword. Only SELECT statements are allowed.", + columns=[], + rows=[], + row_count=0, + elapsed_ms=0, + truncated=False, + ) + + columns, rows, error, elapsed_ms = await execute_user_query( + sql, + max_rows=_QUERY_MAX_ROWS, + timeout_seconds=_QUERY_TIMEOUT_SECONDS, + ) + + truncated = len(rows) >= _QUERY_MAX_ROWS + + return await render_template( + "admin/partials/pipeline_query_results.html", + error=error, + columns=columns, + rows=rows, + row_count=len(rows), + elapsed_ms=elapsed_ms, + truncated=truncated, + ) diff --git a/web/src/padelnomics/admin/templates/admin/base_admin.html b/web/src/padelnomics/admin/templates/admin/base_admin.html index d5ffbf0..be22415 100644 --- a/web/src/padelnomics/admin/templates/admin/base_admin.html +++ b/web/src/padelnomics/admin/templates/admin/base_admin.html @@ -133,6 +133,12 @@ SEO Hub +
Pipeline
+ + + Pipeline + +
System
diff --git a/web/src/padelnomics/analytics.py b/web/src/padelnomics/analytics.py index 5379a67..d7d4caa 100644 --- a/web/src/padelnomics/analytics.py +++ b/web/src/padelnomics/analytics.py @@ -5,13 +5,16 @@ Opens a single long-lived DuckDB connection at startup (read_only=True). All queries run via asyncio.to_thread() to avoid blocking the event loop. Usage: - from .analytics import fetch_analytics + from .analytics import fetch_analytics, execute_user_query rows = await fetch_analytics("SELECT * FROM serving.planner_defaults WHERE city_slug = ?", ["berlin"]) + + cols, rows, error, elapsed_ms = await execute_user_query("SELECT city_slug FROM serving.city_market_profile LIMIT 5") """ import asyncio import logging import os +import time from pathlib import Path from typing import Any @@ -77,3 +80,48 @@ async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str except Exception: logger.exception("DuckDB analytics query failed: %.200s", sql) return [] + + +async def execute_user_query( + sql: str, + max_rows: int = 1000, + timeout_seconds: int = 10, +) -> tuple[list[str], list[tuple], str | None, float]: + """ + Run an admin-submitted SQL query. + + Returns (columns, rows, error, elapsed_ms). + - columns: list of column name strings (empty on error) + - rows: list of value tuples, capped at max_rows + - error: error message string, or None on success + - elapsed_ms: wall-clock query time in milliseconds + """ + assert sql, "sql must not be empty" + assert 1 <= max_rows <= 10_000, f"max_rows must be 1–10000, got {max_rows}" + assert 1 <= timeout_seconds <= 60, f"timeout_seconds must be 1–60, got {timeout_seconds}" + + if _conn is None: + return [], [], "Analytics database is not available.", 0.0 + + def _run() -> tuple[list[str], list[tuple], str | None, float]: + t0 = time.monotonic() + cur = _conn.cursor() + try: + rel = cur.execute(sql) + cols = [d[0] for d in rel.description] + rows = rel.fetchmany(max_rows) + elapsed_ms = round((time.monotonic() - t0) * 1000, 1) + return cols, rows, None, elapsed_ms + except Exception as exc: + elapsed_ms = round((time.monotonic() - t0) * 1000, 1) + return [], [], str(exc), elapsed_ms + finally: + cur.close() + + try: + return await asyncio.wait_for( + asyncio.to_thread(_run), + timeout=timeout_seconds, + ) + except asyncio.TimeoutError: + return [], [], f"Query timed out after {timeout_seconds}s.", 0.0 diff --git a/web/src/padelnomics/app.py b/web/src/padelnomics/app.py index 5b29b64..fbcf74b 100644 --- a/web/src/padelnomics/app.py +++ b/web/src/padelnomics/app.py @@ -313,6 +313,7 @@ def create_app() -> Quart: # Blueprint registration # ------------------------------------------------------------------------- + from .admin.pipeline_routes import bp as pipeline_bp from .admin.pseo_routes import bp as pseo_bp from .admin.routes import bp as admin_bp from .auth.routes import bp as auth_bp @@ -339,6 +340,7 @@ def create_app() -> Quart: app.register_blueprint(billing_bp) app.register_blueprint(admin_bp) app.register_blueprint(pseo_bp) + app.register_blueprint(pipeline_bp) app.register_blueprint(webhooks_bp) # Content catch-all LAST — lives under / too diff --git a/web/src/padelnomics/worker.py b/web/src/padelnomics/worker.py index 25b3e29..ce8ba49 100644 --- a/web/src/padelnomics/worker.py +++ b/web/src/padelnomics/worker.py @@ -698,6 +698,32 @@ async def handle_cleanup_seo_metrics(payload: dict) -> None: logger.info("Cleaned up %s old SEO metric rows", deleted) +@task("run_extraction") +async def handle_run_extraction(payload: dict) -> None: + """Run the full extraction pipeline (all extractors) in the background. + + Shells out to `uv run extract` in the repo root. The extraction CLI + manages its own state in .state.sqlite and writes to the landing zone. + """ + import subprocess + from pathlib import Path + + repo_root = Path(__file__).resolve().parents[4] + result = await asyncio.to_thread( + subprocess.run, + ["uv", "run", "--package", "padelnomics_extract", "extract"], + capture_output=True, + text=True, + timeout=7200, # 2-hour absolute timeout + cwd=str(repo_root), + ) + if result.returncode != 0: + raise RuntimeError( + f"Extraction failed (exit {result.returncode}): {result.stderr[:500]}" + ) + logger.info("Extraction completed: %s", result.stdout[-300:] if result.stdout else "(no output)") + + @task("generate_articles") async def handle_generate_articles(payload: dict) -> None: """Generate articles from a template in the background."""