""" Tests for the Pipeline Console admin blueprint. Covers: - admin/pipeline_routes.py: all 9 routes - analytics.py: execute_user_query() function - Data access functions: state DB, serving meta, landing zone """ import json import sqlite3 import tempfile from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import padelnomics.admin.pipeline_routes as pipeline_mod import pytest from padelnomics.core import utcnow_iso # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture async def admin_client(app, db): """Authenticated admin test client.""" now = utcnow_iso() async with db.execute( "INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)", ("pipeline-admin@test.com", "Pipeline Admin", now), ) as cursor: admin_id = cursor.lastrowid await db.execute( "INSERT INTO user_roles (user_id, role) VALUES (?, 'admin')", (admin_id,) ) await db.commit() async with app.test_client() as c: async with c.session_transaction() as sess: sess["user_id"] = admin_id yield c @pytest.fixture def state_db_dir(): """Temp directory with a seeded .state.sqlite for testing.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / ".state.sqlite" conn = sqlite3.connect(str(db_path)) conn.execute( """ CREATE TABLE extraction_runs ( run_id INTEGER PRIMARY KEY AUTOINCREMENT, extractor TEXT NOT NULL, started_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), finished_at TEXT, status TEXT NOT NULL DEFAULT 'running', files_written INTEGER DEFAULT 0, files_skipped INTEGER DEFAULT 0, bytes_written INTEGER DEFAULT 0, cursor_value TEXT, error_message TEXT ) """ ) conn.executemany( """INSERT INTO extraction_runs (extractor, started_at, finished_at, status, files_written, bytes_written, error_message) VALUES (?, ?, ?, ?, ?, ?, ?)""", [ ("overpass", "2026-02-01T08:00:00Z", "2026-02-01T08:05:00Z", "success", 1, 400000, None), ("playtomic_tenants", "2026-02-24T06:00:00Z", "2026-02-24T06:10:00Z", "success", 1, 7700000, None), ("playtomic_availability", "2026-02-25T06:00:00Z", "2026-02-25T07:30:00Z", "failed", 0, 0, "ReadTimeout: connection timed out"), # Stale running row (started 1970) ("eurostat", "1970-01-01T00:00:00Z", None, "running", 0, 0, None), ], ) conn.commit() conn.close() yield tmpdir @pytest.fixture def serving_meta_dir(): """Temp directory with a _serving_meta.json file.""" with tempfile.TemporaryDirectory() as tmpdir: meta = { "exported_at_utc": "2026-02-25T08:30:00+00:00", "tables": { "city_market_profile": {"row_count": 612}, "planner_defaults": {"row_count": 612}, "pseo_city_costs_de": {"row_count": 487}, }, } (Path(tmpdir) / "_serving_meta.json").write_text(json.dumps(meta)) # Fake duckdb file so the path exists (Path(tmpdir) / "analytics.duckdb").touch() yield tmpdir # ── Schema + query mocks ────────────────────────────────────────────────────── _MOCK_SCHEMA_ROWS = [ {"table_name": "city_market_profile", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1}, {"table_name": "city_market_profile", "column_name": "country_code", "data_type": "VARCHAR", "ordinal_position": 2}, {"table_name": "city_market_profile", "column_name": "marktreife_score", "data_type": "DOUBLE", "ordinal_position": 3}, {"table_name": "planner_defaults", "column_name": "city_slug", "data_type": "VARCHAR", "ordinal_position": 1}, ] _MOCK_TABLE_EXISTS = [{"1": 1}] _MOCK_SAMPLE_ROWS = [ {"city_slug": "berlin", "country_code": "DE", "marktreife_score": 82.5}, {"city_slug": "munich", "country_code": "DE", "marktreife_score": 77.0}, ] def _make_fetch_analytics_mock(schema=True): """Return an async mock for fetch_analytics that returns schema or table data.""" async def _mock(sql, params=None): if "information_schema.tables" in sql: return _MOCK_TABLE_EXISTS if "information_schema.columns" in sql and params: return [r for r in _MOCK_SCHEMA_ROWS if r["table_name"] == params[0]] if "information_schema.columns" in sql: return _MOCK_SCHEMA_ROWS if "city_market_profile" in sql: return _MOCK_SAMPLE_ROWS return [] return _mock # ════════════════════════════════════════════════════════════════════════════ # Dashboard # ════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_pipeline_dashboard_loads(admin_client, state_db_dir, serving_meta_dir): """Dashboard returns 200 with stat cards.""" with ( patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), ): resp = await admin_client.get("/admin/pipeline/") assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "Data Pipeline" in data assert "Total Runs" in data assert "Success Rate" in data assert "Serving Tables" in data @pytest.mark.asyncio async def test_pipeline_dashboard_requires_admin(client): """Unauthenticated access redirects to login.""" resp = await client.get("/admin/pipeline/") assert resp.status_code in (302, 401) @pytest.mark.asyncio async def test_pipeline_dashboard_stale_warning(admin_client, state_db_dir, serving_meta_dir): """Stale run banner appears when a running row is old.""" with ( patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), ): resp = await admin_client.get("/admin/pipeline/") assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "stale run" in data.lower() # ════════════════════════════════════════════════════════════════════════════ # Overview tab # ════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_pipeline_overview(admin_client, state_db_dir, serving_meta_dir): """Overview tab returns extraction status grid and serving table counts.""" with ( patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), ): resp = await admin_client.get("/admin/pipeline/overview") assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "city_market_profile" in data assert "612" in data # row count from serving meta @pytest.mark.asyncio async def test_pipeline_overview_no_state_db(admin_client, serving_meta_dir): """Overview handles gracefully when .state.sqlite doesn't exist.""" with tempfile.TemporaryDirectory() as empty_dir: with ( patch.object(pipeline_mod, "_LANDING_DIR", empty_dir), patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), ): resp = await admin_client.get("/admin/pipeline/overview") assert resp.status_code == 200 # ════════════════════════════════════════════════════════════════════════════ # Extractions tab # ════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_pipeline_extractions_list(admin_client, state_db_dir): """Extractions tab returns run history table.""" with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): resp = await admin_client.get("/admin/pipeline/extractions") assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "overpass" in data assert "playtomic_tenants" in data assert "success" in data @pytest.mark.asyncio async def test_pipeline_extractions_filter_extractor(admin_client, state_db_dir): """Extractor filter returns only 1 matching run (not 4).""" with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): resp = await admin_client.get("/admin/pipeline/extractions?extractor=overpass") assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "overpass" in data # Filtered result should show "Showing 1 of 1" assert "Showing 1 of 1" in data @pytest.mark.asyncio async def test_pipeline_extractions_filter_status(admin_client, state_db_dir): """Status filter returns only runs with matching status.""" with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): resp = await admin_client.get("/admin/pipeline/extractions?status=failed") assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "failed" in data assert "ReadTimeout" in data # error message shown @pytest.mark.asyncio async def test_pipeline_mark_stale(admin_client, state_db_dir): """POST to mark-stale updates a running row to failed.""" # Find the run_id of the stale running row (eurostat, started 1970) db_path = Path(state_db_dir) / ".state.sqlite" conn = sqlite3.connect(str(db_path)) row = conn.execute( "SELECT run_id FROM extraction_runs WHERE status = 'running' ORDER BY run_id LIMIT 1" ).fetchone() conn.close() assert row is not None run_id = row[0] async with admin_client.session_transaction() as sess: sess["csrf_token"] = "test" with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): resp = await admin_client.post( f"/admin/pipeline/extractions/{run_id}/mark-stale", form={"csrf_token": "test"}, ) # Should redirect (flash + redirect pattern) assert resp.status_code in (302, 200) # Verify DB was updated conn = sqlite3.connect(str(db_path)) updated = conn.execute( "SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,) ).fetchone() conn.close() assert updated[0] == "failed" @pytest.mark.asyncio async def test_pipeline_mark_stale_already_finished(admin_client, state_db_dir): """Cannot mark an already-finished (success) row as stale.""" db_path = Path(state_db_dir) / ".state.sqlite" conn = sqlite3.connect(str(db_path)) row = conn.execute( "SELECT run_id FROM extraction_runs WHERE status = 'success' ORDER BY run_id LIMIT 1" ).fetchone() conn.close() run_id = row[0] async with admin_client.session_transaction() as sess: sess["csrf_token"] = "test" with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): resp = await admin_client.post( f"/admin/pipeline/extractions/{run_id}/mark-stale", form={"csrf_token": "test"}, ) assert resp.status_code in (302, 200) # Verify status unchanged conn = sqlite3.connect(str(db_path)) status = conn.execute( "SELECT status FROM extraction_runs WHERE run_id = ?", (run_id,) ).fetchone()[0] conn.close() assert status == "success" @pytest.mark.asyncio async def test_pipeline_trigger_extract(admin_client, state_db_dir): """POST to trigger enqueues a run_extraction task and redirects.""" async with admin_client.session_transaction() as sess: sess["csrf_token"] = "test" # enqueue is imported inside the route handler, so patch at the source module with ( patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir), patch("padelnomics.worker.enqueue", new_callable=AsyncMock) as mock_enqueue, ): resp = await admin_client.post( "/admin/pipeline/extract/trigger", form={"csrf_token": "test"}, ) assert resp.status_code in (302, 200) mock_enqueue.assert_called_once_with("run_extraction") # ════════════════════════════════════════════════════════════════════════════ # Catalog tab # ════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_pipeline_catalog(admin_client, serving_meta_dir): """Catalog tab lists serving tables with row counts.""" with ( patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")), patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()), ): resp = await admin_client.get("/admin/pipeline/catalog") assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "city_market_profile" in data assert "612" in data # row count from serving meta @pytest.mark.asyncio async def test_pipeline_table_detail(admin_client): """Table detail returns columns and sample rows.""" with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()): resp = await admin_client.get("/admin/pipeline/catalog/city_market_profile") assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "city_slug" in data assert "berlin" in data # from sample rows @pytest.mark.asyncio async def test_pipeline_table_detail_invalid_name(admin_client): """Table name with uppercase characters (invalid) returns 400.""" with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()): resp = await admin_client.get("/admin/pipeline/catalog/InvalidTableName") assert resp.status_code in (400, 404) @pytest.mark.asyncio async def test_pipeline_table_detail_unknown_table(admin_client): """Non-existent table returns 404.""" async def _empty_fetch(sql, params=None): return [] with patch("padelnomics.analytics.fetch_analytics", side_effect=_empty_fetch): resp = await admin_client.get("/admin/pipeline/catalog/nonexistent_table") assert resp.status_code == 404 # ════════════════════════════════════════════════════════════════════════════ # Query editor # ════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_pipeline_query_editor_loads(admin_client): """Query editor tab returns textarea and schema sidebar.""" with patch("padelnomics.analytics.fetch_analytics", side_effect=_make_fetch_analytics_mock()): resp = await admin_client.get("/admin/pipeline/query") assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "query-editor" in data assert "schema-panel" in data assert "city_market_profile" in data @pytest.mark.asyncio async def test_pipeline_query_execute_valid(admin_client): """Valid SELECT query returns results table.""" async with admin_client.session_transaction() as sess: sess["csrf_token"] = "test" mock_result = ( ["city_slug", "country_code"], [("berlin", "DE"), ("munich", "DE")], None, 12.5, ) with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result): resp = await admin_client.post( "/admin/pipeline/query/execute", form={"csrf_token": "test", "sql": "SELECT city_slug, country_code FROM serving.city_market_profile"}, ) assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "berlin" in data assert "city_slug" in data @pytest.mark.asyncio async def test_pipeline_query_execute_blocked_keyword(admin_client): """Queries with blocked keywords return an error (no DB call made).""" async with admin_client.session_transaction() as sess: sess["csrf_token"] = "test" with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q: resp = await admin_client.post( "/admin/pipeline/query/execute", form={"csrf_token": "test", "sql": "DROP TABLE serving.city_market_profile"}, ) assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "blocked" in data.lower() or "error" in data.lower() mock_q.assert_not_called() @pytest.mark.asyncio async def test_pipeline_query_execute_empty(admin_client): """Empty SQL returns validation error.""" async with admin_client.session_transaction() as sess: sess["csrf_token"] = "test" with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q: resp = await admin_client.post( "/admin/pipeline/query/execute", form={"csrf_token": "test", "sql": ""}, ) assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "empty" in data.lower() or "error" in data.lower() mock_q.assert_not_called() @pytest.mark.asyncio async def test_pipeline_query_execute_too_long(admin_client): """SQL over 10,000 chars returns a length error.""" async with admin_client.session_transaction() as sess: sess["csrf_token"] = "test" with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock) as mock_q: resp = await admin_client.post( "/admin/pipeline/query/execute", form={"csrf_token": "test", "sql": "SELECT " + "x" * 10_001}, ) assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "long" in data.lower() or "error" in data.lower() mock_q.assert_not_called() @pytest.mark.asyncio async def test_pipeline_query_execute_db_error(admin_client): """DB error from execute_user_query is displayed as error message.""" async with admin_client.session_transaction() as sess: sess["csrf_token"] = "test" mock_result = ([], [], "Table 'foo' not found", 5.0) with patch("padelnomics.analytics.execute_user_query", new_callable=AsyncMock, return_value=mock_result): resp = await admin_client.post( "/admin/pipeline/query/execute", form={"csrf_token": "test", "sql": "SELECT * FROM serving.foo"}, ) assert resp.status_code == 200 data = await resp.get_data(as_text=True) assert "not found" in data # ════════════════════════════════════════════════════════════════════════════ # analytics.execute_user_query() # ════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_execute_user_query_no_connection(): """Returns error tuple when _conn is None.""" import padelnomics.analytics as analytics_mod with patch.object(analytics_mod, "_conn", None): cols, rows, error, elapsed = await analytics_mod.execute_user_query("SELECT 1") assert cols == [] assert rows == [] assert error is not None assert "not available" in error.lower() @pytest.mark.asyncio async def test_execute_user_query_timeout(): """Returns timeout error when query takes too long.""" import asyncio import padelnomics.analytics as analytics_mod def _slow(): import time time.sleep(10) return [], [], None, 0.0 mock_conn = MagicMock() async def _slow_thread(_fn): await asyncio.sleep(10) with ( patch.object(analytics_mod, "_conn", mock_conn), patch("padelnomics.analytics.asyncio.to_thread", side_effect=_slow_thread), ): cols, rows, error, elapsed = await analytics_mod.execute_user_query( "SELECT 1", timeout_seconds=1 ) assert error is not None assert "timed out" in error.lower() # ════════════════════════════════════════════════════════════════════════════ # Unit tests: data access helpers # ════════════════════════════════════════════════════════════════════════════ def test_fetch_extraction_summary_missing_db(): """Returns zero-filled dict when state DB doesn't exist.""" with tempfile.TemporaryDirectory() as empty_dir: with patch.object(pipeline_mod, "_LANDING_DIR", empty_dir): result = pipeline_mod._fetch_extraction_summary_sync() assert result["total"] == 0 assert result["stale"] == 0 def test_fetch_extraction_summary_counts(state_db_dir): """Returns correct total/success/failed/running/stale counts.""" with patch.object(pipeline_mod, "_LANDING_DIR", state_db_dir): result = pipeline_mod._fetch_extraction_summary_sync() assert result["total"] == 4 assert result["success"] == 2 assert result["failed"] == 1 assert result["running"] == 1 assert result["stale"] == 1 # eurostat started in 1970 def test_load_serving_meta(serving_meta_dir): """Parses _serving_meta.json correctly.""" with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(serving_meta_dir) / "analytics.duckdb")): meta = pipeline_mod._load_serving_meta() assert meta is not None assert "city_market_profile" in meta["tables"] assert meta["tables"]["city_market_profile"]["row_count"] == 612 def test_load_serving_meta_missing(): """Returns None when _serving_meta.json doesn't exist.""" with tempfile.TemporaryDirectory() as empty_dir: with patch.object(pipeline_mod, "_SERVING_DUCKDB_PATH", str(Path(empty_dir) / "analytics.duckdb")): meta = pipeline_mod._load_serving_meta() assert meta is None def test_format_bytes(): assert pipeline_mod._format_bytes(0) == "0 B" assert pipeline_mod._format_bytes(512) == "512 B" assert pipeline_mod._format_bytes(1536) == "1.5 KB" assert pipeline_mod._format_bytes(1_572_864) == "1.5 MB" def test_duration_str(): assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:00:45Z") == "45s" assert pipeline_mod._duration_str("2026-02-01T08:00:00Z", "2026-02-01T08:02:30Z") == "2m 30s" assert pipeline_mod._duration_str(None, "2026-02-01T08:00:00Z") == ""