fix(extract,transform): fix COT/prices column name mismatches + OWM rate limit skip
- fct_cot_positioning: quote Swap__Positions_Short_All and Swap__Positions_Spread_All (CSV uses double underscore; DuckDB preserves header names exactly) - fct_cot_positioning: quote Report_Date_as_YYYY-MM-DD (dashes preserved in header) - fct_coffee_prices: quote "Adj Close" (space in CSV header) - openmeteo/execute.py: skip API call in backfill when all daily files already exist (_count_existing_files pre-check prevents 429 rate limit on re-runs) - dev_run.sh: open browser as admin@beanflows.coffee instead of pro@ Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -75,6 +75,19 @@ def _write_day_file(location_id: str, date_str: str, record: dict) -> int:
|
||||
return bytes_written
|
||||
|
||||
|
||||
def _count_existing_files(location_id: str, start: date, end: date) -> int:
|
||||
"""Count per-day files already on disk for a location between start and end (inclusive)."""
|
||||
count = 0
|
||||
d = start
|
||||
while d <= end:
|
||||
year = d.strftime("%Y")
|
||||
dest_dir = landing_path(LANDING_DIR, LANDING_SUBDIR, location_id, year)
|
||||
if (dest_dir / f"{d.isoformat()}.json.gz").exists():
|
||||
count += 1
|
||||
d += timedelta(days=1)
|
||||
return count
|
||||
|
||||
|
||||
def _split_and_write(location_id: str, response: dict) -> tuple[int, int, int]:
|
||||
"""Split an Open-Meteo array response into per-day JSON.gz files.
|
||||
|
||||
@@ -171,12 +184,22 @@ def extract_weather_backfill() -> None:
|
||||
bytes_written_total = 0
|
||||
|
||||
try:
|
||||
start = BACKFILL_START
|
||||
end = date.fromisoformat(yesterday)
|
||||
expected_days = (end - start).days + 1
|
||||
|
||||
with niquests.Session() as session:
|
||||
for loc in LOCATIONS:
|
||||
logger.info(
|
||||
f"Backfill {loc['id']} ({loc['country']}) "
|
||||
f"{start_date} → {yesterday}"
|
||||
)
|
||||
existing = _count_existing_files(loc["id"], start, end)
|
||||
if existing == expected_days:
|
||||
logger.info(f" {loc['id']}: 0 new, {existing} already existed (skipped API call)")
|
||||
files_skipped += existing
|
||||
continue
|
||||
|
||||
response = fetch_archive(
|
||||
session, loc["lat"], loc["lon"],
|
||||
start_date=start_date,
|
||||
|
||||
@@ -29,7 +29,7 @@ WITH src AS (
|
||||
TRY_CAST(High AS DOUBLE) AS high,
|
||||
TRY_CAST(Low AS DOUBLE) AS low,
|
||||
TRY_CAST(Close AS DOUBLE) AS close,
|
||||
TRY_CAST(Adj_Close AS DOUBLE) AS adj_close,
|
||||
TRY_CAST("Adj Close" AS DOUBLE) AS adj_close,
|
||||
TRY_CAST(Volume AS BIGINT) AS volume,
|
||||
filename AS source_file, /* Filename encodes the content hash — use as ingest identifier */
|
||||
HASH(Date, Close) AS hkey /* Dedup key: trade date + close price */
|
||||
|
||||
@@ -24,7 +24,7 @@ WITH src AS (
|
||||
), cast_and_clean AS (
|
||||
SELECT
|
||||
TRIM(market_and_exchange_names) AS market_and_exchange_name, /* Identifiers */
|
||||
report_date_as_yyyy_mm_dd::DATE AS report_date,
|
||||
"Report_Date_as_YYYY-MM-DD"::DATE AS report_date,
|
||||
TRIM(cftc_commodity_code) AS cftc_commodity_code,
|
||||
TRIM(cftc_contract_market_code) AS cftc_contract_market_code,
|
||||
TRIM(contract_units) AS contract_units,
|
||||
@@ -32,8 +32,8 @@ WITH src AS (
|
||||
TRY_CAST(prod_merc_positions_long_all AS INT) AS prod_merc_long, /* Producer / Merchant (commercial hedgers: exporters, processors) */
|
||||
TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_short,
|
||||
TRY_CAST(swap_positions_long_all AS INT) AS swap_long, /* Swap dealers */
|
||||
TRY_CAST(swap_positions_short_all AS INT) AS swap_short,
|
||||
TRY_CAST(swap_positions_spread_all AS INT) AS swap_spread,
|
||||
TRY_CAST("Swap__Positions_Short_All" AS INT) AS swap_short,
|
||||
TRY_CAST("Swap__Positions_Spread_All" AS INT) AS swap_spread,
|
||||
TRY_CAST(m_money_positions_long_all AS INT) AS managed_money_long, /* Managed money (hedge funds, CTAs — the primary speculative signal) */
|
||||
TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_short,
|
||||
TRY_CAST(m_money_positions_spread_all AS INT) AS managed_money_spread,
|
||||
@@ -44,7 +44,7 @@ WITH src AS (
|
||||
TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_short,
|
||||
TRY_CAST(prod_merc_positions_long_all AS INT) /* Net positions (long minus short per category) */ - TRY_CAST(prod_merc_positions_short_all AS INT) AS prod_merc_net,
|
||||
TRY_CAST(m_money_positions_long_all AS INT) - TRY_CAST(m_money_positions_short_all AS INT) AS managed_money_net,
|
||||
TRY_CAST(swap_positions_long_all AS INT) - TRY_CAST(swap_positions_short_all AS INT) AS swap_net,
|
||||
TRY_CAST(swap_positions_long_all AS INT) - TRY_CAST("Swap__Positions_Short_All" AS INT) AS swap_net,
|
||||
TRY_CAST(other_rept_positions_long_all AS INT) - TRY_CAST(other_rept_positions_short_all AS INT) AS other_reportable_net,
|
||||
TRY_CAST(nonrept_positions_long_all AS INT) - TRY_CAST(nonrept_positions_short_all AS INT) AS nonreportable_net,
|
||||
TRY_CAST(change_in_open_interest_all AS INT) AS change_open_interest, /* Week-over-week changes */
|
||||
@@ -64,7 +64,7 @@ WITH src AS (
|
||||
MAKE_DATE(STR_SPLIT(filename, '/')[-2]::INT, 1, 1) AS ingest_date, /* Ingest date: derived from landing path year directory */ /* Path: .../cot/{year}/{etag}.csv.gzip → extract year from [-2] */
|
||||
HASH(
|
||||
cftc_commodity_code,
|
||||
report_date_as_yyyy_mm_dd,
|
||||
"Report_Date_as_YYYY-MM-DD",
|
||||
cftc_contract_market_code,
|
||||
open_interest_all,
|
||||
m_money_positions_long_all,
|
||||
@@ -77,7 +77,7 @@ WITH src AS (
|
||||
WHERE
|
||||
NOT TRIM(cftc_commodity_code) IS NULL
|
||||
AND LENGTH(TRIM(cftc_commodity_code)) > 0
|
||||
AND NOT report_date_as_yyyy_mm_dd::DATE IS NULL
|
||||
AND NOT "Report_Date_as_YYYY-MM-DD"::DATE IS NULL
|
||||
), deduplicated AS (
|
||||
SELECT
|
||||
ANY_VALUE(market_and_exchange_name) AS market_and_exchange_name,
|
||||
|
||||
32
uv.lock
generated
32
uv.lock
generated
@@ -14,7 +14,7 @@ members = [
|
||||
"extract-core",
|
||||
"ice-stocks",
|
||||
"materia",
|
||||
"openweathermap",
|
||||
"openmeteo",
|
||||
"psdonline",
|
||||
"sqlmesh-materia",
|
||||
]
|
||||
@@ -1766,6 +1766,21 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/be/9c/92789c596b8df838baa98fa71844d84283302f7604ed565dafe5a6b5041a/oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1", size = 160065, upload-time = "2025-06-19T22:48:06.508Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openmeteo"
|
||||
version = "0.1.0"
|
||||
source = { editable = "extract/openmeteo" }
|
||||
dependencies = [
|
||||
{ name = "extract-core" },
|
||||
{ name = "niquests" },
|
||||
]
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "extract-core", editable = "extract/extract_core" },
|
||||
{ name = "niquests", specifier = ">=3.14.1" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-api"
|
||||
version = "1.39.1"
|
||||
@@ -1779,21 +1794,6 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/cf/df/d3f1ddf4bb4cb50ed9b1139cc7b1c54c34a1e7ce8fd1b9a37c0d1551a6bd/opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950", size = 66356, upload-time = "2025-12-11T13:32:17.304Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "openweathermap"
|
||||
version = "0.1.0"
|
||||
source = { editable = "extract/openweathermap" }
|
||||
dependencies = [
|
||||
{ name = "extract-core" },
|
||||
{ name = "niquests" },
|
||||
]
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "extract-core", editable = "extract/extract_core" },
|
||||
{ name = "niquests", specifier = ">=3.14.1" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "orjson"
|
||||
version = "3.11.7"
|
||||
|
||||
@@ -168,7 +168,7 @@ run_with_label "$COLOR_CSS" "css " make css-watch
|
||||
# Open a private/incognito browser window once the server is ready.
|
||||
# Polls /auth/dev-login until it responds (up to 10 seconds), then launches.
|
||||
(
|
||||
DEV_URL="http://localhost:5001/auth/dev-login?email=pro@beanflows.coffee"
|
||||
DEV_URL="http://localhost:5001/auth/dev-login?email=admin@beanflows.coffee"
|
||||
for i in $(seq 1 20); do
|
||||
sleep 0.5
|
||||
if curl -s -o /dev/null -w "%{http_code}" "$DEV_URL" 2>/dev/null | grep -qE "^[23]"; then
|
||||
|
||||
Reference in New Issue
Block a user