diff --git a/extract/psdonline/src/psdonline/execute.py b/extract/psdonline/src/psdonline/execute.py index 8a3ee5a..65d7867 100644 --- a/extract/psdonline/src/psdonline/execute.py +++ b/extract/psdonline/src/psdonline/execute.py @@ -4,6 +4,7 @@ import os import pathlib import sys from datetime import datetime +from io import BytesIO import niquests @@ -50,8 +51,8 @@ def extract_psd_file(url: str, year: int, month: int, http_session: niquests.Ses response = http_session.get(url, timeout=HTTP_TIMEOUT_SECONDS) logger.info(f"Storing file to {local_file}") extract_to_path.mkdir(parents=True, exist_ok=True) - normalized_content = normalize_zipped_csv(response.content) - local_file.write_bytes(normalized_content) + normalized_content = normalize_zipped_csv(BytesIO(response.content)) + local_file.write_bytes(normalized_content.read()) assert local_file.exists(), f"File was not written: {local_file}" logger.info("Download complete") diff --git a/transform/sqlmesh_materia/macros/__init__.py b/transform/sqlmesh_materia/macros/__init__.py index e69de29..48e55a2 100644 --- a/transform/sqlmesh_materia/macros/__init__.py +++ b/transform/sqlmesh_materia/macros/__init__.py @@ -0,0 +1,10 @@ +import os + +from sqlmesh import macro + + +@macro() +def psd_glob(evaluator) -> str: + """Return a quoted glob path for all PSD CSV gzip files under LANDING_DIR.""" + landing_dir = evaluator.var("LANDING_DIR") or os.environ.get("LANDING_DIR", "data/landing") + return f"'{landing_dir}/psd/**/*.csv.gzip'" diff --git a/transform/sqlmesh_materia/models/raw/psd_data.sql b/transform/sqlmesh_materia/models/raw/psd_data.sql index 992e162..db69cfc 100644 --- a/transform/sqlmesh_materia/models/raw/psd_data.sql +++ b/transform/sqlmesh_materia/models/raw/psd_data.sql @@ -21,4 +21,4 @@ MODEL ( ) ); select * -FROM read_csv('{{ var("LANDING_DIR") }}/psd/**/*.csv.gzip', delim=',', encoding='utf-8', compression='gzip', max_line_size=10000000, header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true) +FROM read_csv(@psd_glob(), delim=',', encoding='utf-8', compression='gzip', max_line_size=10000000, header=true, union_by_name=true, filename=true, names = ['commodity_code', 'commodity_description', 'country_code', 'country_name', 'market_year', 'calendar_year', 'month', 'attribute_id', 'attribute_description', 'unit_id', 'unit_description', 'value'], all_varchar=true)