Files
padelnomics/web/src/padelnomics/admin/pipeline_routes.py

1028 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Pipeline Console admin blueprint.
Operational visibility for the data extraction and transformation pipeline:
/admin/pipeline/ → dashboard (health stats + tab container)
/admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats
/admin/pipeline/extractions → HTMX tab: filterable extraction run history
/admin/pipeline/extractions/<id>/mark-stale → POST: mark stuck "running" row as failed
/admin/pipeline/extract/trigger → POST: enqueue full extraction run
/admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data)
/admin/pipeline/catalog/<table> → HTMX partial: table detail (columns + sample)
/admin/pipeline/query → HTMX tab: SQL query editor
/admin/pipeline/query/execute → POST: run user SQL, return results table
Data sources:
- data/landing/.state.sqlite (extraction run history — stdlib sqlite3, sync via to_thread)
- SERVING_DUCKDB_PATH/../_serving_meta.json (export timestamp + per-table row counts)
- analytics.duckdb (DuckDB read-only via analytics.execute_user_query)
- LANDING_DIR/ (filesystem scan for file sizes + dates)
- infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib)
"""
import asyncio
import json
import logging
import os
import re
import sqlite3
import tomllib
from datetime import UTC, datetime, timedelta
from pathlib import Path
from quart import Blueprint, flash, redirect, render_template, request, url_for
from ..auth.routes import role_required
from ..core import csrf_protect
logger = logging.getLogger(__name__)

# Blueprint mounted at /admin/pipeline; templates live next to this package.
bp = Blueprint(
    "pipeline",
    __name__,
    template_folder=str(Path(__file__).parent / "templates"),
    url_prefix="/admin/pipeline",
)

# ── Config ────────────────────────────────────────────────────────────────────
# Landing zone root for raw extracted files (env-overridable for deploys/tests).
_LANDING_DIR = os.environ.get("LANDING_DIR", "data/landing")
# DuckDB serving database; _serving_meta.json is expected alongside it.
_SERVING_DUCKDB_PATH = os.environ.get("SERVING_DUCKDB_PATH", "data/analytics.duckdb")
# Repo root: web/src/padelnomics/admin/ → up 4 levels
# NOTE(review): `parents[5]` climbs 6 path components from the resolved file,
# which disagrees with the "up 4 levels" comment — confirm this lands on the
# actual repo root in the deployed layout.
_REPO_ROOT = Path(__file__).resolve().parents[5]
_WORKFLOWS_TOML = _REPO_ROOT / "infra" / "supervisor" / "workflows.toml"
# A "running" row older than this is considered stale/crashed.
_STALE_THRESHOLD_HOURS = 2
# Query editor limits
_QUERY_MAX_CHARS = 10_000   # max SQL text length accepted from the form
_QUERY_MAX_ROWS = 1_000     # result rows returned to the browser
_QUERY_TIMEOUT_SECONDS = 10 # per-query execution budget
# Blocked SQL keywords (read-only connection enforces engine-level, this adds belt+suspenders)
_BLOCKED_SQL_RE = re.compile(
    r"\b(ATTACH|COPY|EXPORT|INSTALL|LOAD|CREATE|DROP|ALTER|INSERT|UPDATE|DELETE|GRANT|REVOKE|PRAGMA)\b",
    re.IGNORECASE,
)
# ── Lineage DAG ───────────────────────────────────────────────────────────────
#
# Canonical model dependency map: model_name → [upstream_dependencies].
# Layer is derived from name prefix: stg_* = staging, dim_*/fct_* = foundation,
# everything else = serving.
# Update this dict whenever models are added or removed from transform/.
# NOTE: every dependency listed must itself appear as a key — the SVG renderer
# looks up node positions directly and will KeyError on an unknown name.
_DAG: dict[str, list[str]] = {
    # Staging — read landing zone files, no model deps
    "stg_padel_courts": [],
    "stg_playtomic_venues": [],
    "stg_playtomic_resources": [],
    "stg_playtomic_opening_hours": [],
    "stg_playtomic_availability": [],
    "stg_population": [],
    "stg_population_usa": [],
    "stg_population_uk": [],
    "stg_population_geonames": [],
    "stg_income": [],
    "stg_income_usa": [],
    "stg_city_labels": [],
    "stg_nuts2_boundaries": [],
    "stg_regional_income": [],
    "stg_tennis_courts": [],
    # Foundation
    "dim_venues": ["stg_playtomic_venues", "stg_playtomic_resources", "stg_padel_courts"],
    "dim_cities": [
        "dim_venues", "stg_income", "stg_city_labels",
        "stg_population", "stg_population_usa", "stg_population_uk", "stg_population_geonames",
    ],
    "dim_locations": [
        "stg_population_geonames", "stg_income", "stg_nuts2_boundaries",
        "stg_regional_income", "stg_income_usa", "stg_padel_courts", "stg_tennis_courts",
    ],
    "dim_venue_capacity": [
        "dim_venues", "stg_playtomic_resources", "stg_playtomic_opening_hours",
    ],
    "fct_availability_slot": ["stg_playtomic_availability"],
    "fct_daily_availability": ["fct_availability_slot", "dim_venue_capacity"],
    # Serving
    "venue_pricing_benchmarks": ["fct_daily_availability"],
    "city_market_profile": ["dim_cities", "venue_pricing_benchmarks"],
    "planner_defaults": ["venue_pricing_benchmarks", "city_market_profile"],
    "location_opportunity_profile": ["dim_locations"],
    "pseo_city_costs_de": [
        "city_market_profile", "planner_defaults", "location_opportunity_profile",
    ],
    "pseo_city_pricing": ["venue_pricing_benchmarks", "city_market_profile"],
    "pseo_country_overview": ["pseo_city_costs_de"],
}
def _classify_layer(name: str) -> str:
"""Return 'staging', 'foundation', or 'serving' for a model name."""
if name.startswith("stg_"):
return "staging"
if name.startswith("dim_") or name.startswith("fct_"):
return "foundation"
return "serving"
def _render_lineage_svg(dag: dict[str, list[str]]) -> str:
    """Render the 3-layer model dependency DAG as an SVG string.

    Layout: three vertical swim lanes (staging / foundation / serving) with
    nodes stacked top-to-bottom in each lane. Edges are cubic bezier paths
    flowing left-to-right. No external dependencies — pure Python string
    construction.

    Args:
        dag: model name → list of upstream model names. Every dependency must
            itself be a key of ``dag`` (positions are looked up directly;
            unknown names would raise KeyError).

    Returns:
        A complete ``<svg>`` element string. Nodes carry ``data-model`` and
        edges carry ``data-from``/``data-to`` attributes plus the
        ``lineage-node``/``lineage-edge`` classes for client-side hooks.
    """
    # ── Layout constants ───────────────────────────────────────────────────
    CHAR_WIDTH_PX = 7.4  # approximate monospace char width at 11px
    NODE_PAD_H = 10  # horizontal padding inside node rect
    NODE_H = 26  # node height
    NODE_VGAP = 10  # vertical gap between nodes in same lane
    LANE_PAD_TOP = 52  # space for lane header
    LANE_PAD_BOTTOM = 24
    LANE_INNER_W = 210  # inner usable width per lane
    LANE_GAP = 40  # gap between lanes
    LANE_PAD_LEFT = 16  # left padding inside lane bg
    # Tailwind-esque palette per lane: lane background/border, node fill/border,
    # accent bar, and label text color.
    LANE_COLORS = {
        "staging": {"bg": "#F0FDF4", "border": "#BBF7D0", "accent": "#16A34A",
                    "fill": "#DCFCE7", "text": "#14532D"},
        "foundation": {"bg": "#EFF6FF", "border": "#BFDBFE", "accent": "#1D4ED8",
                       "fill": "#DBEAFE", "text": "#1E3A8A"},
        "serving": {"bg": "#FFFBEB", "border": "#FDE68A", "accent": "#D97706",
                    "fill": "#FEF3C7", "text": "#78350F"},
    }
    LANE_ORDER = ["staging", "foundation", "serving"]
    LANE_LABELS = {"staging": "STAGING", "foundation": "FOUNDATION", "serving": "SERVING"}

    # ── Group and sort nodes per layer ─────────────────────────────────────
    # Count how many nodes each node is depended upon by (downstream count)
    downstream: dict[str, int] = {n: 0 for n in dag}
    for deps in dag.values():
        for d in deps:
            downstream[d] = downstream.get(d, 0) + 1
    layers: dict[str, list[str]] = {"staging": [], "foundation": [], "serving": []}
    for name in dag:
        layers[_classify_layer(name)].append(name)
    for layer_name, nodes in layers.items():
        # Sort: most-connected first (hub nodes near vertical center), then alpha
        nodes.sort(key=lambda n: (-downstream.get(n, 0), n))

    # ── Compute node widths ────────────────────────────────────────────────
    def node_w(name: str) -> float:
        # Width tracks label length; 80px floor keeps short names clickable.
        return max(len(name) * CHAR_WIDTH_PX + NODE_PAD_H * 2, 80.0)

    # ── Assign positions ───────────────────────────────────────────────────
    # x = left edge of lane background; node rect starts at x + LANE_PAD_LEFT
    lane_x: dict[str, float] = {}
    x_cursor = 0.0
    for lane in LANE_ORDER:
        lane_x[lane] = x_cursor
        x_cursor += LANE_INNER_W + LANE_PAD_LEFT * 2 + LANE_GAP
    positions: dict[str, tuple[float, float]] = {}  # node → (rect_x, rect_y)
    lane_heights: dict[str, float] = {}
    for lane in LANE_ORDER:
        nodes = layers[lane]
        y = LANE_PAD_TOP
        for name in nodes:
            rx = lane_x[lane] + LANE_PAD_LEFT
            positions[name] = (rx, y)
            y += NODE_H + NODE_VGAP
        # Subtract the trailing NODE_VGAP added after the last node.
        lane_heights[lane] = y + LANE_PAD_BOTTOM - NODE_VGAP
    total_w = x_cursor - LANE_GAP
    total_h = max(lane_heights.values())

    # ── SVG assembly ───────────────────────────────────────────────────────
    parts: list[str] = []
    # Arrowhead marker (normal + highlighted variant for client-side hover)
    parts.append(
        '<defs>'
        '<marker id="arr" markerWidth="6" markerHeight="6" refX="5" refY="3" orient="auto">'
        '<path d="M0,0 L0,6 L6,3 z" fill="#94A3B8"/>'
        '</marker>'
        '<marker id="arr-hi" markerWidth="6" markerHeight="6" refX="5" refY="3" orient="auto">'
        '<path d="M0,0 L0,6 L6,3 z" fill="#1D4ED8"/>'
        '</marker>'
        '</defs>'
    )
    # Lane backgrounds + headers
    for lane in LANE_ORDER:
        c = LANE_COLORS[lane]
        lx = lane_x[lane]
        lw = LANE_INNER_W + LANE_PAD_LEFT * 2
        lh = lane_heights[lane]
        parts.append(
            f'<rect x="{lx:.1f}" y="0" width="{lw:.1f}" height="{lh:.1f}" '
            f'rx="10" fill="{c["bg"]}" stroke="{c["border"]}" stroke-width="1"/>'
        )
        # Lane header label
        label_x = lx + lw / 2
        parts.append(
            f'<text x="{label_x:.1f}" y="28" text-anchor="middle" '
            f'font-family="\'DM Sans\',ui-sans-serif,system-ui,sans-serif" '
            f'font-size="10" font-weight="700" letter-spacing="1.5" '
            f'fill="{c["accent"]}">{LANE_LABELS[lane]}</text>'
        )
        # Divider line under header
        parts.append(
            f'<line x1="{lx + 12:.1f}" y1="36" x2="{lx + lw - 12:.1f}" y2="36" '
            f'stroke="{c["border"]}" stroke-width="1"/>'
        )
    # Edges (rendered before nodes so nodes appear on top)
    for name, deps in dag.items():
        if not deps:
            continue
        tx, ty = positions[name]
        tgt_cx = tx  # left edge of target node
        tgt_cy = ty + NODE_H / 2
        for dep in deps:
            sx, sy = positions[dep]
            sw = node_w(dep)
            src_cx = sx + sw  # right edge of source node
            src_cy = sy + NODE_H / 2
            # Bezier control points pull the curve 45% of the way toward each end.
            cpx1 = src_cx + (tgt_cx - src_cx) * 0.45
            cpx2 = tgt_cx - (tgt_cx - src_cx) * 0.45
            d = f"M{src_cx:.1f},{src_cy:.1f} C{cpx1:.1f},{src_cy:.1f} {cpx2:.1f},{tgt_cy:.1f} {tgt_cx:.1f},{tgt_cy:.1f}"
            parts.append(
                f'<path class="lineage-edge" data-from="{dep}" data-to="{name}" '
                f'd="{d}" fill="none" stroke="#CBD5E1" stroke-width="1" '
                f'marker-end="url(#arr)"/>'
            )
    # Nodes
    for name in dag:
        layer = _classify_layer(name)
        c = LANE_COLORS[layer]
        rx, ry = positions[name]
        rw = node_w(name)
        text_x = rx + NODE_PAD_H
        text_y = ry + NODE_H / 2 + 4  # +4 for baseline alignment
        parts.append(
            f'<g class="lineage-node" data-model="{name}">'
            f'<rect x="{rx:.1f}" y="{ry:.1f}" width="{rw:.1f}" height="{NODE_H}" '
            f'rx="5" fill="{c["fill"]}" stroke="{c["border"]}" stroke-width="1"/>'
            # Left accent bar
            f'<rect x="{rx:.1f}" y="{ry:.1f}" width="3" height="{NODE_H}" '
            f'rx="5" fill="{c["accent"]}"/>'
            f'<text x="{text_x:.1f}" y="{text_y:.1f}" '
            f'font-family="\'Commit Mono\',ui-monospace,\'Cascadia Code\',monospace" '
            f'font-size="11" fill="{c["text"]}">{name}</text>'
            '</g>'
        )
    svg_inner = "\n".join(parts)
    # min-width keeps lanes from collapsing inside narrow flex containers.
    return (
        f'<svg class="lineage-svg" viewBox="0 0 {total_w:.1f} {total_h:.1f}" '
        f'xmlns="http://www.w3.org/2000/svg" '
        f'style="width:100%;height:auto;min-width:{total_w:.0f}px">'
        f'{svg_inner}'
        f'</svg>'
    )
# ── Sidebar data injection (same pattern as pseo_routes.py) ──────────────────
@bp.before_request
async def _inject_sidebar_data():
    """Populate the unread-inbox count on quart.g for the admin sidebar badge."""
    from quart import g
    from ..core import fetch_one

    count = 0
    try:
        row = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0")
        if row:
            count = row["cnt"]
    except Exception:
        # Best-effort: a broken inbox query must never take down admin pages.
        pass
    g.admin_unread_count = count
@bp.context_processor
def _admin_context():
    """Expose the sidebar unread count to every template in this blueprint."""
    from quart import g

    unread = getattr(g, "admin_unread_count", 0)
    return {"unread_count": unread}
# ── Data access: state DB (sync, called via to_thread) ────────────────────────
def _state_db_path() -> Path:
    """Location of the extraction state DB inside the landing zone."""
    landing_root = Path(_LANDING_DIR)
    return landing_root / ".state.sqlite"
def _fetch_extraction_summary_sync() -> dict:
    """Aggregate run counts from extraction_runs, bucketed by status.

    Returns zeroed counts when the state DB does not exist yet. A 'running'
    row that started before the stale cutoff is additionally counted as stale.
    """
    db_path = _state_db_path()
    if not db_path.exists():
        return {"total": 0, "success": 0, "failed": 0, "running": 0, "stale": 0}

    stale_cutoff = datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)
    cutoff_str = stale_cutoff.strftime("%Y-%m-%dT%H:%M:%SZ")

    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        row = conn.execute(
            """
            SELECT
            COUNT(*) AS total,
            SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success,
            SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed,
            SUM(CASE WHEN status = 'running' THEN 1 ELSE 0 END) AS running,
            SUM(CASE WHEN status = 'running' AND started_at < ? THEN 1 ELSE 0 END) AS stale
            FROM extraction_runs
            """,
            (cutoff_str,),
        ).fetchone()
        # SUM over zero rows yields NULL — coerce to 0 for the template.
        keys = ("total", "success", "failed", "running", "stale")
        return {key: (row[key] or 0) for key in keys}
    finally:
        conn.close()
def _fetch_latest_per_extractor_sync() -> list[dict]:
    """Return the most recent extraction run per extractor, ordered by name."""
    db_path = _state_db_path()
    if not db_path.exists():
        return []
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row
    try:
        cursor = conn.execute(
            """
            SELECT r.*
            FROM extraction_runs r
            INNER JOIN (
            SELECT extractor, MAX(run_id) AS max_id
            FROM extraction_runs
            GROUP BY extractor
            ) latest ON r.run_id = latest.max_id
            ORDER BY r.extractor
            """
        )
        return [dict(record) for record in cursor.fetchall()]
    finally:
        conn.close()
def _fetch_extraction_runs_sync(
*,
extractor: str = "",
status: str = "",
limit: int = 50,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Return (rows, total_count) for the filtered run history."""
assert 1 <= limit <= 200, f"limit must be 1200, got {limit}"
assert offset >= 0, f"offset must be >= 0, got {offset}"
db_path = _state_db_path()
if not db_path.exists():
return [], 0
where_clauses = []
params: list = []
if extractor:
where_clauses.append("extractor = ?")
params.append(extractor)
if status:
where_clauses.append("status = ?")
params.append(status)
where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
total = conn.execute(
f"SELECT COUNT(*) FROM extraction_runs {where_sql}", params
).fetchone()[0]
rows = conn.execute(
f"""
SELECT run_id, extractor, started_at, finished_at, status,
files_written, files_skipped, bytes_written,
cursor_value, error_message
FROM extraction_runs {where_sql}
ORDER BY run_id DESC
LIMIT ? OFFSET ?
""",
params + [limit, offset],
).fetchall()
return [dict(r) for r in rows], total
finally:
conn.close()
def _fetch_distinct_extractors_sync() -> list[str]:
    """Distinct extractor names from the state DB, for filter dropdowns."""
    db_path = _state_db_path()
    if not db_path.exists():
        return []
    conn = sqlite3.connect(str(db_path))
    try:
        cursor = conn.execute(
            "SELECT DISTINCT extractor FROM extraction_runs ORDER BY extractor"
        )
        return [record[0] for record in cursor.fetchall()]
    finally:
        conn.close()
def _mark_run_failed_sync(run_id: int) -> bool:
"""Mark a stuck 'running' row as 'failed'. Returns True if row was updated."""
assert run_id > 0, f"run_id must be positive, got {run_id}"
db_path = _state_db_path()
if not db_path.exists():
return False
conn = sqlite3.connect(str(db_path))
try:
cur = conn.execute(
"""
UPDATE extraction_runs
SET status = 'failed',
finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
error_message = 'Marked failed manually (admin — process appeared stuck)'
WHERE run_id = ? AND status = 'running'
""",
(run_id,),
)
conn.commit()
return cur.rowcount > 0
finally:
conn.close()
# ── Data access: serving meta ─────────────────────────────────────────────────
def _load_serving_meta() -> dict | None:
    """Load export metadata (_serving_meta.json) next to the DuckDB file.

    Returns None when the file is missing, unreadable, or corrupt.
    """
    duckdb_dir = Path(_SERVING_DUCKDB_PATH).parent
    meta_path = duckdb_dir / "_serving_meta.json"
    if not meta_path.exists():
        return None
    try:
        payload = json.loads(meta_path.read_text())
    except Exception:
        # Corrupt or partially-written file: treat as absent, but leave a trace.
        logger.warning("Failed to read _serving_meta.json", exc_info=True)
        return None
    return payload
# ── Data access: landing zone filesystem ─────────────────────────────────────
def _get_landing_zone_stats_sync() -> list[dict]:
    """Scan LANDING_DIR and return per-source file counts + total bytes.

    Each non-hidden top-level directory under the landing zone is one source;
    only *.gz and *.jsonl payload files are counted. Returns [] when the
    landing directory does not exist.
    """
    landing = Path(_LANDING_DIR)
    if not landing.exists():
        return []
    sources = []
    for source_dir in sorted(landing.iterdir()):
        if not source_dir.is_dir() or source_dir.name.startswith("."):
            continue
        # Stat each file once (the original stat'd every file twice — size
        # then mtime), and tolerate files deleted between glob and stat.
        stat_results = []
        for f in list(source_dir.rglob("*.gz")) + list(source_dir.rglob("*.jsonl")):
            try:
                stat_results.append(f.stat())
            except OSError:
                continue
        total_bytes = sum(s.st_size for s in stat_results)
        latest_mtime = max((s.st_mtime for s in stat_results), default=None)
        sources.append({
            "name": source_dir.name,
            "file_count": len(stat_results),
            "total_bytes": total_bytes,
            "latest_mtime": (
                datetime.fromtimestamp(latest_mtime, tz=UTC).strftime("%Y-%m-%d %H:%M")
                if latest_mtime
                else None
            ),
        })
    return sources
# ── Data access: workflows.toml ───────────────────────────────────────────────
# Human-readable labels for the schedule keywords used in workflows.toml.
# Unknown schedule strings fall through as their own label (see _load_workflows).
_SCHEDULE_LABELS = {
    "hourly": "Every hour",
    "daily": "Daily",
    "weekly": "Weekly",
    "monthly": "Monthly",
}
def _load_workflows() -> list[dict]:
    """Parse workflows.toml into workflow dicts with human schedule labels.

    Returns [] when the TOML file is missing. Schedule strings without a
    known label are passed through unchanged.
    """
    if not _WORKFLOWS_TOML.exists():
        return []
    parsed = tomllib.loads(_WORKFLOWS_TOML.read_text())
    result = []
    for name, config in parsed.items():
        schedule = config.get("schedule", "")
        result.append({
            "name": name,
            "module": config.get("module", ""),
            "schedule": schedule,
            "schedule_label": _SCHEDULE_LABELS.get(schedule, schedule),
            "depends_on": config.get("depends_on", []),
        })
    return result
# ── Route helpers ─────────────────────────────────────────────────────────────
def _format_bytes(n: int) -> str:
"""Human-readable byte count."""
if n < 1024:
return f"{n} B"
if n < 1024 * 1024:
return f"{n / 1024:.1f} KB"
return f"{n / 1024 / 1024:.1f} MB"
def _duration_str(started_at: str | None, finished_at: str | None) -> str:
"""Return human-readable duration, or '' if unavailable."""
if not started_at or not finished_at:
return ""
try:
fmt = "%Y-%m-%dT%H:%M:%SZ"
start = datetime.strptime(started_at, fmt)
end = datetime.strptime(finished_at, fmt)
delta = int((end - start).total_seconds())
if delta < 60:
return f"{delta}s"
return f"{delta // 60}m {delta % 60}s"
except ValueError:
return ""
def _is_stale(run: dict) -> bool:
"""True if a 'running' row has been stuck longer than the stale threshold."""
if run.get("status") != "running":
return False
started = run.get("started_at", "")
if not started:
return True
try:
fmt = "%Y-%m-%dT%H:%M:%SZ"
start = datetime.strptime(started, fmt).replace(tzinfo=UTC)
return (datetime.now(UTC) - start) > timedelta(hours=_STALE_THRESHOLD_HOURS)
except ValueError:
return False
# ── Dashboard ─────────────────────────────────────────────────────────────────
@bp.route("/")
@role_required("admin")
async def pipeline_dashboard():
    """Main page: health stat cards + tab container."""
    from ..analytics import fetch_analytics  # noqa: PLC0415

    summary, serving_meta = await asyncio.gather(
        asyncio.to_thread(_fetch_extraction_summary_sync),
        asyncio.to_thread(_load_serving_meta),
    )

    if serving_meta:
        total_serving_tables = len(serving_meta.get("tables", {}))
        last_export = serving_meta.get("exported_at_utc", "")[:19].replace("T", " ") or ""
    else:
        # Meta file absent (e.g. first deploy): count tables straight from DuckDB.
        schema_rows = await fetch_analytics(
            "SELECT COUNT(*) AS n FROM information_schema.tables WHERE table_schema = 'serving'"
        )
        total_serving_tables = schema_rows[0]["n"] if schema_rows else 0
        last_export = ""

    total = summary["total"]
    success_rate = round(100 * summary["success"] / total) if total > 0 else 0

    return await render_template(
        "admin/pipeline.html",
        summary=summary,
        success_rate=success_rate,
        total_serving_tables=total_serving_tables,
        last_export=last_export,
        admin_page="pipeline",
    )
# ── Overview tab ─────────────────────────────────────────────────────────────
@bp.route("/overview")
@role_required("admin")
async def pipeline_overview():
    """HTMX tab: extraction status per source, serving freshness, landing zone."""
    latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather(
        asyncio.to_thread(_fetch_latest_per_extractor_sync),
        asyncio.to_thread(_get_landing_zone_stats_sync),
        asyncio.to_thread(_load_workflows),
        asyncio.to_thread(_load_serving_meta),
    )

    # Join each workflow definition with its latest extraction run (if any).
    latest_by_name = {run["extractor"]: run for run in latest_runs}
    workflow_rows = []
    for wf in workflows:
        latest = latest_by_name.get(wf["name"])
        workflow_rows.append({
            "workflow": wf,
            "run": latest,
            "stale": bool(latest) and _is_stale(latest),
        })

    total_landing_bytes = sum(src["total_bytes"] for src in landing_stats)

    # Serving tables: _serving_meta.json carries row counts + export time;
    # fall back to information_schema before the first export exists.
    if serving_meta:
        serving_tables = [
            {"name": tname, "row_count": tmeta.get("row_count")}
            for tname, tmeta in sorted(serving_meta.get("tables", {}).items())
        ]
        last_export = serving_meta.get("exported_at_utc", "")[:19].replace("T", " ") or None
    else:
        from ..analytics import fetch_analytics  # noqa: PLC0415

        schema_rows = await fetch_analytics(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = 'serving' ORDER BY table_name"
        )
        serving_tables = [{"name": row["table_name"], "row_count": None} for row in schema_rows]
        last_export = None

    return await render_template(
        "admin/partials/pipeline_overview.html",
        workflow_rows=workflow_rows,
        landing_stats=landing_stats,
        total_landing_bytes=total_landing_bytes,
        serving_tables=serving_tables,
        last_export=last_export,
        format_bytes=_format_bytes,
    )
# ── Extractions tab ────────────────────────────────────────────────────────────
@bp.route("/extractions")
@role_required("admin")
async def pipeline_extractions():
    """HTMX tab: paginated + filtered extraction run history.

    Query params: ``extractor`` and ``status`` (exact-match filters) and
    ``page`` (1-based). A non-numeric page value falls back to page 1
    instead of raising an unhandled ValueError (HTTP 500).
    """
    extractor_filter = request.args.get("extractor", "")
    status_filter = request.args.get("status", "")
    try:
        page = max(1, int(request.args.get("page", 1)))
    except ValueError:
        # Hand-edited URLs like ?page=abc previously crashed the tab.
        page = 1
    per_page = 30
    (runs, total), extractors = await asyncio.gather(
        asyncio.to_thread(
            _fetch_extraction_runs_sync,
            extractor=extractor_filter,
            status=status_filter,
            limit=per_page,
            offset=(page - 1) * per_page,
        ),
        asyncio.to_thread(_fetch_distinct_extractors_sync),
    )
    # Enrich rows with computed display fields.
    for run in runs:
        run["duration"] = _duration_str(run.get("started_at"), run.get("finished_at"))
        run["bytes_label"] = _format_bytes(run.get("bytes_written") or 0)
        run["is_stale"] = _is_stale(run)
    total_pages = max(1, (total + per_page - 1) // per_page)
    return await render_template(
        "admin/partials/pipeline_extractions.html",
        runs=runs,
        total=total,
        page=page,
        per_page=per_page,
        total_pages=total_pages,
        extractors=extractors,
        extractor_filter=extractor_filter,
        status_filter=status_filter,
    )
@bp.route("/extractions/<int:run_id>/mark-stale", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_mark_stale(run_id: int):
    """Mark a stuck 'running' extraction row as 'failed'."""
    updated = await asyncio.to_thread(_mark_run_failed_sync, run_id)
    message, category = (
        (f"Run #{run_id} marked as failed.", "success")
        if updated
        else (f"Run #{run_id} could not be updated (not in 'running' state).", "warning")
    )
    await flash(message, category)
    return redirect(url_for("pipeline.pipeline_dashboard"))
# ── Trigger extraction ────────────────────────────────────────────────────────
@bp.route("/extract/trigger", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_trigger_extract():
    """Enqueue an extraction run — all extractors, or a single named one."""
    from ..worker import enqueue

    form = await request.form
    extractor = (form.get("extractor") or "").strip()

    if not extractor:
        # No name supplied → queue the full pipeline.
        await enqueue("run_extraction")
        await flash("Extraction run queued. Check the task queue for progress.", "success")
        return redirect(url_for("pipeline.pipeline_dashboard"))

    known = {wf["name"] for wf in await asyncio.to_thread(_load_workflows)}
    if extractor not in known:
        await flash(f"Unknown extractor '{extractor}'.", "warning")
        return redirect(url_for("pipeline.pipeline_dashboard"))

    await enqueue("run_extraction", {"extractor": extractor})
    await flash(f"Extractor '{extractor}' queued. Check the task queue for progress.", "success")
    return redirect(url_for("pipeline.pipeline_dashboard"))
# ── Lineage tab ───────────────────────────────────────────────────────────────
# Compute downstream map once at import time (DAG is static).
# _DOWNSTREAM inverts _DAG: model → models that consume it.
_DOWNSTREAM: dict[str, list[str]] = {model: [] for model in _DAG}
for _consumer, _upstreams in _DAG.items():
    for _upstream in _upstreams:
        _DOWNSTREAM.setdefault(_upstream, []).append(_consumer)
@bp.route("/lineage")
@role_required("admin")
async def pipeline_lineage():
    """HTMX tab: data lineage DAG visualization."""
    # Pure CPU string work — keep it off the event loop regardless.
    rendered = await asyncio.to_thread(_render_lineage_svg, _DAG)
    return await render_template(
        "admin/partials/pipeline_lineage.html",
        lineage_svg=rendered,
        node_count=len(_DAG),
    )
@bp.route("/lineage/schema/<model>")
@role_required("admin")
async def pipeline_lineage_schema(model: str):
    """JSON: schema details for a lineage node.

    Columns and row count come from information_schema and are only available
    for serving-layer models — staging/foundation live in lakehouse.duckdb,
    which the web app cannot open.
    """
    from quart import jsonify
    from ..analytics import fetch_analytics

    if model not in _DAG:
        return jsonify({"error": "unknown model"}), 404

    layer = _classify_layer(model)
    row_count = None
    columns: list[dict] = []

    if layer == "serving":
        col_rows = await fetch_analytics(
            """
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = 'serving' AND table_name = ?
            ORDER BY ordinal_position
            """,
            [model],
        )
        columns = [
            {
                "name": r["column_name"],
                "type": r["data_type"],
                "nullable": r["is_nullable"] == "YES",
            }
            for r in col_rows
        ]
        if columns:
            # model is validated against _DAG keys above — safe to interpolate.
            count_rows = await fetch_analytics(f"SELECT count(*) AS n FROM serving.{model}")
            if count_rows:
                row_count = count_rows[0]["n"]

    return jsonify({
        "model": model,
        "layer": layer,
        "upstream": _DAG[model],
        "downstream": _DOWNSTREAM.get(model, []),
        "row_count": row_count,
        "columns": columns,
    })
# ── Catalog tab ───────────────────────────────────────────────────────────────
@bp.route("/catalog")
@role_required("admin")
async def pipeline_catalog():
    """HTMX tab: list serving tables with row counts + column counts."""
    from ..analytics import fetch_analytics

    schema_rows = await fetch_analytics(
        """
        SELECT table_name, column_name, data_type, ordinal_position
        FROM information_schema.columns
        WHERE table_schema = 'serving'
        ORDER BY table_name, ordinal_position
        """
    )

    # Fold the flat column rows into one entry per table.
    tables: dict[str, dict] = {}
    for row in schema_rows:
        entry = tables.setdefault(
            row["table_name"],
            {"name": row["table_name"], "columns": [], "column_count": 0},
        )
        entry["columns"].append({"name": row["column_name"], "type": row["data_type"]})
        entry["column_count"] += 1

    # Row counts come from the serving meta file written at export time.
    serving_meta = await asyncio.to_thread(_load_serving_meta)
    meta_counts = serving_meta.get("tables", {}) if serving_meta else {}
    for tname, tdata in tables.items():
        tdata["row_count"] = meta_counts.get(tname, {}).get("row_count")

    return await render_template(
        "admin/partials/pipeline_catalog.html",
        tables=list(tables.values()),
        serving_meta=serving_meta,
    )
@bp.route("/catalog/<table_name>")
@role_required("admin")
async def pipeline_table_detail(table_name: str):
    """HTMX partial: column list + 10-row sample for a serving table."""
    from ..analytics import fetch_analytics

    # Reject anything that is not a plain lowercase identifier before it
    # goes anywhere near SQL.
    if not re.match(r"^[a-z_][a-z0-9_]*$", table_name):
        return "Invalid table name", 400

    exists = await fetch_analytics(
        "SELECT 1 FROM information_schema.tables"
        " WHERE table_schema = 'serving' AND table_name = ?",
        [table_name],
    )
    if not exists:
        return f"Table serving.{table_name} not found", 404

    columns_query = fetch_analytics(
        "SELECT column_name, data_type, ordinal_position"
        " FROM information_schema.columns"
        " WHERE table_schema = 'serving' AND table_name = ?"
        " ORDER BY ordinal_position",
        [table_name],
    )
    # Name validated + existence confirmed above, so interpolation is safe.
    sample_query = fetch_analytics(f"SELECT * FROM serving.{table_name} LIMIT 10")  # noqa: S608
    columns, sample = await asyncio.gather(columns_query, sample_query)

    return await render_template(
        "admin/partials/pipeline_table_detail.html",
        table_name=table_name,
        columns=columns,
        sample=sample,
    )
# ── Query editor ──────────────────────────────────────────────────────────────
@bp.route("/query")
@role_required("admin")
async def pipeline_query_editor():
    """HTMX tab: SQL query editor with schema sidebar."""
    from ..analytics import fetch_analytics

    schema_rows = await fetch_analytics(
        """
        SELECT table_name, column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = 'serving'
        ORDER BY table_name, ordinal_position
        """
    )

    # Group columns under their table for the sidebar tree.
    schema: dict[str, list] = {}
    for row in schema_rows:
        schema.setdefault(row["table_name"], []).append(
            {"name": row["column_name"], "type": row["data_type"]}
        )

    return await render_template(
        "admin/partials/pipeline_query.html",
        schema=schema,
        max_rows=_QUERY_MAX_ROWS,
        timeout_seconds=_QUERY_TIMEOUT_SECONDS,
    )
@bp.route("/query/execute", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_query_execute():
    """Run user-submitted SQL and return a results table partial.

    Validation: non-empty, length-capped, and free of blocked DDL/DML
    keywords (belt-and-suspenders on top of the read-only connection).
    The three validation failures previously repeated an identical
    render_template call — factored into a local helper.
    """
    from ..analytics import execute_user_query

    async def _error_partial(message: str):
        # Every validation failure renders the same empty results partial.
        return await render_template(
            "admin/partials/pipeline_query_results.html",
            error=message,
            columns=[],
            rows=[],
            row_count=0,
            elapsed_ms=0,
            truncated=False,
        )

    form = await request.form
    sql = (form.get("sql") or "").strip()

    if not sql:
        return await _error_partial("SQL query is empty.")
    if len(sql) > _QUERY_MAX_CHARS:
        return await _error_partial(
            f"Query too long ({len(sql):,} chars). Maximum is {_QUERY_MAX_CHARS:,} characters."
        )
    if _BLOCKED_SQL_RE.search(sql):
        return await _error_partial(
            "Query contains a blocked keyword. Only SELECT statements are allowed."
        )

    columns, rows, error, elapsed_ms = await execute_user_query(
        sql,
        max_rows=_QUERY_MAX_ROWS,
        timeout_seconds=_QUERY_TIMEOUT_SECONDS,
    )
    return await render_template(
        "admin/partials/pipeline_query_results.html",
        error=error,
        columns=columns,
        rows=rows,
        row_count=len(rows),
        elapsed_ms=elapsed_ms,
        truncated=len(rows) >= _QUERY_MAX_ROWS,
    )