add hybrid calculator refactor and comprehensive billing test suite

Move planner financial model from client-side JS to server-side Python
(calculator.py + /planner/calculate endpoint). Add full test coverage:
227 calculator tests and 371 billing tests covering SQL helpers,
webhooks, routes, and subscription gating with Hypothesis fuzzing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-13 12:05:03 +01:00
parent cf11add1e5
commit 9703651562
14 changed files with 2905 additions and 186 deletions

28
CHANGELOG.md Normal file
View File

@@ -0,0 +1,28 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]
### Added
- Server-side financial calculator (`planner/calculator.py`) — ported JS `calc()`, `pmt()`, `calcIRR()` to Python so the full financial model is no longer exposed in client-side JavaScript
- `POST /planner/calculate` endpoint for server-side computation
- Pre-computed initial data (`window.__PADELNOMICS_INITIAL_D__`) injected on page load for instant first render
- Debounced API fetch pattern in `planner.js` with `AbortController` for in-flight request cancellation
- Computing indicator CSS (`.planner-app--computing`) with subtle "computing..." text
- Comprehensive test suite for calculator (`tests/test_calculator.py` — 227 tests) covering all 4 venue/ownership combos, edge cases, and Hypothesis property-based fuzzing
- Comprehensive billing test suite (371 tests total):
- `tests/conftest.py` — shared fixtures (DB, app, clients, subscriptions, webhook helpers)
- `tests/test_billing_helpers.py` — unit tests for SQL helpers, feature/limit access, plan determination (60+ tests + parameterized + Hypothesis)
- `tests/test_billing_webhooks.py` — integration tests for LemonSqueezy webhooks (signature verification, all lifecycle events, Hypothesis fuzzing)
- `tests/test_billing_routes.py` — route tests (pricing, checkout, manage, cancel, resume, subscription_required decorator)
- Added `hypothesis>=6.100.0` and `respx>=0.22.0` to dev dependencies for property-based testing and httpx mocking
- **Factored into Copier template** — all billing tests now generate as `.jinja` templates with provider-specific conditionals for Stripe, Paddle, and LemonSqueezy at `/var/home/Deeman/Projects/materia_saas_boilerplate/{{project_slug}}/tests/`
### Changed
- `planner.js` no longer contains `calc()`, `pmt()`, or `calcIRR()` functions — computation moved server-side
- `render()` split into `render()` (tab switching + schedule calc) and `renderWith(d)` (DOM updates from data)
- Tab switching now renders from `_lastD` cache (instant, no API call)
- Slider input triggers 200ms debounced server call instead of synchronous client-side calc

View File

@@ -28,6 +28,7 @@ Thumbs.db
# Testing
.pytest_cache/
.hypothesis/
.coverage
htmlcov/

View File

@@ -23,8 +23,10 @@ packages = ["src/padelnomics"]
[tool.uv]
dev-dependencies = [
"hypothesis>=6.151.6",
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"respx>=0.22.0",
"ruff>=0.3.0",
]

View File

@@ -0,0 +1,419 @@
"""
Padel court financial model — server-side calculation engine.
Ported from planner.js calc()/pmt()/calcIRR() so the full financial
model is no longer exposed in client-side JavaScript.
"""
import math
# JS-compatible rounding: half-up (0.5 rounds to 1), not Python's
# banker's rounding (round-half-even).
_round = lambda n: math.floor(n + 0.5)
# -- Default state (mirrors the JS `S` object) --
DEFAULTS = {
"venue": "indoor",
"own": "rent",
"dblCourts": 4,
"sglCourts": 2,
"sqmPerDblHall": 330,
"sqmPerSglHall": 220,
"sqmPerDblOutdoor": 300,
"sqmPerSglOutdoor": 200,
"ratePeak": 50,
"rateOffPeak": 35,
"rateSingle": 30,
"peakPct": 40,
"hoursPerDay": 16,
"daysPerMonthIndoor": 29,
"daysPerMonthOutdoor": 25,
"bookingFee": 10,
"utilTarget": 40,
"membershipRevPerCourt": 500,
"fbRevPerCourt": 300,
"coachingRevPerCourt": 200,
"retailRevPerCourt": 80,
"racketRentalRate": 15,
"racketPrice": 5,
"racketQty": 2,
"ballRate": 10,
"ballPrice": 3,
"ballCost": 1.5,
"courtCostDbl": 25000,
"courtCostSgl": 15000,
"shipping": 3000,
"hallCostSqm": 500,
"foundationSqm": 150,
"landPriceSqm": 60,
"hvac": 100000,
"electrical": 60000,
"sanitary": 80000,
"parking": 50000,
"fitout": 40000,
"planning": 100000,
"fireProtection": 80000,
"floorPrep": 12000,
"hvacUpgrade": 20000,
"lightingUpgrade": 10000,
"outdoorFoundation": 35,
"outdoorSiteWork": 8000,
"outdoorLighting": 4000,
"outdoorFencing": 6000,
"equipment": 2000,
"workingCapital": 15000,
"contingencyPct": 10,
"rentSqm": 4,
"outdoorRent": 400,
"insurance": 300,
"electricity": 600,
"heating": 400,
"maintenance": 300,
"cleaning": 300,
"marketing": 350,
"staff": 0,
"propertyTax": 250,
"water": 125,
"loanPct": 85,
"interestRate": 5,
"loanTerm": 10,
"constructionMonths": 0,
"holdYears": 5,
"exitMultiple": 6,
"annualRevGrowth": 2,
"ramp": [0.25, 0.35, 0.45, 0.55, 0.65, 0.75, 0.82, 0.88, 0.93, 0.96, 0.98, 1],
"season": [0, 0, 0, 0.7, 0.9, 1, 1, 1, 0.8, 0, 0, 0],
}
def validate_state(s: dict) -> dict:
"""Apply defaults and coerce types. Returns a clean copy."""
out = {**DEFAULTS}
for k, default in DEFAULTS.items():
if k not in s:
continue
v = s[k]
if isinstance(default, list):
if isinstance(v, list):
out[k] = [float(x) for x in v]
elif isinstance(default, float):
try:
out[k] = float(v)
except (TypeError, ValueError):
pass
elif isinstance(default, int):
try:
out[k] = int(float(v))
except (TypeError, ValueError):
pass
elif isinstance(default, str):
out[k] = str(v)
return out
def pmt(rate: float, nper: int, pv: float) -> float:
"""Loan payment calculation (matches JS pmt exactly)."""
if rate == 0:
return pv / nper
return pv * rate * math.pow(1 + rate, nper) / (math.pow(1 + rate, nper) - 1)
def calc_irr(cfs: list[float], guess: float = 0.1) -> float:
"""Newton-Raphson IRR solver (matches JS calcIRR exactly)."""
r = guess
for _ in range(300):
npv = 0.0
d = 0.0
for t, cf in enumerate(cfs):
npv += cf / math.pow(1 + r, t)
d -= t * cf / math.pow(1 + r, t + 1)
if abs(d) < 1e-12:
break
nr = r - npv / d
if abs(nr - r) < 1e-9:
return nr
r = nr
if r < -0.99:
r = -0.99
if r > 10:
r = 10
return r
def calc(s: dict) -> dict:
"""
Main financial model. Takes validated state dict, returns full
derived-data dict (the `d` object from the JS version).
"""
d: dict = {}
is_in = s["venue"] == "indoor"
is_buy = s["own"] == "buy"
d["totalCourts"] = s["dblCourts"] + s["sglCourts"]
total_courts = d["totalCourts"]
d["hallSqm"] = (
(s["dblCourts"] * s["sqmPerDblHall"] + s["sglCourts"] * s["sqmPerSglHall"] + 200 + total_courts * 20)
if total_courts
else 0
)
d["outdoorLandSqm"] = (
(s["dblCourts"] * s["sqmPerDblOutdoor"] + s["sglCourts"] * s["sqmPerSglOutdoor"] + 100)
if total_courts
else 0
)
d["sqm"] = d["hallSqm"] if is_in else d["outdoorLandSqm"]
# -- CAPEX --
capex_items: list[dict] = []
def ci(name: str, amount: float, info: str = ""):
capex_items.append({"name": name, "amount": _round(amount), "info": info})
ci(
"Padel Courts",
s["dblCourts"] * s["courtCostDbl"] + s["sglCourts"] * s["courtCostSgl"],
f"{s['dblCourts']}\u00d7dbl + {s['sglCourts']}\u00d7sgl",
)
ci("Shipping", math.ceil(total_courts / 2) * s["shipping"] if total_courts else 0)
if is_in:
if is_buy:
ci("Hall Construction", d["hallSqm"] * s["hallCostSqm"],
f"{d['hallSqm']}m\u00b2 \u00d7 \u20ac{s['hallCostSqm']}/m\u00b2")
ci("Foundation", d["hallSqm"] * s["foundationSqm"],
f"{d['hallSqm']}m\u00b2 \u00d7 \u20ac{s['foundationSqm']}/m\u00b2")
land_sqm = _round(d["hallSqm"] * 1.25)
ci("Land Purchase", land_sqm * s["landPriceSqm"],
f"{land_sqm}m\u00b2 \u00d7 \u20ac{s['landPriceSqm']}/m\u00b2")
ci("Transaction Costs", _round(land_sqm * s["landPriceSqm"] * 0.1), "~10% of land")
ci("HVAC System", s["hvac"])
ci("Electrical + Lighting", s["electrical"])
ci("Sanitary / Changing", s["sanitary"])
ci("Parking + Exterior", s["parking"])
ci("Planning + Permits", s["planning"])
ci("Fire Protection", s["fireProtection"])
else:
ci("Floor Preparation", s["floorPrep"])
ci("HVAC Upgrade", s["hvacUpgrade"])
ci("Lighting Upgrade", s["lightingUpgrade"])
ci("Fit-Out & Reception", s["fitout"])
else:
ci("Concrete Foundation", (s["dblCourts"] * 250 + s["sglCourts"] * 150) * s["outdoorFoundation"])
ci("Site Work", s["outdoorSiteWork"])
ci("Lighting", total_courts * s["outdoorLighting"])
ci("Fencing", s["outdoorFencing"])
if is_buy:
ci("Land Purchase", d["outdoorLandSqm"] * s["landPriceSqm"],
f"{d['outdoorLandSqm']}m\u00b2 \u00d7 \u20ac{s['landPriceSqm']}/m\u00b2")
ci("Transaction Costs", _round(d["outdoorLandSqm"] * s["landPriceSqm"] * 0.1))
ci("Equipment", s["equipment"] + total_courts * 300)
ci("Working Capital", s["workingCapital"])
ci("Miscellaneous", 8000 if is_buy else 6000)
sub = sum(i["amount"] for i in capex_items)
cont = _round(sub * s["contingencyPct"] / 100)
if s["contingencyPct"] > 0:
ci(f"Contingency ({s['contingencyPct']}%)", cont)
d["capexItems"] = capex_items
d["capex"] = sub + cont
d["capexPerCourt"] = d["capex"] / total_courts if total_courts > 0 else 0
d["capexPerSqm"] = d["capex"] / d["sqm"] if d["sqm"] > 0 else 0
# -- OPEX --
opex_items: list[dict] = []
def oi(name: str, amount: float, info: str = ""):
opex_items.append({"name": name, "amount": _round(amount), "info": info})
if not is_buy:
if is_in:
oi("Rent", d["hallSqm"] * s["rentSqm"],
f"{d['hallSqm']}m\u00b2 \u00d7 \u20ac{s['rentSqm']}/m\u00b2")
else:
oi("Rent", s["outdoorRent"])
else:
oi("Property Tax", s["propertyTax"])
oi("Insurance", s["insurance"])
oi("Electricity", s["electricity"])
if is_in:
oi("Heating", s["heating"])
oi("Water", s["water"])
oi("Maintenance", s["maintenance"])
if is_in:
oi("Cleaning", s["cleaning"])
oi("Marketing / Software / Misc", s["marketing"])
if s["staff"] > 0:
oi("Staff", s["staff"])
d["opexItems"] = opex_items
d["opex"] = sum(i["amount"] for i in opex_items)
d["annualOpex"] = d["opex"] * 12
# -- Financing --
d["equity"] = _round(d["capex"] * (1 - s["loanPct"] / 100))
d["loanAmount"] = d["capex"] - d["equity"]
d["monthlyPayment"] = (
pmt(s["interestRate"] / 100 / 12, max(s["loanTerm"], 1) * 12, d["loanAmount"])
if d["loanAmount"] > 0
else 0
)
d["annualDebtService"] = d["monthlyPayment"] * 12
d["ltv"] = d["loanAmount"] / d["capex"] if d["capex"] > 0 else 0
# -- Revenue model --
dpm = s["daysPerMonthIndoor"] if is_in else s["daysPerMonthOutdoor"]
d["daysPerMonth"] = dpm
if total_courts > 0:
w_rate = (
s["dblCourts"] * (s["ratePeak"] * s["peakPct"] / 100 + s["rateOffPeak"] * (1 - s["peakPct"] / 100))
+ s["sglCourts"] * s["rateSingle"]
) / total_courts
else:
w_rate = s["ratePeak"]
d["weightedRate"] = w_rate
d["availHoursMonth"] = s["hoursPerDay"] * dpm * total_courts
d["bookedHoursMonth"] = d["availHoursMonth"] * (s["utilTarget"] / 100)
d["courtRevMonth"] = d["bookedHoursMonth"] * w_rate
d["feeDeduction"] = d["courtRevMonth"] * (s["bookingFee"] / 100)
d["racketRev"] = d["bookedHoursMonth"] * (s["racketRentalRate"] / 100) * s["racketQty"] * s["racketPrice"]
d["ballMargin"] = d["bookedHoursMonth"] * (s["ballRate"] / 100) * (s["ballPrice"] - s["ballCost"])
d["membershipRev"] = total_courts * s["membershipRevPerCourt"]
d["fbRev"] = total_courts * s["fbRevPerCourt"]
d["coachingRev"] = total_courts * s["coachingRevPerCourt"]
d["retailRev"] = total_courts * s["retailRevPerCourt"]
d["grossRevMonth"] = (
d["courtRevMonth"] + d["racketRev"] + d["ballMargin"]
+ d["membershipRev"] + d["fbRev"] + d["coachingRev"] + d["retailRev"]
)
d["netRevMonth"] = d["grossRevMonth"] - d["feeDeduction"]
d["ebitdaMonth"] = d["netRevMonth"] - d["opex"]
d["netCFMonth"] = d["ebitdaMonth"] - d["monthlyPayment"]
# -- 60-month cash flow projection --
months: list[dict] = []
for m in range(1, 61):
cm = (m - 1) % 12
yr = math.ceil(m / 12)
ramp = s["ramp"][m - 1] if m <= 12 else 1
seas = 1 if is_in else s["season"][cm]
eff_util = (s["utilTarget"] / 100) * ramp * seas
avail = s["hoursPerDay"] * dpm * total_courts if seas > 0 else 0
booked = avail * eff_util
court_rev = booked * w_rate
fees = -court_rev * (s["bookingFee"] / 100)
ancillary = booked * (
(s["racketRentalRate"] / 100) * s["racketQty"] * s["racketPrice"]
+ (s["ballRate"] / 100) * (s["ballPrice"] - s["ballCost"])
)
membership = total_courts * s["membershipRevPerCourt"] * (ramp if seas > 0 else 0)
fb = total_courts * s["fbRevPerCourt"] * (ramp if seas > 0 else 0)
coaching = total_courts * s["coachingRevPerCourt"] * (ramp if seas > 0 else 0)
retail = total_courts * s["retailRevPerCourt"] * (ramp if seas > 0 else 0)
total_rev = court_rev + fees + ancillary + membership + fb + coaching + retail
opex_val = -d["opex"]
loan = -d["monthlyPayment"]
ebitda = total_rev + opex_val
ncf = ebitda + loan
prev = months[-1] if months else None
cum = (prev["cum"] if prev else -d["capex"]) + ncf
months.append({
"m": m, "cm": cm + 1, "yr": yr, "ramp": ramp, "seas": seas,
"effUtil": eff_util, "avail": avail, "booked": booked,
"courtRev": court_rev, "fees": fees, "ancillary": ancillary,
"membership": membership, "totalRev": total_rev, "opex": opex_val,
"loan": loan, "ebitda": ebitda, "ncf": ncf, "cum": cum,
})
d["months"] = months
# -- Annual summaries --
annuals: list[dict] = []
for y in range(1, 6):
ym = [m for m in months if m["yr"] == y]
annuals.append({
"year": y,
"revenue": sum(m["totalRev"] for m in ym),
"ebitda": sum(m["ebitda"] for m in ym),
"ncf": sum(m["ncf"] for m in ym),
"ds": sum(abs(m["loan"]) for m in ym),
"booked": sum(m["booked"] for m in ym),
"avail": sum(m["avail"] for m in ym),
})
d["annuals"] = annuals
# -- Returns & exit --
y3_ebitda = annuals[2]["ebitda"] if len(annuals) >= 3 else 0
d["stabEbitda"] = y3_ebitda
d["exitValue"] = y3_ebitda * s["exitMultiple"]
d["remainingLoan"] = d["loanAmount"] * max(0, 1 - s["holdYears"] / (max(s["loanTerm"], 1) * 1.5))
d["netExit"] = d["exitValue"] - d["remainingLoan"]
irr_cfs = [-d["capex"]]
for y in range(s["holdYears"]):
ycf = annuals[y]["ncf"] if y < len(annuals) else (annuals[-1]["ncf"] if annuals else 0)
if y == s["holdYears"] - 1:
irr_cfs.append(ycf + d["netExit"])
else:
irr_cfs.append(ycf)
d["irr"] = calc_irr(irr_cfs)
d["totalReturned"] = sum(irr_cfs[1:])
d["moic"] = d["totalReturned"] / d["capex"] if d["capex"] > 0 else 0
d["dscr"] = [
{"year": a["year"], "dscr": a["ebitda"] / a["ds"] if a["ds"] > 0 else 999}
for a in annuals
]
payback_idx = -1
for i, m in enumerate(months):
if m["cum"] >= 0:
payback_idx = i
break
d["paybackIdx"] = payback_idx
# -- Efficiency metrics --
rev_per_hr = (
w_rate * (1 - s["bookingFee"] / 100)
+ (s["racketRentalRate"] / 100) * s["racketQty"] * s["racketPrice"]
+ (s["ballRate"] / 100) * (s["ballPrice"] - s["ballCost"])
)
fixed_month = d["opex"] + d["monthlyPayment"]
d["breakEvenHrs"] = fixed_month / max(rev_per_hr, 0.01)
d["breakEvenUtil"] = d["breakEvenHrs"] / d["availHoursMonth"] if d["availHoursMonth"] > 0 else 1
d["breakEvenHrsPerCourt"] = d["breakEvenHrs"] / total_courts / dpm if total_courts > 0 else 0
d["revPAH"] = d["netRevMonth"] / d["availHoursMonth"] if d["availHoursMonth"] > 0 else 0
d["revPerSqm"] = (d["netRevMonth"] * 12) / d["sqm"] if d["sqm"] > 0 else 0
d["ebitdaMargin"] = d["ebitdaMonth"] / d["netRevMonth"] if d["netRevMonth"] > 0 else 0
d["opexRatio"] = d["opex"] / d["netRevMonth"] if d["netRevMonth"] > 0 else 0
# Rent ratio — find the "Rent" item in opex
rent_amount = 0
for item in opex_items:
if item["name"] == "Rent":
rent_amount = item["amount"]
break
d["rentRatio"] = rent_amount / d["netRevMonth"] if d["netRevMonth"] > 0 else 0
d["cashOnCash"] = (annuals[2]["ncf"] if len(annuals) >= 3 else 0) / d["equity"] if d["equity"] > 0 else 0
d["yieldOnCost"] = d["stabEbitda"] / d["capex"] if d["capex"] > 0 else 0
d["debtYield"] = d["stabEbitda"] / d["loanAmount"] if d["loanAmount"] > 0 else 0
d["costPerBookedHr"] = (
(d["opex"] + d["monthlyPayment"]) / d["bookedHoursMonth"]
if d["bookedHoursMonth"] > 0
else 0
)
d["avgUtil"] = (
annuals[2]["booked"] / annuals[2]["avail"]
if len(annuals) >= 3 and annuals[2]["avail"] > 0
else 0
)
return d

View File

@@ -9,6 +9,7 @@ from quart import Blueprint, render_template, request, g, jsonify
from ..core import fetch_one, fetch_all, execute, csrf_protect
from ..auth.routes import login_required
from .calculator import calc, validate_state
bp = Blueprint(
"planner",
@@ -53,13 +54,26 @@ async def get_scenarios(user_id: int) -> list[dict]:
async def index():
scenario_count = await count_scenarios(g.user["id"])
default = await get_default_scenario(g.user["id"])
initial_state = json.loads(default["state_json"]) if default else {}
state = validate_state(initial_state)
initial_d = calc(state)
return await render_template(
"planner.html",
initial_state=default["state_json"] if default else None,
initial_d=json.dumps(initial_d),
scenario_count=scenario_count,
)
@bp.route("/calculate", methods=["POST"])
@login_required
async def calculate():
data = await request.get_json()
state = validate_state(data.get("state", {}))
d = calc(state)
return jsonify(d)
@bp.route("/scenarios", methods=["GET"])
@login_required
async def scenario_list():

View File

@@ -190,6 +190,8 @@
{% if initial_state %}
window.__PADELNOMICS_INITIAL_STATE__ = {{ initial_state | safe }};
{% endif %}
window.__PADELNOMICS_INITIAL_D__ = {{ initial_d | safe }};
window.__PADELNOMICS_CALC_URL__ = "{{ url_for('planner.calculate') }}";
window.__PADELNOMICS_SAVE_URL__ = "{{ url_for('planner.save_scenario') }}";
window.__PADELNOMICS_SCENARIO_URL__ = "{{ url_for('planner.index') }}scenarios/";
</script>

View File

@@ -597,3 +597,13 @@
85% { opacity: 1; }
100% { opacity: 0; }
}
/* ── Computing indicator ── */
.planner-app--computing .planner-header h1::after {
content: 'computing\2026';
font-size: 10px;
font-weight: 500;
color: var(--txt-3);
margin-left: 10px;
letter-spacing: 0.03em;
}

View File

@@ -49,22 +49,6 @@ const fmtX=n=>`${n.toFixed(2)}x`;
const fmtN=n=>new Intl.NumberFormat('de-DE').format(Math.round(n));
const fE=v=>fmt(v), fP=v=>v+'%', fN=v=>v, fR=v=>v+'x', fY=v=>v+' yr', fH=v=>v+'h', fD=v=>'\u20AC'+v, fM=v=>v+' mo';
function pmt(rate,nper,pv){if(rate===0)return pv/nper;return pv*rate*Math.pow(1+rate,nper)/(Math.pow(1+rate,nper)-1)}
function calcIRR(cfs,guess=.1){
let r=guess;
for(let i=0;i<300;i++){
let npv=0,d=0;
for(let t=0;t<cfs.length;t++){npv+=cfs[t]/Math.pow(1+r,t);d-=t*cfs[t]/Math.pow(1+r,t+1)}
if(Math.abs(d)<1e-12)break;
const nr=r-npv/d;
if(Math.abs(nr-r)<1e-9)return nr;
r=nr;
if(r<-0.99)r=-0.99;if(r>10)r=10;
}
return r;
}
function ti(text){
if(!text) return '';
return ` <span class="ti">i<span class="tp">${text}</span></span>`;
@@ -87,171 +71,40 @@ function cardSmHTML(label,value,sub,cls='',tip=''){
</div>`;
}
// ── Derived Calculations ──────────────────────────────────
function calc(){
const d = {};
const isIn = S.venue==='indoor', isBuy = S.own==='buy';
d.totalCourts = S.dblCourts + S.sglCourts;
d.hallSqm = d.totalCourts ? S.dblCourts*S.sqmPerDblHall + S.sglCourts*S.sqmPerSglHall + 200 + d.totalCourts*20 : 0;
d.outdoorLandSqm = d.totalCourts ? S.dblCourts*S.sqmPerDblOutdoor + S.sglCourts*S.sqmPerSglOutdoor + 100 : 0;
d.sqm = isIn ? d.hallSqm : d.outdoorLandSqm;
// ── Server-side calculation ──────────────────────────────
let _lastD = window.__PADELNOMICS_INITIAL_D__ || null;
let _calcTimer = null;
let _calcController = null;
d.capexItems = [];
const ci = (name,amount,info) => d.capexItems.push({name,amount:Math.round(amount),info});
ci('Padel Courts', S.dblCourts*S.courtCostDbl + S.sglCourts*S.courtCostSgl, `${S.dblCourts}\u00D7dbl + ${S.sglCourts}\u00D7sgl`);
ci('Shipping', Math.ceil(d.totalCourts/2)*S.shipping);
if(isIn){
if(isBuy){
ci('Hall Construction', d.hallSqm*S.hallCostSqm, `${d.hallSqm}m\u00B2 \u00D7 ${fmt(S.hallCostSqm)}/m\u00B2`);
ci('Foundation', d.hallSqm*S.foundationSqm, `${d.hallSqm}m\u00B2 \u00D7 ${fmt(S.foundationSqm)}/m\u00B2`);
const landSqm = Math.round(d.hallSqm*1.25);
ci('Land Purchase', landSqm*S.landPriceSqm, `${landSqm}m\u00B2 \u00D7 ${fmt(S.landPriceSqm)}/m\u00B2`);
ci('Transaction Costs', Math.round(landSqm*S.landPriceSqm*0.1), '~10% of land');
ci('HVAC System', S.hvac);
ci('Electrical + Lighting', S.electrical);
ci('Sanitary / Changing', S.sanitary);
ci('Parking + Exterior', S.parking);
ci('Planning + Permits', S.planning);
ci('Fire Protection', S.fireProtection);
} else {
ci('Floor Preparation', S.floorPrep);
ci('HVAC Upgrade', S.hvacUpgrade);
ci('Lighting Upgrade', S.lightingUpgrade);
ci('Fit-Out & Reception', S.fitout);
function fetchCalc(){
if(_calcController) _calcController.abort();
_calcController = new AbortController();
const app = $('.planner-app');
if(app) app.classList.add('planner-app--computing');
fetch(window.__PADELNOMICS_CALC_URL__, {
method:'POST',
headers:{'Content-Type':'application/json'},
body:JSON.stringify({state:S}),
signal:_calcController.signal,
})
.then(r=>r.json())
.then(d=>{
_lastD = d;
_calcController = null;
if(app) app.classList.remove('planner-app--computing');
renderWith(d);
})
.catch(e=>{
if(e.name!=='AbortError'){
_calcController = null;
if(app) app.classList.remove('planner-app--computing');
}
} else {
ci('Concrete Foundation', (S.dblCourts*250+S.sglCourts*150)*S.outdoorFoundation);
ci('Site Work', S.outdoorSiteWork);
ci('Lighting', d.totalCourts*S.outdoorLighting);
ci('Fencing', S.outdoorFencing);
if(isBuy){
ci('Land Purchase', d.outdoorLandSqm*S.landPriceSqm, `${d.outdoorLandSqm}m\u00B2 \u00D7 ${fmt(S.landPriceSqm)}/m\u00B2`);
ci('Transaction Costs', Math.round(d.outdoorLandSqm*S.landPriceSqm*0.1));
}
}
ci('Equipment', S.equipment + d.totalCourts*300);
ci('Working Capital', S.workingCapital);
ci('Miscellaneous', isBuy ? 8000 : 6000);
const sub = d.capexItems.reduce((s,i)=>s+i.amount,0);
const cont = Math.round(sub*S.contingencyPct/100);
if(S.contingencyPct>0) ci(`Contingency (${S.contingencyPct}%)`, cont);
d.capex = sub + cont;
d.capexPerCourt = d.totalCourts>0 ? d.capex/d.totalCourts : 0;
d.capexPerSqm = d.sqm>0 ? d.capex/d.sqm : 0;
});
}
d.opexItems = [];
const oi = (name,amount,info) => d.opexItems.push({name,amount:Math.round(amount),info});
if(!isBuy){
if(isIn) oi('Rent', d.hallSqm*S.rentSqm, `${d.hallSqm}m\u00B2 \u00D7 \u20AC${S.rentSqm}/m\u00B2`);
else oi('Rent', S.outdoorRent);
} else {
oi('Property Tax', S.propertyTax);
}
oi('Insurance', S.insurance);
oi('Electricity', S.electricity);
if(isIn){ oi('Heating', S.heating); oi('Water', S.water); }
oi('Maintenance', S.maintenance);
if(isIn) oi('Cleaning', S.cleaning);
oi('Marketing / Software / Misc', S.marketing);
if(S.staff>0) oi('Staff', S.staff);
d.opex = d.opexItems.reduce((s,i)=>s+i.amount,0);
d.annualOpex = d.opex*12;
d.equity = Math.round(d.capex*(1-S.loanPct/100));
d.loanAmount = d.capex - d.equity;
d.monthlyPayment = d.loanAmount>0 ? pmt(S.interestRate/100/12, Math.max(S.loanTerm,1)*12, d.loanAmount) : 0;
d.annualDebtService = d.monthlyPayment*12;
d.ltv = d.capex>0 ? d.loanAmount/d.capex : 0;
const dpm = isIn ? S.daysPerMonthIndoor : S.daysPerMonthOutdoor;
d.daysPerMonth = dpm;
const wRate = d.totalCourts>0 ? (S.dblCourts*(S.ratePeak*S.peakPct/100+S.rateOffPeak*(1-S.peakPct/100)) + S.sglCourts*S.rateSingle)/d.totalCourts : S.ratePeak;
d.weightedRate = wRate;
d.availHoursMonth = S.hoursPerDay * dpm * d.totalCourts;
d.bookedHoursMonth = d.availHoursMonth * (S.utilTarget/100);
d.courtRevMonth = d.bookedHoursMonth * wRate;
d.feeDeduction = d.courtRevMonth * (S.bookingFee/100);
d.racketRev = d.bookedHoursMonth * (S.racketRentalRate/100) * S.racketQty * S.racketPrice;
d.ballMargin = d.bookedHoursMonth * (S.ballRate/100) * (S.ballPrice - S.ballCost);
d.membershipRev = d.totalCourts * S.membershipRevPerCourt;
d.fbRev = d.totalCourts * S.fbRevPerCourt;
d.coachingRev = d.totalCourts * S.coachingRevPerCourt;
d.retailRev = d.totalCourts * S.retailRevPerCourt;
d.grossRevMonth = d.courtRevMonth + d.racketRev + d.ballMargin + d.membershipRev + d.fbRev + d.coachingRev + d.retailRev;
d.netRevMonth = d.grossRevMonth - d.feeDeduction;
d.ebitdaMonth = d.netRevMonth - d.opex;
d.netCFMonth = d.ebitdaMonth - d.monthlyPayment;
d.months = [];
for(let m=1;m<=60;m++){
const cm = (m-1)%12;
const yr = Math.ceil(m/12);
const ramp = m<=12 ? S.ramp[m-1] : 1;
const seas = isIn ? 1 : S.season[cm];
const effUtil = (S.utilTarget/100)*ramp*seas;
const avail = seas>0 ? S.hoursPerDay*dpm*d.totalCourts : 0;
const booked = avail*effUtil;
const courtRev = booked*wRate;
const fees = -courtRev*(S.bookingFee/100);
const ancillary = booked*((S.racketRentalRate/100)*S.racketQty*S.racketPrice+(S.ballRate/100)*(S.ballPrice-S.ballCost));
const membership = d.totalCourts*S.membershipRevPerCourt*(seas>0?ramp:0);
const fb = d.totalCourts*S.fbRevPerCourt*(seas>0?ramp:0);
const coaching = d.totalCourts*S.coachingRevPerCourt*(seas>0?ramp:0);
const retail = d.totalCourts*S.retailRevPerCourt*(seas>0?ramp:0);
const totalRev = courtRev+fees+ancillary+membership+fb+coaching+retail;
const opex = -d.opex;
const loan = -d.monthlyPayment;
const ebitda = totalRev+opex;
const ncf = ebitda+loan;
const prev = d.months.length>0?d.months[d.months.length-1]:null;
const cum = (prev?prev.cum:-d.capex)+ncf;
d.months.push({m,cm:cm+1,yr,ramp,seas,effUtil,avail,booked,courtRev,fees,ancillary,membership,totalRev,opex,loan,ebitda,ncf,cum});
}
d.annuals = [];
for(let y=1;y<=5;y++){
const ym = d.months.filter(m=>m.yr===y);
d.annuals.push({year:y,
revenue:ym.reduce((s,m)=>s+m.totalRev,0), ebitda:ym.reduce((s,m)=>s+m.ebitda,0),
ncf:ym.reduce((s,m)=>s+m.ncf,0), ds:ym.reduce((s,m)=>s+Math.abs(m.loan),0),
booked:ym.reduce((s,m)=>s+m.booked,0), avail:ym.reduce((s,m)=>s+m.avail,0)});
}
const y3ebitda = d.annuals.length>=3?d.annuals[2].ebitda:0;
d.stabEbitda = y3ebitda;
d.exitValue = y3ebitda * S.exitMultiple;
d.remainingLoan = d.loanAmount * Math.max(0, 1 - S.holdYears/(Math.max(S.loanTerm,1)*1.5));
d.netExit = d.exitValue - d.remainingLoan;
const irrCFs = [-d.capex];
for(let y=0;y<S.holdYears;y++){
const ycf = y<d.annuals.length?d.annuals[y].ncf:(d.annuals.length>0?d.annuals[d.annuals.length-1].ncf:0);
irrCFs.push(y===S.holdYears-1?ycf+d.netExit:ycf);
}
d.irr = calcIRR(irrCFs);
d.totalReturned = irrCFs.slice(1).reduce((s,v)=>s+v,0);
d.moic = d.capex>0?d.totalReturned/d.capex:0;
d.dscr = d.annuals.map(y=>({year:y.year, dscr:y.ds>0?y.ebitda/y.ds:999}));
d.paybackIdx = d.months.findIndex(m=>m.cum>=0);
const revPerHr = wRate*(1-S.bookingFee/100)+(S.racketRentalRate/100)*S.racketQty*S.racketPrice+(S.ballRate/100)*(S.ballPrice-S.ballCost);
const fixedMonth = d.opex+d.monthlyPayment;
d.breakEvenHrs = fixedMonth/Math.max(revPerHr,0.01);
d.breakEvenUtil = d.availHoursMonth>0?d.breakEvenHrs/d.availHoursMonth:1;
d.breakEvenHrsPerCourt = d.totalCourts>0?d.breakEvenHrs/d.totalCourts/dpm:0;
d.revPAH = d.availHoursMonth>0?d.netRevMonth/d.availHoursMonth:0;
d.revPerSqm = d.sqm>0?(d.netRevMonth*12)/d.sqm:0;
d.ebitdaMargin = d.netRevMonth>0?d.ebitdaMonth/d.netRevMonth:0;
d.opexRatio = d.netRevMonth>0?d.opex/d.netRevMonth:0;
d.rentRatio = d.netRevMonth>0?(d.opexItems.find(i=>i.name==='Rent')?.amount||0)/d.netRevMonth:0;
d.cashOnCash = d.equity>0?(d.annuals.length>=3?d.annuals[2].ncf:0)/d.equity:0;
d.yieldOnCost = d.capex>0?d.stabEbitda/d.capex:0;
d.debtYield = d.loanAmount>0?d.stabEbitda/d.loanAmount:0;
d.costPerBookedHr = d.bookedHoursMonth>0?(d.opex+d.monthlyPayment)/d.bookedHoursMonth:0;
d.avgUtil = d.annuals.length>=3&&d.annuals[2].avail>0?d.annuals[2].booked/d.annuals[2].avail:0;
return d;
function scheduleCalc(){
if(_calcTimer) clearTimeout(_calcTimer);
_calcTimer = setTimeout(fetchCalc, 200);
}
// ── UI Builders ───────────────────────────────────────────
@@ -413,19 +266,26 @@ function bindSliders(){
// ── Render ─────────────────────────────────────────────────
function render(){
const d = calc();
const isIn=S.venue==='indoor';
const label = `${isIn?'Indoor':'Outdoor'} \u00B7 ${S.own==='buy'?'Build/Buy':'Rent'}`;
$('#headerTag').textContent = `${label} \u00B7 ${d.totalCourts} courts \u00B7 ${fmtK(d.capex)}`;
// Update tab visibility immediately (no server call needed)
$$('.tab-btn').forEach(b=>{
const a = b.dataset.tab===activeTab;
b.classList.toggle('tab-btn--active', a);
b.classList.toggle('tab-btn--active', b.dataset.tab===activeTab);
});
$$('.tab').forEach(t=>{
t.classList.toggle('active',t.id===`tab-${activeTab}`);
});
// If we have cached data, render immediately with it
if(_lastD) renderWith(_lastD);
// Schedule server-side recalculation
scheduleCalc();
}
function renderWith(d){
const isIn=S.venue==='indoor';
const label = `${isIn?'Indoor':'Outdoor'} \u00B7 ${S.own==='buy'?'Build/Buy':'Rent'}`;
$('#headerTag').textContent = `${label} \u00B7 ${d.totalCourts} courts \u00B7 ${fmtK(d.capex)}`;
const courtPlaySqm = S.dblCourts*200+S.sglCourts*120;
$('#courtSummary').innerHTML =
cardSmHTML('Total Courts',d.totalCourts)+
@@ -743,4 +603,12 @@ document.addEventListener('DOMContentLoaded', () => {
buildNav();
buildInputs();
bindSliders();
render();
// Use server-provided initial data for first render (no API call needed)
if(_lastD){
renderWith(_lastD);
// Update tab visibility
$$('.tab-btn').forEach(b=>b.classList.toggle('tab-btn--active', b.dataset.tab===activeTab));
$$('.tab').forEach(t=>t.classList.toggle('active',t.id===`tab-${activeTab}`));
} else {
render();
}

View File

@@ -0,0 +1,180 @@
"""
Shared test fixtures for the padelnomics test suite.
"""
import hashlib
import hmac
from datetime import datetime
from pathlib import Path
from unittest.mock import AsyncMock, patch
import aiosqlite
import pytest
from padelnomics import core
from padelnomics.app import create_app
from padelnomics.billing.routes import VARIANT_TO_PLAN
SCHEMA_PATH = Path(__file__).parent.parent / "src" / "padelnomics" / "migrations" / "schema.sql"
# ── Database ─────────────────────────────────────────────────
@pytest.fixture
async def db():
"""In-memory SQLite with full schema, patches core._db."""
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
await conn.execute("PRAGMA foreign_keys=ON")
schema = SCHEMA_PATH.read_text()
await conn.executescript(schema)
await conn.commit()
original_db = core._db
core._db = conn
yield conn
core._db = original_db
await conn.close()
# ── App & client ─────────────────────────────────────────────
@pytest.fixture
async def app(db):
"""Quart app with DB already initialized (init_db/close_db patched to no-op)."""
with patch.object(core, "init_db", new_callable=AsyncMock), \
patch.object(core, "close_db", new_callable=AsyncMock):
application = create_app()
application.config["TESTING"] = True
yield application
@pytest.fixture
async def client(app):
"""Unauthenticated test client."""
async with app.test_client() as c:
yield c
# ── Users ────────────────────────────────────────────────────
@pytest.fixture
async def test_user(db):
"""Create a test user, return dict with id/email/name."""
now = datetime.utcnow().isoformat()
async with db.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("test@example.com", "Test User", now),
) as cursor:
user_id = cursor.lastrowid
await db.commit()
return {"id": user_id, "email": "test@example.com", "name": "Test User"}
@pytest.fixture
async def auth_client(app, test_user):
"""Test client with session['user_id'] pre-set."""
async with app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
yield c
# ── Subscriptions ────────────────────────────────────────────
@pytest.fixture
def create_subscription(db):
"""Factory: create a subscription row for a user."""
async def _create(
user_id: int,
plan: str = "pro",
status: str = "active",
ls_customer_id: str = "cust_123",
ls_subscription_id: str = "sub_456",
current_period_end: str = "2025-03-01T00:00:00Z",
) -> int:
now = datetime.utcnow().isoformat()
async with db.execute(
"""INSERT INTO subscriptions
(user_id, plan, status, lemonsqueezy_customer_id,
lemonsqueezy_subscription_id, current_period_end, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(user_id, plan, status, ls_customer_id, ls_subscription_id,
current_period_end, now, now),
) as cursor:
sub_id = cursor.lastrowid
await db.commit()
return sub_id
return _create
# ── Config ───────────────────────────────────────────────────
@pytest.fixture(autouse=True)
def patch_config():
"""Set test LemonSqueezy config values. Clears variant cache on teardown."""
original_values = {}
test_values = {
"LEMONSQUEEZY_API_KEY": "test_api_key_123",
"LEMONSQUEEZY_STORE_ID": "store_999",
"LEMONSQUEEZY_WEBHOOK_SECRET": "whsec_test_secret",
"LEMONSQUEEZY_MONTHLY_VARIANT_ID": "variant_monthly_100",
"LEMONSQUEEZY_YEARLY_VARIANT_ID": "variant_yearly_200",
"BASE_URL": "http://localhost:5000",
"DEBUG": True,
}
for key, val in test_values.items():
original_values[key] = getattr(core.config, key, None)
setattr(core.config, key, val)
VARIANT_TO_PLAN.clear()
yield
VARIANT_TO_PLAN.clear()
for key, val in original_values.items():
setattr(core.config, key, val)
# ── Webhook helpers ──────────────────────────────────────────
def make_webhook_payload(
event_name: str,
subscription_id: str = "sub_456",
user_id: str = "1",
variant_id: str = "variant_monthly_100",
status: str = "active",
customer_id: int = 67890,
renews_at: str = "2025-03-01T00:00:00.000000Z",
) -> dict:
"""Build a LemonSqueezy webhook payload dict."""
return {
"meta": {
"event_name": event_name,
"custom_data": {"user_id": user_id},
"webhook_id": "wh_test_123",
},
"data": {
"type": "subscriptions",
"id": subscription_id,
"attributes": {
"store_id": 12345,
"customer_id": customer_id,
"order_id": 11111,
"product_id": 22222,
"variant_id": variant_id,
"status": status,
"renews_at": renews_at,
"ends_at": None,
"created_at": "2025-02-01T00:00:00.000000Z",
"updated_at": "2025-02-01T00:00:00.000000Z",
},
},
}
def sign_payload(payload_bytes: bytes, secret: str = "whsec_test_secret") -> str:
"""Compute HMAC-SHA256 signature for a webhook payload."""
return hmac.new(secret.encode(), payload_bytes, hashlib.sha256).hexdigest()

View File

@@ -0,0 +1,335 @@
"""
Unit tests for billing SQL helpers, feature/limit access, and plan determination.
"""
import pytest
from hypothesis import HealthCheck, given
from hypothesis import settings as h_settings
from hypothesis import strategies as st
from padelnomics.billing.routes import (
VARIANT_TO_PLAN,
can_access_feature,
determine_plan,
get_subscription,
get_subscription_by_provider_id,
is_within_limits,
update_subscription_status,
upsert_subscription,
)
from padelnomics.core import config
# ════════════════════════════════════════════════════════════
# get_subscription
# ════════════════════════════════════════════════════════════
class TestGetSubscription:
async def test_returns_none_for_user_without_subscription(self, db, test_user):
result = await get_subscription(test_user["id"])
assert result is None
async def test_returns_subscription_for_user(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="active")
result = await get_subscription(test_user["id"])
assert result is not None
assert result["plan"] == "pro"
assert result["status"] == "active"
assert result["user_id"] == test_user["id"]
# ════════════════════════════════════════════════════════════
# upsert_subscription
# ════════════════════════════════════════════════════════════
class TestUpsertSubscription:
async def test_insert_new_subscription(self, db, test_user):
sub_id = await upsert_subscription(
user_id=test_user["id"],
plan="pro",
status="active",
provider_customer_id="cust_abc",
provider_subscription_id="sub_xyz",
current_period_end="2025-06-01T00:00:00Z",
)
assert sub_id > 0
row = await get_subscription(test_user["id"])
assert row["plan"] == "pro"
assert row["status"] == "active"
assert row["lemonsqueezy_customer_id"] == "cust_abc"
assert row["lemonsqueezy_subscription_id"] == "sub_xyz"
assert row["current_period_end"] == "2025-06-01T00:00:00Z"
async def test_update_existing_subscription(self, db, test_user, create_subscription):
original_id = await create_subscription(
test_user["id"], plan="starter", status="active",
ls_subscription_id="sub_old",
)
returned_id = await upsert_subscription(
user_id=test_user["id"],
plan="pro",
status="active",
provider_customer_id="cust_new",
provider_subscription_id="sub_new",
)
assert returned_id == original_id
row = await get_subscription(test_user["id"])
assert row["plan"] == "pro"
assert row["lemonsqueezy_subscription_id"] == "sub_new"
async def test_upsert_with_none_period_end(self, db, test_user):
await upsert_subscription(
user_id=test_user["id"],
plan="pro",
status="active",
provider_customer_id="cust_1",
provider_subscription_id="sub_1",
current_period_end=None,
)
row = await get_subscription(test_user["id"])
assert row["current_period_end"] is None
# ════════════════════════════════════════════════════════════
# get_subscription_by_provider_id
# ════════════════════════════════════════════════════════════
class TestGetSubscriptionByProviderId:
async def test_returns_none_for_unknown_id(self, db):
result = await get_subscription_by_provider_id("nonexistent")
assert result is None
async def test_finds_by_lemonsqueezy_subscription_id(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_findme")
result = await get_subscription_by_provider_id("sub_findme")
assert result is not None
assert result["user_id"] == test_user["id"]
# ════════════════════════════════════════════════════════════
# update_subscription_status
# ════════════════════════════════════════════════════════════
class TestUpdateSubscriptionStatus:
async def test_updates_status(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active", ls_subscription_id="sub_upd")
await update_subscription_status("sub_upd", status="cancelled")
row = await get_subscription(test_user["id"])
assert row["status"] == "cancelled"
assert row["updated_at"] is not None
async def test_updates_extra_fields(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_extra")
await update_subscription_status(
"sub_extra",
status="active",
plan="starter",
current_period_end="2026-01-01T00:00:00Z",
)
row = await get_subscription(test_user["id"])
assert row["status"] == "active"
assert row["plan"] == "starter"
assert row["current_period_end"] == "2026-01-01T00:00:00Z"
async def test_noop_for_unknown_provider_id(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_known", status="active")
await update_subscription_status("sub_unknown", status="expired")
row = await get_subscription(test_user["id"])
assert row["status"] == "active" # unchanged
# ════════════════════════════════════════════════════════════
# can_access_feature
# ════════════════════════════════════════════════════════════
class TestCanAccessFeature:
async def test_no_subscription_gets_free_features(self, db, test_user):
assert await can_access_feature(test_user["id"], "basic") is True
assert await can_access_feature(test_user["id"], "export") is False
assert await can_access_feature(test_user["id"], "api") is False
async def test_active_pro_gets_all_features(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="active")
assert await can_access_feature(test_user["id"], "basic") is True
assert await can_access_feature(test_user["id"], "export") is True
assert await can_access_feature(test_user["id"], "api") is True
assert await can_access_feature(test_user["id"], "priority_support") is True
async def test_active_starter_gets_starter_features(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="starter", status="active")
assert await can_access_feature(test_user["id"], "basic") is True
assert await can_access_feature(test_user["id"], "export") is True
assert await can_access_feature(test_user["id"], "api") is False
async def test_cancelled_still_has_features(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="cancelled")
assert await can_access_feature(test_user["id"], "api") is True
async def test_on_trial_has_features(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="on_trial")
assert await can_access_feature(test_user["id"], "api") is True
async def test_expired_falls_back_to_free(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="expired")
assert await can_access_feature(test_user["id"], "api") is False
assert await can_access_feature(test_user["id"], "basic") is True
async def test_past_due_falls_back_to_free(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="past_due")
assert await can_access_feature(test_user["id"], "export") is False
async def test_paused_falls_back_to_free(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="paused")
assert await can_access_feature(test_user["id"], "api") is False
async def test_nonexistent_feature_returns_false(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="active")
assert await can_access_feature(test_user["id"], "teleportation") is False
# ════════════════════════════════════════════════════════════
# is_within_limits
# ════════════════════════════════════════════════════════════
class TestIsWithinLimits:
async def test_free_user_within_limits(self, db, test_user):
assert await is_within_limits(test_user["id"], "items", 50) is True
async def test_free_user_at_limit(self, db, test_user):
assert await is_within_limits(test_user["id"], "items", 100) is False
async def test_free_user_over_limit(self, db, test_user):
assert await is_within_limits(test_user["id"], "items", 150) is False
async def test_pro_unlimited(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="active")
assert await is_within_limits(test_user["id"], "items", 999999) is True
assert await is_within_limits(test_user["id"], "api_calls", 999999) is True
async def test_starter_limits(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="starter", status="active")
assert await is_within_limits(test_user["id"], "items", 999) is True
assert await is_within_limits(test_user["id"], "items", 1000) is False
async def test_expired_pro_gets_free_limits(self, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="expired")
assert await is_within_limits(test_user["id"], "items", 100) is False
async def test_unknown_resource_returns_false(self, db, test_user):
assert await is_within_limits(test_user["id"], "unicorns", 0) is False
# ════════════════════════════════════════════════════════════
# determine_plan
# ════════════════════════════════════════════════════════════
class TestDeterminePlan:
def test_monthly_variant_returns_pro(self):
VARIANT_TO_PLAN.clear()
assert determine_plan("variant_monthly_100") == "pro"
def test_yearly_variant_returns_pro(self):
VARIANT_TO_PLAN.clear()
assert determine_plan("variant_yearly_200") == "pro"
def test_unknown_variant_returns_free(self):
VARIANT_TO_PLAN.clear()
assert determine_plan("unknown_variant") == "free"
def test_integer_variant_id_coerced(self):
VARIANT_TO_PLAN.clear()
assert determine_plan(12345) == "free"
def test_none_variant_returns_free(self):
VARIANT_TO_PLAN.clear()
assert determine_plan(None) == "free"
# ════════════════════════════════════════════════════════════
# Parameterized: status × feature access matrix
# ════════════════════════════════════════════════════════════
STATUSES = ["free", "active", "on_trial", "cancelled", "past_due", "paused", "expired"]
FEATURES = ["basic", "export", "api", "priority_support"]
ACTIVE_STATUSES = {"active", "on_trial", "cancelled"}
@pytest.mark.parametrize("status", STATUSES)
@pytest.mark.parametrize("feature", FEATURES)
async def test_feature_access_matrix(db, test_user, create_subscription, status, feature):
if status != "free":
await create_subscription(test_user["id"], plan="pro", status=status)
result = await can_access_feature(test_user["id"], feature)
if status in ACTIVE_STATUSES:
expected = feature in config.PLAN_FEATURES["pro"]
else:
expected = feature in config.PLAN_FEATURES["free"]
assert result == expected, f"status={status}, feature={feature}"
# ════════════════════════════════════════════════════════════
# Parameterized: plan × feature matrix (active status)
# ════════════════════════════════════════════════════════════
PLANS = ["free", "starter", "pro"]
@pytest.mark.parametrize("plan", PLANS)
@pytest.mark.parametrize("feature", FEATURES)
async def test_plan_feature_matrix(db, test_user, create_subscription, plan, feature):
if plan != "free":
await create_subscription(test_user["id"], plan=plan, status="active")
result = await can_access_feature(test_user["id"], feature)
expected = feature in config.PLAN_FEATURES.get(plan, [])
assert result == expected, f"plan={plan}, feature={feature}"
# ════════════════════════════════════════════════════════════
# Parameterized: plan × resource limit boundaries
# ════════════════════════════════════════════════════════════
@pytest.mark.parametrize("plan", PLANS)
@pytest.mark.parametrize("resource,at_limit", [
("items", 100),
("items", 1000),
("api_calls", 1000),
("api_calls", 10000),
])
async def test_plan_limit_matrix(db, test_user, create_subscription, plan, resource, at_limit):
if plan != "free":
await create_subscription(test_user["id"], plan=plan, status="active")
plan_limit = config.PLAN_LIMITS.get(plan, {}).get(resource, 0)
result = await is_within_limits(test_user["id"], resource, at_limit)
if plan_limit == -1:
assert result is True
elif at_limit < plan_limit:
assert result is True
else:
assert result is False
# ════════════════════════════════════════════════════════════
# Hypothesis: limit boundaries
# ════════════════════════════════════════════════════════════
class TestLimitsHypothesis:
@given(count=st.integers(min_value=0, max_value=10000))
@h_settings(max_examples=100, deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture])
async def test_free_limit_boundary_items(self, db, test_user, count):
result = await is_within_limits(test_user["id"], "items", count)
assert result == (count < 100)
@given(count=st.integers(min_value=0, max_value=100000))
@h_settings(max_examples=100, deadline=2000, suppress_health_check=[HealthCheck.function_scoped_fixture])
async def test_pro_always_within_limits(self, db, test_user, create_subscription, count):
# Use upsert to avoid duplicate inserts across Hypothesis examples
await upsert_subscription(
user_id=test_user["id"], plan="pro", status="active",
provider_customer_id="cust_hyp", provider_subscription_id="sub_hyp",
)
result = await is_within_limits(test_user["id"], "items", count)
assert result is True

View File

@@ -0,0 +1,308 @@
"""
Route tests for billing pages, checkout, manage, cancel, resume.
External LemonSqueezy API calls mocked with respx.
"""
import json
import httpx
import pytest
import respx
# ════════════════════════════════════════════════════════════
# GET /billing/pricing
# ════════════════════════════════════════════════════════════
class TestPricingPage:
async def test_accessible_without_auth(self, client, db):
response = await client.get("/billing/pricing")
assert response.status_code == 200
async def test_accessible_with_auth(self, auth_client, db, test_user):
response = await auth_client.get("/billing/pricing")
assert response.status_code == 200
async def test_with_subscription(self, auth_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], plan="pro", status="active")
response = await auth_client.get("/billing/pricing")
assert response.status_code == 200
# ════════════════════════════════════════════════════════════
# GET /billing/success
# ════════════════════════════════════════════════════════════
class TestSuccessPage:
async def test_requires_auth(self, client, db):
response = await client.get("/billing/success")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
async def test_accessible_with_auth(self, auth_client, db, test_user):
response = await auth_client.get("/billing/success")
assert response.status_code == 200
# ════════════════════════════════════════════════════════════
# GET /billing/checkout/<plan>
# ════════════════════════════════════════════════════════════
class TestCheckoutRoute:
async def test_requires_auth(self, client, db):
response = await client.get("/billing/checkout/monthly")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
@respx.mock
async def test_monthly_redirects_to_checkout_url(self, auth_client, db, test_user):
checkout_url = "https://checkout.lemonsqueezy.com/checkout/test123"
respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
return_value=httpx.Response(
200, json={"data": {"attributes": {"url": checkout_url}}},
)
)
response = await auth_client.get("/billing/checkout/monthly")
assert response.status_code == 302
assert response.headers["Location"] == checkout_url
@respx.mock
async def test_yearly_redirects_to_checkout_url(self, auth_client, db, test_user):
checkout_url = "https://checkout.lemonsqueezy.com/checkout/yearly456"
respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
return_value=httpx.Response(
200, json={"data": {"attributes": {"url": checkout_url}}},
)
)
response = await auth_client.get("/billing/checkout/yearly")
assert response.status_code == 302
assert response.headers["Location"] == checkout_url
async def test_invalid_plan_redirects_to_pricing(self, auth_client, db, test_user):
response = await auth_client.get("/billing/checkout/enterprise")
assert response.status_code == 302
assert "/billing/pricing" in response.headers["Location"]
@respx.mock
async def test_ajax_returns_json(self, auth_client, db, test_user):
checkout_url = "https://checkout.lemonsqueezy.com/checkout/ajax789"
respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
return_value=httpx.Response(
200, json={"data": {"attributes": {"url": checkout_url}}},
)
)
response = await auth_client.get(
"/billing/checkout/monthly",
headers={"X-Requested-With": "XMLHttpRequest"},
)
assert response.status_code == 200
data = await response.get_json()
assert data["checkout_url"] == checkout_url
@respx.mock
async def test_sends_correct_api_payload(self, auth_client, db, test_user):
route = respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
return_value=httpx.Response(
200, json={"data": {"attributes": {"url": "https://example.com"}}},
)
)
await auth_client.get("/billing/checkout/monthly")
assert route.called
sent_json = json.loads(route.calls.last.request.content)
assert sent_json["data"]["type"] == "checkouts"
assert sent_json["data"]["attributes"]["checkout_data"]["email"] == test_user["email"]
assert sent_json["data"]["attributes"]["checkout_data"]["custom"]["user_id"] == str(test_user["id"])
assert sent_json["data"]["relationships"]["store"]["data"]["id"] == "store_999"
assert sent_json["data"]["relationships"]["variant"]["data"]["id"] == "variant_monthly_100"
@respx.mock
async def test_api_error_propagates(self, auth_client, db, test_user):
respx.post("https://api.lemonsqueezy.com/v1/checkouts").mock(
return_value=httpx.Response(500, json={"error": "server error"})
)
with pytest.raises(httpx.HTTPStatusError):
await auth_client.get("/billing/checkout/monthly")
# ════════════════════════════════════════════════════════════
# POST /billing/manage
# ════════════════════════════════════════════════════════════
class TestManageRoute:
async def test_requires_auth(self, client, db):
response = await client.post("/billing/manage")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
async def test_no_subscription_redirects(self, auth_client, db, test_user):
response = await auth_client.post("/billing/manage")
assert response.status_code == 302
assert "/dashboard" in response.headers["Location"]
@respx.mock
async def test_redirects_to_portal(self, auth_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_manage_001")
portal_url = "https://app.lemonsqueezy.com/my-orders/portal"
respx.get("https://api.lemonsqueezy.com/v1/subscriptions/sub_manage_001").mock(
return_value=httpx.Response(200, json={
"data": {"attributes": {"urls": {"customer_portal": portal_url}}}
})
)
response = await auth_client.post("/billing/manage")
assert response.status_code == 302
assert response.headers["Location"] == portal_url
# ════════════════════════════════════════════════════════════
# POST /billing/cancel
# ════════════════════════════════════════════════════════════
class TestCancelRoute:
async def test_requires_auth(self, client, db):
response = await client.post("/billing/cancel")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
async def test_no_subscription_redirects(self, auth_client, db, test_user):
response = await auth_client.post("/billing/cancel")
assert response.status_code == 302
assert "/dashboard" in response.headers["Location"]
@respx.mock
async def test_sends_cancel_patch(self, auth_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_cancel_route")
route = respx.patch(
"https://api.lemonsqueezy.com/v1/subscriptions/sub_cancel_route"
).mock(return_value=httpx.Response(200, json={}))
response = await auth_client.post("/billing/cancel")
assert response.status_code == 302
assert "/dashboard" in response.headers["Location"]
assert route.called
sent_json = json.loads(route.calls.last.request.content)
assert sent_json["data"]["attributes"]["cancelled"] is True
# ════════════════════════════════════════════════════════════
# POST /billing/resume
# ════════════════════════════════════════════════════════════
class TestResumeRoute:
async def test_requires_auth(self, client, db):
response = await client.post("/billing/resume")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
@respx.mock
async def test_sends_resume_patch(self, auth_client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_resume_route")
route = respx.patch(
"https://api.lemonsqueezy.com/v1/subscriptions/sub_resume_route"
).mock(return_value=httpx.Response(200, json={}))
response = await auth_client.post("/billing/resume")
assert response.status_code == 302
assert "/dashboard" in response.headers["Location"]
assert route.called
sent_json = json.loads(route.calls.last.request.content)
assert sent_json["data"]["attributes"]["cancelled"] is False
# ════════════════════════════════════════════════════════════
# subscription_required decorator
# ════════════════════════════════════════════════════════════
class TestSubscriptionRequired:
@pytest.fixture
async def gated_app(self, app):
"""Register a test route using subscription_required with restricted allowed."""
from padelnomics.billing.routes import subscription_required
@app.route("/test-gated")
@subscription_required(allowed=("active", "on_trial"))
async def gated():
return "OK", 200
return app
async def test_no_session_redirects_to_login(self, gated_app, db):
async with gated_app.test_client() as c:
response = await c.get("/test-gated")
assert response.status_code == 302
assert "/auth/login" in response.headers["Location"]
async def test_no_subscription_redirects_to_pricing(self, gated_app, db, test_user):
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 302
assert "/billing/pricing" in response.headers["Location"]
async def test_active_passes(self, gated_app, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="active")
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 200
async def test_on_trial_passes(self, gated_app, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="on_trial")
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 200
async def test_cancelled_rejected_when_not_in_allowed(self, gated_app, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="cancelled")
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 302
assert "/billing/pricing" in response.headers["Location"]
async def test_expired_redirects(self, gated_app, db, test_user, create_subscription):
await create_subscription(test_user["id"], status="expired")
async with gated_app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get("/test-gated")
assert response.status_code == 302
# ════════════════════════════════════════════════════════════
# Parameterized: subscription_required default allowed
# ════════════════════════════════════════════════════════════
ALL_STATUSES = ["free", "active", "on_trial", "cancelled", "past_due", "paused", "expired"]
DEFAULT_ALLOWED = ("active", "on_trial", "cancelled")
@pytest.mark.parametrize("status", ALL_STATUSES)
async def test_subscription_required_default_allowed(app, db, test_user, create_subscription, status):
from padelnomics.billing.routes import subscription_required
@app.route(f"/test-gate-{status}")
@subscription_required()
async def gated():
return "OK", 200
if status != "free":
await create_subscription(test_user["id"], status=status)
async with app.test_client() as c:
async with c.session_transaction() as sess:
sess["user_id"] = test_user["id"]
response = await c.get(f"/test-gate-{status}")
if status in DEFAULT_ALLOWED:
assert response.status_code == 200
else:
assert response.status_code == 302

View File

@@ -0,0 +1,450 @@
"""
Integration tests for the LemonSqueezy webhook endpoint.
Tests signature verification, all event types, parameterized status
transitions, and Hypothesis fuzzing.
"""
import json
import pytest
from conftest import make_webhook_payload, sign_payload
from hypothesis import HealthCheck, given
from hypothesis import settings as h_settings
from hypothesis import strategies as st
from padelnomics.billing.routes import get_subscription
WEBHOOK_PATH = "/billing/webhook/lemonsqueezy"
# ════════════════════════════════════════════════════════════
# Signature verification
# ════════════════════════════════════════════════════════════
class TestWebhookSignatureVerification:
async def test_valid_signature_returns_200(self, client, db, test_user):
payload = make_webhook_payload("subscription_created", user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
async def test_invalid_signature_returns_401(self, client, db):
payload = make_webhook_payload("subscription_created")
payload_bytes = json.dumps(payload).encode()
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": "bad_signature", "Content-Type": "application/json"},
)
assert response.status_code == 401
data = await response.get_json()
assert data["error"] == "Invalid signature"
async def test_missing_signature_returns_401(self, client, db):
payload_bytes = json.dumps(make_webhook_payload("subscription_created")).encode()
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"Content-Type": "application/json"},
)
assert response.status_code == 401
async def test_empty_signature_returns_401(self, client, db):
payload_bytes = json.dumps(make_webhook_payload("subscription_created")).encode()
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": "", "Content-Type": "application/json"},
)
assert response.status_code == 401
async def test_tampered_payload_returns_401(self, client, db, test_user):
payload = make_webhook_payload("subscription_created", user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
tampered = payload_bytes + b"extra"
response = await client.post(
WEBHOOK_PATH,
data=tampered,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code == 401
# ════════════════════════════════════════════════════════════
# subscription_created
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionCreated:
async def test_creates_new_subscription(self, client, db, test_user):
payload = make_webhook_payload(
"subscription_created",
subscription_id="sub_new_001",
user_id=str(test_user["id"]),
variant_id="variant_monthly_100",
status="active",
customer_id=99999,
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
sub = await get_subscription(test_user["id"])
assert sub is not None
assert sub["plan"] == "pro"
assert sub["status"] == "active"
assert sub["lemonsqueezy_subscription_id"] == "sub_new_001"
assert sub["lemonsqueezy_customer_id"] == "99999"
async def test_unknown_variant_gets_free_plan(self, client, db, test_user):
payload = make_webhook_payload(
"subscription_created",
user_id=str(test_user["id"]),
variant_id="unknown_variant",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["plan"] == "free"
async def test_missing_user_id_causes_db_error(self, client, db):
"""When user_id is absent, the handler passes user_id=0 which violates the FK constraint."""
payload = make_webhook_payload("subscription_created")
payload["meta"]["custom_data"] = {}
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
with pytest.raises(Exception):
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
# ════════════════════════════════════════════════════════════
# subscription_updated
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionUpdated:
async def test_updates_existing_subscription(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_upd_001", plan="starter")
payload = make_webhook_payload(
"subscription_updated",
subscription_id="sub_upd_001",
variant_id="variant_yearly_200",
status="active",
renews_at="2026-01-01T00:00:00Z",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
sub = await get_subscription(test_user["id"])
assert sub["plan"] == "pro"
assert sub["current_period_end"] == "2026-01-01T00:00:00Z"
# ════════════════════════════════════════════════════════════
# subscription_payment_success
# ════════════════════════════════════════════════════════════
class TestWebhookPaymentSuccess:
async def test_updates_status_and_period(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_pay_001")
payload = make_webhook_payload(
"subscription_payment_success",
subscription_id="sub_pay_001",
status="active",
renews_at="2026-06-01T00:00:00Z",
)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "active"
assert sub["current_period_end"] == "2026-06-01T00:00:00Z"
# ════════════════════════════════════════════════════════════
# subscription_cancelled
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionCancelled:
async def test_sets_status_to_cancelled(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_cancel_001", status="active")
payload = make_webhook_payload("subscription_cancelled", subscription_id="sub_cancel_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "cancelled"
# ════════════════════════════════════════════════════════════
# subscription_expired
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionExpired:
async def test_sets_status_to_expired(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_exp_001")
payload = make_webhook_payload("subscription_expired", subscription_id="sub_exp_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "expired"
# ════════════════════════════════════════════════════════════
# order_refunded
# ════════════════════════════════════════════════════════════
class TestWebhookOrderRefunded:
async def test_sets_status_to_expired(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_refund_001")
payload = make_webhook_payload("order_refunded", subscription_id="sub_refund_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "expired"
# ════════════════════════════════════════════════════════════
# subscription_payment_failed
# ════════════════════════════════════════════════════════════
class TestWebhookPaymentFailed:
async def test_sets_status_to_past_due(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_fail_001")
payload = make_webhook_payload("subscription_payment_failed", subscription_id="sub_fail_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "past_due"
# ════════════════════════════════════════════════════════════
# subscription_paused
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionPaused:
async def test_sets_status_to_paused(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_pause_001")
payload = make_webhook_payload("subscription_paused", subscription_id="sub_pause_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "paused"
# ════════════════════════════════════════════════════════════
# subscription_unpaused / subscription_resumed
# ════════════════════════════════════════════════════════════
class TestWebhookSubscriptionUnpaused:
async def test_unpaused_sets_active(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_unpause_001", status="paused")
payload = make_webhook_payload("subscription_unpaused", subscription_id="sub_unpause_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "active"
async def test_resumed_sets_active(self, client, db, test_user, create_subscription):
await create_subscription(test_user["id"], ls_subscription_id="sub_resume_001", status="paused")
payload = make_webhook_payload("subscription_resumed", subscription_id="sub_resume_001")
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
sub = await get_subscription(test_user["id"])
assert sub["status"] == "active"
# ════════════════════════════════════════════════════════════
# Unknown event
# ════════════════════════════════════════════════════════════
class TestWebhookUnknownEvent:
async def test_unknown_event_returns_200(self, client, db, test_user):
payload = make_webhook_payload("some_future_event", user_id=str(test_user["id"]))
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
# ════════════════════════════════════════════════════════════
# Parameterized: event → expected status
# ════════════════════════════════════════════════════════════
WEBHOOK_EVENT_STATUS_MAP = [
("subscription_cancelled", "cancelled"),
("subscription_expired", "expired"),
("order_refunded", "expired"),
("subscription_payment_failed", "past_due"),
("subscription_paused", "paused"),
("subscription_unpaused", "active"),
("subscription_resumed", "active"),
]
@pytest.mark.parametrize("event_name,expected_status", WEBHOOK_EVENT_STATUS_MAP)
async def test_webhook_event_status_transition(
client, db, test_user, create_subscription, event_name, expected_status,
):
sub_id = f"sub_param_{event_name}"
await create_subscription(test_user["id"], ls_subscription_id=sub_id, status="active")
payload = make_webhook_payload(event_name, subscription_id=sub_id)
payload_bytes = json.dumps(payload).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code == 200
sub = await get_subscription(test_user["id"])
assert sub["status"] == expected_status
# ════════════════════════════════════════════════════════════
# Hypothesis: fuzz webhook payloads
# ════════════════════════════════════════════════════════════
webhook_event_names = st.sampled_from([
"subscription_created", "subscription_updated", "subscription_payment_success",
"subscription_cancelled", "subscription_expired", "order_refunded",
"subscription_payment_failed", "subscription_paused",
"subscription_unpaused", "subscription_resumed",
"unknown_event", "order_created",
])
fuzz_payload = st.fixed_dictionaries({
"meta": st.fixed_dictionaries({
"event_name": webhook_event_names,
"custom_data": st.fixed_dictionaries({
"user_id": st.from_regex(r"[0-9]{1,6}", fullmatch=True),
}),
"webhook_id": st.text(min_size=1, max_size=20),
}),
"data": st.fixed_dictionaries({
"type": st.just("subscriptions"),
"id": st.text(min_size=1, max_size=30).filter(lambda x: x.strip()),
"attributes": st.fixed_dictionaries({
"store_id": st.integers(1, 99999),
"customer_id": st.integers(1, 99999),
"order_id": st.integers(1, 99999),
"product_id": st.integers(1, 99999),
"variant_id": st.text(min_size=1, max_size=30),
"status": st.sampled_from(["active", "on_trial", "cancelled", "past_due", "paused", "expired"]),
"renews_at": st.from_regex(r"2025-\d{2}-\d{2}T00:00:00Z", fullmatch=True),
}),
}),
})
class TestWebhookHypothesis:
@given(payload_dict=fuzz_payload)
@h_settings(max_examples=50, deadline=5000, suppress_health_check=[HealthCheck.function_scoped_fixture])
async def test_webhook_never_500s(self, client, db, test_user, payload_dict):
# Pin user_id to the test user so subscription_created events don't hit FK violations
payload_dict["meta"]["custom_data"]["user_id"] = str(test_user["id"])
payload_bytes = json.dumps(payload_dict).encode()
sig = sign_payload(payload_bytes)
response = await client.post(
WEBHOOK_PATH,
data=payload_bytes,
headers={"X-Signature": sig, "Content-Type": "application/json"},
)
assert response.status_code < 500

File diff suppressed because it is too large Load Diff

37
padelnomics/uv.lock generated
View File

@@ -172,6 +172,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/48/30/47d0bf6072f7252e6521f3447ccfa40b421b6824517f82854703d0f5a98b/hyperframe-6.1.0-py3-none-any.whl", hash = "sha256:b03380493a519fce58ea5af42e4a42317bf9bd425596f7a0835ffce80f1a42e5", size = 13007, upload-time = "2025-01-22T21:41:47.295Z" },
]
[[package]]
name = "hypothesis"
version = "6.151.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "sortedcontainers" },
]
sdist = { url = "https://files.pythonhosted.org/packages/00/5b/039c095977004f2316225559d591c5a4c62b2e4d7a429db2dd01d37c3ec2/hypothesis-6.151.6.tar.gz", hash = "sha256:755decfa326c8c97a4c8766fe40509985003396442138554b0ae824f9584318f", size = 475846, upload-time = "2026-02-11T04:42:06.891Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/70/42760b369723f8b5aa6a21e5fae58809f503ca7ebb6da13b99f4de36305a/hypothesis-6.151.6-py3-none-any.whl", hash = "sha256:4e6e933a98c6f606b3e0ada97a750e7fff12277a40260b9300a05e7a5c3c5e2e", size = 543324, upload-time = "2026-02-11T04:42:04.025Z" },
]
[[package]]
name = "idna"
version = "3.11"
@@ -310,8 +322,10 @@ dependencies = [
[package.dev-dependencies]
dev = [
{ name = "hypothesis" },
{ name = "pytest" },
{ name = "pytest-asyncio" },
{ name = "respx" },
{ name = "ruff" },
]
@@ -328,8 +342,10 @@ requires-dist = [
[package.metadata.requires-dev]
dev = [
{ name = "hypothesis", specifier = ">=6.151.6" },
{ name = "pytest", specifier = ">=8.0.0" },
{ name = "pytest-asyncio", specifier = ">=0.23.0" },
{ name = "respx", specifier = ">=0.22.0" },
{ name = "ruff", specifier = ">=0.3.0" },
]
@@ -418,6 +434,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/e9/cc28f21f52913adf333f653b9e0a3bf9cb223f5083a26422968ba73edd8d/quart-0.20.0-py3-none-any.whl", hash = "sha256:003c08f551746710acb757de49d9b768986fd431517d0eb127380b656b98b8f1", size = 77960, upload-time = "2024-12-23T13:53:02.842Z" },
]
[[package]]
name = "respx"
version = "0.22.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "httpx" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f4/7c/96bd0bc759cf009675ad1ee1f96535edcb11e9666b985717eb8c87192a95/respx-0.22.0.tar.gz", hash = "sha256:3c8924caa2a50bd71aefc07aa812f2466ff489f1848c96e954a5362d17095d91", size = 28439, upload-time = "2024-12-19T22:33:59.374Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/8e/67/afbb0978d5399bc9ea200f1d4489a23c9a1dad4eee6376242b8182389c79/respx-0.22.0-py2.py3-none-any.whl", hash = "sha256:631128d4c9aba15e56903fb5f66fb1eff412ce28dd387ca3a81339e52dbd3ad0", size = 25127, upload-time = "2024-12-19T22:33:57.837Z" },
]
[[package]]
name = "ruff"
version = "0.15.0"
@@ -443,6 +471,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f6/b0/2d823f6e77ebe560f4e397d078487e8d52c1516b331e3521bc75db4272ca/ruff-0.15.0-py3-none-win_arm64.whl", hash = "sha256:c480d632cc0ca3f0727acac8b7d053542d9e114a462a145d0b00e7cd658c515a", size = 10865753, upload-time = "2026-02-03T17:53:03.014Z" },
]
[[package]]
name = "sortedcontainers"
version = "2.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e8/c4/ba2f8066cceb6f23394729afe52f3bf7adec04bf9ed2c820b39e19299111/sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88", size = 30594, upload-time = "2021-05-16T22:03:42.897Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/32/46/9cb0e58b2deb7f82b84065f37f3bffeb12413f947f9388e4cac22c4621ce/sortedcontainers-2.4.0-py2.py3-none-any.whl", hash = "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0", size = 29575, upload-time = "2021-05-16T22:03:41.177Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"