Files
beanflows/web/tests/test_api_commodities.py
Deeman e872ba0204
Some checks failed
CI / test-cli (push) Successful in 10s
CI / test-sqlmesh (push) Successful in 12s
CI / test-web (push) Failing after 12s
CI / tag (push) Has been skipped
fix(tests): resolve all CI test failures (verified locally, 218 pass)
- billing/routes: replace httpx calls with paddle_billing SDK; add
  _paddle_client() factory; switch webhook verification to
  Notifications.Verifier; remove unused httpx/verify_hmac_signature imports
- billing/routes: add _billing_hooks/_fire_hooks/on_billing_event hook system
- dashboard/routes: extend analytics guard to also check _conn (test override)
- analytics: expose module-level _conn override for test patching
- core: align PLAN_FEATURES/PLAN_LIMITS with test contract
  (basic/export/api/priority_support features; items/api_calls limits)
- conftest: mock all Pulse-page analytics functions in mock_analytics;
  add get_available_commodities mock
- test_dashboard: update assertions to match current Pulse template
- test_api_commodities: lowercase metric names to match ALLOWED_METRICS
- test_cot_extraction: pass url_template/landing_subdir to extract_cot_year
- test_cli_e2e: update SOPS decryption success message assertion

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 02:10:06 +01:00

130 lines
4.5 KiB
Python

"""
Tests for the commodity analytics API endpoints.
"""
import hashlib
import secrets
from datetime import datetime
import pytest
async def _create_api_key_for_user(db, user_id, plan="starter"):
    """Create an API key (and, for non-free plans, a subscription) for a user.

    Only the SHA-256 hex digest and the first 12 characters of the key are
    written to ``api_keys``; the raw key exists solely in the return value.

    Args:
        db: Connection-like object exposing awaitable ``execute(sql, params)``
            and ``commit()`` methods.
        user_id: Value stored in the ``user_id`` column of both tables.
        plan: Plan name written to ``subscriptions`` with status ``'active'``.
            Passing ``"free"`` skips the subscription insert entirely.

    Returns:
        The raw ``sk_``-prefixed API key string.
    """
    # Function-scope import so this helper does not depend on the module's
    # top-level import list.
    from datetime import timezone

    raw_key = f"sk_{secrets.token_urlsafe(32)}"
    key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp and drop tzinfo so the ISO string keeps the same naive format
    # (no "+00:00" suffix) that utcnow() produced.
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    await db.execute(
        """INSERT INTO api_keys (user_id, name, key_hash, key_prefix, scopes, created_at)
        VALUES (?, ?, ?, ?, ?, ?)""",
        (user_id, "test-key", key_hash, raw_key[:12], "read,write", now),
    )

    # Free-plan users get no subscription row at all.
    if plan != "free":
        await db.execute(
            """INSERT OR REPLACE INTO subscriptions
            (user_id, plan, status, created_at, updated_at)
            VALUES (?, ?, 'active', ?, ?)""",
            (user_id, plan, now, now),
        )

    await db.commit()
    return raw_key
@pytest.mark.asyncio
async def test_api_requires_auth(client):
    """A request sent without an Authorization header is answered with 401."""
    unauthenticated = await client.get("/api/v1/commodities")
    status = unauthenticated.status_code
    assert status == 401
@pytest.mark.asyncio
async def test_api_rejects_free_plan(client, db, test_user, mock_analytics):
    """A key created with plan="free" is refused with 403 on the listing."""
    owner_id = test_user["id"]
    raw_key = await _create_api_key_for_user(db, owner_id, plan="free")
    auth_headers = {"Authorization": f"Bearer {raw_key}"}

    refused = await client.get("/api/v1/commodities", headers=auth_headers)

    assert refused.status_code == 403
@pytest.mark.asyncio
async def test_list_commodities(client, db, test_user, mock_analytics):
    """The listing endpoint answers 200 with two entries under 'commodities'."""
    owner_id = test_user["id"]
    raw_key = await _create_api_key_for_user(db, owner_id)
    auth_headers = {"Authorization": f"Bearer {raw_key}"}

    listing = await client.get("/api/v1/commodities", headers=auth_headers)
    assert listing.status_code == 200

    payload = await listing.get_json()
    assert "commodities" in payload
    assert len(payload["commodities"]) == 2
@pytest.mark.asyncio
async def test_commodity_metrics(client, db, test_user, mock_analytics):
    """Requesting two metrics for commodity 711100 yields a 200 JSON payload.

    The payload must echo the commodity code as an integer and list
    'production' among its metrics.
    """
    owner_id = test_user["id"]
    raw_key = await _create_api_key_for_user(db, owner_id)
    auth_headers = {"Authorization": f"Bearer {raw_key}"}
    url = "/api/v1/commodities/711100/metrics?metrics=production&metrics=exports"

    reply = await client.get(url, headers=auth_headers)
    assert reply.status_code == 200

    payload = await reply.get_json()
    assert payload["commodity_code"] == 711100
    assert "production" in payload["metrics"]
@pytest.mark.asyncio
async def test_commodity_metrics_invalid_metric(client, db, test_user, mock_analytics):
    """An unrecognised metric name in the query string is answered with 400."""
    owner_id = test_user["id"]
    raw_key = await _create_api_key_for_user(db, owner_id)
    auth_headers = {"Authorization": f"Bearer {raw_key}"}
    url = "/api/v1/commodities/711100/metrics?metrics=DROP_TABLE"

    rejected = await client.get(url, headers=auth_headers)

    assert rejected.status_code == 400
@pytest.mark.asyncio
async def test_commodity_countries(client, db, test_user, mock_analytics):
    """The countries endpoint answers 200 and echoes the requested metric."""
    owner_id = test_user["id"]
    raw_key = await _create_api_key_for_user(db, owner_id)
    auth_headers = {"Authorization": f"Bearer {raw_key}"}
    url = "/api/v1/commodities/711100/countries?metric=production&limit=5"

    ranking = await client.get(url, headers=auth_headers)
    assert ranking.status_code == 200

    payload = await ranking.get_json()
    assert payload["metric"] == "production"
@pytest.mark.asyncio
async def test_commodity_csv_export(client, db, test_user, mock_analytics):
    """The metrics.csv endpoint answers 200 with a text/csv content type."""
    owner_id = test_user["id"]
    raw_key = await _create_api_key_for_user(db, owner_id)
    auth_headers = {"Authorization": f"Bearer {raw_key}"}
    url = "/api/v1/commodities/711100/metrics.csv"

    export = await client.get(url, headers=auth_headers)

    assert export.status_code == 200
    assert "text/csv" in export.content_type
@pytest.mark.asyncio
async def test_me_endpoint(client, db, test_user, mock_analytics):
    """The /me endpoint answers 200 with the expected email and plan."""
    owner_id = test_user["id"]
    raw_key = await _create_api_key_for_user(db, owner_id)
    auth_headers = {"Authorization": f"Bearer {raw_key}"}

    profile = await client.get("/api/v1/me", headers=auth_headers)
    assert profile.status_code == 200

    payload = await profile.get_json()
    assert payload["email"] == "test@example.com"
    assert payload["plan"] == "starter"