Fix extract and SQLMesh pipeline to build DuckDB lakehouse
extract: wrap response.content in BytesIO before passing to
normalize_zipped_csv, and call .read() on the returned BytesIO before
write_bytes (two bugs: wrong type in, wrong type out)
sqlmesh: {{ var() }} inside SQL string literals is not substituted by
SQLMesh's Jinja (SQL parser treats them as opaque strings). Replace with
a @psd_glob() macro that evaluates LANDING_DIR at render time and returns
a quoted glob path string.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,6 +4,7 @@ import os
|
|||||||
import pathlib
|
import pathlib
|
||||||
import sys
|
import sys
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
import niquests
|
import niquests
|
||||||
|
|
||||||
@@ -50,8 +51,8 @@ def extract_psd_file(url: str, year: int, month: int, http_session: niquests.Ses
|
|||||||
response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS)
|
response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS)
|
||||||
logger.info(f"Storing file to {local_file}")
|
logger.info(f"Storing file to {local_file}")
|
||||||
extract_to_path.mkdir(parents=True, exist_ok=True)
|
extract_to_path.mkdir(parents=True, exist_ok=True)
|
||||||
normalized_content = normalize_zipped_csv(response.content)
|
normalized_content = normalize_zipped_csv(BytesIO(response.content))
|
||||||
local_file.write_bytes(normalized_content)
|
local_file.write_bytes(normalized_content.read())
|
||||||
assert local_file.exists(), f"File was not written: {local_file}"
|
assert local_file.exists(), f"File was not written: {local_file}"
|
||||||
logger.info("Download complete")
|
logger.info("Download complete")
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,10 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from sqlmesh import macro
|
||||||
|
|
||||||
|
|
||||||
|
@macro()
def psd_glob(evaluator) -> str:
    """Build the single-quoted glob covering every PSD ``*.csv.gzip`` file.

    Resolution order for the landing directory: the SQLMesh variable
    ``LANDING_DIR`` first, then the ``LANDING_DIR`` environment variable,
    finally the default ``data/landing``. The result is returned already
    wrapped in single quotes so it can be spliced directly into SQL.
    """
    base = evaluator.var("LANDING_DIR")
    if not base:
        # Fall back to the process environment, then the repo default.
        base = os.environ.get("LANDING_DIR", "data/landing")
    return "'" + base + "/psd/**/*.csv.gzip'"
|
||||||
|
|||||||
@@ -21,4 +21,4 @@ MODEL (
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
select *
|
select *
|
||||||
FROM read_csv('{{ var("LANDING_DIR") }}/psd/**/*.csv.gzip', delim=',', encoding='utf-8', compression='gzip', max_line_size=10000000, header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true)
|
FROM read_csv(@psd_glob(), delim=',', encoding='utf-8', compression='gzip', max_line_size=10000000, header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true)
|
||||||
|
|||||||
Reference in New Issue
Block a user