diff --git a/web/src/padelnomics/admin/pipeline_routes.py b/web/src/padelnomics/admin/pipeline_routes.py index 7409f36..79b632f 100644 --- a/web/src/padelnomics/admin/pipeline_routes.py +++ b/web/src/padelnomics/admin/pipeline_routes.py @@ -515,7 +515,7 @@ async def pipeline_mark_stale(run_id: int): await flash(f"Run #{run_id} marked as failed.", "success") else: await flash(f"Run #{run_id} could not be updated (not in 'running' state).", "warning") - return redirect(url_for("pipeline.pipeline_extractions")) + return redirect(url_for("pipeline.pipeline_dashboard")) # ── Trigger extraction ──────────────────────────────────────────────────────── @@ -525,11 +525,23 @@ async def pipeline_mark_stale(run_id: int): @role_required("admin") @csrf_protect async def pipeline_trigger_extract(): - """Enqueue a full pipeline extraction run.""" + """Enqueue an extraction run — all extractors, or a single named one.""" from ..worker import enqueue - await enqueue("run_extraction") - await flash("Extraction run queued. Check the task queue for progress.", "success") + form = await request.form + extractor = (form.get("extractor") or "").strip() + + if extractor: + valid_names = {wf["name"] for wf in await asyncio.to_thread(_load_workflows)} + if extractor not in valid_names: + await flash(f"Unknown extractor '{extractor}'.", "warning") + return redirect(url_for("pipeline.pipeline_dashboard")) + await enqueue("run_extraction", {"extractor": extractor}) + await flash(f"Extractor '{extractor}' queued. Check the task queue for progress.", "success") + else: + await enqueue("run_extraction") + await flash("Extraction run queued. Check the task queue for progress.", "success") + return redirect(url_for("pipeline.pipeline_dashboard")) diff --git a/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html b/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html index 6d5b0d0..6eafb79 100644 --- a/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html +++ b/web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html @@ -26,6 +26,12 @@ {% if stale %} stale {% endif %} +
{{ wf.schedule_label }}
{% if run %} diff --git a/web/src/padelnomics/worker.py b/web/src/padelnomics/worker.py index ce8ba49..8ef4381 100644 --- a/web/src/padelnomics/worker.py +++ b/web/src/padelnomics/worker.py @@ -700,18 +700,29 @@ async def handle_cleanup_seo_metrics(payload: dict) -> None: @task("run_extraction") async def handle_run_extraction(payload: dict) -> None: - """Run the full extraction pipeline (all extractors) in the background. + """Run the extraction pipeline in the background. - Shells out to `uv run extract` in the repo root. The extraction CLI + Shells out to the extraction CLI in the repo root. The extraction CLI manages its own state in .state.sqlite and writes to the landing zone. + + payload["extractor"]: optional workflow name (e.g. "overpass"). + If set, runs only that extractor via its entry point (extract-overpass). + If absent, runs all extractors via the umbrella `extract` entry point. """ import subprocess from pathlib import Path repo_root = Path(__file__).resolve().parents[4] + extractor = payload.get("extractor", "").strip() + if extractor: + cmd_name = f"extract-{extractor.replace('_', '-')}" + cmd = ["uv", "run", "--package", "padelnomics_extract", cmd_name] + else: + cmd = ["uv", "run", "--package", "padelnomics_extract", "extract"] + result = await asyncio.to_thread( subprocess.run, - ["uv", "run", "--package", "padelnomics_extract", "extract"], + cmd, capture_output=True, text=True, timeout=7200, # 2-hour absolute timeout