diff --git a/web/src/padelnomics/admin/pipeline_routes.py b/web/src/padelnomics/admin/pipeline_routes.py index 258b9f0..3bc926f 100644 --- a/web/src/padelnomics/admin/pipeline_routes.py +++ b/web/src/padelnomics/admin/pipeline_routes.py @@ -6,7 +6,9 @@ Operational visibility for the data extraction and transformation pipeline: /admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats /admin/pipeline/extractions → HTMX tab: filterable extraction run history /admin/pipeline/extractions//mark-stale → POST: mark stuck "running" row as failed - /admin/pipeline/extract/trigger → POST: enqueue full extraction run + /admin/pipeline/extract/trigger → POST: enqueue extraction run (HTMX-aware) + /admin/pipeline/transform → HTMX tab: SQLMesh + export status, run history + /admin/pipeline/transform/trigger → POST: enqueue transform/export/pipeline step /admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data) /admin/pipeline/catalog/ → HTMX partial: table detail (columns + sample) /admin/pipeline/query → HTMX tab: SQL query editor @@ -18,6 +20,7 @@ Data sources: - analytics.duckdb (DuckDB read-only via analytics.execute_user_query) - LANDING_DIR/ (filesystem scan for file sizes + dates) - infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib) + - app.db tasks table (run_transform, run_export, run_pipeline task rows) """ import asyncio import json @@ -626,10 +629,8 @@ async def pipeline_dashboard(): # ── Overview tab ───────────────────────────────────────────────────────────── -@bp.route("/overview") -@role_required("admin") -async def pipeline_overview(): - """HTMX tab: extraction status per source, serving freshness, landing zone.""" +async def _render_overview_partial(): + """Build and render the pipeline overview partial (shared by GET and POST triggers).""" latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather( asyncio.to_thread(_fetch_latest_per_extractor_sync), 
asyncio.to_thread(_get_landing_zone_stats_sync), @@ -650,6 +651,13 @@ async def pipeline_overview(): "stale": _is_stale(run) if run else False, }) + # Treat pending extraction tasks as "running" (queued or active). + from ..core import fetch_all as _fetch_all # noqa: PLC0415 + pending_extraction = await _fetch_all( + "SELECT id FROM tasks WHERE task_name = 'run_extraction' AND status = 'pending' LIMIT 1" + ) + any_running = bool(pending_extraction) + # Compute landing zone totals total_landing_bytes = sum(s["total_bytes"] for s in landing_stats) @@ -677,10 +685,18 @@ async def pipeline_overview(): total_landing_bytes=total_landing_bytes, serving_tables=serving_tables, last_export=last_export, + any_running=any_running, format_bytes=_format_bytes, ) +@bp.route("/overview") +@role_required("admin") +async def pipeline_overview(): + """HTMX tab: extraction status per source, serving freshness, landing zone.""" + return await _render_overview_partial() + + # ── Extractions tab ──────────────────────────────────────────────────────────── @@ -745,7 +761,11 @@ async def pipeline_mark_stale(run_id: int): @role_required("admin") @csrf_protect async def pipeline_trigger_extract(): - """Enqueue an extraction run — all extractors, or a single named one.""" + """Enqueue an extraction run — all extractors, or a single named one. + + HTMX-aware: if the HX-Request header is present, returns the overview partial + directly so the UI can update in-place without a redirect. + """ from ..worker import enqueue form = await request.form @@ -757,11 +777,15 @@ async def pipeline_trigger_extract(): await flash(f"Unknown extractor '{extractor}'.", "warning") return redirect(url_for("pipeline.pipeline_dashboard")) await enqueue("run_extraction", {"extractor": extractor}) - await flash(f"Extractor '{extractor}' queued. Check the task queue for progress.", "success") else: await enqueue("run_extraction") - await flash("Extraction run queued. 
# ── Transform tab ─────────────────────────────────────────────────────────────

# Task types that make up the transform/export side of the pipeline.
# Single source of truth — the SQL below and the `latest` dict are derived from it.
_TRANSFORM_TASK_NAMES = ("run_transform", "run_export", "run_pipeline")

# Safe to interpolate: built only from the constant tuple above, never user input.
_TRANSFORM_NAMES_SQL = ", ".join(f"'{name}'" for name in _TRANSFORM_TASK_NAMES)


async def _fetch_pipeline_tasks() -> dict:
    """Fetch the latest task row for each transform task type, plus recent run history.

    Returns:
        {
            "latest": {"run_transform": row|None, "run_export": row|None, "run_pipeline": row|None},
            "history": [row, ...],  # last 20 rows across all three task types, newest first
        }
    """
    from ..core import fetch_all as _fetch_all  # noqa: PLC0415

    # Latest row per task type (may be pending, complete, or failed).
    latest_rows = await _fetch_all(
        f"""
        SELECT t.*
        FROM tasks t
        INNER JOIN (
            SELECT task_name, MAX(id) AS max_id
            FROM tasks
            WHERE task_name IN ({_TRANSFORM_NAMES_SQL})
            GROUP BY task_name
        ) latest ON t.id = latest.max_id
        """
    )
    latest: dict = {name: None for name in _TRANSFORM_TASK_NAMES}
    for row in latest_rows:
        latest[row["task_name"]] = dict(row)

    history = await _fetch_all(
        f"""
        SELECT id, task_name, status, created_at, completed_at, error
        FROM tasks
        WHERE task_name IN ({_TRANSFORM_NAMES_SQL})
        ORDER BY id DESC
        LIMIT 20
        """
    )
    return {"latest": latest, "history": [dict(r) for r in history]}


def _format_duration(created_at: str | None, completed_at: str | None) -> str:
    """Human-readable duration between *created_at* and *completed_at*, or '' if unavailable.

    Accepts both SQLite's 'YYYY-MM-DD HH:MM:SS' and ISO-8601 'T'-separated timestamps
    (the serving metadata elsewhere in this module uses the latter); anything else
    yields '' rather than raising.
    """
    if not created_at or not completed_at:
        return ""
    try:
        # Normalize to the first 19 chars and a space separator so fromisoformat
        # handles both timestamp styles, with or without fractional seconds.
        start = datetime.fromisoformat(created_at[:19].replace("T", " "))
        end = datetime.fromisoformat(completed_at[:19].replace("T", " "))
    except ValueError:
        return ""
    delta = int((end - start).total_seconds())
    if delta < 0:
        # Clock skew / bad data — show nothing rather than a negative duration.
        return ""
    if delta < 60:
        return f"{delta}s"
    return f"{delta // 60}m {delta % 60}s"


async def _render_transform_partial():
    """Build and render the transform tab partial (shared by GET and POST triggers)."""
    task_data = await _fetch_pipeline_tasks()
    latest = task_data["latest"]
    history = task_data["history"]

    # Enrich history rows with a display duration and a truncated error.
    for row in history:
        row["duration"] = _format_duration(row.get("created_at"), row.get("completed_at"))
        row["error_short"] = row["error"][:120] if row.get("error") else None

    # 'pending' covers both queued and actively-running tasks.
    any_running = any(
        t is not None and t["status"] == "pending" for t in latest.values()
    )

    serving_meta = await asyncio.to_thread(_load_serving_meta)

    return await render_template(
        "admin/partials/pipeline_transform.html",
        latest=latest,
        history=history,
        any_running=any_running,
        serving_meta=serving_meta,
        format_duration=_format_duration,
    )


@bp.route("/transform")
@role_required("admin")
async def pipeline_transform():
    """HTMX tab: SQLMesh transform + export status, run history."""
    return await _render_transform_partial()


@bp.route("/transform/trigger", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_trigger_transform():
    """Enqueue a transform, export, or full pipeline task.

    form field `step`: 'transform' | 'export' | 'pipeline'
    Concurrency guard: rejects if the same task type is already pending.
    HTMX-aware: returns the transform partial for HTMX requests.
    """
    from ..core import fetch_one as _fetch_one  # noqa: PLC0415
    from ..worker import enqueue

    form = await request.form
    step = (form.get("step") or "").strip()

    step_to_task = {
        "transform": "run_transform",
        "export": "run_export",
        "pipeline": "run_pipeline",
    }
    if step not in step_to_task:
        await flash(f"Unknown step '{step}'.", "warning")
        return redirect(url_for("pipeline.pipeline_dashboard"))

    task_name = step_to_task[step]
    is_htmx = request.headers.get("HX-Request") == "true"

    # Concurrency guard: reject if same task type is already pending.
    # NOTE(review): check-then-enqueue is not atomic — two simultaneous requests
    # can both pass this check. Acceptable for an admin-only UI; a uniqueness
    # constraint on pending tasks would close the race if it ever matters.
    existing = await _fetch_one(
        "SELECT id FROM tasks WHERE task_name = ? AND status = 'pending' LIMIT 1",
        (task_name,),
    )
    if existing:
        if is_htmx:
            # The partial already shows the pending task as "Running…".
            return await _render_transform_partial()
        await flash(f"A '{step}' task is already queued (task #{existing['id']}).", "warning")
        return redirect(url_for("pipeline.pipeline_dashboard"))

    await enqueue(task_name)

    if is_htmx:
        return await _render_transform_partial()

    await flash(f"'{step}' task queued. Check the task queue for progress.", "success")
    return redirect(url_for("pipeline.pipeline_dashboard"))


# ── Catalog tab ───────────────────────────────────────────────────────────────
@@ -26,12 +33,14 @@ {% if stale %} stale {% endif %} -
- - - - +

{{ wf.schedule_label }}

{% if run %} @@ -131,3 +140,5 @@
+ +{# end #pipeline-overview-content #} diff --git a/web/src/padelnomics/admin/templates/admin/partials/pipeline_transform.html b/web/src/padelnomics/admin/templates/admin/partials/pipeline_transform.html new file mode 100644 index 0000000..5f16034 --- /dev/null +++ b/web/src/padelnomics/admin/templates/admin/partials/pipeline_transform.html @@ -0,0 +1,197 @@ + + +
+ + +
+ + + {% set tx = latest['run_transform'] %} +
+

SQLMesh Transform

+
+ {% if tx is none %} + + Never run + {% elif tx.status == 'pending' %} + + Running… + {% elif tx.status == 'complete' %} + + Complete + {% else %} + + Failed + {% endif %} +
+ {% if tx %} +

+ Started: {{ (tx.created_at or '')[:19] or '—' }} +

+ {% if tx.completed_at %} +

+ Finished: {{ tx.completed_at[:19] }} +

+ {% endif %} + {% if tx.status == 'failed' and tx.error %} +
+ Error +
{{ tx.error[:400] }}
+
+ {% endif %} + {% endif %} +
+ +
+
+ + + {% set ex = latest['run_export'] %} +
+

Export Serving

+
+ {% if ex is none %} + + Never run + {% elif ex.status == 'pending' %} + + Running… + {% elif ex.status == 'complete' %} + + Complete + {% else %} + + Failed + {% endif %} +
+ {% if ex %} +

+ Started: {{ (ex.created_at or '')[:19] or '—' }} +

+ {% if ex.completed_at %} +

+ Finished: {{ ex.completed_at[:19] }} +

+ {% endif %} + {% if serving_meta %} +

+ Last export: {{ (serving_meta.exported_at_utc or '')[:19].replace('T', ' ') or '—' }} +

+ {% endif %} + {% if ex.status == 'failed' and ex.error %} +
+ Error +
{{ ex.error[:400] }}
+
+ {% endif %} + {% endif %} +
+ +
+
+ +
+ + +{% set pl = latest['run_pipeline'] %} +
+
+
+

Full Pipeline

+

Runs extract → transform → export sequentially

+ {% if pl %} +

+ Last: {{ (pl.created_at or '')[:19] or '—' }} + {% if pl.status == 'complete' %}Complete{% endif %} + {% if pl.status == 'pending' %}Running…{% endif %} + {% if pl.status == 'failed' %}Failed{% endif %} +

+ {% endif %} +
+ +
+
+ + +
+

Recent Runs

+ {% if history %} +
+
+ + + + + + + + + + + + {% for row in history %} + + + + + + + + + {% endfor %} + +
#StepStartedDurationStatusError
{{ row.id }}{{ row.task_name | replace('run_', '') }}{{ (row.created_at or '')[:19] or '—' }}{{ row.duration or '—' }} + {% if row.status == 'complete' %} + Complete + {% elif row.status == 'failed' %} + Failed + {% else %} + Running… + {% endif %} + + {% if row.error_short %} +
+ Error +
{{ row.error_short }}
+
+ {% else %}—{% endif %} +
+ + {% else %} +

No transform runs yet.

+ {% endif %} + + +{# end #pipeline-transform-content #} diff --git a/web/src/padelnomics/admin/templates/admin/pipeline.html b/web/src/padelnomics/admin/templates/admin/pipeline.html index 19df2c0..7f8d216 100644 --- a/web/src/padelnomics/admin/templates/admin/pipeline.html +++ b/web/src/padelnomics/admin/templates/admin/pipeline.html @@ -33,6 +33,9 @@ .status-dot.failed { background: #EF4444; } .status-dot.stale { background: #D97706; } .status-dot.running { background: #3B82F6; } + + @keyframes pulse-dot { 0%,100%{opacity:1} 50%{opacity:0.4} } + .status-dot.running { animation: pulse-dot 1.5s ease-in-out infinite; } .status-dot.pending { background: #CBD5E1; } .pipeline-two-col { @@ -53,10 +56,11 @@

Extraction status, data catalog, and ad-hoc query editor

-
+ +
@@ -116,6 +120,10 @@ hx-get="{{ url_for('pipeline.pipeline_lineage') }}" hx-target="#pipeline-tab-content" hx-swap="innerHTML" hx-trigger="click">Lineage +
async def _run_pipeline_subprocess(cmd: list, timeout_seconds: int, fail_prefix: str):
    """Run *cmd* from the repo root and return the CompletedProcess.

    Shared plumbing for the run_transform / run_export / run_pipeline handlers.

    Args:
        cmd: argv list passed to subprocess.run (shell=False).
        timeout_seconds: absolute wall-clock limit for the subprocess.
        fail_prefix: human-readable prefix for the RuntimeError raised on failure,
            e.g. "SQLMesh transform failed".

    Raises:
        RuntimeError: on nonzero exit (with the first 500 chars of stderr) or
            on timeout — so the task queue records a readable error either way,
            instead of a bare TimeoutExpired traceback.
    """
    import subprocess
    from pathlib import Path

    # NOTE(review): for a source tree at <root>/web/src/padelnomics/worker.py,
    # parents[3] is <root> and parents[4] is one level ABOVE it — yet the commands
    # below reference <root>-relative paths (transform/..., src/...). Kept as-is to
    # match the existing handle_run_extraction; confirm against the deployed layout.
    repo_root = Path(__file__).resolve().parents[4]
    try:
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
            cwd=str(repo_root),
        )
    except subprocess.TimeoutExpired as err:
        raise RuntimeError(f"{fail_prefix}: timed out after {timeout_seconds}s") from err
    if result.returncode != 0:
        raise RuntimeError(
            f"{fail_prefix} (exit {result.returncode}): {result.stderr[:500]}"
        )
    return result


@task("run_transform")
async def handle_run_transform(payload: dict) -> None:
    """Run SQLMesh transform (prod plan --auto-apply) in the background.

    Shells out to `uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply`.
    2-hour absolute timeout — same as extraction.
    """
    result = await _run_pipeline_subprocess(
        ["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "plan", "prod", "--auto-apply"],
        7200,
        "SQLMesh transform failed",
    )
    logger.info("SQLMesh transform completed: %s", result.stdout[-300:] if result.stdout else "(no output)")


@task("run_export")
async def handle_run_export(payload: dict) -> None:
    """Export serving tables from lakehouse.duckdb → analytics.duckdb.

    Shells out to `uv run python src/padelnomics/export_serving.py`.
    10-minute absolute timeout.
    """
    result = await _run_pipeline_subprocess(
        ["uv", "run", "python", "src/padelnomics/export_serving.py"],
        600,
        "Export failed",
    )
    logger.info("Export completed: %s", result.stdout[-300:] if result.stdout else "(no output)")


@task("run_pipeline")
async def handle_run_pipeline(payload: dict) -> None:
    """Run full ELT pipeline: extract → transform → export, stopping on first failure."""
    # (step name, argv, timeout seconds) — run sequentially; any failure aborts the rest.
    steps = [
        (
            "extraction",
            ["uv", "run", "--package", "padelnomics_extract", "extract"],
            7200,
        ),
        (
            "transform",
            ["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "plan", "prod", "--auto-apply"],
            7200,
        ),
        (
            "export",
            ["uv", "run", "python", "src/padelnomics/export_serving.py"],
            600,
        ),
    ]

    for step_name, cmd, timeout_seconds in steps:
        logger.info("Pipeline step starting: %s", step_name)
        result = await _run_pipeline_subprocess(cmd, timeout_seconds, f"Pipeline failed at {step_name}")
        logger.info(
            "Pipeline step complete: %s — %s",
            step_name,
            result.stdout[-200:] if result.stdout else "(no output)",
        )

    logger.info("Full pipeline complete (extract → transform → export)")


@task("generate_articles")
async def handle_generate_articles(payload: dict) -> None:
    """Generate articles from a template in the background."""