feat(extract): convert playtomic_tenants to JSONL output
- playtomic_tenants.py: write each tenant as a JSONL line after dedup, compress via compress_jsonl_atomic → tenants.jsonl.gz
- playtomic_availability.py: update _load_tenant_ids() to prefer tenants.jsonl.gz, fall back to tenants.json.gz (transition)
- stg_playtomic_venues.sql: UNION ALL jsonl+blob CTEs for transition; JSONL reads top-level columns directly, no UNNEST(tenants) needed
- stg_playtomic_resources.sql: same UNION ALL pattern, single UNNEST for resources in JSONL path vs double UNNEST in blob path
- stg_playtomic_opening_hours.sql: same UNION ALL pattern, opening_hours as top-level JSON column in JSONL path

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -65,28 +65,44 @@ _thread_local = threading.local()
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_tenant_ids(landing_dir: Path) -> list[str]:
    """Read tenant IDs from the most recent tenants file (JSONL or blob format).

    Scans ``{landing_dir}/playtomic/*/*/`` for ``tenants.jsonl.gz`` (new
    format: one tenant object per line) and falls back to ``tenants.json.gz``
    (old format: one ``{"tenants": [...]}`` blob) during the transition.

    Args:
        landing_dir: Root of the landing zone; must be an existing directory.

    Returns:
        Tenant IDs from the newest file found, or ``[]`` when no
        ``playtomic`` directory or tenants file exists yet.
    """
    assert landing_dir.is_dir(), f"landing_dir must exist: {landing_dir}"
    playtomic_dir = landing_dir / "playtomic"
    if not playtomic_dir.exists():
        return []

    # Prefer JSONL (new format), fall back to blob (old format).
    # reverse=True sorts the {year}/{month} path components so the
    # newest file is first (assumes zero-padded month dirs — TODO confirm).
    tenant_files = sorted(playtomic_dir.glob("*/*/tenants.jsonl.gz"), reverse=True)
    if not tenant_files:
        tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True)
    if not tenant_files:
        return []

    latest = tenant_files[0]
    logger.info("Loading tenant IDs from %s", latest)

    ids: list[str] = []
    with gzip.open(latest, "rt") as f:
        if latest.name.endswith(".jsonl.gz"):
            # JSONL: one tenant object per line.
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    # A truncated final line means the writer was interrupted
                    # mid-file; keep whatever parsed cleanly so far.
                    break
                # Tolerate both key spellings seen in the API payloads.
                tid = record.get("tenant_id") or record.get("id")
                if tid:
                    ids.append(tid)
        else:
            # Blob: {"tenants": [...]}
            data = json.loads(f.read())
            for t in data.get("tenants", []):
                tid = t.get("tenant_id") or t.get("id")
                if tid:
                    ids.append(tid)

    logger.info("Loaded %d tenant IDs", len(ids))
    return ids
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
"""Playtomic tenants extractor — venue listings via unauthenticated API.
|
||||
|
||||
Paginates through the global tenant list (sorted by UUID) using the `page`
|
||||
parameter. Deduplicates on tenant_id and writes a single consolidated JSON
|
||||
to the landing zone.
|
||||
parameter. Deduplicates on tenant_id and writes a gzipped JSONL file to the
|
||||
landing zone (one tenant object per line).
|
||||
|
||||
API notes (discovered 2026-02):
|
||||
- bbox params (min_latitude etc.) are silently ignored by the API
|
||||
@@ -18,7 +18,7 @@ pages.
|
||||
|
||||
Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2).
|
||||
|
||||
Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.json.gz
|
||||
Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz
|
||||
"""
|
||||
|
||||
import json
|
||||
@@ -31,7 +31,7 @@ import niquests
|
||||
|
||||
from ._shared import HTTP_TIMEOUT_SECONDS, USER_AGENT, run_extractor, setup_logging
|
||||
from .proxy import load_proxy_urls, make_round_robin_cycler
|
||||
from .utils import landing_path, write_gzip_atomic
|
||||
from .utils import compress_jsonl_atomic, landing_path
|
||||
|
||||
logger = setup_logging("padelnomics.extract.playtomic_tenants")
|
||||
|
||||
@@ -76,7 +76,7 @@ def extract(
|
||||
"""Fetch all Playtomic venues via global pagination. Returns run metrics."""
|
||||
year, month = year_month.split("/")
|
||||
dest_dir = landing_path(landing_dir, "playtomic", year, month)
|
||||
dest = dest_dir / "tenants.json.gz"
|
||||
dest = dest_dir / "tenants.jsonl.gz"
|
||||
|
||||
proxy_urls = load_proxy_urls()
|
||||
next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None
|
||||
@@ -138,8 +138,12 @@ def extract(
|
||||
if not next_proxy:
|
||||
time.sleep(THROTTLE_SECONDS)
|
||||
|
||||
payload = json.dumps({"tenants": all_tenants, "count": len(all_tenants)}).encode()
|
||||
bytes_written = write_gzip_atomic(dest, payload)
|
||||
# Write each tenant as a JSONL line, then compress atomically
|
||||
working_path = dest.with_suffix(".working.jsonl")
|
||||
with open(working_path, "w") as f:
|
||||
for tenant in all_tenants:
|
||||
f.write(json.dumps(tenant, separators=(",", ":")) + "\n")
|
||||
bytes_written = compress_jsonl_atomic(working_path, dest)
|
||||
logger.info("%d unique venues -> %s", len(all_tenants), dest)
|
||||
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user