async is requesting stuff too fast
This commit is contained in:
56
extract/psdonline/src/psdonline/execute.py
Normal file
56
extract/psdonline/src/psdonline/execute.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
import niquests
|
||||||
|
import io
|
||||||
|
import zipfile
|
||||||
|
import pathlib
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||||
|
datefmt='%Y-%m-%d %H:%M:%S',
|
||||||
|
handlers=[
|
||||||
|
logging.StreamHandler(sys.stdout)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
output_dir = pathlib.Path(__file__).parent / "data"
|
||||||
|
output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
logging.info(f"Output dir: {output_dir}")
|
||||||
|
|
||||||
|
PSD_HISTORICAL_URL = "https://apps.fas.usda.gov/psdonline/downloads/archives/{year}/{month}/psd_alldata_csv.zip"
|
||||||
|
PSD_LATEST_URL = "https://apps.fas.usda.gov/psdonline/downloads/psd_alldata_csv.zip"
|
||||||
|
FIRST_YEAR = 2006
|
||||||
|
FIRST_MONTH = 8
|
||||||
|
|
||||||
|
async def extract_psd_file(url:str, http_session: niquests.AsyncSession):
|
||||||
|
logging.info(f"Start downloading {url} ...")
|
||||||
|
latest_data = await http_session.get(url)
|
||||||
|
logging.info("Download done.")
|
||||||
|
if latest_data.status_code != 200:
|
||||||
|
logging.info(f"Status code not ok, STATUS={latest_data.status_code}")
|
||||||
|
return
|
||||||
|
latest_buf=io.BytesIO()
|
||||||
|
latest_buf.write(latest_data.content)
|
||||||
|
latest_buf.seek(0)
|
||||||
|
|
||||||
|
logging.info("Extracting Zipfile ...")
|
||||||
|
zipfile.ZipFile(latest_buf).extract('psd_alldata.csv', output_dir)
|
||||||
|
logging.info("Extracting Zipfile done.")
|
||||||
|
|
||||||
|
async def extract_historical_psd_dataset():
|
||||||
|
today = datetime.now()
|
||||||
|
years = list(range(FIRST_YEAR, today.year))
|
||||||
|
months = list(range(1,13))
|
||||||
|
historical_data_extraction_urls = [PSD_HISTORICAL_URL.format(year=year, month=month) for year in years for month in months]
|
||||||
|
logging.info(f"Downloading {len(historical_data_extraction_urls)} urls")
|
||||||
|
async with niquests.AsyncSession() as session:
|
||||||
|
async with asyncio.TaskGroup() as tg:
|
||||||
|
for url in historical_data_extraction_urls:
|
||||||
|
tg.create_task(extract_psd_file(url, session))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(extract_historical_psd_dataset())
|
||||||
15
transform/sqlmesh-duckdb/models/example/full_model.sql
Normal file
15
transform/sqlmesh-duckdb/models/example/full_model.sql
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
MODEL (
|
||||||
|
name sqlmesh_example.full_model,
|
||||||
|
kind FULL,
|
||||||
|
cron '@daily',
|
||||||
|
grain item_id,
|
||||||
|
audits (assert_positive_order_ids),
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
item_id,
|
||||||
|
COUNT(DISTINCT id) AS num_orders,
|
||||||
|
FROM
|
||||||
|
sqlmesh_example.incremental_model
|
||||||
|
GROUP BY item_id
|
||||||
|
|
||||||
@@ -0,0 +1,19 @@
|
|||||||
|
MODEL (
|
||||||
|
name sqlmesh_example.incremental_model,
|
||||||
|
kind INCREMENTAL_BY_TIME_RANGE (
|
||||||
|
time_column event_date
|
||||||
|
),
|
||||||
|
start '2020-01-01',
|
||||||
|
cron '@daily',
|
||||||
|
grain (id, event_date)
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT
|
||||||
|
id,
|
||||||
|
item_id,
|
||||||
|
event_date,
|
||||||
|
FROM
|
||||||
|
sqlmesh_example.seed_model
|
||||||
|
WHERE
|
||||||
|
event_date BETWEEN @start_date AND @end_date
|
||||||
|
|
||||||
13
transform/sqlmesh-duckdb/models/example/seed_model.sql
Normal file
13
transform/sqlmesh-duckdb/models/example/seed_model.sql
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
MODEL (
|
||||||
|
name sqlmesh_example.seed_model,
|
||||||
|
kind SEED (
|
||||||
|
path '../seeds/seed_data.csv'
|
||||||
|
),
|
||||||
|
columns (
|
||||||
|
id INTEGER,
|
||||||
|
item_id INTEGER,
|
||||||
|
event_date DATE
|
||||||
|
),
|
||||||
|
grain (id, event_date)
|
||||||
|
);
|
||||||
|
|
||||||
Reference in New Issue
Block a user