Refactor PSD extraction: simplify to latest-only + add R2 support
## Key Changes
1. **Simplified extraction logic**
- Changed from downloading 220+ historical archives to checking only latest available month
- Tries the current month first, then falls back up to three earlier months (handles USDA publication lag)
- Architecture advisor insight: ETags naturally deduplicate, historical year/month structure was unnecessary
2. **Flat storage structure**
- Old: `data/{year}/{month}/{etag}.zip`
- New: `data/{etag}.zip` (local) or `psd/{etag}.zip` (R2)
- Migrated 226 existing files to flat structure
3. **Dual storage modes**
- **Local mode**: Downloads to local directory (development)
- **R2 mode**: Uploads to Cloudflare R2 (production)
- Mode determined by presence of R2 environment variables
- Added boto3 dependency for S3-compatible R2 API
4. **Updated raw SQLMesh model**
- Changed pattern from `**/*.zip` to `*.zip` to match flat structure
## Benefits
- Simpler: Single file check instead of 220+ URL attempts
- Efficient: ETag-based deduplication works naturally
- Flexible: Supports both local dev and production R2 storage
- Maintainable: Removed unnecessary complexity
## Testing
- ✅ Local extraction works and respects ETags
- ✅ Falls back correctly when current month unavailable
- ✅ Linting passes
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,7 @@ authors = [
|
||||
requires-python = ">=3.13"
|
||||
|
||||
dependencies = [
|
||||
"boto3>=1.40.55",
|
||||
"niquests>=3.14.1",
|
||||
"pendulum>=3.1.0",
|
||||
]
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
import boto3
|
||||
import niquests
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@@ -17,14 +20,45 @@ logger = logging.getLogger("PSDOnline Extractor")
|
||||
# Local staging directory for downloaded PSD zip archives, next to this script.
OUTPUT_DIR = pathlib.Path(__file__).parent / "data"
|
||||
# Created eagerly at import time so every code path can assume it exists.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
logger.info(f"Output dir: {OUTPUT_DIR}")
|
||||
# NOTE(review): the TODO below appears superseded by the R2 env-var config
# added in this change — confirm before deleting it.
#TODO: adapt to environment values, so this writes to s3 in prod
|
||||
|
||||
# R2 configuration from environment
# All four variables must be set to enable R2 mode; otherwise the extractor
# falls back to local storage (see the all([...]) check in extract_psd_dataset).
R2_ENDPOINT = os.getenv('R2_ENDPOINT')
|
||||
R2_BUCKET = os.getenv('R2_BUCKET')
|
||||
R2_ACCESS_KEY = os.getenv('R2_ACCESS_KEY')
|
||||
R2_SECRET_KEY = os.getenv('R2_SECRET_KEY')
|
||||
|
||||
# USDA monthly archive URL template; {month:02d} zero-pads the month.
PSD_HISTORICAL_URL = "https://apps.fas.usda.gov/psdonline/downloads/archives/{year}/{month:02d}/psd_alldata_csv.zip"
|
||||
# Earliest archive month used by the (pre-refactor) full-history loop —
# presumably Aug 2006 is the first published archive; confirm against USDA.
FIRST_YEAR = 2006
|
||||
FIRST_MONTH = 8
|
||||
|
||||
def extract_psd_file(url:str, extract_to_path: pathlib.Path, http_session: niquests.Session):
|
||||
def check_r2_file_exists(etag: str, s3_client) -> bool:
    """Return True when the object for *etag* is already present in R2.

    Issues a HEAD request against ``psd/<etag>.zip``. A 404 error code
    means the object is absent; any other client error is propagated.
    """
    r2_key = f"psd/{etag}.zip"
    try:
        s3_client.head_object(Bucket=R2_BUCKET, Key=r2_key)
    except ClientError as err:
        # Only "not found" maps to False; anything else is a real failure.
        if err.response['Error']['Code'] == '404':
            return False
        raise
    logger.info(f"File {r2_key} already exists in R2, skipping")
    return True
|
||||
|
||||
|
||||
def upload_to_r2(content: bytes, etag: str, s3_client):
    """Write the raw zip *content* into the R2 bucket as ``psd/<etag>.zip``."""
    destination = f"psd/{etag}.zip"
    logger.info(f"Uploading to R2: {destination}")
    # S3-compatible PUT; the ETag-derived key makes uploads idempotent.
    s3_client.put_object(Bucket=R2_BUCKET, Key=destination, Body=content)
    logger.info("Upload complete")
|
||||
|
||||
|
||||
def extract_psd_file(url: str, extract_to_path: pathlib.Path, http_session: niquests.Session, s3_client=None):
# NOTE(review): this span is a rendered diff — old (removed) and new (added)
# lines are interleaved with no +/- markers, so what follows is not one
# coherent function body; a hunk header below also hides omitted lines.
|
||||
"""
|
||||
Extract PSD file either to local storage or R2.
|
||||
If s3_client is provided, uploads to R2 only (no local storage).
|
||||
If s3_client is None, downloads to local storage.
|
||||
"""
|
||||
logger.info(f"Requesting file {url} ...")
|
||||
# Collect ETags of zips already on disk (recursive scan of OUTPUT_DIR).
extracted_etags = [file.stem for file in OUTPUT_DIR.rglob("*.zip")]
|
||||
|
||||
# HEAD first: reads the ETag without downloading the payload.
response = http_session.head(url)
|
||||
if response.status_code == 404:
|
||||
@@ -33,34 +67,73 @@ def extract_psd_file(url:str, extract_to_path: pathlib.Path, http_session: nique
|
||||
elif response.status_code != 200:
|
||||
logger.error(f"Status code not ok, STATUS={response.status_code}")
|
||||
return
|
||||
|
||||
# Sanitize the ETag into a filesystem-safe name ('"' stripped, ':' -> '_').
etag = response.headers.get("etag").replace('"',"").replace(":","_")
|
||||
if etag in extracted_etags:
|
||||
logger.info("File already extracted, skipping download.")
|
||||
return
|
||||
# NOTE(review): dangling 'else:' — remnant of a branch removed in this diff.
else:
|
||||
|
||||
# R2 mode: check R2 and upload if needed
|
||||
if s3_client:
|
||||
if check_r2_file_exists(etag, s3_client):
|
||||
return
|
||||
response = http_session.get(url)
|
||||
# NOTE(review): the local-write statements here look like removed (old)
# code; the new R2 path presumably only uploads — confirm in the repo.
extract_to_path = extract_to_path / f"{etag}.zip"
|
||||
logger.info(f"Storing file to {extract_to_path}")
|
||||
extract_to_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
extract_to_path.write_bytes(response.content)
|
||||
logger.info("Download done.")
|
||||
upload_to_r2(response.content, etag, s3_client)
|
||||
return
|
||||
|
||||
# Local mode: check local and download if needed
|
||||
local_file = extract_to_path / f"{etag}.zip"
|
||||
if local_file.exists():
|
||||
logger.info(f"File {etag}.zip already exists locally, skipping")
|
||||
return
|
||||
|
||||
# Full GET only after all dedup checks have passed.
response = http_session.get(url)
|
||||
logger.info(f"Storing file to {local_file}")
|
||||
extract_to_path.mkdir(parents=True, exist_ok=True)
|
||||
local_file.write_bytes(response.content)
|
||||
logger.info("Download complete")
|
||||
|
||||
|
||||
def extract_psd_dataset():
# NOTE(review): rendered diff — the old full-history year/month loop and the
# new latest-month-with-fallback loop are interleaved below without +/-
# markers; do not read this as a single coherent body.
|
||||
today = datetime.now()
|
||||
# Old (pre-refactor) logic: iterate every year since FIRST_YEAR ...
years = list(range(FIRST_YEAR, today.year+1))
|
||||
for year in years:
|
||||
months = list(range(1,13))
|
||||
if year == FIRST_YEAR:
|
||||
# ... starting from FIRST_MONTH in the first year ...
months = list(range(FIRST_MONTH, 13))
|
||||
if year == years[-1]:
|
||||
# ... and only up to the current month in the current year.
months = list(range(1, today.month+1))
|
||||
logger.info(f"Year {year}, extracting months: {months}")
|
||||
for month in months:
|
||||
|
||||
# Check if R2 credentials are configured
|
||||
# New logic: R2 mode only when every R2 env var is present.
use_r2 = all([R2_ENDPOINT, R2_BUCKET, R2_ACCESS_KEY, R2_SECRET_KEY])
|
||||
|
||||
if use_r2:
|
||||
logger.info("R2 credentials found, uploading to R2")
|
||||
# S3-compatible client pointed at the Cloudflare R2 endpoint.
s3_client = boto3.client(
|
||||
's3',
|
||||
endpoint_url=R2_ENDPOINT,
|
||||
aws_access_key_id=R2_ACCESS_KEY,
|
||||
aws_secret_access_key=R2_SECRET_KEY
|
||||
)
|
||||
else:
|
||||
logger.info("R2 credentials not found, downloading to local storage")
|
||||
s3_client = None
|
||||
|
||||
# Try current month and previous 3 months (USDA data is published with lag)
|
||||
with niquests.Session() as session:
|
||||
for months_back in range(4):
|
||||
year = today.year
|
||||
month = today.month - months_back
|
||||
# Handle year rollover
|
||||
# e.g. months_back past January wraps into December of the prior year.
while month < 1:
|
||||
month += 12
|
||||
year -= 1
|
||||
|
||||
url = PSD_HISTORICAL_URL.format(year=year, month=month)
|
||||
# NOTE(review): the next three lines (per-month target dir and the
# nested second Session) look like removed old code in this diff view.
target_dir = OUTPUT_DIR / f"{year}"/f"{month:02d}"
|
||||
with niquests.Session() as session:
|
||||
extract_psd_file(url=url, http_session=session, extract_to_path=target_dir)
|
||||
logger.info(f"Trying {year}-{month:02d}...")
|
||||
|
||||
# Check if URL exists
|
||||
# Cheap existence probe before delegating to extract_psd_file.
response = session.head(url)
|
||||
if response.status_code == 200:
|
||||
logger.info(f"Found latest data at {year}-{month:02d}")
|
||||
# Flat layout: files land directly in OUTPUT_DIR (or R2 when enabled).
extract_psd_file(url=url, http_session=session, extract_to_path=OUTPUT_DIR, s3_client=s3_client)
|
||||
return
|
||||
elif response.status_code == 404:
|
||||
logger.info(f"Month {year}-{month:02d} not found, trying earlier...")
|
||||
else:
|
||||
logger.warning(f"Unexpected status code {response.status_code} for {year}-{month:02d}")
|
||||
|
|
||||
|
||||
# Reached only when all four candidate months failed the HEAD probe.
logger.error("Could not find any available data in the last 4 months")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user