feat(lineage): hover tooltip + click-to-inspect schema panel

- New route GET /admin/pipeline/lineage/schema/<model> — returns JSON
  with columns+types (from information_schema for serving models),
  row count, upstream and downstream model lists. Validates model
  against _DAG to prevent arbitrary table access.
- Precomputes _DOWNSTREAM map at import time from _DAG.
- Lineage template: replaces minimal edge-highlight JS with full UX —
  hover triggers schema prefetch + floating tooltip (layer badge, top 4
  columns, "+N more" note); click opens 320px slide-in panel showing
  row count, full schema table, upstream/downstream dep lists.
  Dep items in panel are clickable to navigate between models.
  Schema responses are cached client-side to avoid repeat fetches.
  Staging/foundation models show "schema in lakehouse.duckdb only".

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-27 13:23:54 +01:00
parent 41a598df53
commit 89ff931212
2 changed files with 335 additions and 17 deletions

View File

@@ -767,6 +767,12 @@ async def pipeline_trigger_extract():
# ── Lineage tab ───────────────────────────────────────────────────────────────
# Compute downstream map once at import time (DAG is static).
_DOWNSTREAM: dict[str, list[str]] = {n: [] for n in _DAG}
for _name, _deps in _DAG.items():
for _dep in _deps:
_DOWNSTREAM.setdefault(_dep, []).append(_name)
@bp.route("/lineage")
@role_required("admin")
@@ -780,6 +786,67 @@ async def pipeline_lineage():
)
@bp.route("/lineage/schema/<model>")
@role_required("admin")
async def pipeline_lineage_schema(model: str):
"""JSON: schema details for a lineage node.
Returns columns + types from information_schema (serving models only —
staging/foundation live in lakehouse.duckdb which the web app cannot open).
Row count is included for serving models when the table exists.
"""
from quart import jsonify
from ..analytics import fetch_analytics
if model not in _DAG:
return jsonify({"error": "unknown model"}), 404
layer = _classify_layer(model)
upstream = _DAG[model]
downstream = _DOWNSTREAM.get(model, [])
row_count = None
columns: list[dict] = []
if layer == "serving":
col_rows = await fetch_analytics(
"""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'serving' AND table_name = ?
ORDER BY ordinal_position
""",
[model],
)
columns = [
{
"name": r["column_name"],
"type": r["data_type"],
"nullable": r["is_nullable"] == "YES",
}
for r in col_rows
]
if columns:
# model is validated against _DAG keys — safe to interpolate
count_rows = await fetch_analytics(
f"SELECT count(*) AS n FROM serving.{model}"
)
if count_rows:
row_count = count_rows[0]["n"]
return jsonify(
{
"model": model,
"layer": layer,
"upstream": upstream,
"downstream": downstream,
"row_count": row_count,
"columns": columns,
}
)
# ── Catalog tab ───────────────────────────────────────────────────────────────