"""Load USDA FAS Open Data (ESR, GATS and PSD endpoints) with dlt's REST API source.

This module is the readable form of ``extract/dlt_script.py`` as it appeared in
a patch that removed three files:

* ``extract/dlt_script.py`` -- the script below.
* ``extract/.dlt/config.toml`` -- dlt settings, reproduced in the comment
  block underneath this docstring.
* ``extract/apikey`` -- a file holding a plaintext API key.  The key is
  deliberately NOT reproduced here.  Because it was committed, it must be
  treated as compromised and rotated; supply the replacement through the
  ``FAS_API_KEY`` environment variable (or a ``.env`` file).
"""

# Contents of the removed extract/.dlt/config.toml, kept for reference because
# the pipeline below uses destination="filesystem":
#
#   [destination.filesystem]
#   bucket_url = "file:///home/deeman/data/wasde" # three / for an absolute path
#   kwargs = '{"auto_mkdir": true}'
#
#   [runtime]
#   log_level="DEBUG"
#   log_format="JSON"

import datetime
import logging
import os
from typing import Any, Dict, Iterable, List, Optional

# NOTE: the third-party packages (dlt, requests, python-dotenv) are imported
# inside the functions that use them.  That keeps the pure helpers in this
# module importable -- and unit-testable -- on a machine where those packages
# are not installed.

logger = logging.getLogger(__name__)

FAS_BASE_URL = "https://api.fas.usda.gov"
API_KEY_ENV_VAR = "FAS_API_KEY"
DEFAULT_TIMEOUT_SECONDS = 30

# Partner codes used for the sample GATS census-import resources.
GATS_PARTNER_CODES = ("CH", "CA", "MX", "BR", "EU")

# (resource name, endpoint path) for every reference-data resource.
_REFERENCE_ENDPOINTS = (
    ("esr_regions", "/api/esr/regions"),
    ("esr_countries", "/api/esr/countries"),
    ("esr_commodities", "/api/esr/commodities"),
    ("esr_units_of_measure", "/api/esr/unitsOfMeasure"),
    ("esr_data_release_dates", "/api/esr/datareleasedates"),
    ("gats_regions", "/api/gats/regions"),
    ("gats_countries", "/api/gats/countries"),
    ("gats_commodities", "/api/gats/commodities"),
    ("gats_hs6_commodities", "/api/gats/HS6Commodities"),
    ("gats_units_of_measure", "/api/gats/unitsOfMeasure"),
    ("gats_customs_districts", "/api/gats/customsDistricts"),
    (
        "gats_census_export_data_release_dates",
        "/api/gats/census/data/exports/dataReleaseDates",
    ),
    (
        "gats_census_import_data_release_dates",
        "/api/gats/census/data/imports/dataReleaseDates",
    ),
    (
        "gats_untrade_export_data_release_dates",
        "/api/gats/UNTrade/data/exports/dataReleaseDates",
    ),
    (
        "gats_untrade_import_data_release_dates",
        "/api/gats/UNTrade/data/imports/dataReleaseDates",
    ),
    ("psd_regions", "/api/psd/regions"),
    ("psd_countries", "/api/psd/countries"),
    ("psd_commodities", "/api/psd/commodities"),
    ("psd_units_of_measure", "/api/psd/unitsOfMeasure"),
    ("psd_commodity_attributes", "/api/psd/commodityAttributes"),
)


def get_api_key():
    """Return the FAS API key from the environment, prompting as a fallback.

    A ``.env`` file is loaded first when python-dotenv is installed.  If
    ``FAS_API_KEY`` is still unset (or blank), the user is prompted and the
    answer is stored back into ``os.environ`` for the rest of the process.

    Raises:
        ValueError: if the prompted key is empty or only whitespace.
        EOFError: if a prompt is needed but standard input is closed.
    """
    try:
        import dotenv
    except ImportError:
        # python-dotenv is a convenience only; an exported variable suffices.
        logger.debug("python-dotenv not installed; skipping .env loading")
    else:
        dotenv.load_dotenv(dotenv.find_dotenv())

    api_key = os.environ.get(API_KEY_ENV_VAR, "").strip()
    if api_key:
        return api_key

    api_key = input("Please enter your FAS API key: ").strip()
    if not api_key:
        # Fail here rather than sending an empty key with every request.
        raise ValueError("An empty FAS API key was provided")
    os.environ[API_KEY_ENV_VAR] = api_key
    return api_key


def get_current_and_past_years(num_years=5, current_year=None):
    """Return ``num_years`` past years plus the current year, ascending.

    Args:
        num_years: how many years before the current one to include.
        current_year: year to treat as "current"; defaults to today's year.
            Passing it explicitly keeps callers consistent with a clock value
            they have already read.

    Raises:
        ValueError: if ``num_years`` is negative.
    """
    if num_years < 0:
        raise ValueError("num_years must be non-negative")
    if current_year is None:
        current_year = datetime.datetime.now().year
    return list(range(current_year - num_years, current_year + 1))


def fetch_commodity_codes(
    base_url: str,
    api_key: str,
    endpoint: str,
    timeout: float = DEFAULT_TIMEOUT_SECONDS,
) -> List:
    """Fetch the commodity list served at ``base_url + endpoint``.

    Args:
        base_url: API root, e.g. ``https://api.fas.usda.gov``.
        api_key: value sent in the ``X-Api-Key`` request header.
        endpoint: path appended verbatim to ``base_url``.
        timeout: seconds passed to ``requests.get``; without it a stalled
            connection would block the run indefinitely.

    Returns:
        The decoded JSON body when it is a list, its ``items`` member when the
        body is a dict that has one, otherwise an empty list.

    Raises:
        requests.HTTPError: when the response status is an error
            (via ``raise_for_status``).
    """
    import requests

    response = requests.get(
        f"{base_url}{endpoint}",
        headers={"X-Api-Key": api_key},
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()

    # Handle the different response structures.
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict) and "items" in payload:
        return payload["items"]
    return []


def extract_esr_commodity_codes(commodities: List[Dict[str, Any]]) -> List[int]:
    """Return the integer ``commodityCode`` of each ESR commodity record.

    Entries that are not dicts, lack ``commodityCode``, or whose code cannot
    be converted with ``int()`` are skipped; unconvertible codes are logged
    so that dropped data is visible.
    """
    codes = []
    for commodity in commodities:
        if not isinstance(commodity, dict) or "commodityCode" not in commodity:
            continue
        raw_code = commodity["commodityCode"]
        try:
            codes.append(int(raw_code))
        except (ValueError, TypeError):
            logger.warning(
                "Skipping ESR commodity with non-integer code: %r", raw_code
            )
    return codes


def extract_psd_commodity_codes(commodities: List[Dict[str, Any]]) -> List[str]:
    """Return the ``commodityCode`` of each PSD commodity record as a string.

    Entries that are not dicts, lack ``commodityCode``, or carry ``None`` as
    the code are skipped (``str(None)`` would otherwise yield a bogus
    ``"None"`` code that ends up in request paths).
    """
    return [
        str(commodity["commodityCode"])
        for commodity in commodities
        if isinstance(commodity, dict)
        and commodity.get("commodityCode") is not None
    ]


def _optional_endpoint(path: str) -> Dict[str, Any]:
    """Build an endpoint config whose 404 responses are ignored."""
    return {
        "path": path,
        "response_actions": [{"status_code": 404, "action": "ignore"}],
    }


def _reference_resources() -> List[Dict[str, Any]]:
    """Build the static ESR / GATS / PSD reference-data resources."""
    return [
        {"name": name, "endpoint": {"path": path}}
        for name, path in _REFERENCE_ENDPOINTS
    ]


def _esr_export_resources(
    commodity_codes: Iterable[int], years: Iterable[int]
) -> List[Dict[str, Any]]:
    """Build one ESR exports resource per (commodity, market year)."""
    years = list(years)
    # NOTE(review): a primary key is declared but the disposition is "append",
    # so re-running the pipeline presumably appends the same rows again --
    # confirm whether "merge" was intended for a destination that supports it.
    return [
        {
            "name": f"esr_exports_commodity_{comm_code}_year_{year}",
            "endpoint": _optional_endpoint(
                f"/api/esr/exports/commodityCode/{comm_code}"
                f"/allCountries/marketYear/{year}"
            ),
            "primary_key": ["commodityCode", "countryCode", "weekEnding"],
            "write_disposition": "append",
        }
        for comm_code in commodity_codes
        for year in years
    ]


def _gats_census_import_resources(
    years: Iterable[int], today: datetime.datetime
) -> List[Dict[str, Any]]:
    """Build one GATS census-imports resource per (partner, year, month).

    Months of ``today``'s year that lie after ``today``'s month are left out.
    """
    resources = []
    years = list(years)
    for country_code in GATS_PARTNER_CODES:
        for year in years:
            for month in range(1, 13):
                if year == today.year and month > today.month:
                    continue
                resources.append(
                    {
                        "name": (
                            f"gats_census_imports_{country_code}"
                            f"_{year}_{month:02d}"
                        ),
                        "endpoint": _optional_endpoint(
                            f"/api/gats/censusImports/partnerCode/{country_code}"
                            f"/year/{year}/month/{month:02d}"
                        ),
                        "write_disposition": "append",
                    }
                )
    return resources


def _psd_resources(
    commodity_codes: Iterable[str], years: Iterable[int]
) -> List[Dict[str, Any]]:
    """Build the per-country and then the world-level PSD resources."""
    commodity_codes = list(commodity_codes)
    years = list(years)
    by_country = [
        {
            "name": f"psd_commodity_{comm_code}_year_{year}",
            "endpoint": _optional_endpoint(
                f"/api/psd/commodity/{comm_code}/country/all/year/{year}"
            ),
            "write_disposition": "append",
        }
        for comm_code in commodity_codes
        for year in years
    ]
    world = [
        {
            "name": f"psd_world_commodity_{comm_code}_year_{year}",
            "endpoint": _optional_endpoint(
                f"/api/psd/commodity/{comm_code}/world/year/{year}"
            ),
            "write_disposition": "append",
        }
        for comm_code in commodity_codes
        for year in years
    ]
    return by_country + world


def _build_source_config(
    api_key: str,
    esr_commodity_codes: Iterable[int],
    psd_commodity_codes: Iterable[str],
    years: Iterable[int],
    today: datetime.datetime,
    base_url: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble the configuration dict handed to ``rest_api_source``."""
    years = list(years)
    return {
        "client": {
            "base_url": base_url or FAS_BASE_URL,
            "auth": {
                "type": "api_key",
                "api_key": api_key,
                "name": "X-Api-Key",
                "location": "header",
            },
        },
        "resource_defaults": {
            "write_disposition": "replace",
        },
        "resources": [
            *_reference_resources(),
            *_esr_export_resources(esr_commodity_codes, years),
            *_gats_census_import_resources(years, today),
            *_psd_resources(psd_commodity_codes, years),
        ],
    }


def run_pipeline():
    """Run the USDA FAS Open Data pipeline using the REST API source.

    Steps: obtain the API key, fetch the ESR and PSD commodity codes, build
    one resource per reference endpoint and per dynamic
    commodity/year (or partner/year/month) combination, then run a dlt
    pipeline named ``usda_fas_open_data`` against the ``filesystem``
    destination.
    """
    import dlt
    from dlt.sources.rest_api import rest_api_source

    # Get necessary parameters.  The clock is read once so the year range and
    # the "skip future months" filter agree with each other.
    api_key = get_api_key()
    today = datetime.datetime.now()
    years = get_current_and_past_years(current_year=today.year)
    base_url = FAS_BASE_URL

    # Fetch all commodity codes
    print("Fetching ESR commodity codes...")
    esr_commodities = fetch_commodity_codes(
        base_url, api_key, "/api/esr/commodities"
    )
    esr_commodity_codes = extract_esr_commodity_codes(esr_commodities)
    print(f"Found {len(esr_commodity_codes)} ESR commodity codes")

    print("Fetching PSD commodity codes...")
    psd_commodities = fetch_commodity_codes(
        base_url, api_key, "/api/psd/commodities"
    )
    psd_commodity_codes = extract_psd_commodity_codes(psd_commodities)
    print(f"Found {len(psd_commodity_codes)} PSD commodity codes")

    # Configure the REST API source
    source = rest_api_source(
        _build_source_config(
            api_key,
            esr_commodity_codes,
            psd_commodity_codes,
            years,
            today,
            base_url=base_url,
        )
    )

    # Initialize and run the pipeline
    pipeline = dlt.pipeline(
        pipeline_name="usda_fas_open_data",
        destination="filesystem",  # Change to your preferred destination
        dataset_name="usda_fas_data",
    )

    print(
        "Starting pipeline run. This may take some time due to the large "
        "number of resources..."
    )
    load_info = pipeline.run(source)
    print(f"USDA FAS Open Data pipeline complete: {load_info}")


if __name__ == "__main__":
    run_pipeline()