Compare commits
8 Commits
v7
...
v202602282
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aee3733b49 | ||
|
|
51d9aab4a0 | ||
|
|
85b6aa0d0a | ||
|
|
e62aad148b | ||
|
|
6fb1e990e3 | ||
|
|
6edf8ba65e | ||
|
|
ed0a578050 | ||
|
|
c1cdeec6be |
@@ -43,9 +43,10 @@ ALERT_WEBHOOK_URL=ENC[AES256_GCM,data:4sXQk8zklruC525J279TUUatdDJQ43qweuoPhtpI82
|
||||
NTFY_TOKEN=ENC[AES256_GCM,data:YlOxhsRJ8P1y4kk6ugWm41iyRCsM6oAWjvbU9lGcD0A=,iv:JZXOvi3wTOPV9A46c7fMiqbszNCvXkOgh9i/H1hob24=,tag:8xnPimgy7sesOAnxhaXmpg==,type:str]
|
||||
SUPERVISOR_GIT_PULL=ENC[AES256_GCM,data:mg==,iv:KgqMVYj12FjOzWxtA1T0r0pqCDJ6MtHzMjE+4W/W+s4=,tag:czFaOqhHG8nqrQ8AZ8QiGw==,type:str]
|
||||
#ENC[AES256_GCM,data:hzAZvCWc4RTk290=,iv:RsSI4OpAOQGcFVpfXDZ6t705yWmlO0JEWwWF5uQu9As=,tag:UPqFtA2tXiSa0vzJAv8qXg==,type:comment]
|
||||
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:O+eoFK/Z4hUgVxDqK58qVICa8wjo2o6/Es7VHZ3wyfjuDed38ekybjiUy5W7BF6X38HP73VSeRv/5cgbdbPMWjvwtw==,iv:MILS/EvbY+D2i28B1i5PgAAxlkRuK1fAKmUuuAtuCXk=,tag:o4eQkkga/RjQGlqYnXwufQ==,type:str]
|
||||
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:VQ8SU8xOoVMAQfJVit6HQfOjLlq3u41iHMfTYUZ908ouCcsKkB/mRBbhlODiu7tJXdjuGD47iTgXlso54bPIbjcLwLcg2GNOiSI=,iv:g/RM1XoCw78OmtGUh2Dyfd1N8tNQRlcfRrtj6uJYvds=,tag:lbPdM4JJxTysr1qG1A4+Fw==,type:str]
|
||||
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:x/F0toXDc8stsUNxaepCmxq1+WuacqqPtdc+R5mxTwcAzsKxCdwt8KpBZWMvz7ku4tHDGsKD949QAX2ANXP9oCMTgW0=,iv:6G9gE9/v7GaYj8aqVTmMrpw6AcQK9yMSCAohNdAD1Ws=,tag:2Jimr1ldVSfkh8LPEwdN3w==,type:str]
|
||||
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:6BfXBYmyHpgZU/kJWpZLf8eH5VowVK1n0r6GzFTNAx/OmyaaS1RZVPC1JPkPBnTwEmo0WHYRW8uiUdkABmH9F5ZqqlsAesyfW7zvU9r7yD+D7w==,iv:3CBn2qCoTueQy8xVcQqZS4E3F0qoFYnNbzTZTpJ1veo=,tag:wC3Ecl4uNTwPiT23ATvRZg==,type:str]
|
||||
WEBSHARE_DOWNLOAD_URL=ENC[AES256_GCM,data:/N77CFf6tJWCk7HrnBOm2Q1ynx7XoblzfbzJySeCjrxqiu4r+CB90aDkaPahlQKI00DUZih3pcy7WhnjdAwI30G5kJZ3P8H8/R0tP7OBK1wPVbsJq8prQJPFOAWewsS4KWNtSURZPYSCxslcBb7DHLX6ZAjv6A5KFOjRK2N8usR9sIabrCWh,iv:G3Ropu/JGytZK/zKsNGFjjSu3Wt6fvHaAqI9RpUHvlI=,tag:fv6xuS94OR+4xfiyKrYELA==,type:str]
|
||||
PROXY_CONCURRENCY=ENC[AES256_GCM,data:vdEZ,iv:+eTNQO+s/SsVDBLg1/+fneMzEEsFkuEFxo/FcVV+mWc=,tag:i/EPwi/jOoWl3xW8H0XMdw==,type:str]
|
||||
RECHECK_WINDOW_MINUTES=ENC[AES256_GCM,data:L2s=,iv:fV3mCKmK5fxUmIWRePELBDAPTb8JZqasVIhnAl55kYw=,tag:XL+PO6sblz/7WqHC3dtk1w==,type:str]
|
||||
#ENC[AES256_GCM,data:RC+t2vqLwLjapdAUql8rQls=,iv:Kkiz3ND0g0MRAgcPJysIYMzSQS96Rq+3YP5yO7yWfIY=,tag:Y6TbZd81ihIwn+U515qd1g==,type:comment]
|
||||
GSC_SERVICE_ACCOUNT_PATH=ENC[AES256_GCM,data:Vki6yHk+gd4n,iv:rxzKvwrGnAkLcpS41EZ097E87NrIpNZGFfl4iXFvr40=,tag:EZkBJpCq5rSpKYVC4H3JHQ==,type:str]
|
||||
@@ -61,7 +62,7 @@ sops_age__list_1__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb2
|
||||
sops_age__list_1__map_recipient=age1wjepykv3glvsrtegu25tevg7vyn3ngpl607u3yjc9ucay04s045s796msw
|
||||
sops_age__list_2__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBFeHhaOURNZnRVMEwxNThu\nUjF4Q0kwUXhTUE1QSzZJbmpubnh3RnpQTmdvCjRmWWxpNkxFUmVGb3NRbnlydW5O\nWEg3ZXJQTU4vcndzS2pUQXY3Q0ttYjAKLS0tIE9IRFJ1c2ZxbGVHa2xTL0swbGN1\nTzgwMThPUDRFTWhuZHJjZUYxOTZrU00KY62qrNBCUQYxwcLMXFEnLkwncxq3BPJB\nKm4NzeHBU87XmPWVrgrKuf+PH1mxJlBsl7Hev8xBTy7l6feiZjLIvQ==\n-----END AGE ENCRYPTED FILE-----\n
|
||||
sops_age__list_2__map_recipient=age1c783ym2q5x9tv7py5d28uc4k44aguudjn03g97l9nzs00dd9tsrqum8h4d
|
||||
sops_lastmodified=2026-02-28T15:51:24Z
|
||||
sops_mac=ENC[AES256_GCM,data:FV34u95nhrUKbzVvcZ44V0pPVlhxJafs0TpkHlcpcuc+1TQWAysW5QbyKEVAGiLobKd3wjT7ThYGLEko7ZWjgXvpS0Bx0Qn+8KN+wFFZ84/cgCu6BEI3K5Ua3ssLyEYDcqIWy13K7DM6ymfp3bBNg96p3dgcf0wbJHHr+ef5048=,iv:JDhmirwvMhudCBFeQqlXvmqKaMIi++16UJvLOoMFios=,tag:p+LumDAe+CNozGxx5kZObw==,type:str]
|
||||
sops_lastmodified=2026-02-28T17:03:44Z
|
||||
sops_mac=ENC[AES256_GCM,data:IQ9jpRxVUssaMK+qFcM3nPdzXHkiqp6E+DhEey1TfqUu5GCBNsWeVy9m9A6p9RWhu2NtJV7aKdUeqneuMtD1q5Tnm6L96zuyot2ESnx2N2ssD9ilrDauQxoBJcrJVnGV61CgaCz9458w8BuVUZydn3MoHeRaU7bOBBzQlTI6vZk=,iv:qHqdt3av/KZRQHr/OS/9KdAJUgKlKEDgan7qI3Zzkck=,tag:fOvdO9iRTTF1Siobu2mLqg==,type:str]
|
||||
sops_unencrypted_suffix=_unencrypted
|
||||
sops_version=3.12.1
|
||||
|
||||
@@ -17,9 +17,9 @@ jobs:
|
||||
- run: uv run pytest web/tests/ -x -q -p no:faulthandler
|
||||
- run: uv run ruff check web/src/ web/tests/
|
||||
|
||||
# Creates v<N> tag after tests pass. The on-server supervisor polls for new
|
||||
# tags every 60s and deploys automatically. No SSH keys or deploy credentials
|
||||
# needed in CI — only the built-in github.token.
|
||||
# Creates a v{YYYYMMDDHHMM} tag after tests pass on master.
|
||||
# The on-server supervisor polls for new tags every 60s and deploys
|
||||
# automatically. No SSH keys or deploy credentials needed in CI.
|
||||
tag:
|
||||
needs: [test]
|
||||
runs-on: ubuntu-latest
|
||||
@@ -32,5 +32,6 @@ jobs:
|
||||
run: |
|
||||
git config user.name "CI"
|
||||
git config user.email "ci@noreply"
|
||||
git tag "v${{ github.run_number }}"
|
||||
git push origin "v${{ github.run_number }}"
|
||||
TAG="v$(date -u +%Y%m%d%H%M)"
|
||||
git tag "$TAG"
|
||||
git push origin "$TAG"
|
||||
|
||||
@@ -52,8 +52,7 @@ MAX_VENUES_PER_RUN = 20_000
|
||||
MAX_RETRIES_PER_VENUE = 2
|
||||
RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "30"))
|
||||
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10")
|
||||
# Override worker count — useful when tier 0 is a single rotating endpoint (DC/residential)
|
||||
# that supports many concurrent connections. Defaults to len(tiers[0]) when unset.
|
||||
# Worker count: defaults to MAX_PROXY_CONCURRENCY (200). Override via PROXY_CONCURRENCY env var.
|
||||
_PROXY_CONCURRENCY = os.environ.get("PROXY_CONCURRENCY", "").strip()
|
||||
MAX_PROXY_CONCURRENCY = 200
|
||||
|
||||
@@ -300,8 +299,7 @@ def extract(
|
||||
|
||||
# Set up tiered proxy cycler with circuit breaker
|
||||
tiers = load_proxy_tiers()
|
||||
default_workers = len(tiers[0]) if tiers else 1
|
||||
worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else default_workers
|
||||
worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else (MAX_PROXY_CONCURRENCY if tiers else 1)
|
||||
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
|
||||
|
||||
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
@@ -491,8 +489,7 @@ def extract_recheck(
|
||||
|
||||
# Set up tiered proxy cycler with circuit breaker
|
||||
tiers = load_proxy_tiers()
|
||||
default_workers = len(tiers[0]) if tiers else 1
|
||||
worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else default_workers
|
||||
worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else (MAX_PROXY_CONCURRENCY if tiers else 1)
|
||||
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
|
||||
|
||||
if worker_count > 1 and len(venues_to_recheck) > 10:
|
||||
|
||||
@@ -279,12 +279,18 @@ def web_code_changed() -> bool:
|
||||
|
||||
|
||||
def current_deployed_tag() -> str | None:
|
||||
"""Return the tag currently checked out, or None if not on a tag."""
|
||||
"""Return the highest-version tag pointing at HEAD, or None.
|
||||
|
||||
Uses the same sort order as latest_remote_tag() so that when multiple
|
||||
tags point to the same commit (e.g. a date-based tag and a CI integer
|
||||
tag), we always compare apples-to-apples.
|
||||
"""
|
||||
result = subprocess.run(
|
||||
["git", "describe", "--tags", "--exact-match", "HEAD"],
|
||||
["git", "tag", "--list", "--sort=-version:refname", "--points-at", "HEAD", "v*"],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
return result.stdout.strip() or None
|
||||
tags = result.stdout.strip().splitlines()
|
||||
return tags[0] if tags else None
|
||||
|
||||
|
||||
def latest_remote_tag() -> str | None:
|
||||
@@ -321,6 +327,10 @@ def git_pull_and_sync() -> None:
|
||||
run_shell(f"git checkout --detach {latest}")
|
||||
run_shell("sops --input-type dotenv --output-type dotenv -d .env.prod.sops > .env")
|
||||
run_shell("uv sync --all-packages")
|
||||
# Re-exec so the new code is loaded. os.execv replaces this process in-place;
|
||||
# systemd sees it as the same PID and does not restart the unit.
|
||||
logger.info("Deploy complete — re-execing to load new code")
|
||||
os.execv(sys.executable, sys.argv)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -2,22 +2,14 @@
|
||||
-- One row per available 60-minute booking slot per court per venue per day.
|
||||
-- "Available" = the slot was NOT booked at capture time. Missing slots = booked.
|
||||
--
|
||||
-- Reads BOTH morning snapshots and recheck files:
|
||||
-- Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning'
|
||||
-- Morning (old): availability_{date}.json.gz → snapshot_type = 'morning'
|
||||
-- Recheck (new): availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck'
|
||||
-- Recheck (old): availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
|
||||
-- Reads morning snapshots and recheck files (JSONL format):
|
||||
-- Morning: availability_{date}.jsonl.gz → snapshot_type = 'morning'
|
||||
-- Recheck: availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck'
|
||||
--
|
||||
-- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
|
||||
-- Price parsed from strings like "14.56 EUR" or "48 GBP".
|
||||
--
|
||||
-- Supports two morning landing formats (UNION ALL during migration):
|
||||
-- New: availability_{date}.jsonl.gz — one venue per line, columns: tenant_id, slots, date, captured_at_utc
|
||||
-- Old: availability_{date}.json.gz — {"date":..., "venues": [...]} blob (UNNEST required)
|
||||
--
|
||||
-- Requires: at least one availability file in the landing zone.
|
||||
-- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz)
|
||||
-- with empty venues[] ensures this model runs before real data arrives.
|
||||
-- Source: data/landing/playtomic/{year}/{month}/availability_*.jsonl.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_playtomic_availability,
|
||||
@@ -27,7 +19,6 @@ MODEL (
|
||||
);
|
||||
|
||||
WITH
|
||||
-- New format: one venue per JSONL line — no outer UNNEST needed
|
||||
morning_jsonl AS (
|
||||
SELECT
|
||||
date AS snapshot_date,
|
||||
@@ -50,35 +41,6 @@ morning_jsonl AS (
|
||||
WHERE filename NOT LIKE '%_recheck_%'
|
||||
AND tenant_id IS NOT NULL
|
||||
),
|
||||
-- Old format: {"date":..., "venues": [...]} blob — kept for transition
|
||||
morning_blob AS (
|
||||
SELECT
|
||||
af.date AS snapshot_date,
|
||||
af.captured_at_utc,
|
||||
'morning' AS snapshot_type,
|
||||
NULL::INTEGER AS recheck_hour,
|
||||
venue_json ->> 'tenant_id' AS tenant_id,
|
||||
venue_json -> 'slots' AS slots_json
|
||||
FROM (
|
||||
SELECT date, captured_at_utc, venues
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/playtomic/*/*/availability_*.json.gz',
|
||||
format = 'auto',
|
||||
columns = {
|
||||
date: 'VARCHAR',
|
||||
captured_at_utc: 'VARCHAR',
|
||||
venues: 'JSON[]'
|
||||
},
|
||||
filename = true,
|
||||
maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count
|
||||
)
|
||||
WHERE filename NOT LIKE '%_recheck_%'
|
||||
AND venues IS NOT NULL
|
||||
AND json_array_length(venues) > 0
|
||||
) af,
|
||||
LATERAL UNNEST(af.venues) AS t(venue_json)
|
||||
),
|
||||
-- Recheck snapshots (new JSONL format — one venue per line)
|
||||
recheck_jsonl AS (
|
||||
SELECT
|
||||
date AS snapshot_date,
|
||||
@@ -101,43 +63,10 @@ recheck_jsonl AS (
|
||||
)
|
||||
WHERE tenant_id IS NOT NULL
|
||||
),
|
||||
-- Recheck snapshots (old blob format, kept for transition)
|
||||
recheck_blob AS (
|
||||
SELECT
|
||||
rf.date AS snapshot_date,
|
||||
rf.captured_at_utc,
|
||||
'recheck' AS snapshot_type,
|
||||
TRY_CAST(
|
||||
regexp_extract(rf.filename, '_recheck_(\d+)', 1) AS INTEGER
|
||||
) AS recheck_hour,
|
||||
venue_json ->> 'tenant_id' AS tenant_id,
|
||||
venue_json -> 'slots' AS slots_json
|
||||
FROM (
|
||||
SELECT date, captured_at_utc, venues, filename
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz',
|
||||
format = 'auto',
|
||||
columns = {
|
||||
date: 'VARCHAR',
|
||||
captured_at_utc: 'VARCHAR',
|
||||
venues: 'JSON[]'
|
||||
},
|
||||
filename = true,
|
||||
maximum_object_size = 134217728 -- 128 MB; matches morning snapshot limit
|
||||
)
|
||||
WHERE venues IS NOT NULL
|
||||
AND json_array_length(venues) > 0
|
||||
) rf,
|
||||
LATERAL UNNEST(rf.venues) AS t(venue_json)
|
||||
),
|
||||
all_venues AS (
|
||||
SELECT * FROM morning_jsonl
|
||||
UNION ALL
|
||||
SELECT * FROM morning_blob
|
||||
UNION ALL
|
||||
SELECT * FROM recheck_jsonl
|
||||
UNION ALL
|
||||
SELECT * FROM recheck_blob
|
||||
),
|
||||
raw_resources AS (
|
||||
SELECT
|
||||
|
||||
@@ -5,11 +5,7 @@
|
||||
-- DuckDB auto-infers opening_hours as STRUCT, so we access each day by literal
|
||||
-- key (no dynamic access) and UNION ALL to unpivot.
|
||||
--
|
||||
-- Supports two landing formats (UNION ALL during migration):
|
||||
-- New: tenants.jsonl.gz — one tenant per line, opening_hours is a top-level JSON column
|
||||
-- Old: tenants.json.gz — {"tenants": [...]} blob (UNNEST required)
|
||||
--
|
||||
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
|
||||
-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_playtomic_opening_hours,
|
||||
@@ -19,40 +15,18 @@ MODEL (
|
||||
);
|
||||
|
||||
WITH
|
||||
-- New format: one tenant per JSONL line
|
||||
jsonl_venues AS (
|
||||
venues AS (
|
||||
SELECT
|
||||
tenant_id,
|
||||
opening_hours AS oh
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
|
||||
@LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz',
|
||||
format = 'newline_delimited',
|
||||
columns = {tenant_id: 'VARCHAR', opening_hours: 'JSON'}
|
||||
)
|
||||
WHERE tenant_id IS NOT NULL
|
||||
AND opening_hours IS NOT NULL
|
||||
),
|
||||
-- Old format: blob
|
||||
blob_venues AS (
|
||||
SELECT
|
||||
tenant ->> 'tenant_id' AS tenant_id,
|
||||
tenant -> 'opening_hours' AS oh
|
||||
FROM (
|
||||
SELECT UNNEST(tenants) AS tenant
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
|
||||
format = 'auto',
|
||||
maximum_object_size = 134217728
|
||||
)
|
||||
)
|
||||
WHERE (tenant ->> 'tenant_id') IS NOT NULL
|
||||
AND (tenant -> 'opening_hours') IS NOT NULL
|
||||
),
|
||||
venues AS (
|
||||
SELECT * FROM jsonl_venues
|
||||
UNION ALL
|
||||
SELECT * FROM blob_venues
|
||||
),
|
||||
-- Unpivot by UNION ALL — 7 literal key accesses
|
||||
unpivoted AS (
|
||||
SELECT tenant_id, 'MONDAY' AS day_of_week, 1 AS day_number,
|
||||
@@ -104,6 +78,4 @@ SELECT
|
||||
FROM unpivoted
|
||||
WHERE opening_time IS NOT NULL
|
||||
AND closing_time IS NOT NULL
|
||||
-- Enforce grain: if both old blob and new JSONL exist for the same month,
|
||||
-- the UNION ALL produces duplicate (tenant_id, day_of_week) pairs — deduplicate.
|
||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, day_of_week ORDER BY tenant_id) = 1
|
||||
|
||||
@@ -2,11 +2,7 @@
|
||||
-- Reads resources array from the landing zone to extract court type, size,
|
||||
-- surface, and booking config.
|
||||
--
|
||||
-- Supports two landing formats (UNION ALL during migration):
|
||||
-- New: tenants.jsonl.gz — one tenant per line, resources is a top-level JSON column
|
||||
-- Old: tenants.json.gz — {"tenants": [...]} blob (double UNNEST: tenants → resources)
|
||||
--
|
||||
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
|
||||
-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_playtomic_resources,
|
||||
@@ -16,41 +12,18 @@ MODEL (
|
||||
);
|
||||
|
||||
WITH
|
||||
-- New format: one tenant per JSONL line — single UNNEST for resources
|
||||
jsonl_unnested AS (
|
||||
unnested AS (
|
||||
SELECT
|
||||
tenant_id,
|
||||
UPPER(address ->> 'country_code') AS country_code,
|
||||
UNNEST(from_json(resources, '["JSON"]')) AS resource_json
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
|
||||
@LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz',
|
||||
format = 'newline_delimited',
|
||||
columns = {tenant_id: 'VARCHAR', address: 'JSON', resources: 'JSON'}
|
||||
)
|
||||
WHERE tenant_id IS NOT NULL
|
||||
AND resources IS NOT NULL
|
||||
),
|
||||
-- Old format: blob — double UNNEST (tenants → resources)
|
||||
blob_unnested AS (
|
||||
SELECT
|
||||
tenant ->> 'tenant_id' AS tenant_id,
|
||||
UPPER(tenant -> 'address' ->> 'country_code') AS country_code,
|
||||
UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json
|
||||
FROM (
|
||||
SELECT UNNEST(tenants) AS tenant
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
|
||||
format = 'auto',
|
||||
maximum_object_size = 134217728
|
||||
)
|
||||
)
|
||||
WHERE (tenant ->> 'tenant_id') IS NOT NULL
|
||||
AND (tenant -> 'resources') IS NOT NULL
|
||||
),
|
||||
unnested AS (
|
||||
SELECT * FROM jsonl_unnested
|
||||
UNION ALL
|
||||
SELECT * FROM blob_unnested
|
||||
)
|
||||
SELECT
|
||||
tenant_id,
|
||||
@@ -68,6 +41,4 @@ SELECT
|
||||
FROM unnested
|
||||
WHERE (resource_json ->> 'resource_id') IS NOT NULL
|
||||
AND (resource_json ->> 'sport_id') = 'PADEL'
|
||||
-- Enforce grain: if both old blob and new JSONL exist for the same month,
|
||||
-- the UNION ALL produces duplicate (tenant_id, resource_id) pairs — deduplicate.
|
||||
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, resource_json ->> 'resource_id' ORDER BY tenant_id) = 1
|
||||
|
||||
@@ -3,11 +3,7 @@
|
||||
-- including address, opening hours, court resources, VAT rate, and facilities.
|
||||
-- Deduplicates on tenant_id (keeps most recent extraction).
|
||||
--
|
||||
-- Supports two landing formats (UNION ALL during migration):
|
||||
-- New: tenants.jsonl.gz — one tenant JSON object per line (no UNNEST needed)
|
||||
-- Old: tenants.json.gz — {"tenants": [{...}]} blob (UNNEST required)
|
||||
--
|
||||
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
|
||||
-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_playtomic_venues,
|
||||
@@ -17,8 +13,7 @@ MODEL (
|
||||
);
|
||||
|
||||
WITH
|
||||
-- New format: one tenant per JSONL line — no UNNEST, access columns directly
|
||||
jsonl_parsed AS (
|
||||
parsed AS (
|
||||
SELECT
|
||||
tenant_id,
|
||||
tenant_name,
|
||||
@@ -45,7 +40,7 @@ jsonl_parsed AS (
|
||||
filename AS source_file,
|
||||
CURRENT_DATE AS extracted_date
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
|
||||
@LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz',
|
||||
format = 'newline_delimited',
|
||||
filename = true,
|
||||
columns = {
|
||||
@@ -59,49 +54,6 @@ jsonl_parsed AS (
|
||||
)
|
||||
WHERE tenant_id IS NOT NULL
|
||||
),
|
||||
-- Old format: {"tenants": [...]} blob — keep for transition until old files rotate out
|
||||
blob_parsed AS (
|
||||
SELECT
|
||||
tenant ->> 'tenant_id' AS tenant_id,
|
||||
tenant ->> 'tenant_name' AS tenant_name,
|
||||
tenant ->> 'slug' AS slug,
|
||||
tenant ->> 'tenant_type' AS tenant_type,
|
||||
tenant ->> 'tenant_status' AS tenant_status,
|
||||
tenant ->> 'playtomic_status' AS playtomic_status,
|
||||
tenant ->> 'booking_type' AS booking_type,
|
||||
tenant -> 'address' ->> 'street' AS street,
|
||||
tenant -> 'address' ->> 'city' AS city,
|
||||
tenant -> 'address' ->> 'postal_code' AS postal_code,
|
||||
UPPER(tenant -> 'address' ->> 'country_code') AS country_code,
|
||||
tenant -> 'address' ->> 'timezone' AS timezone,
|
||||
tenant -> 'address' ->> 'administrative_area' AS administrative_area,
|
||||
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat,
|
||||
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon,
|
||||
TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate,
|
||||
tenant ->> 'default_currency' AS default_currency,
|
||||
TRY_CAST(tenant -> 'booking_settings' ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes,
|
||||
tenant -> 'opening_hours' AS opening_hours_json,
|
||||
tenant -> 'resources' AS resources_json,
|
||||
tenant ->> 'created_at' AS created_at,
|
||||
tenant ->> 'is_playtomic_partner' AS is_playtomic_partner_raw,
|
||||
filename AS source_file,
|
||||
CURRENT_DATE AS extracted_date
|
||||
FROM (
|
||||
SELECT UNNEST(tenants) AS tenant, filename
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
|
||||
format = 'auto',
|
||||
filename = true,
|
||||
maximum_object_size = 134217728
|
||||
)
|
||||
)
|
||||
WHERE (tenant ->> 'tenant_id') IS NOT NULL
|
||||
),
|
||||
parsed AS (
|
||||
SELECT * FROM jsonl_parsed
|
||||
UNION ALL
|
||||
SELECT * FROM blob_parsed
|
||||
),
|
||||
deduped AS (
|
||||
SELECT *,
|
||||
ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY source_file DESC) AS rn
|
||||
|
||||
@@ -3,11 +3,7 @@
|
||||
-- Broad coverage (140K+ locations) enables Gemeinde-level market intelligence.
|
||||
-- One row per geoname_id (GeoNames stable numeric identifier).
|
||||
--
|
||||
-- Supports two landing formats (UNION ALL during migration):
|
||||
-- New: cities_global.jsonl.gz — one city per line, columns directly accessible
|
||||
-- Old: cities_global.json.gz — {"rows": [...]} blob (UNNEST required)
|
||||
--
|
||||
-- Source: data/landing/geonames/{year}/{month}/cities_global.{jsonl,json}.gz
|
||||
-- Source: data/landing/geonames/{year}/{month}/cities_global.jsonl.gz
|
||||
|
||||
MODEL (
|
||||
name staging.stg_population_geonames,
|
||||
@@ -16,17 +12,14 @@ MODEL (
|
||||
grain geoname_id
|
||||
);
|
||||
|
||||
WITH
|
||||
-- New format: one city per JSONL line
|
||||
jsonl_rows AS (
|
||||
SELECT
|
||||
TRY_CAST(geoname_id AS INTEGER) AS geoname_id,
|
||||
city_name,
|
||||
country_code,
|
||||
TRIM(city_name) AS city_name,
|
||||
UPPER(country_code) AS country_code,
|
||||
TRY_CAST(lat AS DOUBLE) AS lat,
|
||||
TRY_CAST(lon AS DOUBLE) AS lon,
|
||||
admin1_code,
|
||||
admin2_code,
|
||||
NULLIF(TRIM(admin1_code), '') AS admin1_code,
|
||||
NULLIF(TRIM(admin2_code), '') AS admin2_code,
|
||||
TRY_CAST(population AS BIGINT) AS population,
|
||||
TRY_CAST(ref_year AS INTEGER) AS ref_year,
|
||||
CURRENT_DATE AS extracted_date
|
||||
@@ -40,50 +33,8 @@ jsonl_rows AS (
|
||||
}
|
||||
)
|
||||
WHERE geoname_id IS NOT NULL
|
||||
),
|
||||
-- Old format: {"rows": [...]} blob — kept for transition
|
||||
blob_rows AS (
|
||||
SELECT
|
||||
TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id,
|
||||
row ->> 'city_name' AS city_name,
|
||||
row ->> 'country_code' AS country_code,
|
||||
TRY_CAST(row ->> 'lat' AS DOUBLE) AS lat,
|
||||
TRY_CAST(row ->> 'lon' AS DOUBLE) AS lon,
|
||||
row ->> 'admin1_code' AS admin1_code,
|
||||
row ->> 'admin2_code' AS admin2_code,
|
||||
TRY_CAST(row ->> 'population' AS BIGINT) AS population,
|
||||
TRY_CAST(row ->> 'ref_year' AS INTEGER) AS ref_year,
|
||||
CURRENT_DATE AS extracted_date
|
||||
FROM (
|
||||
SELECT UNNEST(rows) AS row
|
||||
FROM read_json(
|
||||
@LANDING_DIR || '/geonames/*/*/cities_global.json.gz',
|
||||
auto_detect = true,
|
||||
maximum_object_size = 40000000
|
||||
)
|
||||
)
|
||||
WHERE (row ->> 'geoname_id') IS NOT NULL
|
||||
),
|
||||
all_rows AS (
|
||||
SELECT * FROM jsonl_rows
|
||||
UNION ALL
|
||||
SELECT * FROM blob_rows
|
||||
)
|
||||
SELECT
|
||||
geoname_id,
|
||||
TRIM(city_name) AS city_name,
|
||||
UPPER(country_code) AS country_code,
|
||||
lat,
|
||||
lon,
|
||||
NULLIF(TRIM(admin1_code), '') AS admin1_code,
|
||||
NULLIF(TRIM(admin2_code), '') AS admin2_code,
|
||||
population,
|
||||
ref_year,
|
||||
extracted_date
|
||||
FROM all_rows
|
||||
WHERE population IS NOT NULL
|
||||
AND population IS NOT NULL
|
||||
AND population > 0
|
||||
AND geoname_id IS NOT NULL
|
||||
AND city_name IS NOT NULL
|
||||
AND lat IS NOT NULL
|
||||
AND lon IS NOT NULL
|
||||
|
||||
@@ -1,22 +1,19 @@
|
||||
"""Create minimal seed files for SQLMesh staging models that require landing data."""
|
||||
"""Create minimal seed files for SQLMesh staging models that require landing data.
|
||||
|
||||
Seeds are empty JSONL gzip files — they satisfy DuckDB's file-not-found check
|
||||
while contributing zero rows to the staging models.
|
||||
"""
|
||||
import gzip
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
seed = {
|
||||
"date": "1970-01-01",
|
||||
"captured_at_utc": "1970-01-01T00:00:00Z",
|
||||
"venue_count": 0,
|
||||
"venues_errored": 0,
|
||||
"venues": [],
|
||||
}
|
||||
morning = Path("data/landing/playtomic/1970/01/availability_1970-01-01.json.gz")
|
||||
recheck = Path("data/landing/playtomic/1970/01/availability_1970-01-01_recheck_00.json.gz")
|
||||
# stg_playtomic_availability requires at least one morning and one recheck file
|
||||
morning = Path("data/landing/playtomic/1970/01/availability_1970-01-01.jsonl.gz")
|
||||
recheck = Path("data/landing/playtomic/1970/01/availability_1970-01-01_recheck_00.jsonl.gz")
|
||||
morning.parent.mkdir(parents=True, exist_ok=True)
|
||||
for p in [morning, recheck]:
|
||||
if not p.exists():
|
||||
with gzip.open(p, "wt") as f:
|
||||
json.dump(seed, f)
|
||||
with gzip.open(p, "wb") as f:
|
||||
pass # empty JSONL — 0 rows, no error
|
||||
print("created", p)
|
||||
else:
|
||||
print("exists ", p)
|
||||
|
||||
Reference in New Issue
Block a user