""" Tests for the sequential migration runner. Synchronous tests — migrate.py uses stdlib sqlite3, not aiosqlite. Uses tmp_path for isolated DB files and monkeypatch for DATABASE_PATH. """ import importlib import sqlite3 from pathlib import Path from unittest.mock import patch import pytest from padelnomics.migrations.migrate import _discover_versions, migrate VERSIONS_DIR = ( Path(__file__).parent.parent / "src" / "padelnomics" / "migrations" / "versions" ) # ── Helpers ─────────────────────────────────────────────────── def _table_names(conn): """Return sorted list of user-visible table names.""" rows = conn.execute( "SELECT name FROM sqlite_master WHERE type='table'" " AND name NOT LIKE 'sqlite_%' ORDER BY name" ).fetchall() return [r[0] for r in rows] def _column_names(conn, table): return [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()] # ── Fixtures ────────────────────────────────────────────────── @pytest.fixture def fresh_db_path(tmp_path): """Path to a non-existent DB file.""" return str(tmp_path / "fresh.db") @pytest.fixture def existing_db(tmp_path): """DB with 0000 baseline applied (simulates an existing production DB).""" db_path = str(tmp_path / "existing.db") conn = sqlite3.connect(db_path) conn.execute("PRAGMA foreign_keys=ON") # Create _migrations table and apply only 0000 conn.execute(""" CREATE TABLE IF NOT EXISTS _migrations ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, applied_at TEXT NOT NULL DEFAULT (datetime('now')) ) """) mod_0000 = importlib.import_module( "padelnomics.migrations.versions.0000_initial_schema" ) mod_0000.up(conn) conn.execute( "INSERT INTO _migrations (name) VALUES (?)", ("0000_initial_schema",), ) conn.commit() conn.close() return db_path @pytest.fixture def up_to_date_db(tmp_path): """DB with all migrations applied via migrate().""" db_path = str(tmp_path / "uptodate.db") migrate(db_path) return db_path @pytest.fixture def mock_versions_dir(tmp_path): """Empty temp directory for version discovery tests.""" d = tmp_path / "versions" d.mkdir() return d # ── TestFreshDatabase ───────────────────────────────────────── class TestFreshDatabase: def test_creates_all_tables(self, fresh_db_path): migrate(fresh_db_path) conn = sqlite3.connect(fresh_db_path) tables = _table_names(conn) conn.close() assert "_migrations" in tables assert "users" in tables assert "subscriptions" in tables assert "scenarios" in tables def test_records_all_versions_as_applied(self, fresh_db_path): migrate(fresh_db_path) conn = sqlite3.connect(fresh_db_path) applied = { r[0] for r in conn.execute("SELECT name FROM _migrations").fetchall() } conn.close() versions = _discover_versions() assert applied == set(versions) def test_uses_provider_column_names(self, fresh_db_path): migrate(fresh_db_path) conn = sqlite3.connect(fresh_db_path) cols = _column_names(conn, "subscriptions") conn.close() assert "provider_subscription_id" in cols assert "paddle_customer_id" not in cols assert "lemonsqueezy_customer_id" not in cols def test_creates_rbac_tables(self, fresh_db_path): migrate(fresh_db_path) conn = sqlite3.connect(fresh_db_path) tables = _table_names(conn) conn.close() assert "user_roles" in tables assert "billing_customers" in tables # ── TestExistingDatabase ────────────────────────────────────── class TestExistingDatabase: def test_applies_pending_migrations(self, existing_db): migrate(existing_db) conn = sqlite3.connect(existing_db) applied = { r[0] for r in conn.execute("SELECT name FROM _migrations").fetchall() } conn.close() versions = _discover_versions() assert applied == set(versions) def test_records_migration_with_timestamp(self, existing_db): migrate(existing_db) conn = sqlite3.connect(existing_db) row = conn.execute( "SELECT name, applied_at FROM _migrations WHERE name LIKE '0001%'" ).fetchone() conn.close() assert row is not None assert row[0] == "0001_rename_ls_to_paddle" assert row[1] is not None # timestamp populated # ── TestUpToDateDatabase ────────────────────────────────────── class TestUpToDateDatabase: def test_noop_when_all_applied(self, up_to_date_db): with patch("padelnomics.migrations.migrate.importlib.import_module") as mock_imp: migrate(up_to_date_db) mock_imp.assert_not_called() def test_no_duplicate_entries_on_rerun(self, up_to_date_db): migrate(up_to_date_db) migrate(up_to_date_db) conn = sqlite3.connect(up_to_date_db) count = conn.execute("SELECT COUNT(*) FROM _migrations").fetchone()[0] conn.close() assert count == len(_discover_versions()) # ── TestIdempotentMigration ─────────────────────────────────── class TestIdempotentMigration: def test_migrate_twice_is_idempotent(self, fresh_db_path): """Running migrate() twice produces the same result.""" migrate(fresh_db_path) conn = sqlite3.connect(fresh_db_path) tables_first = _table_names(conn) count_first = conn.execute("SELECT COUNT(*) FROM _migrations").fetchone()[0] conn.close() migrate(fresh_db_path) conn = sqlite3.connect(fresh_db_path) tables_second = _table_names(conn) count_second = conn.execute("SELECT COUNT(*) FROM _migrations").fetchone()[0] conn.close() assert tables_first == tables_second assert count_first == count_second # ── TestDiscoverVersions ───────────────────────────────────── class TestDiscoverVersions: def test_finds_and_sorts_version_files(self): versions = _discover_versions() assert len(versions) >= 2 assert versions[0] == "0000_initial_schema" assert versions[1] == "0001_rename_ls_to_paddle" def test_ignores_non_matching_files(self, mock_versions_dir, monkeypatch): (mock_versions_dir / "__init__.py").write_text("") (mock_versions_dir / "readme.txt").write_text("") (mock_versions_dir / "0001_real.py").write_text("") monkeypatch.setattr( "padelnomics.migrations.migrate.VERSIONS_DIR", mock_versions_dir ) versions = _discover_versions() assert versions == ["0001_real"] def test_returns_empty_for_missing_directory(self, tmp_path, monkeypatch): monkeypatch.setattr( "padelnomics.migrations.migrate.VERSIONS_DIR", tmp_path / "nonexistent", ) assert _discover_versions() == [] # ── TestMigrationOrdering ───────────────────────────────────── class TestMigrationOrdering: def test_multiple_pending_run_in_order(self, tmp_path, monkeypatch): """Mock two version files and verify they run in sorted order.""" db_path = str(tmp_path / "order.db") monkeypatch.setenv("DATABASE_PATH", db_path) # Create fake version files in a temp versions dir vdir = tmp_path / "vdir" vdir.mkdir() (vdir / "0001_first.py").write_text("") (vdir / "0002_second.py").write_text("") monkeypatch.setattr( "padelnomics.migrations.migrate.VERSIONS_DIR", vdir ) call_order = [] def fake_import(name): class FakeMod: @staticmethod def up(conn): call_order.append(name) return FakeMod() with patch( "padelnomics.migrations.migrate.importlib.import_module", side_effect=fake_import, ): migrate() assert call_order == [ "padelnomics.migrations.versions.0001_first", "padelnomics.migrations.versions.0002_second", ] def test_migrations_table_created_automatically(self, fresh_db_path): """A fresh DB gets the _migrations table from migrate().""" migrate(fresh_db_path) conn = sqlite3.connect(fresh_db_path) tables = _table_names(conn) conn.close() assert "_migrations" in tables