feat(pipeline): scaffold Pipeline Console blueprint + sidebar + app registration

- New pipeline_routes.py blueprint (url_prefix=/admin/pipeline) with:
  - All 9 routes (dashboard, overview, extractions, catalog, query editor)
  - Data access functions: state DB (sync+to_thread), serving meta, landing FS, workflows.toml
  - execute_user_query() added to analytics.py (columns+rows+error+elapsed_ms)
  - Query security: blocklist regex, 10k char limit, 1000 row cap, 10s timeout
- Add 'Pipeline' sidebar section to base_admin.html (between Analytics and System)
- Register pipeline_bp in app.py
- Add run_extraction task handler to worker.py

Subtask 1 of 6

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-25 12:44:03 +01:00
parent 1905844cd2
commit 060cb9b32e
5 changed files with 782 additions and 1 deletions

View File

@@ -0,0 +1,699 @@
"""
Pipeline Console admin blueprint.
Operational visibility for the data extraction and transformation pipeline:
/admin/pipeline/ → dashboard (health stats + tab container)
/admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats
/admin/pipeline/extractions → HTMX tab: filterable extraction run history
/admin/pipeline/extractions/<id>/mark-stale → POST: mark stuck "running" row as failed
/admin/pipeline/extract/trigger → POST: enqueue full extraction run
/admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data)
/admin/pipeline/catalog/<table> → HTMX partial: table detail (columns + sample)
/admin/pipeline/query → HTMX tab: SQL query editor
/admin/pipeline/query/execute → POST: run user SQL, return results table
Data sources:
- data/landing/.state.sqlite (extraction run history — stdlib sqlite3, sync via to_thread)
- SERVING_DUCKDB_PATH/../_serving_meta.json (export timestamp + per-table row counts)
- analytics.duckdb (DuckDB read-only via analytics.execute_user_query)
- LANDING_DIR/ (filesystem scan for file sizes + dates)
- infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib)
"""
import asyncio
import json
import logging
import os
import re
import sqlite3
import sys
import time
from datetime import UTC, datetime, timedelta
from pathlib import Path
from quart import Blueprint, flash, redirect, render_template, request, url_for
from ..auth.routes import role_required
from ..core import csrf_protect
logger = logging.getLogger(__name__)

bp = Blueprint(
    "pipeline",
    __name__,
    template_folder=str(Path(__file__).parent / "templates"),
    url_prefix="/admin/pipeline",
)

# ── Config ────────────────────────────────────────────────────────────────────
# Landing zone root (raw extracted files) and the DuckDB serving database path.
_LANDING_DIR = os.environ.get("LANDING_DIR", "data/landing")
_SERVING_DUCKDB_PATH = os.environ.get("SERVING_DUCKDB_PATH", "data/analytics.duckdb")

# Repo root: web/src/padelnomics/admin/ → up 4 levels
# NOTE(review): the comment says "up 4 levels", but parents[5] climbs five
# directories from this file — verify against the actual repo layout
# (worker.py resolves the repo root with parents[4]).
_REPO_ROOT = Path(__file__).resolve().parents[5]
_WORKFLOWS_TOML = _REPO_ROOT / "infra" / "supervisor" / "workflows.toml"

# A "running" row older than this is considered stale/crashed.
_STALE_THRESHOLD_HOURS = 2

# Query editor limits (enforced in pipeline_query_execute / execute_user_query).
_QUERY_MAX_CHARS = 10_000
_QUERY_MAX_ROWS = 1_000
_QUERY_TIMEOUT_SECONDS = 10

# Blocked SQL keywords (read-only connection enforces engine-level, this adds belt+suspenders)
_BLOCKED_SQL_RE = re.compile(
    r"\b(ATTACH|COPY|EXPORT|INSTALL|LOAD|CREATE|DROP|ALTER|INSERT|UPDATE|DELETE|GRANT|REVOKE|PRAGMA)\b",
    re.IGNORECASE,
)
# ── Sidebar data injection (same pattern as pseo_routes.py) ──────────────────
@bp.before_request
async def _inject_sidebar_data():
    """Populate the unread-inbox count shown as a badge in the admin sidebar."""
    from quart import g

    from ..core import fetch_one

    g.admin_unread_count = 0
    try:
        result = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0")
        if result:
            g.admin_unread_count = result["cnt"]
    except Exception:
        # Badge is cosmetic — never let a DB hiccup break the request.
        pass
@bp.context_processor
def _admin_context():
    """Expose the sidebar unread count to every template rendered by this blueprint."""
    from quart import g

    count = getattr(g, "admin_unread_count", 0)
    return {"unread_count": count}
# ── Data access: state DB (sync, called via to_thread) ────────────────────────
def _state_db_path() -> Path:
    """Location of the extraction state database inside the landing zone."""
    return Path(_LANDING_DIR, ".state.sqlite")
def _fetch_extraction_summary_sync() -> dict:
    """Aggregate stats from extraction_runs: total, success, failed, running, stale counts."""
    empty = {"total": 0, "success": 0, "failed": 0, "running": 0, "stale": 0}
    db_path = _state_db_path()
    if not db_path.exists():
        return empty
    # A "running" row started before this cutoff counts as stale.
    stale_cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        row = conn.execute(
            """
            SELECT
                COUNT(*) AS total,
                SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success,
                SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed,
                SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running,
                SUM(CASE WHEN status = 'running' AND started_at < ? THEN 1 ELSE 0 END) AS stale
            FROM extraction_runs
            """,
            (stale_cutoff,),
        ).fetchone()
        # SUM() over zero matching rows yields NULL — coerce every aggregate to 0.
        return {key: row[key] or 0 for key in empty}
    finally:
        conn.close()
def _fetch_latest_per_extractor_sync() -> list[dict]:
    """Return the most recent extraction run for every distinct extractor name."""
    db_path = _state_db_path()
    if not db_path.exists():
        return []
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        # Join against the max run_id per extractor to select each latest row.
        cursor = conn.execute(
            """
            SELECT r.*
            FROM extraction_runs r
            INNER JOIN (
                SELECT extractor, MAX(run_id) AS max_id
                FROM extraction_runs
                GROUP BY extractor
            ) latest ON r.run_id = latest.max_id
            ORDER BY r.extractor
            """
        )
        return [dict(row) for row in cursor.fetchall()]
    finally:
        conn.close()
def _fetch_extraction_runs_sync(
*,
extractor: str = "",
status: str = "",
limit: int = 50,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Return (rows, total_count) for the filtered run history."""
assert 1 <= limit <= 200, f"limit must be 1200, got {limit}"
assert offset >= 0, f"offset must be >= 0, got {offset}"
db_path = _state_db_path()
if not db_path.exists():
return [], 0
where_clauses = []
params: list = []
if extractor:
where_clauses.append("extractor = ?")
params.append(extractor)
if status:
where_clauses.append("status = ?")
params.append(status)
where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
total = conn.execute(
f"SELECT COUNT(*) FROM extraction_runs {where_sql}", params
).fetchone()[0]
rows = conn.execute(
f"""
SELECT run_id, extractor, started_at, finished_at, status,
files_written, files_skipped, bytes_written,
cursor_value, error_message
FROM extraction_runs {where_sql}
ORDER BY run_id DESC
LIMIT ? OFFSET ?
""",
params + [limit, offset],
).fetchall()
return [dict(r) for r in rows], total
finally:
conn.close()
def _fetch_distinct_extractors_sync() -> list[str]:
    """Return distinct extractor names, sorted, for the filter dropdowns."""
    db_path = _state_db_path()
    if not db_path.exists():
        return []
    conn = sqlite3.connect(str(db_path))
    try:
        cursor = conn.execute(
            "SELECT DISTINCT extractor FROM extraction_runs ORDER BY extractor"
        )
        return [name for (name,) in cursor.fetchall()]
    finally:
        conn.close()
def _mark_run_failed_sync(run_id: int) -> bool:
"""Mark a stuck 'running' row as 'failed'. Returns True if row was updated."""
assert run_id > 0, f"run_id must be positive, got {run_id}"
db_path = _state_db_path()
if not db_path.exists():
return False
conn = sqlite3.connect(str(db_path))
try:
cur = conn.execute(
"""
UPDATE extraction_runs
SET status = 'failed',
finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
error_message = 'Marked failed manually (admin — process appeared stuck)'
WHERE run_id = ? AND status = 'running'
""",
(run_id,),
)
conn.commit()
return cur.rowcount > 0
finally:
conn.close()
# ── Data access: serving meta ─────────────────────────────────────────────────
def _load_serving_meta() -> dict | None:
    """Read _serving_meta.json alongside analytics.duckdb. Returns None if absent."""
    meta_path = Path(_SERVING_DUCKDB_PATH).parent / "_serving_meta.json"
    if not meta_path.exists():
        return None
    try:
        raw = meta_path.read_text()
        return json.loads(raw)
    except Exception:
        # Unreadable or malformed meta is treated the same as a missing file.
        logger.warning("Failed to read _serving_meta.json", exc_info=True)
        return None
# ── Data access: landing zone filesystem ─────────────────────────────────────
def _get_landing_zone_stats_sync() -> list[dict]:
"""Scan LANDING_DIR and return per-source file counts + total bytes."""
landing = Path(_LANDING_DIR)
if not landing.exists():
return []
sources = []
for source_dir in sorted(landing.iterdir()):
if not source_dir.is_dir() or source_dir.name.startswith("."):
continue
files = list(source_dir.rglob("*.gz")) + list(source_dir.rglob("*.jsonl"))
total_bytes = sum(f.stat().st_size for f in files)
latest_mtime = max((f.stat().st_mtime for f in files), default=None)
sources.append({
"name": source_dir.name,
"file_count": len(files),
"total_bytes": total_bytes,
"latest_mtime": (
datetime.fromtimestamp(latest_mtime, tz=UTC).strftime("%Y-%m-%d %H:%M")
if latest_mtime
else None
),
})
return sources
# ── Data access: workflows.toml ───────────────────────────────────────────────
_SCHEDULE_LABELS = {
"hourly": "Every hour",
"daily": "Daily",
"weekly": "Weekly",
"monthly": "Monthly",
}
def _load_workflows() -> list[dict]:
"""Parse workflows.toml and return workflow definitions with human schedule labels."""
if not _WORKFLOWS_TOML.exists():
return []
if sys.version_info >= (3, 11):
import tomllib
data = tomllib.loads(_WORKFLOWS_TOML.read_text())
else:
# Fallback for older Python (shouldn't happen — project requires 3.11+)
try:
import tomli as tomllib # type: ignore[no-redef]
data = tomllib.loads(_WORKFLOWS_TOML.read_text())
except ImportError:
return []
workflows = []
for name, config in data.items():
schedule = config.get("schedule", "")
schedule_label = _SCHEDULE_LABELS.get(schedule, schedule)
workflows.append({
"name": name,
"module": config.get("module", ""),
"schedule": schedule,
"schedule_label": schedule_label,
"depends_on": config.get("depends_on", []),
})
return workflows
# ── Route helpers ─────────────────────────────────────────────────────────────
def _format_bytes(n: int) -> str:
"""Human-readable byte count."""
if n < 1024:
return f"{n} B"
if n < 1024 * 1024:
return f"{n / 1024:.1f} KB"
return f"{n / 1024 / 1024:.1f} MB"
def _duration_str(started_at: str | None, finished_at: str | None) -> str:
"""Return human-readable duration, or '' if unavailable."""
if not started_at or not finished_at:
return ""
try:
fmt = "%Y-%m-%dT%H:%M:%SZ"
start = datetime.strptime(started_at, fmt)
end = datetime.strptime(finished_at, fmt)
delta = int((end - start).total_seconds())
if delta < 60:
return f"{delta}s"
return f"{delta // 60}m {delta % 60}s"
except ValueError:
return ""
def _is_stale(run: dict) -> bool:
"""True if a 'running' row has been stuck longer than the stale threshold."""
if run.get("status") != "running":
return False
started = run.get("started_at", "")
if not started:
return True
try:
fmt = "%Y-%m-%dT%H:%M:%SZ"
start = datetime.strptime(started, fmt).replace(tzinfo=UTC)
return (datetime.now(UTC) - start) > timedelta(hours=_STALE_THRESHOLD_HOURS)
except ValueError:
return False
# ── Dashboard ─────────────────────────────────────────────────────────────────
@bp.route("/")
@role_required("admin")
async def pipeline_dashboard():
"""Main page: health stat cards + tab container."""
summary = await asyncio.to_thread(_fetch_extraction_summary_sync)
serving_meta = await asyncio.to_thread(_load_serving_meta)
total_serving_tables = len(serving_meta["tables"]) if serving_meta else 0
last_export = serving_meta.get("exported_at_utc", "")[:19].replace("T", " ") if serving_meta else ""
success_rate = 0
if summary["total"] > 0:
success_rate = round(100 * summary["success"] / summary["total"])
return await render_template(
"admin/pipeline.html",
summary=summary,
success_rate=success_rate,
total_serving_tables=total_serving_tables,
last_export=last_export,
admin_page="pipeline",
)
# ── Overview tab ─────────────────────────────────────────────────────────────
@bp.route("/overview")
@role_required("admin")
async def pipeline_overview():
"""HTMX tab: extraction status per source, serving freshness, landing zone."""
latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather(
asyncio.to_thread(_fetch_latest_per_extractor_sync),
asyncio.to_thread(_get_landing_zone_stats_sync),
asyncio.to_thread(_load_workflows),
asyncio.to_thread(_load_serving_meta),
)
# Build a lookup: extractor name → latest run
latest_by_name = {r["extractor"]: r for r in latest_runs}
# Enrich each workflow with its latest run data
cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
workflow_rows = []
for wf in workflows:
run = latest_by_name.get(wf["name"])
workflow_rows.append({
"workflow": wf,
"run": run,
"stale": _is_stale(run) if run else False,
})
# Compute landing zone totals
total_landing_bytes = sum(s["total_bytes"] for s in landing_stats)
return await render_template(
"admin/partials/pipeline_overview.html",
workflow_rows=workflow_rows,
landing_stats=landing_stats,
total_landing_bytes=total_landing_bytes,
serving_meta=serving_meta,
format_bytes=_format_bytes,
)
# ── Extractions tab ────────────────────────────────────────────────────────────
@bp.route("/extractions")
@role_required("admin")
async def pipeline_extractions():
"""HTMX tab: paginated + filtered extraction run history."""
extractor_filter = request.args.get("extractor", "")
status_filter = request.args.get("status", "")
page = max(1, int(request.args.get("page", 1)))
per_page = 30
(runs, total), extractors = await asyncio.gather(
asyncio.to_thread(
_fetch_extraction_runs_sync,
extractor=extractor_filter,
status=status_filter,
limit=per_page,
offset=(page - 1) * per_page,
),
asyncio.to_thread(_fetch_distinct_extractors_sync),
)
# Enrich rows with computed fields
for run in runs:
run["duration"] = _duration_str(run.get("started_at"), run.get("finished_at"))
run["bytes_label"] = _format_bytes(run.get("bytes_written") or 0)
run["is_stale"] = _is_stale(run)
total_pages = max(1, (total + per_page - 1) // per_page)
return await render_template(
"admin/partials/pipeline_extractions.html",
runs=runs,
total=total,
page=page,
per_page=per_page,
total_pages=total_pages,
extractors=extractors,
extractor_filter=extractor_filter,
status_filter=status_filter,
)
@bp.route("/extractions/<int:run_id>/mark-stale", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_mark_stale(run_id: int):
"""Mark a stuck 'running' extraction row as 'failed'."""
updated = await asyncio.to_thread(_mark_run_failed_sync, run_id)
if updated:
await flash(f"Run #{run_id} marked as failed.", "success")
else:
await flash(f"Run #{run_id} could not be updated (not in 'running' state).", "warning")
return redirect(url_for("pipeline.pipeline_extractions"))
# ── Trigger extraction ────────────────────────────────────────────────────────
@bp.route("/extract/trigger", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_trigger_extract():
"""Enqueue a full pipeline extraction run."""
from ..worker import enqueue
await enqueue("run_extraction")
await flash("Extraction run queued. Check the task queue for progress.", "success")
return redirect(url_for("pipeline.pipeline_dashboard"))
# ── Catalog tab ───────────────────────────────────────────────────────────────
@bp.route("/catalog")
@role_required("admin")
async def pipeline_catalog():
"""HTMX tab: list serving tables with row counts + column counts."""
from ..analytics import fetch_analytics
schema_rows = await fetch_analytics(
"""
SELECT table_name, column_name, data_type, ordinal_position
FROM information_schema.columns
WHERE table_schema = 'serving'
ORDER BY table_name, ordinal_position
"""
)
# Group by table
tables: dict[str, dict] = {}
for row in schema_rows:
tname = row["table_name"]
if tname not in tables:
tables[tname] = {"name": tname, "columns": [], "column_count": 0}
tables[tname]["columns"].append({
"name": row["column_name"],
"type": row["data_type"],
})
tables[tname]["column_count"] += 1
# Enrich with row counts from serving meta
serving_meta = await asyncio.to_thread(_load_serving_meta)
meta_counts = serving_meta.get("tables", {}) if serving_meta else {}
for tname, tdata in tables.items():
tdata["row_count"] = meta_counts.get(tname, {}).get("row_count")
return await render_template(
"admin/partials/pipeline_catalog.html",
tables=list(tables.values()),
serving_meta=serving_meta,
)
@bp.route("/catalog/<table_name>")
@role_required("admin")
async def pipeline_table_detail(table_name: str):
"""HTMX partial: column list + 10-row sample for a serving table."""
from ..analytics import fetch_analytics
# Validate table name before using in SQL
if not re.match(r"^[a-z_][a-z0-9_]*$", table_name):
return "Invalid table name", 400
# Confirm table exists in serving schema
exists = await fetch_analytics(
"SELECT 1 FROM information_schema.tables"
" WHERE table_schema = 'serving' AND table_name = ?",
[table_name],
)
if not exists:
return f"Table serving.{table_name} not found", 404
columns, sample = await asyncio.gather(
fetch_analytics(
"SELECT column_name, data_type, ordinal_position"
" FROM information_schema.columns"
" WHERE table_schema = 'serving' AND table_name = ?"
" ORDER BY ordinal_position",
[table_name],
),
fetch_analytics(
f"SELECT * FROM serving.{table_name} LIMIT 10" # noqa: S608
),
)
return await render_template(
"admin/partials/pipeline_table_detail.html",
table_name=table_name,
columns=columns,
sample=sample,
)
# ── Query editor ──────────────────────────────────────────────────────────────
@bp.route("/query")
@role_required("admin")
async def pipeline_query_editor():
"""HTMX tab: SQL query editor with schema sidebar."""
from ..analytics import fetch_analytics
schema_rows = await fetch_analytics(
"""
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'serving'
ORDER BY table_name, ordinal_position
"""
)
# Group by table for the schema sidebar
schema: dict[str, list] = {}
for row in schema_rows:
tname = row["table_name"]
if tname not in schema:
schema[tname] = []
schema[tname].append({"name": row["column_name"], "type": row["data_type"]})
return await render_template(
"admin/partials/pipeline_query.html",
schema=schema,
max_rows=_QUERY_MAX_ROWS,
timeout_seconds=_QUERY_TIMEOUT_SECONDS,
)
@bp.route("/query/execute", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_query_execute():
"""Run user-submitted SQL and return a results table partial."""
from ..analytics import execute_user_query
form = await request.form
sql = (form.get("sql") or "").strip()
# Input validation
if not sql:
return await render_template(
"admin/partials/pipeline_query_results.html",
error="SQL query is empty.",
columns=[],
rows=[],
row_count=0,
elapsed_ms=0,
truncated=False,
)
if len(sql) > _QUERY_MAX_CHARS:
return await render_template(
"admin/partials/pipeline_query_results.html",
error=f"Query too long ({len(sql):,} chars). Maximum is {_QUERY_MAX_CHARS:,} characters.",
columns=[],
rows=[],
row_count=0,
elapsed_ms=0,
truncated=False,
)
if _BLOCKED_SQL_RE.search(sql):
return await render_template(
"admin/partials/pipeline_query_results.html",
error="Query contains a blocked keyword. Only SELECT statements are allowed.",
columns=[],
rows=[],
row_count=0,
elapsed_ms=0,
truncated=False,
)
columns, rows, error, elapsed_ms = await execute_user_query(
sql,
max_rows=_QUERY_MAX_ROWS,
timeout_seconds=_QUERY_TIMEOUT_SECONDS,
)
truncated = len(rows) >= _QUERY_MAX_ROWS
return await render_template(
"admin/partials/pipeline_query_results.html",
error=error,
columns=columns,
rows=rows,
row_count=len(rows),
elapsed_ms=elapsed_ms,
truncated=truncated,
)

View File

@@ -133,6 +133,12 @@
SEO Hub
</a>
<div class="admin-sidebar__section">Pipeline</div>
<a href="{{ url_for('pipeline.pipeline_dashboard') }}" class="{% if admin_page == 'pipeline' %}active{% endif %}">
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="1.5" stroke="currentColor"><path stroke-linecap="round" stroke-linejoin="round" d="M20.25 6.375c0 2.278-3.694 4.125-8.25 4.125S3.75 8.653 3.75 6.375m16.5 0c0-2.278-3.694-4.125-8.25-4.125S3.75 4.097 3.75 6.375m16.5 0v11.25c0 2.278-3.694 4.125-8.25 4.125s-8.25-1.847-8.25-4.125V6.375m16.5 0v3.75m-16.5-3.75v3.75m16.5 0v3.75C20.25 16.153 16.556 18 12 18s-8.25-1.847-8.25-4.125v-3.75m16.5 0c0 2.278-3.694 4.125-8.25 4.125s-8.25-1.847-8.25-4.125"/></svg>
Pipeline
</a>
<div class="admin-sidebar__section">System</div>
<a href="{{ url_for('admin.flags') }}" class="{% if admin_page == 'flags' %}active{% endif %}">
<svg xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 24 24" stroke-width="1.5" stroke="currentColor"><path stroke-linecap="round" stroke-linejoin="round" d="M3 3v1.5M3 21v-6m0 0 2.77-.693a9 9 0 0 1 6.208.682l.108.054a9 9 0 0 0 6.086.71l3.114-.732a48.524 48.524 0 0 1-.005-10.499l-3.11.732a9 9 0 0 1-6.085-.711l-.108-.054a9 9 0 0 0-6.208-.682L3 4.5M3 15V4.5"/></svg>

View File

@@ -5,13 +5,16 @@ Opens a single long-lived DuckDB connection at startup (read_only=True).
All queries run via asyncio.to_thread() to avoid blocking the event loop.
Usage:
from .analytics import fetch_analytics
from .analytics import fetch_analytics, execute_user_query
rows = await fetch_analytics("SELECT * FROM serving.planner_defaults WHERE city_slug = ?", ["berlin"])
cols, rows, error, elapsed_ms = await execute_user_query("SELECT city_slug FROM serving.city_market_profile LIMIT 5")
"""
import asyncio
import logging
import os
import time
from pathlib import Path
from typing import Any
@@ -77,3 +80,48 @@ async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str
except Exception:
logger.exception("DuckDB analytics query failed: %.200s", sql)
return []
async def execute_user_query(
sql: str,
max_rows: int = 1000,
timeout_seconds: int = 10,
) -> tuple[list[str], list[tuple], str | None, float]:
"""
Run an admin-submitted SQL query.
Returns (columns, rows, error, elapsed_ms).
- columns: list of column name strings (empty on error)
- rows: list of value tuples, capped at max_rows
- error: error message string, or None on success
- elapsed_ms: wall-clock query time in milliseconds
"""
assert sql, "sql must not be empty"
assert 1 <= max_rows <= 10_000, f"max_rows must be 110000, got {max_rows}"
assert 1 <= timeout_seconds <= 60, f"timeout_seconds must be 160, got {timeout_seconds}"
if _conn is None:
return [], [], "Analytics database is not available.", 0.0
def _run() -> tuple[list[str], list[tuple], str | None, float]:
t0 = time.monotonic()
cur = _conn.cursor()
try:
rel = cur.execute(sql)
cols = [d[0] for d in rel.description]
rows = rel.fetchmany(max_rows)
elapsed_ms = round((time.monotonic() - t0) * 1000, 1)
return cols, rows, None, elapsed_ms
except Exception as exc:
elapsed_ms = round((time.monotonic() - t0) * 1000, 1)
return [], [], str(exc), elapsed_ms
finally:
cur.close()
try:
return await asyncio.wait_for(
asyncio.to_thread(_run),
timeout=timeout_seconds,
)
except asyncio.TimeoutError:
return [], [], f"Query timed out after {timeout_seconds}s.", 0.0

View File

@@ -313,6 +313,7 @@ def create_app() -> Quart:
# Blueprint registration
# -------------------------------------------------------------------------
from .admin.pipeline_routes import bp as pipeline_bp
from .admin.pseo_routes import bp as pseo_bp
from .admin.routes import bp as admin_bp
from .auth.routes import bp as auth_bp
@@ -339,6 +340,7 @@ def create_app() -> Quart:
app.register_blueprint(billing_bp)
app.register_blueprint(admin_bp)
app.register_blueprint(pseo_bp)
app.register_blueprint(pipeline_bp)
app.register_blueprint(webhooks_bp)
# Content catch-all LAST — lives under /<lang> too

View File

@@ -698,6 +698,32 @@ async def handle_cleanup_seo_metrics(payload: dict) -> None:
logger.info("Cleaned up %s old SEO metric rows", deleted)
@task("run_extraction")
async def handle_run_extraction(payload: dict) -> None:
"""Run the full extraction pipeline (all extractors) in the background.
Shells out to `uv run extract` in the repo root. The extraction CLI
manages its own state in .state.sqlite and writes to the landing zone.
"""
import subprocess
from pathlib import Path
repo_root = Path(__file__).resolve().parents[4]
result = await asyncio.to_thread(
subprocess.run,
["uv", "run", "--package", "padelnomics_extract", "extract"],
capture_output=True,
text=True,
timeout=7200, # 2-hour absolute timeout
cwd=str(repo_root),
)
if result.returncode != 0:
raise RuntimeError(
f"Extraction failed (exit {result.returncode}): {result.stderr[:500]}"
)
logger.info("Extraction completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
@task("generate_articles")
async def handle_generate_articles(payload: dict) -> None:
"""Generate articles from a template in the background."""