- stg_regional_income: expanded NUTS-1+2 (LENGTH IN 3,4), nuts_code rename, nuts_level - stg_nuts2_boundaries: new — ST_Read GISCO GeoJSON, bbox columns for spatial pre-filter - stg_income_usa: new — Census ACS state-level income staging model - dim_locations: spatial join replaces admin1_to_nuts1 VALUES CTE; us_income CTE with PPS normalisation (income/80610×30000); income cascade: NUTS-2→NUTS-1→US state→country - init_landing_seeds: compress=False for ST_Read files; gisco GeoJSON + census income seeds - CHANGELOG + PROJECT.md updated Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
130 lines
5.4 KiB
Python
130 lines
5.4 KiB
Python
"""Create minimal landing zone seed files so SQLMesh models can run before real data arrives.
|
|
|
|
Each seed contains one null/empty record that is filtered out by the staging model's
|
|
WHERE clause. Seeds live in the 1970/01 epoch so they're never confused with real data.
|
|
|
|
Usage:
|
|
uv run python scripts/init_landing_seeds.py [--landing-dir data/landing]
|
|
|
|
Idempotent: skips existing files.
|
|
"""
|
|
|
|
import argparse
|
|
import gzip
|
|
import json
|
|
from pathlib import Path
|
|
|
|
|
|
def create_seed(dest: Path, content: bytes, *, compress: bool = True) -> None:
    """Write content to a seed file atomically. Skips if the file already exists.

    compress=True (default) writes gzipped content, suitable for all landing zone
    files. compress=False writes raw bytes — required for files consumed by DuckDB
    ST_Read (e.g. GeoJSON), which cannot read .gz files.

    Args:
        dest: Final seed path; parent directories are created as needed.
        content: Raw payload bytes (gzipped on write when compress=True).
        compress: Keyword-only switch between gzip and raw output.
    """
    if dest.exists():
        return
    dest.parent.mkdir(parents=True, exist_ok=True)
    # Write to a sibling .tmp file first, then swap it in, so a crash mid-write
    # never leaves a truncated seed at the final path.
    tmp = dest.with_suffix(dest.suffix + ".tmp")
    try:
        if compress:
            with gzip.open(tmp, "wb") as f:
                f.write(content)
        else:
            tmp.write_bytes(content)
        # Path.replace is atomic on POSIX and, unlike rename, also succeeds on
        # Windows when a concurrent run has already created dest.
        tmp.replace(dest)
    except BaseException:
        # Don't leave a stale .tmp behind — it would be half-written garbage.
        tmp.unlink(missing_ok=True)
        raise
    print(f" created: {dest}")
|
|
|
|
|
|
def main() -> None:
    """Parse CLI arguments and materialise every landing-zone seed file.

    Gzipped seeds are written first, then the uncompressed ones needed by
    DuckDB ST_Read. Existing files are skipped, so re-running is safe.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--landing-dir", default="data/landing", type=Path)
    base: Path = parser.parse_args().landing_dir

    # Payload bytes shared by many seeds — built once, reused below.
    empty_rows = json.dumps({"rows": [], "count": 0}).encode()
    empty_overpass = json.dumps({"version": 0.6, "elements": []}).encode()

    gzipped = {
        # --- Playtomic tenants ---
        # JSONL: one null tenant (filtered by WHERE tenant_id IS NOT NULL)
        "playtomic/1970/01/tenants.jsonl.gz": b'{"tenant_id":null}\n',
        # Blob: empty tenants array
        "playtomic/1970/01/tenants.json.gz":
            json.dumps({"tenants": [], "count": 0}).encode(),

        # --- Playtomic availability (morning) ---
        # JSONL: one null venue (filtered by WHERE tenant_id IS NOT NULL)
        "playtomic/1970/01/availability_1970-01-01.jsonl.gz":
            b'{"tenant_id":null,"date":"1970-01-01","captured_at_utc":"1970-01-01T00:00:00Z","slots":null}\n',
        # Blob: empty venues array
        "playtomic/1970/01/availability_1970-01-01.json.gz":
            json.dumps({"date": "1970-01-01",
                        "captured_at_utc": "1970-01-01T00:00:00Z",
                        "venue_count": 0, "venues": []}).encode(),

        # --- Playtomic recheck ---
        # JSONL: one null venue (filtered by WHERE tenant_id IS NOT NULL)
        "playtomic/1970/01/availability_1970-01-01_recheck_00.jsonl.gz":
            b'{"tenant_id":null,"date":"1970-01-01","captured_at_utc":"1970-01-01T00:00:00Z","recheck_hour":0,"slots":null}\n',
        # Blob: empty venues array (old format, kept for transition)
        "playtomic/1970/01/availability_1970-01-01_recheck_00.json.gz":
            json.dumps({"date": "1970-01-01",
                        "captured_at_utc": "1970-01-01T00:00:00Z",
                        "recheck_hour": 0, "venues": []}).encode(),

        # --- GeoNames ---
        # JSONL: one null city (filtered by WHERE geoname_id IS NOT NULL)
        "geonames/1970/01/cities_global.jsonl.gz": b'{"geoname_id":null}\n',
        # Blob: empty rows array
        "geonames/1970/01/cities_global.json.gz": empty_rows,

        # --- Overpass tennis ---
        # JSONL: one null element (filtered by WHERE type IS NOT NULL)
        "overpass_tennis/1970/01/courts.jsonl.gz": b'{"type":null,"id":null}\n',
        # Blob: empty elements array
        "overpass_tennis/1970/01/courts.json.gz": empty_overpass,

        # --- Overpass padel (unchanged format) ---
        "overpass/1970/01/courts.json.gz": empty_overpass,

        # --- Eurostat ---
        "eurostat/1970/01/urb_cpop1.json.gz": empty_rows,
        "eurostat/1970/01/ilc_di03.json.gz": empty_rows,
        "eurostat/1970/01/nama_10r_2hhinc.json.gz": empty_rows,
        "census_usa/1970/01/acs5_state_income.json.gz": empty_rows,
        "eurostat_city_labels/1970/01/cities_codelist.json.gz": empty_rows,

        # --- National statistics ---
        "ons_uk/1970/01/lad_population.json.gz": empty_rows,
        "census_usa/1970/01/acs5_places.json.gz": empty_rows,
    }

    # Uncompressed seeds — required for files consumed by ST_Read (cannot read .gz).
    # Empty NUTS-2 boundary placeholder so stg_nuts2_boundaries can run before
    # the real file is downloaded via scripts/download_gisco_nuts.py.
    # ST_Read on an empty FeatureCollection returns 0 rows (graceful degradation:
    # all locations fall back to country-level income until the real file lands).
    raw = {
        "gisco/1970/01/nuts2_boundaries.geojson":
            b'{"type":"FeatureCollection","features":[]}',
    }

    print(f"Initialising landing seeds in: {base}")
    for rel_path, payload in gzipped.items():
        create_seed(base / rel_path, payload)
    for rel_path, payload in raw.items():
        create_seed(base / rel_path, payload, compress=False)
    print("Done.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|