""" Regression tests for analytics.py. Bugs covered: - Concurrent DuckDB queries via asyncio.gather returned empty/wrong results because _conn.execute() is not thread-safe. Fixed by using _conn.cursor() per asyncio.to_thread call (each cursor is independently usable from any single thread). - DuckDB normalizes unquoted column identifiers to lowercase; analytics queries and callers must use lowercase names. """ import asyncio import duckdb import pytest # ── Fixtures ──────────────────────────────────────────────────────────────── @pytest.fixture def analytics_duckdb(tmp_path): """Temporary DuckDB with serving.commodity_metrics: 11 global rows + 5 country rows.""" db_path = str(tmp_path / "test.duckdb") conn = duckdb.connect(db_path) conn.execute("CREATE SCHEMA serving") conn.execute( """ CREATE TABLE serving.commodity_metrics ( commodity_code INTEGER, commodity_name TEXT, country_code TEXT, country_name TEXT, market_year INTEGER, ingest_date DATE, production DOUBLE, imports DOUBLE, exports DOUBLE, total_distribution DOUBLE, ending_stocks DOUBLE, net_supply DOUBLE, trade_balance DOUBLE, supply_demand_balance DOUBLE, stock_to_use_ratio_pct DOUBLE, production_yoy_pct DOUBLE ) """ ) # 11 global rows (2015–2025) for year in range(2015, 2026): conn.execute( """INSERT INTO serving.commodity_metrics VALUES (711100, 'Coffee', NULL, 'Global', ?, '2025-01-01', 100.0, 10.0, 20.0, 90.0, 30.0, 90.0, 10.0, 10.0, 33.3, 1.0)""", [year], ) # 5 country rows for latest year for code, name in [ ("BR", "Brazil"), ("VN", "Vietnam"), ("CO", "Colombia"), ("ID", "Indonesia"), ("ET", "Ethiopia"), ]: conn.execute( """INSERT INTO serving.commodity_metrics VALUES (711100, 'Coffee', ?, ?, 2025, '2025-01-01', 50.0, 5.0, 10.0, 45.0, 15.0, 45.0, 5.0, 5.0, 33.3, 2.0)""", [code, name], ) conn.commit() conn.close() yield duckdb.connect(db_path, read_only=True) @pytest.fixture(autouse=False) def patched_analytics(analytics_duckdb, monkeypatch): """Patch analytics._conn with the temp DuckDB connection.""" from beanflows import analytics monkeypatch.setattr(analytics, "_conn", analytics_duckdb) yield analytics_duckdb # ── Concurrency regression ─────────────────────────────────────────────────── @pytest.mark.asyncio async def test_concurrent_queries_all_return_data(patched_analytics): """ Regression: asyncio.gather fires analytics queries concurrently via asyncio.to_thread. Using _conn.execute() from multiple threads simultaneously corrupted internal cursor state — callers silently received 0 rows. Fix: _query() obtains its own _conn.cursor() so each thread has an independent execution context. """ from beanflows import analytics ts, top, stu, bal, yoy = await asyncio.gather( analytics.get_global_time_series( 711100, ["production", "exports", "imports", "ending_stocks", "total_distribution"] ), analytics.get_top_countries(711100, "production", limit=10), analytics.get_stock_to_use_trend(711100), analytics.get_supply_demand_balance(711100), analytics.get_production_yoy_by_country(711100, limit=15), ) assert len(ts) == 11, f"time_series: expected 11, got {len(ts)}" assert len(top) == 5, f"top_producers: expected 5, got {len(top)}" assert len(stu) == 11, f"stu_trend: expected 11, got {len(stu)}" assert len(bal) == 11, f"balance: expected 11, got {len(bal)}" assert len(yoy) == 5, f"yoy: expected 5, got {len(yoy)}" @pytest.mark.asyncio async def test_repeated_concurrent_runs_are_stable(patched_analytics): """Concurrent queries should return consistent row counts across multiple runs.""" from beanflows import analytics for _ in range(3): ts, top = await asyncio.gather( analytics.get_global_time_series(711100, ["production"]), analytics.get_top_countries(711100, "production", limit=10), ) assert len(ts) == 11 assert len(top) == 5 # ── Column name regression ─────────────────────────────────────────────────── @pytest.mark.asyncio async def test_result_column_names_are_lowercase(patched_analytics): """ Regression: DuckDB normalizes unquoted identifiers to lowercase in physical tables. Templates and analytics callers must use lowercase column names. """ from beanflows import analytics ts = await analytics.get_global_time_series( 711100, ["production", "exports", "total_distribution"] ) assert ts, "Expected rows" row = ts[0] for col in ("market_year", "production", "exports", "total_distribution"): assert col in row, f"Column '{col}' missing — DuckDB should return lowercase" # Ensure no legacy mixed-case keys leaked through for bad in ("Production", "Exports", "Total_Distribution", "Market_Year"): assert bad not in row, f"Mixed-case key '{bad}' found — column casing regression" @pytest.mark.asyncio async def test_stu_trend_column_name_lowercase(patched_analytics): """stock_to_use_ratio_pct must be lowercase (was Stock_to_Use_Ratio_pct in SQL).""" from beanflows import analytics rows = await analytics.get_stock_to_use_trend(711100) assert rows assert "stock_to_use_ratio_pct" in rows[0] assert "Stock_to_Use_Ratio_pct" not in rows[0] # ── Global filter regression ───────────────────────────────────────────────── @pytest.mark.asyncio async def test_global_time_series_excludes_country_rows(patched_analytics): """get_global_time_series must filter country_name = 'Global' only.""" from beanflows import analytics rows = await analytics.get_global_time_series(711100, ["production"]) assert all(r["market_year"] in range(2015, 2026) for r in rows) assert len(rows) == 11 # 11 global rows, 0 country rows