Files
padelnomics/web/src/padelnomics/core.py
Deeman 84229e50f7 Merge branch 'worktree-supervisor-flags'
Python supervisor + DB-backed feature flags

- supervisor.py replaces supervisor.sh (topological wave scheduling, croniter)
- workflows.toml workflow registry (5 extractors, cron presets, depends_on)
- proxy.py round-robin + sticky proxy rotation via PROXY_URLS
- Feature flags: migration 0019, is_flag_enabled(), feature_gate() decorator
- Admin /admin/flags UI with toggle (admin-only)
- lead_unlock gate on unlock_lead route
- 59 new tests (test_supervisor.py + test_feature_flags.py)
- Fix is_flag_enabled bug (fetch_one instead of execute_fetchone)

# Conflicts:
#	CHANGELOG.md
#	web/pyproject.toml
2026-02-23 15:29:43 +01:00

794 lines
25 KiB
Python

"""
Core infrastructure: database, config, email, and shared utilities.
"""
import hashlib
import hmac
import os
import random
import re
import secrets
import unicodedata
from contextvars import ContextVar
from datetime import datetime, timedelta
from functools import wraps
from pathlib import Path
import aiosqlite
import resend
from dotenv import load_dotenv
from quart import g, make_response, render_template, request, session
load_dotenv()
def _env(key: str, default: str) -> str:
"""Get env var, treating empty string same as unset."""
return os.getenv(key, "") or default
# =============================================================================
# Configuration
# =============================================================================
class Config:
    """Application configuration, read once at import time from environment
    variables.

    Values read via _env() treat an empty string as unset and fall back to
    the default; values read via os.getenv() accept an empty string.
    """

    # Core app identity / security
    APP_NAME: str = _env("APP_NAME", "Padelnomics")
    SECRET_KEY: str = _env("SECRET_KEY", "change-me-in-production")  # must be overridden in production
    BASE_URL: str = _env("BASE_URL", "http://localhost:5000")
    DEBUG: bool = os.getenv("DEBUG", "false").lower() == "true"
    # Storage and auth lifetimes
    DATABASE_PATH: str = os.getenv("DATABASE_PATH", "data/app.db")
    MAGIC_LINK_EXPIRY_MINUTES: int = int(os.getenv("MAGIC_LINK_EXPIRY_MINUTES", "15"))
    SESSION_LIFETIME_DAYS: int = int(os.getenv("SESSION_LIFETIME_DAYS", "30"))
    # Payments (Paddle)
    PAYMENT_PROVIDER: str = "paddle"
    PADDLE_API_KEY: str = os.getenv("PADDLE_API_KEY", "")
    PADDLE_CLIENT_TOKEN: str = os.getenv("PADDLE_CLIENT_TOKEN", "")
    PADDLE_WEBHOOK_SECRET: str = os.getenv("PADDLE_WEBHOOK_SECRET", "")
    PADDLE_ENVIRONMENT: str = _env("PADDLE_ENVIRONMENT", "sandbox")
    # Analytics (Umami)
    UMAMI_API_URL: str = os.getenv("UMAMI_API_URL", "https://umami.padelnomics.io")
    UMAMI_API_TOKEN: str = os.getenv("UMAMI_API_TOKEN", "")
    UMAMI_WEBSITE_ID: str = "4474414b-58d6-4c6e-89a1-df5ea1f49d70"
    # SEO metrics sync
    GSC_SERVICE_ACCOUNT_PATH: str = os.getenv("GSC_SERVICE_ACCOUNT_PATH", "")
    GSC_SITE_URL: str = os.getenv("GSC_SITE_URL", "")
    BING_WEBMASTER_API_KEY: str = os.getenv("BING_WEBMASTER_API_KEY", "")
    BING_SITE_URL: str = os.getenv("BING_SITE_URL", "")
    # Email (Resend)
    RESEND_API_KEY: str = os.getenv("RESEND_API_KEY", "")
    EMAIL_FROM: str = _env("EMAIL_FROM", "hello@padelnomics.io")
    LEADS_EMAIL: str = _env("LEADS_EMAIL", "leads@padelnomics.io")
    # Comma-separated list, normalized to lowercase with whitespace stripped
    ADMIN_EMAILS: list[str] = [
        e.strip().lower() for e in os.getenv("ADMIN_EMAILS", "").split(",") if e.strip()
    ]
    RESEND_AUDIENCE_PLANNER: str = os.getenv("RESEND_AUDIENCE_PLANNER", "")
    RESEND_WEBHOOK_SECRET: str = os.getenv("RESEND_WEBHOOK_SECRET", "")
    # Global waitlist toggle (legacy; superseded by DB feature flags)
    WAITLIST_MODE: bool = os.getenv("WAITLIST_MODE", "false").lower() == "true"
    # Default rate-limit budget: requests per window (seconds)
    RATE_LIMIT_REQUESTS: int = int(os.getenv("RATE_LIMIT_REQUESTS", "100"))
    RATE_LIMIT_WINDOW: int = int(os.getenv("RATE_LIMIT_WINDOW", "60"))
    # Plan entitlements by plan key
    PLAN_FEATURES: dict = {
        "free": ["basic"],
        "starter": ["basic", "export"],
        "pro": ["basic", "export", "api", "priority_support"],
    }
    PLAN_LIMITS: dict = {
        "free": {"items": 100, "api_calls": 1000},
        "starter": {"items": 1000, "api_calls": 10000},
        "pro": {"items": -1, "api_calls": -1},  # -1 = unlimited
    }


# Module-level singleton; import `config` rather than instantiating Config.
config = Config()
# =============================================================================
# Database
# =============================================================================
_db: aiosqlite.Connection | None = None
async def init_db(path: str = None) -> None:
    """Open the global aiosqlite connection and apply the standard PRAGMAs
    (WAL journaling, foreign keys, timeouts, cache tuning)."""
    global _db
    db_path = path or config.DATABASE_PATH
    Path(db_path).parent.mkdir(parents=True, exist_ok=True)
    _db = await aiosqlite.connect(db_path)
    _db.row_factory = aiosqlite.Row
    # Apply each connection-level setting in turn, then commit once.
    for pragma in (
        "journal_mode=WAL",
        "foreign_keys=ON",
        "busy_timeout=5000",
        "synchronous=NORMAL",
        "cache_size=-64000",
        "temp_store=MEMORY",
        "mmap_size=268435456",
    ):
        await _db.execute(f"PRAGMA {pragma}")
    await _db.commit()
async def close_db() -> None:
    """Checkpoint the WAL file and shut down the global connection."""
    global _db
    if _db is None:
        return
    await _db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
    await _db.close()
    _db = None
async def get_db() -> aiosqlite.Connection:
    """Return the shared connection, lazily opening it on first use."""
    if _db is not None:
        return _db
    await init_db()
    return _db
async def fetch_one(sql: str, params: tuple = ()) -> dict | None:
    """Run *sql* and return the first row as a plain dict, or None if the
    query produced no rows."""
    conn = await get_db()
    async with conn.execute(sql, params) as cur:
        row = await cur.fetchone()
    if row is None:
        return None
    return dict(row)
async def fetch_all(sql: str, params: tuple = ()) -> list[dict]:
    """Run *sql* and return every result row as a list of plain dicts."""
    conn = await get_db()
    async with conn.execute(sql, params) as cur:
        return [dict(record) for record in await cur.fetchall()]
async def execute(sql: str, params: tuple = ()) -> int:
    """Execute *sql*, commit, and return the cursor's lastrowid.

    NOTE: lastrowid is only meaningful after a successful INSERT; for
    UPDATE/DELETE statements sqlite3 leaves it unspecified.
    """
    conn = await get_db()
    cur = await conn.execute(sql, params)
    try:
        await conn.commit()
        return cur.lastrowid
    finally:
        await cur.close()
async def execute_many(sql: str, params_list: list[tuple]) -> None:
    """Run *sql* once per parameter tuple in *params_list*, then commit."""
    conn = await get_db()
    await conn.executemany(sql, params_list)
    await conn.commit()
class transaction:
    """Async context manager yielding the shared connection: commits on a
    clean exit, rolls back when the body raised.  Exceptions propagate.

    NOTE(review): execute() commits eagerly per statement, so only work
    issued directly on the yielded connection is actually atomic — confirm
    callers rely on that.
    """

    async def __aenter__(self):
        self.db = await get_db()
        return self.db

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        clean_exit = exc_type is None
        if clean_exit:
            await self.db.commit()
        else:
            await self.db.rollback()
        return False
# =============================================================================
# Email
# =============================================================================
# Named sender identities keyed by email category.  Values are RFC 5322
# "Display Name <address>" strings on the notifications subdomain.
EMAIL_ADDRESSES = {
    "transactional": "Padelnomics <hello@notifications.padelnomics.io>",
    "leads": "Padelnomics Leads <leads@notifications.padelnomics.io>",
    "nurture": "Padelnomics <coach@notifications.padelnomics.io>",
}
# ──────────────────────────────────────────────────────────────────────────────
# Input validation helpers
# ──────────────────────────────────────────────────────────────────────────────
# Known disposable/throwaway email domains (DACH-region plus global
# providers).  Fix: the previous literal contained ~10 duplicate entries
# (spam4.me, grr.la, guerrillamail.info, throwam.com/.net, mailnull.com,
# spamherelots.com, spamthisplease.com); deduplicated and kept sorted so
# future additions are easy to diff.
_DISPOSABLE_EMAIL_DOMAINS: frozenset[str] = frozenset(
    {
        "10minemail.com",
        "10minutemail.com",
        "10minutemail.de",
        "10minutemail.net",
        "10minutemail.org",
        "bccto.me",
        "byom.de",
        "chacuo.net",
        "crap.email",
        "crapmail.org",
        "discard.email",
        "discardmail.com",
        "discardmail.de",
        "dispostable.com",
        "fakeinbox.com",
        "fakemail.fr",
        "fakemail.net",
        "filzmail.com",
        "getairmail.com",
        "getnada.com",
        "grr.la",
        "guerrillamail.biz",
        "guerrillamail.com",
        "guerrillamail.de",
        "guerrillamail.info",
        "guerrillamail.net",
        "guerrillamail.org",
        "guerrillamailblock.com",
        "maildrop.cc",
        "mailinator.com",
        "mailinator.net",
        "mailinator.org",
        "mailnesia.com",
        "mailnull.com",
        "mynullmail.com",
        "mytemp.email",
        "no-spam.ws",
        "noblepioneer.com",
        "nospam.ze.tc",
        "nospam4.us",
        "owlpic.com",
        "pookmail.com",
        "poof.email",
        "qq1234.org",
        "receivemail.org",
        "rtrtr.com",
        "s0ny.net",
        "safetymail.info",
        "sharklasers.com",
        "shitmail.me",
        "smellfear.com",
        "spam4.me",
        "spamavert.com",
        "spambog.com",
        "spambog.de",
        "spambog.net",
        "spambog.ru",
        "spamgob.com",
        "spamgourmet.com",
        "spamgourmet.de",
        "spamgourmet.net",
        "spamherelots.com",
        "spamhereplease.com",
        "spamslicer.com",
        "spamspot.com",
        "spamthisplease.com",
        "spoofmail.de",
        "super-auswahl.de",
        "temp-mail.org",
        "tempmail.com",
        "tempmail.io",
        "tempmail.net",
        "tempr.email",
        "throwam.com",
        "throwam.net",
        "tilien.com",
        "tmailinator.com",
        "trash-mail.at",
        "trash-mail.com",
        "trash-mail.de",
        "trashdevil.com",
        "trashdevil.de",
        "trashmail.at",
        "trashmail.com",
        "trashmail.de",
        "trashmail.io",
        "trashmail.me",
        "trashmail.net",
        "trashmail.org",
        "trbvm.com",
        "turual.com",
        "uggsrock.com",
        "viditag.com",
        "vomoto.com",
        "vpn.st",
        "wegwerf-email.de",
        "wegwerfemail.de",
        "wegwerfemail.net",
        "wegwerfemail.org",
        "wetrainbayarea.com",
        "willhackforfood.biz",
        "wuzupmail.net",
        "xemaps.com",
        "xmailer.be",
        "xoxy.net",
        "yep.it",
        "yogamaven.com",
        "yopmail.com",
        "yopmail.de",
        "yopmail.fr",
        "yopmail.net",
        "z1p.biz",
        "zoemail.org",
    }
)
def is_disposable_email(email: str) -> bool:
    """Return True when *email*'s domain is on the disposable blocklist."""
    if not email:
        return False
    _, sep, domain = email.rpartition("@")
    if not sep:
        return False
    return domain.strip().lower() in _DISPOSABLE_EMAIL_DOMAINS
def is_plausible_phone(phone: str) -> bool:
    """Return True if the phone number looks like a real number.

    Rejects:
    - Too short after stripping formatting (<7 digits)
    - All-same digits (e.g. 0000000000, 1111111111)
    - The entire digit string being a consecutive ascending or descending
      run, including wrap-around (e.g. 1234567890, 0987654321)

    Fix: the old substring test against "0123456789"/"9876543210" could not
    match wrap-around runs, so the documented examples 1234567890 and
    0987654321 were actually accepted.  We now check digit-to-digit deltas,
    which handles runs of any length including wrap-around.
    """
    if not phone:
        return False
    digits = "".join(ch for ch in phone if ch.isdigit())
    if len(digits) < 7:
        return False
    # All-same digits (the delta check below would miss these: delta 0).
    if len(set(digits)) == 1:
        return False
    # A pure +1 (mod 10) delta everywhere means ascending run; +9 means
    # descending.  Mixed deltas mean a plausible number.
    deltas = {(int(b) - int(a)) % 10 for a, b in zip(digits, digits[1:])}
    if deltas == {1} or deltas == {9}:
        return False
    return True
async def send_email(
    to: str, subject: str, html: str, text: str = None,
    from_addr: str = None, email_type: str = "ad_hoc",
) -> str | None:
    """Send email via the Resend SDK.

    Returns the Resend message id on success ("dev" when no API key is
    configured), or None on failure.  The truthy id keeps existing
    boolean-style callers working; None is falsy.
    """
    sender = from_addr or config.EMAIL_FROM

    if not config.RESEND_API_KEY:
        # Dev mode: no API key configured — just echo to stdout.
        print(f"[EMAIL] Would send to {to}: {subject}")
        resend_id = "dev"
    else:
        resend.api_key = config.RESEND_API_KEY
        message = {
            "from": sender,
            "to": to,
            "subject": subject,
            "html": html,
            "text": text or html,
        }
        try:
            result = resend.Emails.send(message)
            resend_id = result.get("id") if isinstance(result, dict) else getattr(result, "id", None)
        except Exception as e:
            print(f"[EMAIL] Error sending to {to}: {e}")
            return None

    # Best-effort audit trail; a logging failure must never fail the send.
    try:
        await execute(
            """INSERT INTO email_log (resend_id, from_addr, to_addr, subject, email_type)
            VALUES (?, ?, ?, ?, ?)""",
            (resend_id, sender, to, subject, email_type),
        )
    except Exception as e:
        print(f"[EMAIL] Failed to log email: {e}")
    return resend_id
# =============================================================================
# Waitlist
# =============================================================================
async def _get_or_create_resend_audience(name: str) -> str | None:
    """Return the cached Resend audience id for *name*, creating it via the
    Resend API (and caching the id) when missing.  None on any failure."""
    cached = await fetch_one("SELECT audience_id FROM resend_audiences WHERE name = ?", (name,))
    if cached:
        return cached["audience_id"]
    try:
        resend.api_key = config.RESEND_API_KEY
        created = resend.Audiences.create({"name": name})
        audience_id = created["id"]
        await execute(
            "INSERT OR IGNORE INTO resend_audiences (name, audience_id) VALUES (?, ?)",
            (name, audience_id),
        )
        return audience_id
    except Exception:
        return None
# Resend's free plan allows only 3 audiences, so every blueprint collapses
# onto one of: suppliers, leads, newsletter.
_BLUEPRINT_TO_AUDIENCE = {
    "suppliers": "suppliers",
    "planner": "leads",
    "leads": "leads",
    "auth": "newsletter",
    "content": "newsletter",
    "public": "newsletter",
}


def _audience_for_blueprint(blueprint: str) -> str:
    """Map blueprint name to one of 3 Resend audiences (free plan limit).

    Unknown blueprints default to the "newsletter" audience.
    """
    return _BLUEPRINT_TO_AUDIENCE.get(blueprint, "newsletter")
async def capture_waitlist_email(
    email: str, intent: str, plan: str = None, email_intent: str = None
) -> bool:
    """Insert email into waitlist, enqueue confirmation, add to Resend audience.

    Must run inside a request context (reads `request` and `g`).

    Args:
        email: Email address to capture
        intent: Intent value stored in database
        plan: Optional plan name stored in database
        email_intent: Optional intent value for email (defaults to `intent`)

    Returns:
        True if new row inserted, False if duplicate.
    """
    # INSERT OR IGNORE: duplicates are silently skipped at the DB level.
    try:
        cursor_result = await execute(
            "INSERT OR IGNORE INTO waitlist (email, intent, plan, ip_address) VALUES (?, ?, ?, ?)",
            (email, intent, plan, request.remote_addr),
        )
        # NOTE(review): execute() returns lastrowid, which sqlite3 leaves at
        # its previous value when the INSERT is IGNOREd — confirm a duplicate
        # cannot be reported as new on this long-lived connection.
        is_new = cursor_result > 0
    except Exception:
        # If anything fails, treat as not-new to avoid double-sending
        is_new = False
    # Enqueue confirmation email only if new
    if is_new:
        # Local import avoids a circular dependency with the worker module.
        from .worker import enqueue
        email_intent_value = email_intent if email_intent is not None else intent
        # Default language to "en" when no language was resolved on `g`.
        lang = g.get("lang", "en") if g else "en"
        await enqueue("send_waitlist_confirmation", {"email": email, "intent": email_intent_value, "lang": lang})
    # Add to Resend audience (silent fail - not critical)
    # 3 named audiences: suppliers, leads, newsletter (free plan limit = 3)
    if config.RESEND_API_KEY:
        blueprint = request.blueprints[0] if request.blueprints else "public"
        audience_name = _audience_for_blueprint(blueprint)
        audience_id = await _get_or_create_resend_audience(audience_name)
        if audience_id:
            try:
                resend.api_key = config.RESEND_API_KEY
                resend.Contacts.create({"email": email, "audience_id": audience_id})
            except Exception:
                pass  # Silent fail
    return is_new
# =============================================================================
# CSRF Protection
# =============================================================================
def get_csrf_token() -> str:
    """Return the session's CSRF token, minting one on first access."""
    if "csrf_token" in session:
        return session["csrf_token"]
    token = secrets.token_urlsafe(32)
    session["csrf_token"] = token
    return token
def validate_csrf_token(token: str) -> bool:
    """Compare *token* against the session's CSRF token in constant time.

    Fix: the previous `return token and ...` returned the falsy *token*
    itself (None or "") rather than a bool, violating the annotated return
    type; this version always returns a bool.
    """
    if not token:
        return False
    return secrets.compare_digest(token, session.get("csrf_token", ""))
def csrf_protect(f):
    """Decorator: reject POST requests lacking a valid CSRF token (403).

    The token is read from the `csrf_token` form field or, failing that,
    the X-CSRF-Token header.  Non-POST requests pass straight through.
    """
    @wraps(f)
    async def decorated(*args, **kwargs):
        if request.method != "POST":
            return await f(*args, **kwargs)
        form = await request.form
        token = form.get("csrf_token") or request.headers.get("X-CSRF-Token")
        if not validate_csrf_token(token):
            return {"error": "Invalid CSRF token"}, 403
        return await f(*args, **kwargs)
    return decorated
# =============================================================================
# Rate Limiting (SQLite-based)
# =============================================================================
async def check_rate_limit(key: str, limit: int = None, window: int = None) -> tuple[bool, dict]:
    """Sliding-window rate limiter backed by the rate_limits SQLite table.

    Args:
        key: Bucket identifier (e.g. "ip:1.2.3.4").
        limit: Max requests per window (default: config.RATE_LIMIT_REQUESTS).
        window: Window length in seconds (default: config.RATE_LIMIT_WINDOW).

    Returns:
        (is_allowed, info) where info carries "limit", "remaining" and
        "reset" (unix timestamp when a blocked client may retry).

    Fix: "reset" was computed as window_start + window, which always equals
    *now* and therefore told blocked clients to retry immediately; it now
    points one full window into the future.
    """
    limit = limit or config.RATE_LIMIT_REQUESTS
    window = window or config.RATE_LIMIT_WINDOW
    now = datetime.utcnow()
    window_start = now - timedelta(seconds=window)
    # Evict entries that fell out of the window, then count what's left.
    await execute(
        "DELETE FROM rate_limits WHERE key = ? AND timestamp < ?", (key, window_start.isoformat())
    )
    result = await fetch_one(
        "SELECT COUNT(*) as count FROM rate_limits WHERE key = ? AND timestamp > ?",
        (key, window_start.isoformat()),
    )
    count = result["count"] if result else 0
    info = {
        "limit": limit,
        # -1 accounts for the request being processed right now.
        "remaining": max(0, limit - count - 1),
        "reset": int((now + timedelta(seconds=window)).timestamp()),
    }
    if count >= limit:
        return False, info
    # Record this request
    await execute("INSERT INTO rate_limits (key, timestamp) VALUES (?, ?)", (key, now.isoformat()))
    return True, info
def rate_limit(limit: int = None, window: int = None, key_func=None):
    """Decorator: apply check_rate_limit() to a route, returning 429 with
    limit metadata when the budget is exhausted.

    key_func, when given, supplies the bucket key; otherwise the client IP
    is used.
    """
    def decorator(f):
        @wraps(f)
        async def decorated(*args, **kwargs):
            bucket = key_func() if key_func else f"ip:{request.remote_addr}"
            allowed, info = await check_rate_limit(bucket, limit, window)
            if not allowed:
                return {"error": "Rate limit exceeded", **info}, 429
            return await f(*args, **kwargs)
        return decorated
    return decorator
# =============================================================================
# Request ID Tracking
# =============================================================================
# Per-task request id; a ContextVar so concurrent requests cannot clobber
# each other's value.
request_id_var: ContextVar[str] = ContextVar("request_id", default="")


def get_request_id() -> str:
    """Return the current request's id ("" outside a request context)."""
    return request_id_var.get()
def setup_request_id(app):
    """Install before/after hooks that propagate an X-Request-ID header.

    An incoming X-Request-ID is reused; otherwise a random 16-hex-char id
    is generated.  The id is stored on the ContextVar and on `g`, and
    echoed back on every response.
    """
    @app.before_request
    async def set_request_id():
        incoming = request.headers.get("X-Request-ID")
        rid = incoming or secrets.token_hex(8)
        request_id_var.set(rid)
        g.request_id = rid

    @app.after_request
    async def add_request_id_header(response):
        response.headers["X-Request-ID"] = get_request_id()
        return response
# =============================================================================
# Webhook Signature Verification
# =============================================================================
def verify_hmac_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Verify an HMAC-SHA256 webhook signature in constant time.

    Fix: a missing signature (None, e.g. an absent request header)
    previously made hmac.compare_digest raise TypeError; it now simply
    returns False.
    """
    if not signature:
        return False
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(signature, expected)
# =============================================================================
# Soft Delete Helpers
# =============================================================================
async def soft_delete(table: str, id: int) -> bool:
    """Mark row *id* in *table* as deleted by stamping deleted_at.

    Returns True only if a live row was actually updated.

    Fix: previously tested execute()'s lastrowid, which sqlite3 defines
    only after INSERT; the UPDATE's rowcount is checked instead.
    NOTE: *table* is interpolated into SQL — pass trusted names only.
    """
    db = await get_db()
    cursor = await db.execute(
        f"UPDATE {table} SET deleted_at = ? WHERE id = ? AND deleted_at IS NULL",
        (datetime.utcnow().isoformat(), id),
    )
    await db.commit()
    changed = cursor.rowcount
    await cursor.close()
    return changed > 0
async def restore(table: str, id: int) -> bool:
    """Restore a soft-deleted row by clearing deleted_at.

    Returns True if the row existed and was updated.

    Fix: previously tested execute()'s lastrowid (undefined after UPDATE);
    the UPDATE's rowcount is checked instead.
    NOTE: *table* is interpolated into SQL — pass trusted names only.
    """
    db = await get_db()
    cursor = await db.execute(f"UPDATE {table} SET deleted_at = NULL WHERE id = ?", (id,))
    await db.commit()
    changed = cursor.rowcount
    await cursor.close()
    return changed > 0
async def hard_delete(table: str, id: int) -> bool:
    """Permanently delete row *id* from *table*.

    Returns True if a row was actually removed.

    Fix: previously tested execute()'s lastrowid (undefined after DELETE);
    the DELETE's rowcount is checked instead.
    NOTE: *table* is interpolated into SQL — pass trusted names only.
    """
    db = await get_db()
    cursor = await db.execute(f"DELETE FROM {table} WHERE id = ?", (id,))
    await db.commit()
    removed = cursor.rowcount
    await cursor.close()
    return removed > 0
async def purge_deleted(table: str, days: int = 30) -> int:
    """Permanently delete rows soft-deleted more than *days* days ago.

    Returns the number of rows purged.

    Fix: previously returned execute()'s lastrowid (undefined after
    DELETE) where a count is implied; the DELETE's rowcount is returned.
    NOTE: *table* is interpolated into SQL — pass trusted names only.
    """
    cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
    db = await get_db()
    cursor = await db.execute(
        f"DELETE FROM {table} WHERE deleted_at IS NOT NULL AND deleted_at < ?", (cutoff,)
    )
    await db.commit()
    purged = cursor.rowcount
    await cursor.close()
    return purged
# =============================================================================
# Paddle Product Lookup
# =============================================================================
async def get_paddle_price(key: str) -> str | None:
    """Return the Paddle price id registered under *key*, or None if the
    product key is unknown."""
    record = await fetch_one(
        "SELECT paddle_price_id FROM paddle_products WHERE key = ?", (key,)
    )
    return None if not record else record["paddle_price_id"]
async def get_all_paddle_prices() -> dict[str, str]:
    """Return every paddle_products row as a {key: price_id} mapping."""
    products = await fetch_all("SELECT key, paddle_price_id FROM paddle_products")
    return {product["key"]: product["paddle_price_id"] for product in products}
# =============================================================================
# Text Utilities
# =============================================================================
def slugify(text: str, max_length_chars: int = 80) -> str:
    """Convert *text* to a lowercase ASCII, hyphen-separated URL slug.

    Steps: NFKD-normalize and drop non-ASCII, strip punctuation, collapse
    whitespace/hyphen runs to single hyphens, trim to *max_length_chars*.

    Fix: truncation could leave a dangling trailing hyphen (cutting
    "ab-ab" to 3 chars gave "ab-"); it is now stripped.
    """
    text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode()
    text = re.sub(r"[^\w\s-]", "", text.lower())
    text = re.sub(r"[-\s]+", "-", text).strip("-")
    return text[:max_length_chars].rstrip("-")
# =============================================================================
# A/B Testing
# =============================================================================
def _has_functional_consent() -> bool:
    """True when the visitor's consent cookie includes functional cookies."""
    consent = request.cookies.get("cookie_consent", "")
    return "functional" in consent
def ab_test(experiment: str, variants: tuple = ("control", "treatment")):
    """Decorator: assign the visitor to an A/B variant and tag Umami pageviews.

    The variant cookie (`ab_<experiment>`) is persisted only when the
    visitor has consented to functional cookies.  Without consent a random
    variant is still picked per request — the page renders and Umami gets
    tagged via g.ab_tag — but nothing is stored on the client.
    """
    def decorator(f):
        @wraps(f)
        async def wrapper(*args, **kwargs):
            cookie_key = f"ab_{experiment}"
            consented = _has_functional_consent()
            variant = request.cookies.get(cookie_key) if consented else None
            # Unknown/absent variants get a fresh random assignment.
            if variant not in variants:
                variant = random.choice(variants)
            g.ab_variant = variant
            g.ab_tag = f"{experiment}-{variant}"
            response = await make_response(await f(*args, **kwargs))
            if consented:
                # Persist for 30 days so the visitor stays in one bucket.
                response.set_cookie(cookie_key, variant, max_age=30 * 24 * 60 * 60)
            return response
        return wrapper
    return decorator
async def is_flag_enabled(name: str, default: bool = False) -> bool:
    """Return the state of feature flag *name* from the feature_flags table.

    Flags toggled via the admin UI take effect immediately (read per call,
    no restart needed).  Unknown flags fall back to *default*.
    """
    flag = await fetch_one(
        "SELECT enabled FROM feature_flags WHERE name = ?", (name,)
    )
    return default if flag is None else bool(flag["enabled"])
def feature_gate(flag_name: str, waitlist_template: str, **extra_context):
    """Gate a route behind a DB feature flag; show a waitlist page when off.

    Successor to waitlist_gate(), which relied on the global WAITLIST_MODE
    env var — this reads per-feature flags from the database instead.
    Only GET requests are intercepted; other methods always reach the
    wrapped handler.

    Args:
        flag_name: Feature flag name (e.g. "payments", "supplier_signup").
        waitlist_template: Template rendered while the flag is off (GET only).
        **extra_context: Extra template context; callable values are
            evaluated at request time.

    Usage:
        @bp.route("/signup", methods=["GET", "POST"])
        @csrf_protect
        @feature_gate("payments", "waitlist.html", plan=lambda: request.args.get("plan", "free"))
        async def signup():
            ...
    """
    def decorator(f):
        @wraps(f)
        async def decorated(*args, **kwargs):
            gated = not await is_flag_enabled(flag_name) and request.method == "GET"
            if not gated:
                return await f(*args, **kwargs)
            ctx = {
                key: (val() if callable(val) else val)
                for key, val in extra_context.items()
            }
            return await render_template(waitlist_template, **ctx)
        return decorated
    return decorator
def waitlist_gate(template: str, **extra_context):
    """DEPRECATED: Use feature_gate() instead.  Kept for backwards compat.

    Intercepts GET requests while the global WAITLIST_MODE env toggle is
    on and renders *template* (callable context values are evaluated at
    request time) instead of the wrapped handler.
    """
    def decorator(f):
        @wraps(f)
        async def decorated(*args, **kwargs):
            if not (config.WAITLIST_MODE and request.method == "GET"):
                return await f(*args, **kwargs)
            ctx = {k: (v() if callable(v) else v) for k, v in extra_context.items()}
            return await render_template(template, **ctx)
        return decorated
    return decorator