git mv all tracked files from the nested padelnomics/ workspace directory to the git repo root. Merged .gitignore files. No code changes — pure path rename. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
305 lines
16 KiB
Python
305 lines
16 KiB
Python
"""
|
|
Unit tests for billing SQL helpers, feature/limit access, and plan determination.
|
|
"""
|
|
import pytest
|
|
from hypothesis import HealthCheck, given
|
|
from hypothesis import settings as h_settings
|
|
from hypothesis import strategies as st
|
|
from padelnomics.billing.routes import (
|
|
can_access_feature,
|
|
get_subscription,
|
|
get_subscription_by_provider_id,
|
|
is_within_limits,
|
|
update_subscription_status,
|
|
upsert_billing_customer,
|
|
upsert_subscription,
|
|
)
|
|
from padelnomics.core import config
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# get_subscription
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestGetSubscription:
    """Tests for ``get_subscription``, looked up by user id."""

    async def test_returns_none_for_user_without_subscription(self, db, test_user):
        """A user for whom no subscription was created yields ``None``."""
        result = await get_subscription(test_user["id"])
        assert result is None

    async def test_returns_subscription_for_user(self, db, test_user, create_subscription):
        """A created subscription comes back with its plan, status and owner."""
        await create_subscription(test_user["id"], plan="pro", status="active")
        result = await get_subscription(test_user["id"])
        # Guard first so a missing row fails as an assertion, not a TypeError.
        assert result is not None
        assert result["plan"] == "pro"
        assert result["status"] == "active"
        assert result["user_id"] == test_user["id"]
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# upsert_subscription
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestUpsertSubscription:
    """Tests for ``upsert_subscription``: insert, update-in-place, and null period end."""

    async def test_insert_new_subscription(self, db, test_user):
        """Upserting for a user without a subscription stores every supplied field."""
        await upsert_billing_customer(test_user["id"], "cust_abc")
        sub_id = await upsert_subscription(
            user_id=test_user["id"],
            plan="pro",
            status="active",
            provider_subscription_id="sub_xyz",
            current_period_end="2025-06-01T00:00:00Z",
        )
        assert sub_id > 0

        row = await get_subscription(test_user["id"])
        # Fail with a clear assertion (not a TypeError) if nothing was stored.
        assert row is not None
        assert row["plan"] == "pro"
        assert row["status"] == "active"
        assert row["provider_subscription_id"] == "sub_xyz"
        assert row["current_period_end"] == "2025-06-01T00:00:00Z"

    async def test_update_existing_subscription(self, db, test_user, create_subscription):
        """Upserting with an existing provider id updates that row and returns its id."""
        original_id = await create_subscription(
            test_user["id"],
            plan="starter",
            status="active",
            provider_subscription_id="sub_old",
        )
        returned_id = await upsert_subscription(
            user_id=test_user["id"],
            plan="pro",
            status="active",
            provider_subscription_id="sub_old",
        )
        # Same id back means the existing row was reused rather than duplicated.
        assert returned_id == original_id

        row = await get_subscription(test_user["id"])
        assert row is not None
        assert row["plan"] == "pro"
        assert row["provider_subscription_id"] == "sub_old"

    async def test_upsert_with_none_period_end(self, db, test_user):
        """An explicit ``current_period_end=None`` is read back as ``None``."""
        await upsert_subscription(
            user_id=test_user["id"],
            plan="pro",
            status="active",
            provider_subscription_id="sub_1",
            current_period_end=None,
        )
        row = await get_subscription(test_user["id"])
        assert row is not None
        assert row["current_period_end"] is None
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# get_subscription_by_provider_id
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestGetSubscriptionByProviderId:
    """Tests for ``get_subscription_by_provider_id``."""

    async def test_returns_none_for_unknown_id(self, db):
        """A provider subscription id that was never stored yields ``None``."""
        result = await get_subscription_by_provider_id("nonexistent")
        assert result is None

    async def test_finds_by_provider_subscription_id(self, db, test_user, create_subscription):
        """A stored provider subscription id resolves to the owning user's row."""
        await create_subscription(test_user["id"], provider_subscription_id="sub_findme")
        result = await get_subscription_by_provider_id("sub_findme")
        assert result is not None
        assert result["user_id"] == test_user["id"]
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# update_subscription_status
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestUpdateSubscriptionStatus:
    """Tests for ``update_subscription_status``, addressed by provider subscription id."""

    async def test_updates_status(self, db, test_user, create_subscription):
        """Changing the status is visible on re-read and ``updated_at`` is populated."""
        await create_subscription(
            test_user["id"], status="active", provider_subscription_id="sub_upd"
        )
        await update_subscription_status("sub_upd", status="cancelled")

        row = await get_subscription(test_user["id"])
        # Fail with a clear assertion (not a TypeError) if the row is missing.
        assert row is not None
        assert row["status"] == "cancelled"
        assert row["updated_at"] is not None

    async def test_updates_extra_fields(self, db, test_user, create_subscription):
        """Extra keyword fields (plan, period end) are written along with the status."""
        await create_subscription(test_user["id"], provider_subscription_id="sub_extra")
        await update_subscription_status(
            "sub_extra",
            status="active",
            plan="starter",
            current_period_end="2026-01-01T00:00:00Z",
        )

        row = await get_subscription(test_user["id"])
        assert row is not None
        assert row["status"] == "active"
        assert row["plan"] == "starter"
        assert row["current_period_end"] == "2026-01-01T00:00:00Z"

    async def test_noop_for_unknown_provider_id(self, db, test_user, create_subscription):
        """Updating an unknown provider id leaves the user's existing row untouched."""
        await create_subscription(
            test_user["id"], provider_subscription_id="sub_known", status="active"
        )
        await update_subscription_status("sub_unknown", status="expired")

        row = await get_subscription(test_user["id"])
        assert row is not None
        assert row["status"] == "active"  # unchanged
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# can_access_feature
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestCanAccessFeature:
    """Tests for ``can_access_feature`` across plans and subscription statuses."""

    async def test_no_subscription_gets_free_features(self, db, test_user):
        """Without a subscription, ``basic`` is granted; ``export`` and ``api`` are not."""
        assert await can_access_feature(test_user["id"], "basic") is True
        assert await can_access_feature(test_user["id"], "export") is False
        assert await can_access_feature(test_user["id"], "api") is False

    async def test_active_pro_gets_all_features(self, db, test_user, create_subscription):
        """An active pro subscription grants all four checked features."""
        await create_subscription(test_user["id"], plan="pro", status="active")
        assert await can_access_feature(test_user["id"], "basic") is True
        assert await can_access_feature(test_user["id"], "export") is True
        assert await can_access_feature(test_user["id"], "api") is True
        assert await can_access_feature(test_user["id"], "priority_support") is True

    async def test_active_starter_gets_starter_features(self, db, test_user, create_subscription):
        """An active starter subscription grants ``basic`` and ``export`` but not ``api``."""
        await create_subscription(test_user["id"], plan="starter", status="active")
        assert await can_access_feature(test_user["id"], "basic") is True
        assert await can_access_feature(test_user["id"], "export") is True
        assert await can_access_feature(test_user["id"], "api") is False

    async def test_cancelled_still_has_features(self, db, test_user, create_subscription):
        """A cancelled pro subscription is still expected to grant ``api``."""
        await create_subscription(test_user["id"], plan="pro", status="cancelled")
        assert await can_access_feature(test_user["id"], "api") is True

    async def test_on_trial_has_features(self, db, test_user, create_subscription):
        """A pro subscription in ``on_trial`` status grants ``api``."""
        await create_subscription(test_user["id"], plan="pro", status="on_trial")
        assert await can_access_feature(test_user["id"], "api") is True

    async def test_expired_falls_back_to_free(self, db, test_user, create_subscription):
        """An expired pro subscription loses ``api`` but keeps ``basic``."""
        await create_subscription(test_user["id"], plan="pro", status="expired")
        assert await can_access_feature(test_user["id"], "api") is False
        assert await can_access_feature(test_user["id"], "basic") is True

    async def test_past_due_falls_back_to_free(self, db, test_user, create_subscription):
        """A past-due pro subscription does not grant ``export``."""
        await create_subscription(test_user["id"], plan="pro", status="past_due")
        assert await can_access_feature(test_user["id"], "export") is False

    async def test_paused_falls_back_to_free(self, db, test_user, create_subscription):
        """A paused pro subscription does not grant ``api``."""
        await create_subscription(test_user["id"], plan="pro", status="paused")
        assert await can_access_feature(test_user["id"], "api") is False

    async def test_nonexistent_feature_returns_false(self, db, test_user, create_subscription):
        """A feature name no plan defines is denied even on an active pro plan."""
        await create_subscription(test_user["id"], plan="pro", status="active")
        assert await can_access_feature(test_user["id"], "teleportation") is False
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# is_within_limits
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestIsWithinLimits:
    """Tests for ``is_within_limits`` at, below and above per-plan resource limits."""

    async def test_free_user_within_limits(self, db, test_user):
        """A user without a subscription is within limits at 50 items."""
        assert await is_within_limits(test_user["id"], "items", 50) is True

    async def test_free_user_at_limit(self, db, test_user):
        """A count of exactly 100 items is rejected, so the check is strict."""
        # NOTE(review): presumably 100 is the free-plan "items" limit — confirm in config.
        assert await is_within_limits(test_user["id"], "items", 100) is False

    async def test_free_user_over_limit(self, db, test_user):
        """A count of 150 items is rejected for a user without a subscription."""
        assert await is_within_limits(test_user["id"], "items", 150) is False

    async def test_pro_unlimited(self, db, test_user, create_subscription):
        """An active pro subscription accepts very large item and API-call counts."""
        await create_subscription(test_user["id"], plan="pro", status="active")
        assert await is_within_limits(test_user["id"], "items", 999999) is True
        assert await is_within_limits(test_user["id"], "api_calls", 999999) is True

    async def test_starter_limits(self, db, test_user, create_subscription):
        """An active starter subscription accepts 999 items and rejects 1000."""
        await create_subscription(test_user["id"], plan="starter", status="active")
        assert await is_within_limits(test_user["id"], "items", 999) is True
        assert await is_within_limits(test_user["id"], "items", 1000) is False

    async def test_expired_pro_gets_free_limits(self, db, test_user, create_subscription):
        """An expired pro subscription is rejected at 100 items, like a free user."""
        await create_subscription(test_user["id"], plan="pro", status="expired")
        assert await is_within_limits(test_user["id"], "items", 100) is False

    async def test_unknown_resource_returns_false(self, db, test_user):
        """A resource name with no configured limit is rejected even at a count of 0."""
        assert await is_within_limits(test_user["id"], "unicorns", 0) is False
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Parameterized: status x feature access matrix
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
# Subscription states covered by the status x feature matrix. "free" is not a
# stored status: the matrix test creates no subscription at all for it.
STATUSES = ["free", "active", "on_trial", "cancelled", "past_due", "paused", "expired"]

# Feature names probed by the matrix tests.
FEATURES = ["basic", "export", "api", "priority_support"]

# Statuses for which the matrix test expects the subscribed plan's features;
# every other status is expected to fall back to the free plan's features.
ACTIVE_STATUSES = {"active", "on_trial", "cancelled"}
|
|
|
|
|
|
@pytest.mark.parametrize("status", STATUSES)
@pytest.mark.parametrize("feature", FEATURES)
async def test_feature_access_matrix(db, test_user, create_subscription, status, feature):
    """Check ``can_access_feature`` for every status/feature pair on the pro plan.

    For the pseudo-status ``"free"`` no subscription is created. Statuses in
    ``ACTIVE_STATUSES`` are expected to grant the pro feature set from
    ``config.PLAN_FEATURES``; all others are expected to grant only the free set.
    """
    if status != "free":
        await create_subscription(test_user["id"], plan="pro", status=status)

    result = await can_access_feature(test_user["id"], feature)

    if status in ACTIVE_STATUSES:
        expected = feature in config.PLAN_FEATURES["pro"]
    else:
        expected = feature in config.PLAN_FEATURES["free"]

    assert result == expected, f"status={status}, feature={feature}"
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Parameterized: plan x feature matrix (active status)
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
# Plan names driving the plan-parameterized matrix tests; for "free" those
# tests create no subscription at all.
PLANS = [
    "free",
    "starter",
    "pro",
]
|
|
|
|
|
|
@pytest.mark.parametrize("plan", PLANS)
@pytest.mark.parametrize("feature", FEATURES)
async def test_plan_feature_matrix(db, test_user, create_subscription, plan, feature):
    """Check ``can_access_feature`` for every plan/feature pair with an active status.

    For ``"free"`` no subscription is created. The expected answer is
    membership of the feature in ``config.PLAN_FEATURES`` for that plan.
    """
    if plan != "free":
        await create_subscription(test_user["id"], plan=plan, status="active")

    result = await can_access_feature(test_user["id"], feature)
    # A plan missing from the config is treated as having no features.
    expected = feature in config.PLAN_FEATURES.get(plan, [])
    assert result == expected, f"plan={plan}, feature={feature}"
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Parameterized: plan x resource limit boundaries
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
@pytest.mark.parametrize("plan", PLANS)
@pytest.mark.parametrize("resource,at_limit", [
    ("items", 100),
    ("items", 1000),
    ("api_calls", 1000),
    ("api_calls", 10000),
])
async def test_plan_limit_matrix(db, test_user, create_subscription, plan, resource, at_limit):
    """Check ``is_within_limits`` for every plan against fixed boundary counts.

    For ``"free"`` no subscription is created. The expected answer is derived
    from ``config.PLAN_LIMITS``: a limit of ``-1`` must always pass, otherwise
    the count must be strictly below the configured limit.
    """
    if plan != "free":
        await create_subscription(test_user["id"], plan=plan, status="active")

    # A plan or resource missing from the config falls back to a limit of 0,
    # for which every probed count is expected to be rejected.
    plan_limit = config.PLAN_LIMITS.get(plan, {}).get(resource, 0)
    result = await is_within_limits(test_user["id"], resource, at_limit)

    expected = plan_limit == -1 or at_limit < plan_limit
    # Identity check keeps the original strictness: the result must be a real bool.
    assert result is expected, (
        f"plan={plan}, resource={resource}, at_limit={at_limit}, plan_limit={plan_limit}"
    )
|
|
|
|
|
|
# ════════════════════════════════════════════════════════════
|
|
# Hypothesis: limit boundaries
|
|
# ════════════════════════════════════════════════════════════
|
|
|
|
class TestLimitsHypothesis:
    """Property-based checks of ``is_within_limits`` over ranges of counts.

    Both tests use function-scoped fixtures, which Hypothesis does not reset
    between generated examples; the matching health check is suppressed on
    purpose and each test body is written to be safe to repeat.
    """

    @given(count=st.integers(min_value=0, max_value=10000))
    @h_settings(
        max_examples=100,
        deadline=2000,
        suppress_health_check=[HealthCheck.function_scoped_fixture],
    )
    async def test_free_limit_boundary_items(self, db, test_user, count):
        """Without a subscription, a count is within limits exactly when below 100."""
        result = await is_within_limits(test_user["id"], "items", count)
        assert result == (count < 100)

    @given(count=st.integers(min_value=0, max_value=100000))
    @h_settings(
        max_examples=100,
        deadline=2000,
        suppress_health_check=[HealthCheck.function_scoped_fixture],
    )
    async def test_pro_always_within_limits(self, db, test_user, create_subscription, count):
        """With an active pro subscription, every generated item count is within limits."""
        # NOTE(review): create_subscription is requested but never called here —
        # confirm whether the fixture is still needed.
        # Use upsert to avoid duplicate inserts across Hypothesis examples
        await upsert_subscription(
            user_id=test_user["id"],
            plan="pro",
            status="active",
            provider_subscription_id="sub_hyp",
        )
        result = await is_within_limits(test_user["id"], "items", count)
        assert result is True
|