fix(pipeline): make availability chain incremental + fix supervisor
Convert the availability chain (stg_playtomic_availability → fct_availability_slot → fct_daily_availability) from FULL to INCREMENTAL_BY_TIME_RANGE so that `sqlmesh run` processes only new daily intervals instead of re-reading all files.

Supervisor changes:
- run_transform(): `plan prod --auto-apply` → `run prod` (evaluates missing cron intervals, picks up new data)
- git_pull_and_sync(): add `plan prod --auto-apply` before re-exec so model code changes are applied on deploy
- supervisor.sh: same plan → run change

The staging model uses a date-scoped glob (@start_ds) to read only the current interval's files. snapshot_date is now cast to DATE (was VARCHAR), as required by time_column. Also clean up the redundant TRY_CAST(snapshot_date AS DATE) in venue_pricing_benchmarks, since snapshot_date is already DATE from foundation.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -33,10 +33,10 @@ do
|
|||||||
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
|
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
|
||||||
uv run --package padelnomics_extract extract
|
uv run --package padelnomics_extract extract
|
||||||
|
|
||||||
# Transform — plan detects new/changed models; run only executes existing plans.
|
# Transform — run evaluates missing daily intervals for incremental models.
|
||||||
LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \
|
LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \
|
||||||
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
|
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
|
||||||
uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply
|
uv run sqlmesh -p transform/sqlmesh_padelnomics run prod
|
||||||
|
|
||||||
# Export serving tables to analytics.duckdb (atomic swap).
|
# Export serving tables to analytics.duckdb (atomic swap).
|
||||||
# The web app detects the inode change on next query — no restart needed.
|
# The web app detects the inode change on next query — no restart needed.
|
||||||
|
|||||||
@@ -247,10 +247,10 @@ def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> tu
|
|||||||
|
|
||||||
|
|
||||||
def run_transform() -> None:
|
def run_transform() -> None:
|
||||||
"""Run SQLMesh — it evaluates model staleness internally."""
|
"""Run SQLMesh — evaluates missing daily intervals."""
|
||||||
logger.info("Running SQLMesh transform")
|
logger.info("Running SQLMesh transform")
|
||||||
ok, err = run_shell(
|
ok, err = run_shell(
|
||||||
"uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply",
|
"uv run sqlmesh -p transform/sqlmesh_padelnomics run prod",
|
||||||
)
|
)
|
||||||
if not ok:
|
if not ok:
|
||||||
send_alert(f"[transform] {err}")
|
send_alert(f"[transform] {err}")
|
||||||
@@ -358,6 +358,8 @@ def git_pull_and_sync() -> None:
|
|||||||
run_shell(f"git checkout --detach {latest}")
|
run_shell(f"git checkout --detach {latest}")
|
||||||
run_shell("sops --input-type dotenv --output-type dotenv -d .env.prod.sops > .env")
|
run_shell("sops --input-type dotenv --output-type dotenv -d .env.prod.sops > .env")
|
||||||
run_shell("uv sync --all-packages")
|
run_shell("uv sync --all-packages")
|
||||||
|
# Apply any model changes (FULL→INCREMENTAL, new models, etc.) before re-exec
|
||||||
|
run_shell("uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply")
|
||||||
# Re-exec so the new code is loaded. os.execv replaces this process in-place;
|
# Re-exec so the new code is loaded. os.execv replaces this process in-place;
|
||||||
# systemd sees it as the same PID and does not restart the unit.
|
# systemd sees it as the same PID and does not restart the unit.
|
||||||
logger.info("Deploy complete — re-execing to load new code")
|
logger.info("Deploy complete — re-execing to load new code")
|
||||||
|
|||||||
@@ -14,7 +14,10 @@
|
|||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name foundation.fct_availability_slot,
|
name foundation.fct_availability_slot,
|
||||||
kind FULL,
|
kind INCREMENTAL_BY_TIME_RANGE (
|
||||||
|
time_column snapshot_date
|
||||||
|
),
|
||||||
|
start '2026-03-01',
|
||||||
cron '@daily',
|
cron '@daily',
|
||||||
grain (snapshot_date, tenant_id, resource_id, slot_start_time)
|
grain (snapshot_date, tenant_id, resource_id, slot_start_time)
|
||||||
);
|
);
|
||||||
@@ -37,7 +40,8 @@ WITH deduped AS (
|
|||||||
captured_at_utc DESC
|
captured_at_utc DESC
|
||||||
) AS rn
|
) AS rn
|
||||||
FROM staging.stg_playtomic_availability
|
FROM staging.stg_playtomic_availability
|
||||||
WHERE price_amount IS NOT NULL
|
WHERE snapshot_date BETWEEN @start_ds AND @end_ds
|
||||||
|
AND price_amount IS NOT NULL
|
||||||
AND price_amount > 0
|
AND price_amount > 0
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
|
|||||||
@@ -12,7 +12,10 @@
|
|||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name foundation.fct_daily_availability,
|
name foundation.fct_daily_availability,
|
||||||
kind FULL,
|
kind INCREMENTAL_BY_TIME_RANGE (
|
||||||
|
time_column snapshot_date
|
||||||
|
),
|
||||||
|
start '2026-03-01',
|
||||||
cron '@daily',
|
cron '@daily',
|
||||||
grain (snapshot_date, tenant_id)
|
grain (snapshot_date, tenant_id)
|
||||||
);
|
);
|
||||||
@@ -37,6 +40,7 @@ WITH slot_agg AS (
|
|||||||
MAX(a.price_currency) AS price_currency,
|
MAX(a.price_currency) AS price_currency,
|
||||||
MAX(a.captured_at_utc) AS captured_at_utc
|
MAX(a.captured_at_utc) AS captured_at_utc
|
||||||
FROM foundation.fct_availability_slot a
|
FROM foundation.fct_availability_slot a
|
||||||
|
WHERE a.snapshot_date BETWEEN @start_ds AND @end_ds
|
||||||
GROUP BY a.snapshot_date, a.tenant_id
|
GROUP BY a.snapshot_date, a.tenant_id
|
||||||
)
|
)
|
||||||
SELECT
|
SELECT
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ WITH venue_stats AS (
|
|||||||
MAX(da.active_court_count) AS court_count,
|
MAX(da.active_court_count) AS court_count,
|
||||||
COUNT(DISTINCT da.snapshot_date) AS days_observed
|
COUNT(DISTINCT da.snapshot_date) AS days_observed
|
||||||
FROM foundation.fct_daily_availability da
|
FROM foundation.fct_daily_availability da
|
||||||
WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days'
|
WHERE da.snapshot_date >= CURRENT_DATE - INTERVAL '30 days'
|
||||||
AND da.occupancy_rate IS NOT NULL
|
AND da.occupancy_rate IS NOT NULL
|
||||||
AND da.occupancy_rate BETWEEN 0 AND 1.5
|
AND da.occupancy_rate BETWEEN 0 AND 1.5
|
||||||
GROUP BY da.tenant_id, da.country_code, da.city, da.city_slug, da.price_currency
|
GROUP BY da.tenant_id, da.country_code, da.city, da.city_slug, da.price_currency
|
||||||
|
|||||||
@@ -13,44 +13,28 @@
|
|||||||
|
|
||||||
MODEL (
|
MODEL (
|
||||||
name staging.stg_playtomic_availability,
|
name staging.stg_playtomic_availability,
|
||||||
kind FULL,
|
kind INCREMENTAL_BY_TIME_RANGE (
|
||||||
|
time_column snapshot_date
|
||||||
|
),
|
||||||
|
start '2026-03-01',
|
||||||
cron '@daily',
|
cron '@daily',
|
||||||
grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc)
|
grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc)
|
||||||
);
|
);
|
||||||
|
|
||||||
WITH
|
WITH
|
||||||
morning_jsonl AS (
|
all_jsonl AS (
|
||||||
SELECT
|
SELECT
|
||||||
date AS snapshot_date,
|
CAST(date AS DATE) AS snapshot_date,
|
||||||
captured_at_utc,
|
captured_at_utc,
|
||||||
'morning' AS snapshot_type,
|
CASE
|
||||||
NULL::INTEGER AS recheck_hour,
|
WHEN filename LIKE '%_recheck_%' THEN 'recheck'
|
||||||
tenant_id,
|
ELSE 'morning'
|
||||||
slots AS slots_json
|
END AS snapshot_type,
|
||||||
FROM read_json(
|
|
||||||
@LANDING_DIR || '/playtomic/*/*/availability_*.jsonl.gz',
|
|
||||||
format = 'newline_delimited',
|
|
||||||
columns = {
|
|
||||||
date: 'VARCHAR',
|
|
||||||
captured_at_utc: 'VARCHAR',
|
|
||||||
tenant_id: 'VARCHAR',
|
|
||||||
slots: 'JSON'
|
|
||||||
},
|
|
||||||
filename = true
|
|
||||||
)
|
|
||||||
WHERE filename NOT LIKE '%_recheck_%'
|
|
||||||
AND tenant_id IS NOT NULL
|
|
||||||
),
|
|
||||||
recheck_jsonl AS (
|
|
||||||
SELECT
|
|
||||||
date AS snapshot_date,
|
|
||||||
captured_at_utc,
|
|
||||||
'recheck' AS snapshot_type,
|
|
||||||
TRY_CAST(recheck_hour AS INTEGER) AS recheck_hour,
|
TRY_CAST(recheck_hour AS INTEGER) AS recheck_hour,
|
||||||
tenant_id,
|
tenant_id,
|
||||||
slots AS slots_json
|
slots AS slots_json
|
||||||
FROM read_json(
|
FROM read_json(
|
||||||
@LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.jsonl.gz',
|
@LANDING_DIR || '/playtomic/*/*/availability_' || @start_ds || '*.jsonl.gz',
|
||||||
format = 'newline_delimited',
|
format = 'newline_delimited',
|
||||||
columns = {
|
columns = {
|
||||||
date: 'VARCHAR',
|
date: 'VARCHAR',
|
||||||
@@ -63,11 +47,6 @@ recheck_jsonl AS (
|
|||||||
)
|
)
|
||||||
WHERE tenant_id IS NOT NULL
|
WHERE tenant_id IS NOT NULL
|
||||||
),
|
),
|
||||||
all_venues AS (
|
|
||||||
SELECT * FROM morning_jsonl
|
|
||||||
UNION ALL
|
|
||||||
SELECT * FROM recheck_jsonl
|
|
||||||
),
|
|
||||||
raw_resources AS (
|
raw_resources AS (
|
||||||
SELECT
|
SELECT
|
||||||
av.snapshot_date,
|
av.snapshot_date,
|
||||||
@@ -76,7 +55,7 @@ raw_resources AS (
|
|||||||
av.recheck_hour,
|
av.recheck_hour,
|
||||||
av.tenant_id,
|
av.tenant_id,
|
||||||
resource_json
|
resource_json
|
||||||
FROM all_venues av,
|
FROM all_jsonl av,
|
||||||
LATERAL UNNEST(
|
LATERAL UNNEST(
|
||||||
from_json(av.slots_json, '["JSON"]')
|
from_json(av.slots_json, '["JSON"]')
|
||||||
) AS t(resource_json)
|
) AS t(resource_json)
|
||||||
|
|||||||
Reference in New Issue
Block a user