This commit is contained in:
Deeman
2025-07-08 21:06:06 +02:00
parent 5368c1e521
commit 0ef57f3e06
3 changed files with 0 additions and 204 deletions

View File

@@ -1,7 +0,0 @@
[destination.filesystem]
bucket_url = "file:///home/deeman/data/wasde" # three / for an absolute path
kwargs = '{"auto_mkdir": true}'
[runtime]
log_level="DEBUG"
log_format="JSON"

View File

@@ -1 +0,0 @@
n1cDatmPp2b18qqKOgDKVndjdrUk4oUgB9HALoVA

View File

@@ -1,196 +0,0 @@
import dlt
from dlt.sources.rest_api import rest_api_source
import datetime
import os
import requests
from typing import List, Dict, Any
import dotenv
def get_api_key():
    """Return the FAS API key, asking the user for it when it is not configured.

    Variables from the nearest ``.env`` file (located with
    ``dotenv.find_dotenv``) are loaded first. If ``FAS_API_KEY`` is then
    present and non-empty in the environment it is returned as is. Otherwise
    the user is prompted on stdin, and the entered value is stored in
    ``os.environ["FAS_API_KEY"]`` for the rest of the process before being
    returned.
    """
    dotenv.load_dotenv(dotenv.find_dotenv())

    configured_key = os.environ.get("FAS_API_KEY")
    if configured_key:
        return configured_key

    # Missing or empty: fall back to an interactive prompt and remember the answer.
    entered_key = input("Please enter your FAS API key: ")
    os.environ["FAS_API_KEY"] = entered_key
    return entered_key
def get_current_and_past_years(num_years=5):
    """Return the current year preceded by the ``num_years`` years before it.

    The result is in ascending order and ends with the current year (local
    clock), so it holds ``num_years + 1`` entries. A negative ``num_years``
    yields an empty list.
    """
    this_year = datetime.date.today().year
    # Count the offset down to zero so the list comes out oldest-first.
    return [this_year - years_back for years_back in range(num_years, -1, -1)]
def fetch_commodity_codes(
    base_url: str,
    api_key: str,
    endpoint: str,
    timeout: float = 30.0,
) -> List:
    """Fetch the commodity records served at ``endpoint``.

    Args:
        base_url: Scheme and host of the API; ``endpoint`` is appended to it
            verbatim.
        api_key: Value sent in the ``X-Api-Key`` request header.
        endpoint: Path of the commodities endpoint, e.g. ``/api/esr/commodities``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a stalled connection
            would hang the whole script.

    Returns:
        The decoded JSON body when it is a list, the value stored under its
        ``"items"`` key when it is a dict that has one, and an empty list for
        any other response shape.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    response = requests.get(
        f"{base_url}{endpoint}",
        headers={"X-Api-Key": api_key},
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()

    # The payload may be a bare list or a dict wrapping it under "items".
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict) and 'items' in payload:
        return payload['items']
    return []
def extract_esr_commodity_codes(commodities: List[Dict[str, Any]]) -> List[int]:
    """Collect the integer ``commodityCode`` of every ESR commodity record.

    Records without a ``commodityCode`` key are skipped, and so are records
    whose code cannot be converted with ``int()``. Input order is preserved.
    """
    numeric_codes = []
    for record in commodities:
        if 'commodityCode' not in record:
            continue
        try:
            numeric_code = int(record['commodityCode'])
        except (ValueError, TypeError):
            # Unparseable code: skip this record.
            continue
        numeric_codes.append(numeric_code)
    return numeric_codes
def extract_psd_commodity_codes(commodities: List[Dict[str, Any]]) -> List[str]:
    """Collect the ``commodityCode`` of every PSD commodity record as a string.

    Records without a ``commodityCode`` key are skipped. Each code is passed
    through ``str()``, so string codes are returned unchanged (leading zeros
    included). Input order is preserved.
    """
    return [
        str(record['commodityCode'])
        for record in commodities
        if 'commodityCode' in record
    ]
def _endpoint_ignoring_404(path: str) -> Dict[str, Any]:
    """Return an endpoint config for ``path`` that ignores HTTP 404 responses."""
    return {
        "path": path,
        "response_actions": [
            {"status_code": 404, "action": "ignore"}
        ],
    }


def _reference_resources() -> List[Dict[str, Any]]:
    """Return the static ESR, GATS and PSD reference-data resource configs."""
    reference_endpoints = [
        # ESR
        ("esr_regions", "/api/esr/regions"),
        ("esr_countries", "/api/esr/countries"),
        ("esr_commodities", "/api/esr/commodities"),
        ("esr_units_of_measure", "/api/esr/unitsOfMeasure"),
        ("esr_data_release_dates", "/api/esr/datareleasedates"),
        # GATS
        ("gats_regions", "/api/gats/regions"),
        ("gats_countries", "/api/gats/countries"),
        ("gats_commodities", "/api/gats/commodities"),
        ("gats_hs6_commodities", "/api/gats/HS6Commodities"),
        ("gats_units_of_measure", "/api/gats/unitsOfMeasure"),
        ("gats_customs_districts", "/api/gats/customsDistricts"),
        ("gats_census_export_data_release_dates", "/api/gats/census/data/exports/dataReleaseDates"),
        ("gats_census_import_data_release_dates", "/api/gats/census/data/imports/dataReleaseDates"),
        ("gats_untrade_export_data_release_dates", "/api/gats/UNTrade/data/exports/dataReleaseDates"),
        ("gats_untrade_import_data_release_dates", "/api/gats/UNTrade/data/imports/dataReleaseDates"),
        # PSD
        ("psd_regions", "/api/psd/regions"),
        ("psd_countries", "/api/psd/countries"),
        ("psd_commodities", "/api/psd/commodities"),
        ("psd_units_of_measure", "/api/psd/unitsOfMeasure"),
        ("psd_commodity_attributes", "/api/psd/commodityAttributes"),
    ]
    return [
        {"name": name, "endpoint": {"path": path}}
        for name, path in reference_endpoints
    ]


def _esr_export_resources(commodity_codes: List[int], years: List[int]) -> List[Dict[str, Any]]:
    """Return one ESR exports resource config per (commodity, market year)."""
    return [
        {
            "name": f"esr_exports_commodity_{comm_code}_year_{year}",
            "endpoint": _endpoint_ignoring_404(
                f"/api/esr/exports/commodityCode/{comm_code}/allCountries/marketYear/{year}"
            ),
            # NOTE(review): presumably primary_key only deduplicates under a
            # "merge" disposition; with "append", re-runs may duplicate rows.
            # Confirm the intent.
            "primary_key": ["commodityCode", "countryCode", "weekEnding"],
            "write_disposition": "append",
        }
        for comm_code in commodity_codes
        for year in years
    ]


def _gats_census_import_resources(years: List[int], today: datetime.datetime) -> List[Dict[str, Any]]:
    """Return one GATS census-imports resource config per (partner, year, month).

    Months of ``today``'s year that lie after ``today``'s month are left out.
    """
    # Sample of major trading partners; expand to more countries as needed.
    partner_codes = ['CH', 'CA', 'MX', 'BR', 'EU']
    return [
        {
            "name": f"gats_census_imports_{country_code}_{year}_{month:02d}",
            "endpoint": _endpoint_ignoring_404(
                f"/api/gats/censusImports/partnerCode/{country_code}/year/{year}/month/{month:02d}"
            ),
            "write_disposition": "append",
        }
        for country_code in partner_codes
        for year in years
        for month in range(1, 13)
        if not (year == today.year and month > today.month)
    ]


def _psd_resources(
    commodity_codes: List[str],
    years: List[int],
    name_prefix: str,
    scope: str,
) -> List[Dict[str, Any]]:
    """Return one PSD resource config per (commodity, year) for a path ``scope``."""
    return [
        {
            "name": f"{name_prefix}_{comm_code}_year_{year}",
            "endpoint": _endpoint_ignoring_404(
                f"/api/psd/commodity/{comm_code}/{scope}/year/{year}"
            ),
            "write_disposition": "append",
        }
        for comm_code in commodity_codes
        for year in years
    ]


def run_pipeline():
    """Run the USDA FAS Open Data pipeline using the dlt REST API source.

    Steps:
      1. Obtain the API key and the list of years to load.
      2. Fetch the ESR and PSD commodity lists to learn which codes exist.
      3. Declare one REST resource per reference endpoint and per
         commodity/year (ESR, PSD) or partner/year/month (GATS) combination.
      4. Run a dlt pipeline named ``usda_fas_open_data`` against the
         ``filesystem`` destination and print the load info.

    Progress messages are printed to stdout. HTTP errors raised while
    fetching the commodity lists propagate to the caller.
    """
    # Get necessary parameters
    api_key = get_api_key()
    years = get_current_and_past_years()
    base_url = "https://api.fas.usda.gov"
    # Read the clock once so every month filter below compares against the
    # same instant instead of calling now() for each generated resource.
    today = datetime.datetime.now()

    # Fetch all commodity codes
    print("Fetching ESR commodity codes...")
    esr_commodities = fetch_commodity_codes(base_url, api_key, "/api/esr/commodities")
    esr_commodity_codes = extract_esr_commodity_codes(esr_commodities)
    print(f"Found {len(esr_commodity_codes)} ESR commodity codes")

    print("Fetching PSD commodity codes...")
    psd_commodities = fetch_commodity_codes(base_url, api_key, "/api/psd/commodities")
    psd_commodity_codes = extract_psd_commodity_codes(psd_commodities)
    print(f"Found {len(psd_commodity_codes)} PSD commodity codes")

    # Configure the REST API source
    source = rest_api_source({
        "client": {
            "base_url": base_url,
            "auth": {
                "type": "api_key",
                "api_key": api_key,
                "name": "X-Api-Key",
                "location": "header",
            },
        },
        "resource_defaults": {
            "write_disposition": "replace",
        },
        "resources": [
            # Reference data resources - ESR, GATS, PSD
            *_reference_resources(),
            # Dynamic ESR exports resources for each commodity and year
            *_esr_export_resources(esr_commodity_codes, years),
            # Sample GATS census imports for major trading partners (all recent years)
            *_gats_census_import_resources(years, today),
            # Dynamic PSD commodity data for all commodities and years
            *_psd_resources(psd_commodity_codes, years, "psd_commodity", "country/all"),
            # World-level data for all commodities
            *_psd_resources(psd_commodity_codes, years, "psd_world_commodity", "world"),
        ],
    })

    # Initialize and run the pipeline
    pipeline = dlt.pipeline(
        pipeline_name="usda_fas_open_data",
        destination="filesystem",  # Change to your preferred destination
        dataset_name="usda_fas_data",
    )
    print("Starting pipeline run. This may take some time due to the large number of resources...")
    load_info = pipeline.run(source)
    print(f"USDA FAS Open Data pipeline complete: {load_info}")
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    run_pipeline()