fix(staging): enforce grain dedup in resources + opening_hours + skip old blob in tenants
Both stg_playtomic_resources and stg_playtomic_opening_hours lacked a QUALIFY ROW_NUMBER() dedup despite declaring a grain. When both tenants.json.gz (old) and tenants.jsonl.gz (new) exist for the same month, the UNION ALL produced exactly 2× rows.

Fixes:
- stg_playtomic_resources: QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, resource_id)
- stg_playtomic_opening_hours: QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, day_of_week)
- playtomic_tenants.py: skip if old blob OR new JSONL already exists for the month, preventing same-month dual-format writes that trigger the duplication.

Row counts after fix: ~43.8K resources, ~93.4K opening_hours (was 87.6K, 186.8K).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -77,6 +77,10 @@ def extract(
|
|||||||
year, month = year_month.split("/")
|
year, month = year_month.split("/")
|
||||||
dest_dir = landing_path(landing_dir, "playtomic", year, month)
|
dest_dir = landing_path(landing_dir, "playtomic", year, month)
|
||||||
dest = dest_dir / "tenants.jsonl.gz"
|
dest = dest_dir / "tenants.jsonl.gz"
|
||||||
|
old_blob = dest_dir / "tenants.json.gz"
|
||||||
|
if dest.exists() or old_blob.exists():
|
||||||
|
logger.info("Already have tenants for %s/%s — skipping", year, month)
|
||||||
|
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
|
||||||
|
|
||||||
proxy_urls = load_proxy_urls()
|
proxy_urls = load_proxy_urls()
|
||||||
next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None
|
next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None
|
||||||
|
|||||||
@@ -104,3 +104,6 @@ SELECT
|
|||||||
FROM unpivoted
|
FROM unpivoted
|
||||||
WHERE opening_time IS NOT NULL
|
WHERE opening_time IS NOT NULL
|
||||||
AND closing_time IS NOT NULL
|
AND closing_time IS NOT NULL
|
||||||
|
-- Enforce grain: if both old blob and new JSONL exist for the same month,
|
||||||
|
-- the UNION ALL produces duplicate (tenant_id, day_of_week) pairs — deduplicate.
|
||||||
|
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, day_of_week ORDER BY tenant_id) = 1
|
||||||
|
|||||||
@@ -68,3 +68,6 @@ SELECT
|
|||||||
FROM unnested
|
FROM unnested
|
||||||
WHERE (resource_json ->> 'resource_id') IS NOT NULL
|
WHERE (resource_json ->> 'resource_id') IS NOT NULL
|
||||||
AND (resource_json ->> 'sport_id') = 'PADEL'
|
AND (resource_json ->> 'sport_id') = 'PADEL'
|
||||||
|
-- Enforce grain: if both old blob and new JSONL exist for the same month,
|
||||||
|
-- the UNION ALL produces duplicate (tenant_id, resource_id) pairs — deduplicate.
|
||||||
|
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, resource_json ->> 'resource_id' ORDER BY tenant_id) = 1
|
||||||
|
|||||||
Reference in New Issue
Block a user