Files
padelnomics/web/src/padelnomics/worker.py
Deeman 2e42245ad5 fix(worker): use sqlmesh run prod instead of plan prod --auto-apply
`plan --auto-apply` only detects SQL model changes and won't re-run
for new data. `run prod` evaluates missing cron intervals and picks
up newly extracted data — matching the fix already applied to the
supervisor.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-05 21:49:51 +01:00

968 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Background task worker - SQLite-based queue (no Redis needed).
"""
import asyncio
import json
import logging
import secrets
import traceback
from datetime import datetime, timedelta
from .core import (
EMAIL_ADDRESSES,
config,
execute,
fetch_all,
fetch_one,
init_db,
send_email,
setup_logging,
transaction,
utcnow,
utcnow_iso,
)
from .email_templates import render_email_template
from .i18n import get_translations
# Module logger for the worker loop; the scheduler gets its own child logger
# so its output can be filtered/routed independently.
logger = logging.getLogger(__name__)
scheduler_logger = logging.getLogger(f"{__name__}.scheduler")
# Task handlers registry
# Maps task_name -> async handler coroutine; populated by the @task decorator.
HANDLERS: dict[str, callable] = {}
def _t(key: str, lang: str = "en", **kwargs) -> str:
    """Resolve an email translation key, interpolating {placeholders}.

    Falls back to English when the key is missing for the requested
    language, and to the key itself when it is missing everywhere.
    """
    primary = get_translations(lang)
    fallback = get_translations("en")
    template = primary.get(key, fallback.get(key, key))
    if kwargs:
        return template.format(**kwargs)
    return template
def task(name: str):
    """Decorator registering a coroutine in HANDLERS under *name*."""
    def register(handler):
        HANDLERS[name] = handler
        return handler
    return register
# =============================================================================
# Task Queue Operations
# =============================================================================
async def enqueue(task_name: str, payload: dict = None, run_at: datetime = None) -> int:
    """Insert a pending task row; returns the new row id.

    run_at defaults to "now"; payload defaults to an empty dict.
    """
    scheduled = run_at if run_at is not None else utcnow()
    sql = """
        INSERT INTO tasks (task_name, payload, status, run_at, created_at)
        VALUES (?, ?, 'pending', ?, ?)
        """
    params = (
        task_name,
        json.dumps(payload or {}),
        scheduled.strftime("%Y-%m-%d %H:%M:%S"),
        utcnow_iso(),
    )
    return await execute(sql, params)
async def get_pending_tasks(limit: int = 10) -> list[dict]:
    """Return up to *limit* pending tasks whose run_at is due, oldest first."""
    query = """
        SELECT * FROM tasks
        WHERE status = 'pending' AND run_at <= ?
        ORDER BY run_at ASC
        LIMIT ?
        """
    return await fetch_all(query, (utcnow_iso(), limit))
async def mark_complete(task_id: int) -> None:
    """Flag a task row as completed and stamp completed_at."""
    sql = "UPDATE tasks SET status = 'complete', completed_at = ? WHERE id = ?"
    await execute(sql, (utcnow_iso(), task_id))
async def mark_failed(task_id: int, error: str, retries: int) -> None:
    """Mark a task as failed, scheduling a retry if attempts remain.

    Args:
        task_id: Primary key of the row in the tasks table.
        error: Error text (message + traceback) persisted for debugging.
        retries: Number of retries already attempted for this task.
    """
    max_retries = 3
    if retries < max_retries:
        # Exponential backoff: 1min, 5min, 25min (5**retries minutes)
        delay = timedelta(minutes=5**retries)
        run_at = utcnow() + delay
        # Store run_at in the same "%Y-%m-%d %H:%M:%S" format enqueue() uses.
        # Previously this used isoformat() ("...T..."), and because
        # get_pending_tasks compares run_at <= now as text, mixing the
        # "T"-separated and space-separated formats skews when retries
        # become due. -- format of utcnow_iso() worth double-checking too.
        await execute(
            """
            UPDATE tasks
            SET status = 'pending', error = ?, retries = ?, run_at = ?
            WHERE id = ?
            """,
            (error, retries + 1, run_at.strftime("%Y-%m-%d %H:%M:%S"), task_id),
        )
    else:
        # Retries exhausted — park the task permanently with its last error.
        await execute(
            "UPDATE tasks SET status = 'failed', error = ? WHERE id = ?", (error, task_id)
        )
# =============================================================================
# Built-in Task Handlers
# =============================================================================
@task("send_email")
async def handle_send_email(payload: dict) -> None:
"""Send an email."""
await send_email(
to=payload["to"],
subject=payload["subject"],
html=payload["html"],
text=payload.get("text"),
from_addr=payload.get("from_addr"),
email_type="generic",
)
@task("send_magic_link")
async def handle_send_magic_link(payload: dict) -> None:
"""Send magic link email."""
lang = payload.get("lang", "en")
link = f"{config.BASE_URL}/auth/verify?token={payload['token']}"
if config.DEBUG:
logger.debug("MAGIC LINK for %s: %s", payload["email"], link)
expiry_minutes = config.MAGIC_LINK_EXPIRY_MINUTES
html = render_email_template(
"emails/magic_link.html",
lang=lang,
link=link,
expiry_minutes=expiry_minutes,
preheader=_t("email_magic_link_preheader", lang, expiry_minutes=expiry_minutes),
)
await send_email(
to=payload["email"],
subject=_t("email_magic_link_subject", lang, app_name=config.APP_NAME),
html=html,
from_addr=EMAIL_ADDRESSES["transactional"],
email_type="magic_link",
)
@task("send_quote_verification")
async def handle_send_quote_verification(payload: dict) -> None:
"""Send verification email for quote request."""
lang = payload.get("lang", "en")
link = (
f"{config.BASE_URL}/{lang}/leads/verify"
f"?token={payload['token']}&lead={payload['lead_token']}"
)
if config.DEBUG:
logger.debug("QUOTE VERIFICATION for %s: %s", payload["email"], link)
first_name = (
payload.get("contact_name", "").split()[0] if payload.get("contact_name") else "there"
)
court_count = payload.get("court_count", "")
facility_type = payload.get("facility_type", "")
country = payload.get("country", "")
recap_parts = []
if court_count:
recap_parts.append(f"{court_count} courts")
if facility_type:
recap_parts.append(facility_type)
if country:
recap_parts.append(country)
preheader = _t("email_quote_verify_preheader_courts", lang, court_count=court_count) if court_count else _t("email_quote_verify_preheader", lang)
html = render_email_template(
"emails/quote_verification.html",
lang=lang,
link=link,
first_name=first_name,
recap_parts=recap_parts,
preheader=preheader,
)
await send_email(
to=payload["email"],
subject=_t("email_quote_verify_subject", lang),
html=html,
from_addr=EMAIL_ADDRESSES["transactional"],
email_type="quote_verification",
)
@task("send_welcome")
async def handle_send_welcome(payload: dict) -> None:
"""Send welcome email to new user."""
lang = payload.get("lang", "en")
name_parts = (payload.get("name") or "").split()
first_name = name_parts[0] if name_parts else "there"
html = render_email_template(
"emails/welcome.html",
lang=lang,
first_name=first_name,
preheader=_t("email_welcome_preheader", lang),
)
await send_email(
to=payload["email"],
subject=_t("email_welcome_subject", lang),
html=html,
from_addr=EMAIL_ADDRESSES["transactional"],
email_type="welcome",
)
@task("send_waitlist_confirmation")
async def handle_send_waitlist_confirmation(payload: dict) -> None:
"""Send waitlist confirmation email."""
intent = payload.get("intent", "signup")
lang = payload.get("lang", "en")
email = payload["email"]
if intent.startswith("supplier_"):
plan_name = intent.replace("supplier_", "").title()
subject = _t("email_waitlist_supplier_subject", lang, plan_name=plan_name)
html = render_email_template(
"emails/waitlist_supplier.html",
lang=lang,
plan_name=plan_name,
preheader=_t("email_waitlist_supplier_preheader", lang),
)
else:
subject = _t("email_waitlist_general_subject", lang)
html = render_email_template(
"emails/waitlist_general.html",
lang=lang,
preheader=_t("email_waitlist_general_preheader", lang),
)
await send_email(
to=email,
subject=subject,
html=html,
from_addr=EMAIL_ADDRESSES["transactional"],
email_type="waitlist",
)
@task("cleanup_expired_tokens")
async def handle_cleanup_tokens(payload: dict) -> None:
"""Clean up expired auth tokens."""
await execute("DELETE FROM auth_tokens WHERE expires_at < ?", (utcnow_iso(),))
@task("cleanup_rate_limits")
async def handle_cleanup_rate_limits(payload: dict) -> None:
"""Clean up old rate limit entries."""
cutoff = (utcnow() - timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")
await execute("DELETE FROM rate_limits WHERE timestamp < ?", (cutoff,))
@task("send_lead_forward_email")
async def handle_send_lead_forward_email(payload: dict) -> None:
"""Send full project brief to supplier who unlocked/was forwarded a lead."""
lang = payload.get("lang", "en")
lead_id = payload["lead_id"]
supplier_id = payload["supplier_id"]
lead = await fetch_one("SELECT * FROM lead_requests WHERE id = ?", (lead_id,))
supplier = await fetch_one("SELECT * FROM suppliers WHERE id = ?", (supplier_id,))
if not lead or not supplier:
return
heat = (lead["heat_score"] or "cool").upper()
country = lead["country"] or "Unknown"
courts = lead["court_count"] or "?"
budget = lead["budget_estimate"] or "?"
facility_type = lead["facility_type"] or "padel"
timeline = lead["timeline"] or ""
contact_email = lead["contact_email"] or ""
subject = f"[{heat}] New padel project in {country} \u2014 {courts} courts, \u20ac{budget}"
tl = lambda key: _t(key, lang) # noqa: E731
brief_rows = [
(tl("email_lead_forward_lbl_facility"), f"{facility_type} ({lead['build_context'] or '-'})"),
(tl("email_lead_forward_lbl_courts"), f"{courts} | Glass: {lead['glass_type'] or '-'} | Lighting: {lead['lighting_type'] or '-'}"),
(tl("email_lead_forward_lbl_location"), f"{lead['location'] or '-'}, {country}"),
(tl("email_lead_forward_lbl_timeline"), f"{timeline or '-'} | Budget: \u20ac{budget}"),
(tl("email_lead_forward_lbl_phase"), f"{lead['location_status'] or '-'} | Financing: {lead['financing_status'] or '-'}"),
(tl("email_lead_forward_lbl_services"), lead["services_needed"] or "-"),
(tl("email_lead_forward_lbl_additional"), lead["additional_info"] or "-"),
]
preheader_parts = [f"{facility_type} project"]
if timeline:
preheader_parts.append(f"{timeline} timeline")
preheader_parts.append(_t("email_lead_forward_preheader_suffix", lang))
# Send to supplier contact email or general contact
to_email = supplier.get("contact_email") or supplier.get("contact") or ""
if not to_email:
logger.warning("No email for supplier %s, skipping lead forward", supplier_id)
return
# Generate one-click "I've contacted this lead" CTA token
cta_token = secrets.token_urlsafe(24)
cta_url = f"{config.BASE_URL}/suppliers/leads/cta/{cta_token}"
html = render_email_template(
"emails/lead_forward.html",
lang=lang,
heat=heat,
brief_rows=brief_rows,
contact_name=lead["contact_name"] or "-",
contact_email=contact_email,
contact_phone=lead["contact_phone"] or "-",
contact_company=lead["contact_company"] or "-",
stakeholder_type=lead["stakeholder_type"] or "-",
cta_url=cta_url,
preheader=", ".join(preheader_parts),
)
await send_email(
to=to_email,
subject=subject,
html=html,
from_addr=EMAIL_ADDRESSES["leads"],
email_type="lead_forward",
)
# Update email_sent_at and store cta_token on lead_forward
now = utcnow_iso()
await execute(
"UPDATE lead_forwards SET email_sent_at = ?, cta_token = ? WHERE lead_id = ? AND supplier_id = ?",
(now, cta_token, lead_id, supplier_id),
)
@task("send_lead_matched_notification")
async def handle_send_lead_matched_notification(payload: dict) -> None:
"""Notify the entrepreneur that a supplier has been matched to their project."""
lang = payload.get("lang", "en")
lead_id = payload["lead_id"]
lead = await fetch_one("SELECT * FROM lead_requests WHERE id = ?", (lead_id,))
if not lead or not lead["contact_email"]:
return
first_name = (lead["contact_name"] or "").split()[0] if lead.get("contact_name") else "there"
html = render_email_template(
"emails/lead_matched.html",
lang=lang,
first_name=first_name,
facility_type=lead["facility_type"] or "padel",
court_count=lead["court_count"] or "?",
country=lead["country"] or "your area",
preheader=_t("email_lead_matched_preheader", lang),
)
await send_email(
to=lead["contact_email"],
subject=_t("email_lead_matched_subject", lang, first_name=first_name),
html=html,
from_addr=EMAIL_ADDRESSES["leads"],
email_type="lead_matched",
)
@task("notify_matching_suppliers")
async def handle_notify_matching_suppliers(payload: dict) -> None:
"""Notify growth/pro suppliers whose service_area matches a newly verified lead."""
lead_id = payload["lead_id"]
lang = payload.get("lang", "en")
lead = await fetch_one(
"SELECT * FROM lead_requests WHERE id = ? AND status = 'new' AND verified_at IS NOT NULL AND visible_from <= datetime('now')",
(lead_id,),
)
if not lead or not lead.get("country"):
return
country = lead["country"]
heat = (lead["heat_score"] or "cool").upper()
# Find matching suppliers: paid tier, have credits, service_area includes lead country
# service_area is comma-separated country codes (e.g. "DE,AT,CH")
matching = await fetch_all(
"""SELECT id, name, contact_email, contact, tier
FROM suppliers
WHERE tier IN ('growth', 'pro')
AND credit_balance > 0
AND (service_area = ? OR service_area LIKE ? OR service_area LIKE ? OR service_area LIKE ?)
LIMIT 20""",
(country, f"{country},%", f"%,{country}", f"%,{country},%"),
)
if not matching:
return
courts = lead["court_count"] or "?"
timeline = lead["timeline"] or ""
facility_type = lead["facility_type"] or "padel"
for supplier in matching:
to_email = supplier.get("contact_email") or supplier.get("contact") or ""
if not to_email:
continue
notify_html = render_email_template(
"emails/lead_match_notify.html",
lang=lang,
heat=heat,
country=country,
facility_type=facility_type,
courts=courts,
timeline=timeline,
credit_cost=lead.get("credit_cost", "?"),
preheader=f"New matching lead in {country}",
)
await send_email(
to=to_email,
subject=f"[{heat}] New {facility_type} project in {country}{courts} courts",
html=notify_html,
from_addr=EMAIL_ADDRESSES["leads"],
email_type="lead_match_notify",
)
@task("send_weekly_lead_digest")
async def handle_send_weekly_lead_digest(payload: dict) -> None:
"""Weekly digest to active suppliers: new matching leads in their area."""
# Find paid suppliers with credits
active_suppliers = await fetch_all(
"SELECT id, name, service_area, contact_email, contact FROM suppliers WHERE tier IN ('growth','pro') AND credit_balance > 0"
)
for supplier in active_suppliers:
to_email = supplier.get("contact_email") or supplier.get("contact") or ""
if not to_email:
continue
service_area_raw = (supplier.get("service_area") or "").strip()
if not service_area_raw:
continue
countries = [c.strip() for c in service_area_raw.split(",") if c.strip()]
if not countries:
continue
placeholders = ",".join("?" * len(countries))
new_leads = await fetch_all(
f"""SELECT id, heat_score, country, court_count, facility_type, timeline, credit_cost, created_at
FROM lead_requests
WHERE lead_type = 'quote' AND status = 'new' AND verified_at IS NOT NULL
AND visible_from <= datetime('now')
AND country IN ({placeholders})
AND created_at >= datetime('now', '-7 days')
AND NOT EXISTS (
SELECT 1 FROM lead_forwards WHERE lead_id = lead_requests.id AND supplier_id = ?
)
ORDER BY
CASE heat_score WHEN 'hot' THEN 0 WHEN 'warm' THEN 1 ELSE 2 END,
created_at DESC
LIMIT 5""",
tuple(countries) + (supplier["id"],),
)
if not new_leads:
continue
# Normalise lead dicts for template — heat_score → heat (uppercase)
digest_leads = [
{**ld, "heat": (ld["heat_score"] or "cool").upper()}
for ld in new_leads
]
area_summary = ", ".join(countries[:3])
if len(countries) > 3:
area_summary += f" +{len(countries) - 3}"
digest_html = render_email_template(
"emails/weekly_digest.html",
lang="en",
leads=digest_leads,
preheader=f"{len(new_leads)} new leads matching your service area",
)
await send_email(
to=to_email,
subject=f"{len(new_leads)} new padel {'lead' if len(new_leads) == 1 else 'leads'} in {area_summary}",
html=digest_html,
from_addr=EMAIL_ADDRESSES["leads"],
email_type="weekly_digest",
)
@task("send_supplier_enquiry_email")
async def handle_send_supplier_enquiry_email(payload: dict) -> None:
"""Relay a directory enquiry form submission to the supplier's contact email."""
lang = payload.get("lang", "en")
supplier_email = payload.get("supplier_email", "")
if not supplier_email:
return
supplier_name = payload.get("supplier_name", "")
contact_name = payload.get("contact_name", "")
contact_email = payload.get("contact_email", "")
message = payload.get("message", "")
html = render_email_template(
"emails/supplier_enquiry.html",
lang=lang,
supplier_name=supplier_name,
contact_name=contact_name,
contact_email=contact_email,
message=message,
preheader=_t("email_enquiry_preheader", lang),
)
await send_email(
to=supplier_email,
subject=_t("email_enquiry_subject", lang, contact_name=contact_name),
html=html,
from_addr=EMAIL_ADDRESSES["transactional"],
email_type="supplier_enquiry",
)
@task("refill_monthly_credits")
async def handle_refill_monthly_credits(payload: dict) -> None:
"""Refill monthly credits for all claimed suppliers with a paid tier.
Uses two bulk SQL statements instead of N×3 per-supplier queries:
1. INSERT INTO credit_ledger SELECT ... for all eligible suppliers at once
2. UPDATE suppliers SET credit_balance = credit_balance + monthly_credits
"""
now = utcnow_iso()
async with transaction() as db:
# Batch-insert ledger rows for all eligible suppliers in one statement
await db.execute(
"""INSERT INTO credit_ledger
(supplier_id, delta, balance_after, event_type, note, created_at)
SELECT id,
monthly_credits,
credit_balance + monthly_credits,
'monthly_allocation',
'Monthly refill (' || tier || ' plan)',
?
FROM suppliers
WHERE tier IN ('growth', 'pro')
AND claimed_by IS NOT NULL
AND monthly_credits > 0""",
(now,),
)
# Update balances and refill timestamps in one statement
result = await db.execute(
"""UPDATE suppliers
SET credit_balance = credit_balance + monthly_credits,
last_credit_refill = ?
WHERE tier IN ('growth', 'pro')
AND claimed_by IS NOT NULL
AND monthly_credits > 0""",
(now,),
)
logger.info("Monthly credit refill complete — %d suppliers updated", result.rowcount)
@task("generate_business_plan")
async def handle_generate_business_plan(payload: dict) -> None:
"""Generate a business plan PDF and save it to disk."""
from pathlib import Path
export_id = payload["export_id"]
user_id = payload["user_id"]
scenario_id = payload["scenario_id"]
language = payload.get("language", "en")
# Mark as generating
await execute(
"UPDATE business_plan_exports SET status = 'generating' WHERE id = ?",
(export_id,),
)
try:
from .businessplan import generate_business_plan
pdf_bytes = await generate_business_plan(scenario_id, user_id, language)
# Save PDF
export_dir = Path("data/exports")
export_dir.mkdir(parents=True, exist_ok=True)
file_path = export_dir / f"{export_id}.pdf"
file_path.write_bytes(pdf_bytes)
# Update record
now = utcnow_iso()
await execute(
"UPDATE business_plan_exports SET status = 'ready', file_path = ?, completed_at = ? WHERE id = ?",
(str(file_path), now, export_id),
)
# Notify user via email
export_row = await fetch_one(
"SELECT token FROM business_plan_exports WHERE id = ?", (export_id,)
)
export_token = export_row["token"]
user = await fetch_one("SELECT email FROM users WHERE id = ?", (user_id,))
if user:
bp_html = render_email_template(
"emails/business_plan.html",
lang=language,
download_url=f"{config.BASE_URL}/planner/export/{export_token}",
quote_url=f"{config.BASE_URL}/{language}/leads/quote",
preheader=_t("email_business_plan_preheader", language),
)
await send_email(
to=user["email"],
subject=_t("email_business_plan_subject", language),
html=bp_html,
from_addr=EMAIL_ADDRESSES["transactional"],
email_type="business_plan",
)
logger.info("Generated business plan PDF: export_id=%s", export_id)
except Exception:
await execute(
"UPDATE business_plan_exports SET status = 'failed' WHERE id = ?",
(export_id,),
)
raise
@task("cleanup_old_tasks")
async def handle_cleanup_tasks(payload: dict) -> None:
"""Clean up completed/failed tasks older than 7 days."""
cutoff = (utcnow() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
await execute(
"DELETE FROM tasks WHERE status IN ('complete', 'failed') AND created_at < ?", (cutoff,)
)
# =============================================================================
# SEO Metrics Sync
# =============================================================================
@task("sync_gsc")
async def handle_sync_gsc(payload: dict) -> None:
"""Sync Google Search Console data."""
from .seo import sync_gsc
days_back = payload.get("days_back", 3)
rows = await sync_gsc(days_back=days_back)
logger.info("GSC sync complete: %s rows", rows)
@task("sync_bing")
async def handle_sync_bing(payload: dict) -> None:
"""Sync Bing Webmaster data."""
from .seo import sync_bing
days_back = payload.get("days_back", 3)
rows = await sync_bing(days_back=days_back)
logger.info("Bing sync complete: %s rows", rows)
@task("sync_umami")
async def handle_sync_umami(payload: dict) -> None:
"""Sync Umami analytics data."""
from .seo import sync_umami
days_back = payload.get("days_back", 3)
rows = await sync_umami(days_back=days_back)
logger.info("Umami sync complete: %s rows", rows)
@task("cleanup_seo_metrics")
async def handle_cleanup_seo_metrics(payload: dict) -> None:
"""Delete SEO metrics older than 12 months."""
from .seo import cleanup_old_metrics
deleted = await cleanup_old_metrics(retention_days=365)
logger.info("Cleaned up %s old SEO metric rows", deleted)
@task("run_extraction")
async def handle_run_extraction(payload: dict) -> None:
"""Run the extraction pipeline in the background.
Shells out to the extraction CLI in the repo root. The extraction CLI
manages its own state in .state.sqlite and writes to the landing zone.
payload["extractor"]: optional workflow name (e.g. "overpass").
If set, runs only that extractor via its entry point (extract-overpass).
If absent, runs all extractors via the umbrella `extract` entry point.
"""
import subprocess
from pathlib import Path
repo_root = Path(__file__).resolve().parents[4]
extractor = payload.get("extractor", "").strip()
if extractor:
cmd_name = f"extract-{extractor.replace('_', '-')}"
cmd = ["uv", "run", "--package", "padelnomics_extract", cmd_name]
else:
cmd = ["uv", "run", "--package", "padelnomics_extract", "extract"]
result = await asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True,
timeout=7200, # 2-hour absolute timeout
cwd=str(repo_root),
)
if result.returncode != 0:
raise RuntimeError(
f"Extraction failed (exit {result.returncode}): {result.stderr[:500]}"
)
logger.info("Extraction completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
@task("run_transform")
async def handle_run_transform(payload: dict) -> None:
"""Run SQLMesh transform (prod run) in the background.
Shells out to `uv run sqlmesh -p transform/sqlmesh_padelnomics run prod`.
2-hour absolute timeout — same as extraction.
"""
import subprocess
from pathlib import Path
repo_root = Path(__file__).resolve().parents[4]
result = await asyncio.to_thread(
subprocess.run,
["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "run", "prod"],
capture_output=True,
text=True,
timeout=7200,
cwd=str(repo_root),
)
if result.returncode != 0:
raise RuntimeError(
f"SQLMesh transform failed (exit {result.returncode}): {result.stderr[:500]}"
)
logger.info("SQLMesh transform completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
@task("run_export")
async def handle_run_export(payload: dict) -> None:
"""Export serving tables from lakehouse.duckdb → analytics.duckdb.
Shells out to `uv run python src/padelnomics/export_serving.py`.
10-minute absolute timeout.
"""
import subprocess
from pathlib import Path
repo_root = Path(__file__).resolve().parents[4]
result = await asyncio.to_thread(
subprocess.run,
["uv", "run", "python", "src/padelnomics/export_serving.py"],
capture_output=True,
text=True,
timeout=600,
cwd=str(repo_root),
)
if result.returncode != 0:
raise RuntimeError(
f"Export failed (exit {result.returncode}): {result.stderr[:500]}"
)
logger.info("Export completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
@task("run_pipeline")
async def handle_run_pipeline(payload: dict) -> None:
"""Run full ELT pipeline: extract → transform → export, stopping on first failure."""
import subprocess
from pathlib import Path
repo_root = Path(__file__).resolve().parents[4]
steps = [
(
"extraction",
["uv", "run", "--package", "padelnomics_extract", "extract"],
7200,
),
(
"transform",
["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "run", "prod"],
7200,
),
(
"export",
["uv", "run", "python", "src/padelnomics/export_serving.py"],
600,
),
]
for step_name, cmd, timeout_seconds in steps:
logger.info("Pipeline step starting: %s", step_name)
result = await asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True,
timeout=timeout_seconds,
cwd=str(repo_root),
)
if result.returncode != 0:
raise RuntimeError(
f"Pipeline failed at {step_name} (exit {result.returncode}): {result.stderr[:500]}"
)
logger.info(
"Pipeline step complete: %s%s",
step_name,
result.stdout[-200:] if result.stdout else "(no output)",
)
logger.info("Full pipeline complete (extract → transform → export)")
@task("generate_articles")
async def handle_generate_articles(payload: dict) -> None:
"""Generate articles from a template in the background."""
from datetime import date as date_cls
from .content import generate_articles
slug = payload["template_slug"]
start_date = date_cls.fromisoformat(payload["start_date"])
articles_per_day = payload.get("articles_per_day", 3)
limit = payload.get("limit", 0)
task_id = payload.get("_task_id")
count = await generate_articles(
slug, start_date, articles_per_day, limit=limit, task_id=task_id
)
logger.info("Generated %s articles for template '%s'", count, slug)
# =============================================================================
# Worker Loop
# =============================================================================
async def process_task(task: dict) -> None:
    """Execute one queued task row, recording success or failure."""
    name = task["task_name"]
    task_id = task["id"]
    retries = task.get("retries", 0)
    handler = HANDLERS.get(name)
    if handler is None:
        # No registered handler — fail the task through the retry machinery.
        await mark_failed(task_id, f"Unknown task: {name}", retries)
        return
    try:
        raw = task["payload"]
        payload = json.loads(raw) if raw else {}
        # Inject task_id so progress-aware handlers (e.g. generate_articles) can
        # write progress_current to the tasks table without a separate lookup.
        payload["_task_id"] = task_id
        await handler(payload)
        await mark_complete(task_id)
        logger.info("Completed: %s (id=%s)", name, task_id)
    except Exception as e:
        await mark_failed(task_id, f"{e}\n{traceback.format_exc()}", retries)
        logger.error("Failed: %s (id=%s): %s", name, task_id, e)
async def run_worker(poll_interval_seconds: float = 1.0) -> None:
    """Main worker loop: poll for due tasks and execute them serially."""
    setup_logging()
    logger.info("Starting...")
    await init_db()
    from .analytics import open_analytics_db

    open_analytics_db()
    logger.info("Analytics DB opened.")
    while True:
        try:
            batch = await get_pending_tasks(limit=10)
            for item in batch:
                await process_task(item)
            if not batch:
                # Idle — nothing was due, so sleep before polling again.
                await asyncio.sleep(poll_interval_seconds)
        except Exception as e:
            logger.error("Error: %s", e)
            # Back off harder after an unexpected loop-level error.
            await asyncio.sleep(poll_interval_seconds * 5)
async def run_scheduler() -> None:
    """Hourly loop that enqueues periodic cleanup, sync, and digest tasks."""
    setup_logging()
    scheduler_logger.info("Starting...")
    await init_db()
    # In-memory dedupe markers; they reset on restart, so a restart within a
    # boundary window may enqueue the same periodic task a second time.
    last_credit_refill = None
    last_seo_sync_date = None
    last_weekly_digest = None
    while True:
        try:
            # Schedule cleanup tasks every hour
            await enqueue("cleanup_expired_tokens")
            await enqueue("cleanup_rate_limits")
            await enqueue("cleanup_old_tasks")
            now = utcnow()
            # Monthly credit refill — run on the 1st of each month
            this_month = f"{now.year}-{now.month:02d}"
            if now.day == 1 and last_credit_refill != this_month:
                await enqueue("refill_monthly_credits")
                last_credit_refill = this_month
                scheduler_logger.info("Queued monthly credit refill for %s", this_month)
            # Daily SEO metrics sync — run once per day after 6am UTC
            # (GSC data has ~2 day delay, syncing at 6am ensures data is ready)
            today_date = now.strftime("%Y-%m-%d")
            if last_seo_sync_date != today_date and now.hour >= 6:
                for seo_task_name in ("sync_gsc", "sync_bing", "sync_umami", "cleanup_seo_metrics"):
                    await enqueue(seo_task_name)
                last_seo_sync_date = today_date
                scheduler_logger.info("Queued SEO metric syncs for %s", today_date)
            # Weekly lead digest — every Monday after 8am UTC
            if now.weekday() == 0 and now.hour >= 8 and last_weekly_digest != today_date:
                await enqueue("send_weekly_lead_digest", {})
                last_weekly_digest = today_date
                scheduler_logger.info("Queued weekly lead digest for %s", today_date)
            await asyncio.sleep(3600)  # 1 hour
        except Exception as e:
            scheduler_logger.error("Error: %s", e)
            await asyncio.sleep(60)
if __name__ == "__main__":
    import sys

    # `python worker.py scheduler` runs the scheduler; anything else runs the worker.
    run_mode = sys.argv[1] if len(sys.argv) > 1 else "worker"
    if run_mode == "scheduler":
        asyncio.run(run_scheduler())
    else:
        asyncio.run(run_worker())