diff --git a/extract/psdonline/src/psdonline/execute.py b/extract/psdonline/src/psdonline/execute.py
new file mode 100644
index 0000000..b5e97f8
--- /dev/null
+++ b/extract/psdonline/src/psdonline/execute.py
@@ -0,0 +1,82 @@
+import asyncio
+import io
+import logging
+import pathlib
+import sys
+import zipfile
+from datetime import datetime
+
+import niquests
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    datefmt='%Y-%m-%d %H:%M:%S',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+output_dir = pathlib.Path(__file__).parent / "data"
+output_dir.mkdir(parents=True, exist_ok=True)
+logging.info(f"Output dir: {output_dir}")
+
+PSD_HISTORICAL_URL = "https://apps.fas.usda.gov/psdonline/downloads/archives/{year}/{month}/psd_alldata_csv.zip"
+PSD_LATEST_URL = "https://apps.fas.usda.gov/psdonline/downloads/psd_alldata_csv.zip"
+FIRST_YEAR = 2006
+FIRST_MONTH = 8
+# Cap simultaneous downloads so we do not open a socket per archive at once.
+MAX_CONCURRENT_DOWNLOADS = 8
+
+
+async def extract_psd_file(url: str, http_session: niquests.AsyncSession,
+                           dest_dir: pathlib.Path = output_dir) -> None:
+    """Download one PSD zip archive and extract psd_alldata.csv into dest_dir.
+
+    Archives are not published for every year/month, so a non-200 response
+    is logged and skipped rather than raised.
+    """
+    logging.info(f"Start downloading {url} ...")
+    response = await http_session.get(url)
+    logging.info("Download done.")
+    if response.status_code != 200:
+        logging.warning(f"Status code not ok, STATUS={response.status_code}")
+        return
+    logging.info("Extracting Zipfile ...")
+    dest_dir.mkdir(parents=True, exist_ok=True)
+    # Context manager ensures the archive handle is released promptly.
+    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
+        zf.extract('psd_alldata.csv', dest_dir)
+    logging.info("Extracting Zipfile done.")
+
+
+async def extract_historical_psd_dataset() -> None:
+    """Download every monthly PSD archive from FIRST_YEAR/FIRST_MONTH through
+    the end of last year, extracting each into its own <year>/<month> folder
+    so the monthly snapshots do not overwrite each other.
+    """
+    today = datetime.now()
+    year_months = [
+        (year, month)
+        for year in range(FIRST_YEAR, today.year)
+        for month in range(1, 13)
+        # The archive series only starts at FIRST_YEAR/FIRST_MONTH (2006/08);
+        # earlier months would be guaranteed 404s.
+        if not (year == FIRST_YEAR and month < FIRST_MONTH)
+    ]
+    logging.info(f"Downloading {len(year_months)} urls")
+    semaphore = asyncio.Semaphore(MAX_CONCURRENT_DOWNLOADS)
+
+    async def bounded_extract(year: int, month: int,
+                              session: niquests.AsyncSession) -> None:
+        # Gate each download on the semaphore so only a bounded number of
+        # requests are in flight at any one time.
+        url = PSD_HISTORICAL_URL.format(year=year, month=month)
+        async with semaphore:
+            await extract_psd_file(url, session,
+                                   output_dir / str(year) / f"{month:02d}")
+
+    async with niquests.AsyncSession() as session:
+        async with asyncio.TaskGroup() as tg:
+            for year, month in year_months:
+                tg.create_task(bounded_extract(year, month, session))
+
+
+if __name__ == "__main__":
+    asyncio.run(extract_historical_psd_dataset())
diff --git a/transform/sqlmesh-duckdb/models/example/full_model.sql b/transform/sqlmesh-duckdb/models/example/full_model.sql
new file mode 100644
index 0000000..362b049
--- /dev/null
+++ b/transform/sqlmesh-duckdb/models/example/full_model.sql
@@ -0,0 +1,15 @@
+MODEL (
+  name sqlmesh_example.full_model,
+  kind FULL,
+  cron '@daily',
+  grain item_id,
+  audits (assert_positive_order_ids),
+);
+
+SELECT
+  item_id,
+  COUNT(DISTINCT id) AS num_orders,
+FROM
+  sqlmesh_example.incremental_model
+GROUP BY item_id
+ 
\ No newline at end of file
diff --git a/transform/sqlmesh-duckdb/models/example/incremental_model.sql b/transform/sqlmesh-duckdb/models/example/incremental_model.sql
new file mode 100644
index 0000000..d2db527
--- /dev/null
+++ b/transform/sqlmesh-duckdb/models/example/incremental_model.sql
@@ -0,0 +1,19 @@
+MODEL (
+  name sqlmesh_example.incremental_model,
+  kind INCREMENTAL_BY_TIME_RANGE (
+    time_column event_date
+  ),
+  start '2020-01-01',
+  cron '@daily',
+  grain (id, event_date)
+);
+
+SELECT
+  id,
+  item_id,
+  event_date,
+FROM
+  sqlmesh_example.seed_model
+WHERE
+  event_date BETWEEN @start_date AND @end_date
+ 
\ No newline at end of file
diff --git a/transform/sqlmesh-duckdb/models/example/seed_model.sql b/transform/sqlmesh-duckdb/models/example/seed_model.sql
new file mode 100644
index 0000000..192d2df
--- /dev/null
+++ b/transform/sqlmesh-duckdb/models/example/seed_model.sql
@@ -0,0 +1,13 @@
+MODEL (
+  name sqlmesh_example.seed_model,
+  kind SEED (
+    path '../seeds/seed_data.csv'
+  ),
+  columns (
+    id INTEGER,
+    item_id INTEGER,
+    event_date DATE
+  ),
+  grain (id, event_date)
+);
+ 
\ No newline at end of file