feat(pipeline): tests, docs, and ruff fixes (subtask 6/6)

- Add 29-test suite for all pipeline routes, data helpers, and query
  execution (test_pipeline.py); all 1333 tests pass
- Fix ruff UP041: asyncio.TimeoutError → TimeoutError in analytics.py
- Fix ruff UP036/F401: replace sys.version_info tomllib block with
  plain `import tomllib` (project requires Python 3.11+)
- Fix ruff F841: remove unused `cutoff` variable in pipeline_overview
- Update CHANGELOG.md with Pipeline Console entry
- Update PROJECT.md: add Pipeline Console to Admin Panel done list

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-25 13:02:51 +01:00
parent 8f8f7f7acb
commit d637687795
5 changed files with 591 additions and 19 deletions

View File

@@ -25,8 +25,7 @@ import logging
import os
import re
import sqlite3
import sys
import time
import tomllib
from datetime import UTC, datetime, timedelta
from pathlib import Path
@@ -307,18 +306,7 @@ def _load_workflows() -> list[dict]:
if not _WORKFLOWS_TOML.exists():
return []
if sys.version_info >= (3, 11):
import tomllib
data = tomllib.loads(_WORKFLOWS_TOML.read_text())
else:
# Fallback for older Python (shouldn't happen — project requires 3.11+)
try:
import tomli as tomllib # type: ignore[no-redef]
data = tomllib.loads(_WORKFLOWS_TOML.read_text())
except ImportError:
return []
data = tomllib.loads(_WORKFLOWS_TOML.read_text())
workflows = []
for name, config in data.items():
@@ -422,9 +410,6 @@ async def pipeline_overview():
latest_by_name = {r["extractor"]: r for r in latest_runs}
# Enrich each workflow with its latest run data
cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
workflow_rows = []
for wf in workflows:
run = latest_by_name.get(wf["name"])

View File

@@ -74,7 +74,7 @@ async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str
asyncio.to_thread(_run),
timeout=_QUERY_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
except TimeoutError:
logger.error("DuckDB analytics query timed out after %ds: %.200s", _QUERY_TIMEOUT_SECONDS, sql)
return []
except Exception:
@@ -123,5 +123,5 @@ async def execute_user_query(
asyncio.to_thread(_run),
timeout=timeout_seconds,
)
except asyncio.TimeoutError:
except TimeoutError:
return [], [], f"Query timed out after {timeout_seconds}s.", 0.0

578
web/tests/test_pipeline.py Normal file
View File

@@ -0,0 +1,578 @@
"""
Tests for the Pipeline Console admin blueprint.
Covers:
- admin/pipeline_routes.py: all 9 routes
- analytics.py: execute_user_query() function
- Data access functions: state DB, serving meta, landing zone
"""
import json
import sqlite3
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import padelnomics.admin.pipeline_routes as pipeline_mod
import pytest
from padelnomics.core import utcnow_iso
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
async def admin_client(app, db):
"""Authenticated admin test client."""
now = utcnow_iso()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("pipeline-admin@test.com", "Pipeline Admin", now),
) as cursor:
admin_id = cursor.lastrowid
await db.execute(
"INSERT INTO user_roles (user_id, role) VALUES (?, 'admin')", (admin_id,)
)
await db.commit()
async with app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = admin_id
yield c
@pytest.fixture
def state_db_dir():
"""Temp directory with a seeded .state.sqlite for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / ".state.sqlite"
conn = sqlite3.connect(str(db_path))
conn.execute(
"""
CREATE TABLE extraction_runs (
run_id INTEGER PRIMARY KEY AUTOINCREMENT,
extractor TEXT NOT NULL,
started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
finished_at TEXT,
status TEXT NOT NULL DEFAULT 'running',
files_written INTEGER DEFAULT 0,
files_skipped INTEGER DEFAULT 0,
bytes_written INTEGER DEFAULT 0,
cursor_value TEXT,
error_message TEXT
)
"""
)
conn.executemany(
"""INSERT INTO extraction_runs
(extractor, started_at, finished_at, status, files_written, bytes_written, error_message)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
[
("overpass", "2026-02-01T08:00:00Z", "2026-02-01T08:05:00Z", "success", 1, 400000, None),
("playtomic_tenants", "2026-02-24T06:00:00Z", "2026-02-24T06:10:00Z", "success", 1, 7700000, None),
("playtomic_availability", "2026-02-25T06:00:00Z", "2026-02-25T07:30:00Z", "failed", 0, 0, "ReadTimeout: connection timed out"),
# Stale running row (started 1970)
("eurostat", "1970-01-01T00:00:00Z", None, "running", 0, 0, None),
],
)
conn.commit()
conn.close()
yield tmpdir
@pytest.fixture
def serving_meta_dir():
"""Temp directory with a _serving_meta.json file."""
with tempfile.TemporaryDirectory() as tmpdir:
meta = {
"exported_at_utc": "2026-02-25T08:30:00+00:00",
"tables": {
"city_market_profile": {"row_count": 612},
"planner_defaults": {"row_count": 612},
"pseo_city_costs_de": {"row_count": 487},
},
}
(Path(tmpdir) / "_serving_meta.json").write_text(json.dumps(meta))
# Fake duckdb file so the path exists
(Path(tmpdir) / "analytics.duckdb").touch()
yield tmpdir
# ── Schema + query mocks ──────────────────────────────────────────────────────
_MOCK_SCHEMA_ROWS = [
{"table_name": "city_market_profile", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1},
{"table_name": "city_market_profile", "column_name": "country_code", "data_type": "VARCHAR", "ordinal_position": 2},
{"table_name": "city_market_profile", "column_name": "marktreife_score", "data_type": "DOUBLE", "ordinal_position": 3},
{"table_name": "planner_defaults", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1},
]
_MOCK_TABLE_EXISTS = [{"1": 1}]
_MOCK_SAMPLE_ROWS = [
{"city_slug": "berlin", "country_code": "DE", "marktreife_score": 82.5},
{"city_slug": "munich", "country_code": "DE", "marktreife_score": 77.0},
]
def _make_fetch_analytics_mock(schema=True):
"""Return an async mock for fetch_analytics that returns schema or table data."""
async def _mock(sql, params=None):
if "information_schema.tables" in sql:
return _MOCK_TABLE_EXISTS
if "information_schema.columns" in sql and params:
return [r for r in _MOCK_SCHEMA_ROWS if r["table_name"] == params[0]]
if "information_schema.columns" in sql:
return _MOCK_SCHEMA_ROWS
if "city_market_profile" in sql:
return _MOCK_SAMPLE_ROWS
return []
return _mock
# ════════════════════════════════════════════════════════════════════════════
# Dashboard
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_dashboard_loads(admin_client, state_db_dir, serving_meta_dir):
"""Dashboard returns 200 with stat cards."""
with (
patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir),
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
):
resp = await admin_client.get("/admin/pipeline/")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "Data Pipeline" in data
assert "Total Runs" in data
assert "Success Rate" in data
assert "Serving Tables" in data
@pytest.mark.asyncio
async def test_pipeline_dashboard_requires_admin(client):
"""Unauthenticated access redirects to login."""
resp = await client.get("/admin/pipeline/")
assert resp.status_code in (302, 401)
@pytest.mark.asyncio
async def test_pipeline_dashboard_stale_warning(admin_client, state_db_dir, serving_meta_dir):
"""Stale run banner appears when a running row is old."""
with (
patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir),
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
):
resp = await admin_client.get("/admin/pipeline/")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "stale run" in data.lower()
# ════════════════════════════════════════════════════════════════════════════
# Overview tab
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_overview(admin_client, state_db_dir, serving_meta_dir):
"""Overview tab returns extraction status grid and serving table counts."""
with (
patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir),
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
):
resp = await admin_client.get("/admin/pipeline/overview")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "city_market_profile" in data
assert "612" in data # row count from serving meta
@pytest.mark.asyncio
async def test_pipeline_overview_no_state_db(admin_client, serving_meta_dir):
"""Overview handles gracefully when .state.sqlite doesn't exist."""
with tempfile.TemporaryDirectory() as empty_dir:
with (
patch.object(pipeline_mod, "_LANDING_DIR", empty_dir),
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
):
resp = await admin_client.get("/admin/pipeline/overview")
assert resp.status_code == 200
# ════════════════════════════════════════════════════════════════════════════
# Extractions tab
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_extractions_list(admin_client, state_db_dir):
"""Extractions tab returns run history table."""
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.get("/admin/pipeline/extractions")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "overpass" in data
assert "playtomic_tenants" in data
assert "success" in data
@pytest.mark.asyncio
async def test_pipeline_extractions_filter_extractor(admin_client, state_db_dir):
"""Extractor filter returns only 1 matching run (not 4)."""
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.get("/admin/pipeline/extractions?extractor=overpass")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "overpass" in data
# Filtered result should show "Showing 1 of 1"
assert "Showing 1 of 1" in data
@pytest.mark.asyncio
async def test_pipeline_extractions_filter_status(admin_client, state_db_dir):
"""Status filter returns only runs with matching status."""
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.get("/admin/pipeline/extractions?status=failed")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "failed" in data
assert "ReadTimeout" in data # error message shown
@pytest.mark.asyncio
async def test_pipeline_mark_stale(admin_client, state_db_dir):
"""POST to mark-stale updates a running row to failed."""
# Find the run_id of the stale running row (eurostat, started 1970)
db_path = Path(state_db_dir) / ".state.sqlite"
conn = sqlite3.connect(str(db_path))
row = conn.execute(
"SELECT run_id FROM extraction_runs WHERE status = 'running' ORDER BY run_id LIMIT 1"
).fetchone()
conn.close()
assert row is not None
run_id = row[0]
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.post(
f"/admin/pipeline/extractions/{run_id}/mark-stale",
form={"csrf_token": "test"},
)
# Should redirect (flash + redirect pattern)
assert resp.status_code in (302, 200)
# Verify DB was updated
conn = sqlite3.connect(str(db_path))
updated = conn.execute(
"SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,)
).fetchone()
conn.close()
assert updated[0] == "failed"
@pytest.mark.asyncio
async def test_pipeline_mark_stale_already_finished(admin_client, state_db_dir):
"""Cannot mark an already-finished (success) row as stale."""
db_path = Path(state_db_dir) / ".state.sqlite"
conn = sqlite3.connect(str(db_path))
row = conn.execute(
"SELECT run_id FROM extraction_runs WHERE status = 'success' ORDER BY run_id LIMIT 1"
).fetchone()
conn.close()
run_id = row[0]
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
resp = await admin_client.post(
f"/admin/pipeline/extractions/{run_id}/mark-stale",
form={"csrf_token": "test"},
)
assert resp.status_code in (302, 200)
# Verify status unchanged
conn = sqlite3.connect(str(db_path))
status = conn.execute(
"SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,)
).fetchone()[0]
conn.close()
assert status == "success"
@pytest.mark.asyncio
async def test_pipeline_trigger_extract(admin_client, state_db_dir):
"""POST to trigger enqueues a run_extraction task and redirects."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
# enqueue is imported inside the route handler, so patch at the source module
with (
patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir),
patch("padelnomics.worker.enqueue", new_callable=AsyncMock) as mock_enqueue,
):
resp = await admin_client.post(
"/admin/pipeline/extract/trigger",
form={"csrf_token": "test"},
)
assert resp.status_code in (302, 200)
mock_enqueue.assert_called_once_with("run_extraction")
# ════════════════════════════════════════════════════════════════════════════
# Catalog tab
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_catalog(admin_client, serving_meta_dir):
"""Catalog tab lists serving tables with row counts."""
with (
patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")),
patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()),
):
resp = await admin_client.get("/admin/pipeline/catalog")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "city_market_profile" in data
assert "612" in data # row count from serving meta
@pytest.mark.asyncio
async def test_pipeline_table_detail(admin_client):
"""Table detail returns columns and sample rows."""
with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()):
resp = await admin_client.get("/admin/pipeline/catalog/city_market_profile")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "city_slug" in data
assert "berlin" in data # from sample rows
@pytest.mark.asyncio
async def test_pipeline_table_detail_invalid_name(admin_client):
"""Table name with uppercase characters (invalid) returns 400."""
with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()):
resp = await admin_client.get("/admin/pipeline/catalog/InvalidTableName")
assert resp.status_code in (400, 404)
@pytest.mark.asyncio
async def test_pipeline_table_detail_unknown_table(admin_client):
"""Non-existent table returns 404."""
async def _empty_fetch(sql, params=None):
return []
with patch("padelnomics.analytics.fetch_analytics", side_effect=_empty_fetch):
resp = await admin_client.get("/admin/pipeline/catalog/nonexistent_table")
assert resp.status_code == 404
# ════════════════════════════════════════════════════════════════════════════
# Query editor
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_pipeline_query_editor_loads(admin_client):
"""Query editor tab returns textarea and schema sidebar."""
with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()):
resp = await admin_client.get("/admin/pipeline/query")
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "query-editor" in data
assert "schema-panel" in data
assert "city_market_profile" in data
@pytest.mark.asyncio
async def test_pipeline_query_execute_valid(admin_client):
"""Valid SELECT query returns results table."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
mock_result = (
["city_slug", "country_code"],
[("berlin", "DE"), ("munich", "DE")],
None,
12.5,
)
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result):
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": "SELECT city_slug, country_code FROM serving.city_market_profile"},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "berlin" in data
assert "city_slug" in data
@pytest.mark.asyncio
async def test_pipeline_query_execute_blocked_keyword(admin_client):
"""Queries with blocked keywords return an error (no DB call made)."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q:
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": "DROP TABLE serving.city_market_profile"},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "blocked" in data.lower() or "error" in data.lower()
mock_q.assert_not_called()
@pytest.mark.asyncio
async def test_pipeline_query_execute_empty(admin_client):
"""Empty SQL returns validation error."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q:
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": ""},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "empty" in data.lower() or "error" in data.lower()
mock_q.assert_not_called()
@pytest.mark.asyncio
async def test_pipeline_query_execute_too_long(admin_client):
"""SQL over 10,000 chars returns a length error."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q:
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": "SELECT " + "x" * 10_001},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "long" in data.lower() or "error" in data.lower()
mock_q.assert_not_called()
@pytest.mark.asyncio
async def test_pipeline_query_execute_db_error(admin_client):
"""DB error from execute_user_query is displayed as error message."""
async with admin_client.session_transaction() as sess:
sess["csrf_token"] = "test"
mock_result = ([], [], "Table 'foo' not found", 5.0)
with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result):
resp = await admin_client.post(
"/admin/pipeline/query/execute",
form={"csrf_token": "test", "sql": "SELECT * FROM serving.foo"},
)
assert resp.status_code == 200
data = await resp.get_data(as_text=True)
assert "not found" in data
# ════════════════════════════════════════════════════════════════════════════
# analytics.execute_user_query()
# ════════════════════════════════════════════════════════════════════════════
@pytest.mark.asyncio
async def test_execute_user_query_no_connection():
"""Returns error tuple when _conn is None."""
import padelnomics.analytics as analytics_mod
with patch.object(analytics_mod, "_conn", None):
cols, rows, error, elapsed = await analytics_mod.execute_user_query("SELECT 1")
assert cols == []
assert rows == []
assert error is not None
assert "not available" in error.lower()
@pytest.mark.asyncio
async def test_execute_user_query_timeout():
"""Returns timeout error when query takes too long."""
import asyncio
import padelnomics.analytics as analytics_mod
def _slow():
import time
time.sleep(10)
return [], [], None, 0.0
mock_conn = MagicMock()
async def _slow_thread(_fn):
await asyncio.sleep(10)
with (
patch.object(analytics_mod, "_conn", mock_conn),
patch("padelnomics.analytics.asyncio.to_thread", side_effect=_slow_thread),
):
cols, rows, error, elapsed = await analytics_mod.execute_user_query(
"SELECT 1", timeout_seconds=1
)
assert error is not None
assert "timed out" in error.lower()
# ════════════════════════════════════════════════════════════════════════════
# Unit tests: data access helpers
# ════════════════════════════════════════════════════════════════════════════
def test_fetch_extraction_summary_missing_db():
"""Returns zero-filled dict when state DB doesn't exist."""
with tempfile.TemporaryDirectory() as empty_dir:
with patch.object(pipeline_mod, "_LANDING_DIR", empty_dir):
result = pipeline_mod._fetch_extraction_summary_sync()
assert result["total"] == 0
assert result["stale"] == 0
def test_fetch_extraction_summary_counts(state_db_dir):
"""Returns correct total/success/failed/running/stale counts."""
with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir):
result = pipeline_mod._fetch_extraction_summary_sync()
assert result["total"] == 4
assert result["success"] == 2
assert result["failed"] == 1
assert result["running"] == 1
assert result["stale"] == 1 # eurostat started in 1970
def test_load_serving_meta(serving_meta_dir):
"""Parses _serving_meta.json correctly."""
with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")):
meta = pipeline_mod._load_serving_meta()
assert meta is not None
assert "city_market_profile" in meta["tables"]
assert meta["tables"]["city_market_profile"]["row_count"] == 612
def test_load_serving_meta_missing():
"""Returns None when _serving_meta.json doesn't exist."""
with tempfile.TemporaryDirectory() as empty_dir:
with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(empty_dir) / "analytics.duckdb")):
meta = pipeline_mod._load_serving_meta()
assert meta is None
def test_format_bytes():
assert pipeline_mod._format_bytes(0) == "0 B"
assert pipeline_mod._format_bytes(512) == "512 B"
assert pipeline_mod._format_bytes(1536) == "1.5 KB"
assert pipeline_mod._format_bytes(1_572_864) == "1.5 MB"
def test_duration_str():
assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:00:45Z") == "45s"
assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:02:30Z") == "2m 30s"
assert pipeline_mod._duration_str(None, "2026-02-01T08:00:00Z") == ""