Commit Graph

31 Commits

Author SHA1 Message Date
Deeman
9de3a3ba01 feat(extract): replace OpenWeatherMap with Open-Meteo weather extractor
Replaced the OWM extractor (8 locations, API key required, 14,600-call
backfill over 30+ days) with Open-Meteo (12 locations, no API key,
ERA5 reanalysis, full backfill in 12 API calls ~30 seconds).

- Rename extract/openweathermap → extract/openmeteo (git mv)
- Rewrite api.py: fetch_archive (ERA5, date-range) + fetch_recent (forecast,
  past_days=10 to cover ERA5 lag); 9 daily variables incl. et0 and VPD
- Rewrite execute.py: _split_and_write() unzips parallel arrays into per-day
  flat JSON; no cursor / rate limiting / call cap needed
- Update pipelines.py: --package openmeteo, timeout 120s (was 1200s)
- Update fct_weather_daily.sql: flat Open-Meteo field names (temperature_2m_*
  etc.), remove pressure_afternoon_hpa, add et0_mm + vpd_max_kpa + is_high_vpd
- Remove OPENWEATHERMAP_API_KEY from CLAUDE.md env vars table

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-26 00:59:54 +01:00
Deeman
4817f7de2f feat(extract): add 4 weather locations (ES, PE, UG, CI)
Expands coverage from 8 to 12 coffee-growing regions:
- brazil_espirito_santo (Robusta/Conilon — largest BR Robusta state)
- peru_jaen (Arabica — fastest-growing origin, top-10 global producer)
- uganda_elgon (Robusta — 4th largest African producer)
- ivory_coast_daloa (Robusta — historically significant West African origin)

Now 8 Arabica + 4 Robusta regions = 12 calls/day (well within OWM free tier).
Backfill cost: ~21,900 additional calls over ~44 days at 500/run.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-26 00:12:29 +01:00
Deeman
08e74665bb feat(extract): add OpenWeatherMap daily weather extractor
Adds extract/openweathermap package with daily weather extraction for 8
coffee-growing regions (Brazil, Vietnam, Colombia, Ethiopia, Honduras,
Guatemala, Indonesia). Feeds crop stress signal for commodity sentiment score.

Extractor:
- OWM One Call API 3.0 / Day Summary — one JSON.gz per (location, date)
- extract_weather: daily, fetches yesterday + today (16 calls max)
- extract_weather_backfill: fills 2020-01-01 to yesterday, capped at 500
  calls/run with resume cursor '{location_id}:{date}' for crash safety
- Full idempotency via file existence check; state tracking via extract_core

SQLMesh:
- seeds.weather_locations (8 regions with lat/lon/variety)
- foundation.fct_weather_daily: INCREMENTAL_BY_TIME_RANGE, grain
  (location_id, observation_date), dedup via hash key, crop stress flags:
  is_frost (<2°C), is_heat_stress (>35°C), is_drought (<1mm), in_growing_season

Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 22:40:27 +01:00
Deeman
80c1163a7f feat: extraction framework overhaul — extract_core shared package + SQLite state tracking
- Add extract/extract_core/ workspace package with three modules:
  - state.py: SQLite run tracking (open_state_db, start_run, end_run, get_last_cursor)
  - http.py: niquests session factory + etag normalization helpers
  - files.py: landing_path, content_hash, write_bytes_atomic (atomic gzip writes)
- State lives at {LANDING_DIR}/.state.sqlite — no extra env var needed
- SQLite chosen over DuckDB: state tracking is OLTP (row inserts/updates), not analytical
- Refactor all 4 extractors (psdonline, cftc_cot, coffee_prices, ice_stocks):
  - Replace inline boilerplate with extract_core helpers
  - Add start_run/end_run tracking to every extraction entry point
  - extract_cot_year returns int (bytes_written) instead of bool
- Update tests: assert result == 0 (not `is False`) for the return type change

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-22 14:37:50 +01:00
Deeman
c92e5a8e07 ice_stocks: add backfill extractor for historical daily stocks
The ICE API at /marketdata/api/reports/293/results stores all historical
daily XLS reports date-descending. Previously the extractor only fetched
the latest. New extract_ice_backfill entry point pages through the API
and downloads all matching 'Daily Warehouse Stocks' reports.

- ice_api.py: add find_all_reports() alongside find_latest_report()
- execute.py: add extract_ice_stocks_backfill(max_pages=3) — default
  covers ~6 months; max_pages=20 fetches ~3 years of history
- pyproject.toml: register extract_ice_backfill entry point

Ran backfill: 131 files, 2025-08-15 → 2026-02-20

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-22 01:35:57 +01:00
Deeman
493ce64fde fix ice_stocks XLS date parsing: handle 'Feb 20, 2026' format
ICE changed the daily stocks XLS header from 'As of: 1/30/2026' to
'As of: Feb 20, 2026  1:35:39PM'. Expand _build_canonical_csv_from_xls
to try multiple strptime formats (%m/%d/%Y, %b %d, %Y, etc.) on both
single-token and three-token date candidates.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 22:18:17 +01:00
Deeman
ff896685d2 Add extract_ice_all command to run all three ICE extractors in sequence
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 21:59:08 +01:00
Deeman
ff7301d6a8 ICE extraction overhaul: API discovery + aging report + historical backfill
- Replace brittle ICE_STOCKS_URL env var with API-based URL discovery via
  the private ICE Report Center JSON API (no auth required)
- Add rolling CSV → XLS fallback in extract_ice_stocks() using
  find_latest_report() from ice_api.py
- Add ice_api.py: fetch_report_listings(), find_latest_report() with
  pagination up to MAX_API_PAGES
- Add xls_parse.py: detect_file_format() (magic bytes), xls_to_rows()
  using xlrd for OLE2/BIFF XLS files
- Add extract_ice_aging(): monthly certified stock aging report by
  age bucket × port → ice_aging/ landing dir
- Add extract_ice_historical(): 30-year EOM by-port stocks from static
  ICE URL → ice_stocks_by_port/ landing dir
- Add xlrd>=2.0.1 (parse XLS), xlwt>=1.3.0 (dev, test fixtures)
- Add SQLMesh raw + foundation models for both new datasets
- Add ice_aging_glob(), ice_stocks_by_port_glob() macros
- Add extract_ice_aging + extract_ice_historical pipeline entries
- Add 12 unit tests (format detection, XLS roundtrip, API mock, CSV output)

Seed files (data/landing/ice_aging/seed/ and ice_stocks_by_port/seed/)
must be created locally — data/ is gitignored.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 21:13:18 +01:00
Deeman
67c048485b Add Phase 1A-C + ICE warehouse stocks: prices, methodology, pipeline automation
Phase 1A — KC=F Coffee Futures Prices:
- New extract/coffee_prices/ package (yfinance): downloads KC=F daily OHLCV,
  stores as gzip CSV with SHA256-based idempotency
- SQLMesh models: raw/coffee_prices → foundation/fct_coffee_prices →
  serving/coffee_prices (with 20d/50d SMA, 52-week high/low, daily return %)
- Dashboard: 4 metric cards + dual-line chart (close, 20d MA, 50d MA)
- API: GET /commodities/<ticker>/prices

Phase 1B — Data Methodology Page:
- New /methodology route with full-page template (base.html)
- 6 anchored sections: USDA PSD, CFTC COT, KC=F price, ICE warehouse stocks,
  data quality model, update schedule table
- "Methodology" link added to marketing footer

Phase 1C — Automated Pipeline:
- supervisor.sh updated: runs extract_cot, extract_prices, extract_ice in
  sequence before transform
- Webhook failure alerting via ALERT_WEBHOOK_URL env var (ntfy/Slack/Telegram)

ICE Warehouse Stocks:
- New extract/ice_stocks/ package (niquests): normalizes ICE Report Center CSV
  to canonical schema, hash-based idempotency, soft-fail on 404 with guidance
- SQLMesh models: raw/ice_warehouse_stocks → foundation/fct_ice_warehouse_stocks
  → serving/ice_warehouse_stocks (30d avg, WoW change, 52w drawdown)
- Dashboard: 4 metric cards + line chart (certified bags + 30d avg)
- API: GET /commodities/<code>/stocks

Foundation:
- dim_commodity: added ticker (KC=F) and ice_stock_report_code (COFFEE-C) columns
- macros/__init__.py: added prices_glob() and ice_stocks_glob()
- pipelines.py: added extract_prices and extract_ice entries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 11:41:43 +01:00
Deeman
0a83b2cb74 Add CFTC COT data integration with foundation data model layer
- New extraction package (cftc_cot): downloads yearly Disaggregated Futures ZIPs
  from CFTC, etag-based dedup, dynamic inner filename discovery, gzip normalization
- SQLMesh 3-layer architecture: raw (technical) → foundation (business model) → serving (mart)
- dim_commodity seed: conformed dimension mapping USDA ↔ CFTC codes — the commodity ontology
- fct_cot_positioning: typed, deduplicated weekly positioning facts for all commodities
- obt_cot_positioning: Coffee C mart with COT Index (26w/52w), WoW delta, OI ratios
- Analytics functions + REST API endpoints: /commodities/<code>/positioning[/latest]
- Dashboard widget: Managed Money net, COT Index card, dual-axis Chart.js chart
- 23 passing tests (10 unit + 2 SQLMesh model + existing regression suite)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 23:28:10 +01:00
Deeman
423fb8c619 Fix extract and SQLMesh pipeline to build DuckDB lakehouse
extract: wrap response.content in BytesIO before passing to
normalize_zipped_csv, and call .read() on the returned BytesIO before
write_bytes (two bugs: wrong type in, wrong type out)

sqlmesh: {{ var() }} inside SQL string literals is not substituted by
SQLMesh's Jinja (SQL parser treats them as opaque strings). Replace with
a @psd_glob() macro that evaluates LANDING_DIR at render time and returns
a quoted glob path string.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 17:02:59 +01:00
Deeman
c1d00dcdc4 Refactor to local-first architecture on Hetzner NVMe
Remove distributed R2/Iceberg/SSH pipeline architecture in favor of
local subprocess execution with NVMe storage. Landing data backed up
to R2 via rclone timer.

- Strip Iceberg catalog, httpfs, boto3, paramiko, prefect, pyarrow
- Pipelines run via subprocess.run() with bounded timeouts
- Extract writes to {LANDING_DIR}/psd/{year}/{month}/{etag}.csv.gzip
- SQLMesh reads LANDING_DIR variable, writes to DUCKDB_PATH
- Delete unused provider stubs (ovh, scaleway, oracle)
- Add rclone systemd timer for R2 backup every 6h
- Update supervisor to run pipelines with env vars

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 19:50:19 +01:00
Deeman
6d4377ccf9 cleanup and prefect service setup 2026-02-04 22:24:55 +01:00
Deeman
d30ec9b66b Add R2 upload support with landing bucket path
## Changes

1. **Support ESC environment variable names**
   - Fallback to R2_ADMIN_ACCESS_KEY_ID if R2_ACCESS_KEY not set
   - Fallback to R2_ADMIN_SECRET_ACCESS_KEY if R2_SECRET_KEY not set
   - Allows script to work with Pulumi ESC (beanflows/prod) variables

2. **Use landing bucket path**
   - Changed R2 path from `psd/{etag}.zip` to `landing/psd/{etag}.zip`
   - All extracted data goes to landing bucket for consistent organization

3. **Updated Pulumi ESC environment**
   - Added R2_BUCKET=beanflows-data-prod
   - Fixed R2_ENDPOINT to remove bucket path (now just account URL)

## Testing

-  R2 upload works: Uploaded to landing/psd/316039e2612edc1_0.zip
-  R2 deduplication works: Skips upload if file exists
-  Local mode still works without credentials

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-20 22:45:30 +02:00
Deeman
38897617e7 Refactor PSD extraction: simplify to latest-only + add R2 support
## Key Changes

1. **Simplified extraction logic**
   - Changed from downloading 220+ historical archives to checking only latest available month
   - Tries current month and falls back up to 3 months (handles USDA publication lag)
   - Architecture advisor insight: ETags naturally deduplicate, historical year/month structure was unnecessary

2. **Flat storage structure**
   - Old: `data/{year}/{month}/{etag}.zip`
   - New: `data/{etag}.zip` (local) or `psd/{etag}.zip` (R2)
   - Migrated 226 existing files to flat structure

3. **Dual storage modes**
   - **Local mode**: Downloads to local directory (development)
   - **R2 mode**: Uploads to Cloudflare R2 (production)
   - Mode determined by presence of R2 environment variables
   - Added boto3 dependency for S3-compatible R2 API

4. **Updated raw SQLMesh model**
   - Changed pattern from `**/*.zip` to `*.zip` to match flat structure

## Benefits

- Simpler: Single file check instead of 220+ URL attempts
- Efficient: ETag-based deduplication works naturally
- Flexible: Supports both local dev and production R2 storage
- Maintainable: Removed unnecessary complexity

## Testing

-  Local extraction works and respects ETags
-  Falls back correctly when current month unavailable
-  Linting passes

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-20 22:02:15 +02:00
Deeman
6c93021f2d remove stupid rules 2025-10-12 21:44:56 +02:00
Deeman
f5f2dbc7a5 refactor 2025-08-25 20:50:25 +02:00
Deeman
9baa0d185c testing sqlmesh 2025-07-27 00:18:03 +02:00
Deeman
0bbbd25b68 update projects to packages 2025-07-26 22:32:37 +02:00
Deeman
00fffb2089 more simplification 2025-07-26 22:19:33 +02:00
Deeman
1c3455a906 more simplification 2025-07-26 22:18:47 +02:00
Deeman
4fd1b96114 simplify using etags 2025-07-26 22:08:35 +02:00
Deeman
bd65ddcac8 adding incremental load abilities 2025-07-26 21:10:02 +02:00
Deeman
b8ad73202c finish historical extraction 2025-07-13 23:20:50 +02:00
Deeman
70bd8a52db async is requesting stuff too fast 2025-07-13 18:08:25 +02:00
Deeman
8143c6ed8e async is requesting stuff too fast 2025-07-13 18:08:19 +02:00
Deeman
c3c281fcd8 update structure 2025-07-08 22:41:59 +02:00
Deeman
0ef57f3e06 updates 2025-07-08 21:06:06 +02:00
Deeman
10d9424ff1 uncomment code 2025-05-08 18:02:45 +02:00
Deeman
265250864c add dlt script to extract data from fas.usda.gov 2025-04-30 22:35:31 +02:00
Deeman
d60bf0ea3f dlt add 2025-04-30 19:04:19 +02:00