fix(pipeline): fix mark-failed CSS bug + add per-extractor run buttons

- Redirect pipeline_mark_stale to pipeline_dashboard (full page) instead
  of pipeline_extractions (partial), fixing the broken CSS on form submit
- pipeline_trigger_extract accepts an optional 'extractor' POST field,
  validates it against the workflow names in workflows.toml to prevent
  injection, and passes it as the payload to enqueue("run_extraction")
- handle_run_extraction dispatches to the per-extractor CLI entry point
  (extract-overpass, extract-eurostat, etc.; underscores in the workflow
  name become hyphens) when extractor is set, and falls back to the
  umbrella 'extract' command otherwise
- pipeline_overview.html: add Run button to each workflow card header,
  posting extractor name with CSRF token to pipeline_trigger_extract

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-27 11:37:39 +01:00
parent 120f974970
commit fa7604301a
3 changed files with 36 additions and 7 deletions

View File

@@ -515,7 +515,7 @@ async def pipeline_mark_stale(run_id: int):
await flash(f"Run #{run_id} marked as failed.", "success")
else:
await flash(f"Run #{run_id} could not be updated (not in 'running' state).", "warning")
return redirect(url_for("pipeline.pipeline_extractions"))
return redirect(url_for("pipeline.pipeline_dashboard"))
# ── Trigger extraction ────────────────────────────────────────────────────────
@@ -525,11 +525,23 @@ async def pipeline_mark_stale(run_id: int):
@role_required("admin")
@csrf_protect
async def pipeline_trigger_extract():
"""Enqueue a full pipeline extraction run."""
"""Enqueue an extraction run — all extractors, or a single named one."""
from ..worker import enqueue
form = await request.form
extractor = (form.get("extractor") or "").strip()
if extractor:
valid_names = {wf["name"] for wf in await asyncio.to_thread(_load_workflows)}
if extractor not in valid_names:
await flash(f"Unknown extractor '{extractor}'.", "warning")
return redirect(url_for("pipeline.pipeline_dashboard"))
await enqueue("run_extraction", {"extractor": extractor})
await flash(f"Extractor '{extractor}' queued. Check the task queue for progress.", "success")
else:
await enqueue("run_extraction")
await flash("Extraction run queued. Check the task queue for progress.", "success")
return redirect(url_for("pipeline.pipeline_dashboard"))

View File

@@ -26,6 +26,12 @@
{% if stale %}
<span class="badge-warning" style="font-size:10px;padding:1px 6px;margin-left:auto">stale</span>
{% endif %}
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0 ml-auto">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="extractor" value="{{ wf.name }}">
<button type="button" class="btn btn-sm" style="padding:2px 8px;font-size:11px"
onclick="confirmAction('Run {{ wf.name }} extractor?', this.closest('form'))">Run</button>
</form>
</div>
<p class="text-xs text-slate">{{ wf.schedule_label }}</p>
{% if run %}

View File

@@ -700,18 +700,29 @@ async def handle_cleanup_seo_metrics(payload: dict) -> None:
@task("run_extraction")
async def handle_run_extraction(payload: dict) -> None:
"""Run the full extraction pipeline (all extractors) in the background.
"""Run the extraction pipeline in the background.
Shells out to `uv run extract` in the repo root. The extraction CLI
Shells out to the extraction CLI in the repo root. The extraction CLI
manages its own state in .state.sqlite and writes to the landing zone.
payload["extractor"]: optional workflow name (e.g. "overpass").
If set, runs only that extractor via its entry point (extract-overpass).
If absent, runs all extractors via the umbrella `extract` entry point.
"""
import subprocess
from pathlib import Path
repo_root = Path(__file__).resolve().parents[4]
extractor = payload.get("extractor", "").strip()
if extractor:
cmd_name = f"extract-{extractor.replace('_', '-')}"
cmd = ["uv", "run", "--package", "padelnomics_extract", cmd_name]
else:
cmd = ["uv", "run", "--package", "padelnomics_extract", "extract"]
result = await asyncio.to_thread(
subprocess.run,
["uv", "run", "--package", "padelnomics_extract", "extract"],
cmd,
capture_output=True,
text=True,
timeout=7200, # 2-hour absolute timeout