Compare commits

...

4 Commits

Author SHA1 Message Date
Deeman
75305935bd merge: GISCO extractor + wire all extractors + load_dotenv fix
All checks were successful
CI / test (push) Successful in 50s
CI / tag (push) Successful in 3s
2026-03-01 17:08:42 +01:00
Deeman
99cb0ac005 chore: remove .gitlab-ci.yml (GitLab now backup-only mirror)
All checks were successful
CI / test (push) Successful in 50s
CI / tag (push) Successful in 2s
CI runs on Gitea only. GitLab is a passive push mirror — no runners,
no tagging, no deploy involvement.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 17:06:09 +01:00
Deeman
a15c32d398 fix(extract): load .env automatically via load_dotenv()
PROXY_URLS_* and other secrets were defined in .env but never loaded,
causing availability to run in slow serial mode (1 req/s) instead of
parallel mode with proxies.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 16:41:59 +01:00
Deeman
97c5846d51 feat(extract): GISCO extractor + wire all unscheduled extractors
- New gisco.py: proper extractor module replacing scripts/download_gisco_nuts.py.
  Writes uncompressed .geojson (ST_Read can't handle .gz). Fixed partition path
  gisco/2024/01/nuts2_boundaries.geojson; cursor tracking skips re-download monthly.
- all.py: import + register gisco in EXTRACTORS (9 independent, 1 dep)
- pyproject.toml: add extract-gisco entry point
- workflows.toml: add census_usa, census_usa_income, eurostat_city_labels,
  ons_uk, gisco — all monthly, no dependencies
- Delete scripts/download_gisco_nuts.py (superseded)

Unblocks: stg_nuts2_boundaries, stg_regional_income, stg_income_usa,
and 4 downstream models (dim_locations, pseo_city_costs_de,
location_opportunity_profile, pseo_country_overview).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 15:49:39 +01:00
7 changed files with 123 additions and 113 deletions

View File

@@ -1,31 +0,0 @@
stages:
- test
- tag
test:
stage: test
image: python:3.12-slim
before_script:
- pip install uv
script:
- uv sync
- uv run pytest web/tests/ -x -q -p no:faulthandler
- uv run ruff check web/src/ web/tests/
rules:
- if: $CI_COMMIT_BRANCH == "master"
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
tag:
stage: tag
image:
name: alpine/git
entrypoint: [""]
script:
- git tag "v${CI_PIPELINE_IID}"
- git push "https://gitlab-ci-token:${CI_JOB_TOKEN}@${CI_SERVER_HOST}/${CI_PROJECT_PATH}.git" "v${CI_PIPELINE_IID}"
rules:
- if: $CI_COMMIT_BRANCH == "master"
# Deployment is handled by the on-server supervisor (src/padelnomics/supervisor.py).
# It polls git every 60s, fetches tags, and deploys only when a new passing tag exists.
# No CI secrets needed — zero SSH keys, zero deploy credentials.

View File

@@ -21,6 +21,7 @@ extract-census-usa = "padelnomics_extract.census_usa:main"
extract-census-usa-income = "padelnomics_extract.census_usa_income:main"
extract-ons-uk = "padelnomics_extract.ons_uk:main"
extract-geonames = "padelnomics_extract.geonames:main"
extract-gisco = "padelnomics_extract.gisco:main"
[build-system]
requires = ["hatchling"]

View File

@@ -11,9 +11,12 @@ from datetime import UTC, datetime
from pathlib import Path
import niquests
from dotenv import load_dotenv
from .utils import end_run, open_state_db, start_run
load_dotenv()
LANDING_DIR = Path(os.environ.get("LANDING_DIR", "data/landing"))
HTTP_TIMEOUT_SECONDS = 30

View File

@@ -7,7 +7,7 @@ A graphlib.TopologicalSorter schedules them: tasks with no unmet dependencies
run immediately in parallel; each completion may unlock new tasks.
Current dependency graph:
- All 8 non-availability extractors have no dependencies (run in parallel)
- All 9 non-availability extractors have no dependencies (run in parallel)
- playtomic_availability depends on playtomic_tenants (starts as soon as
tenants finishes, even if other extractors are still running)
"""
@@ -26,6 +26,8 @@ from .eurostat_city_labels import EXTRACTOR_NAME as EUROSTAT_CITY_LABELS_NAME
from .eurostat_city_labels import extract as extract_eurostat_city_labels
from .geonames import EXTRACTOR_NAME as GEONAMES_NAME
from .geonames import extract as extract_geonames
from .gisco import EXTRACTOR_NAME as GISCO_NAME
from .gisco import extract as extract_gisco
from .ons_uk import EXTRACTOR_NAME as ONS_UK_NAME
from .ons_uk import extract as extract_ons_uk
from .overpass import EXTRACTOR_NAME as OVERPASS_NAME
@@ -50,6 +52,7 @@ EXTRACTORS: dict[str, tuple] = {
CENSUS_USA_INCOME_NAME: (extract_census_usa_income, []),
ONS_UK_NAME: (extract_ons_uk, []),
GEONAMES_NAME: (extract_geonames, []),
GISCO_NAME: (extract_gisco, []),
TENANTS_NAME: (extract_tenants, []),
AVAILABILITY_NAME: (extract_availability, [TENANTS_NAME]),
}

View File

@@ -0,0 +1,95 @@
"""GISCO NUTS-2 boundary GeoJSON extractor.
Downloads NUTS-2 boundary polygons from Eurostat GISCO. The file is stored
uncompressed because DuckDB's ST_Read cannot read gzipped files.
NUTS classification revises approximately every 7 years (current: 2021).
The partition path is fixed to the revision year, not the run date, making
the source version explicit. Cursor tracking still uses year_month to avoid
re-downloading on every monthly run.
Landing: {LANDING_DIR}/gisco/2024/01/nuts2_boundaries.geojson (~5 MB, uncompressed)
"""
import sqlite3
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import get_last_cursor
logger = setup_logging("padelnomics.extract.gisco")
EXTRACTOR_NAME = "gisco"
# NUTS 2021 revision, 20M scale (1:20,000,000), WGS84 (EPSG:4326), LEVL_2 only.
# 20M resolution gives simplified polygons that are fast for point-in-polygon
# matching without sacrificing accuracy at the NUTS-2 boundary level.
GISCO_URL = (
"https://gisco-services.ec.europa.eu/distribution/v2/nuts/geojson/"
"NUTS_RG_20M_2021_4326_LEVL_2.geojson"
)
# Fixed partition: NUTS boundaries are a static reference file, not time-series data.
# The 2024/01 partition reflects when this NUTS 2021 dataset was first ingested.
DEST_REL = Path("gisco/2024/01/nuts2_boundaries.geojson")
_GISCO_TIMEOUT_SECONDS = HTTP_TIMEOUT_SECONDS * 4 # ~5 MB; generous for slow upstreams
def extract(
landing_dir: Path,
year_month: str,
conn: sqlite3.Connection,
session: niquests.Session,
) -> dict:
"""Download NUTS-2 GeoJSON. Skips if already run this month or file exists."""
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME)
if last_cursor == year_month:
logger.info("already ran for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
dest = landing_dir / DEST_REL
if dest.exists():
logger.info("file already exists (skipping download): %s", dest)
return {
"files_written": 0,
"files_skipped": 1,
"bytes_written": 0,
"cursor_value": year_month,
}
dest.parent.mkdir(parents=True, exist_ok=True)
logger.info("GET %s", GISCO_URL)
resp = session.get(GISCO_URL, timeout=_GISCO_TIMEOUT_SECONDS)
resp.raise_for_status()
content = resp.content
assert len(content) > 100_000, (
f"GeoJSON too small ({len(content)} bytes) — download may have failed"
)
assert b'"FeatureCollection"' in content, "Response does not look like GeoJSON"
# Write uncompressed — ST_Read requires a plain file, not .gz
tmp = dest.with_suffix(".geojson.tmp")
tmp.write_bytes(content)
tmp.rename(dest)
size_mb = len(content) / 1_000_000
logger.info("written %s (%.1f MB)", dest, size_mb)
return {
"files_written": 1,
"files_skipped": 0,
"bytes_written": len(content),
"cursor_value": year_month,
}
def main() -> None:
run_extractor(EXTRACTOR_NAME, extract)
if __name__ == "__main__":
main()

View File

@@ -39,3 +39,23 @@ module = "padelnomics_extract.playtomic_availability"
entry = "main_recheck"
schedule = "0,30 6-23 * * *"
depends_on = ["playtomic_availability"]
[census_usa]
module = "padelnomics_extract.census_usa"
schedule = "monthly"
[census_usa_income]
module = "padelnomics_extract.census_usa_income"
schedule = "monthly"
[eurostat_city_labels]
module = "padelnomics_extract.eurostat_city_labels"
schedule = "monthly"
[ons_uk]
module = "padelnomics_extract.ons_uk"
schedule = "monthly"
[gisco]
module = "padelnomics_extract.gisco"
schedule = "monthly"

View File

@@ -1,81 +0,0 @@
"""Download NUTS-2 boundary GeoJSON from Eurostat GISCO.
One-time (or on NUTS revision) download of NUTS-2 boundary polygons used for
spatial income resolution in dim_locations. Stored uncompressed because DuckDB's
ST_Read function cannot read gzipped files.
NUTS classification changes approximately every 7 years. Current revision: 2021.
Output: {LANDING_DIR}/gisco/2024/01/nuts2_boundaries.geojson (~5MB, uncompressed)
Usage:
uv run python scripts/download_gisco_nuts.py [--landing-dir data/landing]
Idempotent: skips download if the file already exists.
"""
import argparse
import sys
from pathlib import Path
import niquests
# NUTS 2021 revision, 20M scale (1:20,000,000), WGS84 (EPSG:4326), LEVL_2 only.
# 20M resolution gives simplified polygons that are fast for point-in-polygon
# matching without sacrificing accuracy at the NUTS-2 boundary level.
GISCO_URL = (
"https://gisco-services.ec.europa.eu/distribution/v2/nuts/geojson/"
"NUTS_RG_20M_2021_4326_LEVL_2.geojson"
)
# Fixed partition: NUTS boundaries are a static reference file, not time-series data.
# Use the NUTS revision year as the partition to make the source version explicit.
DEST_REL_PATH = "gisco/2024/01/nuts2_boundaries.geojson"
HTTP_TIMEOUT_SECONDS = 120
def download_nuts_boundaries(landing_dir: Path) -> None:
dest = landing_dir / DEST_REL_PATH
if dest.exists():
print(f"Already exists (skipping): {dest}")
return
dest.parent.mkdir(parents=True, exist_ok=True)
print(f"Downloading NUTS-2 boundaries from GISCO...")
print(f" URL: {GISCO_URL}")
with niquests.Session() as session:
resp = session.get(GISCO_URL, timeout=HTTP_TIMEOUT_SECONDS)
resp.raise_for_status()
content = resp.content
assert len(content) > 100_000, (
f"GeoJSON too small ({len(content)} bytes) — download may have failed"
)
assert b'"FeatureCollection"' in content, "Response does not look like GeoJSON"
# Write uncompressed — ST_Read requires a plain file
tmp = dest.with_suffix(".geojson.tmp")
tmp.write_bytes(content)
tmp.rename(dest)
size_mb = len(content) / 1_000_000
print(f" Written: {dest} ({size_mb:.1f} MB)")
print("Done. Run SQLMesh plan to rebuild stg_nuts2_boundaries.")
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--landing-dir", default="data/landing", type=Path)
args = parser.parse_args()
if not args.landing_dir.is_dir():
print(f"Error: landing dir does not exist: {args.landing_dir}", file=sys.stderr)
sys.exit(1)
download_nuts_boundaries(args.landing_dir)
if __name__ == "__main__":
main()