diff --git a/extract/openmeteo/src/openmeteo/execute.py b/extract/openmeteo/src/openmeteo/execute.py index c722bc9..85ab742 100644 --- a/extract/openmeteo/src/openmeteo/execute.py +++ b/extract/openmeteo/src/openmeteo/execute.py @@ -75,6 +75,19 @@ def _write_day_file(location_id: str, date_str: str, record: dict) -> int: return bytes_written +def _count_existing_files(location_id: str, start: date, end: date) -> int: + """Count per-day files already on disk for a location between start and end (inclusive).""" + count = 0 + d = start + while d <= end: + year = d.strftime("%Y") + dest_dir = landing_path(LANDING_DIR, LANDING_SUBDIR, location_id, year) + if (dest_dir / f"{d.isoformat()}.json.gz").exists(): + count += 1 + d += timedelta(days=1) + return count + + def _split_and_write(location_id: str, response: dict) -> tuple[int, int, int]: """Split an Open-Meteo array response into per-day JSON.gz files. @@ -171,12 +184,22 @@ def extract_weather_backfill() -> None: bytes_written_total = 0 try: + start = BACKFILL_START + end = date.fromisoformat(yesterday) + expected_days = (end - start).days + 1 + with niquests.Session() as session: for loc in LOCATIONS: logger.info( f"Backfill {loc['id']} ({loc['country']}) " f"{start_date} → {yesterday}" ) + existing = _count_existing_files(loc["id"], start, end) + if existing == expected_days: + logger.info(f" {loc['id']}: 0 new, {existing} already existed (skipped API call)") + files_skipped += existing + continue + response = fetch_archive( session, loc["lat"], loc["lon"], start_date=start_date, diff --git a/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql b/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql index c90adb9..18a31ef 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_coffee_prices.sql @@ -29,7 +29,7 @@ WITH src AS ( TRY_CAST(High AS DOUBLE) AS high, TRY_CAST(Low AS DOUBLE) AS low, TRY_CAST(Close AS DOUBLE) AS close, - TRY_CAST(Adj_Close AS DOUBLE) AS adj_close, + TRY_CAST("Adj Close" AS DOUBLE) AS adj_close, TRY_CAST(Volume AS BIGINT) AS volume, filename AS source_file, /* Filename encodes the content hash — use as ingest identifier */ HASH(Date, Close) AS hkey /* Dedup key: trade date + close price */ diff --git a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql index 2a2e9cd..3fcabb5 100644 --- a/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql +++ b/transform/sqlmesh_materia/models/foundation/fct_cot_positioning.sql @@ -24,7 +24,7 @@ WITH src AS ( ), cast_and_clean AS ( SELECT TRIM(market_and_exchange_names) AS market_and_exchange_name, /* Identifiers */ - report_date_as_yyyy_mm_dd::DATE AS report_date, + "Report_Date_as_YYYY-MM-DD"::DATE AS report_date, TRIM(cftc_commodity_code) AS cftc_commodity_code, TRIM(cftc_contract_market_code) AS cftc_contract_market_code, TRIM(contract_units) AS contract_units, @@ -32,8 +32,8 @@ WITH src AS ( TRY_CAST(prod_merc_positions_long_all AS INT) AS prod_merc_long, /* Producer / Merchant (commercial hedgers: exporters, processors) */ TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_short, TRY_CAST(swap_positions_long_all AS INT) AS swap_long, /* Swap dealers */ - TRY_CAST(swap_positions_short_all AS INT) AS swap_short, - TRY_CAST(swap_positions_spread_all AS INT) AS swap_spread, + TRY_CAST("Swap__Positions_Short_All" AS INT) AS swap_short, + TRY_CAST("Swap__Positions_Spread_All" AS INT) AS swap_spread, TRY_CAST(m_money_positions_long_all AS INT) AS managed_money_long, /* Managed money (hedge funds, CTAs — the primary speculative signal) */ TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_short, TRY_CAST(m_money_positions_spread_all AS INT) AS managed_money_spread, @@ -44,7 +44,7 @@ WITH src AS ( TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_short, TRY_CAST(prod_merc_positions_long_all AS INT) /* Net positions (long minus short per category) */ - TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_net, TRY_CAST(m_money_positions_long_all AS INT) - TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_net, - TRY_CAST(swap_positions_long_all AS INT) - TRY_CAST(swap_positions_short_all AS INT) AS swap_net, + TRY_CAST(swap_positions_long_all AS INT) - TRY_CAST("Swap__Positions_Short_All" AS INT) AS swap_net, TRY_CAST(other_rept_positions_long_all AS INT) - TRY_CAST(other_rept_positions_short_all AS INT) AS other_reportable_net, TRY_CAST(nonrept_positions_long_all AS INT) - TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_net, TRY_CAST(change_in_open_interest_all AS INT) AS change_open_interest, /* Week-over-week changes */ @@ -64,7 +64,7 @@ WITH src AS ( MAKE_DATE(STR_SPLIT(filename, '/')[-2]::INT, 1, 1) AS ingest_date, /* Ingest date: derived from landing path year directory */ /* Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2] */ HASH( cftc_commodity_code, - report_date_as_yyyy_mm_dd, + "Report_Date_as_YYYY-MM-DD", cftc_contract_market_code, open_interest_all, m_money_positions_long_all, @@ -77,7 +77,7 @@ WITH src AS ( WHERE NOT TRIM(cftc_commodity_code) IS NULL AND LENGTH(TRIM(cftc_commodity_code)) > 0 - AND NOT report_date_as_yyyy_mm_dd::DATE IS NULL + AND NOT "Report_Date_as_YYYY-MM-DD"::DATE IS NULL ), deduplicated AS ( SELECT ANY_VALUE(market_and_exchange_name) AS market_and_exchange_name, diff --git a/uv.lock b/uv.lock index 6baaff9..14d11bc 100644 --- a/uv.lock +++ b/uv.lock @@ -14,7 +14,7 @@ members = [ "extract-core", "ice-stocks", "materia", - "openweathermap", + "openmeteo", "psdonline", "sqlmesh-materia", ] @@ -1766,6 +1766,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/be/9c/92789c596b8df838baa98fa71844d84283302f7604ed565dafe5a6b5041a/oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1", size = 160065, upload-time = "2025-06-19T22:48:06.508Z" }, ] +[[package]] +name = "openmeteo" +version = "0.1.0" +source = { editable = "extract/openmeteo" } +dependencies = [ + { name = "extract-core" }, + { name = "niquests" }, +] + +[package.metadata] +requires-dist = [ + { name = "extract-core", editable = "extract/extract_core" }, + { name = "niquests", specifier = ">=3.14.1" }, +] + [[package]] name = "opentelemetry-api" version = "1.39.1" @@ -1779,21 +1794,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cf/df/d3f1ddf4bb4cb50ed9b1139cc7b1c54c34a1e7ce8fd1b9a37c0d1551a6bd/opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950", size = 66356, upload-time = "2025-12-11T13:32:17.304Z" }, ] -[[package]] -name = "openweathermap" -version = "0.1.0" -source = { editable = "extract/openweathermap" } -dependencies = [ - { name = "extract-core" }, - { name = "niquests" }, -] - -[package.metadata] -requires-dist = [ - { name = "extract-core", editable = "extract/extract_core" }, - { name = "niquests", specifier = ">=3.14.1" }, -] - [[package]] name = "orjson" version = "3.11.7" diff --git a/web/scripts/dev_run.sh b/web/scripts/dev_run.sh index dbfefa1..040d37b 100755 --- a/web/scripts/dev_run.sh +++ b/web/scripts/dev_run.sh @@ -168,7 +168,7 @@ run_with_label "$COLOR_CSS" "css " make css-watch # Open a private/incognito browser window once the server is ready. # Polls /auth/dev-login until it responds (up to 10 seconds), then launches. ( - DEV_URL="http://localhost:5001/auth/dev-login?email=pro@beanflows.coffee" + DEV_URL="http://localhost:5001/auth/dev-login?email=admin@beanflows.coffee" for i in $(seq 1 20); do sleep 0.5 if curl -s -o /dev/null -w "%{http_code}" "$DEV_URL" 2>/dev/null | grep -qE "^[23]"; then