diff --git a/extract/padelnomics_extract/pyproject.toml b/extract/padelnomics_extract/pyproject.toml index ec3ca82..b5e7910 100644 --- a/extract/padelnomics_extract/pyproject.toml +++ b/extract/padelnomics_extract/pyproject.toml @@ -21,6 +21,7 @@ extract-census-usa = "padelnomics_extract.census_usa:main" extract-census-usa-income = "padelnomics_extract.census_usa_income:main" extract-ons-uk = "padelnomics_extract.ons_uk:main" extract-geonames = "padelnomics_extract.geonames:main" +extract-gisco = "padelnomics_extract.gisco:main" [build-system] requires = ["hatchling"] diff --git a/extract/padelnomics_extract/src/padelnomics_extract/_shared.py b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py index fae5891..8ce30f0 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/_shared.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/_shared.py @@ -11,9 +11,12 @@ from datetime import UTC, datetime from pathlib import Path import niquests +from dotenv import load_dotenv from .utils import end_run, open_state_db, start_run +load_dotenv() + LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing")) HTTP_TIMEOUT_SECONDS = 30 diff --git a/extract/padelnomics_extract/src/padelnomics_extract/all.py b/extract/padelnomics_extract/src/padelnomics_extract/all.py index 62987ce..6796fed 100644 --- a/extract/padelnomics_extract/src/padelnomics_extract/all.py +++ b/extract/padelnomics_extract/src/padelnomics_extract/all.py @@ -7,7 +7,7 @@ A graphlib.TopologicalSorter schedules them: tasks with no unmet dependencies run immediately in parallel; each completion may unlock new tasks. 
Current dependency graph: - All 8 non-availability extractors have no dependencies (run in parallel) + - All 9 non-availability extractors have no dependencies (run in parallel) - playtomic_availability depends on playtomic_tenants (starts as soon as tenants finishes, even if other extractors are still running) """ @@ -26,6 +26,8 @@ from .eurostat_city_labels import EXTRACTOR_NAME as EUROSTAT_CITY_LABELS_NAME from .eurostat_city_labels import extract as extract_eurostat_city_labels from .geonames import EXTRACTOR_NAME as GEONAMES_NAME from .geonames import extract as extract_geonames +from .gisco import EXTRACTOR_NAME as GISCO_NAME +from .gisco import extract as extract_gisco from .ons_uk import EXTRACTOR_NAME as ONS_UK_NAME from .ons_uk import extract as extract_ons_uk from .overpass import EXTRACTOR_NAME as OVERPASS_NAME @@ -50,6 +52,7 @@ EXTRACTORS: dict[str, tuple] = { CENSUS_USA_INCOME_NAME: (extract_census_usa_income, []), ONS_UK_NAME: (extract_ons_uk, []), GEONAMES_NAME: (extract_geonames, []), + GISCO_NAME: (extract_gisco, []), TENANTS_NAME: (extract_tenants, []), AVAILABILITY_NAME: (extract_availability, [TENANTS_NAME]), } diff --git a/extract/padelnomics_extract/src/padelnomics_extract/gisco.py b/extract/padelnomics_extract/src/padelnomics_extract/gisco.py new file mode 100644 index 0000000..8fd43fb --- /dev/null +++ b/extract/padelnomics_extract/src/padelnomics_extract/gisco.py @@ -0,0 +1,95 @@ +"""GISCO NUTS-2 boundary GeoJSON extractor. + +Downloads NUTS-2 boundary polygons from Eurostat GISCO. The file is stored +uncompressed because DuckDB's ST_Read cannot read gzipped files. + +The NUTS classification is revised approximately every 7 years (current: 2021). +The partition path is fixed (2024/01, the first-ingestion date), not the run date, making +the source version explicit. Cursor tracking still uses year_month to avoid +re-downloading on every monthly run. 
+ +Landing: {LANDING_DIR}/gisco/2024/01/nuts2_boundaries.geojson (~5 MB, uncompressed) +""" + +import sqlite3 +from pathlib import Path + +import niquests + +from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging +from .utils import get_last_cursor + +logger = setup_logging("padelnomics.extract.gisco") + +EXTRACTOR_NAME = "gisco" + +# NUTS 2021 revision, 20M scale (1:20,000,000), WGS84 (EPSG:4326), LEVL_2 only. +# 20M resolution gives simplified polygons that are fast for point-in-polygon +# matching without sacrificing accuracy at the NUTS-2 boundary level. +GISCO_URL = ( + "https://gisco-services.ec.europa.eu/distribution/v2/nuts/geojson/" + "NUTS_RG_20M_2021_4326_LEVL_2.geojson" +) + +# Fixed partition: NUTS boundaries are a static reference file, not time-series data. +# The 2024/01 partition reflects when this NUTS 2021 dataset was first ingested. +DEST_REL = Path("gisco/2024/01/nuts2_boundaries.geojson") + +_GISCO_TIMEOUT_SECONDS = HTTP_TIMEOUT_SECONDS * 4 # ~5 MB; generous for slow upstreams + + +def extract( + landing_dir: Path, + year_month: str, + conn: sqlite3.Connection, + session: niquests.Session, +) -> dict: + """Download NUTS-2 GeoJSON. 
Skips if already run this month or file exists.""" + last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) + if last_cursor == year_month: + logger.info("already ran for %s — skipping", year_month) + return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} + + dest = landing_dir / DEST_REL + if dest.exists(): + logger.info("file already exists (skipping download): %s", dest) + return { + "files_written": 0, + "files_skipped": 1, + "bytes_written": 0, + "cursor_value": year_month, + } + + dest.parent.mkdir(parents=True, exist_ok=True) + logger.info("GET %s", GISCO_URL) + resp = session.get(GISCO_URL, timeout=_GISCO_TIMEOUT_SECONDS) + resp.raise_for_status() + + content = resp.content + assert len(content) > 100_000, ( + f"GeoJSON too small ({len(content)} bytes) — download may have failed" + ) + assert b'"FeatureCollection"' in content, "Response does not look like GeoJSON" + + # Write uncompressed — ST_Read requires a plain file, not .gz + tmp = dest.with_suffix(".geojson.tmp") + tmp.write_bytes(content) + tmp.rename(dest) + + size_mb = len(content) / 1_000_000 + logger.info("written %s (%.1f MB)", dest, size_mb) + + return { + "files_written": 1, + "files_skipped": 0, + "bytes_written": len(content), + "cursor_value": year_month, + } + + +def main() -> None: + run_extractor(EXTRACTOR_NAME, extract) + + +if __name__ == "__main__": + main() diff --git a/infra/supervisor/workflows.toml b/infra/supervisor/workflows.toml index a415a41..e4a18b9 100644 --- a/infra/supervisor/workflows.toml +++ b/infra/supervisor/workflows.toml @@ -39,3 +39,23 @@ module = "padelnomics_extract.playtomic_availability" entry = "main_recheck" schedule = "0,30 6-23 * * *" depends_on = ["playtomic_availability"] + +[census_usa] +module = "padelnomics_extract.census_usa" +schedule = "monthly" + +[census_usa_income] +module = "padelnomics_extract.census_usa_income" +schedule = "monthly" + +[eurostat_city_labels] +module = "padelnomics_extract.eurostat_city_labels" +schedule = "monthly" + 
+[ons_uk] +module = "padelnomics_extract.ons_uk" +schedule = "monthly" + +[gisco] +module = "padelnomics_extract.gisco" +schedule = "monthly" diff --git a/scripts/download_gisco_nuts.py b/scripts/download_gisco_nuts.py deleted file mode 100644 index 5a7737b..0000000 --- a/scripts/download_gisco_nuts.py +++ /dev/null @@ -1,81 +0,0 @@ -"""Download NUTS-2 boundary GeoJSON from Eurostat GISCO. - -One-time (or on NUTS revision) download of NUTS-2 boundary polygons used for -spatial income resolution in dim_locations. Stored uncompressed because DuckDB's -ST_Read function cannot read gzipped files. - -NUTS classification changes approximately every 7 years. Current revision: 2021. - -Output: {LANDING_DIR}/gisco/2024/01/nuts2_boundaries.geojson (~5MB, uncompressed) - -Usage: - uv run python scripts/download_gisco_nuts.py [--landing-dir data/landing] - -Idempotent: skips download if the file already exists. -""" - -import argparse -import sys -from pathlib import Path - -import niquests - -# NUTS 2021 revision, 20M scale (1:20,000,000), WGS84 (EPSG:4326), LEVL_2 only. -# 20M resolution gives simplified polygons that are fast for point-in-polygon -# matching without sacrificing accuracy at the NUTS-2 boundary level. -GISCO_URL = ( - "https://gisco-services.ec.europa.eu/distribution/v2/nuts/geojson/" - "NUTS_RG_20M_2021_4326_LEVL_2.geojson" -) - -# Fixed partition: NUTS boundaries are a static reference file, not time-series data. -# Use the NUTS revision year as the partition to make the source version explicit. 
-DEST_REL_PATH = "gisco/2024/01/nuts2_boundaries.geojson" - -HTTP_TIMEOUT_SECONDS = 120 - - -def download_nuts_boundaries(landing_dir: Path) -> None: - dest = landing_dir / DEST_REL_PATH - if dest.exists(): - print(f"Already exists (skipping): {dest}") - return - - dest.parent.mkdir(parents=True, exist_ok=True) - print(f"Downloading NUTS-2 boundaries from GISCO...") - print(f" URL: {GISCO_URL}") - - with niquests.Session() as session: - resp = session.get(GISCO_URL, timeout=HTTP_TIMEOUT_SECONDS) - resp.raise_for_status() - - content = resp.content - assert len(content) > 100_000, ( - f"GeoJSON too small ({len(content)} bytes) — download may have failed" - ) - assert b'"FeatureCollection"' in content, "Response does not look like GeoJSON" - - # Write uncompressed — ST_Read requires a plain file - tmp = dest.with_suffix(".geojson.tmp") - tmp.write_bytes(content) - tmp.rename(dest) - - size_mb = len(content) / 1_000_000 - print(f" Written: {dest} ({size_mb:.1f} MB)") - print("Done. Run SQLMesh plan to rebuild stg_nuts2_boundaries.") - - -def main() -> None: - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument("--landing-dir", default="data/landing", type=Path) - args = parser.parse_args() - - if not args.landing_dir.is_dir(): - print(f"Error: landing dir does not exist: {args.landing_dir}", file=sys.stderr) - sys.exit(1) - - download_nuts_boundaries(args.landing_dir) - - -if __name__ == "__main__": - main()