feat(pipeline): add Lineage tab — server-rendered SVG DAG visualization

Adds a 5th tab to the admin pipeline page showing the full 3-layer
SQLMesh data lineage: 28 models, 35 edges across staging / foundation /
serving swim lanes.

- _DAG: canonical model dependency dict in pipeline_routes.py;
  update when models are added/removed
- _classify_layer(): derives layer from name prefix (stg_/dim_fct_/rest)
- _render_lineage_svg(): pure Python SVG generator — 3-column swim lane
  layout, bezier edges, color-coded per layer (green/blue/amber),
  no external dependencies
- /lineage route: HTMX tab handler
- pipeline_lineage.html: partial with SVG embed + vanilla JS hover
  effects (highlight connected edges, dim unrelated)
- pipeline.html: 5th "Lineage" tab button

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-27 11:55:39 +01:00
parent c269caf048
commit 160c2c6f7b
3 changed files with 289 additions and 0 deletions

View File

@@ -66,6 +66,226 @@ _BLOCKED_SQL_RE = re.compile(
re.IGNORECASE, re.IGNORECASE,
) )
# ── Lineage DAG ───────────────────────────────────────────────────────────────
#
# Canonical model dependency map: model_name → [upstream_dependencies].
# Layer is derived from name prefix: stg_* = staging, dim_*/fct_* = foundation,
# everything else = serving.
# Update this dict whenever models are added or removed from transform/.
_DAG: dict[str, list[str]] = {
# Staging — read landing zone files, no model deps
"stg_padel_courts": [],
"stg_playtomic_venues": [],
"stg_playtomic_resources": [],
"stg_playtomic_opening_hours": [],
"stg_playtomic_availability": [],
"stg_population": [],
"stg_population_usa": [],
"stg_population_uk": [],
"stg_population_geonames": [],
"stg_income": [],
"stg_income_usa": [],
"stg_city_labels": [],
"stg_nuts2_boundaries": [],
"stg_regional_income": [],
"stg_tennis_courts": [],
# Foundation
"dim_venues": ["stg_playtomic_venues", "stg_playtomic_resources", "stg_padel_courts"],
"dim_cities": [
"dim_venues", "stg_income", "stg_city_labels",
"stg_population", "stg_population_usa", "stg_population_uk", "stg_population_geonames",
],
"dim_locations": [
"stg_population_geonames", "stg_income", "stg_nuts2_boundaries",
"stg_regional_income", "stg_income_usa", "stg_padel_courts", "stg_tennis_courts",
],
"dim_venue_capacity": [
"stg_playtomic_venues", "stg_playtomic_resources", "stg_playtomic_opening_hours",
],
"fct_availability_slot": ["stg_playtomic_availability"],
"fct_daily_availability": ["fct_availability_slot", "dim_venue_capacity"],
# Serving
"venue_pricing_benchmarks": ["fct_daily_availability"],
"city_market_profile": ["dim_cities", "venue_pricing_benchmarks"],
"planner_defaults": ["venue_pricing_benchmarks", "city_market_profile"],
"location_opportunity_profile": ["dim_locations"],
"pseo_city_costs_de": [
"city_market_profile", "planner_defaults", "location_opportunity_profile",
],
"pseo_city_pricing": ["venue_pricing_benchmarks", "city_market_profile"],
"pseo_country_overview": ["pseo_city_costs_de"],
}
def _classify_layer(name: str) -> str:
"""Return 'staging', 'foundation', or 'serving' for a model name."""
if name.startswith("stg_"):
return "staging"
if name.startswith("dim_") or name.startswith("fct_"):
return "foundation"
return "serving"
def _render_lineage_svg(dag: dict[str, list[str]]) -> str:
"""Render the 3-layer model dependency DAG as an SVG string.
Layout: three vertical swim lanes (staging / foundation / serving) with
nodes stacked top-to-bottom in each lane. Edges are cubic bezier paths
flowing left-to-right. No external dependencies — pure Python string
construction.
"""
# ── Layout constants ───────────────────────────────────────────────────
CHAR_WIDTH_PX = 7.4 # approximate monospace char width at 11px
NODE_PAD_H = 10 # horizontal padding inside node rect
NODE_H = 26 # node height
NODE_VGAP = 10 # vertical gap between nodes in same lane
LANE_PAD_TOP = 52 # space for lane header
LANE_PAD_BOTTOM = 24
LANE_INNER_W = 210 # inner usable width per lane
LANE_GAP = 40 # gap between lanes
LANE_PAD_LEFT = 16 # left padding inside lane bg
LANE_COLORS = {
"staging": {"bg": "#F0FDF4", "border": "#BBF7D0", "accent": "#16A34A",
"fill": "#DCFCE7", "text": "#14532D"},
"foundation": {"bg": "#EFF6FF", "border": "#BFDBFE", "accent": "#1D4ED8",
"fill": "#DBEAFE", "text": "#1E3A8A"},
"serving": {"bg": "#FFFBEB", "border": "#FDE68A", "accent": "#D97706",
"fill": "#FEF3C7", "text": "#78350F"},
}
LANE_ORDER = ["staging", "foundation", "serving"]
LANE_LABELS = {"staging": "STAGING", "foundation": "FOUNDATION", "serving": "SERVING"}
# ── Group and sort nodes per layer ─────────────────────────────────────
# Count how many nodes each node is depended upon by (downstream count)
downstream: dict[str, int] = {n: 0 for n in dag}
for deps in dag.values():
for d in deps:
downstream[d] = downstream.get(d, 0) + 1
layers: dict[str, list[str]] = {"staging": [], "foundation": [], "serving": []}
for name in dag:
layers[_classify_layer(name)].append(name)
for layer_name, nodes in layers.items():
# Sort: most-connected first (hub nodes near vertical center), then alpha
nodes.sort(key=lambda n: (-downstream.get(n, 0), n))
# ── Compute node widths ────────────────────────────────────────────────
def node_w(name: str) -> float:
return max(len(name) * CHAR_WIDTH_PX + NODE_PAD_H * 2, 80.0)
# ── Assign positions ───────────────────────────────────────────────────
# x = left edge of lane background; node rect starts at x + LANE_PAD_LEFT
lane_x: dict[str, float] = {}
x_cursor = 0.0
for lane in LANE_ORDER:
lane_x[lane] = x_cursor
x_cursor += LANE_INNER_W + LANE_PAD_LEFT * 2 + LANE_GAP
positions: dict[str, tuple[float, float]] = {} # node → (rect_x, rect_y)
lane_heights: dict[str, float] = {}
for lane in LANE_ORDER:
nodes = layers[lane]
y = LANE_PAD_TOP
for name in nodes:
rx = lane_x[lane] + LANE_PAD_LEFT
positions[name] = (rx, y)
y += NODE_H + NODE_VGAP
lane_heights[lane] = y + LANE_PAD_BOTTOM - NODE_VGAP
total_w = x_cursor - LANE_GAP
total_h = max(lane_heights.values())
# ── SVG assembly ───────────────────────────────────────────────────────
parts: list[str] = []
# Arrowhead marker
parts.append(
'<defs>'
'<marker id="arr" markerWidth="6" markerHeight="6" refX="5" refY="3" orient="auto">'
'<path d="M0,0 L0,6 L6,3 z" fill="#94A3B8"/>'
'</marker>'
'<marker id="arr-hi" markerWidth="6" markerHeight="6" refX="5" refY="3" orient="auto">'
'<path d="M0,0 L0,6 L6,3 z" fill="#1D4ED8"/>'
'</marker>'
'</defs>'
)
# Lane backgrounds + headers
for lane in LANE_ORDER:
c = LANE_COLORS[lane]
lx = lane_x[lane]
lw = LANE_INNER_W + LANE_PAD_LEFT * 2
lh = lane_heights[lane]
parts.append(
f'<rect x="{lx:.1f}" y="0" width="{lw:.1f}" height="{lh:.1f}" '
f'rx="10" fill="{c["bg"]}" stroke="{c["border"]}" stroke-width="1"/>'
)
# Lane header label
label_x = lx + lw / 2
parts.append(
f'<text x="{label_x:.1f}" y="28" text-anchor="middle" '
f'font-family="\'DM Sans\',ui-sans-serif,system-ui,sans-serif" '
f'font-size="10" font-weight="700" letter-spacing="1.5" '
f'fill="{c["accent"]}">{LANE_LABELS[lane]}</text>'
)
# Divider line under header
parts.append(
f'<line x1="{lx + 12:.1f}" y1="36" x2="{lx + lw - 12:.1f}" y2="36" '
f'stroke="{c["border"]}" stroke-width="1"/>'
)
# Edges (rendered before nodes so nodes appear on top)
for name, deps in dag.items():
if not deps:
continue
tx, ty = positions[name]
tgt_cx = tx # left edge of target node
tgt_cy = ty + NODE_H / 2
for dep in deps:
sx, sy = positions[dep]
sw = node_w(dep)
src_cx = sx + sw # right edge of source node
src_cy = sy + NODE_H / 2
cpx1 = src_cx + (tgt_cx - src_cx) * 0.45
cpx2 = tgt_cx - (tgt_cx - src_cx) * 0.45
d = f"M{src_cx:.1f},{src_cy:.1f} C{cpx1:.1f},{src_cy:.1f} {cpx2:.1f},{tgt_cy:.1f} {tgt_cx:.1f},{tgt_cy:.1f}"
parts.append(
f'<path class="lineage-edge" data-from="{dep}" data-to="{name}" '
f'd="{d}" fill="none" stroke="#CBD5E1" stroke-width="1" '
f'marker-end="url(#arr)"/>'
)
# Nodes
for name in dag:
layer = _classify_layer(name)
c = LANE_COLORS[layer]
rx, ry = positions[name]
rw = node_w(name)
text_x = rx + NODE_PAD_H
text_y = ry + NODE_H / 2 + 4 # +4 for baseline alignment
parts.append(
f'<g class="lineage-node" data-model="{name}">'
f'<rect x="{rx:.1f}" y="{ry:.1f}" width="{rw:.1f}" height="{NODE_H}" '
f'rx="5" fill="{c["fill"]}" stroke="{c["border"]}" stroke-width="1"/>'
# Left accent bar
f'<rect x="{rx:.1f}" y="{ry:.1f}" width="3" height="{NODE_H}" '
f'rx="5" fill="{c["accent"]}"/>'
f'<text x="{text_x:.1f}" y="{text_y:.1f}" '
f'font-family="\'Commit Mono\',ui-monospace,\'Cascadia Code\',monospace" '
f'font-size="11" fill="{c["text"]}">{name}</text>'
'</g>'
)
svg_inner = "\n".join(parts)
return (
f'<svg class="lineage-svg" viewBox="0 0 {total_w:.1f} {total_h:.1f}" '
f'xmlns="http://www.w3.org/2000/svg" '
f'style="width:100%;height:auto;min-width:{total_w:.0f}px">'
f'{svg_inner}'
f'</svg>'
)
# ── Sidebar data injection (same pattern as pseo_routes.py) ────────────────── # ── Sidebar data injection (same pattern as pseo_routes.py) ──────────────────
@@ -545,6 +765,21 @@ async def pipeline_trigger_extract():
return redirect(url_for("pipeline.pipeline_dashboard")) return redirect(url_for("pipeline.pipeline_dashboard"))
# ── Lineage tab ───────────────────────────────────────────────────────────────
@bp.route("/lineage")
@role_required("admin")
async def pipeline_lineage():
"""HTMX tab: data lineage DAG visualization."""
svg = await asyncio.to_thread(_render_lineage_svg, _DAG)
return await render_template(
"admin/partials/pipeline_lineage.html",
lineage_svg=svg,
node_count=len(_DAG),
)
# ── Catalog tab ─────────────────────────────────────────────────────────────── # ── Catalog tab ───────────────────────────────────────────────────────────────

View File

@@ -0,0 +1,50 @@
<!-- Pipeline Lineage Tab: 3-layer DAG visualization (server-rendered SVG) -->
<div class="card">
<p class="card-header">Data Lineage
<span class="text-xs font-normal text-slate ml-2">
{{ node_count }} models &mdash; staging &rarr; foundation &rarr; serving
</span>
</p>
<div style="overflow-x:auto;padding:1rem 0.5rem 0.5rem">
{{ lineage_svg | safe }}
</div>
</div>
<style>
.lineage-node { cursor: default; }
.lineage-node rect:first-of-type { transition: filter 0.12s; }
.lineage-node:hover rect:first-of-type { filter: brightness(0.94); }
.lineage-edge { transition: stroke 0.12s, stroke-width 0.12s, opacity 0.12s; }
.lineage-edge.hi { stroke: #1D4ED8 !important; stroke-width: 2 !important; marker-end: url(#arr-hi) !important; }
.lineage-edge.dim { opacity: 0.12; }
</style>
<script>
(function () {
var svg = document.querySelector('.lineage-svg');
if (!svg) return;
var nodes = svg.querySelectorAll('.lineage-node');
var edges = svg.querySelectorAll('.lineage-edge');
nodes.forEach(function (g) {
var model = g.dataset.model;
g.addEventListener('mouseenter', function () {
edges.forEach(function (e) {
if (e.dataset.from === model || e.dataset.to === model) {
e.classList.add('hi');
e.classList.remove('dim');
} else {
e.classList.add('dim');
e.classList.remove('hi');
}
});
});
g.addEventListener('mouseleave', function () {
edges.forEach(function (e) {
e.classList.remove('hi', 'dim');
});
});
});
})();
</script>

View File

@@ -93,6 +93,10 @@
hx-get="{{ url_for('pipeline.pipeline_query_editor') }}" hx-get="{{ url_for('pipeline.pipeline_query_editor') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML" hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Query</button> hx-trigger="click">Query</button>
<button data-tab="lineage"
hx-get="{{ url_for('pipeline.pipeline_lineage') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Lineage</button>
</div> </div>
<!-- Tab content (Overview loads on page load) --> <!-- Tab content (Overview loads on page load) -->