Remove distributed R2/Iceberg/SSH pipeline architecture in favor of
local subprocess execution with NVMe storage. Landing data backed up
to R2 via rclone timer.
- Strip Iceberg catalog, httpfs, boto3, paramiko, prefect, pyarrow
- Pipelines run via subprocess.run() with bounded timeouts
- Extract writes to {LANDING_DIR}/psd/{year}/{month}/{etag}.csv.gzip
- SQLMesh reads LANDING_DIR variable, writes to DUCKDB_PATH
- Delete unused provider stubs (ovh, scaleway, oracle)
- Add rclone systemd timer for R2 backup every 6h
- Update supervisor to run pipelines with env vars
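
The subprocess execution described above can be sketched as follows. This is an illustrative sketch, not the actual supervisor code: the command list, timeout value, and paths are assumptions; only the env var names (LANDING_DIR, DUCKDB_PATH) come from the change description.

```python
import os
import subprocess

def run_pipeline(cmd: list[str], timeout_s: int = 3600) -> int:
    """Run one pipeline step as a local subprocess with a bounded timeout.

    Env var names follow the change description; the paths and the
    one-hour default timeout are illustrative assumptions.
    """
    env = {
        **os.environ,
        "LANDING_DIR": os.environ.get("LANDING_DIR", "/mnt/nvme/landing"),
        "DUCKDB_PATH": os.environ.get("DUCKDB_PATH", "/mnt/nvme/warehouse.duckdb"),
    }
    try:
        proc = subprocess.run(
            cmd,
            env=env,
            timeout=timeout_s,      # hard bound: the child is killed if exceeded
            capture_output=True,
            text=True,
        )
        return proc.returncode
    except subprocess.TimeoutExpired:
        # Surface a timeout as a nonzero exit so the supervisor can retry.
        return -1
```

Treating a timeout as an ordinary failure keeps the supervisor loop simple: every pipeline invocation resolves to an exit code within the bound.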
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
25 lines · 1.0 KiB · SQL
MODEL (
  name raw.psd_alldata,
  kind FULL,
  grain (commodity_code, country_code, market_year, calendar_year, month, attribute_id, unit_id),
  start '2006-08-01',
  cron '@daily',
  columns (
    commodity_code varchar,
    commodity_description varchar,
    country_code varchar,
    country_name varchar,
    market_year varchar,
    calendar_year varchar,
    month varchar,
    attribute_id varchar,
    attribute_description varchar,
    unit_id varchar,
    unit_description varchar,
    value varchar,
    filename varchar
  )
);

SELECT *
FROM read_csv(
  '{{ var("LANDING_DIR") }}/psd/**/*.csv.gzip',
  delim=',',
  encoding='utf-8',
  compression='gzip',
  max_line_size=10000000,
  header=true,
  union_by_name=true,
  filename=true,
  names=['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'],
  all_varchar=true
)
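
For context, the landing layout that the read_csv glob expects can be produced with a small writer like the one below. This is a hypothetical sketch of the extract side, not the actual extract code: the function names and the etag value are illustrative; only the `{LANDING_DIR}/psd/{year}/{month}/{etag}.csv.gzip` template comes from the change description.

```python
import gzip
from pathlib import Path

def landing_path(landing_dir: str, year: int, month: int, etag: str) -> Path:
    """Build {LANDING_DIR}/psd/{year}/{month}/{etag}.csv.gzip, the path
    shape that the model's read_csv glob matches. Illustrative sketch."""
    return Path(landing_dir) / "psd" / str(year) / str(month) / f"{etag}.csv.gzip"

def write_landing_csv(landing_dir: str, year: int, month: int,
                      etag: str, csv_text: str) -> Path:
    """Write one gzip-compressed CSV partition into the landing layout."""
    path = landing_path(landing_dir, year, month, etag)
    path.parent.mkdir(parents=True, exist_ok=True)
    # "wt" writes text through the gzip stream, matching compression='gzip'
    # and encoding='utf-8' on the read side.
    with gzip.open(path, "wt", encoding="utf-8") as f:
        f.write(csv_text)
    return path
```

Keying the filename on the source ETag makes re-extraction idempotent: an unchanged upstream file overwrites the same partition rather than accumulating duplicates, which matters because the model reads the glob with union_by_name over every file present.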