From d6376877954a7d67ea66ecd80f61632c633d437a Mon Sep 17 00:00:00 2001 From: Deeman Date: Wed, 25 Feb 2026 13:02:51 +0100 Subject: [PATCH] feat(pipeline): tests, docs, and ruff fixes (subtask 6/6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add 29-test suite for all pipeline routes, data helpers, and query execution (test_pipeline.py); all 1333 tests pass - Fix ruff UP041: asyncio.TimeoutError → TimeoutError in analytics.py - Fix ruff UP036/F401: replace sys.version_info tomllib block with plain `import tomllib` (project requires Python 3.11+) - Fix ruff F841: remove unused `cutoff` variable in pipeline_overview - Update CHANGELOG.md with Pipeline Console entry - Update PROJECT.md: add Pipeline Console to Admin Panel done list Co-Authored-By: Claude Sonnet 4.6 --- CHANGELOG.md | 8 + PROJECT.md | 1 + web/src/padelnomics/admin/pipeline_routes.py | 19 +- web/src/padelnomics/analytics.py | 4 +- web/tests/test_pipeline.py | 578 +++++++++++++++++++ 5 files changed, 591 insertions(+), 19 deletions(-) create mode 100644 web/tests/test_pipeline.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 61ff8cf..fd5c456 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] ### Added +- **Pipeline Console admin section** — full operational visibility into the data engineering pipeline at `/admin/pipeline/`: + - **Overview tab** — extraction status grid (one card per workflow with status dot, schedule, last-run timestamp, error preview), serving table row counts from `_serving_meta.json`, landing zone file stats (per-source file count + total size) + - **Extractions tab** — filterable, paginated run history table from `.state.sqlite` (extractor + status dropdowns, HTMX live filter); stale "running" row detection (amber highlight) with "Mark Failed" button; "Run All Extractors" button enqueues `run_extraction` task + - **Catalog tab** — accordion list of serving tables with row count badges; click-to-expand lazy-loads column schema + 10-row sample data per table + - **Query editor tab** — dark-themed SQL textarea (`Commit Mono`, navy background, electric blue focus glow); schema sidebar (collapsible table/column list with types); Tab-key indent and Cmd/Ctrl+Enter submit; results table with sticky headers + row count + elapsed time; query security (read-only DuckDB, blocklist regex, 10k char limit, 1000 row cap, 10s timeout) + - **`analytics.execute_user_query()`** — new function returning `(columns, rows, error, elapsed_ms)` for admin query editor + - **`worker.run_extraction` task** — background handler shells out to `uv run extract` from repo root (2h timeout) + - 29 new tests covering all routes, data access helpers, security checks, and `execute_user_query()` - **Email template system** — all 11 transactional emails migrated from inline f-string HTML in `worker.py` to Jinja2 templates: - **Standalone renderer** (`email_templates.py`) — `render_email_template()` uses a module-level `jinja2.Environment` with `autoescape=True`, works outside Quart request context (worker process); `tformat` filter mirrors the one in `app.py` - **`_base.html`** — branded shell (dark header, 3px blue accent, white card body, footer with tagline + copyright); replaces the old `_email_wrap()` helper diff --git a/PROJECT.md b/PROJECT.md index 0a96835..970093a 100644 --- a/PROJECT.md +++ b/PROJECT.md @@ -114,6 +114,7 @@ - [x] **Admin email gallery** (`/admin/emails/gallery`) — card grid of all templates, EN/DE preview in sandboxed iframe, "View in sent log" cross-link; compose page now has HTMX live preview pane - [x] **pSEO Engine tab** (`/admin/pseo`) — content gap detection, data freshness signals, article health checks (hreflang orphans, missing build files, broken scenario refs), generation job monitoring with live progress bars - [x] **Marketplace admin dashboard** (`/admin/marketplace`) — lead funnel, credit economy, supplier engagement, live activity stream, inline feature flag toggles +- [x] **Pipeline Console** (`/admin/pipeline`) — 4-tab operational dashboard: extraction status grid per source, filterable run history with stale-run management ("Mark Failed"), data catalog with column schema + 10-row sample, SQL query editor with dark-themed textarea + schema sidebar + read-only security sandboxing (keyword blocklist, 10s timeout, 1,000-row cap) - [x] **Lead matching notifications** — `notify_matching_suppliers` task on quote verification + `send_weekly_lead_digest` every Monday; one-click CTA token in forward emails - [x] **Migration 0022** — `status_updated_at`, `supplier_note`, `cta_token` on `lead_forwards`; supplier respond endpoint; inline HTMX lead detail actions; extended quote form fields diff --git a/web/src/padelnomics/admin/pipeline_routes.py b/web/src/padelnomics/admin/pipeline_routes.py index 542e2e6..62260f4 100644 --- a/web/src/padelnomics/admin/pipeline_routes.py +++ b/web/src/padelnomics/admin/pipeline_routes.py @@ -25,8 +25,7 @@ import logging import os import re import sqlite3 -import sys -import time +import tomllib from datetime import UTC, datetime, timedelta from pathlib import Path @@ -307,18 +306,7 @@ def _load_workflows() -> list[dict]: if not _WORKFLOWS_TOML.exists(): return [] - if sys.version_info >= (3, 11): - import tomllib - - data = tomllib.loads(_WORKFLOWS_TOML.read_text()) - else: - # Fallback for older Python (shouldn't happen — project requires 3.11+) - try: - import tomli as tomllib # type: ignore[no-redef] - - data = tomllib.loads(_WORKFLOWS_TOML.read_text()) - except ImportError: - return [] + data = tomllib.loads(_WORKFLOWS_TOML.read_text()) workflows = [] for name, config in data.items(): @@ -422,9 +410,6 @@ async def pipeline_overview(): latest_by_name = {r["extractor"]: r for r in latest_runs} # Enrich each workflow with its latest run data - cutoff = (datetime.now(UTC) - timedelta(hours=_STALE_THRESHOLD_HOURS)).strftime( - "%Y-%m-%dT%H:%M:%SZ" - ) workflow_rows = [] for wf in workflows: run = latest_by_name.get(wf["name"]) diff --git a/web/src/padelnomics/analytics.py b/web/src/padelnomics/analytics.py index d7d4caa..34f5486 100644 --- a/web/src/padelnomics/analytics.py +++ b/web/src/padelnomics/analytics.py @@ -74,7 +74,7 @@ async def fetch_analytics(sql: str, params: list | None = None) -> list[dict[str asyncio.to_thread(_run), timeout=_QUERY_TIMEOUT_SECONDS, ) - except asyncio.TimeoutError: + except TimeoutError: logger.error("DuckDB analytics query timed out after %ds: %.200s", _QUERY_TIMEOUT_SECONDS, sql) return [] except Exception: @@ -123,5 +123,5 @@ async def execute_user_query( asyncio.to_thread(_run), timeout=timeout_seconds, ) - except asyncio.TimeoutError: + except TimeoutError: return [], [], f"Query timed out after {timeout_seconds}s.", 0.0 diff --git a/web/tests/test_pipeline.py b/web/tests/test_pipeline.py new file mode 100644 index 0000000..e4352f3 --- /dev/null +++ b/web/tests/test_pipeline.py @@ -0,0 +1,578 @@ +""" +Tests for the Pipeline Console admin blueprint. + +Covers: + - admin/pipeline_routes.py: all 9 routes + - analytics.py: execute_user_query() function + - Data access functions: state DB, serving meta, landing zone +""" +import json +import sqlite3 +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import padelnomics.admin.pipeline_routes as pipeline_mod +import pytest +from padelnomics.core import utcnow_iso + +# ── Fixtures ────────────────────────────────────────────────────────────────── + + +@pytest.fixture +async def admin_client(app, db): + """Authenticated admin test client.""" + now = utcnow_iso() + async with db.execute( + "INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)", + ("pipeline-admin@test.com", "Pipeline Admin", now), + ) as cursor: + admin_id = cursor.lastrowid + await db.execute( + "INSERT INTO user_roles (user_id, role) VALUES (?, 'admin')", (admin_id,) + ) + await db.commit() + + async with app.test_client() as c: + async with c.session_transaction() as sess: + sess["user_id"] = admin_id + yield c + + +@pytest.fixture +def state_db_dir(): + """Temp directory with a seeded .state.sqlite for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / ".state.sqlite" + conn = sqlite3.connect(str(db_path)) + conn.execute( + """ + CREATE TABLE extraction_runs ( + run_id INTEGER PRIMARY KEY AUTOINCREMENT, + extractor TEXT NOT NULL, + started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), + finished_at TEXT, + status TEXT NOT NULL DEFAULT 'running', + files_written INTEGER DEFAULT 0, + files_skipped INTEGER DEFAULT 0, + bytes_written INTEGER DEFAULT 0, + cursor_value TEXT, + error_message TEXT + ) + """ + ) + conn.executemany( + """INSERT INTO extraction_runs + (extractor, started_at, finished_at, status, files_written, bytes_written, error_message) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + [ + ("overpass", "2026-02-01T08:00:00Z", "2026-02-01T08:05:00Z", "success", 1, 400000, None), + ("playtomic_tenants", "2026-02-24T06:00:00Z", "2026-02-24T06:10:00Z", "success", 1, 7700000, None), + ("playtomic_availability", "2026-02-25T06:00:00Z", "2026-02-25T07:30:00Z", "failed", 0, 0, "ReadTimeout: connection timed out"), + # Stale running row (started 1970) + ("eurostat", "1970-01-01T00:00:00Z", None, "running", 0, 0, None), + ], + ) + conn.commit() + conn.close() + yield tmpdir + + +@pytest.fixture +def serving_meta_dir(): + """Temp directory with a _serving_meta.json file.""" + with tempfile.TemporaryDirectory() as tmpdir: + meta = { + "exported_at_utc": "2026-02-25T08:30:00+00:00", + "tables": { + "city_market_profile": {"row_count": 612}, + "planner_defaults": {"row_count": 612}, + "pseo_city_costs_de": {"row_count": 487}, + }, + } + (Path(tmpdir) / "_serving_meta.json").write_text(json.dumps(meta)) + # Fake duckdb file so the path exists + (Path(tmpdir) / "analytics.duckdb").touch() + yield tmpdir + + +# ── Schema + query mocks ────────────────────────────────────────────────────── + +_MOCK_SCHEMA_ROWS = [ + {"table_name": "city_market_profile", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1}, + {"table_name": "city_market_profile", "column_name": "country_code", "data_type": "VARCHAR", "ordinal_position": 2}, + {"table_name": "city_market_profile", "column_name": "marktreife_score", "data_type": "DOUBLE", "ordinal_position": 3}, + {"table_name": "planner_defaults", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1}, +] + +_MOCK_TABLE_EXISTS = [{"1": 1}] +_MOCK_SAMPLE_ROWS = [ + {"city_slug": "berlin", "country_code": "DE", "marktreife_score": 82.5}, + {"city_slug": "munich", "country_code": "DE", "marktreife_score": 77.0}, +] + + +def _make_fetch_analytics_mock(schema=True): + """Return an async mock for fetch_analytics that returns schema or table data.""" + async def _mock(sql, params=None): + if "information_schema.tables" in sql: + return _MOCK_TABLE_EXISTS + if "information_schema.columns" in sql and params: + return [r for r in _MOCK_SCHEMA_ROWS if r["table_name"] == params[0]] + if "information_schema.columns" in sql: + return _MOCK_SCHEMA_ROWS + if "city_market_profile" in sql: + return _MOCK_SAMPLE_ROWS + return [] + return _mock + + +# ════════════════════════════════════════════════════════════════════════════ +# Dashboard +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_dashboard_loads(admin_client, state_db_dir, serving_meta_dir): + """Dashboard returns 200 with stat cards.""" + with ( + patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + ): + resp = await admin_client.get("/admin/pipeline/") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "Data Pipeline" in data + assert "Total Runs" in data + assert "Success Rate" in data + assert "Serving Tables" in data + + +@pytest.mark.asyncio +async def test_pipeline_dashboard_requires_admin(client): + """Unauthenticated access redirects to login.""" + resp = await client.get("/admin/pipeline/") + assert resp.status_code in (302, 401) + + +@pytest.mark.asyncio +async def test_pipeline_dashboard_stale_warning(admin_client, state_db_dir, serving_meta_dir): + """Stale run banner appears when a running row is old.""" + with ( + patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + ): + resp = await admin_client.get("/admin/pipeline/") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "stale run" in data.lower() + + +# ════════════════════════════════════════════════════════════════════════════ +# Overview tab +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_overview(admin_client, state_db_dir, serving_meta_dir): + """Overview tab returns extraction status grid and serving table counts.""" + with ( + patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + ): + resp = await admin_client.get("/admin/pipeline/overview") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "city_market_profile" in data + assert "612" in data # row count from serving meta + + +@pytest.mark.asyncio +async def test_pipeline_overview_no_state_db(admin_client, serving_meta_dir): + """Overview handles gracefully when .state.sqlite doesn't exist.""" + with tempfile.TemporaryDirectory() as empty_dir: + with ( + patch.object(pipeline_mod, "_LANDING_DIR", empty_dir), + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + ): + resp = await admin_client.get("/admin/pipeline/overview") + assert resp.status_code == 200 + + +# ════════════════════════════════════════════════════════════════════════════ +# Extractions tab +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_extractions_list(admin_client, state_db_dir): + """Extractions tab returns run history table.""" + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.get("/admin/pipeline/extractions") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "overpass" in data + assert "playtomic_tenants" in data + assert "success" in data + + +@pytest.mark.asyncio +async def test_pipeline_extractions_filter_extractor(admin_client, state_db_dir): + """Extractor filter returns only 1 matching run (not 4).""" + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.get("/admin/pipeline/extractions?extractor=overpass") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "overpass" in data + # Filtered result should show "Showing 1 of 1" + assert "Showing 1 of 1" in data + + +@pytest.mark.asyncio +async def test_pipeline_extractions_filter_status(admin_client, state_db_dir): + """Status filter returns only runs with matching status.""" + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.get("/admin/pipeline/extractions?status=failed") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "failed" in data + assert "ReadTimeout" in data # error message shown + + +@pytest.mark.asyncio +async def test_pipeline_mark_stale(admin_client, state_db_dir): + """POST to mark-stale updates a running row to failed.""" + # Find the run_id of the stale running row (eurostat, started 1970) + db_path = Path(state_db_dir) / ".state.sqlite" + conn = sqlite3.connect(str(db_path)) + row = conn.execute( + "SELECT run_id FROM extraction_runs WHERE status = 'running' ORDER BY run_id LIMIT 1" + ).fetchone() + conn.close() + assert row is not None + run_id = row[0] + + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.post( + f"/admin/pipeline/extractions/{run_id}/mark-stale", + form={"csrf_token": "test"}, + ) + # Should redirect (flash + redirect pattern) + assert resp.status_code in (302, 200) + + # Verify DB was updated + conn = sqlite3.connect(str(db_path)) + updated = conn.execute( + "SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,) + ).fetchone() + conn.close() + assert updated[0] == "failed" + + +@pytest.mark.asyncio +async def test_pipeline_mark_stale_already_finished(admin_client, state_db_dir): + """Cannot mark an already-finished (success) row as stale.""" + db_path = Path(state_db_dir) / ".state.sqlite" + conn = sqlite3.connect(str(db_path)) + row = conn.execute( + "SELECT run_id FROM extraction_runs WHERE status = 'success' ORDER BY run_id LIMIT 1" + ).fetchone() + conn.close() + run_id = row[0] + + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + resp = await admin_client.post( + f"/admin/pipeline/extractions/{run_id}/mark-stale", + form={"csrf_token": "test"}, + ) + assert resp.status_code in (302, 200) + # Verify status unchanged + conn = sqlite3.connect(str(db_path)) + status = conn.execute( + "SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,) + ).fetchone()[0] + conn.close() + assert status == "success" + + +@pytest.mark.asyncio +async def test_pipeline_trigger_extract(admin_client, state_db_dir): + """POST to trigger enqueues a run_extraction task and redirects.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + # enqueue is imported inside the route handler, so patch at the source module + with ( + patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), + patch("padelnomics.worker.enqueue", new_callable=AsyncMock) as mock_enqueue, + ): + resp = await admin_client.post( + "/admin/pipeline/extract/trigger", + form={"csrf_token": "test"}, + ) + assert resp.status_code in (302, 200) + mock_enqueue.assert_called_once_with("run_extraction") + + +# ════════════════════════════════════════════════════════════════════════════ +# Catalog tab +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_catalog(admin_client, serving_meta_dir): + """Catalog tab lists serving tables with row counts.""" + with ( + patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), + patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()), + ): + resp = await admin_client.get("/admin/pipeline/catalog") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "city_market_profile" in data + assert "612" in data # row count from serving meta + + +@pytest.mark.asyncio +async def test_pipeline_table_detail(admin_client): + """Table detail returns columns and sample rows.""" + with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()): + resp = await admin_client.get("/admin/pipeline/catalog/city_market_profile") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "city_slug" in data + assert "berlin" in data # from sample rows + + +@pytest.mark.asyncio +async def test_pipeline_table_detail_invalid_name(admin_client): + """Table name with uppercase characters (invalid) returns 400.""" + with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()): + resp = await admin_client.get("/admin/pipeline/catalog/InvalidTableName") + assert resp.status_code in (400, 404) + + +@pytest.mark.asyncio +async def test_pipeline_table_detail_unknown_table(admin_client): + """Non-existent table returns 404.""" + async def _empty_fetch(sql, params=None): + return [] + + with patch("padelnomics.analytics.fetch_analytics", side_effect=_empty_fetch): + resp = await admin_client.get("/admin/pipeline/catalog/nonexistent_table") + assert resp.status_code == 404 + + +# ════════════════════════════════════════════════════════════════════════════ +# Query editor +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_pipeline_query_editor_loads(admin_client): + """Query editor tab returns textarea and schema sidebar.""" + with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()): + resp = await admin_client.get("/admin/pipeline/query") + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "query-editor" in data + assert "schema-panel" in data + assert "city_market_profile" in data + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_valid(admin_client): + """Valid SELECT query returns results table.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + mock_result = ( + ["city_slug", "country_code"], + [("berlin", "DE"), ("munich", "DE")], + None, + 12.5, + ) + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result): + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": "SELECT city_slug, country_code FROM serving.city_market_profile"}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "berlin" in data + assert "city_slug" in data + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_blocked_keyword(admin_client): + """Queries with blocked keywords return an error (no DB call made).""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q: + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": "DROP TABLE serving.city_market_profile"}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "blocked" in data.lower() or "error" in data.lower() + mock_q.assert_not_called() + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_empty(admin_client): + """Empty SQL returns validation error.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q: + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": ""}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "empty" in data.lower() or "error" in data.lower() + mock_q.assert_not_called() + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_too_long(admin_client): + """SQL over 10,000 chars returns a length error.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q: + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": "SELECT " + "x" * 10_001}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "long" in data.lower() or "error" in data.lower() + mock_q.assert_not_called() + + +@pytest.mark.asyncio +async def test_pipeline_query_execute_db_error(admin_client): + """DB error from execute_user_query is displayed as error message.""" + async with admin_client.session_transaction() as sess: + sess["csrf_token"] = "test" + + mock_result = ([], [], "Table 'foo' not found", 5.0) + with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result): + resp = await admin_client.post( + "/admin/pipeline/query/execute", + form={"csrf_token": "test", "sql": "SELECT * FROM serving.foo"}, + ) + assert resp.status_code == 200 + data = await resp.get_data(as_text=True) + assert "not found" in data + + +# ════════════════════════════════════════════════════════════════════════════ +# analytics.execute_user_query() +# ════════════════════════════════════════════════════════════════════════════ + + +@pytest.mark.asyncio +async def test_execute_user_query_no_connection(): + """Returns error tuple when _conn is None.""" + import padelnomics.analytics as analytics_mod + + with patch.object(analytics_mod, "_conn", None): + cols, rows, error, elapsed = await analytics_mod.execute_user_query("SELECT 1") + assert cols == [] + assert rows == [] + assert error is not None + assert "not available" in error.lower() + + +@pytest.mark.asyncio +async def test_execute_user_query_timeout(): + """Returns timeout error when query takes too long.""" + import asyncio + + import padelnomics.analytics as analytics_mod + + def _slow(): + import time + time.sleep(10) + return [], [], None, 0.0 + + mock_conn = MagicMock() + + async def _slow_thread(_fn): + await asyncio.sleep(10) + + with ( + patch.object(analytics_mod, "_conn", mock_conn), + patch("padelnomics.analytics.asyncio.to_thread", side_effect=_slow_thread), + ): + cols, rows, error, elapsed = await analytics_mod.execute_user_query( + "SELECT 1", timeout_seconds=1 + ) + assert error is not None + assert "timed out" in error.lower() + + +# ════════════════════════════════════════════════════════════════════════════ +# Unit tests: data access helpers +# ════════════════════════════════════════════════════════════════════════════ + + +def test_fetch_extraction_summary_missing_db(): + """Returns zero-filled dict when state DB doesn't exist.""" + with tempfile.TemporaryDirectory() as empty_dir: + with patch.object(pipeline_mod, "_LANDING_DIR", empty_dir): + result = pipeline_mod._fetch_extraction_summary_sync() + assert result["total"] == 0 + assert result["stale"] == 0 + + +def test_fetch_extraction_summary_counts(state_db_dir): + """Returns correct total/success/failed/running/stale counts.""" + with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): + result = pipeline_mod._fetch_extraction_summary_sync() + assert result["total"] == 4 + assert result["success"] == 2 + assert result["failed"] == 1 + assert result["running"] == 1 + assert result["stale"] == 1 # eurostat started in 1970 + + +def test_load_serving_meta(serving_meta_dir): + """Parses _serving_meta.json correctly.""" + with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")): + meta = pipeline_mod._load_serving_meta() + assert meta is not None + assert "city_market_profile" in meta["tables"] + assert meta["tables"]["city_market_profile"]["row_count"] == 612 + + +def test_load_serving_meta_missing(): + """Returns None when _serving_meta.json doesn't exist.""" + with tempfile.TemporaryDirectory() as empty_dir: + with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(empty_dir) / "analytics.duckdb")): + meta = pipeline_mod._load_serving_meta() + assert meta is None + + +def test_format_bytes(): + assert pipeline_mod._format_bytes(0) == "0 B" + assert pipeline_mod._format_bytes(512) == "512 B" + assert pipeline_mod._format_bytes(1536) == "1.5 KB" + assert pipeline_mod._format_bytes(1_572_864) == "1.5 MB" + + +def test_duration_str(): + assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:00:45Z") == "45s" + assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:02:30Z") == "2m 30s" + assert pipeline_mod._duration_str(None, "2026-02-01T08:00:00Z") == ""