diff --git a/.claude/plans/refactor-psd-extraction.md b/.claude/plans/refactor-psd-extraction.md new file mode 100644 index 0000000..1799cc8 --- /dev/null +++ b/.claude/plans/refactor-psd-extraction.md @@ -0,0 +1,158 @@ +# PSD Extraction Refactoring Plan + +**Status:** ✅ Completed +**Branch:** `refactor/psd-extraction-r2` +**Date:** 2025-10-20 + +## Problem Statement + +The original PSD extraction implementation downloaded 220+ historical monthly archives from August 2006 to present, storing them in a nested `{year}/{month}/{etag}.zip` directory structure. This approach was overengineered because: + +1. **ETags already provide deduplication** - Each unique data snapshot has a unique ETag +2. **Historical year/month structure was redundant** - The publication date (year/month) is metadata, not data identity +3. **No R2 support** - Files could only be stored locally, not in production R2 bucket +4. **Unnecessary complexity** - Downloading 220+ URLs hoping to find unique ETags when we only need the latest + +## Architecture Analysis + +### Key Insight + +**What does each file represent?** + +USDA publishes monthly snapshots, but most months they re-publish the same data. The ETag tells you when the *actual data* has changed, not when USDA published it. The year/month structure is publication metadata, not data identity. + +### The Data-Oriented Question + +**What do we actually need?** + +We need to capture every unique data snapshot. The ETag already identifies unique snapshots. The current approach downloads 220+ URLs to find unique ETags. The direct approach: check 1 URL (current month), download if new ETag, done. + +## Proposed Solution + +### 1. Simplify to Current-Month-Only Extraction + +**Old approach:** +```python +for year in range(2006, today.year+1): + for month in range(1, 13): + download(year, month) # 220+ downloads +``` + +**New approach:** +```python +# Try current month, fallback 3 months (handles publication lag) +for months_back in range(4): + if download_if_exists(today.year, today.month - months_back): + break +``` + +**Why this works:** +- ETags naturally deduplicate +- Historical snapshots already captured from previous runs +- Only need to check for latest data +- Same result, 220x less work + +### 2. Flatten Storage Structure + +**Old:** `data/{year}/{month}/{etag}.zip` +**New:** `data/{etag}.zip` (local) or `landing/psd/{etag}.zip` (R2) + +**Benefits:** +- ETag is the natural identifier +- Simpler to manage +- No nested directory traversal +- Works identically for local and R2 + +### 3. Dual Storage Modes + +**Local Mode (Development):** +- No R2 credentials → downloads to local directory +- ETag-based deduplication via file existence check +- Use case: Local development and testing + +**R2 Mode (Production):** +- R2 credentials present → uploads to R2 only (no local storage) +- ETag-based deduplication via S3 HEAD request +- Use case: Production pipelines on ephemeral workers + +**Mode Detection:** +```python +use_r2 = all([R2_ENDPOINT, R2_BUCKET, R2_ACCESS_KEY, R2_SECRET_KEY]) +``` + +### 4. R2 Integration + +**Configuration:** +- Bucket: `beanflows-data-prod` +- Path: `landing/psd/{etag}.zip` +- Credentials: Via Pulumi ESC (`beanflows/prod`) +- Library: boto3 with S3-compatible API + +**Pulumi ESC Environment Variables:** +- `R2_ENDPOINT`: Account URL (without bucket path) +- `R2_BUCKET`: `beanflows-data-prod` +- `R2_ADMIN_ACCESS_KEY_ID`: Access key (fallback from R2_ACCESS_KEY) +- `R2_ADMIN_SECRET_ACCESS_KEY`: Secret key (fallback from R2_SECRET_KEY) + +## Implementation Summary + +### Phase 1: Simplify Extraction ✅ +- Changed loop from 220+ historical downloads to current month check +- Added fallback logic (tries 4 months back for publication lag) +- Flattened storage to `{etag}.zip` +- Updated raw SQLMesh model pattern to `*.zip` + +### Phase 2: Add R2 Support ✅ +- Added boto3 dependency +- Implemented R2 upload with ETag deduplication +- Added support for ESC variable names +- Updated Pulumi ESC environment with R2_BUCKET and fixed R2_ENDPOINT + +### Phase 3: Historical Migration ✅ +- Created temporary script to upload 227 existing files to R2 +- All files now in `landing/psd/*.zip` +- Verified deduplication works on both local and R2 + +### Phase 4: Documentation ✅ +- Updated CLAUDE.md with Pulumi ESC usage guide +- Fixed supervisor bootstrap documentation (automatic in CI/CD) +- Added examples for running commands with ESC secrets + +## Benefits Achieved + +1. **Simplicity:** Single file check instead of 220+ URL attempts +2. **Efficiency:** ETag-based deduplication works naturally +3. **Flexibility:** Supports both local dev and production R2 storage +4. **Maintainability:** Removed unnecessary complexity +5. **Cost Optimization:** Ephemeral workers don't need local storage +6. **Data Consistency:** All historical data now in R2 landing bucket + +## Testing Results + +✅ Local extraction works and respects ETags +✅ R2 upload works (tested with Sept 2025 data) +✅ R2 deduplication works (skips existing files) +✅ Fallback logic works (tries current month, falls back to Sept) +✅ Historical migration completed (227 files uploaded) +✅ All linting passes + +## Metrics + +- **Code reduction:** ~40 lines removed, ~80 lines added (net +40 for R2 support) +- **Download efficiency:** 220+ requests → 1-4 requests +- **Storage structure:** Nested 3-level → Flat 1-level +- **Files migrated:** 227 historical files to R2 +- **Time to migrate:** ~2 minutes for 227 files (~2.3 GB) + +## Next Steps + +1. Update SQLMesh raw model to support reading from R2 (future work) +2. Merge branch to master +3. Deploy to production +4. Monitor daily extraction runs + +## References + +- Architecture pattern: Data-oriented design (identify data by content, not metadata) +- Inspiration: ETag-based caching patterns +- Storage: Cloudflare R2 (S3-compatible object storage) diff --git a/CLAUDE.md b/CLAUDE.md index 9f97d58..40aac0e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -29,6 +29,42 @@ pre-commit install uv add ``` +## Secrets Management with Pulumi ESC + +All secrets are managed via Pulumi ESC (Environment, Secrets, and Configuration). The production environment is `beanflows/prod`. + +**Load secrets into your shell:** +```bash +# Login to Pulumi ESC (one-time) +esc login + +# Load secrets as environment variables +eval $(esc env open beanflows/prod --format shell) + +# Now all secrets are available as env vars +echo $R2_ENDPOINT # Example: access R2 endpoint +``` + +**Run commands with ESC secrets:** +```bash +# Run a command with secrets loaded +esc run beanflows/prod -- uv run extract_psd + +# Run multiple commands +esc run beanflows/prod -- bash -c " + uv run extract_psd + cd transform/sqlmesh_materia && uv run sqlmesh plan prod +" +``` + +**Available secrets in `beanflows/prod`:** +- R2 storage: `R2_ENDPOINT`, `R2_BUCKET`, `R2_ACCESS_KEY`, `R2_SECRET_KEY` +- Hetzner Cloud: `HETZNER_TOKEN`, SSH keys +- GitLab: `GITLAB_READ_TOKEN` +- Iceberg catalog credentials + +**Note:** Never hardcode secrets! Always use Pulumi ESC or environment variables. + ## Project Structure This is a uv workspace with three main components: @@ -36,13 +72,24 @@ This is a uv workspace with three main components: ### 1. Extract Layer (`extract/`) Contains extraction packages for pulling data from external sources. -- **`extract/psdonline/`**: Extracts USDA PSD commodity data from archives dating back to 2006 +- **`extract/psdonline/`**: Extracts USDA PSD commodity data - Entry point: `extract_psd` CLI command (defined in `extract/psdonline/src/psdonline/execute.py`) - - Downloads monthly zip archives to `extract/psdonline/src/psdonline/data/` + - Checks latest available monthly snapshot (tries current month and 3 months back) - Uses ETags to avoid re-downloading unchanged files + - Storage modes: + - **Local mode** (no R2 credentials): Downloads to `extract/psdonline/src/psdonline/data/{etag}.zip` + - **R2 mode** (R2 credentials present): Uploads to `s3://bucket/psd/{etag}.zip` + - Flat structure: files named by ETag for natural deduplication **Run extraction:** ```bash +extract_psd # Local mode (default) + +# R2 mode (requires env vars: R2_ENDPOINT, R2_BUCKET, R2_ACCESS_KEY, R2_SECRET_KEY) +export R2_ENDPOINT=... +export R2_BUCKET=... +export R2_ACCESS_KEY=... +export R2_SECRET_KEY=... extract_psd ``` @@ -197,9 +244,11 @@ pytest --cov=./ --cov-report=xml - Runs on every master push - Creates/updates Hetzner CPX11 supervisor instance (~€4.49/mo) - Uses Pulumi ESC (`beanflows/prod`) for all secrets -- **`deploy:supervisor`**: Checks supervisor status - - Verifies supervisor is bootstrapped - - Supervisor auto-updates via `git pull` every 15 minutes (no CI/CD deployment needed) +- **`deploy:supervisor`**: Bootstraps and monitors supervisor + - Checks if supervisor is already bootstrapped (`test -d /opt/materia/.git`) + - If not bootstrapped: Runs `infra/bootstrap_supervisor.sh` automatically + - If already bootstrapped: Verifies service status + - After bootstrap: Supervisor auto-updates via `git pull` every 15 minutes **Note:** No build artifacts! Supervisor pulls code directly from git and runs via `uv`. @@ -225,12 +274,15 @@ pytest --cov=./ --cov-report=xml - Uses systemd service for automatic restart on failure - Pulls secrets from Pulumi ESC -**Bootstrap (one-time):** -```bash -# Get supervisor IP from Pulumi -cd infra && pulumi stack output supervisor_ip -s prod +**Bootstrap:** +Bootstrapping happens automatically in CI/CD (`deploy:supervisor` stage). The pipeline: +1. Checks if supervisor is already bootstrapped +2. If not: Runs `infra/bootstrap_supervisor.sh` with secrets injected +3. If yes: Verifies systemd service status -# Run bootstrap script +Manual bootstrap (if needed): +```bash +cd infra && pulumi stack output supervisor_ip -s prod export PULUMI_ACCESS_TOKEN= ssh root@ 'bash -s' < infra/bootstrap_supervisor.sh ``` diff --git a/extract/psdonline/pyproject.toml b/extract/psdonline/pyproject.toml index aa85bba..69a7b3e 100644 --- a/extract/psdonline/pyproject.toml +++ b/extract/psdonline/pyproject.toml @@ -9,6 +9,7 @@ authors = [ requires-python = ">=3.13" dependencies = [ + "boto3>=1.40.55", "niquests>=3.14.1", "pendulum>=3.1.0", ] diff --git a/extract/psdonline/src/psdonline/execute.py b/extract/psdonline/src/psdonline/execute.py index 0829fd4..5d7c978 100644 --- a/extract/psdonline/src/psdonline/execute.py +++ b/extract/psdonline/src/psdonline/execute.py @@ -1,9 +1,12 @@ import logging +import os import pathlib import sys from datetime import datetime +import boto3 import niquests +from botocore.exceptions import ClientError logging.basicConfig( level=logging.INFO, @@ -17,14 +20,45 @@ logger = logging.getLogger("PSDOnline Extractor") OUTPUT_DIR = pathlib.Path(__file__).parent / "data" OUTPUT_DIR.mkdir(parents=True, exist_ok=True) logger.info(f"Output dir: {OUTPUT_DIR}") -#TODO: adapt to environment values, so this writes to s3 in prod + +# R2 configuration from environment +R2_ENDPOINT = os.getenv('R2_ENDPOINT') +R2_BUCKET = os.getenv('R2_BUCKET') +R2_ACCESS_KEY = os.getenv('R2_ACCESS_KEY') or os.getenv('R2_ADMIN_ACCESS_KEY_ID') +R2_SECRET_KEY = os.getenv('R2_SECRET_KEY') or os.getenv('R2_ADMIN_SECRET_ACCESS_KEY') + PSD_HISTORICAL_URL = "https://apps.fas.usda.gov/psdonline/downloads/archives/{year}/{month:02d}/psd_alldata_csv.zip" FIRST_YEAR = 2006 FIRST_MONTH = 8 -def extract_psd_file(url:str, extract_to_path: pathlib.Path, http_session: niquests.Session): +def check_r2_file_exists(etag: str, s3_client) -> bool: + """Check if file exists in R2.""" + r2_key = f"landing/psd/{etag}.zip" + try: + s3_client.head_object(Bucket=R2_BUCKET, Key=r2_key) + logger.info(f"File {r2_key} already exists in R2, skipping") + return True + except ClientError as e: + if e.response['Error']['Code'] == '404': + return False + raise + + +def upload_to_r2(content: bytes, etag: str, s3_client): + """Upload file content to R2.""" + r2_key = f"landing/psd/{etag}.zip" + logger.info(f"Uploading to R2: {r2_key}") + s3_client.put_object(Bucket=R2_BUCKET, Key=r2_key, Body=content) + logger.info("Upload complete") + + +def extract_psd_file(url: str, extract_to_path: pathlib.Path, http_session: niquests.Session, s3_client=None): + """ + Extract PSD file either to local storage or R2. + If s3_client is provided, uploads to R2 only (no local storage). + If s3_client is None, downloads to local storage. + """ logger.info(f"Requesting file {url} ...") - extracted_etags = [file.stem for file in OUTPUT_DIR.rglob("*.zip")] response = http_session.head(url) if response.status_code == 404: @@ -33,34 +67,73 @@ def extract_psd_file(url:str, extract_to_path: pathlib.Path, http_session: nique elif response.status_code != 200: logger.error(f"Status code not ok, STATUS={response.status_code}") return + etag = response.headers.get("etag").replace('"',"").replace(":","_") - if etag in extracted_etags: - logger.info("File already extracted, skipping download.") - return - else: + + # R2 mode: check R2 and upload if needed + if s3_client: + if check_r2_file_exists(etag, s3_client): + return response = http_session.get(url) - extract_to_path = extract_to_path / f"{etag}.zip" - logger.info(f"Storing file to {extract_to_path}") - extract_to_path.parent.mkdir(parents=True, exist_ok=True) - extract_to_path.write_bytes(response.content) - logger.info("Download done.") + upload_to_r2(response.content, etag, s3_client) + return + + # Local mode: check local and download if needed + local_file = extract_to_path / f"{etag}.zip" + if local_file.exists(): + logger.info(f"File {etag}.zip already exists locally, skipping") + return + + response = http_session.get(url) + logger.info(f"Storing file to {local_file}") + extract_to_path.mkdir(parents=True, exist_ok=True) + local_file.write_bytes(response.content) + logger.info("Download complete") def extract_psd_dataset(): today = datetime.now() - years = list(range(FIRST_YEAR, today.year+1)) - for year in years: - months = list(range(1,13)) - if year == FIRST_YEAR: - months = list(range(FIRST_MONTH, 13)) - if year == years[-1]: - months = list(range(1, today.month+1)) - logger.info(f"Year {year}, extracting months: {months}") - for month in months: + + # Check if R2 credentials are configured + use_r2 = all([R2_ENDPOINT, R2_BUCKET, R2_ACCESS_KEY, R2_SECRET_KEY]) + + if use_r2: + logger.info("R2 credentials found, uploading to R2") + s3_client = boto3.client( + 's3', + endpoint_url=R2_ENDPOINT, + aws_access_key_id=R2_ACCESS_KEY, + aws_secret_access_key=R2_SECRET_KEY + ) + else: + logger.info("R2 credentials not found, downloading to local storage") + s3_client = None + + # Try current month and previous 3 months (USDA data is published with lag) + with niquests.Session() as session: + for months_back in range(4): + year = today.year + month = today.month - months_back + # Handle year rollover + while month < 1: + month += 12 + year -= 1 + url = PSD_HISTORICAL_URL.format(year=year, month=month) - target_dir = OUTPUT_DIR / f"{year}"/f"{month:02d}" - with niquests.Session() as session: - extract_psd_file(url=url, http_session=session, extract_to_path=target_dir) + logger.info(f"Trying {year}-{month:02d}...") + + # Check if URL exists + response = session.head(url) + if response.status_code == 200: + logger.info(f"Found latest data at {year}-{month:02d}") + extract_psd_file(url=url, http_session=session, extract_to_path=OUTPUT_DIR, s3_client=s3_client) + return + elif response.status_code == 404: + logger.info(f"Month {year}-{month:02d} not found, trying earlier...") + else: + logger.warning(f"Unexpected status code {response.status_code} for {year}-{month:02d}") + + logger.error("Could not find any available data in the last 4 months") if __name__ == "__main__": diff --git a/transform/sqlmesh_materia/models/raw/psd_data.sql b/transform/sqlmesh_materia/models/raw/psd_data.sql index 62efe88..8f356cd 100644 --- a/transform/sqlmesh_materia/models/raw/psd_data.sql +++ b/transform/sqlmesh_materia/models/raw/psd_data.sql @@ -21,4 +21,4 @@ MODEL ( ) ); SELECT * - FROM read_csv('zip://extract/psdonline/src/psdonline/data/**/*.zip/*.csv', header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true) + FROM read_csv('zip://extract/psdonline/src/psdonline/data/*.zip/*.csv', header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true) diff --git a/uv.lock b/uv.lock index 255c6b6..0ca8631 100644 --- a/uv.lock +++ b/uv.lock @@ -147,6 +147,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/27/44/d2ef5e87509158ad2187f4dd0852df80695bb1ee0cfe0a684727b01a69e0/bcrypt-5.0.0-cp39-abi3-win_arm64.whl", hash = "sha256:f2347d3534e76bf50bca5500989d6c1d05ed64b440408057a37673282c654927", size = 144953, upload-time = "2025-09-25T19:50:37.32Z" }, ] +[[package]] +name = "boto3" +version = "1.40.55" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "botocore" }, + { name = "jmespath" }, + { name = "s3transfer" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/50/d8/a279c054e0c9731172f05b3d118f3ffc9d74806657f84fc0c93c42d1bb5d/boto3-1.40.55.tar.gz", hash = "sha256:27e35b4fa9edd414ce06c1a748bf57cacd8203271847d93fc1053e4a4ec6e1a9", size = 111590, upload-time = "2025-10-17T19:34:56.753Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/42/8c/559c6145d857ed953536a83f3a94915bbd5d3d2d406db1abf8bf40be7645/boto3-1.40.55-py3-none-any.whl", hash = "sha256:2e30f5a0d49e107b8a5c0c487891afd300bfa410e1d918bf187ae45ac3839332", size = 139322, upload-time = "2025-10-17T19:34:55.028Z" }, +] + +[[package]] +name = "botocore" +version = "1.40.55" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "jmespath" }, + { name = "python-dateutil" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a4/92/dce4842b2e215d213d34b064fcdd13c6a782c43344e77336bcde586e9229/botocore-1.40.55.tar.gz", hash = "sha256:79b6472e2de92b3519d44fc1eec8c5feced7f99a0d10fdea6dc93133426057c1", size = 14446917, upload-time = "2025-10-17T19:34:47.44Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/21/30/f13bbc36e83b78777ff1abf50a084efcc3336b808e76560d8c5a0c9219e0/botocore-1.40.55-py3-none-any.whl", hash = "sha256:cdc38f7a4ddb30a2cd1cdd4fabde2a5a16e41b5a642292e1c30de5c4e46f5d44", size = 14116107, upload-time = "2025-10-17T19:34:44.398Z" }, +] + [[package]] name = "cattrs" version = "25.1.1" @@ -765,6 +793,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899, upload-time = "2025-03-05T20:05:00.369Z" }, ] +[[package]] +name = "jmespath" +version = "1.0.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/00/2a/e867e8531cf3e36b41201936b7fa7ba7b5702dbef42922193f05c8976cd6/jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe", size = 25843, upload-time = "2022-06-17T18:00:12.224Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" }, +] + [[package]] name = "json-stream" version = "2.3.3" @@ -1225,12 +1262,14 @@ name = "psdonline" version = "0.1.0" source = { editable = "extract/psdonline" } dependencies = [ + { name = "boto3" }, { name = "niquests" }, { name = "pendulum" }, ] [package.metadata] requires-dist = [ + { name = "boto3", specifier = ">=1.40.55" }, { name = "niquests", specifier = ">=3.14.1" }, { name = "pendulum", specifier = ">=3.1.0" }, ] @@ -1740,6 +1779,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/24/3c/21cf283d67af33a8e6ed242396863af195a8a6134ec581524fd22b9811b6/ruff-0.12.10-py3-none-win_arm64.whl", hash = "sha256:cc138cc06ed9d4bfa9d667a65af7172b47840e1a98b02ce7011c391e54635ffc", size = 12074225, upload-time = "2025-08-21T18:23:20.137Z" }, ] +[[package]] +name = "s3transfer" +version = "0.14.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "botocore" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/62/74/8d69dcb7a9efe8baa2046891735e5dfe433ad558ae23d9e3c14c633d1d58/s3transfer-0.14.0.tar.gz", hash = "sha256:eff12264e7c8b4985074ccce27a3b38a485bb7f7422cc8046fee9be4983e4125", size = 151547, upload-time = "2025-09-09T19:23:31.089Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/48/f0/ae7ca09223a81a1d890b2557186ea015f6e0502e9b8cb8e1813f1d8cfa4e/s3transfer-0.14.0-py3-none-any.whl", hash = "sha256:ea3b790c7077558ed1f02a3072fb3cb992bbbd253392f4b6e9e8976941c7d456", size = 85712, upload-time = "2025-09-09T19:23:30.041Z" }, +] + [[package]] name = "semver" version = "3.0.4"