feat(admin): Pipeline Console — 4-tab data pipeline operational dashboard

Adds a new "Pipeline" section to the admin panel at /admin/pipeline with
full visibility into the data extraction → transform → serve pipeline.

Tabs:
- Overview: per-extractor status grid, serving table freshness, landing
  zone file stats
- Extractions: filterable run history, mark-stale action for stuck runs,
  trigger-all button
- Catalog: serving schema browser with lazy-loaded column types and 10-row
  samples
- Query: dark-themed SQL editor with schema sidebar, keyword blocklist,
  1k-row cap, 10s timeout, HTMX result swapping

Also:
- Adds execute_user_query() to analytics.py for the query editor
- Registers pipeline_bp in app.py
- Adds run_extraction background task to worker.py
- 29 tests (all passing), CHANGELOG + PROJECT.md updated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed by Deeman, 2026-02-25 13:05:53 +01:00
15 changed files with 2164 additions and 2 deletions
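The query editor's keyword blocklist mentioned above is a plain regex gate placed in front of the read-only connection. A minimal standalone sketch — the regex mirrors the one this commit adds in the pipeline blueprint; `is_blocked` is an illustrative helper, not part of the commit:

```python
import re

# Mirrors the blocklist regex added in the pipeline blueprint.
_BLOCKED_SQL_RE = re.compile(
    r"\b(ATTACH|COPY|EXPORT|INSTALL|LOAD|CREATE|DROP|ALTER|INSERT|UPDATE|DELETE|GRANT|REVOKE|PRAGMA)\b",
    re.IGNORECASE,
)

def is_blocked(sql: str) -> bool:
    """Illustrative helper (not in the commit): True if the editor should refuse the query."""
    return bool(_BLOCKED_SQL_RE.search(sql))
```

The `\b` word boundaries keep column names like `updated_at` or `load_time` from tripping the filter, though a column literally named `load` would still be flagged — an accepted false positive, since the read-only connection is the real guard.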


@@ -7,6 +7,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]
### Added
- **Pipeline Console admin section** — full operational visibility into the data engineering pipeline at `/admin/pipeline/`:
- **Overview tab** — extraction status grid (one card per workflow with status dot, schedule, last-run timestamp, error preview), serving table row counts from `_serving_meta.json`, landing zone file stats (per-source file count + total size)
- **Extractions tab** — filterable, paginated run history table from `.state.sqlite` (extractor + status dropdowns, HTMX live filter); stale "running" row detection (amber highlight) with "Mark Failed" button; "Run All Extractors" button enqueues `run_extraction` task
- **Catalog tab** — accordion list of serving tables with row count badges; click-to-expand lazy-loads column schema + 10-row sample data per table
- **Query editor tab** — dark-themed SQL textarea (`Commit Mono`, navy background, electric blue focus glow); schema sidebar (collapsible table/column list with types); Tab-key indent and Cmd/Ctrl+Enter submit; results table with sticky headers + row count + elapsed time; query security (read-only DuckDB, blocklist regex, 10k char limit, 1000 row cap, 10s timeout)
- **`analytics.execute_user_query()`** — new function returning `(columns, rows, error, elapsed_ms)` for admin query editor
- **`worker.run_extraction` task** — background handler shells out to `uv run extract` from repo root (2h timeout)
- 29 new tests covering all routes, data access helpers, security checks, and `execute_user_query()`
- **Email template system** — all 11 transactional emails migrated from inline f-string HTML in `worker.py` to Jinja2 templates:
- **Standalone renderer** (`email_templates.py`) — `render_email_template()` uses a module-level `jinja2.Environment` with `autoescape=True`, works outside Quart request context (worker process); `tformat` filter mirrors the one in `app.py`
- **`_base.html`** — branded shell (dark header, 3px blue accent, white card body, footer with tagline + copyright); replaces the old `_email_wrap()` helper


@@ -114,6 +114,7 @@
- [x] **Admin email gallery** (`/admin/emails/gallery`) — card grid of all templates, EN/DE preview in sandboxed iframe, "View in sent log" cross-link; compose page now has HTMX live preview pane
- [x] **pSEO Engine tab** (`/admin/pseo`) — content gap detection, data freshness signals, article health checks (hreflang orphans, missing build files, broken scenario refs), generation job monitoring with live progress bars
- [x] **Marketplace admin dashboard** (`/admin/marketplace`) — lead funnel, credit economy, supplier engagement, live activity stream, inline feature flag toggles
- [x] **Pipeline Console** (`/admin/pipeline`) — 4-tab operational dashboard: extraction status grid per source, filterable run history with stale-run management ("Mark Failed"), data catalog with column schema + 10-row sample, SQL query editor with dark-themed textarea + schema sidebar + read-only security sandboxing (keyword blocklist, 10s timeout, 1,000-row cap)
- [x] **Lead matching notifications** — `notify_matching_suppliers` task on quote verification + `send_weekly_lead_digest` every Monday; one-click CTA token in forward emails
- [x] **Migration 0022** — `status_updated_at`, `supplier_note`, `cta_token` on `lead_forwards`; supplier respond endpoint; inline HTMX lead detail actions; extended quote form fields


@@ -0,0 +1,684 @@
"""
Pipeline Console admin blueprint.
Operational visibility for the data extraction and transformation pipeline:
/admin/pipeline/ → dashboard (health stats + tab container)
/admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats
/admin/pipeline/extractions → HTMX tab: filterable extraction run history
/admin/pipeline/extractions/<id>/mark-stale → POST: mark stuck "running" row as failed
/admin/pipeline/extract/trigger → POST: enqueue full extraction run
/admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data)
/admin/pipeline/catalog/<table> → HTMX partial: table detail (columns + sample)
/admin/pipeline/query → HTMX tab: SQL query editor
/admin/pipeline/query/execute → POST: run user SQL, return results table
Data sources:
- data/landing/.state.sqlite (extraction run history — stdlib sqlite3, sync via to_thread)
- SERVING_DUCKDB_PATH/../_serving_meta.json (export timestamp + per-table row counts)
- analytics.duckdb (DuckDB read-only via analytics.execute_user_query)
- LANDING_DIR/ (filesystem scan for file sizes + dates)
- infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib)
"""
import asyncio
import json
import logging
import os
import re
import sqlite3
import tomllib
from datetime import UTC, datetime, timedelta
from pathlib import Path
from quart import Blueprint, flash, redirect, render_template, request, url_for
from ..auth.routes import role_required
from ..core import csrf_protect
logger = logging.getLogger(__name__)
bp = Blueprint(
"pipeline",
__name__,
template_folder=str(Path(__file__).parent / "templates"),
url_prefix="/admin/pipeline",
)
# ── Config ────────────────────────────────────────────────────────────────────
_LANDING_DIR = os.environ.get("LANDING_DIR", "data/landing")
_SERVING_DUCKDB_PATH = os.environ.get("SERVING_DUCKDB_PATH", "data/analytics.duckdb")
# Repo root: web/src/padelnomics/admin/ → four levels above this file's directory
_REPO_ROOT = Path(__file__).resolve().parents[4]
_WORKFLOWS_TOML = _REPO_ROOT / "infra" / "supervisor" / "workflows.toml"
# A "running" row older than this is considered stale/crashed.
_STALE_THRESHOLD_HOURS = 2
# Query editor limits
_QUERY_MAX_CHARS = 10_000
_QUERY_MAX_ROWS = 1_000
_QUERY_TIMEOUT_SECONDS = 10
# Blocked SQL keywords. The read-only connection already enforces this at the
# engine level; the blocklist is belt and suspenders.
_BLOCKED_SQL_RE = re.compile(
r"\b(ATTACH|COPY|EXPORT|INSTALL|LOAD|CREATE|DROP|ALTER|INSERT|UPDATE|DELETE|GRANT|REVOKE|PRAGMA)\b",
re.IGNORECASE,
)
# ── Sidebar data injection (same pattern as pseo_routes.py) ──────────────────
@bp.before_request
async def _inject_sidebar_data():
"""Load unread inbox count for the admin sidebar badge."""
from quart import g
from ..core import fetch_one
try:
row = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0")
g.admin_unread_count = row["cnt"] if row else 0
except Exception:
g.admin_unread_count = 0
@bp.context_processor
def _admin_context():
from quart import g
return {"unread_count": getattr(g, "admin_unread_count", 0)}
# ── Data access: state DB (sync, called via to_thread) ────────────────────────
def _state_db_path() -> Path:
return Path(_LANDING_DIR) / ".state.sqlite"
def _fetch_extraction_summary_sync() -> dict:
"""Aggregate stats from extraction_runs: total, success, failed, stale counts."""
db_path = _state_db_path()
if not db_path.exists():
return {"total": 0, "success": 0, "failed": 0, "running": 0, "stale": 0}
cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
row = conn.execute(
"""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed,
SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running,
SUM(CASE WHEN status = 'running' AND started_at < ? THEN 1 ELSE 0 END) AS stale
FROM extraction_runs
""",
(cutoff,),
).fetchone()
return {
"total": row["total"] or 0,
"success": row["success"] or 0,
"failed": row["failed"] or 0,
"running": row["running"] or 0,
"stale": row["stale"] or 0,
}
finally:
conn.close()
def _fetch_latest_per_extractor_sync() -> list[dict]:
"""Return most recent run for each extractor name."""
db_path = _state_db_path()
if not db_path.exists():
return []
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"""
SELECT r.*
FROM extraction_runs r
INNER JOIN (
SELECT extractor, MAX(run_id) AS max_id
FROM extraction_runs
GROUP BY extractor
) latest ON r.run_id = latest.max_id
ORDER BY r.extractor
"""
).fetchall()
return [dict(r) for r in rows]
finally:
conn.close()
def _fetch_extraction_runs_sync(
*,
extractor: str = "",
status: str = "",
limit: int = 50,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Return (rows, total_count) for the filtered run history."""
assert 1 <= limit <= 200, f"limit must be 1-200, got {limit}"
assert offset >= 0, f"offset must be >= 0, got {offset}"
db_path = _state_db_path()
if not db_path.exists():
return [], 0
where_clauses = []
params: list = []
if extractor:
where_clauses.append("extractor = ?")
params.append(extractor)
if status:
where_clauses.append("status = ?")
params.append(status)
where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
total = conn.execute(
f"SELECT COUNT(*) FROM extraction_runs {where_sql}", params
).fetchone()[0]
rows = conn.execute(
f"""
SELECT run_id, extractor, started_at, finished_at, status,
files_written, files_skipped, bytes_written,
cursor_value, error_message
FROM extraction_runs {where_sql}
ORDER BY run_id DESC
LIMIT ? OFFSET ?
""",
params + [limit, offset],
).fetchall()
return [dict(r) for r in rows], total
finally:
conn.close()
def _fetch_distinct_extractors_sync() -> list[str]:
"""Return distinct extractor names for filter dropdowns."""
db_path = _state_db_path()
if not db_path.exists():
return []
conn = sqlite3.connect(str(db_path))
try:
rows = conn.execute(
"SELECT DISTINCT extractor FROM extraction_runs ORDER BY extractor"
).fetchall()
return [r[0] for r in rows]
finally:
conn.close()
def _mark_run_failed_sync(run_id: int) -> bool:
"""Mark a stuck 'running' row as 'failed'. Returns True if row was updated."""
assert run_id > 0, f"run_id must be positive, got {run_id}"
db_path = _state_db_path()
if not db_path.exists():
return False
conn = sqlite3.connect(str(db_path))
try:
cur = conn.execute(
"""
UPDATE extraction_runs
SET status = 'failed',
finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
error_message = 'Marked failed manually (admin — process appeared stuck)'
WHERE run_id = ? AND status = 'running'
""",
(run_id,),
)
conn.commit()
return cur.rowcount > 0
finally:
conn.close()
# ── Data access: serving meta ─────────────────────────────────────────────────
def _load_serving_meta() -> dict | None:
"""Read _serving_meta.json alongside analytics.duckdb. Returns None if absent."""
meta_path = Path(_SERVING_DUCKDB_PATH).parent / "_serving_meta.json"
if not meta_path.exists():
return None
try:
return json.loads(meta_path.read_text())
except Exception:
logger.warning("Failed to read _serving_meta.json", exc_info=True)
return None
# ── Data access: landing zone filesystem ─────────────────────────────────────
def _get_landing_zone_stats_sync() -> list[dict]:
"""Scan LANDING_DIR and return per-source file counts + total bytes."""
landing = Path(_LANDING_DIR)
if not landing.exists():
return []
sources = []
for source_dir in sorted(landing.iterdir()):
if not source_dir.is_dir() or source_dir.name.startswith("."):
continue
files = list(source_dir.rglob("*.gz")) + list(source_dir.rglob("*.jsonl"))
total_bytes = sum(f.stat().st_size for f in files)
latest_mtime = max((f.stat().st_mtime for f in files), default=None)
sources.append({
"name": source_dir.name,
"file_count": len(files),
"total_bytes": total_bytes,
"latest_mtime": (
datetime.fromtimestamp(latest_mtime, tz=UTC).strftime("%Y-%m-%d %H:%M")
if latest_mtime
else None
),
})
return sources
# ── Data access: workflows.toml ───────────────────────────────────────────────
_SCHEDULE_LABELS = {
"hourly": "Every hour",
"daily": "Daily",
"weekly": "Weekly",
"monthly": "Monthly",
}
def _load_workflows() -> list[dict]:
"""Parse workflows.toml and return workflow definitions with human schedule labels."""
if not _WORKFLOWS_TOML.exists():
return []
data = tomllib.loads(_WORKFLOWS_TOML.read_text())
workflows = []
for name, config in data.items():
schedule = config.get("schedule", "")
schedule_label = _SCHEDULE_LABELS.get(schedule, schedule)
workflows.append({
"name": name,
"module": config.get("module", ""),
"schedule": schedule,
"schedule_label": schedule_label,
"depends_on": config.get("depends_on", []),
})
return workflows
# ── Route helpers ─────────────────────────────────────────────────────────────
def _format_bytes(n: int) -> str:
"""Human-readable byte count."""
if n < 1024:
return f"{n} B"
if n < 1024 * 1024:
return f"{n / 1024:.1f} KB"
return f"{n / 1024 / 1024:.1f} MB"
def _duration_str(started_at: str | None, finished_at: str | None) -> str:
"""Return human-readable duration, or '' if unavailable."""
if not started_at or not finished_at:
return ""
try:
fmt = "%Y-%m-%dT%H:%M:%SZ"
start = datetime.strptime(started_at, fmt)
end = datetime.strptime(finished_at, fmt)
delta = int((end - start).total_seconds())
if delta < 60:
return f"{delta}s"
return f"{delta // 60}m {delta % 60}s"
except ValueError:
return ""
def _is_stale(run: dict) -> bool:
"""True if a 'running' row has been stuck longer than the stale threshold."""
if run.get("status") != "running":
return False
started = run.get("started_at", "")
if not started:
return True
try:
fmt = "%Y-%m-%dT%H:%M:%SZ"
start = datetime.strptime(started, fmt).replace(tzinfo=UTC)
return (datetime.now(UTC) - start) > timedelta(hours=_STALE_THRESHOLD_HOURS)
except ValueError:
return False
# ── Dashboard ─────────────────────────────────────────────────────────────────
@bp.route("/")
@role_required("admin")
async def pipeline_dashboard():
"""Main page: health stat cards + tab container."""
summary = await asyncio.to_thread(_fetch_extraction_summary_sync)
serving_meta = await asyncio.to_thread(_load_serving_meta)
total_serving_tables = len(serving_meta.get("tables", {})) if serving_meta else 0
last_export = serving_meta.get("exported_at_utc", "")[:19].replace("T", " ") if serving_meta else ""
success_rate = 0
if summary["total"] > 0:
success_rate = round(100 * summary["success"] / summary["total"])
return await render_template(
"admin/pipeline.html",
summary=summary,
success_rate=success_rate,
total_serving_tables=total_serving_tables,
last_export=last_export,
admin_page="pipeline",
)
# ── Overview tab ─────────────────────────────────────────────────────────────
@bp.route("/overview")
@role_required("admin")
async def pipeline_overview():
"""HTMX tab: extraction status per source, serving freshness, landing zone."""
latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather(
asyncio.to_thread(_fetch_latest_per_extractor_sync),
asyncio.to_thread(_get_landing_zone_stats_sync),
asyncio.to_thread(_load_workflows),
asyncio.to_thread(_load_serving_meta),
)
# Build a lookup: extractor name → latest run
latest_by_name = {r["extractor"]: r for r in latest_runs}
# Enrich each workflow with its latest run data
workflow_rows = []
for wf in workflows:
run = latest_by_name.get(wf["name"])
workflow_rows.append({
"workflow": wf,
"run": run,
"stale": _is_stale(run) if run else False,
})
# Compute landing zone totals
total_landing_bytes = sum(s["total_bytes"] for s in landing_stats)
return await render_template(
"admin/partials/pipeline_overview.html",
workflow_rows=workflow_rows,
landing_stats=landing_stats,
total_landing_bytes=total_landing_bytes,
serving_meta=serving_meta,
format_bytes=_format_bytes,
)
# ── Extractions tab ────────────────────────────────────────────────────────────
@bp.route("/extractions")
@role_required("admin")
async def pipeline_extractions():
"""HTMX tab: paginated + filtered extraction run history."""
extractor_filter = request.args.get("extractor", "")
status_filter = request.args.get("status", "")
try:
    page = max(1, int(request.args.get("page", 1)))
except ValueError:
    page = 1
per_page = 30
(runs, total), extractors = await asyncio.gather(
asyncio.to_thread(
_fetch_extraction_runs_sync,
extractor=extractor_filter,
status=status_filter,
limit=per_page,
offset=(page - 1) * per_page,
),
asyncio.to_thread(_fetch_distinct_extractors_sync),
)
# Enrich rows with computed fields
for run in runs:
run["duration"] = _duration_str(run.get("started_at"), run.get("finished_at"))
run["bytes_label"] = _format_bytes(run.get("bytes_written") or 0)
run["is_stale"] = _is_stale(run)
total_pages = max(1, (total + per_page - 1) // per_page)
return await render_template(
"admin/partials/pipeline_extractions.html",
runs=runs,
total=total,
page=page,
per_page=per_page,
total_pages=total_pages,
extractors=extractors,
extractor_filter=extractor_filter,
status_filter=status_filter,
)
@bp.route("/extractions/<int:run_id>/mark-stale", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_mark_stale(run_id: int):
"""Mark a stuck 'running' extraction row as 'failed'."""
updated = await asyncio.to_thread(_mark_run_failed_sync, run_id)
if updated:
await flash(f"Run #{run_id} marked as failed.", "success")
else:
await flash(f"Run #{run_id} could not be updated (not in 'running' state).", "warning")
return redirect(url_for("pipeline.pipeline_extractions"))
# ── Trigger extraction ────────────────────────────────────────────────────────
@bp.route("/extract/trigger", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_trigger_extract():
"""Enqueue a full pipeline extraction run."""
from ..worker import enqueue
await enqueue("run_extraction")
await flash("Extraction run queued. Check the task queue for progress.", "success")
return redirect(url_for("pipeline.pipeline_dashboard"))
# ── Catalog tab ───────────────────────────────────────────────────────────────
@bp.route("/catalog")
@role_required("admin")
async def pipeline_catalog():
"""HTMX tab: list serving tables with row counts + column counts."""
from ..analytics import fetch_analytics
schema_rows = await fetch_analytics(
"""
SELECT table_name, column_name, data_type, ordinal_position
FROM information_schema.columns
WHERE table_schema = 'serving'
ORDER BY table_name, ordinal_position
"""
)
# Group by table
tables: dict[str, dict] = {}
for row in schema_rows:
tname = row["table_name"]
if tname not in tables:
tables[tname] = {"name": tname, "columns": [], "column_count": 0}
tables[tname]["columns"].append({
"name": row["column_name"],
"type": row["data_type"],
})
tables[tname]["column_count"] += 1
# Enrich with row counts from serving meta
serving_meta = await asyncio.to_thread(_load_serving_meta)
meta_counts = serving_meta.get("tables", {}) if serving_meta else {}
for tname, tdata in tables.items():
tdata["row_count"] = meta_counts.get(tname, {}).get("row_count")
return await render_template(
"admin/partials/pipeline_catalog.html",
tables=list(tables.values()),
serving_meta=serving_meta,
)
@bp.route("/catalog/<table_name>")
@role_required("admin")
async def pipeline_table_detail(table_name: str):
"""HTMX partial: column list + 10-row sample for a serving table."""
from ..analytics import fetch_analytics
# Validate table name before using in SQL
if not re.match(r"^[a-z_][a-z0-9_]*$", table_name):
return "Invalid table name", 400
# Confirm table exists in serving schema
exists = await fetch_analytics(
"SELECT 1 FROM information_schema.tables"
" WHERE table_schema = 'serving' AND table_name = ?",
[table_name],
)
if not exists:
return f"Table serving.{table_name} not found", 404
columns, sample = await asyncio.gather(
fetch_analytics(
"SELECT column_name, data_type, ordinal_position"
" FROM information_schema.columns"
" WHERE table_schema = 'serving' AND table_name = ?"
" ORDER BY ordinal_position",
[table_name],
),
fetch_analytics(
f"SELECT * FROM serving.{table_name} LIMIT 10" # noqa: S608
),
)
return await render_template(
"admin/partials/pipeline_table_detail.html",
table_name=table_name,
columns=columns,
sample=sample,
)
# ── Query editor ──────────────────────────────────────────────────────────────
@bp.route("/query")
@role_required("admin")
async def pipeline_query_editor():
"""HTMX tab: SQL query editor with schema sidebar."""
from ..analytics import fetch_analytics
schema_rows = await fetch_analytics(
"""
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'serving'
ORDER BY table_name, ordinal_position
"""
)
# Group by table for the schema sidebar
schema: dict[str, list] = {}
for row in schema_rows:
tname = row["table_name"]
if tname not in schema:
schema[tname] = []
schema[tname].append({"name": row["column_name"], "type": row["data_type"]})
return await render_template(
"admin/partials/pipeline_query.html",
schema=schema,
max_rows=_QUERY_MAX_ROWS,
timeout_seconds=_QUERY_TIMEOUT_SECONDS,
)
@bp.route("/query/execute", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_query_execute():
"""Run user-submitted SQL and return a results table partial."""
from ..analytics import execute_user_query
form = await request.form
sql = (form.get("sql") or "").strip()
# Input validation
if not sql:
return await render_template(
"admin/partials/pipeline_query_results.html",
error="SQL query is empty.",
columns=[],
rows=[],
row_count=0,
elapsed_ms=0,
truncated=False,
)
if len(sql) > _QUERY_MAX_CHARS:
return await render_template(
"admin/partials/pipeline_query_results.html",
error=f"Query too long ({len(sql):,} chars). Maximum is {_QUERY_MAX_CHARS:,} characters.",
columns=[],
rows=[],
row_count=0,
elapsed_ms=0,
truncated=False,
)
if _BLOCKED_SQL_RE.search(sql):
return await render_template(
"admin/partials/pipeline_query_results.html",
error="Query contains a blocked keyword. Only SELECT statements are allowed.",
columns=[],
rows=[],
row_count=0,
elapsed_ms=0,
truncated=False,
)
columns, rows, error, elapsed_ms = await execute_user_query(
sql,
max_rows=_QUERY_MAX_ROWS,
timeout_seconds=_QUERY_TIMEOUT_SECONDS,
)
truncated = len(rows) >= _QUERY_MAX_ROWS
return await render_template(
"admin/partials/pipeline_query_results.html",
error=error,
columns=columns,
rows=rows,
row_count=len(rows),
elapsed_ms=elapsed_ms,
truncated=truncated,
)
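The route above delegates to `analytics.execute_user_query()`, which is not shown in this diff. A self-contained sketch of its `(columns, rows, error, elapsed_ms)` contract, substituting stdlib `sqlite3` for the read-only DuckDB connection so the example runs anywhere — the real function opens `analytics.duckdb` read-only:

```python
import asyncio
import sqlite3
import time

async def execute_user_query(
    sql: str, *, max_rows: int = 1_000, timeout_seconds: int = 10,
):
    """Illustrative contract: returns (columns, rows, error, elapsed_ms)."""
    def _run():
        # Stand-in for the read-only DuckDB connection.
        conn = sqlite3.connect(":memory:")
        try:
            start = time.monotonic()
            cur = conn.execute(sql)
            rows = cur.fetchmany(max_rows)  # enforce the row cap at fetch time
            columns = [d[0] for d in cur.description] if cur.description else []
            elapsed_ms = int((time.monotonic() - start) * 1000)
            return columns, [tuple(r) for r in rows], None, elapsed_ms
        finally:
            conn.close()

    try:
        # Run the blocking DB call off the event loop, bounded by the timeout.
        return await asyncio.wait_for(asyncio.to_thread(_run), timeout_seconds)
    except asyncio.TimeoutError:
        return [], [], f"Query exceeded {timeout_seconds}s timeout.", timeout_seconds * 1000
    except sqlite3.Error as exc:
        return [], [], str(exc), 0

columns, rows, error, elapsed_ms = asyncio.run(
    execute_user_query("SELECT 1 AS one, 'a' AS label")
)
```

Returning the error as a value instead of raising keeps the route's template rendering path uniform: success and failure both flow into `pipeline_query_results.html`.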


@@ -133,6 +133,12 @@
SEO Hub
</a>
<div class="admin-sidebar__section">Pipeline</div>
<a href="{{ url_for('pipeline.pipeline_dashboard') }}" class="{% if admin_page == 'pipeline' %}active{% endif %}">
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="1.5" stroke="currentColor"><path stroke-linecap="round" stroke-linejoin="round" d="M20.25 6.375c0 2.278-3.694 4.125-8.25 4.125S3.75 8.653 3.75 6.375m16.5 0c0-2.278-3.694-4.125-8.25-4.125S3.75 4.097 3.75 6.375m16.5 0v11.25c0 2.278-3.694 4.125-8.25 4.125s-8.25-1.847-8.25-4.125V6.375m16.5 0v3.75m-16.5-3.75v3.75m16.5 0v3.75C20.25 16.153 16.556 18 12 18s-8.25-1.847-8.25-4.125v-3.75m16.5 0c0 2.278-3.694 4.125-8.25 4.125s-8.25-1.847-8.25-4.125"/></svg>
Pipeline
</a>
<div class="admin-sidebar__section">System</div>
<a href="{{ url_for('admin.flags') }}" class="{% if admin_page == 'flags' %}active{% endif %}">
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="1.5" stroke="currentColor"><path stroke-linecap="round" stroke-linejoin="round" d="M3 3v1.5M3 21v-6m0 0 2.77-.693a9 9 0 0 1 6.208.682l.108.054a9 9 0 0 0 6.086.71l3.114-.732a48.524 48.524 0 0 1-.005-10.499l-3.11.732a9 9 0 0 1-6.085-.711l-.108-.054a9 9 0 0 0-6.208-.682L3 4.5M3 15V4.5"/></svg>


@@ -0,0 +1,74 @@
<!-- Pipeline Catalog Tab: serving tables with column + sample data expandable -->
{% if serving_meta %}
<p class="text-xs text-slate mb-3">
Serving DB last exported: <span class="mono font-semibold">{{ serving_meta.exported_at_utc[:19].replace('T', ' ') }}</span>
</p>
{% endif %}
{% if tables %}
<div style="display:grid;grid-template-columns:1fr;gap:0.75rem">
{% for table in tables %}
<div class="card" style="padding:0" id="catalog-table-{{ table.name }}">
<!-- Table header row -->
<div class="flex items-center justify-between"
style="padding:0.875rem 1.25rem;cursor:pointer"
onclick="toggleCatalogTable('{{ table.name }}')">
<div class="flex items-center gap-3">
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="1.5"
stroke="currentColor" style="width:16px;height:16px;color:#94A3B8;flex-shrink:0">
<path stroke-linecap="round" stroke-linejoin="round"
d="M3.375 19.5h17.25m-17.25 0a1.125 1.125 0 0 1-1.125-1.125M3.375 19.5h7.5c.621 0 1.125-.504 1.125-1.125m-9.75 0V5.625m0 12.75v-1.5c0-.621.504-1.125 1.125-1.125m18.375 2.625V5.625m0 12.75c0 .621-.504 1.125-1.125 1.125m1.125-1.125v-1.5c0-.621-.504-1.125-1.125-1.125m0 3.75h-7.5A1.125 1.125 0 0 1 12 18.375m9.75-12.75c0-.621-.504-1.125-1.125-1.125H3.375c-.621 0-1.125.504-1.125 1.125m19.5 0v1.5c0 .621-.504 1.125-1.125 1.125M2.25 5.625v1.5c0 .621.504 1.125 1.125 1.125m0 0h17.25m-17.25 0h7.5c.621 0 1.125.504 1.125 1.125M3.375 8.25c-.621 0-1.125.504-1.125 1.125v1.5c0 .621.504 1.125 1.125 1.125h17.25c.621 0 1.125-.504 1.125-1.125v-1.5c0-.621-.504-1.125-1.125-1.125H3.375Z"/>
</svg>
<span class="font-semibold text-sm text-navy">serving.<span class="mono">{{ table.name }}</span></span>
<span class="text-xs text-slate">{{ table.column_count }} col{{ 's' if table.column_count != 1 }}</span>
{% if table.row_count is not none %}
<span class="badge" style="background:#F0FDF4;color:#16A34A;font-size:10px">
{{ "{:,}".format(table.row_count) }} rows
</span>
{% endif %}
</div>
<svg id="chevron-{{ table.name }}" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24"
stroke-width="2" stroke="currentColor"
style="width:14px;height:14px;color:#94A3B8;transition:transform 0.2s">
<path stroke-linecap="round" stroke-linejoin="round" d="m8.25 4.5 7.5 7.5-7.5 7.5"/>
</svg>
</div>
<!-- Expandable detail (HTMX lazy-loaded on first open) -->
<div id="catalog-detail-{{ table.name }}" style="display:none;border-top:1px solid #E2E8F0">
<div id="catalog-content-{{ table.name }}"
hx-get="{{ url_for('pipeline.pipeline_table_detail', table_name=table.name) }}"
hx-trigger="revealed"
hx-target="#catalog-content-{{ table.name }}"
hx-swap="outerHTML">
<div style="padding:1.5rem;text-align:center">
<p class="text-sm text-slate">Loading…</p>
</div>
</div>
</div>
</div>
{% endfor %}
</div>
{% else %}
<div class="card text-center" style="padding:2rem">
<p class="text-slate">No serving tables found. Run the pipeline to generate them.</p>
</div>
{% endif %}
<script>
function toggleCatalogTable(name) {
var detail = document.getElementById('catalog-detail-' + name);
var chevron = document.getElementById('chevron-' + name);
var isOpen = detail.style.display !== 'none';
detail.style.display = isOpen ? 'none' : 'block';
chevron.style.transform = isOpen ? '' : 'rotate(90deg)';
// Trigger HTMX revealed for lazy-loading when first opened
if (!isOpen) {
var content = document.getElementById('catalog-content-' + name);
if (content && content.getAttribute('hx-trigger') === 'revealed') {
htmx.trigger(content, 'revealed');
}
}
}
</script>


@@ -0,0 +1,135 @@
<!-- Pipeline Extractions Tab: filterable run history + mark-stale + trigger -->
<!-- Filter bar + trigger button -->
<div class="card mb-4" style="padding:0.875rem 1.25rem">
<div class="flex items-center justify-between gap-3 flex-wrap">
<form id="extraction-filters"
hx-get="{{ url_for('pipeline.pipeline_extractions') }}"
hx-target="#pipeline-tab-content"
hx-swap="innerHTML"
hx-trigger="change"
class="flex gap-3 items-end flex-wrap">
<div>
<label class="form-label">Extractor</label>
<select name="extractor" class="form-input" style="min-width:180px">
<option value="" {% if not extractor_filter %}selected{% endif %}>All extractors</option>
{% for name in extractors %}
<option value="{{ name }}" {% if extractor_filter == name %}selected{% endif %}>{{ name }}</option>
{% endfor %}
</select>
</div>
<div>
<label class="form-label">Status</label>
<select name="status" class="form-input">
<option value="" {% if not status_filter %}selected{% endif %}>All statuses</option>
<option value="success" {% if status_filter == 'success' %}selected{% endif %}>Success</option>
<option value="failed" {% if status_filter == 'failed' %}selected{% endif %}>Failed</option>
<option value="running" {% if status_filter == 'running' %}selected{% endif %}>Running</option>
</select>
</div>
</form>
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="button" class="btn-outline btn-sm"
onclick="confirmAction('Enqueue a full extraction run? This will run all extractors in the background.', this.closest('form'))">
Run All Extractors
</button>
</form>
</div>
</div>
<!-- Run history -->
<div class="card">
<div class="flex justify-between items-center mb-3">
<p class="text-sm text-slate">
Showing {{ runs | length }} of {{ total }} run{{ 's' if total != 1 }}
{% if extractor_filter or status_filter %} (filtered){% endif %}
</p>
{% if total_pages > 1 %}
<div class="flex gap-2 items-center">
{% if page > 1 %}
<button class="btn-outline btn-sm"
hx-get="{{ url_for('pipeline.pipeline_extractions') }}"
hx-vals='{"page": "{{ page - 1 }}", "extractor": "{{ extractor_filter }}", "status": "{{ status_filter }}"}'
hx-target="#pipeline-tab-content" hx-swap="innerHTML">← Prev</button>
{% endif %}
<span class="text-xs text-slate">{{ page }} / {{ total_pages }}</span>
{% if page < total_pages %}
<button class="btn-outline btn-sm"
hx-get="{{ url_for('pipeline.pipeline_extractions') }}"
hx-vals='{"page": "{{ page + 1 }}", "extractor": "{{ extractor_filter }}", "status": "{{ status_filter }}"}'
hx-target="#pipeline-tab-content" hx-swap="innerHTML">Next →</button>
{% endif %}
</div>
{% endif %}
</div>
{% if runs %}
<div style="overflow-x:auto">
<table class="table">
<thead>
<tr>
<th style="width:60px">#</th>
<th>Extractor</th>
<th>Started</th>
<th>Duration</th>
<th>Status</th>
<th style="text-align:right">Files</th>
<th style="text-align:right">Size</th>
<th>Error</th>
<th></th>
</tr>
</thead>
<tbody>
{% for run in runs %}
<tr{% if run.is_stale %} style="background:#FFFBEB"{% endif %}>
<td class="mono text-xs text-slate">#{{ run.run_id }}</td>
<td class="text-sm font-medium">{{ run.extractor }}</td>
<td class="mono text-xs">{{ run.started_at[:16].replace('T', ' ') if run.started_at else '—' }}</td>
<td class="mono text-xs text-slate">{{ run.duration or '—' }}</td>
<td>
{% if run.is_stale %}
<span class="badge-warning">stale</span>
{% elif run.status == 'success' %}
<span class="badge-success">success</span>
{% elif run.status == 'failed' %}
<span class="badge-danger">failed</span>
{% else %}
<span class="badge" style="background:#EFF6FF;color:#1D4ED8">{{ run.status }}</span>
{% endif %}
</td>
<td class="text-right text-xs">{{ run.files_written or 0 }}</td>
<td class="text-right mono text-xs">{{ run.bytes_label }}</td>
<td style="max-width:200px">
{% if run.error_message %}
<span class="text-xs text-danger mono" style="word-break:break-all" title="{{ run.error_message }}">
{{ run.error_message[:60] }}{% if run.error_message|length > 60 %}…{% endif %}
</span>
{% endif %}
</td>
<td>
{% if run.status == 'running' %}
<form method="post"
action="{{ url_for('pipeline.pipeline_mark_stale', run_id=run.run_id) }}"
class="m-0">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="button" class="btn-danger btn-sm"
style="padding:2px 8px;font-size:11px"
onclick="confirmAction('Mark run #{{ run.run_id }} as failed? Only do this if the process is definitely dead.', this.closest('form'))">
Mark Failed
</button>
</form>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<p class="text-sm text-slate text-center" style="padding:2rem 0">
No extraction runs found{% if extractor_filter or status_filter %} matching the current filters{% endif %}.
</p>
{% endif %}
</div>
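The extractor/status dropdowns above submit as plain query params; server-side they presumably become a parameterized SQL filter plus LIMIT/OFFSET pagination. A minimal sketch of that pattern (function and column names are illustrative, not the actual route code):

```python
def build_run_query(extractor=None, status=None, page=1, per_page=25):
    """Compose a parameterized run-history query from optional filters.

    Illustrative sketch only; the real route may name things differently.
    """
    where, params = [], []
    if extractor:
        where.append("extractor = ?")
        params.append(extractor)
    if status:
        where.append("status = ?")
        params.append(status)
    clause = f" WHERE {' AND '.join(where)}" if where else ""
    sql = (
        "SELECT * FROM extraction_runs" + clause +
        " ORDER BY started_at DESC LIMIT ? OFFSET ?"
    )
    params += [per_page, (page - 1) * per_page]
    return sql, params

sql, params = build_run_query(extractor="overpass", status="failed", page=2)
print(sql)
print(params)  # ['overpass', 'failed', 25, 25]
```

Keeping user input in the parameter list (never interpolated into the SQL string) is what makes the HTMX live filter safe to expose.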


@@ -0,0 +1,121 @@
<!-- Pipeline Overview Tab: extraction status, serving freshness, landing zone -->
<!-- Extraction Status Grid -->
<div class="card mb-4">
<p class="card-header">Extraction Status</p>
{% if workflow_rows %}
<div style="display:grid;grid-template-columns:repeat(auto-fill,minmax(260px,1fr));gap:0.75rem">
{% for row in workflow_rows %}
{% set wf = row.workflow %}
{% set run = row.run %}
{% set stale = row.stale %}
<div style="border:1px solid #E2E8F0;border-radius:10px;padding:0.875rem;background:#FAFAFA">
<div class="flex items-center gap-2 mb-2">
{% if not run %}
<span class="status-dot pending"></span>
{% elif stale %}
<span class="status-dot stale"></span>
{% elif run.status == 'success' %}
<span class="status-dot ok"></span>
{% elif run.status == 'failed' %}
<span class="status-dot failed"></span>
{% else %}
<span class="status-dot running"></span>
{% endif %}
<span class="text-sm font-semibold text-navy">{{ wf.name }}</span>
{% if stale %}
<span class="badge-warning" style="font-size:10px;padding:1px 6px;margin-left:auto">stale</span>
{% endif %}
</div>
<p class="text-xs text-slate">{{ wf.schedule_label }}</p>
{% if run %}
<p class="text-xs mono text-slate-dark mt-1">{{ run.started_at[:16].replace('T', ' ') if run.started_at else '—' }}</p>
{% if run.status == 'failed' and run.error_message %}
<p class="text-xs text-danger mt-1" style="font-family:monospace;word-break:break-all">
{{ run.error_message[:80] }}{% if run.error_message|length > 80 %}…{% endif %}
</p>
{% endif %}
{% if run.files_written %}
<p class="text-xs text-slate mt-1">{{ run.files_written }} file{{ 's' if run.files_written != 1 }},
{{ "{:,}".format(run.bytes_written or 0) }} B</p>
{% endif %}
{% else %}
<p class="text-xs text-slate mt-1">No runs yet</p>
{% endif %}
</div>
{% endfor %}
</div>
{% else %}
<p class="text-sm text-slate">No workflows found. Check that <code>infra/supervisor/workflows.toml</code> exists.</p>
{% endif %}
</div>
<!-- Two-column row: Serving Freshness + Landing Zone -->
<div style="display:grid;grid-template-columns:1fr 1fr;gap:1rem">
<!-- Serving Freshness -->
<div class="card">
<p class="card-header">Serving Tables</p>
{% if serving_meta %}
<p class="text-xs text-slate mb-3">
Last export: <span class="mono font-semibold text-navy">{{ serving_meta.exported_at_utc[:19].replace('T', ' ') }}</span>
</p>
<table class="table" style="font-size:0.8125rem">
<thead>
<tr>
<th>Table</th>
<th style="text-align:right">Rows</th>
</tr>
</thead>
<tbody>
{% for tname, tmeta in serving_meta.tables.items() | sort %}
<tr>
<td class="mono">serving.{{ tname }}</td>
<td class="mono text-right font-semibold">{{ "{:,}".format(tmeta.row_count) }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="text-sm text-slate">
<code>_serving_meta.json</code> not found — run the pipeline to generate it.
</p>
{% endif %}
</div>
<!-- Landing Zone -->
<div class="card">
<p class="card-header">Landing Zone
<span class="text-xs font-normal text-slate ml-2">
Total: <span class="font-semibold">{{ format_bytes(total_landing_bytes) }}</span>
</span>
</p>
{% if landing_stats %}
<table class="table" style="font-size:0.8125rem">
<thead>
<tr>
<th>Source</th>
<th style="text-align:right">Files</th>
<th style="text-align:right">Size</th>
<th style="text-align:right">Latest</th>
</tr>
</thead>
<tbody>
{% for s in landing_stats %}
<tr>
<td class="mono">{{ s.name }}</td>
<td class="text-right text-slate">{{ s.file_count }}</td>
<td class="text-right font-semibold">{{ format_bytes(s.total_bytes) }}</td>
<td class="text-right mono text-xs text-slate">{{ s.latest_mtime or '—' }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p class="text-sm text-slate">
Landing zone empty or not found at <code>data/landing</code>.
</p>
{% endif %}
</div>
</div>


@@ -0,0 +1,258 @@
<!-- Pipeline Query Tab: SQL editor + schema sidebar + results -->
<style>
.query-layout {
display: flex; gap: 1rem; align-items: flex-start;
}
.query-editor-wrap { flex: 1; min-width: 0; }
/* Dark code editor textarea */
.query-editor {
width: 100%;
min-height: 200px;
max-height: 480px;
resize: vertical;
font-family: 'Commit Mono', 'JetBrains Mono', 'Fira Code', ui-monospace, monospace;
font-size: 0.8125rem;
line-height: 1.6;
tab-size: 2;
background: #0F172A;
color: #CBD5E1;
border: 1px solid #334155;
border-radius: 12px;
padding: 1rem 1.125rem;
caret-color: #60A5FA;
}
.query-editor:focus {
outline: none;
border-color: #3B82F6;
box-shadow: 0 0 0 3px rgba(59,130,246,0.18);
}
.query-editor::placeholder { color: #475569; }
/* Schema panel */
.schema-panel {
width: 230px;
flex-shrink: 0;
background: #F8FAFC;
border: 1px solid #E2E8F0;
border-radius: 12px;
max-height: 480px;
overflow-y: auto;
font-size: 0.75rem;
}
.schema-panel-header {
padding: 0.625rem 0.875rem;
font-size: 0.6875rem;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 0.06em;
color: #94A3B8;
border-bottom: 1px solid #E2E8F0;
position: sticky;
top: 0;
background: #F8FAFC;
}
.schema-table-section { border-bottom: 1px solid #F1F5F9; }
.schema-table-toggle {
width: 100%;
text-align: left;
padding: 0.5rem 0.875rem;
font-size: 0.75rem;
font-weight: 600;
color: #0F172A;
background: none;
border: none;
cursor: pointer;
font-family: 'Commit Mono', monospace;
display: flex;
align-items: center;
justify-content: space-between;
gap: 4px;
}
.schema-table-toggle:hover { background: #EFF6FF; color: #1D4ED8; }
.schema-cols { display: none; padding: 0 0.875rem 0.5rem; }
.schema-cols.open { display: block; }
.schema-col {
padding: 2px 0;
display: flex;
justify-content: space-between;
gap: 4px;
}
.schema-col-name { color: #475569; font-family: 'Commit Mono', monospace; font-size: 0.6875rem; }
.schema-col-type { color: #94A3B8; font-size: 0.625rem; white-space: nowrap; }
/* Query controls bar */
.query-controls {
display: flex;
align-items: center;
gap: 0.75rem;
margin-top: 0.625rem;
flex-wrap: wrap;
}
.query-limit-note {
font-size: 0.6875rem;
color: #94A3B8;
margin-left: auto;
}
/* Results area */
#query-results { margin-top: 1rem; }
.results-meta {
font-size: 0.75rem;
color: #64748B;
margin-bottom: 0.5rem;
display: flex;
align-items: center;
gap: 0.5rem;
}
.results-table-wrap {
overflow-x: auto;
max-height: 440px;
overflow-y: auto;
border: 1px solid #E2E8F0;
border-radius: 10px;
}
.results-table-wrap table {
font-size: 0.75rem;
border-radius: 0;
}
.results-table-wrap table thead th {
position: sticky;
top: 0;
background: #F8FAFC;
z-index: 1;
white-space: nowrap;
}
.results-table-wrap table td.mono {
max-width: 240px;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.query-error {
border: 1px solid #FCA5A5;
background: #FEF2F2;
border-radius: 10px;
padding: 0.875rem 1rem;
font-size: 0.8125rem;
color: #DC2626;
font-family: 'Commit Mono', monospace;
word-break: break-all;
}
.results-truncated {
font-size: 0.75rem;
color: #D97706;
margin-top: 0.5rem;
}
@media (max-width: 900px) {
.query-layout { flex-direction: column; }
.schema-panel { width: 100%; max-height: 200px; }
}
</style>
<form id="query-form"
hx-post="{{ url_for('pipeline.pipeline_query_execute') }}"
hx-target="#query-results"
hx-swap="innerHTML"
hx-indicator="#query-spinner">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div class="query-layout">
<!-- SQL textarea -->
<div class="query-editor-wrap">
<textarea
name="sql"
id="query-sql"
class="query-editor"
rows="10"
spellcheck="false"
autocomplete="off"
autocorrect="off"
autocapitalize="off"
placeholder="-- SELECT * FROM serving.city_market_profile&#10;-- WHERE country_code = 'DE'&#10;-- ORDER BY marktreife_score DESC&#10;-- LIMIT 20"
></textarea>
<div class="query-controls">
<button type="submit" class="btn btn-sm" hx-disabled-elt="this">
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="2"
stroke="currentColor" style="width:13px;height:13px;display:inline;margin-right:4px">
<path stroke-linecap="round" stroke-linejoin="round" d="M5.25 5.653c0-.856.917-1.398 1.667-.986l11.54 6.347a1.125 1.125 0 0 1 0 1.972l-11.54 6.347a1.125 1.125 0 0 1-1.667-.986V5.653Z"/>
</svg>
Execute
</button>
<button type="button" class="btn-outline btn-sm" onclick="document.getElementById('query-sql').value='';document.getElementById('query-results').innerHTML=''">
Clear
</button>
<svg id="query-spinner" class="htmx-indicator search-spinner" xmlns="http://www.w3.org/2000/svg" width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2">
<path d="M21 12a9 9 0 1 1-6.219-8.56"/>
</svg>
<span class="query-limit-note">
{{ max_rows | default(1000) }} row limit · {{ timeout_seconds | default(10) }}s timeout · SELECT only
</span>
</div>
</div>
<!-- Schema sidebar -->
<aside class="schema-panel">
<div class="schema-panel-header">Schema — serving.*</div>
{% if schema %}
{% for tname, cols in schema.items() | sort %}
<div class="schema-table-section">
<button type="button" class="schema-table-toggle" onclick="toggleSchema('{{ tname }}')">
{{ tname }}
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24"
stroke-width="2" stroke="currentColor"
id="schema-chevron-{{ tname }}"
style="width:10px;height:10px;transition:transform 0.15s;flex-shrink:0">
<path stroke-linecap="round" stroke-linejoin="round" d="m8.25 4.5 7.5 7.5-7.5 7.5"/>
</svg>
</button>
<div id="schema-cols-{{ tname }}" class="schema-cols">
{% for col in cols %}
<div class="schema-col">
<span class="schema-col-name">{{ col.name }}</span>
<span class="schema-col-type">{{ col.type | upper }}</span>
</div>
{% endfor %}
</div>
</div>
{% endfor %}
{% else %}
<p style="padding:1rem;font-size:0.75rem;color:#94A3B8">
Analytics DB not available.
</p>
{% endif %}
</aside>
</div>
</form>
<!-- Results area (populated by HTMX after query execution) -->
<div id="query-results"></div>
<script>
// Tab key in textarea inserts spaces instead of losing focus
document.getElementById('query-sql').addEventListener('keydown', function(e) {
if (e.key === 'Tab') {
e.preventDefault();
var start = this.selectionStart;
var end = this.selectionEnd;
this.value = this.value.substring(0, start) + '  ' + this.value.substring(end);
this.selectionStart = this.selectionEnd = start + 2;
}
// Cmd/Ctrl+Enter submits
if ((e.metaKey || e.ctrlKey) && e.key === 'Enter') {
e.preventDefault();
htmx.trigger(document.getElementById('query-form'), 'submit');
}
});
function toggleSchema(name) {
var cols = document.getElementById('schema-cols-' + name);
var chevron = document.getElementById('schema-chevron-' + name);
var isOpen = cols.classList.contains('open');
cols.classList.toggle('open', !isOpen);
chevron.style.transform = isOpen ? '' : 'rotate(90deg)';
}
</script>


@@ -0,0 +1,46 @@
<!-- Pipeline Query Results Partial: swapped into #query-results after POST -->
{% if error %}
<div class="query-error">
<strong style="display:block;margin-bottom:4px">Query error</strong>
{{ error }}
</div>
{% elif columns %}
<div class="results-meta">
<span>{{ row_count | default(0) }} row{{ 's' if row_count != 1 }}</span>
<span style="color:#CBD5E1">·</span>
<span>{{ "%.1f" | format(elapsed_ms) }} ms</span>
{% if truncated %}
<span class="results-truncated">Result truncated at {{ row_count }} rows</span>
{% endif %}
</div>
<div class="results-table-wrap">
<table class="table">
<thead>
<tr>
{% for col in columns %}
<th>{{ col }}</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for row in rows %}
<tr>
{% for val in row %}
<td class="mono" title="{{ val if val is not none else 'null' }}">
{% if val is none %}
<span style="color:#CBD5E1">null</span>
{% else %}
{{ val | string | truncate(60, true) }}
{% endif %}
</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<div style="padding:1.5rem;text-align:center">
<p class="text-sm text-slate">Query returned no rows.</p>
</div>
{% endif %}


@@ -0,0 +1,59 @@
<!-- Pipeline Table Detail: column schema + 10-row sample data -->
<!-- This partial replaces #catalog-content-<table_name> via HTMX -->
<div id="catalog-content-{{ table_name }}" style="padding:1.25rem">
<!-- Column schema -->
<div class="mb-4">
<p class="text-xs font-semibold text-slate uppercase mb-2" style="letter-spacing:0.05em">Schema</p>
<div style="display:grid;grid-template-columns:repeat(auto-fill,minmax(200px,1fr));gap:6px">
{% for col in columns %}
<div style="background:#F8FAFC;border:1px solid #E2E8F0;border-radius:6px;padding:6px 10px">
<span class="text-xs font-semibold text-navy mono">{{ col.column_name }}</span>
<span class="text-xs text-slate ml-2">{{ col.data_type | upper }}</span>
</div>
{% endfor %}
</div>
</div>
<!-- Sample rows -->
{% if sample %}
<div>
<p class="text-xs font-semibold text-slate uppercase mb-2" style="letter-spacing:0.05em">
Sample (first {{ sample | length }} rows)
</p>
<div style="overflow-x:auto;max-height:320px;overflow-y:auto">
<table class="table" style="font-size:0.75rem">
<thead style="position:sticky;top:0;background:#fff;z-index:1">
<tr>
{% for col in columns %}
<th style="white-space:nowrap">{{ col.column_name }}</th>
{% endfor %}
</tr>
</thead>
<tbody>
{% for row in sample %}
<tr>
{% for col in columns %}
<td class="mono" style="max-width:180px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap"
title="{{ row[col.column_name] | string }}">
{% set val = row[col.column_name] %}
{% if val is none %}
<span class="text-slate">null</span>
{% elif val | string | length > 40 %}
{{ val | string | truncate(40, true) }}
{% else %}
{{ val }}
{% endif %}
</td>
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
{% else %}
<p class="text-sm text-slate">Table is empty.</p>
{% endif %}
</div>


@@ -0,0 +1,116 @@
{% extends "admin/base_admin.html" %}
{% set admin_page = "pipeline" %}
{% block title %}Data Pipeline - Admin - {{ config.APP_NAME }}{% endblock %}
{% block admin_head %}
<style>
.pipeline-tabs {
display: flex; gap: 0; border-bottom: 2px solid #E2E8F0; margin-bottom: 1.5rem;
}
.pipeline-tabs button {
padding: 0.625rem 1.25rem; font-size: 0.8125rem; font-weight: 600;
color: #64748B; background: none; border: none; cursor: pointer;
border-bottom: 2px solid transparent; margin-bottom: -2px; transition: all 0.15s;
}
.pipeline-tabs button:hover { color: #1D4ED8; }
.pipeline-tabs button.active { color: #1D4ED8; border-bottom-color: #1D4ED8; }
/* Status dot */
.status-dot {
display: inline-block; width: 8px; height: 8px; border-radius: 50%; flex-shrink: 0;
}
.status-dot.ok { background: #16A34A; }
.status-dot.failed { background: #EF4444; }
.status-dot.stale { background: #D97706; }
.status-dot.running { background: #3B82F6; }
.status-dot.pending { background: #CBD5E1; }
</style>
{% endblock %}
{% block admin_content %}
<header class="flex justify-between items-center mb-6">
<div>
<h1 class="text-2xl">Data Pipeline</h1>
<p class="text-sm text-slate mt-1">Extraction status, data catalog, and ad-hoc query editor</p>
</div>
<div class="flex gap-2">
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="button" class="btn btn-sm"
onclick="confirmAction('Enqueue a full extraction run? This will run all extractors in the background.', this.closest('form'))">
Run Pipeline
</button>
</form>
<a href="{{ url_for('admin.tasks') }}" class="btn-outline btn-sm">Task Queue</a>
</div>
</header>
<!-- Health stat cards -->
<div style="display:grid;grid-template-columns:repeat(4,1fr);gap:0.75rem" class="mb-6">
<div class="card text-center" style="padding:0.875rem">
<p class="text-xs text-slate">Total Runs</p>
<p class="text-2xl font-bold text-navy metric">{{ summary.total | default(0) }}</p>
</div>
<div class="card text-center" style="padding:0.875rem">
<p class="text-xs text-slate">Success Rate</p>
<p class="text-2xl font-bold {% if success_rate >= 90 %}text-accent{% elif success_rate >= 70 %}text-warning{% else %}text-danger{% endif %} metric">
{{ success_rate }}%
</p>
</div>
<div class="card text-center" style="padding:0.875rem">
<p class="text-xs text-slate">Serving Tables</p>
<p class="text-2xl font-bold text-navy metric">{{ total_serving_tables }}</p>
</div>
<div class="card text-center" style="padding:0.875rem">
<p class="text-xs text-slate">Last Export</p>
<p class="text-sm font-semibold text-navy mono mt-1">{{ last_export }}</p>
</div>
</div>
{% if summary.stale > 0 %}
<div class="flash-warning mb-4">
<strong>{{ summary.stale }} stale run{{ 's' if summary.stale != 1 }}.</strong>
An extraction appears to have crashed without updating its status.
Go to the Extractions tab to mark it failed.
</div>
{% endif %}
<!-- Tabs -->
<div class="pipeline-tabs" id="pipeline-tabs">
<button class="active" data-tab="overview"
hx-get="{{ url_for('pipeline.pipeline_overview') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Overview</button>
<button data-tab="extractions"
hx-get="{{ url_for('pipeline.pipeline_extractions') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Extractions</button>
<button data-tab="catalog"
hx-get="{{ url_for('pipeline.pipeline_catalog') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Catalog</button>
<button data-tab="query"
hx-get="{{ url_for('pipeline.pipeline_query_editor') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Query</button>
</div>
<!-- Tab content (Overview loads on page load) -->
<div id="pipeline-tab-content"
hx-get="{{ url_for('pipeline.pipeline_overview') }}"
hx-trigger="load"
hx-swap="innerHTML">
<div class="card text-center" style="padding:2rem">
<p class="text-slate">Loading overview…</p>
</div>
</div>
<script>
document.getElementById('pipeline-tabs').addEventListener('click', function(e) {
if (e.target.tagName === 'BUTTON') {
this.querySelectorAll('button').forEach(b => b.classList.remove('active'));
e.target.classList.add('active');
}
});
</script>
{% endblock %}


@@ -5,13 +5,16 @@
 Opens a single long-lived DuckDB connection at startup (read_only=True).
 All queries run via asyncio.to_thread() to avoid blocking the event loop.
 Usage:
-    from .analytics import fetch_analytics
+    from .analytics import fetch_analytics, execute_user_query
     rows = await fetch_analytics("SELECT * FROM serving.planner_defaults WHERE city_slug = ?", ["berlin"])
+    cols, rows, error, elapsed_ms = await execute_user_query("SELECT city_slug FROM serving.city_market_profile LIMIT 5")
 """
 import asyncio
 import logging
 import os
+import time
 from pathlib import Path
 from typing import Any
@@ -71,9 +74,54 @@ async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str
             asyncio.to_thread(_run),
             timeout=_QUERY_TIMEOUT_SECONDS,
         )
-    except asyncio.TimeoutError:
+    except TimeoutError:
         logger.error("DuckDB analytics query timed out after %ds: %.200s", _QUERY_TIMEOUT_SECONDS, sql)
         return []
     except Exception:
         logger.exception("DuckDB analytics query failed: %.200s", sql)
         return []
+
+
+async def execute_user_query(
+    sql: str,
+    max_rows: int = 1000,
+    timeout_seconds: int = 10,
+) -> tuple[list[str], list[tuple], str | None, float]:
+    """
+    Run an admin-submitted SQL query.
+
+    Returns (columns, rows, error, elapsed_ms).
+    - columns: list of column name strings (empty on error)
+    - rows: list of value tuples, capped at max_rows
+    - error: error message string, or None on success
+    - elapsed_ms: wall-clock query time in milliseconds
+    """
+    assert sql, "sql must not be empty"
+    assert 1 <= max_rows <= 10_000, f"max_rows must be 1-10000, got {max_rows}"
+    assert 1 <= timeout_seconds <= 60, f"timeout_seconds must be 1-60, got {timeout_seconds}"
+
+    if _conn is None:
+        return [], [], "Analytics database is not available.", 0.0
+
+    def _run() -> tuple[list[str], list[tuple], str | None, float]:
+        t0 = time.monotonic()
+        cur = _conn.cursor()
+        try:
+            rel = cur.execute(sql)
+            cols = [d[0] for d in rel.description]
+            rows = rel.fetchmany(max_rows)
+            elapsed_ms = round((time.monotonic() - t0) * 1000, 1)
+            return cols, rows, None, elapsed_ms
+        except Exception as exc:
+            elapsed_ms = round((time.monotonic() - t0) * 1000, 1)
+            return [], [], str(exc), elapsed_ms
+        finally:
+            cur.close()
+
+    try:
+        return await asyncio.wait_for(
+            asyncio.to_thread(_run),
+            timeout=timeout_seconds,
+        )
+    except TimeoutError:
+        return [], [], f"Query timed out after {timeout_seconds}s.", 0.0


@@ -313,6 +313,7 @@ def create_app() -> Quart:
     # Blueprint registration
     # -------------------------------------------------------------------------
+    from .admin.pipeline_routes import bp as pipeline_bp
     from .admin.pseo_routes import bp as pseo_bp
     from .admin.routes import bp as admin_bp
     from .auth.routes import bp as auth_bp
@@ -339,6 +340,7 @@ def create_app() -> Quart:
     app.register_blueprint(billing_bp)
     app.register_blueprint(admin_bp)
     app.register_blueprint(pseo_bp)
+    app.register_blueprint(pipeline_bp)
     app.register_blueprint(webhooks_bp)
     # Content catch-all LAST — lives under /<lang> too


@@ -698,6 +698,32 @@ async def handle_cleanup_seo_metrics(payload: dict) -> None:
     logger.info("Cleaned up %s old SEO metric rows", deleted)
+
+
+@task("run_extraction")
+async def handle_run_extraction(payload: dict) -> None:
+    """Run the full extraction pipeline (all extractors) in the background.
+
+    Shells out to `uv run extract` in the repo root. The extraction CLI
+    manages its own state in .state.sqlite and writes to the landing zone.
+    """
+    import subprocess
+    from pathlib import Path
+
+    repo_root = Path(__file__).resolve().parents[4]
+    result = await asyncio.to_thread(
+        subprocess.run,
+        ["uv", "run", "--package", "padelnomics_extract", "extract"],
+        capture_output=True,
+        text=True,
+        timeout=7200,  # 2-hour absolute timeout
+        cwd=str(repo_root),
+    )
+    if result.returncode != 0:
+        raise RuntimeError(
+            f"Extraction failed (exit {result.returncode}): {result.stderr[:500]}"
+        )
+    logger.info("Extraction completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
+
+
 @task("generate_articles")
 async def handle_generate_articles(payload: dict) -> None:
     """Generate articles from a template in the background."""

web/tests/test_pipeline.py (new file, 578 lines)

@@ -0,0 +1,578 @@
"""
Tests for the Pipeline Console admin blueprint.
Covers:
- admin/pipeline_routes.py: all 9 routes
- analytics.py: execute_user_query() function
- Data access functions: state DB, serving meta, landing zone
"""
import json
import sqlite3
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import padelnomics.admin.pipeline_routes as pipeline_mod
import pytest
from padelnomics.core import utcnow_iso
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
async def admin_client(app, db):
"""Authenticated admin test client."""
now = utcnow_iso()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("pipeline-admin@test.com", "Pipeline Admin", now),
) as cursor:
admin_id = cursor.lastrowid
await db.execute(
"INSERT INTO user_roles (user_id, role) VALUES (?, 'admin')", (admin_id,)
)
await db.commit()
async with app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = admin_id
yield c
@pytest.fixture
def state_db_dir():
"""Temp directory with a seeded .state.sqlite for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / ".state.sqlite"
conn = sqlite3.connect(str(db_path))
conn.execute(
"""
CREATE TABLE extraction_runs (
run_id INTEGER PRIMARY KEY AUTOINCREMENT,
extractor TEXT NOT NULL,
started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
finished_at TEXT,
status TEXT NOT NULL DEFAULT 'running',
files_written INTEGER DEFAULT 0,
files_skipped INTEGER DEFAULT 0,
bytes_written INTEGER DEFAULT 0,
cursor_value TEXT,
error_message TEXT
)
"""
)
conn.executemany(
"""INSERT INTO extraction_runs
(extractor, started_at, finished_at, status, files_written, bytes_written, error_message)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
[
("overpass", "2026-02-01T08:00:00Z", "2026-02-01T08:05:00Z", "success", 1, 400000, None),
("playtomic_tenants", "2026-02-24T06:00:00Z", "2026-02-24T06:10:00Z", "success", 1, 7700000, None),
("playtomic_availability", "2026-02-25T06:00:00Z", "2026-02-25T07:30:00Z", "failed", 0, 0, "ReadTimeout: connection timed out"),
# Stale running row (started 1970)
("eurostat", "1970-01-01T00:00:00Z", None, "running", 0, 0, None),
],
)
conn.commit()
conn.close()
yield tmpdir
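The seeded 1970 "running" row exercises stale detection. A hedged sketch of how such rows can be found (the real cutoff in `pipeline_routes` may differ from the 2-hour threshold assumed here; ISO-8601 timestamps compare correctly as strings):

```python
import sqlite3
from datetime import datetime, timedelta, timezone

STALE_AFTER = timedelta(hours=2)  # assumed threshold, not confirmed by the source

def find_stale_runs(conn):
    """Return run_ids of 'running' rows whose started_at predates the cutoff."""
    cutoff = (datetime.now(timezone.utc) - STALE_AFTER).strftime("%Y-%m-%dT%H:%M:%SZ")
    rows = conn.execute(
        "SELECT run_id FROM extraction_runs"
        " WHERE status = 'running' AND started_at < ?",
        (cutoff,),
    ).fetchall()
    return [r[0] for r in rows]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE extraction_runs"
    " (run_id INTEGER PRIMARY KEY, extractor TEXT, started_at TEXT, status TEXT)"
)
conn.executemany(
    "INSERT INTO extraction_runs (extractor, started_at, status) VALUES (?, ?, ?)",
    [
        ("eurostat", "1970-01-01T00:00:00Z", "running"),   # dead: started decades ago
        ("overpass", datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "running"),
    ],
)
print(find_stale_runs(conn))  # [1]
```

This is why the dashboard can only flag stale rows rather than fix them: a row stuck at "running" is indistinguishable from a genuinely long run without the age heuristic, so the "Mark Failed" action stays a human decision.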
@pytest.fixture
def serving_meta_dir():
"""Temp directory with a _serving_meta.json file."""
with tempfile.TemporaryDirectory() as tmpdir:
meta = {
"exported_at_utc": "2026-02-25T08:30:00+00:00",
"tables": {
"city_market_profile": {"row_count": 612},
"planner_defaults": {"row_count": 612},
"pseo_city_costs_de": {"row_count": 487},
},
}
(Path(tmpdir) / "_serving_meta.json").write_text(json.dumps(meta))
# Fake duckdb file so the path exists
(Path(tmpdir) / "analytics.duckdb").touch()
yield tmpdir
# ── Schema + query mocks ──────────────────────────────────────────────────────
_MOCK_SCHEMA_ROWS = [
{"table_name": "city_market_profile", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1},
{"table_name": "city_market_profile", "column_name": "country_code", "data_type": "VARCHAR", "ordinal_position": 2},
{"table_name": "city_market_profile", "column_name": "marktreife_score", "data_type": "DOUBLE", "ordinal_position": 3},
{"table_name": "planner_defaults", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1},
]
_MOCK_TABLE_EXISTS = [{"1": 1}]
_MOCK_SAMPLE_ROWS = [
{"city_slug": "berlin", "country_code": "DE", "marktreife_score": 82.5},
{"city_slug": "munich", "country_code": "DE", "marktreife_score": 77.0},
]
def _make_fetch_analytics_mock(schema=True):
"""Return an async mock for fetch_analytics that returns schema or table data."""
async def _mock(sql, params=None):
if "information_schema.tables" in sql:
return _MOCK_TABLE_EXISTS
if "information_schema.columns" in sql and params:
return [r for r in _MOCK_SCHEMA_ROWS if r["table_name"] == params[0]]
if "information_schema.columns" in sql:
return _MOCK_SCHEMA_ROWS
if "city_market_profile" in sql:
return _MOCK_SAMPLE_ROWS
return []
return _mock
# ════════════════════════════════════════════════════════════════════════════
# Dashboard
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_dashboard_loads(admin_client, state_db_dir, serving_meta_dir):
"""Dashboard returns 200 with stat cards."""
with (
patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir),
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
):
resp = await admin_client.get("/admin/pipeline/")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "Data Pipeline" in data
assert "Total Runs" in data
assert "Success Rate" in data
assert "Serving Tables" in data
@pytest.mark.asyncio
async def test_pipeline_dashboard_requires_admin(client):
"""Unauthenticated access redirects to login."""
resp = await client.get("/admin/pipeline/")
assert resp.status_code in (302, 401)
@pytest.mark.asyncio
async def test_pipeline_dashboard_stale_warning(admin_client, state_db_dir, serving_meta_dir):
"""Stale run banner appears when a running row is old."""
with (
patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir),
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
):
resp = await admin_client.get("/admin/pipeline/")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "stale run" in data.lower()
# ════════════════════════════════════════════════════════════════════════════
# Overview tab
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_overview(admin_client, state_db_dir, serving_meta_dir):
"""Overview tab returns extraction status grid and serving table counts."""
with (
patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir),
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
):
resp = await admin_client.get("/admin/pipeline/overview")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "city_market_profile" in data
assert "612" in data # row count from serving meta
@pytest.mark.asyncio
async def test_pipeline_overview_no_state_db(admin_client, serving_meta_dir):
"""Overview handles gracefully when .state.sqlite doesn't exist."""
with tempfile.TemporaryDirectory() as empty_dir:
with (
patch.object(pipeline_mod, "_LANDING_DIR", empty_dir),
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
):
resp = await admin_client.get("/admin/pipeline/overview")
assert resp.status_code == 200
# ════════════════════════════════════════════════════════════════════════════
# Extractions tab
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_extractions_list(admin_client, state_db_dir):
"""Extractions tab returns run history table."""
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.get("/admin/pipeline/extractions")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "overpass" in data
assert "playtomic_tenants" in data
assert "success" in data
@pytest.mark.asyncio
async def test_pipeline_extractions_filter_extractor(admin_client, state_db_dir):
"""Extractor filter returns only 1 matching run (not 4)."""
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.get("/admin/pipeline/extractions?extractor=overpass")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "overpass" in data
# Filtered result should show "Showing 1 of 1"
assert "Showing 1 of 1" in data
@pytest.mark.asyncio
async def test_pipeline_extractions_filter_status(admin_client, state_db_dir):
"""Status filter returns only runs with matching status."""
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.get("/admin/pipeline/extractions?status=failed")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "failed" in data
assert "ReadTimeout" in data # error message shown
@pytest.mark.asyncio
async def test_pipeline_mark_stale(admin_client, state_db_dir):
"""POST to mark-stale updates a running row to failed."""
# Find the run_id of the stale running row (eurostat, started 1970)
db_path = Path(state_db_dir) / ".state.sqlite"
conn = sqlite3.connect(str(db_path))
row = conn.execute(
"SELECT run_id FROM extraction_runs WHERE status = 'running' ORDER BY run_id LIMIT 1"
).fetchone()
conn.close()
assert row is not None
run_id = row[0]
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.post(
f"/admin/pipeline/extractions/{run_id}/mark-stale",
form={"csrf_token": "test"},
)
# Should redirect (flash + redirect pattern)
assert resp.status_code in (302, 200)
# Verify DB was updated
conn = sqlite3.connect(str(db_path))
updated = conn.execute(
"SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,)
).fetchone()
conn.close()
assert updated[0] == "failed"
@pytest.mark.asyncio
async def test_pipeline_mark_stale_already_finished(admin_client, state_db_dir):
"""Cannot mark an already-finished (success) row as stale."""
db_path = Path(state_db_dir) / ".state.sqlite"
conn = sqlite3.connect(str(db_path))
row = conn.execute(
"SELECT run_id FROM extraction_runs WHERE status = 'success' ORDER BY run_id LIMIT 1"
).fetchone()
conn.close()
    assert row is not None
    run_id = row[0]
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.post(
f"/admin/pipeline/extractions/{run_id}/mark-stale",
form={"csrf_token": "test"},
)
assert resp.status_code in (302, 200)
# Verify status unchanged
conn = sqlite3.connect(str(db_path))
status = conn.execute(
"SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,)
).fetchone()[0]
conn.close()
assert status == "success"
@pytest.mark.asyncio
async def test_pipeline_trigger_extract(admin_client, state_db_dir):
"""POST to trigger enqueues a run_extraction task and redirects."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
# enqueue is imported inside the route handler, so patch at the source module
with (
patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir),
patch("padelnomics.worker.enqueue", new_callable=AsyncMock) as mock_enqueue,
):
resp = await admin_client.post(
"/admin/pipeline/extract/trigger",
form={"csrf_token": "test"},
)
assert resp.status_code in (302, 200)
mock_enqueue.assert_called_once_with("run_extraction")
# ════════════════════════════════════════════════════════════════════════════
# Catalog tab
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_catalog(admin_client, serving_meta_dir):
"""Catalog tab lists serving tables with row counts."""
with (
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()),
):
resp = await admin_client.get("/admin/pipeline/catalog")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "city_market_profile" in data
assert "612" in data # row count from serving meta
@pytest.mark.asyncio
async def test_pipeline_table_detail(admin_client):
"""Table detail returns columns and sample rows."""
with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()):
resp = await admin_client.get("/admin/pipeline/catalog/city_market_profile")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "city_slug" in data
assert "berlin" in data # from sample rows
@pytest.mark.asyncio
async def test_pipeline_table_detail_invalid_name(admin_client):
"""Table name with uppercase characters (invalid) returns 400."""
with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()):
resp = await admin_client.get("/admin/pipeline/catalog/InvalidTableName")
assert resp.status_code in (400, 404)
@pytest.mark.asyncio
async def test_pipeline_table_detail_unknown_table(admin_client):
"""Non-existent table returns 404."""
async def _empty_fetch(sql, params=None):
return []
with patch("padelnomics.analytics.fetch_analytics", side_effect=_empty_fetch):
resp = await admin_client.get("/admin/pipeline/catalog/nonexistent_table")
assert resp.status_code == 404
# ════════════════════════════════════════════════════════════════════════════
# Query editor
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_query_editor_loads(admin_client):
"""Query editor tab returns textarea and schema sidebar."""
with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()):
resp = await admin_client.get("/admin/pipeline/query")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "query-editor" in data
assert "schema-panel" in data
assert "city_market_profile" in data
@pytest.mark.asyncio
async def test_pipeline_query_execute_valid(admin_client):
"""Valid SELECT query returns results table."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
mock_result = (
["city_slug", "country_code"],
[("berlin", "DE"), ("munich", "DE")],
None,
12.5,
)
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result):
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": "SELECT city_slug, country_code FROM serving.city_market_profile"},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "berlin" in data
assert "city_slug" in data
@pytest.mark.asyncio
async def test_pipeline_query_execute_blocked_keyword(admin_client):
"""Queries with blocked keywords return an error (no DB call made)."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q:
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": "DROP TABLE serving.city_market_profile"},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "blocked" in data.lower() or "error" in data.lower()
mock_q.assert_not_called()
@pytest.mark.asyncio
async def test_pipeline_query_execute_empty(admin_client):
"""Empty SQL returns validation error."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q:
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": ""},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "empty" in data.lower() or "error" in data.lower()
mock_q.assert_not_called()
@pytest.mark.asyncio
async def test_pipeline_query_execute_too_long(admin_client):
"""SQL over 10,000 chars returns a length error."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q:
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": "SELECT " + "x" * 10_001},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "long" in data.lower() or "error" in data.lower()
mock_q.assert_not_called()
@pytest.mark.asyncio
async def test_pipeline_query_execute_db_error(admin_client):
"""DB error from execute_user_query is displayed as error message."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
mock_result = ([], [], "Table 'foo' not found", 5.0)
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result):
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": "SELECT * FROM serving.foo"},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "not found" in data
# ════════════════════════════════════════════════════════════════════════════
# analytics.execute_user_query()
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_execute_user_query_no_connection():
"""Returns error tuple when _conn is None."""
import padelnomics.analytics as analytics_mod
with patch.object(analytics_mod, "_conn", None):
cols, rows, error, elapsed = await analytics_mod.execute_user_query("SELECT 1")
assert cols == []
assert rows == []
assert error is not None
assert "not available" in error.lower()
@pytest.mark.asyncio
async def test_execute_user_query_timeout():
"""Returns timeout error when query takes too long."""
import asyncio
import padelnomics.analytics as analytics_mod
mock_conn = MagicMock()
async def _slow_thread(_fn):
await asyncio.sleep(10)
with (
patch.object(analytics_mod, "_conn", mock_conn),
patch("padelnomics.analytics.asyncio.to_thread", side_effect=_slow_thread),
):
cols, rows, error, elapsed = await analytics_mod.execute_user_query(
"SELECT 1", timeout_seconds=1
)
assert error is not None
assert "timed out" in error.lower()
# ════════════════════════════════════════════════════════════════════════════
# Unit tests: data access helpers
# ════════════════════════════════════════════════════════════════════════════
def test_fetch_extraction_summary_missing_db():
"""Returns zero-filled dict when state DB doesn't exist."""
with tempfile.TemporaryDirectory() as empty_dir:
with patch.object(pipeline_mod, "_LANDING_DIR", empty_dir):
result = pipeline_mod._fetch_extraction_summary_sync()
assert result["total"] == 0
assert result["stale"] == 0
def test_fetch_extraction_summary_counts(state_db_dir):
"""Returns correct total/success/failed/running/stale counts."""
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
result = pipeline_mod._fetch_extraction_summary_sync()
assert result["total"] == 4
assert result["success"] == 2
assert result["failed"] == 1
assert result["running"] == 1
assert result["stale"] == 1 # eurostat started in 1970
def test_load_serving_meta(serving_meta_dir):
"""Parses _serving_meta.json correctly."""
with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")):
meta = pipeline_mod._load_serving_meta()
assert meta is not None
assert "city_market_profile" in meta["tables"]
assert meta["tables"]["city_market_profile"]["row_count"] == 612
def test_load_serving_meta_missing():
"""Returns None when _serving_meta.json doesn't exist."""
with tempfile.TemporaryDirectory() as empty_dir:
with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(empty_dir) / "analytics.duckdb")):
meta = pipeline_mod._load_serving_meta()
assert meta is None
def test_format_bytes():
assert pipeline_mod._format_bytes(0) == "0 B"
assert pipeline_mod._format_bytes(512) == "512 B"
assert pipeline_mod._format_bytes(1536) == "1.5 KB"
assert pipeline_mod._format_bytes(1_572_864) == "1.5 MB"
def test_duration_str():
assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:00:45Z") == "45s"
assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:02:30Z") == "2m 30s"
assert pipeline_mod._duration_str(None, "2026-02-01T08:00:00Z") == ""