Commit Graph

21 Commits

Author SHA1 Message Date
Deeman
411aea3954 merge: SOPS migration + Python supervisor + docs (3 repos) 2026-02-26 12:15:35 +01:00
Deeman
5d7d53a260 feat(supervisor): port Python supervisor from padelnomics + workflows.toml
Port padelnomics' schedule-aware Python supervisor to materia:
- src/materia/supervisor.py — croniter scheduling, topological wave
  execution (parallel independent workflows), tag-based git pull + deploy,
  status CLI subcommand
- infra/supervisor/workflows.toml — workflow registry (psd daily, cot
  weekly, prices daily, ice daily, weather daily)
- infra/supervisor/materia-supervisor.service — updated ExecStart to Python
  supervisor, added SUPERVISOR_GIT_PULL=1

Adaptations from padelnomics:
- Uses extract_core.state.open_state_db (not padelnomics_extract.utils)
- uv run sqlmesh -p transform/sqlmesh_materia run
- uv run materia pipeline run export_serving
- web/deploy.sh path (materia's deploy.sh is under web/)
- Removed proxy_mode (not used in materia)

Also: add croniter dependency to src/materia, delete old supervisor.sh.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 11:59:55 +01:00
Deeman
b884bc2b4a feat(cot): add combined (futures+options) COT extractor and transform models
- extract/cftc_cot: refactor extract_cot_year() to accept url_template and
  landing_subdir params; add _extract_cot() shared loop; add extract_cot_combined()
  entry point using com_disagg_txt_{year}.zip → landing/cot_combined/
- pyproject.toml: add extract_cot_combined script entry point
- macros/__init__.py: add @cot_combined_glob() for cot_combined/**/*.csv.gzip
- fct_cot_positioning.sql: union cot_glob and cot_combined_glob in src CTE;
  add report_type column (FutOnly_or_Combined) to cast_and_clean + deduplicated;
  include FutOnly_or_Combined in hkey to avoid key collisions; add report_type to grain
- obt_cot_positioning.sql: add report_type = 'FutOnly' filter to preserve
  existing serving behavior
- obt_cot_positioning_combined.sql: new serving model filtered to report_type =
  'Combined'; identical analytics (COT index, net %, windows) on combined data
- pipelines.py: register extract_cot_combined; add to extract_all meta-pipeline

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-26 11:24:56 +01:00
Deeman
6d716a83ae feat(secrets): rewrite secrets.py for SOPS, update cli.py
secrets.py: replace Pulumi ESC (esc CLI) with SOPS decrypt. Reads
.env.prod.sops via `sops --decrypt`, parses dotenv output. Same public
API: get_secret(), list_secrets(), test_connection().

cli.py: update secrets subcommand help text and test command messaging.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 10:44:25 +01:00
Deeman
9de3a3ba01 feat(extract): replace OpenWeatherMap with Open-Meteo weather extractor
Replaced the OWM extractor (8 locations, API key required, 14,600-call
backfill over 30+ days) with Open-Meteo (12 locations, no API key,
ERA5 reanalysis, full backfill in 12 API calls ~30 seconds).

- Rename extract/openweathermap → extract/openmeteo (git mv)
- Rewrite api.py: fetch_archive (ERA5, date-range) + fetch_recent (forecast,
  past_days=10 to cover ERA5 lag); 9 daily variables incl. et0 and VPD
- Rewrite execute.py: _split_and_write() unzips parallel arrays into per-day
  flat JSON; no cursor / rate limiting / call cap needed
- Update pipelines.py: --package openmeteo, timeout 120s (was 1200s)
- Update fct_weather_daily.sql: flat Open-Meteo field names (temperature_2m_*
  etc.), remove pressure_afternoon_hpa, add et0_mm + vpd_max_kpa + is_high_vpd
- Remove OPENWEATHERMAP_API_KEY from CLAUDE.md env vars table

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-26 00:59:54 +01:00
Deeman
08e74665bb feat(extract): add OpenWeatherMap daily weather extractor
Adds extract/openweathermap package with daily weather extraction for 8
coffee-growing regions (Brazil, Vietnam, Colombia, Ethiopia, Honduras,
Guatemala, Indonesia). Feeds crop stress signal for commodity sentiment score.

Extractor:
- OWM One Call API 3.0 / Day Summary — one JSON.gz per (location, date)
- extract_weather: daily, fetches yesterday + today (16 calls max)
- extract_weather_backfill: fills 2020-01-01 to yesterday, capped at 500
  calls/run with resume cursor '{location_id}:{date}' for crash safety
- Full idempotency via file existence check; state tracking via extract_core

SQLMesh:
- seeds.weather_locations (8 regions with lat/lon/variety)
- foundation.fct_weather_daily: INCREMENTAL_BY_TIME_RANGE, grain
  (location_id, observation_date), dedup via hash key, crop stress flags:
  is_frost (<2°C), is_heat_stress (>35°C), is_drought (<1mm), in_growing_season

Landing path: LANDING_DIR/weather/{location_id}/{year}/{date}.json.gz

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-25 22:40:27 +01:00
Deeman
9ee7a3d9d3 fix: export_serving — Arrow-based copy, rename to analytics.duckdb
Two bugs fixed:

1. Cross-connection COPY: DuckDB doesn't support referencing another
   connection's tables as src.serving.table. Replace with Arrow as
   intermediate: src reads to Arrow, dst.register() + CREATE TABLE.

2. Catalog/schema name collision: naming the export file serving.duckdb
   made DuckDB assign catalog name "serving" — same as the schema we
   create inside it. Every serving.table query became ambiguous. Rename
   to analytics.duckdb (catalog "analytics", schema "serving" = no clash).

   SERVING_DUCKDB_PATH values updated: serving.duckdb → analytics.duckdb
   in supervisor, service, bootstrap, dev_run.sh, .env.example, docker-compose.

3. Temp file: use _export.duckdb (not serving.duckdb.tmp) to avoid
   the same catalog collision during the write phase.

Verified: 6 tables exported, serving.* queries work read-only.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-22 12:54:39 +01:00
Deeman
b899bcbad4 feat: DuckDB two-file architecture — resolve SQLMesh/web-app lock contention
Split the single lakehouse.duckdb into two files to eliminate the exclusive
write-lock conflict between SQLMesh (pipeline) and the Quart web app (reader):

  lakehouse.duckdb  — SQLMesh exclusive (all pipeline layers)
  serving.duckdb    — web app reads (serving tables only, atomically swapped)

Changes:

web/src/beanflows/analytics.py
- Replace persistent global _conn with per-thread connections (threading.local)
- Add _get_conn(): opens read_only=True on first call per thread, reopens
  automatically on inode change (~1μs os.stat) to pick up atomic file swaps
- Switch env var from DUCKDB_PATH → SERVING_DUCKDB_PATH
- Add module docstring documenting architecture + DuckLake migration path

web/src/beanflows/app.py
- Startup check: use SERVING_DUCKDB_PATH
- Health check: use _db_path instead of _conn

src/materia/export_serving.py (new)
- Reads all serving.* tables from lakehouse.duckdb (read_only)
- Writes to serving_new.duckdb, then os.rename → serving.duckdb (atomic)
- ~50 lines; runs after each SQLMesh transform

src/materia/pipelines.py
- Add export_serving pipeline entry (uv run python -c ...)

infra/supervisor/supervisor.sh
- Add SERVING_DUCKDB_PATH env var comment
- Add export step: uv run materia pipeline run export_serving

infra/supervisor/materia-supervisor.service
- Add Environment=SERVING_DUCKDB_PATH=/data/materia/serving.duckdb

infra/bootstrap_supervisor.sh
- Add SERVING_DUCKDB_PATH to .env template

web/.env.example + web/docker-compose.yml
- Document both env vars; switch web service to SERVING_DUCKDB_PATH

web/src/beanflows/dashboard/templates/settings.html
- Minor settings page fix from prior session

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-22 11:06:55 +01:00
Deeman
562e2d1847 Add extract_all meta-pipeline: runs all four data source extractors in sequence
Sequences: extract (PSD) → extract_cot (CFTC) → extract_prices (KC=F) → extract_ice_all (ICE)
Stops and reports on first failure. META_PIPELINES dict makes it easy to
add more meta-pipelines as sources expand.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 22:00:10 +01:00
Deeman
ff896685d2 Add extract_ice_all command to run all three ICE extractors in sequence
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 21:59:08 +01:00
Deeman
ff7301d6a8 ICE extraction overhaul: API discovery + aging report + historical backfill
- Replace brittle ICE_STOCKS_URL env var with API-based URL discovery via
  the private ICE Report Center JSON API (no auth required)
- Add rolling CSV → XLS fallback in extract_ice_stocks() using
  find_latest_report() from ice_api.py
- Add ice_api.py: fetch_report_listings(), find_latest_report() with
  pagination up to MAX_API_PAGES
- Add xls_parse.py: detect_file_format() (magic bytes), xls_to_rows()
  using xlrd for OLE2/BIFF XLS files
- Add extract_ice_aging(): monthly certified stock aging report by
  age bucket × port → ice_aging/ landing dir
- Add extract_ice_historical(): 30-year EOM by-port stocks from static
  ICE URL → ice_stocks_by_port/ landing dir
- Add xlrd>=2.0.1 (parse XLS), xlwt>=1.3.0 (dev, test fixtures)
- Add SQLMesh raw + foundation models for both new datasets
- Add ice_aging_glob(), ice_stocks_by_port_glob() macros
- Add extract_ice_aging + extract_ice_historical pipeline entries
- Add 12 unit tests (format detection, XLS roundtrip, API mock, CSV output)

Seed files (data/landing/ice_aging/seed/ and ice_stocks_by_port/seed/)
must be created locally — data/ is gitignored.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 21:13:18 +01:00
Deeman
67c048485b Add Phase 1A-C + ICE warehouse stocks: prices, methodology, pipeline automation
Phase 1A — KC=F Coffee Futures Prices:
- New extract/coffee_prices/ package (yfinance): downloads KC=F daily OHLCV,
  stores as gzip CSV with SHA256-based idempotency
- SQLMesh models: raw/coffee_prices → foundation/fct_coffee_prices →
  serving/coffee_prices (with 20d/50d SMA, 52-week high/low, daily return %)
- Dashboard: 4 metric cards + dual-line chart (close, 20d MA, 50d MA)
- API: GET /commodities/<ticker>/prices

Phase 1B — Data Methodology Page:
- New /methodology route with full-page template (base.html)
- 6 anchored sections: USDA PSD, CFTC COT, KC=F price, ICE warehouse stocks,
  data quality model, update schedule table
- "Methodology" link added to marketing footer

Phase 1C — Automated Pipeline:
- supervisor.sh updated: runs extract_cot, extract_prices, extract_ice in
  sequence before transform
- Webhook failure alerting via ALERT_WEBHOOK_URL env var (ntfy/Slack/Telegram)

ICE Warehouse Stocks:
- New extract/ice_stocks/ package (niquests): normalizes ICE Report Center CSV
  to canonical schema, hash-based idempotency, soft-fail on 404 with guidance
- SQLMesh models: raw/ice_warehouse_stocks → foundation/fct_ice_warehouse_stocks
  → serving/ice_warehouse_stocks (30d avg, WoW change, 52w drawdown)
- Dashboard: 4 metric cards + line chart (certified bags + 30d avg)
- API: GET /commodities/<code>/stocks

Foundation:
- dim_commodity: added ticker (KC=F) and ice_stock_report_code (COFFEE-C) columns
- macros/__init__.py: added prices_glob() and ice_stocks_glob()
- pipelines.py: added extract_prices and extract_ice entries

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 11:41:43 +01:00
Deeman
0a83b2cb74 Add CFTC COT data integration with foundation data model layer
- New extraction package (cftc_cot): downloads yearly Disaggregated Futures ZIPs
  from CFTC, etag-based dedup, dynamic inner filename discovery, gzip normalization
- SQLMesh 3-layer architecture: raw (technical) → foundation (business model) → serving (mart)
- dim_commodity seed: conformed dimension mapping USDA ↔ CFTC codes — the commodity ontology
- fct_cot_positioning: typed, deduplicated weekly positioning facts for all commodities
- obt_cot_positioning: Coffee C mart with COT Index (26w/52w), WoW delta, OI ratios
- Analytics functions + REST API endpoints: /commodities/<code>/positioning[/latest]
- Dashboard widget: Managed Money net, COT Index card, dual-axis Chart.js chart
- 23 passing tests (10 unit + 2 SQLMesh model + existing regression suite)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 23:28:10 +01:00
Deeman
c1d00dcdc4 Refactor to local-first architecture on Hetzner NVMe
Remove distributed R2/Iceberg/SSH pipeline architecture in favor of
local subprocess execution with NVMe storage. Landing data backed up
to R2 via rclone timer.

- Strip Iceberg catalog, httpfs, boto3, paramiko, prefect, pyarrow
- Pipelines run via subprocess.run() with bounded timeouts
- Extract writes to {LANDING_DIR}/psd/{year}/{month}/{etag}.csv.gzip
- SQLMesh reads LANDING_DIR variable, writes to DUCKDB_PATH
- Delete unused provider stubs (ovh, scaleway, oracle)
- Add rclone systemd timer for R2 backup every 6h
- Update supervisor to run pipelines with env vars

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 19:50:19 +01:00
Deeman
6c93021f2d remove stupid rules 2025-10-12 21:44:56 +02:00
Deeman
ce1cad4c41 fix 2025-10-12 21:36:32 +02:00
Deeman
5ce112f44d Add comprehensive E2E tests for materia CLI
- Add pytest and pytest-cov for testing
- Add niquests for modern HTTP/2 support (keep requests for hcloud compatibility)
- Create 13 E2E tests covering CLI, workers, pipelines, and secrets (71% coverage)
- Fix Pulumi ESC environment path (beanflows/prod) and secret key names
- Update GitLab CI to run CLI tests with coverage reporting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-12 21:32:51 +02:00
Deeman
55bb84f0fa implement cli/infra update cicd 2025-10-12 21:00:41 +02:00
Deeman
c3c281fcd8 update structure 2025-07-08 22:41:59 +02:00
dc64f83fe6 add hook 2025-03-01 18:25:52 +01:00
96a6abf1e0 Initial commit 2025-03-01 18:11:57 +01:00