finish historical extraction

This commit is contained in:
Deeman
2025-07-13 23:20:50 +02:00
parent 70bd8a52db
commit b8ad73202c
19 changed files with 62 additions and 189 deletions

View File

@@ -1,11 +1,8 @@
import niquests
import io
import zipfile
import pathlib
import logging
import sys
from datetime import datetime
import asyncio
logging.basicConfig(
level=logging.INFO,
@@ -15,42 +12,45 @@ logging.basicConfig(
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger("PSD Extraction")
output_dir = pathlib.Path(__file__).parent / "data"
output_dir.mkdir(parents=True, exist_ok=True)
logging.info(f"Output dir: {output_dir}")
PSD_HISTORICAL_URL = "https://apps.fas.usda.gov/psdonline/downloads/archives/{year}/{month}/psd_alldata_csv.zip"
logger.info(f"Output dir: {output_dir}")
#TODO: adapt to environment values, so this writes to s3 in prod
PSD_HISTORICAL_URL = "https://apps.fas.usda.gov/psdonline/downloads/archives/{year}/{month:02d}/psd_alldata_csv.zip"
PSD_LATEST_URL = "https://apps.fas.usda.gov/psdonline/downloads/psd_alldata_csv.zip"
FIRST_YEAR = 2006
FIRST_MONTH = 8
async def extract_psd_file(url:str, http_session: niquests.AsyncSession):
logging.info(f"Start downloading {url} ...")
latest_data = await http_session.get(url)
logging.info("Download done.")
if latest_data.status_code != 200:
logging.info(f"Status code not ok, STATUS={latest_data.status_code}")
def extract_psd_file(url:str, extract_to_path: pathlib.Path, http_session: niquests.Session):
logger.info(f"Start downloading {url} ...")
response = http_session.get(url)
if response.status_code != 200:
logger.error(f"Status code not ok, STATUS={response.status_code}")
return
latest_buf=io.BytesIO()
latest_buf.write(latest_data.content)
latest_buf.seek(0)
logger.info(f"Storing file to {extract_to_path}")
extract_to_path.parent.mkdir(parents=True, exist_ok=True)
extract_to_path.write_bytes(response.content)
logger.info("Download done.")
logging.info("Extracting Zipfile ...")
zipfile.ZipFile(latest_buf).extract('psd_alldata.csv', output_dir)
logging.info("Extracting Zipfile done.")
async def extract_historical_psd_dataset():
def extract_historical_psd_dataset():
today = datetime.now()
years = list(range(FIRST_YEAR, today.year))
months = list(range(1,13))
historical_data_extraction_urls = [PSD_HISTORICAL_URL.format(year=year, month=month) for year in years for month in months]
logging.info(f"Downloading {len(historical_data_extraction_urls)} urls")
async with niquests.AsyncSession() as session:
async with asyncio.TaskGroup() as tg:
for url in historical_data_extraction_urls:
tg.create_task(extract_psd_file(url, session))
logger.info(f"Downloading {len(years) * len(months)} urls")
for year in years:
for month in months:
url = PSD_HISTORICAL_URL.format(year=year, month=month)
target_path = output_dir / f"{year}"/f"{month:02d}" / "psd_alldata_csv.zip"
with niquests.Session() as session:
logger.info(f"Downloading psd_alldata_csv.zip for {year}/{month:02d}")
try:
extract_psd_file(url=url, http_session=session, extract_to_path=target_path)
except Exception as e:
logger.error("Error trying to download file. Likely the file does not exist", e)
if __name__ == "__main__":
asyncio.run(extract_historical_psd_dataset())
extract_historical_psd_dataset()