"""Load USDA FAS Open Data reference tables through dlt's REST API source."""

import datetime
import os
from typing import Any, Dict, List

import dlt
import dotenv
import requests
from dlt.sources.rest_api import rest_api_source

# (resource name, endpoint path) pairs for the reference data resources
# (ESR, GATS, PSD). Each pair becomes one REST API resource.
_REFERENCE_ENDPOINTS = (
    ("esr_regions", "/api/esr/regions"),
    ("esr_countries", "/api/esr/countries"),
    ("esr_commodities", "/api/esr/commodities"),
    ("esr_units_of_measure", "/api/esr/unitsOfMeasure"),
    ("esr_data_release_dates", "/api/esr/datareleasedates"),
    ("gats_regions", "/api/gats/regions"),
    ("gats_countries", "/api/gats/countries"),
    ("gats_commodities", "/api/gats/commodities"),
    ("gats_hs6_commodities", "/api/gats/HS6Commodities"),
    ("gats_units_of_measure", "/api/gats/unitsOfMeasure"),
    ("gats_customs_districts", "/api/gats/customsDistricts"),
    (
        "gats_census_export_data_release_dates",
        "/api/gats/census/data/exports/dataReleaseDates",
    ),
    (
        "gats_census_import_data_release_dates",
        "/api/gats/census/data/imports/dataReleaseDates",
    ),
    (
        "gats_untrade_export_data_release_dates",
        "/api/gats/UNTrade/data/exports/dataReleaseDates",
    ),
    (
        "gats_untrade_import_data_release_dates",
        "/api/gats/UNTrade/data/imports/dataReleaseDates",
    ),
    ("psd_regions", "/api/psd/regions"),
    ("psd_countries", "/api/psd/countries"),
    ("psd_commodities", "/api/psd/commodities"),
    ("psd_units_of_measure", "/api/psd/unitsOfMeasure"),
    ("psd_commodity_attributes", "/api/psd/commodityAttributes"),
)


def get_api_key():
    """Get API key from environment variable or prompt user if not set.

    Loads a ``.env`` file (located with ``dotenv.find_dotenv``) before reading
    ``FAS_API_KEY``. When the variable is missing or empty, the user is
    prompted and the entered value is stored in ``os.environ`` so later
    lookups in this process see it.

    Returns:
        The API key string.
    """
    dotenv.load_dotenv(dotenv.find_dotenv())
    api_key = os.environ.get("FAS_API_KEY")
    if not api_key:
        api_key = input("Please enter your FAS API key: ")
        os.environ["FAS_API_KEY"] = api_key
    return api_key


def get_current_and_past_years(num_years=5):
    """Get current year and past years for data range.

    Args:
        num_years: How many years before the current one to include.

    Returns:
        Ascending list of ``num_years + 1`` years ending with the current
        year (taken from the local system clock).
    """
    current_year = datetime.datetime.now().year
    return list(range(current_year - num_years, current_year + 1))


def fetch_commodity_codes(
    base_url: str,
    api_key: str,
    endpoint: str,
    timeout: float = 30.0,
) -> List[Dict[str, Any]]:
    """Fetch commodity codes from the specified endpoint.

    Args:
        base_url: API root; ``endpoint`` is appended to it verbatim.
        api_key: Value sent in the ``X-Api-Key`` request header.
        endpoint: Path of the commodities endpoint, starting with ``/``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely on a stalled connection.

    Returns:
        The decoded JSON body when it is a list, the value under ``"items"``
        when it is a dict containing that key, otherwise an empty list.

    Raises:
        requests.HTTPError: If the response status indicates an error.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    headers = {"X-Api-Key": api_key}
    response = requests.get(
        f"{base_url}{endpoint}", headers=headers, timeout=timeout
    )
    response.raise_for_status()
    data = response.json()

    # Handle different response structures
    if isinstance(data, list):
        return data
    if isinstance(data, dict) and "items" in data:
        return data["items"]
    return []


def extract_esr_commodity_codes(commodities: List[Dict[str, Any]]) -> List[int]:
    """Extract commodity codes from ESR commodities response.

    Args:
        commodities: Records that may carry a ``commodityCode`` key.

    Returns:
        The ``commodityCode`` values converted to ``int``, in input order.
        Records without the key, or whose value cannot be converted, are
        skipped.
    """
    codes = []
    for commodity in commodities:
        if "commodityCode" not in commodity:
            continue
        try:
            codes.append(int(commodity["commodityCode"]))
        except (ValueError, TypeError):
            # Best effort: a non-numeric code is dropped rather than
            # aborting the whole extraction.
            continue
    return codes


def extract_psd_commodity_codes(commodities: List[Dict[str, Any]]) -> List[str]:
    """Extract commodity codes from PSD commodities response.

    Args:
        commodities: Records that may carry a ``commodityCode`` key.

    Returns:
        The ``commodityCode`` values converted to ``str``, in input order.
        Records without the key are skipped.
    """
    return [
        str(commodity["commodityCode"])
        for commodity in commodities
        if "commodityCode" in commodity
    ]


def _reference_resources() -> List[Dict[str, Any]]:
    """Build one REST API resource definition per reference endpoint."""
    return [
        {"name": name, "endpoint": {"path": path}}
        for name, path in _REFERENCE_ENDPOINTS
    ]


def run_pipeline():
    """Run the USDA FAS Open Data pipeline using REST API source.

    Fetches the ESR and PSD commodity codes (reported on stdout), configures
    a REST API source holding the reference data resources, and runs it
    through a dlt pipeline that writes to the ``filesystem`` destination.
    """
    # Get necessary parameters
    api_key = get_api_key()
    # Only referenced by the commented-out dynamic resources below.
    years = get_current_and_past_years()
    base_url = "https://api.fas.usda.gov"

    # Fetch all commodity codes
    print("Fetching ESR commodity codes...")
    esr_commodities = fetch_commodity_codes(
        base_url, api_key, "/api/esr/commodities"
    )
    esr_commodity_codes = extract_esr_commodity_codes(esr_commodities)
    print(f"Found {len(esr_commodity_codes)} ESR commodity codes")

    print("Fetching PSD commodity codes...")
    psd_commodities = fetch_commodity_codes(
        base_url, api_key, "/api/psd/commodities"
    )
    psd_commodity_codes = extract_psd_commodity_codes(psd_commodities)
    print(f"Found {len(psd_commodity_codes)} PSD commodity codes")

    # Configure the REST API source
    source = rest_api_source({
        "client": {
            "base_url": base_url,
            "auth": {
                "type": "api_key",
                "api_key": api_key,
                "name": "X-Api-Key",
                "location": "header",
            },
        },
        "resource_defaults": {
            "write_disposition": "replace",
        },
        "resources": [
            # Reference data resources - ESR, GATS, PSD
            *_reference_resources(),

            # Dynamic ESR exports resources for each commodity and year
            #*[
            #    {
            #        "name": f"esr_exports_commodity_{comm_code}_year_{year}",
            #        "endpoint": {
            #            "path": f"/api/esr/exports/commodityCode/{comm_code}/allCountries/marketYear/{year}",
            #            "response_actions": [
            #                {"status_code": 404, "action": "ignore"}
            #            ]
            #        },
            #        "primary_key": ["commodityCode", "countryCode", "weekEnding"],
            #        "write_disposition": "append"
            #    }
            #    for comm_code in esr_commodity_codes
            #    for year in years
            #],

            # Sample GATS census imports for major trading partners (for all recent years)
            # You can expand to more countries as needed
            #*[
            #    {
            #        "name": f"gats_census_imports_{country_code}_{year}_{month:02d}",
            #        "endpoint": {
            #            "path": f"/api/gats/censusImports/partnerCode/{country_code}/year/{year}/month/{month:02d}",
            #            "response_actions": [
            #                {"status_code": 404, "action": "ignore"}
            #            ]
            #        },
            #        "write_disposition": "append"
            #    }
            #    for country_code in ['CH', 'CA', 'MX', 'BR', 'EU']  # Major trading partners
            #    for year in years
            #    for month in range(1, 13)
            #    if not (year == datetime.datetime.now().year and month > datetime.datetime.now().month)
            #],

            # Dynamic PSD commodity data for all commodities and years
            #*[
            #    {
            #        "name": f"psd_commodity_{comm_code}_year_{year}",
            #        "endpoint": {
            #            "path": f"/api/psd/commodity/{comm_code}/country/all/year/{year}",
            #            "response_actions": [
            #                {"status_code": 404, "action": "ignore"}
            #            ]
            #        },
            #        "write_disposition": "append"
            #    }
            #    for comm_code in psd_commodity_codes
            #    for year in years
            #],

            # World-level data for all commodities
            #*[
            #    {
            #        "name": f"psd_world_commodity_{comm_code}_year_{year}",
            #        "endpoint": {
            #            "path": f"/api/psd/commodity/{comm_code}/world/year/{year}",
            #            "response_actions": [
            #                {"status_code": 404, "action": "ignore"}
            #            ]
            #        },
            #        "write_disposition": "append"
            #    }
            #    for comm_code in psd_commodity_codes
            #    for year in years
            #],
        ],
    })

    # Initialize and run the pipeline
    pipeline = dlt.pipeline(
        pipeline_name="usda_fas_open_data",
        destination="filesystem",  # Change to your preferred destination
        dataset_name="usda_fas_data",
    )

    print("Starting pipeline run. This may take some time due to the large number of resources...")
    load_info = pipeline.run(source)
    print(f"USDA FAS Open Data pipeline complete: {load_info}")


if __name__ == "__main__":
    run_pipeline()