add migration docs & tests, fix empty env var crash

- Expand migrate.py docstring with algorithm, protocol, and design decisions
- Add 20-test suite for migration framework (test_migrations.py)
- Fix: empty env vars (SECRET_KEY=) now fall back to defaults via _env() helper

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-16 11:37:00 +01:00
parent 674e051084
commit 97e3310998
4 changed files with 434 additions and 10 deletions

View File

@@ -0,0 +1,376 @@
"""
Tests for the sequential migration runner.
Synchronous tests — migrate.py uses stdlib sqlite3, not aiosqlite.
Uses tmp_path for isolated DB files and monkeypatch for DATABASE_PATH.
"""
import importlib
import re
import sqlite3
from pathlib import Path
from unittest.mock import patch
import pytest
from padelnomics.migrations.migrate import _discover_versions, _is_fresh_db, migrate
# Paths into the package source tree, resolved relative to this test file
# (two levels up, then into src/padelnomics/migrations).
SCHEMA_PATH = (
    Path(__file__).parent.parent / "src" / "padelnomics" / "migrations" / "schema.sql"
)
# Directory holding the numbered migration modules (NNNN_name.py).
VERSIONS_DIR = (
    Path(__file__).parent.parent / "src" / "padelnomics" / "migrations" / "versions"
)
# ── Helpers ───────────────────────────────────────────────────
def _old_schema_sql():
    """Return schema.sql with paddle columns swapped back to lemonsqueezy."""
    # Undo the 0001 rename so the result looks like a pre-migration schema.
    renames = [
        ("paddle_customer_id", "lemonsqueezy_customer_id"),
        ("paddle_subscription_id", "lemonsqueezy_subscription_id"),
    ]
    sql = SCHEMA_PATH.read_text()
    for new_name, old_name in renames:
        sql = sql.replace(new_name, old_name)
    return sql
def _table_names(conn):
"""Return sorted list of user-visible table names."""
rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
" AND name NOT LIKE 'sqlite_%' ORDER BY name"
).fetchall()
return [r[0] for r in rows]
def _column_names(conn, table):
return [r[1] for r in conn.execute(f"PRAGMA table_info({table})").fetchall()]
# ── Fixtures ──────────────────────────────────────────────────
@pytest.fixture
def schema_sql():
    """Current schema.sql contents as a string."""
    with SCHEMA_PATH.open() as fh:
        return fh.read()
@pytest.fixture
def fresh_db_path(tmp_path):
    """Path to a non-existent DB file."""
    db_file = tmp_path / "fresh.db"
    return str(db_file)
@pytest.fixture
def existing_db(tmp_path):
    """DB with old lemonsqueezy column names and no _migrations table."""
    db_path = str(tmp_path / "existing.db")
    # Strip the _migrations DDL so the DB carries no tracking table.
    stripped = re.sub(
        r"CREATE TABLE IF NOT EXISTS _migrations\s*\([^)]+\);",
        "",
        _old_schema_sql(),
    )
    db = sqlite3.connect(db_path)
    db.executescript(stripped)
    db.commit()
    db.close()
    return db_path
@pytest.fixture
def production_db(tmp_path, schema_sql):
    """DB with current paddle columns but no _migrations records."""
    db_path = str(tmp_path / "production.db")
    # Drop the _migrations DDL to simulate a manually-migrated database.
    stripped = re.sub(
        r"CREATE TABLE IF NOT EXISTS _migrations\s*\([^)]+\);",
        "",
        schema_sql,
    )
    db = sqlite3.connect(db_path)
    db.executescript(stripped)
    db.commit()
    db.close()
    return db_path
@pytest.fixture
def up_to_date_db(tmp_path, schema_sql):
    """DB with final schema and all migrations recorded."""
    db_path = str(tmp_path / "uptodate.db")
    db = sqlite3.connect(db_path)
    db.executescript(schema_sql)
    # Mark every discovered version file (NNNN_name.py) as already applied.
    version_pattern = re.compile(r"^\d{4}_.+\.py$")
    for entry in sorted(VERSIONS_DIR.iterdir()):
        if version_pattern.match(entry.name):
            db.execute(
                "INSERT INTO _migrations (name) VALUES (?)", (entry.stem,)
            )
    db.commit()
    db.close()
    return db_path
@pytest.fixture
def mock_versions_dir(tmp_path):
    """Empty temp directory for version discovery tests."""
    versions = tmp_path / "versions"
    versions.mkdir()
    return versions
# ── TestFreshDatabase ─────────────────────────────────────────
class TestFreshDatabase:
    """migrate() against a DB file that does not exist yet."""

    def test_creates_all_tables(self, fresh_db_path, monkeypatch):
        monkeypatch.setenv("DATABASE_PATH", fresh_db_path)
        migrate()
        db = sqlite3.connect(fresh_db_path)
        created = _table_names(db)
        db.close()
        for expected in ("_migrations", "users", "subscriptions", "scenarios"):
            assert expected in created

    def test_records_all_versions_as_applied(self, fresh_db_path, monkeypatch):
        monkeypatch.setenv("DATABASE_PATH", fresh_db_path)
        migrate()
        db = sqlite3.connect(fresh_db_path)
        recorded = {
            row[0] for row in db.execute("SELECT name FROM _migrations").fetchall()
        }
        db.close()
        assert recorded == set(_discover_versions())

    def test_does_not_call_import_module(self, fresh_db_path, monkeypatch):
        # A fresh DB is built from schema.sql directly, so no version
        # module should ever be imported.
        monkeypatch.setenv("DATABASE_PATH", fresh_db_path)
        target = "padelnomics.migrations.migrate.importlib.import_module"
        with patch(target) as mock_imp:
            migrate()
        mock_imp.assert_not_called()

    def test_uses_paddle_column_names(self, fresh_db_path, monkeypatch):
        monkeypatch.setenv("DATABASE_PATH", fresh_db_path)
        migrate()
        db = sqlite3.connect(fresh_db_path)
        cols = _column_names(db, "subscriptions")
        db.close()
        assert "lemonsqueezy_customer_id" not in cols
        assert "paddle_customer_id" in cols
        assert "paddle_subscription_id" in cols
# ── TestExistingDatabase ──────────────────────────────────────
class TestExistingDatabase:
    """migrate() against a DB still carrying lemonsqueezy column names."""

    def test_applies_pending_migration(self, existing_db, monkeypatch):
        monkeypatch.setenv("DATABASE_PATH", existing_db)
        migrate()
        db = sqlite3.connect(existing_db)
        cols = _column_names(db, "subscriptions")
        db.close()
        assert "lemonsqueezy_customer_id" not in cols
        assert "paddle_customer_id" in cols
        assert "paddle_subscription_id" in cols

    def test_records_migration_with_timestamp(self, existing_db, monkeypatch):
        monkeypatch.setenv("DATABASE_PATH", existing_db)
        migrate()
        db = sqlite3.connect(existing_db)
        row = db.execute(
            "SELECT name, applied_at FROM _migrations WHERE name LIKE '0001%'"
        ).fetchone()
        db.close()
        assert row is not None
        name, applied_at = row
        assert name == "0001_rename_ls_to_paddle"
        assert applied_at is not None  # timestamp populated
# ── TestUpToDateDatabase ──────────────────────────────────────
class TestUpToDateDatabase:
    """migrate() when every version is already recorded."""

    def test_noop_when_all_applied(self, up_to_date_db, monkeypatch):
        monkeypatch.setenv("DATABASE_PATH", up_to_date_db)
        target = "padelnomics.migrations.migrate.importlib.import_module"
        with patch(target) as mock_imp:
            migrate()
        mock_imp.assert_not_called()

    def test_no_duplicate_entries_on_rerun(self, up_to_date_db, monkeypatch):
        monkeypatch.setenv("DATABASE_PATH", up_to_date_db)
        # Running twice must not re-insert any _migrations rows.
        for _ in range(2):
            migrate()
        db = sqlite3.connect(up_to_date_db)
        (count,) = db.execute("SELECT COUNT(*) FROM _migrations").fetchone()
        db.close()
        assert count == len(_discover_versions())
# ── TestIdempotentMigration ───────────────────────────────────
class TestIdempotentMigration:
    """Migrations applied by hand in production must not break migrate()."""

    def test_production_db_paddle_cols_already_exist(
        self, production_db, monkeypatch
    ):
        """Production scenario: paddle columns exist, no _migrations table.
        0001 runs without error and gets recorded."""
        monkeypatch.setenv("DATABASE_PATH", production_db)
        migrate()
        db = sqlite3.connect(production_db)
        cols = _column_names(db, "subscriptions")
        recorded = {
            row[0] for row in db.execute("SELECT name FROM _migrations").fetchall()
        }
        db.close()
        assert "paddle_customer_id" in cols
        assert "0001_rename_ls_to_paddle" in recorded
# ── TestDiscoverVersions ─────────────────────────────────────
class TestDiscoverVersions:
    """_discover_versions(): file discovery, filtering, and ordering."""

    def test_finds_and_sorts_version_files(self):
        found = _discover_versions()
        assert len(found) >= 1
        assert found[0] == "0001_rename_ls_to_paddle"

    def test_ignores_non_matching_files(self, mock_versions_dir, monkeypatch):
        # Only NNNN_name.py files count; __init__.py and stray files don't.
        for filename in ("__init__.py", "readme.txt", "0001_real.py"):
            (mock_versions_dir / filename).write_text("")
        monkeypatch.setattr(
            "padelnomics.migrations.migrate.VERSIONS_DIR", mock_versions_dir
        )
        assert _discover_versions() == ["0001_real"]

    def test_returns_empty_for_missing_directory(self, tmp_path, monkeypatch):
        missing = tmp_path / "nonexistent"
        monkeypatch.setattr(
            "padelnomics.migrations.migrate.VERSIONS_DIR",
            missing,
        )
        assert _discover_versions() == []
# ── TestIsFreshDb ─────────────────────────────────────────────
class TestIsFreshDb:
    """_is_fresh_db(): a DB counts as fresh only when it has no tables.

    Fix: the original closed each connection *after* the assert, so a
    failing assertion leaked an open sqlite3 handle; connections are now
    closed in ``finally`` blocks.
    """

    def test_empty_db_is_fresh(self, tmp_path):
        conn = sqlite3.connect(str(tmp_path / "empty.db"))
        try:
            assert _is_fresh_db(conn) is True
        finally:
            conn.close()

    def test_db_with_schema_is_not_fresh(self, tmp_path, schema_sql):
        conn = sqlite3.connect(str(tmp_path / "full.db"))
        try:
            conn.executescript(schema_sql)
            assert _is_fresh_db(conn) is False
        finally:
            conn.close()

    def test_db_with_single_table_is_not_fresh(self, tmp_path):
        # A single table is enough to make the DB non-fresh.
        conn = sqlite3.connect(str(tmp_path / "one.db"))
        try:
            conn.execute("CREATE TABLE foo (id INTEGER PRIMARY KEY)")
            assert _is_fresh_db(conn) is False
        finally:
            conn.close()
# ── TestMigration0001 ─────────────────────────────────────────
class TestMigration0001:
    """Direct unit tests for the 0001 column-rename migration module."""

    @pytest.fixture
    def mod_0001(self):
        # Module name starts with a digit, so it must be loaded through
        # importlib rather than an import statement.
        return importlib.import_module(
            "padelnomics.migrations.versions.0001_rename_ls_to_paddle"
        )

    def test_renames_columns(self, tmp_path, mod_0001):
        db = sqlite3.connect(str(tmp_path / "rename.db"))
        db.executescript(_old_schema_sql())
        mod_0001.up(db)
        cols = _column_names(db, "subscriptions")
        db.close()
        assert "lemonsqueezy_customer_id" not in cols
        assert "paddle_customer_id" in cols
        assert "paddle_subscription_id" in cols

    def test_idempotent_when_already_renamed(self, tmp_path, schema_sql, mod_0001):
        db = sqlite3.connect(str(tmp_path / "idem.db"))
        db.executescript(schema_sql)
        # Should not raise even though columns are already paddle_*
        mod_0001.up(db)
        cols = _column_names(db, "subscriptions")
        db.close()
        assert "paddle_customer_id" in cols

    def test_recreates_index(self, tmp_path, mod_0001):
        db = sqlite3.connect(str(tmp_path / "idx.db"))
        db.executescript(_old_schema_sql())
        mod_0001.up(db)
        matching = db.execute(
            "SELECT name FROM sqlite_master WHERE type='index'"
            " AND name='idx_subscriptions_provider'"
        ).fetchall()
        db.close()
        assert len(matching) == 1
# ── TestMigrationOrdering ─────────────────────────────────────
class TestMigrationOrdering:
def test_multiple_pending_run_in_order(self, tmp_path, monkeypatch):
    """Mock two version files and verify they run in sorted order."""
    db_path = str(tmp_path / "order.db")
    # Create a DB with one arbitrary table so it's not "fresh"
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE dummy (id INTEGER PRIMARY KEY)")
    conn.close()
    monkeypatch.setenv("DATABASE_PATH", db_path)
    # Create fake version files in a temp versions dir; contents are
    # irrelevant because import_module is patched below.
    vdir = tmp_path / "vdir"
    vdir.mkdir()
    (vdir / "0001_first.py").write_text("")
    (vdir / "0002_second.py").write_text("")
    monkeypatch.setattr(
        "padelnomics.migrations.migrate.VERSIONS_DIR", vdir
    )
    call_order = []
    # import_module stand-in: each fake "module" exposes an up() hook
    # that records the dotted name it was imported under.
    def fake_import(name):
        class FakeMod:
            @staticmethod
            def up(conn):
                call_order.append(name)
        return FakeMod()
    with patch(
        "padelnomics.migrations.migrate.importlib.import_module",
        side_effect=fake_import,
    ):
        migrate()
    # 0001 must be imported and run strictly before 0002.
    assert call_order == [
        "padelnomics.migrations.versions.0001_first",
        "padelnomics.migrations.versions.0002_second",
    ]
def test_migrations_table_created_on_existing_db(
    self, existing_db, monkeypatch
):
    """An existing DB without _migrations gets the table after migrate()."""
    monkeypatch.setenv("DATABASE_PATH", existing_db)
    migrate()
    conn = sqlite3.connect(existing_db)
    tables = _table_names(conn)
    conn.close()
    # migrate() must bootstrap its own tracking table on legacy DBs.
    assert "_migrations" in tables