merge: fix mark-failed CSS bug + per-extractor run buttons
This commit is contained in:
@@ -515,7 +515,7 @@ async def pipeline_mark_stale(run_id: int):
|
|||||||
await flash(f"Run #{run_id} marked as failed.", "success")
|
await flash(f"Run #{run_id} marked as failed.", "success")
|
||||||
else:
|
else:
|
||||||
await flash(f"Run #{run_id} could not be updated (not in 'running' state).", "warning")
|
await flash(f"Run #{run_id} could not be updated (not in 'running' state).", "warning")
|
||||||
return redirect(url_for("pipeline.pipeline_extractions"))
|
return redirect(url_for("pipeline.pipeline_dashboard"))
|
||||||
|
|
||||||
|
|
||||||
# ── Trigger extraction ────────────────────────────────────────────────────────
|
# ── Trigger extraction ────────────────────────────────────────────────────────
|
||||||
@@ -525,11 +525,23 @@ async def pipeline_mark_stale(run_id: int):
|
|||||||
@role_required("admin")
|
@role_required("admin")
|
||||||
@csrf_protect
|
@csrf_protect
|
||||||
async def pipeline_trigger_extract():
|
async def pipeline_trigger_extract():
|
||||||
"""Enqueue a full pipeline extraction run."""
|
"""Enqueue an extraction run — all extractors, or a single named one."""
|
||||||
from ..worker import enqueue
|
from ..worker import enqueue
|
||||||
|
|
||||||
await enqueue("run_extraction")
|
form = await request.form
|
||||||
await flash("Extraction run queued. Check the task queue for progress.", "success")
|
extractor = (form.get("extractor") or "").strip()
|
||||||
|
|
||||||
|
if extractor:
|
||||||
|
valid_names = {wf["name"] for wf in await asyncio.to_thread(_load_workflows)}
|
||||||
|
if extractor not in valid_names:
|
||||||
|
await flash(f"Unknown extractor '{extractor}'.", "warning")
|
||||||
|
return redirect(url_for("pipeline.pipeline_dashboard"))
|
||||||
|
await enqueue("run_extraction", {"extractor": extractor})
|
||||||
|
await flash(f"Extractor '{extractor}' queued. Check the task queue for progress.", "success")
|
||||||
|
else:
|
||||||
|
await enqueue("run_extraction")
|
||||||
|
await flash("Extraction run queued. Check the task queue for progress.", "success")
|
||||||
|
|
||||||
return redirect(url_for("pipeline.pipeline_dashboard"))
|
return redirect(url_for("pipeline.pipeline_dashboard"))
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -26,6 +26,12 @@
|
|||||||
{% if stale %}
|
{% if stale %}
|
||||||
<span class="badge-warning" style="font-size:10px;padding:1px 6px;margin-left:auto">stale</span>
|
<span class="badge-warning" style="font-size:10px;padding:1px 6px;margin-left:auto">stale</span>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
|
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0 ml-auto">
|
||||||
|
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
|
||||||
|
<input type="hidden" name="extractor" value="{{ wf.name }}">
|
||||||
|
<button type="button" class="btn btn-sm" style="padding:2px 8px;font-size:11px"
|
||||||
|
onclick="confirmAction('Run {{ wf.name }} extractor?', this.closest('form'))">Run</button>
|
||||||
|
</form>
|
||||||
</div>
|
</div>
|
||||||
<p class="text-xs text-slate">{{ wf.schedule_label }}</p>
|
<p class="text-xs text-slate">{{ wf.schedule_label }}</p>
|
||||||
{% if run %}
|
{% if run %}
|
||||||
|
|||||||
@@ -700,18 +700,29 @@ async def handle_cleanup_seo_metrics(payload: dict) -> None:
|
|||||||
|
|
||||||
@task("run_extraction")
|
@task("run_extraction")
|
||||||
async def handle_run_extraction(payload: dict) -> None:
|
async def handle_run_extraction(payload: dict) -> None:
|
||||||
"""Run the full extraction pipeline (all extractors) in the background.
|
"""Run the extraction pipeline in the background.
|
||||||
|
|
||||||
Shells out to `uv run extract` in the repo root. The extraction CLI
|
Shells out to the extraction CLI in the repo root. The extraction CLI
|
||||||
manages its own state in .state.sqlite and writes to the landing zone.
|
manages its own state in .state.sqlite and writes to the landing zone.
|
||||||
|
|
||||||
|
payload["extractor"]: optional workflow name (e.g. "overpass").
|
||||||
|
If set, runs only that extractor via its entry point (extract-overpass).
|
||||||
|
If absent, runs all extractors via the umbrella `extract` entry point.
|
||||||
"""
|
"""
|
||||||
import subprocess
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
repo_root = Path(__file__).resolve().parents[4]
|
repo_root = Path(__file__).resolve().parents[4]
|
||||||
|
extractor = payload.get("extractor", "").strip()
|
||||||
|
if extractor:
|
||||||
|
cmd_name = f"extract-{extractor.replace('_', '-')}"
|
||||||
|
cmd = ["uv", "run", "--package", "padelnomics_extract", cmd_name]
|
||||||
|
else:
|
||||||
|
cmd = ["uv", "run", "--package", "padelnomics_extract", "extract"]
|
||||||
|
|
||||||
result = await asyncio.to_thread(
|
result = await asyncio.to_thread(
|
||||||
subprocess.run,
|
subprocess.run,
|
||||||
["uv", "run", "--package", "padelnomics_extract", "extract"],
|
cmd,
|
||||||
capture_output=True,
|
capture_output=True,
|
||||||
text=True,
|
text=True,
|
||||||
timeout=7200, # 2-hour absolute timeout
|
timeout=7200, # 2-hour absolute timeout
|
||||||
|
|||||||
Reference in New Issue
Block a user