merge: daily tenant snapshots with date-based partition

This commit is contained in:
Deeman
2026-02-28 17:30:33 +01:00
3 changed files with 9 additions and 8 deletions

View File

@@ -80,8 +80,10 @@ def _load_tenant_ids(landing_dir: Path) -> list[str]:
if not playtomic_dir.exists():
return []
# Prefer JSONL (new format), fall back to blob (old format)
tenant_files = sorted(playtomic_dir.glob("*/*/tenants.jsonl.gz"), reverse=True)
# Prefer daily partition (YYYY/MM/DD), fall back to older monthly/weekly partitions
tenant_files = sorted(playtomic_dir.glob("*/*/*/tenants.jsonl.gz"), reverse=True)
if not tenant_files:
tenant_files = sorted(playtomic_dir.glob("*/*/tenants.jsonl.gz"), reverse=True)
if not tenant_files:
tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True)
if not tenant_files:

View File

@@ -81,12 +81,11 @@ def extract(
partitions and picks the most recent one.
"""
today = datetime.now(UTC)
year = today.strftime("%G") # ISO year (matches ISO week, differs from calendar year on week boundaries)
week = today.strftime("W%V") # ISO week: W01 … W53
dest_dir = landing_path(landing_dir, "playtomic", year, week)
year, month, day = today.strftime("%Y"), today.strftime("%m"), today.strftime("%d")
dest_dir = landing_path(landing_dir, "playtomic", year, month, day)
dest = dest_dir / "tenants.jsonl.gz"
if dest.exists():
logger.info("Already have tenants for %s/%s — skipping", year, week)
logger.info("Already have tenants for %s/%s/%s — skipping", year, month, day)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
tiers = load_proxy_tiers()
@@ -162,7 +161,7 @@ def extract(
"files_written": 1,
"files_skipped": 0,
"bytes_written": bytes_written,
"cursor_value": f"{year}/{week}",
"cursor_value": f"{year}/{month}/{day}",
}

View File

@@ -23,7 +23,7 @@ schedule = "monthly"
[playtomic_tenants]
module = "padelnomics_extract.playtomic_tenants"
schedule = "weekly"
schedule = "daily"
[playtomic_availability]
module = "padelnomics_extract.playtomic_availability"