diff --git a/extract/.dlt/config.toml b/extract/.dlt/config.toml index 3372bd5..e18ed51 100644 --- a/extract/.dlt/config.toml +++ b/extract/.dlt/config.toml @@ -1,5 +1,6 @@ [destination.filesystem] -bucket_url = "file:///~/data/wasde" # three / for an absolute path +bucket_url = "file:///home/deeman/data/wasde" # three / for an absolute path +kwargs = '{"auto_mkdir": true}' [runtime] log_level="DEBUG" diff --git a/extract/dlt_script.py b/extract/dlt_script.py index e69de29..3cb5d4c 100644 --- a/extract/dlt_script.py +++ b/extract/dlt_script.py @@ -0,0 +1,196 @@ +import dlt +from dlt.sources.rest_api import rest_api_source +import datetime +import os +import requests +from typing import List, Dict, Any +import dotenv + +def get_api_key(): + """Get API key from environment variable or prompt user if not set""" + dotenv.load_dotenv(dotenv.find_dotenv()) + api_key = os.environ.get("FAS_API_KEY") + if not api_key: + api_key = input("Please enter your FAS API key: ") + os.environ["FAS_API_KEY"] = api_key + return api_key + +def get_current_and_past_years(num_years=5): + """Get current year and past years for data range""" + current_year = datetime.datetime.now().year + return list(range(current_year - num_years, current_year + 1)) + +def fetch_commodity_codes(base_url: str, api_key: str, endpoint: str) -> List: + """Fetch commodity codes from the specified endpoint""" + headers = {"X-Api-Key": api_key} + response = requests.get(f"{base_url}{endpoint}", headers=headers) + response.raise_for_status() + data = response.json() + + # Handle different response structures + if isinstance(data, list): + return data + elif isinstance(data, dict) and 'items' in data: + return data['items'] + return [] + +def extract_esr_commodity_codes(commodities: List[Dict[str, Any]]) -> List[int]: + """Extract commodity codes from ESR commodities response""" + codes = [] + for commodity in commodities: + if 'commodityCode' in commodity: + try: + codes.append(int(commodity['commodityCode'])) + except (ValueError, TypeError): + pass + return codes + +def extract_psd_commodity_codes(commodities: List[Dict[str, Any]]) -> List[str]: + """Extract commodity codes from PSD commodities response""" + codes = [] + for commodity in commodities: + if 'commodityCode' in commodity: + codes.append(str(commodity['commodityCode'])) + return codes + +def run_pipeline(): + """Run the USDA FAS Open Data pipeline using REST API source""" + + # Get necessary parameters + api_key = get_api_key() + years = get_current_and_past_years() + base_url = "https://api.fas.usda.gov" + + # Fetch all commodity codes + print("Fetching ESR commodity codes...") + esr_commodities = fetch_commodity_codes(base_url, api_key, "/api/esr/commodities") + esr_commodity_codes = extract_esr_commodity_codes(esr_commodities) + print(f"Found {len(esr_commodity_codes)} ESR commodity codes") + + print("Fetching PSD commodity codes...") + psd_commodities = fetch_commodity_codes(base_url, api_key, "/api/psd/commodities") + psd_commodity_codes = extract_psd_commodity_codes(psd_commodities) + print(f"Found {len(psd_commodity_codes)} PSD commodity codes") + + # Configure the REST API source + source = rest_api_source({ + "client": { + "base_url": base_url, + "auth": { + "type": "api_key", + "api_key": api_key, + "name": "X-Api-Key", + "location": "header" + } + }, + "resource_defaults": { + "write_disposition": "replace", + }, + "resources": [ + # Reference data resources - ESR, GATS, PSD + {"name": "esr_regions", "endpoint": {"path": "/api/esr/regions"}}, + {"name": "esr_countries", "endpoint": {"path": "/api/esr/countries"}}, + {"name": "esr_commodities", "endpoint": {"path": "/api/esr/commodities"}}, + {"name": "esr_units_of_measure", "endpoint": {"path": "/api/esr/unitsOfMeasure"}}, + {"name": "esr_data_release_dates", "endpoint": {"path": "/api/esr/datareleasedates"}}, + + {"name": "gats_regions", "endpoint": {"path": "/api/gats/regions"}}, + {"name": "gats_countries", "endpoint": {"path": "/api/gats/countries"}}, + {"name": "gats_commodities", "endpoint": {"path": "/api/gats/commodities"}}, + {"name": "gats_hs6_commodities", "endpoint": {"path": "/api/gats/HS6Commodities"}}, + {"name": "gats_units_of_measure", "endpoint": {"path": "/api/gats/unitsOfMeasure"}}, + {"name": "gats_customs_districts", "endpoint": {"path": "/api/gats/customsDistricts"}}, + {"name": "gats_census_export_data_release_dates", "endpoint": {"path": "/api/gats/census/data/exports/dataReleaseDates"}}, + {"name": "gats_census_import_data_release_dates", "endpoint": {"path": "/api/gats/census/data/imports/dataReleaseDates"}}, + {"name": "gats_untrade_export_data_release_dates", "endpoint": {"path": "/api/gats/UNTrade/data/exports/dataReleaseDates"}}, + {"name": "gats_untrade_import_data_release_dates", "endpoint": {"path": "/api/gats/UNTrade/data/imports/dataReleaseDates"}}, + + {"name": "psd_regions", "endpoint": {"path": "/api/psd/regions"}}, + {"name": "psd_countries", "endpoint": {"path": "/api/psd/countries"}}, + {"name": "psd_commodities", "endpoint": {"path": "/api/psd/commodities"}}, + {"name": "psd_units_of_measure", "endpoint": {"path": "/api/psd/unitsOfMeasure"}}, + {"name": "psd_commodity_attributes", "endpoint": {"path": "/api/psd/commodityAttributes"}}, + + # Dynamic ESR exports resources for each commodity and year + #*[ + # { + # "name": f"esr_exports_commodity_{comm_code}_year_{year}", + # "endpoint": { + # "path": f"/api/esr/exports/commodityCode/{comm_code}/allCountries/marketYear/{year}", + # "response_actions": [ + # {"status_code": 404, "action": "ignore"} + # ] + # }, + # "primary_key": ["commodityCode", "countryCode", "weekEnding"], + # "write_disposition": "append" + # } + # for comm_code in esr_commodity_codes + # for year in years + #], + + # Sample GATS census imports for major trading partners (for all recent years) + # You can expand to more countries as needed + #*[ + # { + # "name": f"gats_census_imports_{country_code}_{year}_{month:02d}", + # "endpoint": { + # "path": f"/api/gats/censusImports/partnerCode/{country_code}/year/{year}/month/{month:02d}", + # "response_actions": [ + # {"status_code": 404, "action": "ignore"} + # ] + # }, + # "write_disposition": "append" + # } + # for country_code in ['CH', 'CA', 'MX', 'BR', 'EU'] # Major trading partners + # for year in years + # for month in range(1, 13) + # if not (year == datetime.datetime.now().year and month > datetime.datetime.now().month) + #], + + # Dynamic PSD commodity data for all commodities and years + #*[ + # { + # "name": f"psd_commodity_{comm_code}_year_{year}", + # "endpoint": { + # "path": f"/api/psd/commodity/{comm_code}/country/all/year/{year}", + # "response_actions": [ + # {"status_code": 404, "action": "ignore"} + # ] + # }, + # "write_disposition": "append" + # } + # for comm_code in psd_commodity_codes + # for year in years + #], + + # World-level data for all commodities + #*[ + # { + # "name": f"psd_world_commodity_{comm_code}_year_{year}", + # "endpoint": { + # "path": f"/api/psd/commodity/{comm_code}/world/year/{year}", + # "response_actions": [ + # {"status_code": 404, "action": "ignore"} + # ] + # }, + # "write_disposition": "append" + # } + # for comm_code in psd_commodity_codes + # for year in years + #], + ] + }) + + # Initialize and run the pipeline + pipeline = dlt.pipeline( + pipeline_name="usda_fas_open_data", + destination="filesystem", # Change to your preferred destination + dataset_name="usda_fas_data", + ) + + print("Starting pipeline run. This may take some time due to the large number of resources...") + load_info = pipeline.run(source) + print(f"USDA FAS Open Data pipeline complete: {load_info}") + +if __name__ == "__main__": + run_pipeline() diff --git a/pyproject.toml b/pyproject.toml index e437d94..2aa41d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ dependencies = [ "httpx>=0.28.1", "ipykernel>=6.29.5", "pyowm>=3.3.0", + "python-dotenv>=1.1.0", "yfinance>=0.2.54", ] diff --git a/uv.lock b/uv.lock index 0cfd792..ba68227 100644 --- a/uv.lock +++ b/uv.lock @@ -493,6 +493,7 @@ dependencies = [ { name = "httpx" }, { name = "ipykernel" }, { name = "pyowm" }, + { name = "python-dotenv" }, { name = "yfinance" }, ] @@ -509,6 +510,7 @@ requires-dist = [ { name = "httpx", specifier = ">=0.28.1" }, { name = "ipykernel", specifier = ">=6.29.5" }, { name = "pyowm", specifier = ">=3.3.0" }, + { name = "python-dotenv", specifier = ">=1.1.0" }, { name = "yfinance", specifier = ">=0.2.54" }, ] @@ -855,6 +857,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892, upload_time = "2024-03-01T18:36:18.57Z" }, ] +[[package]] +name = "python-dotenv" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/88/2c/7bb1416c5620485aa793f2de31d3df393d3686aa8a8506d11e10e13c5baf/python_dotenv-1.1.0.tar.gz", hash = "sha256:41f90bc6f5f177fb41f53e87666db362025010eb28f60a01c9143bfa33a2b2d5", size = 39920, upload_time = "2025-03-25T10:14:56.835Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1e/18/98a99ad95133c6a6e2005fe89faedf294a748bd5dc803008059409ac9b1e/python_dotenv-1.1.0-py3-none-any.whl", hash = "sha256:d7c01d9e2293916c18baf562d95698754b0dbbb5e74d457c45d4f6561fb9d55d", size = 20256, upload_time = "2025-03-25T10:14:55.034Z" }, +] + [[package]] name = "pytz" version = "2025.1"