From 2f47d1e589e92a6f51d3501794a1efa4e54d99ee Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 5 Mar 2026 21:34:02 +0100 Subject: [PATCH] fix(pipeline): make availability chain incremental + fix supervisor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Convert the availability chain (stg_playtomic_availability → fct_availability_slot → fct_daily_availability) from FULL to INCREMENTAL_BY_TIME_RANGE so sqlmesh run processes only new daily intervals instead of re-reading all files. Supervisor changes: - run_transform(): plan prod --auto-apply → run prod (evaluates missing cron intervals, picks up new data) - git_pull_and_sync(): add plan prod --auto-apply before re-exec so model code changes are applied on deploy - supervisor.sh: same plan → run change Staging model uses a date-scoped glob (@start_ds) to read only the current interval's files. snapshot_date cast to DATE (was VARCHAR) as required by time_column. Clean up redundant TRY_CAST(snapshot_date AS DATE) in venue_pricing_benchmarks since it's already DATE from foundation. Co-Authored-By: Claude Sonnet 4.6 --- infra/supervisor/supervisor.sh | 4 +- src/padelnomics/supervisor.py | 6 ++- .../foundation/fct_availability_slot.sql | 8 +++- .../foundation/fct_daily_availability.sql | 6 ++- .../serving/venue_pricing_benchmarks.sql | 2 +- .../staging/stg_playtomic_availability.sql | 45 +++++-------------- 6 files changed, 30 insertions(+), 41 deletions(-) diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh index a855b12..f21f425 100644 --- a/infra/supervisor/supervisor.sh +++ b/infra/supervisor/supervisor.sh @@ -33,10 +33,10 @@ do DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \ uv run --package padelnomics_extract extract - # Transform — plan detects new/changed models; run only executes existing plans. + # Transform — run evaluates missing daily intervals for incremental models. LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \ DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \ - uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply + uv run sqlmesh -p transform/sqlmesh_padelnomics run prod # Export serving tables to analytics.duckdb (atomic swap). # The web app detects the inode change on next query — no restart needed. diff --git a/src/padelnomics/supervisor.py b/src/padelnomics/supervisor.py index 02ccfbe..c2a8529 100644 --- a/src/padelnomics/supervisor.py +++ b/src/padelnomics/supervisor.py @@ -247,10 +247,10 @@ def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> tu def run_transform() -> None: - """Run SQLMesh — it evaluates model staleness internally.""" + """Run SQLMesh — evaluates missing daily intervals.""" logger.info("Running SQLMesh transform") ok, err = run_shell( - "uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply", + "uv run sqlmesh -p transform/sqlmesh_padelnomics run prod", ) if not ok: send_alert(f"[transform] {err}") @@ -358,6 +358,8 @@ def git_pull_and_sync() -> None: run_shell(f"git checkout --detach {latest}") run_shell("sops --input-type dotenv --output-type dotenv -d .env.prod.sops > .env") run_shell("uv sync --all-packages") + # Apply any model changes (FULL→INCREMENTAL, new models, etc.) before re-exec + run_shell("uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply") # Re-exec so the new code is loaded. os.execv replaces this process in-place; # systemd sees it as the same PID and does not restart the unit. logger.info("Deploy complete — re-execing to load new code") diff --git a/transform/sqlmesh_padelnomics/models/foundation/fct_availability_slot.sql b/transform/sqlmesh_padelnomics/models/foundation/fct_availability_slot.sql index 8094e9a..8107f9c 100644 --- a/transform/sqlmesh_padelnomics/models/foundation/fct_availability_slot.sql +++ b/transform/sqlmesh_padelnomics/models/foundation/fct_availability_slot.sql @@ -14,7 +14,10 @@ MODEL ( name foundation.fct_availability_slot, - kind FULL, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column snapshot_date + ), + start '2026-03-01', cron '@daily', grain (snapshot_date, tenant_id, resource_id, slot_start_time) ); @@ -37,7 +40,8 @@ WITH deduped AS ( captured_at_utc DESC ) AS rn FROM staging.stg_playtomic_availability - WHERE price_amount IS NOT NULL + WHERE snapshot_date BETWEEN @start_ds AND @end_ds + AND price_amount IS NOT NULL AND price_amount > 0 ) SELECT diff --git a/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql b/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql index 74b8b8a..bdbabb3 100644 --- a/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql +++ b/transform/sqlmesh_padelnomics/models/foundation/fct_daily_availability.sql @@ -12,7 +12,10 @@ MODEL ( name foundation.fct_daily_availability, - kind FULL, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column snapshot_date + ), + start '2026-03-01', cron '@daily', grain (snapshot_date, tenant_id) ); @@ -37,6 +40,7 @@ WITH slot_agg AS ( MAX(a.price_currency) AS price_currency, MAX(a.captured_at_utc) AS captured_at_utc FROM foundation.fct_availability_slot a + WHERE a.snapshot_date BETWEEN @start_ds AND @end_ds GROUP BY a.snapshot_date, a.tenant_id ) SELECT diff --git a/transform/sqlmesh_padelnomics/models/serving/venue_pricing_benchmarks.sql b/transform/sqlmesh_padelnomics/models/serving/venue_pricing_benchmarks.sql index a305ad4..592ebc8 100644 --- a/transform/sqlmesh_padelnomics/models/serving/venue_pricing_benchmarks.sql +++ b/transform/sqlmesh_padelnomics/models/serving/venue_pricing_benchmarks.sql @@ -27,7 +27,7 @@ WITH venue_stats AS ( MAX(da.active_court_count) AS court_count, COUNT(DISTINCT da.snapshot_date) AS days_observed FROM foundation.fct_daily_availability da - WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days' + WHERE da.snapshot_date >= CURRENT_DATE - INTERVAL '30 days' AND da.occupancy_rate IS NOT NULL AND da.occupancy_rate BETWEEN 0 AND 1.5 GROUP BY da.tenant_id, da.country_code, da.city, da.city_slug, da.price_currency diff --git a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql index 40ea9e9..d6fa37d 100644 --- a/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql +++ b/transform/sqlmesh_padelnomics/models/staging/stg_playtomic_availability.sql @@ -13,44 +13,28 @@ MODEL ( name staging.stg_playtomic_availability, - kind FULL, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column snapshot_date + ), + start '2026-03-01', cron '@daily', grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc) ); WITH -morning_jsonl AS ( +all_jsonl AS ( SELECT - date AS snapshot_date, + CAST(date AS DATE) AS snapshot_date, captured_at_utc, - 'morning' AS snapshot_type, - NULL::INTEGER AS recheck_hour, - tenant_id, - slots AS slots_json - FROM read_json( - @LANDING_DIR || '/playtomic/*/*/availability_*.jsonl.gz', - format = 'newline_delimited', - columns = { - date: 'VARCHAR', - captured_at_utc: 'VARCHAR', - tenant_id: 'VARCHAR', - slots: 'JSON' - }, - filename = true - ) - WHERE filename NOT LIKE '%_recheck_%' - AND tenant_id IS NOT NULL -), -recheck_jsonl AS ( - SELECT - date AS snapshot_date, - captured_at_utc, - 'recheck' AS snapshot_type, + CASE + WHEN filename LIKE '%_recheck_%' THEN 'recheck' + ELSE 'morning' + END AS snapshot_type, TRY_CAST(recheck_hour AS INTEGER) AS recheck_hour, tenant_id, slots AS slots_json FROM read_json( - @LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.jsonl.gz', + @LANDING_DIR || '/playtomic/*/*/availability_' || @start_ds || '*.jsonl.gz', format = 'newline_delimited', columns = { date: 'VARCHAR', @@ -63,11 +47,6 @@ recheck_jsonl AS ( ) WHERE tenant_id IS NOT NULL ), -all_venues AS ( - SELECT * FROM morning_jsonl - UNION ALL - SELECT * FROM recheck_jsonl -), raw_resources AS ( SELECT av.snapshot_date, @@ -76,7 +55,7 @@ raw_resources AS ( av.recheck_hour, av.tenant_id, resource_json - FROM all_venues av, + FROM all_jsonl av, LATERAL UNNEST( from_json(av.slots_json, '["JSON"]') ) AS t(resource_json)