Files
beanflows/web/src/beanflows/analytics.py
Deeman 8d1dbace0f
Some checks failed
CI / test-cli (push) Successful in 11s
CI / test-sqlmesh (push) Successful in 12s
CI / test-web (push) Failing after 12s
CI / tag (push) Has been skipped
fix(analytics): add _conn module-level override for test patching
Tests monkeypatch analytics._conn to inject a temp DuckDB connection.
The attribute didn't exist; fetch_analytics now uses it when set,
bypassing the _db_path / threading.local path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 02:00:11 +01:00

753 lines
25 KiB
Python

"""
DuckDB analytics data access layer.
Bridge between the async Quart app and sync DuckDB reads.
All queries run via asyncio.to_thread() against a per-thread read-only connection.
## Connection architecture
The web app reads from `serving.duckdb` (SERVING_DUCKDB_PATH), a small file containing
only the serving-layer tables. The full pipeline DB (DUCKDB_PATH / lakehouse.duckdb) is
SQLMesh's exclusive domain — the web app never opens it.
After each SQLMesh run, an export script copies all serving.* tables from lakehouse.duckdb
to serving_new.duckdb, then atomically renames it to serving.duckdb. The inode-based
reopen in _get_conn() picks up the new file automatically on the next query.
Per-thread connections (threading.local) are used because:
- DuckDB read_only=True allows multiple concurrent connections to the same file (fine).
- DuckDB connections are NOT thread-safe for concurrent use by multiple threads — each
thread needs its own connection object.
- asyncio.to_thread() dispatches to a thread pool; concurrent dashboard queries run in
parallel threads, so each must have its own connection.
## Future scaling path
At tens of GBs of serving data, atomic file swap becomes impractical. Migration options:
- DuckLake: DuckDB extension (duckdb/ducklake, released May 2025) providing concurrent
multi-process read/write via an external SQL catalog (PostgreSQL/SQLite). Data stored
as Parquet files + catalog metadata. Experimental as of 2025. When mature, replace
_get_conn() / fetch_analytics() with DuckLake connections.
- Raw Parquet: Export serving tables as per-table Parquet files. Web app queries via
read_parquet(). Zero lock contention. ~10-20 line change in this file.
Both migrations are isolated to open_analytics_db() and _get_conn() — all 18+ query
functions below stay unchanged.
"""
import asyncio
import os
import threading
from pathlib import Path
import duckdb
# Coffee (Green) commodity code in USDA PSD
COFFEE_COMMODITY_CODE = 711100
# Coffee C futures commodity code in CFTC COT reports (3-digit CFTC commodity code)
COFFEE_CFTC_CODE = "083"
# Metrics safe for user-facing queries (prevents SQL injection in dynamic column refs).
# These names are interpolated directly into SELECT/ORDER BY clauses, so membership
# in this frozenset is the only defense against injection via metric names.
ALLOWED_METRICS = frozenset({
    "production",
    "imports",
    "exports",
    "total_distribution",
    "ending_stocks",
    "beginning_stocks",
    "total_supply",
    "domestic_consumption",
    "net_supply",
    "trade_balance",
    "supply_demand_balance",
    "stock_to_use_ratio_pct",
    "production_yoy_pct",
})
# Per-thread storage: each worker thread caches its own DuckDB connection (.conn)
# and the inode of the file it was opened against (.ino) — see _get_conn().
_local = threading.local()
# Path to serving.duckdb set by open_analytics_db(); empty string = analytics disabled.
_db_path: str = ""
_conn: duckdb.DuckDBPyConnection | None = None  # test override: set to bypass _db_path / _local
def open_analytics_db() -> None:
    """Record the serving DB path; actual connections open lazily per thread.

    Reads SERVING_DUCKDB_PATH from the environment. When the file is missing,
    analytics stay disabled (_db_path left empty) and a notice is printed.
    """
    global _db_path
    db_path = os.getenv("SERVING_DUCKDB_PATH", "")
    assert db_path, "SERVING_DUCKDB_PATH environment variable must be set"
    if Path(db_path).exists():
        _db_path = db_path
    else:
        print(f"[analytics] Serving DuckDB not found at {db_path!r} — analytics disabled")
def close_analytics_db() -> None:
    """Close the calling thread's DuckDB connection (if any) and forget the DB path."""
    global _db_path
    existing = getattr(_local, "conn", None)
    if existing:
        existing.close()
        _local.conn = None
    # Clearing the path disables fetch_analytics() until open_analytics_db() runs again.
    _db_path = ""
def _get_conn() -> duckdb.DuckDBPyConnection:
    """Return a read-only DuckDB connection for the current thread.

    Opens a new connection on first call per thread, or when the file's inode
    changes (i.e. the export script atomically swapped serving.duckdb).
    Cost: one os.stat() per call (~1 µs).

    Raises:
        OSError: if _db_path does not exist when stat'd (e.g. mid-swap).
    """
    current_ino = os.stat(_db_path).st_ino
    conn = getattr(_local, "conn", None)
    conn_ino = getattr(_local, "ino", 0)
    if conn is None or conn_ino != current_ino:
        if conn:
            # Drop the cached handle *before* closing/reconnecting: if close()
            # or connect() raises, we must not leave a dead connection cached
            # for this thread — the next call will simply retry the reopen.
            _local.conn = None
            conn.close()
        _local.conn = duckdb.connect(_db_path, read_only=True)
        # Record the inode only after a successful connect so a failed reopen
        # is retried rather than masked by a stale inode match.
        _local.ino = current_ino
    return _local.conn
async def fetch_analytics(sql: str, params: list | None = None) -> list[dict]:
    """Execute a read-only DuckDB query in a worker thread; rows come back as dicts.

    Degrades gracefully: when neither the module-level test override (_conn)
    nor a serving DB path is configured, returns [] so dashboard routes keep
    rendering without analytics data.
    """
    if _conn is None and not _db_path:
        return []

    def _run() -> list[dict]:
        # Prefer the test override (tests monkeypatch analytics._conn);
        # otherwise use this thread's cached connection.
        connection = _get_conn() if _conn is None else _conn
        cur = connection.cursor()
        res = cur.execute(sql, params or [])
        names = [col[0] for col in res.description]
        return [dict(zip(names, values)) for values in res.fetchall()]

    return await asyncio.to_thread(_run)
def _validate_metrics(metrics: list[str]) -> list[str]:
    """Whitelist-filter metric names against ALLOWED_METRICS; require at least one."""
    allowed = ALLOWED_METRICS
    valid = [name for name in metrics if name in allowed]
    assert valid, f"No valid metrics in {metrics}. Allowed: {sorted(ALLOWED_METRICS)}"
    return valid
# =============================================================================
# Query Functions
# =============================================================================
async def get_available_commodities() -> list[dict]:
    """All distinct (commodity_code, commodity_name) pairs in the serving layer."""
    sql = """
        SELECT DISTINCT commodity_code, commodity_name
        FROM serving.commodity_metrics
        WHERE country_code IS NOT NULL
        ORDER BY commodity_name
    """
    return await fetch_analytics(sql)
async def get_global_time_series(
    commodity_code: int,
    metrics: list[str],
    start_year: int | None = None,
    end_year: int | None = None,
) -> list[dict]:
    """Global supply/demand time series by market_year for one commodity.

    Optional start_year/end_year bound the series inclusively.
    """
    cols = ", ".join(_validate_metrics(metrics))
    conditions = ["country_name = 'Global'", "commodity_code = ?"]
    params: list = [commodity_code]
    # Append optional year bounds in a fixed order so placeholders line up.
    for bound, clause in ((start_year, "market_year >= ?"), (end_year, "market_year <= ?")):
        if bound is not None:
            conditions.append(clause)
            params.append(bound)
    sql = f"""
        SELECT market_year, {cols}
        FROM serving.commodity_metrics
        WHERE {" AND ".join(conditions)}
        ORDER BY market_year
    """
    return await fetch_analytics(sql, params)
async def get_top_countries(
    commodity_code: int,
    metric: str,
    limit: int = 10,
) -> list[dict]:
    """Rank countries by a single validated metric for the most recent market year."""
    [metric] = _validate_metrics([metric])
    sql = f"""
        WITH latest AS (
            SELECT MAX(market_year) AS max_year
            FROM serving.commodity_metrics
            WHERE commodity_code = ? AND country_code IS NOT NULL
        )
        SELECT country_name, country_code, market_year, {metric}
        FROM serving.commodity_metrics, latest
        WHERE commodity_code = ?
          AND country_code IS NOT NULL
          AND market_year = latest.max_year
        ORDER BY {metric} DESC
        LIMIT ?
    """
    return await fetch_analytics(sql, [commodity_code, commodity_code, limit])
async def get_stock_to_use_trend(commodity_code: int) -> list[dict]:
    """Time series of the global stock-to-use ratio for one commodity."""
    sql = """
        SELECT market_year, stock_to_use_ratio_pct
        FROM serving.commodity_metrics
        WHERE commodity_code = ?
          AND country_name = 'Global'
        ORDER BY market_year
    """
    return await fetch_analytics(sql, [commodity_code])
async def get_supply_demand_balance(commodity_code: int) -> list[dict]:
    """Global production vs. distribution and the resulting balance, by year."""
    sql = """
        SELECT market_year, production, total_distribution, supply_demand_balance
        FROM serving.commodity_metrics
        WHERE commodity_code = ?
          AND country_name = 'Global'
        ORDER BY market_year
    """
    return await fetch_analytics(sql, [commodity_code])
async def get_production_yoy_by_country(
    commodity_code: int,
    limit: int = 15,
) -> list[dict]:
    """Largest absolute YoY production swings by country for the latest market year."""
    sql = """
        WITH latest AS (
            SELECT MAX(market_year) AS max_year
            FROM serving.commodity_metrics
            WHERE commodity_code = ? AND country_code IS NOT NULL
        )
        SELECT country_name, country_code, market_year,
               production, production_yoy_pct
        FROM serving.commodity_metrics, latest
        WHERE commodity_code = ?
          AND country_code IS NOT NULL
          AND market_year = latest.max_year
          AND production > 0
        ORDER BY ABS(production_yoy_pct) DESC
        LIMIT ?
    """
    return await fetch_analytics(sql, [commodity_code, commodity_code, limit])
# =============================================================================
# COT Positioning Queries
# =============================================================================
# Columns safe for user-facing COT queries. Like ALLOWED_METRICS, these names
# are interpolated into SELECT lists, so this whitelist is the injection guard.
ALLOWED_COT_METRICS = frozenset({
    "open_interest",
    "managed_money_long",
    "managed_money_short",
    "managed_money_net",
    "managed_money_spread",
    "managed_money_net_pct_of_oi",
    "managed_money_long_short_ratio",
    "managed_money_net_wow",
    "prod_merc_long",
    "prod_merc_short",
    "prod_merc_net",
    "swap_long",
    "swap_short",
    "swap_net",
    "other_reportable_net",
    "nonreportable_net",
    "change_open_interest",
    "change_managed_money_net",
    "cot_index_26w",
    "cot_index_52w",
    "concentration_top4_long_pct",
    "concentration_top8_long_pct",
    "traders_total",
    "traders_managed_money_long",
    "traders_managed_money_short",
})
def _validate_cot_metrics(metrics: list[str]) -> list[str]:
    """Whitelist-filter COT metric names; at least one must survive."""
    allowed = ALLOWED_COT_METRICS
    valid = [name for name in metrics if name in allowed]
    assert valid, f"No valid COT metrics in {metrics}. Allowed: {sorted(ALLOWED_COT_METRICS)}"
    return valid
def _cot_table(combined: bool) -> str:
    """Map the combined (futures + options) flag to the serving-layer COT table name."""
    if combined:
        return "serving.cot_positioning_combined"
    return "serving.cot_positioning"
async def get_cot_positioning_time_series(
    cftc_commodity_code: str,
    metrics: list[str],
    start_date: str | None = None,
    end_date: str | None = None,
    limit: int = 520,
    combined: bool = False,
) -> list[dict]:
    """Weekly COT positioning series, oldest row first. Default limit ≈ 10 years."""
    assert 1 <= limit <= 2000, "limit must be between 1 and 2000"
    cols = ", ".join(_validate_cot_metrics(metrics))
    conditions = ["cftc_commodity_code = ?"]
    params: list = [cftc_commodity_code]
    # Optional date bounds appended in a fixed order to match placeholders.
    for bound, clause in ((start_date, "report_date >= ?"), (end_date, "report_date <= ?")):
        if bound is not None:
            conditions.append(clause)
            params.append(bound)
    sql = f"""
        SELECT report_date, {cols}
        FROM {_cot_table(combined)}
        WHERE {" AND ".join(conditions)}
        ORDER BY report_date ASC
        LIMIT ?
    """
    return await fetch_analytics(sql, [*params, limit])
async def get_cot_positioning_latest(cftc_commodity_code: str, combined: bool = False) -> dict | None:
    """Most recent weekly COT snapshot (all columns), or None when no data."""
    sql = f"""
        SELECT *
        FROM {_cot_table(combined)}
        WHERE cftc_commodity_code = ?
        ORDER BY report_date DESC
        LIMIT 1
    """
    rows = await fetch_analytics(sql, [cftc_commodity_code])
    if not rows:
        return None
    return rows[0]
async def get_cot_index_trend(
    cftc_commodity_code: str,
    weeks: int = 104,
    combined: bool = False,
) -> list[dict]:
    """Trailing-N-weeks COT Index (26w/52w) with managed-money context, newest first."""
    assert 1 <= weeks <= 1040, "weeks must be between 1 and 1040"
    sql = f"""
        SELECT report_date, cot_index_26w, cot_index_52w,
               managed_money_net, managed_money_net_pct_of_oi
        FROM {_cot_table(combined)}
        WHERE cftc_commodity_code = ?
        ORDER BY report_date DESC
        LIMIT ?
    """
    return await fetch_analytics(sql, [cftc_commodity_code, weeks])
async def get_cot_options_delta(cftc_commodity_code: str) -> dict | None:
    """Latest managed_money_net difference between combined and futures-only reports.

    Shows whether the options book is reinforcing (same direction) or hedging
    (opposite direction) the futures position. Returns None if either table
    has no data.

    Args:
        cftc_commodity_code: 3-digit CFTC commodity code (e.g. COFFEE_CFTC_CODE).
    """
    rows = await fetch_analytics(
        """
        SELECT f.report_date,
               f.managed_money_net AS fut_net,
               c.managed_money_net AS combined_net,
               c.managed_money_net - f.managed_money_net AS options_delta
        FROM serving.cot_positioning f
        JOIN serving.cot_positioning_combined c
          ON c.report_date = f.report_date
         AND c.cftc_commodity_code = f.cftc_commodity_code
        WHERE f.cftc_commodity_code = ?
        ORDER BY f.report_date DESC
        LIMIT 1
        """,
        [cftc_commodity_code],
    )
    # NOTE: the join must match on commodity code as well as report_date —
    # joining USING (report_date) alone fans out across every commodity in the
    # combined table and can pair f with another commodity's combined_net.
    return rows[0] if rows else None
# =============================================================================
# Coffee Prices Queries
# =============================================================================
# KC=F Yahoo Finance ticker (Coffee C futures; used as the default `ticker`
# argument by callers of the price queries below)
COFFEE_TICKER = "KC=F"
async def get_price_time_series(ticker: str, limit: int = 504) -> list[dict]:
    """Daily OHLCV plus derived stats, newest row first. Default ≈ 2 trading years."""
    assert 1 <= limit <= 5000, "limit must be between 1 and 5000"
    sql = """
        SELECT trade_date, open, high, low, close, volume,
               daily_return_pct, sma_20d, sma_50d, high_52w, low_52w
        FROM serving.coffee_prices
        WHERE ticker = ?
        ORDER BY trade_date DESC
        LIMIT ?
    """
    return await fetch_analytics(sql, [ticker, limit])
async def get_price_latest(ticker: str) -> dict | None:
    """Most recent trading day's close, daily return, MAs, and 52-week range."""
    sql = """
        SELECT trade_date, close, daily_return_pct, high_52w, low_52w, sma_20d, sma_50d
        FROM serving.coffee_prices
        WHERE ticker = ?
        ORDER BY trade_date DESC
        LIMIT 1
    """
    rows = await fetch_analytics(sql, [ticker])
    if not rows:
        return None
    return rows[0]
# =============================================================================
# ICE Warehouse Stocks Queries
# =============================================================================
async def get_ice_stocks_trend(days: int = 365) -> list[dict]:
    """Daily ICE certified stocks over the trailing N report days, newest first."""
    assert 1 <= days <= 3650, "days must be between 1 and 3650"
    sql = """
        SELECT report_date, total_certified_bags, pending_grading_bags,
               wow_change_bags, avg_30d_bags, high_52w_bags, drawdown_from_52w_high_pct
        FROM serving.ice_warehouse_stocks
        ORDER BY report_date DESC
        LIMIT ?
    """
    return await fetch_analytics(sql, [days])
async def get_ice_stocks_latest() -> dict | None:
    """Most recent ICE certified warehouse stock report, or None when empty."""
    sql = """
        SELECT report_date, total_certified_bags, pending_grading_bags,
               wow_change_bags, avg_30d_bags, drawdown_from_52w_high_pct
        FROM serving.ice_warehouse_stocks
        ORDER BY report_date DESC
        LIMIT 1
    """
    rows = await fetch_analytics(sql)
    if not rows:
        return None
    return rows[0]
async def get_ice_aging_latest() -> list[dict]:
    """Every age bucket from the newest ICE aging report, youngest bucket first."""
    sql = """
        SELECT report_date, age_bucket, age_bucket_start_days, age_bucket_end_days,
               antwerp_bags, hamburg_bremen_bags, houston_bags, miami_bags,
               new_orleans_bags, new_york_bags, total_bags
        FROM serving.ice_aging_stocks
        WHERE report_date = (SELECT MAX(report_date) FROM serving.ice_aging_stocks)
        ORDER BY age_bucket_start_days
    """
    return await fetch_analytics(sql)
async def get_ice_aging_trend(reports: int = 6) -> list[dict]:
    """Age-bucket distribution across the last N aging reports, oldest report first."""
    assert 1 <= reports <= 12, "reports must be between 1 and 12"
    sql = """
        SELECT report_date, age_bucket, age_bucket_start_days, age_bucket_end_days,
               antwerp_bags, hamburg_bremen_bags, houston_bags, miami_bags,
               new_orleans_bags, new_york_bags, total_bags
        FROM serving.ice_aging_stocks
        WHERE report_date IN (
            SELECT DISTINCT report_date
            FROM serving.ice_aging_stocks
            ORDER BY report_date DESC
            LIMIT ?
        )
        ORDER BY report_date, age_bucket_start_days
    """
    return await fetch_analytics(sql, [reports])
async def get_ice_stocks_by_port_trend(months: int = 120) -> list[dict]:
    """Monthly by-port warehouse stocks over the trailing N months, newest first."""
    assert 1 <= months <= 360, "months must be between 1 and 360"
    sql = """
        SELECT report_date, new_york_bags, new_orleans_bags, houston_bags, miami_bags,
               antwerp_bags, hamburg_bremen_bags, barcelona_bags, virginia_bags,
               total_bags, mom_change_bags, mom_change_pct, avg_12m_bags
        FROM serving.ice_warehouse_stocks_by_port
        ORDER BY report_date DESC
        LIMIT ?
    """
    return await fetch_analytics(sql, [months])
async def get_ice_stocks_by_port_latest() -> dict | None:
    """Most recent month's by-port warehouse stock breakdown, or None when empty."""
    sql = """
        SELECT report_date, new_york_bags, new_orleans_bags, houston_bags, miami_bags,
               antwerp_bags, hamburg_bremen_bags, barcelona_bags, virginia_bags,
               total_bags, mom_change_bags, mom_change_pct, avg_12m_bags
        FROM serving.ice_warehouse_stocks_by_port
        ORDER BY report_date DESC
        LIMIT 1
    """
    rows = await fetch_analytics(sql)
    if not rows:
        return None
    return rows[0]
async def get_country_comparison(
    commodity_code: int,
    country_codes: list[str],
    metric: str,
) -> list[dict]:
    """Multi-country time series for a single validated metric.

    Args:
        commodity_code: PSD commodity code.
        country_codes: up to 10 country codes; an empty list returns []
            (previously this built an invalid `IN ()` clause and the query failed).
        metric: one of ALLOWED_METRICS.
    """
    if not country_codes:
        return []
    assert len(country_codes) <= 10, "Maximum 10 countries for comparison"
    metric = _validate_metrics([metric])[0]
    placeholders = ", ".join(["?"] * len(country_codes))
    return await fetch_analytics(
        f"""
        SELECT country_name, country_code, market_year, {metric}
        FROM serving.commodity_metrics
        WHERE commodity_code = ?
          AND country_code IN ({placeholders})
        ORDER BY country_name, market_year
        """,
        [commodity_code, *country_codes],
    )
# =============================================================================
# Weather Queries
# =============================================================================
# Columns safe for user-facing weather series queries (prevents SQL injection).
# As with the other ALLOWED_* sets, these names are interpolated into SELECT lists.
ALLOWED_WEATHER_METRICS = frozenset({
    "temp_min_c",
    "temp_max_c",
    "temp_mean_c",
    "precipitation_mm",
    "humidity_max_pct",
    "et0_mm",
    "vpd_max_kpa",
    "precip_sum_7d_mm",
    "precip_sum_30d_mm",
    "temp_mean_30d_c",
    "temp_anomaly_c",
    "water_balance_mm",
    "water_balance_7d_mm",
    "drought_streak_days",
    "heat_streak_days",
    "vpd_streak_days",
    "crop_stress_index",
})
def _validate_weather_metrics(metrics: list[str]) -> list[str]:
    """Whitelist-filter weather metric names; at least one must survive."""
    allowed = ALLOWED_WEATHER_METRICS
    valid = [name for name in metrics if name in allowed]
    assert valid, f"No valid weather metrics in {metrics}. Allowed: {sorted(ALLOWED_WEATHER_METRICS)}"
    return valid
async def get_weather_locations() -> list[dict]:
    """Latest-day stress index and flags for every tracked location, worst first.

    Drives the map and the location card grid (overview mode).
    """
    sql = """
        WITH latest AS (
            SELECT MAX(observation_date) AS max_date
            FROM serving.weather_daily
        )
        SELECT
            w.location_id,
            w.location_name,
            w.country,
            w.lat,
            w.lon,
            w.variety,
            w.observation_date,
            w.crop_stress_index,
            w.drought_streak_days,
            w.heat_streak_days,
            w.vpd_streak_days,
            w.precip_sum_7d_mm,
            w.precip_sum_30d_mm,
            w.temp_anomaly_c,
            w.is_frost,
            w.is_heat_stress,
            w.is_drought,
            w.is_high_vpd
        FROM serving.weather_daily AS w, latest
        WHERE w.observation_date = latest.max_date
        ORDER BY w.crop_stress_index DESC
    """
    return await fetch_analytics(sql)
async def get_weather_location_series(
    location_id: str,
    metrics: list[str] | None = None,
    days: int = 365,
) -> list[dict]:
    """Daily series for one location, newest first. Default span ≈ 1 year.

    Args:
        location_id: serving.weather_daily location identifier.
        metrics: columns to select; None selects the dashboard's default six.
        days: row limit (1..3650); daily data, so roughly days of history.
    """
    assert 1 <= days <= 3650, "days must be between 1 and 3650"
    default_metrics = [
        "temp_mean_c", "precipitation_mm", "crop_stress_index",
        "precip_sum_7d_mm", "water_balance_7d_mm", "temp_anomaly_c",
    ]
    selected = _validate_weather_metrics(default_metrics if metrics is None else metrics)
    sql = f"""
        SELECT observation_date, location_id, location_name, country,
               is_frost, is_heat_stress, is_drought, is_high_vpd,
               {", ".join(selected)}
        FROM serving.weather_daily
        WHERE location_id = ?
        ORDER BY observation_date DESC
        LIMIT ?
    """
    return await fetch_analytics(sql, [location_id, days])
async def get_weather_stress_latest() -> dict | None:
    """Single-row global stress snapshot for the newest observation date.

    Aggregates avg/max crop stress index, the count of locations with
    index > 20, frost/drought flag counts, and the worst-hit location.
    """
    sql = """
        WITH latest AS (
            SELECT MAX(observation_date) AS max_date
            FROM serving.weather_daily
        )
        SELECT
            ANY_VALUE(latest.max_date) AS observation_date,
            ROUND(AVG(w.crop_stress_index), 1) AS avg_crop_stress_index,
            MAX(w.crop_stress_index) AS max_crop_stress_index,
            COUNT(*) FILTER (WHERE w.crop_stress_index > 20) AS locations_under_stress,
            ARG_MAX(w.location_name, w.crop_stress_index) AS worst_location_name,
            ARG_MAX(w.country, w.crop_stress_index) AS worst_location_country,
            COUNT(*) FILTER (WHERE w.is_frost) AS frost_count,
            COUNT(*) FILTER (WHERE w.is_drought) AS drought_count
        FROM serving.weather_daily AS w, latest
        WHERE w.observation_date = latest.max_date
    """
    rows = await fetch_analytics(sql)
    if not rows:
        return None
    return rows[0]
async def get_weather_stress_trend(days: int = 365) -> list[dict]:
    """Per-day avg/max crop stress across all locations, newest first.

    Feeds the global stress chart and the Pulse sparkline.
    """
    assert 1 <= days <= 3650, "days must be between 1 and 3650"
    sql = """
        SELECT
            observation_date,
            ROUND(AVG(crop_stress_index), 1) AS avg_crop_stress_index,
            MAX(crop_stress_index) AS max_crop_stress_index,
            COUNT(*) FILTER (WHERE crop_stress_index > 20) AS locations_under_stress
        FROM serving.weather_daily
        GROUP BY observation_date
        ORDER BY observation_date DESC
        LIMIT ?
    """
    return await fetch_analytics(sql, [days])
async def get_weather_active_alerts() -> list[dict]:
    """Locations with any stress flag raised on the newest observation date.

    Sorted by crop_stress_index, most stressed first.
    """
    sql = """
        WITH latest AS (
            SELECT MAX(observation_date) AS max_date
            FROM serving.weather_daily
        )
        SELECT
            w.location_id,
            w.location_name,
            w.country,
            w.variety,
            w.observation_date,
            w.crop_stress_index,
            w.drought_streak_days,
            w.heat_streak_days,
            w.vpd_streak_days,
            w.precip_sum_7d_mm,
            w.is_frost,
            w.is_heat_stress,
            w.is_drought,
            w.is_high_vpd
        FROM serving.weather_daily AS w, latest
        WHERE w.observation_date = latest.max_date
          AND (w.is_frost OR w.is_heat_stress OR w.is_drought OR w.is_high_vpd)
        ORDER BY w.crop_stress_index DESC
    """
    return await fetch_analytics(sql)