""" Unit tests for billing SQL helpers, feature/limit access, and plan determination. """ import pytest from hypothesis import HealthCheck, given from hypothesis import settings as h_settings from hypothesis import strategies as st from beanflows.billing.routes import ( can_access_feature, get_billing_customer, get_subscription, get_subscription_by_provider_id, is_within_limits, record_transaction, update_subscription_status, upsert_billing_customer, upsert_subscription, ) from beanflows.core import config # ════════════════════════════════════════════════════════════ # get_subscription # ════════════════════════════════════════════════════════════ class TestGetSubscription: async def test_returns_none_for_user_without_subscription(self, db, test_user): result = await get_subscription(test_user["id"]) assert result is None async def test_returns_subscription_for_user(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="active") result = await get_subscription(test_user["id"]) assert result is not None assert result["plan"] == "pro" assert result["status"] == "active" assert result["user_id"] == test_user["id"] # ════════════════════════════════════════════════════════════ # upsert_subscription # ════════════════════════════════════════════════════════════ class TestUpsertSubscription: async def test_insert_new_subscription(self, db, test_user): sub_id = await upsert_subscription( user_id=test_user["id"], plan="pro", status="active", provider_subscription_id="sub_xyz", current_period_end="2025-06-01T00:00:00Z", ) assert sub_id > 0 row = await get_subscription(test_user["id"]) assert row["plan"] == "pro" assert row["status"] == "active" assert row["provider_subscription_id"] == "sub_xyz" assert row["current_period_end"] == "2025-06-01T00:00:00Z" async def test_update_existing_by_provider_subscription_id(self, db, test_user): """upsert finds existing by provider_subscription_id, not user_id.""" await upsert_subscription( user_id=test_user["id"], plan="starter", status="active", provider_subscription_id="sub_same", ) await upsert_subscription( user_id=test_user["id"], plan="pro", status="active", provider_subscription_id="sub_same", ) row = await get_subscription(test_user["id"]) assert row["plan"] == "pro" assert row["provider_subscription_id"] == "sub_same" async def test_different_provider_id_creates_new(self, db, test_user): """Different provider_subscription_id creates a new row (multi-sub support).""" await upsert_subscription( user_id=test_user["id"], plan="starter", status="active", provider_subscription_id="sub_first", ) await upsert_subscription( user_id=test_user["id"], plan="pro", status="active", provider_subscription_id="sub_second", ) from beanflows.core import fetch_all rows = await fetch_all( "SELECT * FROM subscriptions WHERE user_id = ? ORDER BY created_at", (test_user["id"],), ) assert len(rows) == 2 async def test_upsert_with_none_period_end(self, db, test_user): await upsert_subscription( user_id=test_user["id"], plan="pro", status="active", provider_subscription_id="sub_1", current_period_end=None, ) row = await get_subscription(test_user["id"]) assert row["current_period_end"] is None # ════════════════════════════════════════════════════════════ # upsert_billing_customer / get_billing_customer # ════════════════════════════════════════════════════════════ class TestUpsertBillingCustomer: async def test_creates_billing_customer(self, db, test_user): await upsert_billing_customer(test_user["id"], "cust_abc") row = await get_billing_customer(test_user["id"]) assert row is not None assert row["provider_customer_id"] == "cust_abc" async def test_updates_existing_customer(self, db, test_user): await upsert_billing_customer(test_user["id"], "cust_old") await upsert_billing_customer(test_user["id"], "cust_new") row = await get_billing_customer(test_user["id"]) assert row["provider_customer_id"] == "cust_new" async def test_get_returns_none_for_unknown_user(self, db): row = await get_billing_customer(99999) assert row is None # ════════════════════════════════════════════════════════════ # get_subscription_by_provider_id # ════════════════════════════════════════════════════════════ class TestGetSubscriptionByProviderId: async def test_returns_none_for_unknown_id(self, db): result = await get_subscription_by_provider_id("nonexistent") assert result is None async def test_finds_by_provider_subscription_id(self, db, test_user, create_subscription): await create_subscription(test_user["id"], provider_subscription_id="sub_findme") result = await get_subscription_by_provider_id("sub_findme") assert result is not None assert result["user_id"] == test_user["id"] # ════════════════════════════════════════════════════════════ # update_subscription_status # ════════════════════════════════════════════════════════════ class TestUpdateSubscriptionStatus: async def test_updates_status(self, db, test_user, create_subscription): await create_subscription(test_user["id"], status="active", provider_subscription_id="sub_upd") await update_subscription_status("sub_upd", status="cancelled") row = await get_subscription(test_user["id"]) assert row["status"] == "cancelled" assert row["updated_at"] is not None async def test_updates_extra_fields(self, db, test_user, create_subscription): await create_subscription(test_user["id"], provider_subscription_id="sub_extra") await update_subscription_status( "sub_extra", status="active", plan="starter", current_period_end="2026-01-01T00:00:00Z", ) row = await get_subscription(test_user["id"]) assert row["status"] == "active" assert row["plan"] == "starter" assert row["current_period_end"] == "2026-01-01T00:00:00Z" async def test_noop_for_unknown_provider_id(self, db, test_user, create_subscription): await create_subscription(test_user["id"], provider_subscription_id="sub_known", status="active") await update_subscription_status("sub_unknown", status="expired") row = await get_subscription(test_user["id"]) assert row["status"] == "active" # unchanged # ════════════════════════════════════════════════════════════ # can_access_feature # ════════════════════════════════════════════════════════════ class TestCanAccessFeature: async def test_no_subscription_gets_free_features(self, db, test_user): assert await can_access_feature(test_user["id"], "basic") is True assert await can_access_feature(test_user["id"], "export") is False assert await can_access_feature(test_user["id"], "api") is False async def test_active_pro_gets_all_features(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="active") assert await can_access_feature(test_user["id"], "basic") is True assert await can_access_feature(test_user["id"], "export") is True assert await can_access_feature(test_user["id"], "api") is True assert await can_access_feature(test_user["id"], "priority_support") is True async def test_active_starter_gets_starter_features(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="starter", status="active") assert await can_access_feature(test_user["id"], "basic") is True assert await can_access_feature(test_user["id"], "export") is True assert await can_access_feature(test_user["id"], "api") is False async def test_cancelled_still_has_features(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="cancelled") assert await can_access_feature(test_user["id"], "api") is True async def test_on_trial_has_features(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="on_trial") assert await can_access_feature(test_user["id"], "api") is True async def test_expired_falls_back_to_free(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="expired") assert await can_access_feature(test_user["id"], "api") is False assert await can_access_feature(test_user["id"], "basic") is True async def test_past_due_falls_back_to_free(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="past_due") assert await can_access_feature(test_user["id"], "export") is False async def test_paused_falls_back_to_free(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="paused") assert await can_access_feature(test_user["id"], "api") is False async def test_nonexistent_feature_returns_false(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="active") assert await can_access_feature(test_user["id"], "teleportation") is False # ════════════════════════════════════════════════════════════ # is_within_limits # ════════════════════════════════════════════════════════════ class TestIsWithinLimits: async def test_free_user_within_limits(self, db, test_user): assert await is_within_limits(test_user["id"], "items", 50) is True async def test_free_user_at_limit(self, db, test_user): assert await is_within_limits(test_user["id"], "items", 100) is False async def test_free_user_over_limit(self, db, test_user): assert await is_within_limits(test_user["id"], "items", 150) is False async def test_pro_unlimited(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="active") assert await is_within_limits(test_user["id"], "items", 999999) is True assert await is_within_limits(test_user["id"], "api_calls", 999999) is True async def test_starter_limits(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="starter", status="active") assert await is_within_limits(test_user["id"], "items", 999) is True assert await is_within_limits(test_user["id"], "items", 1000) is False async def test_expired_pro_gets_free_limits(self, db, test_user, create_subscription): await create_subscription(test_user["id"], plan="pro", status="expired") assert await is_within_limits(test_user["id"], "items", 100) is False async def test_unknown_resource_returns_false(self, db, test_user): assert await is_within_limits(test_user["id"], "unicorns", 0) is False # ════════════════════════════════════════════════════════════ # Parameterized: status × feature access matrix # ════════════════════════════════════════════════════════════ STATUSES = ["free", "active", "on_trial", "cancelled", "past_due", "paused", "expired"] FEATURES = ["basic", "export", "api", "priority_support"] ACTIVE_STATUSES = {"active", "on_trial", "cancelled"} @pytest.mark.parametrize("status", STATUSES) @pytest.mark.parametrize("feature", FEATURES) async def test_feature_access_matrix(db, test_user, create_subscription, status, feature): if status != "free": await create_subscription(test_user["id"], plan="pro", status=status) result = await can_access_feature(test_user["id"], feature) if status in ACTIVE_STATUSES: expected = feature in config.PLAN_FEATURES["pro"] else: expected = feature in config.PLAN_FEATURES["free"] assert result == expected, f"status={status}, feature={feature}" # ════════════════════════════════════════════════════════════ # Parameterized: plan × feature matrix (active status) # ════════════════════════════════════════════════════════════ PLANS = ["free", "starter", "pro"] @pytest.mark.parametrize("plan", PLANS) @pytest.mark.parametrize("feature", FEATURES) async def test_plan_feature_matrix(db, test_user, create_subscription, plan, feature): if plan != "free": await create_subscription(test_user["id"], plan=plan, status="active") result = await can_access_feature(test_user["id"], feature) expected = feature in config.PLAN_FEATURES.get(plan, []) assert result == expected, f"plan={plan}, feature={feature}" # ════════════════════════════════════════════════════════════ # Parameterized: plan × resource limit boundaries # ════════════════════════════════════════════════════════════ @pytest.mark.parametrize("plan", PLANS) @pytest.mark.parametrize("resource,at_limit", [ ("items", 100), ("items", 1000), ("api_calls", 1000), ("api_calls", 10000), ]) async def test_plan_limit_matrix(db, test_user, create_subscription, plan, resource, at_limit): if plan != "free": await create_subscription(test_user["id"], plan=plan, status="active") plan_limit = config.PLAN_LIMITS.get(plan, {}).get(resource, 0) result = await is_within_limits(test_user["id"], resource, at_limit) if plan_limit == -1: assert result is True elif at_limit < plan_limit: assert result is True else: assert result is False # ════════════════════════════════════════════════════════════ # Hypothesis: limit boundaries # ════════════════════════════════════════════════════════════ class TestLimitsHypothesis: @given(count=st.integers(min_value=0, max_value=10000)) @h_settings(max_examples=100, deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture]) async def test_free_limit_boundary_items(self, db, test_user, count): result = await is_within_limits(test_user["id"], "items", count) assert result == (count < 100) @given(count=st.integers(min_value=0, max_value=100000)) @h_settings(max_examples=100, deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture]) async def test_pro_always_within_limits(self, db, test_user, create_subscription, count): # Use upsert to avoid duplicate inserts across Hypothesis examples await upsert_subscription( user_id=test_user["id"], plan="pro", status="active", provider_subscription_id="sub_hyp", ) result = await is_within_limits(test_user["id"], "items", count) assert result is True # ════════════════════════════════════════════════════════════ # record_transaction # ════════════════════════════════════════════════════════════ class TestRecordTransaction: async def test_inserts_transaction(self, db, test_user): txn_id = await record_transaction( user_id=test_user["id"], provider_transaction_id="txn_abc123", type="payment", amount_cents=2999, currency="EUR", status="completed", ) assert txn_id is not None and txn_id > 0 from beanflows.core import fetch_one row = await fetch_one( "SELECT * FROM transactions WHERE provider_transaction_id = ?", ("txn_abc123",), ) assert row is not None assert row["user_id"] == test_user["id"] assert row["amount_cents"] == 2999 assert row["currency"] == "EUR" assert row["status"] == "completed" async def test_idempotent_on_duplicate_provider_id(self, db, test_user): await record_transaction( user_id=test_user["id"], provider_transaction_id="txn_dup", amount_cents=1000, ) # Second insert with same provider_transaction_id should be ignored await record_transaction( user_id=test_user["id"], provider_transaction_id="txn_dup", amount_cents=9999, ) from beanflows.core import fetch_all rows = await fetch_all( "SELECT * FROM transactions WHERE provider_transaction_id = ?", ("txn_dup",), ) assert len(rows) == 1 assert rows[0]["amount_cents"] == 1000 # original value preserved