Compare commits
3 Commits
v202603011
...
v202603011
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
75305935bd | ||
|
|
a15c32d398 | ||
|
|
97c5846d51 |
@@ -21,6 +21,7 @@ extract-census-usa = "padelnomics_extract.census_usa:main"
|
|||||||
extract-census-usa-income = "padelnomics_extract.census_usa_income:main"
|
extract-census-usa-income = "padelnomics_extract.census_usa_income:main"
|
||||||
extract-ons-uk = "padelnomics_extract.ons_uk:main"
|
extract-ons-uk = "padelnomics_extract.ons_uk:main"
|
||||||
extract-geonames = "padelnomics_extract.geonames:main"
|
extract-geonames = "padelnomics_extract.geonames:main"
|
||||||
|
extract-gisco = "padelnomics_extract.gisco:main"
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["hatchling"]
|
requires = ["hatchling"]
|
||||||
|
|||||||
@@ -11,9 +11,12 @@ from datetime import UTC, datetime
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import niquests
|
import niquests
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
from .utils import end_run, open_state_db, start_run
|
from .utils import end_run, open_state_db, start_run
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))
|
LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))
|
||||||
|
|
||||||
HTTP_TIMEOUT_SECONDS = 30
|
HTTP_TIMEOUT_SECONDS = 30
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ A graphlib.TopologicalSorter schedules them: tasks with no unmet dependencies
|
|||||||
run immediately in parallel; each completion may unlock new tasks.
|
run immediately in parallel; each completion may unlock new tasks.
|
||||||
|
|
||||||
Current dependency graph:
|
Current dependency graph:
|
||||||
- All 8 non-availability extractors have no dependencies (run in parallel)
|
- All 9 non-availability extractors have no dependencies (run in parallel)
|
||||||
- playtomic_availability depends on playtomic_tenants (starts as soon as
|
- playtomic_availability depends on playtomic_tenants (starts as soon as
|
||||||
tenants finishes, even if other extractors are still running)
|
tenants finishes, even if other extractors are still running)
|
||||||
"""
|
"""
|
||||||
@@ -26,6 +26,8 @@ from .eurostat_city_labels import EXTRACTOR_NAME as EUROSTAT_CITY_LABELS_NAME
|
|||||||
from .eurostat_city_labels import extract as extract_eurostat_city_labels
|
from .eurostat_city_labels import extract as extract_eurostat_city_labels
|
||||||
from .geonames import EXTRACTOR_NAME as GEONAMES_NAME
|
from .geonames import EXTRACTOR_NAME as GEONAMES_NAME
|
||||||
from .geonames import extract as extract_geonames
|
from .geonames import extract as extract_geonames
|
||||||
|
from .gisco import EXTRACTOR_NAME as GISCO_NAME
|
||||||
|
from .gisco import extract as extract_gisco
|
||||||
from .ons_uk import EXTRACTOR_NAME as ONS_UK_NAME
|
from .ons_uk import EXTRACTOR_NAME as ONS_UK_NAME
|
||||||
from .ons_uk import extract as extract_ons_uk
|
from .ons_uk import extract as extract_ons_uk
|
||||||
from .overpass import EXTRACTOR_NAME as OVERPASS_NAME
|
from .overpass import EXTRACTOR_NAME as OVERPASS_NAME
|
||||||
@@ -50,6 +52,7 @@ EXTRACTORS: dict[str, tuple] = {
|
|||||||
CENSUS_USA_INCOME_NAME: (extract_census_usa_income, []),
|
CENSUS_USA_INCOME_NAME: (extract_census_usa_income, []),
|
||||||
ONS_UK_NAME: (extract_ons_uk, []),
|
ONS_UK_NAME: (extract_ons_uk, []),
|
||||||
GEONAMES_NAME: (extract_geonames, []),
|
GEONAMES_NAME: (extract_geonames, []),
|
||||||
|
GISCO_NAME: (extract_gisco, []),
|
||||||
TENANTS_NAME: (extract_tenants, []),
|
TENANTS_NAME: (extract_tenants, []),
|
||||||
AVAILABILITY_NAME: (extract_availability, [TENANTS_NAME]),
|
AVAILABILITY_NAME: (extract_availability, [TENANTS_NAME]),
|
||||||
}
|
}
|
||||||
|
|||||||
95
extract/padelnomics_extract/src/padelnomics_extract/gisco.py
Normal file
95
extract/padelnomics_extract/src/padelnomics_extract/gisco.py
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
"""GISCO NUTS-2 boundary GeoJSON extractor.
|
||||||
|
|
||||||
|
Downloads NUTS-2 boundary polygons from Eurostat GISCO. The file is stored
|
||||||
|
uncompressed because DuckDB's ST_Read cannot read gzipped files.
|
||||||
|
|
||||||
|
NUTS classification revises approximately every 7 years (current: 2021).
|
||||||
|
The partition path is fixed to the revision year, not the run date, making
|
||||||
|
the source version explicit. Cursor tracking still uses year_month to avoid
|
||||||
|
re-downloading on every monthly run.
|
||||||
|
|
||||||
|
Landing: {LANDING_DIR}/gisco/2024/01/nuts2_boundaries.geojson (~5 MB, uncompressed)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import niquests
|
||||||
|
|
||||||
|
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
|
||||||
|
from .utils import get_last_cursor
|
||||||
|
|
||||||
|
logger = setup_logging("padelnomics.extract.gisco")
|
||||||
|
|
||||||
|
EXTRACTOR_NAME = "gisco"
|
||||||
|
|
||||||
|
# NUTS 2021 revision, 20M scale (1:20,000,000), WGS84 (EPSG:4326), LEVL_2 only.
|
||||||
|
# 20M resolution gives simplified polygons that are fast for point-in-polygon
|
||||||
|
# matching without sacrificing accuracy at the NUTS-2 boundary level.
|
||||||
|
GISCO_URL = (
|
||||||
|
"https://gisco-services.ec.europa.eu/distribution/v2/nuts/geojson/"
|
||||||
|
"NUTS_RG_20M_2021_4326_LEVL_2.geojson"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fixed partition: NUTS boundaries are a static reference file, not time-series data.
|
||||||
|
# The 2024/01 partition reflects when this NUTS 2021 dataset was first ingested.
|
||||||
|
DEST_REL = Path("gisco/2024/01/nuts2_boundaries.geojson")
|
||||||
|
|
||||||
|
_GISCO_TIMEOUT_SECONDS = HTTP_TIMEOUT_SECONDS * 4 # ~5 MB; generous for slow upstreams
|
||||||
|
|
||||||
|
|
||||||
|
def extract(
    landing_dir: Path,
    year_month: str,
    conn: sqlite3.Connection,
    session: niquests.Session,
) -> dict:
    """Download the NUTS-2 boundary GeoJSON into the landing zone.

    Skips the download when the cursor shows this month's run already
    completed, or when the destination file already exists on disk.

    Args:
        landing_dir: Root of the landing zone; the file lands under DEST_REL.
        year_month: Cursor value for the current run.
        conn: State database connection used for the cursor lookup.
        session: HTTP session used for the GISCO request.

    Returns:
        Stats dict with files_written / files_skipped / bytes_written and,
        when the run counts as complete, a cursor_value.

    Raises:
        ValueError: if the response is too small or not GeoJSON-shaped.
    """
    last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
    if last_cursor == year_month:
        logger.info("already ran for %s — skipping", year_month)
        return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}

    dest = landing_dir / DEST_REL
    if dest.exists():
        logger.info("file already exists (skipping download): %s", dest)
        return {
            "files_written": 0,
            "files_skipped": 1,
            "bytes_written": 0,
            "cursor_value": year_month,
        }

    dest.parent.mkdir(parents=True, exist_ok=True)
    logger.info("GET %s", GISCO_URL)
    resp = session.get(GISCO_URL, timeout=_GISCO_TIMEOUT_SECONDS)
    resp.raise_for_status()

    content = resp.content
    # Raise, don't assert: asserts are stripped under `python -O`, and a
    # truncated or HTML error response must never reach the landing zone.
    if len(content) <= 100_000:
        raise ValueError(
            f"GeoJSON too small ({len(content)} bytes) — download may have failed"
        )
    if b'"FeatureCollection"' not in content:
        raise ValueError("Response does not look like GeoJSON")

    # Write uncompressed — ST_Read requires a plain file, not .gz.
    # Temp-then-replace so a crash mid-write never leaves a partial file at
    # the final path (Path.replace also overwrites atomically on Windows,
    # unlike Path.rename).
    tmp = dest.with_suffix(".geojson.tmp")
    tmp.write_bytes(content)
    tmp.replace(dest)

    size_mb = len(content) / 1_000_000
    logger.info("written %s (%.1f MB)", dest, size_mb)

    return {
        "files_written": 1,
        "files_skipped": 0,
        "bytes_written": len(content),
        "cursor_value": year_month,
    }
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
    """Console entry point: run the GISCO extractor via the shared runner."""
    run_extractor(EXTRACTOR_NAME, extract)


if __name__ == "__main__":
    main()
|
||||||
@@ -39,3 +39,23 @@ module = "padelnomics_extract.playtomic_availability"
|
|||||||
entry = "main_recheck"
|
entry = "main_recheck"
|
||||||
schedule = "0,30 6-23 * * *"
|
schedule = "0,30 6-23 * * *"
|
||||||
depends_on = ["playtomic_availability"]
|
depends_on = ["playtomic_availability"]
|
||||||
|
|
||||||
|
[census_usa]
|
||||||
|
module = "padelnomics_extract.census_usa"
|
||||||
|
schedule = "monthly"
|
||||||
|
|
||||||
|
[census_usa_income]
|
||||||
|
module = "padelnomics_extract.census_usa_income"
|
||||||
|
schedule = "monthly"
|
||||||
|
|
||||||
|
[eurostat_city_labels]
|
||||||
|
module = "padelnomics_extract.eurostat_city_labels"
|
||||||
|
schedule = "monthly"
|
||||||
|
|
||||||
|
[ons_uk]
|
||||||
|
module = "padelnomics_extract.ons_uk"
|
||||||
|
schedule = "monthly"
|
||||||
|
|
||||||
|
[gisco]
|
||||||
|
module = "padelnomics_extract.gisco"
|
||||||
|
schedule = "monthly"
|
||||||
|
|||||||
@@ -1,81 +0,0 @@
|
|||||||
"""Download NUTS-2 boundary GeoJSON from Eurostat GISCO.
|
|
||||||
|
|
||||||
One-time (or on NUTS revision) download of NUTS-2 boundary polygons used for
|
|
||||||
spatial income resolution in dim_locations. Stored uncompressed because DuckDB's
|
|
||||||
ST_Read function cannot read gzipped files.
|
|
||||||
|
|
||||||
NUTS classification changes approximately every 7 years. Current revision: 2021.
|
|
||||||
|
|
||||||
Output: {LANDING_DIR}/gisco/2024/01/nuts2_boundaries.geojson (~5MB, uncompressed)
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
uv run python scripts/download_gisco_nuts.py [--landing-dir data/landing]
|
|
||||||
|
|
||||||
Idempotent: skips download if the file already exists.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import argparse
|
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import niquests
|
|
||||||
|
|
||||||
# NUTS 2021 revision, 20M scale (1:20,000,000), WGS84 (EPSG:4326), LEVL_2 only.
|
|
||||||
# 20M resolution gives simplified polygons that are fast for point-in-polygon
|
|
||||||
# matching without sacrificing accuracy at the NUTS-2 boundary level.
|
|
||||||
GISCO_URL = (
|
|
||||||
"https://gisco-services.ec.europa.eu/distribution/v2/nuts/geojson/"
|
|
||||||
"NUTS_RG_20M_2021_4326_LEVL_2.geojson"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Fixed partition: NUTS boundaries are a static reference file, not time-series data.
|
|
||||||
# Use the NUTS revision year as the partition to make the source version explicit.
|
|
||||||
DEST_REL_PATH = "gisco/2024/01/nuts2_boundaries.geojson"
|
|
||||||
|
|
||||||
HTTP_TIMEOUT_SECONDS = 120
|
|
||||||
|
|
||||||
|
|
||||||
def download_nuts_boundaries(landing_dir: Path) -> None:
    """Download the NUTS-2 boundary GeoJSON into the landing directory.

    Idempotent: returns immediately when the destination file already exists.

    Args:
        landing_dir: Root of the landing zone; the file lands at DEST_REL_PATH.

    Raises:
        ValueError: if the response is too small or not GeoJSON-shaped.
    """
    dest = landing_dir / DEST_REL_PATH
    if dest.exists():
        print(f"Already exists (skipping): {dest}")
        return

    dest.parent.mkdir(parents=True, exist_ok=True)
    # Plain string — no placeholders, so no f-prefix (ruff F541).
    print("Downloading NUTS-2 boundaries from GISCO...")
    print(f"  URL: {GISCO_URL}")

    with niquests.Session() as session:
        resp = session.get(GISCO_URL, timeout=HTTP_TIMEOUT_SECONDS)
        resp.raise_for_status()

        content = resp.content
        # Raise, don't assert: asserts vanish under `python -O`, and a
        # truncated or HTML error page must never reach the landing zone.
        if len(content) <= 100_000:
            raise ValueError(
                f"GeoJSON too small ({len(content)} bytes) — download may have failed"
            )
        if b'"FeatureCollection"' not in content:
            raise ValueError("Response does not look like GeoJSON")

        # Write uncompressed — ST_Read requires a plain file. Temp-then-replace
        # avoids leaving a partial file at the final path on a crash; replace
        # (unlike rename) also overwrites atomically on Windows.
        tmp = dest.with_suffix(".geojson.tmp")
        tmp.write_bytes(content)
        tmp.replace(dest)

        size_mb = len(content) / 1_000_000
        print(f"  Written: {dest} ({size_mb:.1f} MB)")
        print("Done. Run SQLMesh plan to rebuild stg_nuts2_boundaries.")
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
    """Parse CLI arguments and run the one-off NUTS-2 boundary download."""
    arg_parser = argparse.ArgumentParser(description=__doc__)
    arg_parser.add_argument("--landing-dir", default="data/landing", type=Path)
    options = arg_parser.parse_args()

    landing = options.landing_dir
    if not landing.is_dir():
        # Fail fast with a clear message rather than creating the root dir.
        print(f"Error: landing dir does not exist: {landing}", file=sys.stderr)
        sys.exit(1)

    download_nuts_boundaries(landing)


if __name__ == "__main__":
    main()
|
|
||||||
Reference in New Issue
Block a user