# Commit note: git mv all tracked files from the nested padelnomics/ workspace
# directory to the git repo root. Merged .gitignore files. No code changes —
# pure path rename.
# Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
# File info: 270 lines, 9.0 KiB, Python.
"""
Tests for the sequential migration runner.

Synchronous tests — migrate.py uses stdlib sqlite3, not aiosqlite.
Uses tmp_path for isolated DB files and monkeypatch for DATABASE_PATH.
"""

import importlib
import sqlite3
from contextlib import closing
from pathlib import Path
from unittest.mock import patch

import pytest
from padelnomics.migrations.migrate import _discover_versions, migrate

# Location of the numbered migration modules, resolved relative to this test
# file: <two directories up>/src/padelnomics/migrations/versions.
VERSIONS_DIR = (
    Path(__file__).parent.parent / "src" / "padelnomics" / "migrations" / "versions"
)

# ── Helpers ───────────────────────────────────────────────────


def _table_names(conn):
    """Return the sorted names of all user-visible tables.

    Args:
        conn: An open ``sqlite3`` connection.

    Returns:
        A list of table names from ``sqlite_master``, ordered by name, with
        SQLite's internal ``sqlite_*`` tables filtered out.
    """
    # "_" is a single-character wildcard in LIKE, so it is escaped here;
    # otherwise a user table such as "sqlitex" would be hidden as well.
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table'"
        " AND name NOT LIKE 'sqlite\\_%' ESCAPE '\\' ORDER BY name"
    ).fetchall()
    return [name for (name,) in rows]


def _column_names(conn, table):
    """Return the column names of ``table`` in declaration order.

    Args:
        conn: An open ``sqlite3`` connection.
        table: Name of the table to inspect. It is interpolated directly
            into the PRAGMA statement, so only pass trusted, test-controlled
            names.

    Returns:
        A list of column names; empty when the table does not exist.
    """
    # Each PRAGMA table_info row is (cid, name, type, notnull, dflt_value, pk).
    info_rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    return [row[1] for row in info_rows]


# ── Fixtures ──────────────────────────────────────────────────


@pytest.fixture
def fresh_db_path(tmp_path):
    """Return the path (as ``str``) to a DB file that does not exist yet."""
    return str(tmp_path / "fresh.db")


@pytest.fixture
def existing_db(tmp_path):
    """Create a DB with only the 0000 baseline applied.

    Simulates an existing production DB: the ``_migrations`` bookkeeping
    table exists and records ``0000_initial_schema`` as the single applied
    migration.

    Returns:
        The path to the database file, as a ``str``.
    """
    db_path = str(tmp_path / "existing.db")
    # closing() releases the handle even when a statement below raises;
    # sqlite3's own context manager only scopes a transaction.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.execute("PRAGMA foreign_keys=ON")

        # Create _migrations table and apply only 0000
        conn.execute("""
            CREATE TABLE IF NOT EXISTS _migrations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE NOT NULL,
                applied_at TEXT NOT NULL DEFAULT (datetime('now'))
            )
        """)
        # The module name starts with a digit, so a plain ``import``
        # statement cannot reference it.
        baseline = importlib.import_module(
            "padelnomics.migrations.versions.0000_initial_schema"
        )
        baseline.up(conn)
        conn.execute(
            "INSERT INTO _migrations (name) VALUES (?)",
            ("0000_initial_schema",),
        )
        conn.commit()
    return db_path


@pytest.fixture
def up_to_date_db(tmp_path):
    """Create a DB by running ``migrate()`` on a new file; return its path."""
    db_path = str(tmp_path / "uptodate.db")
    migrate(db_path)
    return db_path


@pytest.fixture
def mock_versions_dir(tmp_path):
    """Create and return an empty temp directory for version discovery tests."""
    versions_dir = tmp_path / "versions"
    versions_dir.mkdir()
    return versions_dir


# ── TestFreshDatabase ─────────────────────────────────────────


class TestFreshDatabase:
    """migrate() run against a database file that does not exist yet."""

    def test_creates_all_tables(self, fresh_db_path):
        """The bookkeeping table and the core tables exist after migrating."""
        migrate(fresh_db_path)
        with closing(sqlite3.connect(fresh_db_path)) as conn:
            tables = _table_names(conn)
        assert "_migrations" in tables
        assert "users" in tables
        assert "subscriptions" in tables
        assert "scenarios" in tables

    def test_records_all_versions_as_applied(self, fresh_db_path):
        """Every discovered version is recorded in ``_migrations``."""
        migrate(fresh_db_path)
        with closing(sqlite3.connect(fresh_db_path)) as conn:
            applied = {
                name
                for (name,) in conn.execute("SELECT name FROM _migrations").fetchall()
            }
        assert applied == set(_discover_versions())

    def test_uses_provider_column_names(self, fresh_db_path):
        """``subscriptions`` has the provider-neutral column, not vendor ones."""
        migrate(fresh_db_path)
        with closing(sqlite3.connect(fresh_db_path)) as conn:
            cols = _column_names(conn, "subscriptions")
        assert "provider_subscription_id" in cols
        assert "paddle_customer_id" not in cols
        assert "lemonsqueezy_customer_id" not in cols

    def test_creates_rbac_tables(self, fresh_db_path):
        """``user_roles`` and ``billing_customers`` exist after migrating."""
        migrate(fresh_db_path)
        with closing(sqlite3.connect(fresh_db_path)) as conn:
            tables = _table_names(conn)
        assert "user_roles" in tables
        assert "billing_customers" in tables


# ── TestExistingDatabase ──────────────────────────────────────


class TestExistingDatabase:
    """migrate() run against a DB that already has the 0000 baseline."""

    def test_applies_pending_migrations(self, existing_db):
        """After migrating, the applied set equals the discovered versions."""
        migrate(existing_db)
        with closing(sqlite3.connect(existing_db)) as conn:
            applied = {
                name
                for (name,) in conn.execute("SELECT name FROM _migrations").fetchall()
            }
        assert applied == set(_discover_versions())

    def test_records_migration_with_timestamp(self, existing_db):
        """The 0001 migration is recorded with a populated ``applied_at``."""
        migrate(existing_db)
        with closing(sqlite3.connect(existing_db)) as conn:
            row = conn.execute(
                "SELECT name, applied_at FROM _migrations WHERE name LIKE '0001%'"
            ).fetchone()
        assert row is not None
        assert row[0] == "0001_rename_ls_to_paddle"
        assert row[1] is not None  # timestamp populated


# ── TestUpToDateDatabase ──────────────────────────────────────


class TestUpToDateDatabase:
    """migrate() run against a DB on which every migration is applied."""

    def test_noop_when_all_applied(self, up_to_date_db):
        """No migration module is imported when nothing is pending."""
        target = "padelnomics.migrations.migrate.importlib.import_module"
        with patch(target) as mock_imp:
            migrate(up_to_date_db)
            mock_imp.assert_not_called()

    def test_no_duplicate_entries_on_rerun(self, up_to_date_db):
        """Re-running migrate() leaves one ``_migrations`` row per version."""
        migrate(up_to_date_db)
        migrate(up_to_date_db)
        with closing(sqlite3.connect(up_to_date_db)) as conn:
            count = conn.execute("SELECT COUNT(*) FROM _migrations").fetchone()[0]
        assert count == len(_discover_versions())


# ── TestIdempotentMigration ───────────────────────────────────


class TestIdempotentMigration:
    """Repeated migrate() calls must not change the resulting database."""

    def test_migrate_twice_is_idempotent(self, fresh_db_path):
        """Running migrate() twice produces the same result."""

        def snapshot():
            # Capture (table names, number of recorded migrations).
            with closing(sqlite3.connect(fresh_db_path)) as conn:
                tables = _table_names(conn)
                count = conn.execute(
                    "SELECT COUNT(*) FROM _migrations"
                ).fetchone()[0]
            return tables, count

        migrate(fresh_db_path)
        tables_first, count_first = snapshot()

        migrate(fresh_db_path)
        tables_second, count_second = snapshot()

        assert tables_first == tables_second
        assert count_first == count_second


# ── TestDiscoverVersions ─────────────────────────────────────


class TestDiscoverVersions:
    """Behaviour of ``_discover_versions()`` for real and patched directories."""

    def test_finds_and_sorts_version_files(self):
        """The real versions directory yields 0000 then 0001 first."""
        versions = _discover_versions()
        assert len(versions) >= 2
        assert versions[0] == "0000_initial_schema"
        assert versions[1] == "0001_rename_ls_to_paddle"

    def test_ignores_non_matching_files(self, mock_versions_dir, monkeypatch):
        """``__init__.py`` and non-Python files are not reported as versions."""
        for filename in ("__init__.py", "readme.txt", "0001_real.py"):
            (mock_versions_dir / filename).write_text("")
        monkeypatch.setattr(
            "padelnomics.migrations.migrate.VERSIONS_DIR", mock_versions_dir
        )
        versions = _discover_versions()
        assert versions == ["0001_real"]

    def test_returns_empty_for_missing_directory(self, tmp_path, monkeypatch):
        """A non-existent versions directory yields an empty list."""
        monkeypatch.setattr(
            "padelnomics.migrations.migrate.VERSIONS_DIR",
            tmp_path / "nonexistent",
        )
        assert _discover_versions() == []


# ── TestMigrationOrdering ─────────────────────────────────────


class TestMigrationOrdering:
    """Order in which pending migrations run, and bookkeeping-table setup."""

    def test_multiple_pending_run_in_order(self, tmp_path, monkeypatch):
        """Mock two version files and verify they run in sorted order."""
        db_path = str(tmp_path / "order.db")
        monkeypatch.setenv("DATABASE_PATH", db_path)

        # Create fake version files in a temp versions dir
        vdir = tmp_path / "vdir"
        vdir.mkdir()
        (vdir / "0001_first.py").write_text("")
        (vdir / "0002_second.py").write_text("")
        monkeypatch.setattr(
            "padelnomics.migrations.migrate.VERSIONS_DIR", vdir
        )

        call_order = []

        def fake_import(name):
            # Stand-in module whose up() records the module name it was
            # imported under, so the run order can be asserted afterwards.
            class FakeMod:
                @staticmethod
                def up(conn):
                    call_order.append(name)

            return FakeMod()

        with patch(
            "padelnomics.migrations.migrate.importlib.import_module",
            side_effect=fake_import,
        ):
            # Called without a path; DATABASE_PATH is set above.
            migrate()

        assert call_order == [
            "padelnomics.migrations.versions.0001_first",
            "padelnomics.migrations.versions.0002_second",
        ]

    def test_migrations_table_created_automatically(self, fresh_db_path):
        """A fresh DB gets the _migrations table from migrate()."""
        migrate(fresh_db_path)
        with closing(sqlite3.connect(fresh_db_path)) as conn:
            tables = _table_names(conn)
        assert "_migrations" in tables