add sequential migration system with version tracking

Replace the simple schema.sql runner with a proper sequential
migration system that tracks applied versions in a _migrations table.
Absorb scripts/migrate_to_paddle.py as versions/0001.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-16 10:57:28 +01:00
parent 25d06a80d5
commit c10cd4d714
5 changed files with 99 additions and 42 deletions

View File

@@ -1,19 +0,0 @@
"""One-time migration: rename lemonsqueezy columns to paddle."""
import sqlite3
import sys
db_path = sys.argv[1] if len(sys.argv) > 1 else "data/app.db"
conn = sqlite3.connect(db_path)
conn.execute(
"ALTER TABLE subscriptions RENAME COLUMN lemonsqueezy_customer_id TO paddle_customer_id"
)
conn.execute(
"ALTER TABLE subscriptions RENAME COLUMN lemonsqueezy_subscription_id TO paddle_subscription_id"
)
conn.execute("DROP INDEX IF EXISTS idx_subscriptions_provider")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_subscriptions_provider ON subscriptions(paddle_subscription_id)"
)
conn.commit()
conn.close()
print(f"Migration complete: {db_path}")

View File

@@ -1,51 +1,103 @@
""" """
Simple migration runner. Runs schema.sql against the database. Sequential migration runner.
- Runs schema.sql (idempotent CREATE IF NOT EXISTS for fresh DBs)
- Scans versions/ for NNNN_*.py files and runs unapplied ones in order
- Fresh DBs: marks all versions as applied without running them
(schema.sql already contains the final schema)
""" """
import importlib
import os import os
import re
import sqlite3 import sqlite3
import sys import sys
from pathlib import Path from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
VERSIONS_DIR = Path(__file__).parent / "versions"
VERSION_RE = re.compile(r"^(\d{4})_.+\.py$")
def _discover_versions():
"""Return sorted list of (name, module_path) for version files."""
if not VERSIONS_DIR.is_dir():
return []
versions = []
for f in sorted(VERSIONS_DIR.iterdir()):
if VERSION_RE.match(f.name):
versions.append(f.stem) # e.g. "0001_rename_ls_to_paddle"
return versions
def _is_fresh_db(conn):
"""A DB is fresh if it has no application tables at all."""
row = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type='table'"
" AND name NOT LIKE 'sqlite_%'"
).fetchone()
return row is None
def migrate(): def migrate():
"""Run migrations."""
# Get database path from env or default
db_path = os.getenv("DATABASE_PATH", "data/app.db") db_path = os.getenv("DATABASE_PATH", "data/app.db")
# Ensure directory exists
Path(db_path).parent.mkdir(parents=True, exist_ok=True) Path(db_path).parent.mkdir(parents=True, exist_ok=True)
# Read schema
schema_path = Path(__file__).parent / "schema.sql"
schema = schema_path.read_text()
# Connect and execute
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
# Enable WAL mode
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON") conn.execute("PRAGMA foreign_keys=ON")
# Run schema is_fresh = _is_fresh_db(conn)
# schema.sql is always idempotent (CREATE IF NOT EXISTS)
schema = (Path(__file__).parent / "schema.sql").read_text()
conn.executescript(schema) conn.executescript(schema)
conn.commit()
versions = _discover_versions()
print(f"✓ Migrations complete: {db_path}") applied = {
row[0]
# Show tables for row in conn.execute("SELECT name FROM _migrations").fetchall()
}
pending = [v for v in versions if v not in applied]
if is_fresh:
# Fresh DB — schema.sql already created final schema.
# Record all versions as applied without executing them.
for name in pending:
conn.execute("INSERT INTO _migrations (name) VALUES (?)", (name,))
conn.commit()
print(f"✓ Fresh database initialised: {db_path}")
if pending:
print(f" Recorded {len(pending)} migration(s) as already applied")
elif pending:
for name in pending:
print(f" Applying {name}...")
mod = importlib.import_module(
f"padelnomics.migrations.versions.{name}"
)
mod.up(conn)
conn.execute(
"INSERT INTO _migrations (name) VALUES (?)", (name,)
)
conn.commit()
print(f"✓ Applied {len(pending)} migration(s): {db_path}")
else:
print(f"✓ All migrations already applied: {db_path}")
# Show tables (excluding internal sqlite/fts tables)
cursor = conn.execute( cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" "SELECT name FROM sqlite_master WHERE type='table'"
" AND name NOT LIKE 'sqlite_%'"
" ORDER BY name"
) )
tables = [row[0] for row in cursor.fetchall()] tables = [row[0] for row in cursor.fetchall()]
print(f" Tables: {', '.join(tables)}") print(f" Tables: {', '.join(tables)}")
conn.close() conn.close()

View File

@@ -1,6 +1,13 @@
-- Padelnomics Database Schema -- Padelnomics Database Schema
-- Run with: python -m padelnomics.migrations.migrate -- Run with: python -m padelnomics.migrations.migrate
-- Migration tracking
CREATE TABLE IF NOT EXISTS _migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Users -- Users
CREATE TABLE IF NOT EXISTS users ( CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,

View File

@@ -0,0 +1,17 @@
"""Rename lemonsqueezy columns to paddle."""
def up(conn):
conn.execute(
"ALTER TABLE subscriptions"
" RENAME COLUMN lemonsqueezy_customer_id TO paddle_customer_id"
)
conn.execute(
"ALTER TABLE subscriptions"
" RENAME COLUMN lemonsqueezy_subscription_id TO paddle_subscription_id"
)
conn.execute("DROP INDEX IF EXISTS idx_subscriptions_provider")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_subscriptions_provider"
" ON subscriptions(paddle_subscription_id)"
)