From 060cb9b32ed56352bbf9719928f97d5fe172208a Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 12:44:03 +0100 Subject: [PATCH 1/6] feat(pipeline): scaffold Pipeline Console blueprint + sidebar + app registration - New pipeline_routes.py blueprint (url_prefix=/admin/pipeline) with: - All 9 routes (dashboard, overview, extractions, catalog, query editor) - Data access functions: state DB (sync+to_thread), serving meta, landing FS, workflows.toml - execute_user_query() added to analytics.py (columns+rows+error+elapsed_ms) - Query security: blocklist regex, 10k char limit, 1000 row cap, 10s timeout - Add 'Pipeline' sidebar section to base_admin.html (between Analytics and System) - Register pipeline_bp in app.py - Add run_extraction task handler to worker.py Subtask 1 of 6 Co-Authored-By: Claude Sonnet 4.6 --- web/src/padelnomics/admin/pipeline_routes.py | 699 ++++++++++++++++++ .../admin/templates/admin/base_admin.html | 6 + web/src/padelnomics/analytics.py | 50 +- web/src/padelnomics/app.py | 2 + web/src/padelnomics/worker.py | 26 + 5 files changed, 782 insertions(+), 1 deletion(-) create mode 100644 web/src/padelnomics/admin/pipeline_routes.py diff --git a/web/src/padelnomics/admin/pipeline_routes.py b/web/src/padelnomics/admin/pipeline_routes.py new file mode 100644 index 0000000..542e2e6 --- /dev/null +++ b/web/src/padelnomics/admin/pipeline_routes.py @@ -0,0 +1,699 @@ +""" +Pipeline Console admin blueprint. + +Operational visibility for the data extraction and transformation pipeline: + /admin/pipeline/ → dashboard (health stats + tab container) + /admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats + /admin/pipeline/extractions → HTMX tab: filterable extraction run history + /admin/pipeline/extractions//mark-stale → POST: mark stuck "running" row as failed + /admin/pipeline/extract/trigger → POST: enqueue full extraction run + /admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data) + /admin/pipeline/catalog/ → HTMX partial: table detail (columns + sample) + /admin/pipeline/query → HTMX tab: SQL query editor + /admin/pipeline/query/execute → POST: run user SQL, return results table + +Data sources: + - data/landing/.state.sqlite (extraction run history — stdlib sqlite3, sync via to_thread) + - SERVING_DUCKDB_PATH/../_serving_meta.json (export timestamp + per-table row counts) + - analytics.duckdb (DuckDB read-only via analytics.execute_user_query) + - LANDING_DIR/ (filesystem scan for file sizes + dates) + - infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib) +""" +import asyncio +import json +import logging +import os +import re +import sqlite3 +import sys +import time +from datetime import UTC, datetime, timedelta +from pathlib import Path + +from quart import Blueprint, flash, redirect, render_template, request, url_for + +from ..auth.routes import role_required +from ..core import csrf_protect + +logger = logging.getLogger(__name__) + +bp = Blueprint( + "pipeline", + __name__, + template_folder=str(Path(__file__).parent / "templates"), + url_prefix="/admin/pipeline", +) + +# ── Config ──────────────────────────────────────────────────────────────────── + +_LANDING_DIR = os.environ.get("LANDING_DIR", "data/landing") +_SERVING_DUCKDB_PATH = os.environ.get("SERVING_DUCKDB_PATH", "data/analytics.duckdb") + +# Repo root: web/src/padelnomics/admin/ → up 4 levels +_REPO_ROOT = Path(__file__).resolve().parents[5] +_WORKFLOWS_TOML = _REPO_ROOT / "infra" / "supervisor" / "workflows.toml" + +# A "running" row older than this is considered stale/crashed. +_STALE_THRESHOLD_HOURS = 2 + +# Query editor limits +_QUERY_MAX_CHARS = 10_000 +_QUERY_MAX_ROWS = 1_000 +_QUERY_TIMEOUT_SECONDS = 10 + +# Blocked SQL keywords (read-only connection enforces engine-level, this adds belt+suspenders) +_BLOCKED_SQL_RE = re.compile( + r"\b(ATTACH|COPY|EXPORT|INSTALL|LOAD|CREATE|DROP|ALTER|INSERT|UPDATE|DELETE|GRANT|REVOKE|PRAGMA)\b", + re.IGNORECASE, +) + + +# ── Sidebar data injection (same pattern as pseo_routes.py) ────────────────── + + +@bp.before_request +async def _inject_sidebar_data(): + """Load unread inbox count for the admin sidebar badge.""" + from quart import g + + from ..core import fetch_one + + try: + row = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0") + g.admin_unread_count = row["cnt"] if row else 0 + except Exception: + g.admin_unread_count = 0 + + +@bp.context_processor +def _admin_context(): + from quart import g + + return {"unread_count": getattr(g, "admin_unread_count", 0)} + + +# ── Data access: state DB (sync, called via to_thread) ──────────────────────── + + +def _state_db_path() -> Path: + return Path(_LANDING_DIR) / ".state.sqlite" + + +def _fetch_extraction_summary_sync() -> dict: + """Aggregate stats from extraction_runs: total, success, failed, stale counts.""" + db_path = _state_db_path() + if not db_path.exists(): + return {"total": 0, "success": 0, "failed": 0, "running": 0, "stale": 0} + + cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + row = conn.execute( + """ + SELECT + COUNT(*) AS total, + SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success, + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed, + SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running, + SUM(CASE WHEN status = 'running' AND started_at < ? THEN 1 ELSE 0 END) AS stale + FROM extraction_runs + """, + (cutoff,), + ).fetchone() + return { + "total": row["total"] or 0, + "success": row["success"] or 0, + "failed": row["failed"] or 0, + "running": row["running"] or 0, + "stale": row["stale"] or 0, + } + finally: + conn.close() + + +def _fetch_latest_per_extractor_sync() -> list[dict]: + """Return most recent run for each extractor name.""" + db_path = _state_db_path() + if not db_path.exists(): + return [] + + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + """ + SELECT r.* + FROM extraction_runs r + INNER JOIN ( + SELECT extractor, MAX(run_id) AS max_id + FROM extraction_runs + GROUP BY extractor + ) latest ON r.run_id = latest.max_id + ORDER BY r.extractor + """ + ).fetchall() + return [dict(r) for r in rows] + finally: + conn.close() + + +def _fetch_extraction_runs_sync( + *, + extractor: str = "", + status: str = "", + limit: int = 50, + offset: int = 0, +) -> tuple[list[dict], int]: + """Return (rows, total_count) for the filtered run history.""" + assert 1 <= limit <= 200, f"limit must be 1–200, got {limit}" + assert offset >= 0, f"offset must be >= 0, got {offset}" + + db_path = _state_db_path() + if not db_path.exists(): + return [], 0 + + where_clauses = [] + params: list = [] + if extractor: + where_clauses.append("extractor = ?") + params.append(extractor) + if status: + where_clauses.append("status = ?") + params.append(status) + + where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else "" + + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + total = conn.execute( + f"SELECT COUNT(*) FROM extraction_runs {where_sql}", params + ).fetchone()[0] + + rows = conn.execute( + f""" + SELECT run_id, extractor, started_at, finished_at, status, + files_written, files_skipped, bytes_written, + cursor_value, error_message + FROM extraction_runs {where_sql} + ORDER BY run_id DESC + LIMIT ? OFFSET ? + """, + params + [limit, offset], + ).fetchall() + return [dict(r) for r in rows], total + finally: + conn.close() + + +def _fetch_distinct_extractors_sync() -> list[str]: + """Return distinct extractor names for filter dropdowns.""" + db_path = _state_db_path() + if not db_path.exists(): + return [] + conn = sqlite3.connect(str(db_path)) + try: + rows = conn.execute( + "SELECT DISTINCT extractor FROM extraction_runs ORDER BY extractor" + ).fetchall() + return [r[0] for r in rows] + finally: + conn.close() + + +def _mark_run_failed_sync(run_id: int) -> bool: + """Mark a stuck 'running' row as 'failed'. Returns True if row was updated.""" + assert run_id > 0, f"run_id must be positive, got {run_id}" + db_path = _state_db_path() + if not db_path.exists(): + return False + conn = sqlite3.connect(str(db_path)) + try: + cur = conn.execute( + """ + UPDATE extraction_runs + SET status = 'failed', + finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), + error_message = 'Marked failed manually (admin — process appeared stuck)' + WHERE run_id = ? AND status = 'running' + """, + (run_id,), + ) + conn.commit() + return cur.rowcount > 0 + finally: + conn.close() + + +# ── Data access: serving meta ───────────────────────────────────────────────── + + +def _load_serving_meta() -> dict | None: + """Read _serving_meta.json alongside analytics.duckdb. Returns None if absent.""" + meta_path = Path(_SERVING_DUCKDB_PATH).parent / "_serving_meta.json" + if not meta_path.exists(): + return None + try: + return json.loads(meta_path.read_text()) + except Exception: + logger.warning("Failed to read _serving_meta.json", exc_info=True) + return None + + +# ── Data access: landing zone filesystem ───────────────────────────────────── + + +def _get_landing_zone_stats_sync() -> list[dict]: + """Scan LANDING_DIR and return per-source file counts + total bytes.""" + landing = Path(_LANDING_DIR) + if not landing.exists(): + return [] + + sources = [] + for source_dir in sorted(landing.iterdir()): + if not source_dir.is_dir() or source_dir.name.startswith("."): + continue + files = list(source_dir.rglob("*.gz")) + list(source_dir.rglob("*.jsonl")) + total_bytes = sum(f.stat().st_size for f in files) + latest_mtime = max((f.stat().st_mtime for f in files), default=None) + sources.append({ + "name": source_dir.name, + "file_count": len(files), + "total_bytes": total_bytes, + "latest_mtime": ( + datetime.fromtimestamp(latest_mtime, tz=UTC).strftime("%Y-%m-%d %H:%M") + if latest_mtime + else None + ), + }) + return sources + + +# ── Data access: workflows.toml ─────────────────────────────────────────────── + +_SCHEDULE_LABELS = { + "hourly": "Every hour", + "daily": "Daily", + "weekly": "Weekly", + "monthly": "Monthly", +} + + +def _load_workflows() -> list[dict]: + """Parse workflows.toml and return workflow definitions with human schedule labels.""" + if not _WORKFLOWS_TOML.exists(): + return [] + + if sys.version_info >= (3, 11): + import tomllib + + data = tomllib.loads(_WORKFLOWS_TOML.read_text()) + else: + # Fallback for older Python (shouldn't happen — project requires 3.11+) + try: + import tomli as tomllib # type: ignore[no-redef] + + data = tomllib.loads(_WORKFLOWS_TOML.read_text()) + except ImportError: + return [] + + workflows = [] + for name, config in data.items(): + schedule = config.get("schedule", "") + schedule_label = _SCHEDULE_LABELS.get(schedule, schedule) + workflows.append({ + "name": name, + "module": config.get("module", ""), + "schedule": schedule, + "schedule_label": schedule_label, + "depends_on": config.get("depends_on", []), + }) + return workflows + + +# ── Route helpers ───────────────────────────────────────────────────────────── + + +def _format_bytes(n: int) -> str: + """Human-readable byte count.""" + if n < 1024: + return f"{n} B" + if n < 1024 * 1024: + return f"{n / 1024:.1f} KB" + return f"{n / 1024 / 1024:.1f} MB" + + +def _duration_str(started_at: str | None, finished_at: str | None) -> str: + """Return human-readable duration, or '' if unavailable.""" + if not started_at or not finished_at: + return "" + try: + fmt = "%Y-%m-%dT%H:%M:%SZ" + start = datetime.strptime(started_at, fmt) + end = datetime.strptime(finished_at, fmt) + delta = int((end - start).total_seconds()) + if delta < 60: + return f"{delta}s" + return f"{delta // 60}m {delta % 60}s" + except ValueError: + return "" + + +def _is_stale(run: dict) -> bool: + """True if a 'running' row has been stuck longer than the stale threshold.""" + if run.get("status") != "running": + return False + started = run.get("started_at", "") + if not started: + return True + try: + fmt = "%Y-%m-%dT%H:%M:%SZ" + start = datetime.strptime(started, fmt).replace(tzinfo=UTC) + return (datetime.now(UTC) - start) > timedelta(hours=_STALE_THRESHOLD_HOURS) + except ValueError: + return False + + +# ── Dashboard ───────────────────────────────────────────────────────────────── + + +@bp.route("/") +@role_required("admin") +async def pipeline_dashboard(): + """Main page: health stat cards + tab container.""" + summary = await asyncio.to_thread(_fetch_extraction_summary_sync) + serving_meta = await asyncio.to_thread(_load_serving_meta) + + total_serving_tables = len(serving_meta["tables"]) if serving_meta else 0 + last_export = serving_meta.get("exported_at_utc", "")[:19].replace("T", " ") if serving_meta else "—" + + success_rate = 0 + if summary["total"] > 0: + success_rate = round(100 * summary["success"] / summary["total"]) + + return await render_template( + "admin/pipeline.html", + summary=summary, + success_rate=success_rate, + total_serving_tables=total_serving_tables, + last_export=last_export, + admin_page="pipeline", + ) + + +# ── Overview tab ───────────────────────────────────────────────────────────── + + +@bp.route("/overview") +@role_required("admin") +async def pipeline_overview(): + """HTMX tab: extraction status per source, serving freshness, landing zone.""" + latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather( + asyncio.to_thread(_fetch_latest_per_extractor_sync), + asyncio.to_thread(_get_landing_zone_stats_sync), + asyncio.to_thread(_load_workflows), + asyncio.to_thread(_load_serving_meta), + ) + + # Build a lookup: extractor name → latest run + latest_by_name = {r["extractor"]: r for r in latest_runs} + + # Enrich each workflow with its latest run data + cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + workflow_rows = [] + for wf in workflows: + run = latest_by_name.get(wf["name"]) + workflow_rows.append({ + "workflow": wf, + "run": run, + "stale": _is_stale(run) if run else False, + }) + + # Compute landing zone totals + total_landing_bytes = sum(s["total_bytes"] for s in landing_stats) + + return await render_template( + "admin/partials/pipeline_overview.html", + workflow_rows=workflow_rows, + landing_stats=landing_stats, + total_landing_bytes=total_landing_bytes, + serving_meta=serving_meta, + format_bytes=_format_bytes, + ) + + +# ── Extractions tab ──────────────────────────────────────────────────────────── + + +@bp.route("/extractions") +@role_required("admin") +async def pipeline_extractions(): + """HTMX tab: paginated + filtered extraction run history.""" + extractor_filter = request.args.get("extractor", "") + status_filter = request.args.get("status", "") + page = max(1, int(request.args.get("page", 1))) + per_page = 30 + + (runs, total), extractors = await asyncio.gather( + asyncio.to_thread( + _fetch_extraction_runs_sync, + extractor=extractor_filter, + status=status_filter, + limit=per_page, + offset=(page - 1) * per_page, + ), + asyncio.to_thread(_fetch_distinct_extractors_sync), + ) + + # Enrich rows with computed fields + for run in runs: + run["duration"] = _duration_str(run.get("started_at"), run.get("finished_at")) + run["bytes_label"] = _format_bytes(run.get("bytes_written") or 0) + run["is_stale"] = _is_stale(run) + + total_pages = max(1, (total + per_page - 1) // per_page) + + return await render_template( + "admin/partials/pipeline_extractions.html", + runs=runs, + total=total, + page=page, + per_page=per_page, + total_pages=total_pages, + extractors=extractors, + extractor_filter=extractor_filter, + status_filter=status_filter, + ) + + +@bp.route("/extractions//mark-stale", methods=["POST"]) +@role_required("admin") +@csrf_protect +async def pipeline_mark_stale(run_id: int): + """Mark a stuck 'running' extraction row as 'failed'.""" + updated = await asyncio.to_thread(_mark_run_failed_sync, run_id) + if updated: + await flash(f"Run #{run_id} marked as failed.", "success") + else: + await flash(f"Run #{run_id} could not be updated (not in 'running' state).", "warning") + return redirect(url_for("pipeline.pipeline_extractions")) + + +# ── Trigger extraction ──────────────────────────────────────────────────────── + + +@bp.route("/extract/trigger", methods=["POST"]) +@role_required("admin") +@csrf_protect +async def pipeline_trigger_extract(): + """Enqueue a full pipeline extraction run.""" + from ..worker import enqueue + + await enqueue("run_extraction") + await flash("Extraction run queued. Check the task queue for progress.", "success") + return redirect(url_for("pipeline.pipeline_dashboard")) + + +# ── Catalog tab ─────────────────────────────────────────────────────────────── + + +@bp.route("/catalog") +@role_required("admin") +async def pipeline_catalog(): + """HTMX tab: list serving tables with row counts + column counts.""" + from ..analytics import fetch_analytics + + schema_rows = await fetch_analytics( + """ + SELECT table_name, column_name, data_type, ordinal_position + FROM information_schema.columns + WHERE table_schema = 'serving' + ORDER BY table_name, ordinal_position + """ + ) + + # Group by table + tables: dict[str, dict] = {} + for row in schema_rows: + tname = row["table_name"] + if tname not in tables: + tables[tname] = {"name": tname, "columns": [], "column_count": 0} + tables[tname]["columns"].append({ + "name": row["column_name"], + "type": row["data_type"], + }) + tables[tname]["column_count"] += 1 + + # Enrich with row counts from serving meta + serving_meta = await asyncio.to_thread(_load_serving_meta) + meta_counts = serving_meta.get("tables", {}) if serving_meta else {} + for tname, tdata in tables.items(): + tdata["row_count"] = meta_counts.get(tname, {}).get("row_count") + + return await render_template( + "admin/partials/pipeline_catalog.html", + tables=list(tables.values()), + serving_meta=serving_meta, + ) + + +@bp.route("/catalog/") +@role_required("admin") +async def pipeline_table_detail(table_name: str): + """HTMX partial: column list + 10-row sample for a serving table.""" + from ..analytics import fetch_analytics + + # Validate table name before using in SQL + if not re.match(r"^[a-z_][a-z0-9_]*$", table_name): + return "Invalid table name", 400 + + # Confirm table exists in serving schema + exists = await fetch_analytics( + "SELECT 1 FROM information_schema.tables" + " WHERE table_schema = 'serving' AND table_name = ?", + [table_name], + ) + if not exists: + return f"Table serving.{table_name} not found", 404 + + columns, sample = await asyncio.gather( + fetch_analytics( + "SELECT column_name, data_type, ordinal_position" + " FROM information_schema.columns" + " WHERE table_schema = 'serving' AND table_name = ?" + " ORDER BY ordinal_position", + [table_name], + ), + fetch_analytics( + f"SELECT * FROM serving.{table_name} LIMIT 10" # noqa: S608 + ), + ) + + return await render_template( + "admin/partials/pipeline_table_detail.html", + table_name=table_name, + columns=columns, + sample=sample, + ) + + +# ── Query editor ────────────────────────────────────────────────────────────── + + +@bp.route("/query") +@role_required("admin") +async def pipeline_query_editor(): + """HTMX tab: SQL query editor with schema sidebar.""" + from ..analytics import fetch_analytics + + schema_rows = await fetch_analytics( + """ + SELECT table_name, column_name, data_type + FROM information_schema.columns + WHERE table_schema = 'serving' + ORDER BY table_name, ordinal_position + """ + ) + + # Group by table for the schema sidebar + schema: dict[str, list] = {} + for row in schema_rows: + tname = row["table_name"] + if tname not in schema: + schema[tname] = [] + schema[tname].append({"name": row["column_name"], "type": row["data_type"]}) + + return await render_template( + "admin/partials/pipeline_query.html", + schema=schema, + max_rows=_QUERY_MAX_ROWS, + timeout_seconds=_QUERY_TIMEOUT_SECONDS, + ) + + +@bp.route("/query/execute", methods=["POST"]) +@role_required("admin") +@csrf_protect +async def pipeline_query_execute(): + """Run user-submitted SQL and return a results table partial.""" + from ..analytics import execute_user_query + + form = await request.form + sql = (form.get("sql") or "").strip() + + # Input validation + if not sql: + return await render_template( + "admin/partials/pipeline_query_results.html", + error="SQL query is empty.", + columns=[], + rows=[], + row_count=0, + elapsed_ms=0, + truncated=False, + ) + + if len(sql) > _QUERY_MAX_CHARS: + return await render_template( + "admin/partials/pipeline_query_results.html", + error=f"Query too long ({len(sql):,} chars). Maximum is {_QUERY_MAX_CHARS:,} characters.", + columns=[], + rows=[], + row_count=0, + elapsed_ms=0, + truncated=False, + ) + + if _BLOCKED_SQL_RE.search(sql): + return await render_template( + "admin/partials/pipeline_query_results.html", + error="Query contains a blocked keyword. Only SELECT statements are allowed.", + columns=[], + rows=[], + row_count=0, + elapsed_ms=0, + truncated=False, + ) + + columns, rows, error, elapsed_ms = await execute_user_query( + sql, + max_rows=_QUERY_MAX_ROWS, + timeout_seconds=_QUERY_TIMEOUT_SECONDS, + ) + + truncated = len(rows) >= _QUERY_MAX_ROWS + + return await render_template( + "admin/partials/pipeline_query_results.html", + error=error, + columns=columns, + rows=rows, + row_count=len(rows), + elapsed_ms=elapsed_ms, + truncated=truncated, + ) diff --git a/web/src/padelnomics/admin/templates/admin/base_admin.html b/web/src/padelnomics/admin/templates/admin/base_admin.html index d5ffbf0..be22415 100644 --- a/web/src/padelnomics/admin/templates/admin/base_admin.html +++ b/web/src/padelnomics/admin/templates/admin/base_admin.html @@ -133,6 +133,12 @@ SEO Hub +
Pipeline
+ + + Pipeline + +
System
diff --git a/web/src/padelnomics/analytics.py b/web/src/padelnomics/analytics.py index 5379a67..d7d4caa 100644 --- a/web/src/padelnomics/analytics.py +++ b/web/src/padelnomics/analytics.py @@ -5,13 +5,16 @@ Opens a single long-lived DuckDB connection at startup (read_only=True). All queries run via asyncio.to_thread() to avoid blocking the event loop. Usage: - from .analytics import fetch_analytics + from .analytics import fetch_analytics, execute_user_query rows = await fetch_analytics("SELECT * FROM serving.planner_defaults WHERE city_slug = ?", ["berlin"]) + + cols, rows, error, elapsed_ms = await execute_user_query("SELECT city_slug FROM serving.city_market_profile LIMIT 5") """ import asyncio import logging import os +import time from pathlib import Path from typing import Any @@ -77,3 +80,48 @@ async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str except Exception: logger.exception("DuckDB analytics query failed: %.200s", sql) return [] + + +async def execute_user_query( + sql: str, + max_rows: int = 1000, + timeout_seconds: int = 10, +) -> tuple[list[str], list[tuple], str | None, float]: + """ + Run an admin-submitted SQL query. + + Returns (columns, rows, error, elapsed_ms). + - columns: list of column name strings (empty on error) + - rows: list of value tuples, capped at max_rows + - error: error message string, or None on success + - elapsed_ms: wall-clock query time in milliseconds + """ + assert sql, "sql must not be empty" + assert 1 <= max_rows <= 10_000, f"max_rows must be 1–10000, got {max_rows}" + assert 1 <= timeout_seconds <= 60, f"timeout_seconds must be 1–60, got {timeout_seconds}" + + if _conn is None: + return [], [], "Analytics database is not available.", 0.0 + + def _run() -> tuple[list[str], list[tuple], str | None, float]: + t0 = time.monotonic() + cur = _conn.cursor() + try: + rel = cur.execute(sql) + cols = [d[0] for d in rel.description] + rows = rel.fetchmany(max_rows) + elapsed_ms = round((time.monotonic() - t0) * 1000, 1) + return cols, rows, None, elapsed_ms + except Exception as exc: + elapsed_ms = round((time.monotonic() - t0) * 1000, 1) + return [], [], str(exc), elapsed_ms + finally: + cur.close() + + try: + return await asyncio.wait_for( + asyncio.to_thread(_run), + timeout=timeout_seconds, + ) + except asyncio.TimeoutError: + return [], [], f"Query timed out after {timeout_seconds}s.", 0.0 diff --git a/web/src/padelnomics/app.py b/web/src/padelnomics/app.py index 5b29b64..fbcf74b 100644 --- a/web/src/padelnomics/app.py +++ b/web/src/padelnomics/app.py @@ -313,6 +313,7 @@ def create_app() -> Quart: # Blueprint registration # ------------------------------------------------------------------------- + from .admin.pipeline_routes import bp as pipeline_bp from .admin.pseo_routes import bp as pseo_bp from .admin.routes import bp as admin_bp from .auth.routes import bp as auth_bp @@ -339,6 +340,7 @@ def create_app() -> Quart: app.register_blueprint(billing_bp) app.register_blueprint(admin_bp) app.register_blueprint(pseo_bp) + app.register_blueprint(pipeline_bp) app.register_blueprint(webhooks_bp) # Content catch-all LAST — lives under / too diff --git a/web/src/padelnomics/worker.py b/web/src/padelnomics/worker.py index 25b3e29..ce8ba49 100644 --- a/web/src/padelnomics/worker.py +++ b/web/src/padelnomics/worker.py @@ -698,6 +698,32 @@ async def handle_cleanup_seo_metrics(payload: dict) -> None: logger.info("Cleaned up %s old SEO metric rows", deleted) +@task("run_extraction") +async def handle_run_extraction(payload: dict) -> None: + """Run the full extraction pipeline (all extractors) in the background. + + Shells out to `uv run extract` in the repo root. The extraction CLI + manages its own state in .state.sqlite and writes to the landing zone. + """ + import subprocess + from pathlib import Path + + repo_root = Path(__file__).resolve().parents[4] + result = await asyncio.to_thread( + subprocess.run, + ["uv", "run", "--package", "padelnomics_extract", "extract"], + capture_output=True, + text=True, + timeout=7200, # 2-hour absolute timeout + cwd=str(repo_root), + ) + if result.returncode != 0: + raise RuntimeError( + f"Extraction failed (exit {result.returncode}): {result.stderr[:500]}" + ) + logger.info("Extraction completed: %s", result.stdout[-300:] if result.stdout else "(no output)") + + @task("generate_articles") async def handle_generate_articles(payload: dict) -> None: """Generate articles from a template in the background.""" From cac876e48f4ca8791226c2e4bc30e4aac841e046 Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 12:53:02 +0100 Subject: [PATCH 2/6] feat(pipeline): dashboard + overview tab templates - pipeline.html: 4 stat cards (total runs, success rate, serving tables, last export) + stale-run warning banner + tab bar (Overview/Extractions/ Catalog/Query) + tab container (lazy-loaded via HTMX on page load) - partials/pipeline_overview.html: extraction status grid (one card per workflow with status dot, schedule, last run timestamp, error preview), serving freshness table (row counts per table), landing zone file stats Subtask 2 of 6 Co-Authored-By: Claude Sonnet 4.6 --- .../admin/partials/pipeline_overview.html | 121 ++++++++++++++++++ .../admin/templates/admin/pipeline.html | 116 +++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html create mode 100644 web/src/padelnomics/admin/templates/admin/pipeline.html diff --git a/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html b/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html new file mode 100644 index 0000000..7015fc5 --- /dev/null +++ b/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html @@ -0,0 +1,121 @@ + + + +
+

Extraction Status

+ {% if workflow_rows %} +
+ {% for row in workflow_rows %} + {% set wf = row.workflow %} + {% set run = row.run %} + {% set stale = row.stale %} +
+
+ {% if not run %} + + {% elif stale %} + + {% elif run.status == 'success' %} + + {% elif run.status == 'failed' %} + + {% else %} + + {% endif %} + {{ wf.name }} + {% if stale %} + stale + {% endif %} +
+

{{ wf.schedule_label }}

+ {% if run %} +

{{ run.started_at[:16].replace('T', ' ') if run.started_at else '—' }}

+ {% if run.status == 'failed' and run.error_message %} +

+ {{ run.error_message[:80] }}{% if run.error_message|length > 80 %}…{% endif %} +

+ {% endif %} + {% if run.files_written %} +

{{ run.files_written }} file{{ 's' if run.files_written != 1 }}, + {{ "{:,}".format(run.bytes_written or 0) }} B

+ {% endif %} + {% else %} +

No runs yet

+ {% endif %} +
+ {% endfor %} +
+ {% else %} +

No workflows found. Check that infra/supervisor/workflows.toml exists.

+ {% endif %} +
+ + +
+ + +
+

Serving Tables

+ {% if serving_meta %} +

+ Last export: {{ serving_meta.exported_at_utc[:19].replace('T', ' ') }} +

+
+ + + + + + + + {% for tname, tmeta in serving_meta.tables.items() | sort %} + + + + + {% endfor %} + +
TableRows
serving.{{ tname }}{{ "{:,}".format(tmeta.row_count) }}
+ {% else %} +

+ _serving_meta.json not found — run the pipeline to generate it. +

+ {% endif %} + + + +
+

Landing Zone + + Total: {{ format_bytes(total_landing_bytes) }} + +

+ {% if landing_stats %} + + + + + + + + + + + {% for s in landing_stats %} + + + + + + + {% endfor %} + +
SourceFilesSizeLatest
{{ s.name }}{{ s.file_count }}{{ format_bytes(s.total_bytes) }}{{ s.latest_mtime or '—' }}
+ {% else %} +

+ Landing zone empty or not found at data/landing. +

+ {% endif %} +
+ + diff --git a/web/src/padelnomics/admin/templates/admin/pipeline.html b/web/src/padelnomics/admin/templates/admin/pipeline.html new file mode 100644 index 0000000..f6ff572 --- /dev/null +++ b/web/src/padelnomics/admin/templates/admin/pipeline.html @@ -0,0 +1,116 @@ +{% extends "admin/base_admin.html" %} +{% set admin_page = "pipeline" %} +{% block title %}Data Pipeline - Admin - {{ config.APP_NAME }}{% endblock %} + +{% block admin_head %} + +{% endblock %} + +{% block admin_content %} +
+
+

Data Pipeline

+

Extraction status, data catalog, and ad-hoc query editor

+
+
+
+ + +
+
+

Total Runs

+

{{ summary.total | default(0) }}

+
+
+

Success Rate

+

+ {{ success_rate }}% +

+
+
+

Serving Tables

+

{{ total_serving_tables }}

+
+
+

Last Export

+

{{ last_export }}

+
+
+ + {% if summary.stale > 0 %} +
+ {{ summary.stale }} stale run{{ 's' if summary.stale != 1 }}. + An extraction appears to have crashed without updating its status. + Go to the Extractions tab to mark it failed. +
+ {% endif %} + + +
+ + + + +
+ + +
+
+

Loading overview…

+
+
+ + +{% endblock %} From 947a1a778eacf27a9abddec4c602cbc223ad3ec1 Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 12:53:36 +0100 Subject: [PATCH 3/6] feat(pipeline): extractions tab template - Filterable extraction run history table (extractor + status dropdowns, HTMX live filter via 'change' trigger) - Status badges with stale row highlighting (amber background) - 'Mark Failed' button for stuck 'running' rows (with confirm dialog) - 'Run All Extractors' trigger button - Pagination via hx-vals Subtask 3 of 6 Co-Authored-By: Claude Sonnet 4.6 --- .../admin/partials/pipeline_extractions.html | 135 ++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 web/src/padelnomics/admin/templates/admin/partials/pipeline_extractions.html diff --git a/web/src/padelnomics/admin/templates/admin/partials/pipeline_extractions.html b/web/src/padelnomics/admin/templates/admin/partials/pipeline_extractions.html new file mode 100644 index 0000000..de54678 --- /dev/null +++ b/web/src/padelnomics/admin/templates/admin/partials/pipeline_extractions.html @@ -0,0 +1,135 @@ + + + +
+
+
+
+ + +
+
+ + +
+
+ +
+ + +
+
+
+ + +
+
+

+ Showing {{ runs | length }} of {{ total }} run{{ 's' if total != 1 }} + {% if extractor_filter or status_filter %} (filtered){% endif %} +

+ {% if total_pages > 1 %} +
+ {% if page > 1 %} + + {% endif %} + {{ page }} / {{ total_pages }} + {% if page < total_pages %} + + {% endif %} +
+ {% endif %} +
+ + {% if runs %} +
+ + + + + + + + + + + + + + + + {% for run in runs %} + + + + + + + + + + + + {% endfor %} + +
#ExtractorStartedDurationStatusFilesSizeError
#{{ run.run_id }}{{ run.extractor }}{{ run.started_at[:16].replace('T', ' ') if run.started_at else '—' }}{{ run.duration or '—' }} + {% if run.is_stale %} + stale + {% elif run.status == 'success' %} + success + {% elif run.status == 'failed' %} + failed + {% else %} + {{ run.status }} + {% endif %} + {{ run.files_written or 0 }}{{ run.bytes_label }} + {% if run.error_message %} + + {{ run.error_message[:60] }}{% if run.error_message|length > 60 %}…{% endif %} + + {% endif %} + + {% if run.status == 'running' %} +
+ + +
+ {% endif %} +
+
+ {% else %} +

+ No extraction runs found{% if extractor_filter or status_filter %} matching the current filters{% endif %}. +

+ {% endif %} +
From 5b48a11e01ffda711c84fbf43a04a1f594401124 Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 12:54:21 +0100 Subject: [PATCH 4/6] feat(pipeline): catalog tab templates - partials/pipeline_catalog.html: accordion list of serving tables with row count badges, column count, click-to-expand lazy-loaded detail - partials/pipeline_table_detail.html: column schema grid + sticky-header sample data table (10 rows, truncated values with title attribute) - JS: toggleCatalogTable() + htmx.trigger(content, 'revealed') for lazy-loading detail only on first open Subtask 4 of 6 Co-Authored-By: Claude Sonnet 4.6 --- .../admin/partials/pipeline_catalog.html | 74 +++++++++++++++++++ .../admin/partials/pipeline_table_detail.html | 59 +++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 web/src/padelnomics/admin/templates/admin/partials/pipeline_catalog.html create mode 100644 web/src/padelnomics/admin/templates/admin/partials/pipeline_table_detail.html diff --git a/web/src/padelnomics/admin/templates/admin/partials/pipeline_catalog.html b/web/src/padelnomics/admin/templates/admin/partials/pipeline_catalog.html new file mode 100644 index 0000000..beb556f --- /dev/null +++ b/web/src/padelnomics/admin/templates/admin/partials/pipeline_catalog.html @@ -0,0 +1,74 @@ + + +{% if serving_meta %} +

+ Serving DB last exported: {{ serving_meta.exported_at_utc[:19].replace('T', ' ') }} +

+{% endif %} + +{% if tables %} +
+ {% for table in tables %} +
+ +
+
+ + + + serving.{{ table.name }} + {{ table.column_count }} col{{ 's' if table.column_count != 1 }} + {% if table.row_count is not none %} + + {{ "{:,}".format(table.row_count) }} rows + + {% endif %} +
+ + + +
+ + + +
+ {% endfor %} +
+{% else %} +
+

No serving tables found. Run the pipeline to generate them.

+
+{% endif %} + + diff --git a/web/src/padelnomics/admin/templates/admin/partials/pipeline_table_detail.html b/web/src/padelnomics/admin/templates/admin/partials/pipeline_table_detail.html new file mode 100644 index 0000000..7aade3c --- /dev/null +++ b/web/src/padelnomics/admin/templates/admin/partials/pipeline_table_detail.html @@ -0,0 +1,59 @@ + + +
+ + +
+

Schema

+
+ {% for col in columns %} +
+ {{ col.column_name }} + {{ col.data_type | upper }} +
+ {% endfor %} +
+
+ + + {% if sample %} +
+

+ Sample (first {{ sample | length }} rows) +

+
+ + + + {% for col in columns %} + + {% endfor %} + + + + {% for row in sample %} + + {% for col in columns %} + + {% endfor %} + + {% endfor %} + +
{{ col.column_name }}
+ {% set val = row[col.column_name] %} + {% if val is none %} + null + {% elif val | string | length > 40 %} + {{ val | string | truncate(40, true) }} + {% else %} + {{ val }} + {% endif %} +
+
+
+ {% else %} +

Table is empty.

+ {% endif %} + +
From 8f8f7f7acb84abcde3fd450858a3fe38977d98e6 Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 12:55:20 +0100 Subject: [PATCH 5/6] feat(pipeline): query editor tab templates - partials/pipeline_query.html: dark-themed SQL textarea (navy bg, Commit Mono, 12px border-radius, electric blue focus glow) + schema sidebar (collapsible per-table column lists with types) + controls bar (Execute, Clear, limit/timeout note) + Tab-key indent + Cmd/Ctrl+Enter submit - partials/pipeline_query_results.html: results table with sticky headers, horizontal scroll, row count + elapsed time metadata, truncation warning, error display in red monospace card Subtask 5 of 6 Co-Authored-By: Claude Sonnet 4.6 --- .../admin/partials/pipeline_query.html | 258 ++++++++++++++++++ .../partials/pipeline_query_results.html | 46 ++++ 2 files changed, 304 insertions(+) create mode 100644 web/src/padelnomics/admin/templates/admin/partials/pipeline_query.html create mode 100644 web/src/padelnomics/admin/templates/admin/partials/pipeline_query_results.html diff --git a/web/src/padelnomics/admin/templates/admin/partials/pipeline_query.html b/web/src/padelnomics/admin/templates/admin/partials/pipeline_query.html new file mode 100644 index 0000000..0afb9dc --- /dev/null +++ b/web/src/padelnomics/admin/templates/admin/partials/pipeline_query.html @@ -0,0 +1,258 @@ + + + +
+ + + +
+ +
+ + +
+ + + + + + + {{ max_rows | default(1000) }} row limit · {{ timeout_seconds | default(10) }}s timeout · SELECT only + +
+
+ + + +
+
+ + +
+ + diff --git a/web/src/padelnomics/admin/templates/admin/partials/pipeline_query_results.html b/web/src/padelnomics/admin/templates/admin/partials/pipeline_query_results.html new file mode 100644 index 0000000..5a75d75 --- /dev/null +++ b/web/src/padelnomics/admin/templates/admin/partials/pipeline_query_results.html @@ -0,0 +1,46 @@ + +{% if error %} +
+ Query error + {{ error }} +
+{% elif columns %} +
+ {{ row_count | default(0) }} row{{ 's' if row_count != 1 }} + · + {{ "%.1f" | format(elapsed_ms) }} ms + {% if truncated %} + Result truncated at {{ row_count }} rows + {% endif %} +
+
+ + + + {% for col in columns %} + + {% endfor %} + + + + {% for row in rows %} + + {% for val in row %} + + {% endfor %} + + {% endfor %} + +
{{ col }}
+ {% if val is none %} + null + {% else %} + {{ val | string | truncate(60, true) }} + {% endif %} +
+
+{% else %} +
+

Query returned no rows.

+
+{% endif %} From d6376877954a7d67ea66ecd80f61632c633d437a Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 13:02:51 +0100 Subject: [PATCH 6/6] feat(pipeline): tests, docs, and ruff fixes (subtask 6/6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add 29-test suite for all pipeline routes, data helpers, and query execution (test_pipeline.py); all 1333 tests pass - Fix ruff UP041: asyncio.TimeoutError → TimeoutError in analytics.py - Fix ruff UP036/F401: replace sys.version_info tomllib block with plain `import tomllib` (project requires Python 3.11+) - Fix ruff F841: remove unused `cutoff` variable in pipeline_overview - Update CHANGELOG.md with Pipeline Console entry - Update PROJECT.md: add Pipeline Console to Admin Panel done list Co-Authored-By: Claude Sonnet 4.6 --- CHANGELOG.md | 8 + PROJECT.md | 1 + web/src/padelnomics/admin/pipeline_routes.py | 19 +- web/src/padelnomics/analytics.py | 4 +- web/tests/test_pipeline.py | 578 +++++++++++++++++++ 5 files changed, 591 insertions(+), 19 deletions(-) create mode 100644 web/tests/test_pipeline.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 61ff8cf..fd5c456 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] ### Added +- **Pipeline Console admin section** — full operational visibility into the data engineering pipeline at `/admin/pipeline/`: + - **Overview tab** — extraction status grid (one card per workflow with status dot, schedule, last-run timestamp, error preview), serving table row counts from `_serving_meta.json`, landing zone file stats (per-source file count + total size) + - **Extractions tab** — filterable, paginated run history table from `.state.sqlite` (extractor + status dropdowns, HTMX live filter); stale "running" row detection (amber highlight) with "Mark Failed" button; "Run All Extractors" button enqueues `run_extraction` task + - **Catalog tab** — accordion list of serving tables with row count badges; click-to-expand lazy-loads column schema + 10-row sample data per table + - **Query editor tab** — dark-themed SQL textarea (`Commit Mono`, navy background, electric blue focus glow); schema sidebar (collapsible table/column list with types); Tab-key indent and Cmd/Ctrl+Enter submit; results table with sticky headers + row count + elapsed time; query security (read-only DuckDB, blocklist regex, 10k char limit, 1000 row cap, 10s timeout) + - **`analytics.execute_user_query()`** — new function returning `(columns, rows, error, elapsed_ms)` for admin query editor + - **`worker.run_extraction` task** — background handler shells out to `uv run extract` from repo root (2h timeout) + - 29 new tests covering all routes, data access helpers, security checks, and `execute_user_query()` - **Email template system** — all 11 transactional emails migrated from inline f-string HTML in `worker.py` to Jinja2 templates: - **Standalone renderer** (`email_templates.py`) — `render_email_template()` uses a module-level `jinja2.Environment` with `autoescape=True`, works outside Quart request context (worker process); `tformat` filter mirrors the one in `app.py` - **`_base.html`** — branded shell (dark header, 3px blue accent, white card body, footer with tagline + copyright); replaces the old `_email_wrap()` helper diff --git a/PROJECT.md b/PROJECT.md index 0a96835..970093a 100644 --- a/PROJECT.md +++ b/PROJECT.md @@ -114,6 +114,7 @@ - [x] **Admin email gallery** (`/admin/emails/gallery`) — card grid of all templates, EN/DE preview in sandboxed iframe, "View in sent log" cross-link; compose page now has HTMX live preview pane - [x] **pSEO Engine tab** (`/admin/pseo`) — content gap detection, data freshness signals, article health checks (hreflang orphans, missing build files, broken scenario refs), generation job monitoring with live progress bars - [x] **Marketplace admin dashboard** (`/admin/marketplace`) — lead funnel, credit economy, supplier engagement, live activity stream, inline feature flag toggles +- [x] **Pipeline Console** (`/admin/pipeline`) — 4-tab operational dashboard: extraction status grid per source, filterable run history with stale-run management ("Mark Failed"), data catalog with column schema + 10-row sample, SQL query editor with dark-themed textarea + schema sidebar + read-only security sandboxing (keyword blocklist, 10s timeout, 1,000-row cap) - [x] **Lead matching notifications** — `notify_matching_suppliers` task on quote verification + `send_weekly_lead_digest` every Monday; one-click CTA token in forward emails - [x] **Migration 0022** — `status_updated_at`, `supplier_note`, `cta_token` on `lead_forwards`; supplier respond endpoint; inline HTMX lead detail actions; extended quote form fields diff --git a/web/src/padelnomics/admin/pipeline_routes.py b/web/src/padelnomics/admin/pipeline_routes.py index 542e2e6..62260f4 100644 --- a/web/src/padelnomics/admin/pipeline_routes.py +++ b/web/src/padelnomics/admin/pipeline_routes.py @@ -25,8 +25,7 @@ import logging import os import re import sqlite3 -import sys -import time +import tomllib from datetime import UTC, datetime, timedelta from pathlib import Path @@ -307,18 +306,7 @@ def _load_workflows() -> list[dict]: if not _WORKFLOWS_TOML.exists(): return [] - if sys.version_info >= (3, 11): - import tomllib - - data = tomllib.loads(_WORKFLOWS_TOML.read_text()) - else: - # Fallback for older Python (shouldn't happen — project requires 3.11+) - try: - import tomli as tomllib # type: ignore[no-redef] - - data = tomllib.loads(_WORKFLOWS_TOML.read_text()) - except ImportError: - return [] + data = tomllib.loads(_WORKFLOWS_TOML.read_text()) workflows = [] for name, config in data.items(): @@ -422,9 +410,6 @@ async def pipeline_overview(): latest_by_name = {r["extractor"]: r for r in latest_runs} # Enrich each workflow with its latest run data - cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime( - "%Y-%m-%dT%H:%M:%SZ" - ) workflow_rows = [] for wf in workflows: run = latest_by_name.get(wf["name"]) diff --git a/web/src/padelnomics/analytics.py b/web/src/padelnomics/analytics.py index d7d4caa..34f5486 100644 --- a/web/src/padelnomics/analytics.py +++ b/web/src/padelnomics/analytics.py @@ -74,7 +74,7 @@ async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str asyncio.to_thread(_run), timeout=_QUERY_TIMEOUT_SECONDS, ) - except asyncio.TimeoutError: + except TimeoutError: logger.error("DuckDB analytics query timed out after %ds: %.200s", _QUERY_TIMEOUT_SECONDS, sql) return [] except Exception: @@ -123,5 +123,5 @@ async def execute_user_query( asyncio.to_thread(_run), timeout=timeout_seconds, ) - except asyncio.TimeoutError: + except TimeoutError: return [], [], f"Query timed out after {timeout_seconds}s.", 0.0 diff --git a/web/tests/test_pipeline.py b/web/tests/test_pipeline.py new file mode 100644 index 0000000..e4352f3 --- /dev/null +++ b/web/tests/test_pipeline.py @@ -0,0 +1,578 @@ +""" +Tests for the Pipeline Console admin blueprint. + +Covers: + - admin/pipeline_routes.py: all 9 routes + - analytics.py: execute_user_query() function + - Data access functions: state DB, serving meta, landing zone +""" +import json +import sqlite3 +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import padelnomics.admin.pipeline_routes as pipeline_mod +import pytest +from padelnomics.core import utcnow_iso + +# ── Fixtures ────────────────────────────────────────────────────────────────── + + +@pytest.fixture +async def admin_client(app, db): + """Authenticated admin test client.""" + now = utcnow_iso() + async with db.execute( + "INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)", + ("pipeline-admin@test.com", "Pipeline Admin", now), + ) as cursor: + admin_id = cursor.lastrowid + await db.execute( + "INSERT INTO user_roles (user_id, role) VALUES (?, 'admin')", (admin_id,) + ) + await db.commit() + + async with app.test_client() as c: + async with c.session_transaction() as sess: + sess["user_id"] = admin_id + yield c + + +@pytest.fixture +def state_db_dir(): + """Temp directory with a seeded .state.sqlite for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / ".state.sqlite" + conn = sqlite3.connect(str(db_path)) + conn.execute( + """ + CREATE TABLE extraction_runs ( + run_id INTEGER PRIMARY KEY AUTOINCREMENT, + extractor TEXT NOT NULL, + started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + finished_at TEXT, + status TEXT NOT NULL DEFAULT 'running', + files_written INTEGER DEFAULT 0, + files_skipped INTEGER DEFAULT 0, + bytes_written INTEGER DEFAULT 0, + cursor_value TEXT, + error_message TEXT + ) + """ + ) + conn.executemany( + """INSERT INTO extraction_runs + (extractor, started_at, finished_at, status, files_written, bytes_written, error_message) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + [ + ("overpass", "2026-02-01T08:00:00Z", "2026-02-01T08:05:00Z", "success", 1, 400000, None), + ("playtomic_tenants", "2026-02-24T06:00:00Z", "2026-02-24T06:10:00Z", "success", 1, 7700000, None), + ("playtomic_availability", "2026-02-25T06:00:00Z", "2026-02-25T07:30:00Z", "failed", 0, 0, "ReadTimeout: connection timed out"), + # Stale running row (started 1970) + ("eurostat", "1970-01-01T00:00:00Z", None, "running", 0, 0, None), + ], + ) + conn.commit() + conn.close() + yield tmpdir + + +@pytest.fixture +def serving_meta_dir(): + """Temp directory with a _serving_meta.json file.""" + with tempfile.TemporaryDirectory() as tmpdir: + meta = { + "exported_at_utc": "2026-02-25T08:30:00+00:00", + "tables": { + "city_market_profile": {"row_count": 612}, + "planner_defaults": {"row_count": 612}, + "pseo_city_costs_de": {"row_count": 487}, + }, + } + (Path(tmpdir) / "_serving_meta.json").write_text(json.dumps(meta)) + # Fake duckdb file so the path exists + (Path(tmpdir) / "analytics.duckdb").touch() + yield tmpdir + + +# ── Schema + query mocks ────────────────────────────────────────────────────── + +_MOCK_SCHEMA_ROWS = [ + {"table_name": "city_market_profile", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1}, + {"table_name": "city_market_profile", "column_name": "country_code", "data_type": "VARCHAR", "ordinal_position": 2}, + {"table_name": "city_market_profile", "column_name": "marktreife_score", "data_type": "DOUBLE", "ordinal_position": 3}, + {"table_name": "planner_defaults", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1}, +] + +_MOCK_TABLE_EXISTS = [{"1": 1}] +_MOCK_SAMPLE_ROWS = [ + {"city_slug": "berlin", "country_code": "DE", "marktreife_score": 82.5}, + {"city_slug": "munich", "country_code": "DE", "marktreife_score": 77.0}, +] + + +def _make_fetch_analytics_mock(schema=True): + """Return an async mock for fetch_analytics that returns schema or table data.""" + async def _mock(sql, params=None): + if "information_schema.tables" in sql: + return _MOCK_TABLE_EXISTS + if "information_schema.columns" in sql and params: + return [r for r in _MOCK_SCHEMA_ROWS if r["table_name"] == params[0]] + if "information_schema.columns" in sql: + return _MOCK_SCHEMA_ROWS + if "city_market_profile" in sql: + return _MOCK_SAMPLE_ROWS + return [] + return _mock + + +# ════════════════════════════════════════════════════════════════════════════ +# Dashboard +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_dashboard_loads(admin_client, state_db_dir, serving_meta_dir): + """Dashboard returns 200 with stat cards.""" + with ( + patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + ): + resp = await admin_client.get("/admin/pipeline/") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "Data Pipeline" in data + assert "Total Runs" in data + assert "Success Rate" in data + assert "Serving Tables" in data + + +@pytest.mark.asyncio +async def test_pipeline_dashboard_requires_admin(client): + """Unauthenticated access redirects to login.""" + resp = await client.get("/admin/pipeline/") + assert resp.status_code in (302, 401) + + +@pytest.mark.asyncio +async def test_pipeline_dashboard_stale_warning(admin_client, state_db_dir, serving_meta_dir): + """Stale run banner appears when a running row is old.""" + with ( + patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + ): + resp = await admin_client.get("/admin/pipeline/") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "stale run" in data.lower() + + +# ════════════════════════════════════════════════════════════════════════════ +# Overview tab +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_overview(admin_client, state_db_dir, serving_meta_dir): + """Overview tab returns extraction status grid and serving table counts.""" + with ( + patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + ): + resp = await admin_client.get("/admin/pipeline/overview") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "city_market_profile" in data + assert "612" in data # row count from serving meta + + +@pytest.mark.asyncio +async def test_pipeline_overview_no_state_db(admin_client, serving_meta_dir): + """Overview handles gracefully when .state.sqlite doesn't exist.""" + with tempfile.TemporaryDirectory() as empty_dir: + with ( + patch.object(pipeline_mod, "_LANDING_DIR", empty_dir), + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + ): + resp = await admin_client.get("/admin/pipeline/overview") + assert resp.status_code == 200 + + +# ════════════════════════════════════════════════════════════════════════════ +# Extractions tab +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_extractions_list(admin_client, state_db_dir): + """Extractions tab returns run history table.""" + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.get("/admin/pipeline/extractions") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "overpass" in data + assert "playtomic_tenants" in data + assert "success" in data + + +@pytest.mark.asyncio +async def test_pipeline_extractions_filter_extractor(admin_client, state_db_dir): + """Extractor filter returns only 1 matching run (not 4).""" + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.get("/admin/pipeline/extractions?extractor=overpass") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "overpass" in data + # Filtered result should show "Showing 1 of 1" + assert "Showing 1 of 1" in data + + +@pytest.mark.asyncio +async def test_pipeline_extractions_filter_status(admin_client, state_db_dir): + """Status filter returns only runs with matching status.""" + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.get("/admin/pipeline/extractions?status=failed") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "failed" in data + assert "ReadTimeout" in data # error message shown + + +@pytest.mark.asyncio +async def test_pipeline_mark_stale(admin_client, state_db_dir): + """POST to mark-stale updates a running row to failed.""" + # Find the run_id of the stale running row (eurostat, started 1970) + db_path = Path(state_db_dir) / ".state.sqlite" + conn = sqlite3.connect(str(db_path)) + row = conn.execute( + "SELECT run_id FROM extraction_runs WHERE status = 'running' ORDER BY run_id LIMIT 1" + ).fetchone() + conn.close() + assert row is not None + run_id = row[0] + + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.post( + f"/admin/pipeline/extractions/{run_id}/mark-stale", + form={"csrf_token": "test"}, + ) + # Should redirect (flash + redirect pattern) + assert resp.status_code in (302, 200) + + # Verify DB was updated + conn = sqlite3.connect(str(db_path)) + updated = conn.execute( + "SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,) + ).fetchone() + conn.close() + assert updated[0] == "failed" + + +@pytest.mark.asyncio +async def test_pipeline_mark_stale_already_finished(admin_client, state_db_dir): + """Cannot mark an already-finished (success) row as stale.""" + db_path = Path(state_db_dir) / ".state.sqlite" + conn = sqlite3.connect(str(db_path)) + row = conn.execute( + "SELECT run_id FROM extraction_runs WHERE status = 'success' ORDER BY run_id LIMIT 1" + ).fetchone() + conn.close() + run_id = row[0] + + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.post( + f"/admin/pipeline/extractions/{run_id}/mark-stale", + form={"csrf_token": "test"}, + ) + assert resp.status_code in (302, 200) + # Verify status unchanged + conn = sqlite3.connect(str(db_path)) + status = conn.execute( + "SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,) + ).fetchone()[0] + conn.close() + assert status == "success" + + +@pytest.mark.asyncio +async def test_pipeline_trigger_extract(admin_client, state_db_dir): + """POST to trigger enqueues a run_extraction task and redirects.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + # enqueue is imported inside the route handler, so patch at the source module + with ( + patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), + patch("padelnomics.worker.enqueue", new_callable=AsyncMock) as mock_enqueue, + ): + resp = await admin_client.post( + "/admin/pipeline/extract/trigger", + form={"csrf_token": "test"}, + ) + assert resp.status_code in (302, 200) + mock_enqueue.assert_called_once_with("run_extraction") + + +# ════════════════════════════════════════════════════════════════════════════ +# Catalog tab +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_catalog(admin_client, serving_meta_dir): + """Catalog tab lists serving tables with row counts.""" + with ( + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()), + ): + resp = await admin_client.get("/admin/pipeline/catalog") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "city_market_profile" in data + assert "612" in data # row count from serving meta + + +@pytest.mark.asyncio +async def test_pipeline_table_detail(admin_client): + """Table detail returns columns and sample rows.""" + with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()): + resp = await admin_client.get("/admin/pipeline/catalog/city_market_profile") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "city_slug" in data + assert "berlin" in data # from sample rows + + +@pytest.mark.asyncio +async def test_pipeline_table_detail_invalid_name(admin_client): + """Table name with uppercase characters (invalid) returns 400.""" + with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()): + resp = await admin_client.get("/admin/pipeline/catalog/InvalidTableName") + assert resp.status_code in (400, 404) + + +@pytest.mark.asyncio +async def test_pipeline_table_detail_unknown_table(admin_client): + """Non-existent table returns 404.""" + async def _empty_fetch(sql, params=None): + return [] + + with patch("padelnomics.analytics.fetch_analytics", side_effect=_empty_fetch): + resp = await admin_client.get("/admin/pipeline/catalog/nonexistent_table") + assert resp.status_code == 404 + + +# ════════════════════════════════════════════════════════════════════════════ +# Query editor +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_query_editor_loads(admin_client): + """Query editor tab returns textarea and schema sidebar.""" + with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()): + resp = await admin_client.get("/admin/pipeline/query") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "query-editor" in data + assert "schema-panel" in data + assert "city_market_profile" in data + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_valid(admin_client): + """Valid SELECT query returns results table.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + mock_result = ( + ["city_slug", "country_code"], + [("berlin", "DE"), ("munich", "DE")], + None, + 12.5, + ) + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result): + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": "SELECT city_slug, country_code FROM serving.city_market_profile"}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "berlin" in data + assert "city_slug" in data + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_blocked_keyword(admin_client): + """Queries with blocked keywords return an error (no DB call made).""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q: + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": "DROP TABLE serving.city_market_profile"}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "blocked" in data.lower() or "error" in data.lower() + mock_q.assert_not_called() + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_empty(admin_client): + """Empty SQL returns validation error.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q: + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": ""}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "empty" in data.lower() or "error" in data.lower() + mock_q.assert_not_called() + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_too_long(admin_client): + """SQL over 10,000 chars returns a length error.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q: + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": "SELECT " + "x" * 10_001}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "long" in data.lower() or "error" in data.lower() + mock_q.assert_not_called() + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_db_error(admin_client): + """DB error from execute_user_query is displayed as error message.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + mock_result = ([], [], "Table 'foo' not found", 5.0) + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result): + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": "SELECT * FROM serving.foo"}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "not found" in data + + +# ════════════════════════════════════════════════════════════════════════════ +# analytics.execute_user_query() +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_execute_user_query_no_connection(): + """Returns error tuple when _conn is None.""" + import padelnomics.analytics as analytics_mod + + with patch.object(analytics_mod, "_conn", None): + cols, rows, error, elapsed = await analytics_mod.execute_user_query("SELECT 1") + assert cols == [] + assert rows == [] + assert error is not None + assert "not available" in error.lower() + + +@pytest.mark.asyncio +async def test_execute_user_query_timeout(): + """Returns timeout error when query takes too long.""" + import asyncio + + import padelnomics.analytics as analytics_mod + + def _slow(): + import time + time.sleep(10) + return [], [], None, 0.0 + + mock_conn = MagicMock() + + async def _slow_thread(_fn): + await asyncio.sleep(10) + + with ( + patch.object(analytics_mod, "_conn", mock_conn), + patch("padelnomics.analytics.asyncio.to_thread", side_effect=_slow_thread), + ): + cols, rows, error, elapsed = await analytics_mod.execute_user_query( + "SELECT 1", timeout_seconds=1 + ) + assert error is not None + assert "timed out" in error.lower() + + +# ════════════════════════════════════════════════════════════════════════════ +# Unit tests: data access helpers +# ════════════════════════════════════════════════════════════════════════════ + + +def test_fetch_extraction_summary_missing_db(): + """Returns zero-filled dict when state DB doesn't exist.""" + with tempfile.TemporaryDirectory() as empty_dir: + with patch.object(pipeline_mod, "_LANDING_DIR", empty_dir): + result = pipeline_mod._fetch_extraction_summary_sync() + assert result["total"] == 0 + assert result["stale"] == 0 + + +def test_fetch_extraction_summary_counts(state_db_dir): + """Returns correct total/success/failed/running/stale counts.""" + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + result = pipeline_mod._fetch_extraction_summary_sync() + assert result["total"] == 4 + assert result["success"] == 2 + assert result["failed"] == 1 + assert result["running"] == 1 + assert result["stale"] == 1 # eurostat started in 1970 + + +def test_load_serving_meta(serving_meta_dir): + """Parses _serving_meta.json correctly.""" + with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")): + meta = pipeline_mod._load_serving_meta() + assert meta is not None + assert "city_market_profile" in meta["tables"] + assert meta["tables"]["city_market_profile"]["row_count"] == 612 + + +def test_load_serving_meta_missing(): + """Returns None when _serving_meta.json doesn't exist.""" + with tempfile.TemporaryDirectory() as empty_dir: + with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(empty_dir) / "analytics.duckdb")): + meta = pipeline_mod._load_serving_meta() + assert meta is None + + +def test_format_bytes(): + assert pipeline_mod._format_bytes(0) == "0 B" + assert pipeline_mod._format_bytes(512) == "512 B" + assert pipeline_mod._format_bytes(1536) == "1.5 KB" + assert pipeline_mod._format_bytes(1_572_864) == "1.5 MB" + + +def test_duration_str(): + assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:00:45Z") == "45s" + assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:02:30Z") == "2m 30s" + assert pipeline_mod._duration_str(None, "2026-02-01T08:00:00Z") == ""