feat(pipeline): live extraction status + Transform tab

- worker: add run_transform, run_export, run_pipeline task handlers
  - run_transform: sqlmesh plan prod --auto-apply, 2h timeout
  - run_export: export_serving.py, 10min timeout
  - run_pipeline: sequential extract → transform → export, stops on first failure

- pipeline_routes: refactor overview into _render_overview_partial() helper,
  make pipeline_trigger_extract() HTMX-aware (returns partial on HX-Request),
  add _fetch_pipeline_tasks(), _format_duration() helpers,
  add pipeline_transform() + pipeline_trigger_transform() with concurrency guard

- pipeline_overview.html: wrap in self-polling div (every 5s while any_running),
  convert Run buttons to hx-post targeting #pipeline-overview-content

- pipeline.html: add pulse animation for .status-dot.running, add Transform tab
  button, rewire header "Run Pipeline" button to enqueue run_pipeline task

- pipeline_transform.html: new partial — status cards for transform + export,
  "Run Full Pipeline" card, recent runs table with duration + error details

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-03-01 13:46:11 +01:00
parent 169092c8ea
commit e5cbcf462e
5 changed files with 508 additions and 17 deletions

View File

@@ -6,7 +6,9 @@ Operational visibility for the data extraction and transformation pipeline:
/admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats /admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats
/admin/pipeline/extractions → HTMX tab: filterable extraction run history /admin/pipeline/extractions → HTMX tab: filterable extraction run history
/admin/pipeline/extractions/<id>/mark-stale → POST: mark stuck "running" row as failed /admin/pipeline/extractions/<id>/mark-stale → POST: mark stuck "running" row as failed
/admin/pipeline/extract/trigger → POST: enqueue full extraction run /admin/pipeline/extract/trigger → POST: enqueue extraction run (HTMX-aware)
/admin/pipeline/transform → HTMX tab: SQLMesh + export status, run history
/admin/pipeline/transform/trigger → POST: enqueue transform/export/pipeline step
/admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data) /admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data)
/admin/pipeline/catalog/<table> → HTMX partial: table detail (columns + sample) /admin/pipeline/catalog/<table> → HTMX partial: table detail (columns + sample)
/admin/pipeline/query → HTMX tab: SQL query editor /admin/pipeline/query → HTMX tab: SQL query editor
@@ -18,6 +20,7 @@ Data sources:
- analytics.duckdb (DuckDB read-only via analytics.execute_user_query) - analytics.duckdb (DuckDB read-only via analytics.execute_user_query)
- LANDING_DIR/ (filesystem scan for file sizes + dates) - LANDING_DIR/ (filesystem scan for file sizes + dates)
- infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib) - infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib)
- app.db tasks table (run_transform, run_export, run_pipeline task rows)
""" """
import asyncio import asyncio
import json import json
@@ -626,10 +629,8 @@ async def pipeline_dashboard():
# ── Overview tab ───────────────────────────────────────────────────────────── # ── Overview tab ─────────────────────────────────────────────────────────────
@bp.route("/overview") async def _render_overview_partial():
@role_required("admin") """Build and render the pipeline overview partial (shared by GET and POST triggers)."""
async def pipeline_overview():
"""HTMX tab: extraction status per source, serving freshness, landing zone."""
latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather( latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather(
asyncio.to_thread(_fetch_latest_per_extractor_sync), asyncio.to_thread(_fetch_latest_per_extractor_sync),
asyncio.to_thread(_get_landing_zone_stats_sync), asyncio.to_thread(_get_landing_zone_stats_sync),
@@ -650,6 +651,13 @@ async def pipeline_overview():
"stale": _is_stale(run) if run else False, "stale": _is_stale(run) if run else False,
}) })
# Treat pending extraction tasks as "running" (queued or active).
from ..core import fetch_all as _fetch_all # noqa: PLC0415
pending_extraction = await _fetch_all(
"SELECT id FROM tasks WHERE task_name = 'run_extraction' AND status = 'pending' LIMIT 1"
)
any_running = bool(pending_extraction)
# Compute landing zone totals # Compute landing zone totals
total_landing_bytes = sum(s["total_bytes"] for s in landing_stats) total_landing_bytes = sum(s["total_bytes"] for s in landing_stats)
@@ -677,10 +685,18 @@ async def pipeline_overview():
total_landing_bytes=total_landing_bytes, total_landing_bytes=total_landing_bytes,
serving_tables=serving_tables, serving_tables=serving_tables,
last_export=last_export, last_export=last_export,
any_running=any_running,
format_bytes=_format_bytes, format_bytes=_format_bytes,
) )
@bp.route("/overview")
@role_required("admin")
async def pipeline_overview():
    """HTMX tab: extraction status, serving freshness, landing zone."""
    # Thin GET wrapper: the POST trigger endpoints re-render this same partial
    # via _render_overview_partial() so the UI can swap in-place.
    return await _render_overview_partial()
# ── Extractions tab ──────────────────────────────────────────────────────────── # ── Extractions tab ────────────────────────────────────────────────────────────
@@ -745,7 +761,11 @@ async def pipeline_mark_stale(run_id: int):
@role_required("admin") @role_required("admin")
@csrf_protect @csrf_protect
async def pipeline_trigger_extract(): async def pipeline_trigger_extract():
"""Enqueue an extraction run — all extractors, or a single named one.""" """Enqueue an extraction run — all extractors, or a single named one.
HTMX-aware: if the HX-Request header is present, returns the overview partial
directly so the UI can update in-place without a redirect.
"""
from ..worker import enqueue from ..worker import enqueue
form = await request.form form = await request.form
@@ -757,11 +777,15 @@ async def pipeline_trigger_extract():
await flash(f"Unknown extractor '{extractor}'.", "warning") await flash(f"Unknown extractor '{extractor}'.", "warning")
return redirect(url_for("pipeline.pipeline_dashboard")) return redirect(url_for("pipeline.pipeline_dashboard"))
await enqueue("run_extraction", {"extractor": extractor}) await enqueue("run_extraction", {"extractor": extractor})
await flash(f"Extractor '{extractor}' queued. Check the task queue for progress.", "success")
else: else:
await enqueue("run_extraction") await enqueue("run_extraction")
await flash("Extraction run queued. Check the task queue for progress.", "success")
is_htmx = request.headers.get("HX-Request") == "true"
if is_htmx:
return await _render_overview_partial()
msg = f"Extractor '{extractor}' queued." if extractor else "Extraction run queued."
await flash(f"{msg} Check the task queue for progress.", "success")
return redirect(url_for("pipeline.pipeline_dashboard")) return redirect(url_for("pipeline.pipeline_dashboard"))
@@ -847,6 +871,156 @@ async def pipeline_lineage_schema(model: str):
) )
# ── Transform tab ─────────────────────────────────────────────────────────────
_TRANSFORM_TASK_NAMES = ("run_transform", "run_export", "run_pipeline")

# SQL `IN (...)` list built from the constant above so the two queries below
# cannot drift out of sync with it. The values are trusted module-level
# literals (never user input), so direct interpolation is safe here.
_TRANSFORM_NAMES_SQL = ", ".join(f"'{name}'" for name in _TRANSFORM_TASK_NAMES)


async def _fetch_pipeline_tasks() -> dict:
    """Fetch the latest task row for each transform task type, plus recent run history.

    Returns:
        {
            "latest": {"run_transform": row|None, "run_export": row|None, "run_pipeline": row|None},
            "history": [row, ...],  # last 20 rows across all three task types, newest first
        }
    """
    from ..core import fetch_all as _fetch_all  # noqa: PLC0415

    # Latest row per task type (may be pending, complete, or failed).
    # MAX(id) stands in for "most recent" — assumes ids increase with creation
    # order (autoincrement), consistent with the ORDER BY id DESC below.
    latest_rows = await _fetch_all(
        f"""
        SELECT t.*
        FROM tasks t
        INNER JOIN (
            SELECT task_name, MAX(id) AS max_id
            FROM tasks
            WHERE task_name IN ({_TRANSFORM_NAMES_SQL})
            GROUP BY task_name
        ) latest ON t.id = latest.max_id
        """
    )
    latest: dict = {name: None for name in _TRANSFORM_TASK_NAMES}
    for row in latest_rows:
        latest[row["task_name"]] = dict(row)

    history = await _fetch_all(
        f"""
        SELECT id, task_name, status, created_at, completed_at, error
        FROM tasks
        WHERE task_name IN ({_TRANSFORM_NAMES_SQL})
        ORDER BY id DESC
        LIMIT 20
        """
    )
    return {"latest": latest, "history": [dict(r) for r in history]}
def _format_duration(created_at: str | None, completed_at: str | None) -> str:
"""Human-readable duration between created_at and completed_at, or '' if unavailable."""
if not created_at or not completed_at:
return ""
try:
fmt = "%Y-%m-%d %H:%M:%S"
start = datetime.strptime(created_at, fmt)
end = datetime.strptime(completed_at, fmt)
delta = int((end - start).total_seconds())
if delta < 0:
return ""
if delta < 60:
return f"{delta}s"
return f"{delta // 60}m {delta % 60}s"
except ValueError:
return ""
async def _render_transform_partial():
    """Build and render the transform tab partial (status cards + run history)."""
    task_data = await _fetch_pipeline_tasks()
    latest = task_data["latest"]
    history = task_data["history"]

    # Decorate each history row with a display duration and a truncated error.
    for entry in history:
        entry["duration"] = _format_duration(entry.get("created_at"), entry.get("completed_at"))
        err = entry.get("error")
        entry["error_short"] = err[:120] if err else None

    # "pending" covers queued and actively-running tasks alike.
    any_running = any(
        row is not None and row["status"] == "pending" for row in latest.values()
    )

    serving_meta = await asyncio.to_thread(_load_serving_meta)
    return await render_template(
        "admin/partials/pipeline_transform.html",
        latest=latest,
        history=history,
        any_running=any_running,
        serving_meta=serving_meta,
        format_duration=_format_duration,
    )
@bp.route("/transform")
@role_required("admin")
async def pipeline_transform():
    """HTMX tab: SQLMesh transform + export status, run history."""
    # GET wrapper around the shared renderer; trigger POSTs return the same
    # partial so HTMX can swap the tab content in-place.
    return await _render_transform_partial()
@bp.route("/transform/trigger", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_trigger_transform():
    """Enqueue a transform, export, or full pipeline task.

    form field `step`: 'transform' | 'export' | 'pipeline'
    Concurrency guard: rejects if the same task type is already pending.
    HTMX-aware: returns the transform partial for HTMX requests.
    """
    from ..core import fetch_one as _fetch_one  # noqa: PLC0415
    from ..worker import enqueue

    form = await request.form
    step = (form.get("step") or "").strip()
    step_to_task = {
        "transform": "run_transform",
        "export": "run_export",
        "pipeline": "run_pipeline",
    }
    if step not in step_to_task:
        await flash(f"Unknown step '{step}'.", "warning")
        return redirect(url_for("pipeline.pipeline_dashboard"))
    task_name = step_to_task[step]

    # Computed once up front (the original evaluated this twice, on both the
    # duplicate-task path and the success path).
    is_htmx = request.headers.get("HX-Request") == "true"

    # Concurrency guard: reject if same task type is already pending.
    existing = await _fetch_one(
        "SELECT id FROM tasks WHERE task_name = ? AND status = 'pending' LIMIT 1",
        (task_name,),
    )
    if existing:
        if is_htmx:
            # No flash needed: the re-rendered partial already shows the
            # pending task's "Running…" state.
            return await _render_transform_partial()
        await flash(f"A '{step}' task is already queued (task #{existing['id']}).", "warning")
        return redirect(url_for("pipeline.pipeline_dashboard"))

    await enqueue(task_name)
    if is_htmx:
        return await _render_transform_partial()
    await flash(f"'{step}' task queued. Check the task queue for progress.", "success")
    return redirect(url_for("pipeline.pipeline_dashboard"))
# ── Catalog tab ─────────────────────────────────────────────────────────────── # ── Catalog tab ───────────────────────────────────────────────────────────────

View File

@@ -1,4 +1,11 @@
<!-- Pipeline Overview Tab: extraction status, serving freshness, landing zone --> <!-- Pipeline Overview Tab: extraction status, serving freshness, landing zone
Self-polls every 5s while any extraction task is pending, stops when quiet. -->
<div id="pipeline-overview-content"
hx-get="{{ url_for('pipeline.pipeline_overview') }}"
hx-target="this"
hx-swap="outerHTML"
{% if any_running %}hx-trigger="every 5s"{% endif %}>
<!-- Extraction Status Grid --> <!-- Extraction Status Grid -->
<div class="card mb-4"> <div class="card mb-4">
@@ -26,12 +33,14 @@
{% if stale %} {% if stale %}
<span class="badge-warning" style="font-size:10px;padding:1px 6px;margin-left:auto">stale</span> <span class="badge-warning" style="font-size:10px;padding:1px 6px;margin-left:auto">stale</span>
{% endif %} {% endif %}
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0 ml-auto"> <button type="button"
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"> class="btn btn-sm ml-auto"
<input type="hidden" name="extractor" value="{{ wf.name }}"> style="padding:2px 8px;font-size:11px"
<button type="button" class="btn btn-sm" style="padding:2px 8px;font-size:11px" hx-post="{{ url_for('pipeline.pipeline_trigger_extract') }}"
onclick="confirmAction('Run {{ wf.name }} extractor?', this.closest('form'))">Run</button> hx-target="#pipeline-overview-content"
</form> hx-swap="outerHTML"
hx-vals='{"extractor": "{{ wf.name }}", "csrf_token": "{{ csrf_token() }}"}'
onclick="if (!confirm('Run {{ wf.name }} extractor?')) return false;">Run</button>
</div> </div>
<p class="text-xs text-slate">{{ wf.schedule_label }}</p> <p class="text-xs text-slate">{{ wf.schedule_label }}</p>
{% if run %} {% if run %}
@@ -131,3 +140,5 @@
</div> </div>
</div> </div>
</div>{# end #pipeline-overview-content #}

View File

@@ -0,0 +1,197 @@
<!-- Pipeline Transform Tab: SQLMesh + export status, run history
     Self-polls every 5s while any transform/export task is pending. -->
<div id="pipeline-transform-content"
     hx-get="{{ url_for('pipeline.pipeline_transform') }}"
     hx-target="this"
     hx-swap="outerHTML"
     {% if any_running %}hx-trigger="every 5s"{% endif %}>
{# any_running is computed server-side; it both enables the polling trigger
   above and disables the Run buttons below, so polling stops once quiet. #}

<!-- Status Cards: Transform + Export -->
<div class="pipeline-two-col mb-4">

  <!-- SQLMesh Transform -->
  {% set tx = latest['run_transform'] %}
  <div class="card">
    <p class="card-header">SQLMesh Transform</p>
    <div class="flex items-center gap-2 mb-3">
      {% if tx is none %}
        <span class="status-dot pending"></span>
        <span class="text-sm text-slate">Never run</span>
      {% elif tx.status == 'pending' %}
        <span class="status-dot running"></span>
        <span class="text-sm text-slate">Running…</span>
      {% elif tx.status == 'complete' %}
        <span class="status-dot ok"></span>
        <span class="text-sm text-slate">Complete</span>
      {% else %}
        <span class="status-dot failed"></span>
        <span class="text-sm text-danger">Failed</span>
      {% endif %}
    </div>
    {% if tx %}
      {# [:19] trims timestamps to "YYYY-MM-DD HH:MM:SS" for display #}
      <p class="text-xs text-slate mono">
        Started: {{ (tx.created_at or '')[:19] or '—' }}
      </p>
      {% if tx.completed_at %}
        <p class="text-xs text-slate mono">
          Finished: {{ tx.completed_at[:19] }}
        </p>
      {% endif %}
      {% if tx.status == 'failed' and tx.error %}
        <details class="mt-2">
          <summary class="text-xs text-danger cursor-pointer">Error</summary>
          <pre class="text-xs mt-1 p-2 bg-gray-50 rounded overflow-auto" style="max-height:8rem;white-space:pre-wrap">{{ tx.error[:400] }}</pre>
        </details>
      {% endif %}
    {% endif %}
    <div class="mt-3">
      {# No surrounding <form>: the CSRF token rides along in hx-vals instead. #}
      <button type="button"
              class="btn btn-sm"
              {% if any_running %}disabled{% endif %}
              hx-post="{{ url_for('pipeline.pipeline_trigger_transform') }}"
              hx-target="#pipeline-transform-content"
              hx-swap="outerHTML"
              hx-vals='{"step": "transform", "csrf_token": "{{ csrf_token() }}"}'
              onclick="if (!confirm('Run SQLMesh transform (prod --auto-apply)?')) return false;">
        Run Transform
      </button>
    </div>
  </div>

  <!-- Export Serving -->
  {% set ex = latest['run_export'] %}
  <div class="card">
    <p class="card-header">Export Serving</p>
    <div class="flex items-center gap-2 mb-3">
      {% if ex is none %}
        <span class="status-dot pending"></span>
        <span class="text-sm text-slate">Never run</span>
      {% elif ex.status == 'pending' %}
        <span class="status-dot running"></span>
        <span class="text-sm text-slate">Running…</span>
      {% elif ex.status == 'complete' %}
        <span class="status-dot ok"></span>
        <span class="text-sm text-slate">Complete</span>
      {% else %}
        <span class="status-dot failed"></span>
        <span class="text-sm text-danger">Failed</span>
      {% endif %}
    </div>
    {% if ex %}
      <p class="text-xs text-slate mono">
        Started: {{ (ex.created_at or '')[:19] or '—' }}
      </p>
      {% if ex.completed_at %}
        <p class="text-xs text-slate mono">
          Finished: {{ ex.completed_at[:19] }}
        </p>
      {% endif %}
      {% if serving_meta %}
        {# exported_at_utc is ISO-formatted; swap the 'T' for a space for display #}
        <p class="text-xs text-slate mt-1">
          Last export: <span class="font-semibold mono">{{ (serving_meta.exported_at_utc or '')[:19].replace('T', ' ') or '—' }}</span>
        </p>
      {% endif %}
      {% if ex.status == 'failed' and ex.error %}
        <details class="mt-2">
          <summary class="text-xs text-danger cursor-pointer">Error</summary>
          <pre class="text-xs mt-1 p-2 bg-gray-50 rounded overflow-auto" style="max-height:8rem;white-space:pre-wrap">{{ ex.error[:400] }}</pre>
        </details>
      {% endif %}
    {% endif %}
    <div class="mt-3">
      <button type="button"
              class="btn btn-sm"
              {% if any_running %}disabled{% endif %}
              hx-post="{{ url_for('pipeline.pipeline_trigger_transform') }}"
              hx-target="#pipeline-transform-content"
              hx-swap="outerHTML"
              hx-vals='{"step": "export", "csrf_token": "{{ csrf_token() }}"}'
              onclick="if (!confirm('Export serving tables (lakehouse → analytics.duckdb)?')) return false;">
        Run Export
      </button>
    </div>
  </div>
</div>

<!-- Run Full Pipeline -->
{% set pl = latest['run_pipeline'] %}
<div class="card mb-4">
  <div class="flex items-center justify-between flex-wrap gap-3">
    <div>
      <p class="font-semibold text-navy text-sm">Full Pipeline</p>
      <p class="text-xs text-slate mt-1">Runs extract → transform → export sequentially</p>
      {% if pl %}
        <p class="text-xs text-slate mono mt-1">
          Last: {{ (pl.created_at or '')[:19] or '—' }}
          {% if pl.status == 'complete' %}<span class="badge-success ml-2">Complete</span>{% endif %}
          {% if pl.status == 'pending' %}<span class="badge-warning ml-2">Running…</span>{% endif %}
          {% if pl.status == 'failed' %}<span class="badge-danger ml-2">Failed</span>{% endif %}
        </p>
      {% endif %}
    </div>
    <button type="button"
            class="btn btn-sm"
            {% if any_running %}disabled{% endif %}
            hx-post="{{ url_for('pipeline.pipeline_trigger_transform') }}"
            hx-target="#pipeline-transform-content"
            hx-swap="outerHTML"
            hx-vals='{"step": "pipeline", "csrf_token": "{{ csrf_token() }}"}'
            onclick="if (!confirm('Run full ELT pipeline (extract → transform → export)?')) return false;">
      Run Full Pipeline
    </button>
  </div>
</div>

<!-- Recent Runs -->
{# history is the last 20 task rows; error_short is pre-truncated server-side. #}
<div class="card">
  <p class="card-header">Recent Runs</p>
  {% if history %}
    <div style="overflow-x:auto">
      <table class="table" style="font-size:0.8125rem">
        <thead>
          <tr>
            <th>#</th>
            <th>Step</th>
            <th>Started</th>
            <th>Duration</th>
            <th>Status</th>
            <th>Error</th>
          </tr>
        </thead>
        <tbody>
          {% for row in history %}
          <tr>
            <td class="text-xs text-slate">{{ row.id }}</td>
            <td class="mono text-xs">{{ row.task_name | replace('run_', '') }}</td>
            <td class="mono text-xs text-slate">{{ (row.created_at or '')[:19] or '—' }}</td>
            <td class="mono text-xs text-slate">{{ row.duration or '—' }}</td>
            <td>
              {% if row.status == 'complete' %}
                <span class="badge-success">Complete</span>
              {% elif row.status == 'failed' %}
                <span class="badge-danger">Failed</span>
              {% else %}
                <span class="badge-warning">Running…</span>
              {% endif %}
            </td>
            <td>
              {% if row.error_short %}
                <details>
                  <summary class="text-xs text-danger cursor-pointer">Error</summary>
                  <pre class="text-xs mt-1 p-2 bg-gray-50 rounded overflow-auto" style="max-width:24rem;white-space:pre-wrap">{{ row.error_short }}</pre>
                </details>
              {% else %}—{% endif %}
            </td>
          </tr>
          {% endfor %}
        </tbody>
      </table>
    </div>
  {% else %}
    <p class="text-sm text-slate">No transform runs yet.</p>
  {% endif %}
</div>
</div>{# end #pipeline-transform-content #}

View File

@@ -33,6 +33,9 @@
.status-dot.failed { background: #EF4444; } .status-dot.failed { background: #EF4444; }
.status-dot.stale { background: #D97706; } .status-dot.stale { background: #D97706; }
.status-dot.running { background: #3B82F6; } .status-dot.running { background: #3B82F6; }
@keyframes pulse-dot { 0%,100%{opacity:1} 50%{opacity:0.4} }
.status-dot.running { animation: pulse-dot 1.5s ease-in-out infinite; }
.status-dot.pending { background: #CBD5E1; } .status-dot.pending { background: #CBD5E1; }
.pipeline-two-col { .pipeline-two-col {
@@ -53,10 +56,11 @@
<p class="text-sm text-slate mt-1">Extraction status, data catalog, and ad-hoc query editor</p> <p class="text-sm text-slate mt-1">Extraction status, data catalog, and ad-hoc query editor</p>
</div> </div>
<div class="flex gap-2"> <div class="flex gap-2">
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0"> <form method="post" action="{{ url_for('pipeline.pipeline_trigger_transform') }}" class="m-0">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"> <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="step" value="pipeline">
<button type="button" class="btn btn-sm" <button type="button" class="btn btn-sm"
onclick="confirmAction('Enqueue a full extraction run? This will run all extractors in the background.', this.closest('form'))"> onclick="confirmAction('Run full ELT pipeline (extract → transform → export)? This runs in the background.', this.closest('form'))">
Run Pipeline Run Pipeline
</button> </button>
</form> </form>
@@ -116,6 +120,10 @@
hx-get="{{ url_for('pipeline.pipeline_lineage') }}" hx-get="{{ url_for('pipeline.pipeline_lineage') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML" hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Lineage</button> hx-trigger="click">Lineage</button>
<button data-tab="transform"
hx-get="{{ url_for('pipeline.pipeline_transform') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Transform</button>
</div> </div>
<!-- Tab content (Overview loads on page load) --> <!-- Tab content (Overview loads on page load) -->

View File

@@ -735,6 +735,107 @@ async def handle_run_extraction(payload: dict) -> None:
logger.info("Extraction completed: %s", result.stdout[-300:] if result.stdout else "(no output)") logger.info("Extraction completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
@task("run_transform")
async def handle_run_transform(payload: dict) -> None:
    """Run SQLMesh transform (prod plan --auto-apply) in the background.

    Shells out to `uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply`.
    2-hour absolute timeout — same as extraction.
    """
    import subprocess
    from pathlib import Path

    # Repo root sits four directories above this module — assumes the package
    # layout stays put; TODO confirm if the tree is ever reorganized.
    repo_root = Path(__file__).resolve().parents[4]
    command = [
        "uv", "run", "sqlmesh",
        "-p", "transform/sqlmesh_padelnomics",
        "plan", "prod", "--auto-apply",
    ]
    # Run the blocking subprocess call in a worker thread so the event loop stays free.
    result = await asyncio.to_thread(
        subprocess.run,
        command,
        capture_output=True,
        text=True,
        timeout=7200,
        cwd=str(repo_root),
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"SQLMesh transform failed (exit {result.returncode}): {result.stderr[:500]}"
        )
    stdout_tail = result.stdout[-300:] if result.stdout else "(no output)"
    logger.info("SQLMesh transform completed: %s", stdout_tail)
@task("run_export")
async def handle_run_export(payload: dict) -> None:
    """Export serving tables from lakehouse.duckdb → analytics.duckdb.

    Shells out to `uv run python src/padelnomics/export_serving.py`.
    10-minute absolute timeout.
    """
    import subprocess
    from pathlib import Path

    # Repo root sits four directories above this module — assumes the package
    # layout stays put; TODO confirm if the tree is ever reorganized.
    repo_root = Path(__file__).resolve().parents[4]
    command = ["uv", "run", "python", "src/padelnomics/export_serving.py"]
    # Run the blocking subprocess call in a worker thread so the event loop stays free.
    result = await asyncio.to_thread(
        subprocess.run,
        command,
        capture_output=True,
        text=True,
        timeout=600,
        cwd=str(repo_root),
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"Export failed (exit {result.returncode}): {result.stderr[:500]}"
        )
    stdout_tail = result.stdout[-300:] if result.stdout else "(no output)"
    logger.info("Export completed: %s", stdout_tail)
@task("run_pipeline")
async def handle_run_pipeline(payload: dict) -> None:
    """Run full ELT pipeline: extract → transform → export, stopping on first failure.

    Each step shells out to the same command its standalone task handler uses,
    with the same per-step timeout (2h for extract/transform, 10min for export).
    A non-zero exit raises RuntimeError, which aborts the remaining steps and
    marks the task failed.
    """
    import subprocess
    from pathlib import Path

    # Repo root sits four directories above this module — assumes the package
    # layout stays put; TODO confirm if the tree is ever reorganized.
    repo_root = Path(__file__).resolve().parents[4]
    steps = [
        (
            "extraction",
            ["uv", "run", "--package", "padelnomics_extract", "extract"],
            7200,
        ),
        (
            "transform",
            ["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "plan", "prod", "--auto-apply"],
            7200,
        ),
        (
            "export",
            ["uv", "run", "python", "src/padelnomics/export_serving.py"],
            600,
        ),
    ]
    for step_name, cmd, timeout_seconds in steps:
        logger.info("Pipeline step starting: %s", step_name)
        # Blocking subprocess call runs in a worker thread to keep the event loop free.
        result = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
            cwd=str(repo_root),
        )
        if result.returncode != 0:
            raise RuntimeError(
                f"Pipeline failed at {step_name} (exit {result.returncode}): {result.stderr[:500]}"
            )
        # Explicit ": " separator — the previous "%s%s" format ran the step
        # name and the stdout tail together into one unreadable token.
        logger.info(
            "Pipeline step complete: %s: %s",
            step_name,
            result.stdout[-200:] if result.stdout else "(no output)",
        )
    logger.info("Full pipeline complete (extract → transform → export)")
@task("generate_articles") @task("generate_articles")
async def handle_generate_articles(payload: dict) -> None: async def handle_generate_articles(payload: dict) -> None:
"""Generate articles from a template in the background.""" """Generate articles from a template in the background."""