merge: fix sqlmesh worker command + article preview maps

This commit is contained in:
Deeman
2026-03-05 22:14:37 +01:00
13 changed files with 173 additions and 161 deletions

View File

@@ -33,10 +33,10 @@ do
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \ DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
uv run --package padelnomics_extract extract uv run --package padelnomics_extract extract
# Transform — plan detects new/changed models; run only executes existing plans. # Transform — run evaluates missing daily intervals for incremental models.
LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \ LANDING_DIR="${LANDING_DIR:-/data/padelnomics/landing}" \
DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \ DUCKDB_PATH="${DUCKDB_PATH:-/data/padelnomics/lakehouse.duckdb}" \
uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply uv run sqlmesh -p transform/sqlmesh_padelnomics run prod
# Export serving tables to analytics.duckdb (atomic swap). # Export serving tables to analytics.duckdb (atomic swap).
# The web app detects the inode change on next query — no restart needed. # The web app detects the inode change on next query — no restart needed.

View File

@@ -247,10 +247,10 @@ def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> tu
def run_transform() -> None: def run_transform() -> None:
"""Run SQLMesh — it evaluates model staleness internally.""" """Run SQLMesh — evaluates missing daily intervals."""
logger.info("Running SQLMesh transform") logger.info("Running SQLMesh transform")
ok, err = run_shell( ok, err = run_shell(
"uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply", "uv run sqlmesh -p transform/sqlmesh_padelnomics run prod",
) )
if not ok: if not ok:
send_alert(f"[transform] {err}") send_alert(f"[transform] {err}")
@@ -358,6 +358,8 @@ def git_pull_and_sync() -> None:
run_shell(f"git checkout --detach {latest}") run_shell(f"git checkout --detach {latest}")
run_shell("sops --input-type dotenv --output-type dotenv -d .env.prod.sops > .env") run_shell("sops --input-type dotenv --output-type dotenv -d .env.prod.sops > .env")
run_shell("uv sync --all-packages") run_shell("uv sync --all-packages")
# Apply any model changes (FULL→INCREMENTAL, new models, etc.) before re-exec
run_shell("uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply")
# Re-exec so the new code is loaded. os.execv replaces this process in-place; # Re-exec so the new code is loaded. os.execv replaces this process in-place;
# systemd sees it as the same PID and does not restart the unit. # systemd sees it as the same PID and does not restart the unit.
logger.info("Deploy complete — re-execing to load new code") logger.info("Deploy complete — re-execing to load new code")

View File

@@ -14,7 +14,10 @@
MODEL ( MODEL (
name foundation.fct_availability_slot, name foundation.fct_availability_slot,
kind FULL, kind INCREMENTAL_BY_TIME_RANGE (
time_column snapshot_date
),
start '2026-03-01',
cron '@daily', cron '@daily',
grain (snapshot_date, tenant_id, resource_id, slot_start_time) grain (snapshot_date, tenant_id, resource_id, slot_start_time)
); );
@@ -37,7 +40,8 @@ WITH deduped AS (
captured_at_utc DESC captured_at_utc DESC
) AS rn ) AS rn
FROM staging.stg_playtomic_availability FROM staging.stg_playtomic_availability
WHERE price_amount IS NOT NULL WHERE snapshot_date BETWEEN @start_ds AND @end_ds
AND price_amount IS NOT NULL
AND price_amount > 0 AND price_amount > 0
) )
SELECT SELECT

View File

@@ -12,7 +12,10 @@
MODEL ( MODEL (
name foundation.fct_daily_availability, name foundation.fct_daily_availability,
kind FULL, kind INCREMENTAL_BY_TIME_RANGE (
time_column snapshot_date
),
start '2026-03-01',
cron '@daily', cron '@daily',
grain (snapshot_date, tenant_id) grain (snapshot_date, tenant_id)
); );
@@ -37,6 +40,7 @@ WITH slot_agg AS (
MAX(a.price_currency) AS price_currency, MAX(a.price_currency) AS price_currency,
MAX(a.captured_at_utc) AS captured_at_utc MAX(a.captured_at_utc) AS captured_at_utc
FROM foundation.fct_availability_slot a FROM foundation.fct_availability_slot a
WHERE a.snapshot_date BETWEEN @start_ds AND @end_ds
GROUP BY a.snapshot_date, a.tenant_id GROUP BY a.snapshot_date, a.tenant_id
) )
SELECT SELECT

View File

@@ -27,7 +27,7 @@ WITH venue_stats AS (
MAX(da.active_court_count) AS court_count, MAX(da.active_court_count) AS court_count,
COUNT(DISTINCT da.snapshot_date) AS days_observed COUNT(DISTINCT da.snapshot_date) AS days_observed
FROM foundation.fct_daily_availability da FROM foundation.fct_daily_availability da
WHERE TRY_CAST(da.snapshot_date AS DATE) >= CURRENT_DATE - INTERVAL '30 days' WHERE da.snapshot_date >= CURRENT_DATE - INTERVAL '30 days'
AND da.occupancy_rate IS NOT NULL AND da.occupancy_rate IS NOT NULL
AND da.occupancy_rate BETWEEN 0 AND 1.5 AND da.occupancy_rate BETWEEN 0 AND 1.5
GROUP BY da.tenant_id, da.country_code, da.city, da.city_slug, da.price_currency GROUP BY da.tenant_id, da.country_code, da.city, da.city_slug, da.price_currency

View File

@@ -13,44 +13,28 @@
MODEL ( MODEL (
name staging.stg_playtomic_availability, name staging.stg_playtomic_availability,
kind FULL, kind INCREMENTAL_BY_TIME_RANGE (
time_column snapshot_date
),
start '2026-03-01',
cron '@daily', cron '@daily',
grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc) grain (snapshot_date, tenant_id, resource_id, slot_start_time, snapshot_type, captured_at_utc)
); );
WITH WITH
morning_jsonl AS ( all_jsonl AS (
SELECT SELECT
date AS snapshot_date, CAST(date AS DATE) AS snapshot_date,
captured_at_utc, captured_at_utc,
'morning' AS snapshot_type, CASE
NULL::INTEGER AS recheck_hour, WHEN filename LIKE '%_recheck_%' THEN 'recheck'
tenant_id, ELSE 'morning'
slots AS slots_json END AS snapshot_type,
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/availability_*.jsonl.gz',
format = 'newline_delimited',
columns = {
date: 'VARCHAR',
captured_at_utc: 'VARCHAR',
tenant_id: 'VARCHAR',
slots: 'JSON'
},
filename = true
)
WHERE filename NOT LIKE '%_recheck_%'
AND tenant_id IS NOT NULL
),
recheck_jsonl AS (
SELECT
date AS snapshot_date,
captured_at_utc,
'recheck' AS snapshot_type,
TRY_CAST(recheck_hour AS INTEGER) AS recheck_hour, TRY_CAST(recheck_hour AS INTEGER) AS recheck_hour,
tenant_id, tenant_id,
slots AS slots_json slots AS slots_json
FROM read_json( FROM read_json(
@LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.jsonl.gz', @LANDING_DIR || '/playtomic/*/*/availability_' || @start_ds || '*.jsonl.gz',
format = 'newline_delimited', format = 'newline_delimited',
columns = { columns = {
date: 'VARCHAR', date: 'VARCHAR',
@@ -63,11 +47,6 @@ recheck_jsonl AS (
) )
WHERE tenant_id IS NOT NULL WHERE tenant_id IS NOT NULL
), ),
all_venues AS (
SELECT * FROM morning_jsonl
UNION ALL
SELECT * FROM recheck_jsonl
),
raw_resources AS ( raw_resources AS (
SELECT SELECT
av.snapshot_date, av.snapshot_date,
@@ -76,7 +55,7 @@ raw_resources AS (
av.recheck_hour, av.recheck_hour,
av.tenant_id, av.tenant_id,
resource_json resource_json
FROM all_venues av, FROM all_jsonl av,
LATERAL UNNEST( LATERAL UNNEST(
from_json(av.slots_json, '["JSON"]') from_json(av.slots_json, '["JSON"]')
) AS t(resource_json) ) AS t(resource_json)

View File

@@ -2736,13 +2736,13 @@ async def article_edit(article_id: int):
body = raw[m.end():].lstrip("\n") if m else raw body = raw[m.end():].lstrip("\n") if m else raw
body_html = mistune.html(body) if body else "" body_html = mistune.html(body) if body else ""
css_url = url_for("static", filename="css/output.css")
preview_doc = ( preview_doc = (
f"<!doctype html><html><head>" await render_template(
f"<link rel='stylesheet' href='{css_url}'>" "admin/partials/article_preview_doc.html", body_html=body_html
f"<style>html,body{{margin:0;padding:0}}body{{padding:2rem 2.5rem}}</style>" )
f"</head><body><div class='article-body'>{body_html}</div></body></html>" if body_html
) if body_html else "" else ""
)
data = {**dict(article), "body": body} data = {**dict(article), "body": body}
return await render_template( return await render_template(
@@ -2764,13 +2764,13 @@ async def article_preview():
m = _FRONTMATTER_RE.match(body) m = _FRONTMATTER_RE.match(body)
body = body[m.end():].lstrip("\n") if m else body body = body[m.end():].lstrip("\n") if m else body
body_html = mistune.html(body) if body else "" body_html = mistune.html(body) if body else ""
css_url = url_for("static", filename="css/output.css")
preview_doc = ( preview_doc = (
f"<!doctype html><html><head>" await render_template(
f"<link rel='stylesheet' href='{css_url}'>" "admin/partials/article_preview_doc.html", body_html=body_html
f"<style>html,body{{margin:0;padding:0}}body{{padding:2rem 2.5rem}}</style>" )
f"</head><body><div class='article-body'>{body_html}</div></body></html>" if body_html
) if body_html else "" else ""
)
return await render_template("admin/partials/article_preview.html", preview_doc=preview_doc) return await render_template("admin/partials/article_preview.html", preview_doc=preview_doc)

View File

@@ -384,7 +384,7 @@
<iframe <iframe
srcdoc="{{ preview_doc | e }}" srcdoc="{{ preview_doc | e }}"
style="flex:1;width:100%;border:none;display:block;" style="flex:1;width:100%;border:none;display:block;"
sandbox="allow-same-origin" sandbox="allow-same-origin allow-scripts"
title="Article preview" title="Article preview"
></iframe> ></iframe>
{% else %} {% else %}

View File

@@ -4,7 +4,7 @@
<iframe <iframe
srcdoc="{{ preview_doc | e }}" srcdoc="{{ preview_doc | e }}"
style="flex:1;width:100%;border:none;display:block;" style="flex:1;width:100%;border:none;display:block;"
sandbox="allow-same-origin" sandbox="allow-same-origin allow-scripts"
title="Article preview" title="Article preview"
></iframe> ></iframe>
{% else %} {% else %}

View File

@@ -0,0 +1,15 @@
{# Standalone HTML document used as iframe srcdoc for the article editor preview.
Includes Leaflet so map shortcodes render correctly. #}
<!doctype html>
<html>
<head>
<link rel="stylesheet" href="{{ url_for('static', filename='css/output.css') }}">
<link rel="stylesheet" href="{{ url_for('static', filename='vendor/leaflet/leaflet.min.css') }}">
<style>html,body{margin:0;padding:0}body{padding:2rem 2.5rem}</style>
</head>
<body>
<div class="article-body">{{ body_html | safe }}</div>
<script>window.LEAFLET_JS_URL = '{{ url_for("static", filename="vendor/leaflet/leaflet.min.js") }}';</script>
<script src="{{ url_for('static', filename='js/article-maps.js') }}"></script>
</body>
</html>

View File

@@ -60,106 +60,6 @@
{% endblock %} {% endblock %}
{% block scripts %} {% block scripts %}
<script> <script>window.LEAFLET_JS_URL = '{{ url_for("static", filename="vendor/leaflet/leaflet.min.js") }}';</script>
(function() { <script src="{{ url_for('static', filename='js/article-maps.js') }}"></script>
var countryMapEl = document.getElementById('country-map');
var cityMapEl = document.getElementById('city-map');
if (!countryMapEl && !cityMapEl) return;
var TILES = 'https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png';
var TILES_ATTR = '&copy; <a href="https://www.openstreetmap.org/copyright">OSM</a> &copy; <a href="https://carto.com/">CARTO</a>';
function scoreColor(score) {
if (score >= 60) return '#16A34A';
if (score >= 30) return '#D97706';
return '#DC2626';
}
function makeIcon(size, color) {
var s = Math.round(size);
return L.divIcon({
className: '',
html: '<div class="pn-marker" style="width:' + s + 'px;height:' + s + 'px;background:' + color + ';opacity:0.82;"></div>',
iconSize: [s, s],
iconAnchor: [s / 2, s / 2],
});
}
function initCountryMap(el) {
var slug = el.dataset.countrySlug;
var map = L.map(el, {scrollWheelZoom: false});
L.tileLayer(TILES, { attribution: TILES_ATTR, maxZoom: 18 }).addTo(map);
var lang = document.documentElement.lang || 'en';
fetch('/api/markets/' + slug + '/cities.json')
.then(function(r) { return r.json(); })
.then(function(data) {
if (!data.length) return;
var maxV = Math.max.apply(null, data.map(function(d) { return d.padel_venue_count || 1; }));
var bounds = [];
data.forEach(function(c) {
if (!c.lat || !c.lon) return;
var size = 10 + 36 * Math.sqrt((c.padel_venue_count || 1) / maxV);
var color = scoreColor(c.market_score);
var pop = c.population >= 1000000
? (c.population / 1000000).toFixed(1) + 'M'
: (c.population >= 1000 ? Math.round(c.population / 1000) + 'K' : (c.population || ''));
var tip = '<strong>' + c.city_name + '</strong><br>'
+ (c.padel_venue_count || 0) + ' venues'
+ (pop ? ' · ' + pop : '') + '<br>'
+ '<span style="color:' + color + ';font-weight:600;">Score ' + Math.round(c.market_score) + '/100</span>';
L.marker([c.lat, c.lon], { icon: makeIcon(size, color) })
.bindTooltip(tip, { className: 'map-tooltip', direction: 'top', offset: [0, -Math.round(size / 2)] })
.on('click', function() { window.location = '/' + lang + '/markets/' + slug + '/' + c.city_slug; })
.addTo(map);
bounds.push([c.lat, c.lon]);
});
if (bounds.length) map.fitBounds(bounds, { padding: [24, 24] });
});
}
var VENUE_ICON = L.divIcon({
className: '',
html: '<div class="pn-venue"></div>',
iconSize: [10, 10],
iconAnchor: [5, 5],
});
function initCityMap(el) {
var countrySlug = el.dataset.countrySlug;
var citySlug = el.dataset.citySlug;
var lat = parseFloat(el.dataset.lat);
var lon = parseFloat(el.dataset.lon);
var map = L.map(el, {scrollWheelZoom: false}).setView([lat, lon], 13);
L.tileLayer(TILES, { attribution: TILES_ATTR, maxZoom: 18 }).addTo(map);
fetch('/api/markets/' + countrySlug + '/' + citySlug + '/venues.json')
.then(function(r) { return r.json(); })
.then(function(data) {
data.forEach(function(v) {
if (!v.lat || !v.lon) return;
var indoor = v.indoor_court_count || 0;
var outdoor = v.outdoor_court_count || 0;
var total = v.court_count || (indoor + outdoor);
var courtLine = total
? total + ' court' + (total > 1 ? 's' : '')
+ (indoor || outdoor
? ' (' + [indoor ? indoor + ' indoor' : '', outdoor ? outdoor + ' outdoor' : ''].filter(Boolean).join(', ') + ')'
: '')
: '';
var tip = '<strong>' + v.name + '</strong>' + (courtLine ? '<br>' + courtLine : '');
L.marker([v.lat, v.lon], { icon: VENUE_ICON })
.bindTooltip(tip, { className: 'map-tooltip', direction: 'top', offset: [0, -7] })
.addTo(map);
});
});
}
var script = document.createElement('script');
script.src = '{{ url_for("static", filename="vendor/leaflet/leaflet.min.js") }}';
script.onload = function() {
if (countryMapEl) initCountryMap(countryMapEl);
if (cityMapEl) initCityMap(cityMapEl);
};
document.head.appendChild(script);
})();
</script>
{% endblock %} {% endblock %}

View File

@@ -0,0 +1,108 @@
/**
* Leaflet map initialisation for article pages (country + city maps).
*
* Looks for #country-map and #city-map elements. If neither exists, does nothing.
* Expects data-* attributes on the map elements and a global LEAFLET_JS_URL
* variable pointing to the Leaflet JS bundle.
*/
(function() {
var countryMapEl = document.getElementById('country-map');
var cityMapEl = document.getElementById('city-map');
if (!countryMapEl && !cityMapEl) return;
var TILES = 'https://{s}.basemaps.cartocdn.com/light_all/{z}/{x}/{y}{r}.png';
var TILES_ATTR = '&copy; <a href="https://www.openstreetmap.org/copyright">OSM</a> &copy; <a href="https://carto.com/">CARTO</a>';
function scoreColor(score) {
if (score >= 60) return '#16A34A';
if (score >= 30) return '#D97706';
return '#DC2626';
}
function makeIcon(size, color) {
var s = Math.round(size);
return L.divIcon({
className: '',
html: '<div class="pn-marker" style="width:' + s + 'px;height:' + s + 'px;background:' + color + ';opacity:0.82;"></div>',
iconSize: [s, s],
iconAnchor: [s / 2, s / 2],
});
}
function initCountryMap(el) {
var slug = el.dataset.countrySlug;
var map = L.map(el, {scrollWheelZoom: false});
L.tileLayer(TILES, { attribution: TILES_ATTR, maxZoom: 18 }).addTo(map);
var lang = document.documentElement.lang || 'en';
fetch('/api/markets/' + slug + '/cities.json')
.then(function(r) { return r.json(); })
.then(function(data) {
if (!data.length) return;
var maxV = Math.max.apply(null, data.map(function(d) { return d.padel_venue_count || 1; }));
var bounds = [];
data.forEach(function(c) {
if (!c.lat || !c.lon) return;
var size = 10 + 36 * Math.sqrt((c.padel_venue_count || 1) / maxV);
var color = scoreColor(c.market_score);
var pop = c.population >= 1000000
? (c.population / 1000000).toFixed(1) + 'M'
: (c.population >= 1000 ? Math.round(c.population / 1000) + 'K' : (c.population || ''));
var tip = '<strong>' + c.city_name + '</strong><br>'
+ (c.padel_venue_count || 0) + ' venues'
+ (pop ? ' · ' + pop : '') + '<br>'
+ '<span style="color:' + color + ';font-weight:600;">Score ' + Math.round(c.market_score) + '/100</span>';
L.marker([c.lat, c.lon], { icon: makeIcon(size, color) })
.bindTooltip(tip, { className: 'map-tooltip', direction: 'top', offset: [0, -Math.round(size / 2)] })
.on('click', function() { window.location = '/' + lang + '/markets/' + slug + '/' + c.city_slug; })
.addTo(map);
bounds.push([c.lat, c.lon]);
});
if (bounds.length) map.fitBounds(bounds, { padding: [24, 24] });
});
}
var VENUE_ICON = L.divIcon({
className: '',
html: '<div class="pn-venue"></div>',
iconSize: [10, 10],
iconAnchor: [5, 5],
});
function initCityMap(el) {
var countrySlug = el.dataset.countrySlug;
var citySlug = el.dataset.citySlug;
var lat = parseFloat(el.dataset.lat);
var lon = parseFloat(el.dataset.lon);
var map = L.map(el, {scrollWheelZoom: false}).setView([lat, lon], 13);
L.tileLayer(TILES, { attribution: TILES_ATTR, maxZoom: 18 }).addTo(map);
fetch('/api/markets/' + countrySlug + '/' + citySlug + '/venues.json')
.then(function(r) { return r.json(); })
.then(function(data) {
data.forEach(function(v) {
if (!v.lat || !v.lon) return;
var indoor = v.indoor_court_count || 0;
var outdoor = v.outdoor_court_count || 0;
var total = v.court_count || (indoor + outdoor);
var courtLine = total
? total + ' court' + (total > 1 ? 's' : '')
+ (indoor || outdoor
? ' (' + [indoor ? indoor + ' indoor' : '', outdoor ? outdoor + ' outdoor' : ''].filter(Boolean).join(', ') + ')'
: '')
: '';
var tip = '<strong>' + v.name + '</strong>' + (courtLine ? '<br>' + courtLine : '');
L.marker([v.lat, v.lon], { icon: VENUE_ICON })
.bindTooltip(tip, { className: 'map-tooltip', direction: 'top', offset: [0, -7] })
.addTo(map);
});
});
}
/* Dynamically load Leaflet JS then init maps */
var script = document.createElement('script');
script.src = window.LEAFLET_JS_URL || '/static/vendor/leaflet/leaflet.min.js';
script.onload = function() {
if (countryMapEl) initCountryMap(countryMapEl);
if (cityMapEl) initCityMap(cityMapEl);
};
document.head.appendChild(script);
})();

View File

@@ -737,9 +737,9 @@ async def handle_run_extraction(payload: dict) -> None:
@task("run_transform") @task("run_transform")
async def handle_run_transform(payload: dict) -> None: async def handle_run_transform(payload: dict) -> None:
"""Run SQLMesh transform (prod plan --auto-apply) in the background. """Run SQLMesh transform (prod run) in the background.
Shells out to `uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply`. Shells out to `uv run sqlmesh -p transform/sqlmesh_padelnomics run prod`.
2-hour absolute timeout — same as extraction. 2-hour absolute timeout — same as extraction.
""" """
import subprocess import subprocess
@@ -748,7 +748,7 @@ async def handle_run_transform(payload: dict) -> None:
repo_root = Path(__file__).resolve().parents[4] repo_root = Path(__file__).resolve().parents[4]
result = await asyncio.to_thread( result = await asyncio.to_thread(
subprocess.run, subprocess.run,
["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "plan", "prod", "--auto-apply"], ["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "run", "prod"],
capture_output=True, capture_output=True,
text=True, text=True,
timeout=7200, timeout=7200,
@@ -803,7 +803,7 @@ async def handle_run_pipeline(payload: dict) -> None:
), ),
( (
"transform", "transform",
["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "plan", "prod", "--auto-apply"], ["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "run", "prod"],
7200, 7200,
), ),
( (