diff --git a/web/src/padelnomics/worker.py b/web/src/padelnomics/worker.py index 0681b25..44c53a0 100644 --- a/web/src/padelnomics/worker.py +++ b/web/src/padelnomics/worker.py @@ -7,9 +7,14 @@ import json import traceback from datetime import datetime, timedelta -from .core import EMAIL_ADDRESSES, config, execute, fetch_all, fetch_one, init_db, send_email +import logging + +from .core import EMAIL_ADDRESSES, config, execute, fetch_all, fetch_one, init_db, send_email, setup_logging from .i18n import get_translations +logger = logging.getLogger(__name__) +scheduler_logger = logging.getLogger(f"{__name__}.scheduler") + # Task handlers registry HANDLERS: dict[str, callable] = {} @@ -208,10 +213,7 @@ async def handle_send_magic_link(payload: dict) -> None: link = f"{config.BASE_URL}/auth/verify?token={payload['token']}" if config.DEBUG: - print(f"\n{'=' * 60}") - print(f" MAGIC LINK for {payload['email']}") - print(f" {link}") - print(f"{'=' * 60}\n") + logger.debug("MAGIC LINK for %s: %s", payload["email"], link) expiry_minutes = config.MAGIC_LINK_EXPIRY_MINUTES body = ( @@ -243,10 +245,7 @@ async def handle_send_quote_verification(payload: dict) -> None: ) if config.DEBUG: - print(f"\n{'=' * 60}") - print(f" QUOTE VERIFICATION for {payload['email']}") - print(f" {link}") - print(f"{'=' * 60}\n") + logger.debug("QUOTE VERIFICATION for %s: %s", payload["email"], link) first_name = ( payload.get("contact_name", "").split()[0] if payload.get("contact_name") else "there" @@ -485,7 +484,7 @@ async def handle_send_lead_forward_email(payload: dict) -> None: # Send to supplier contact email or general contact to_email = supplier.get("contact_email") or supplier.get("contact") or "" if not to_email: - print(f"[WORKER] No email for supplier {supplier_id}, skipping lead forward") + logger.warning("No email for supplier %s, skipping lead forward", supplier_id) return await send_email( @@ -588,9 +587,9 @@ async def handle_refill_monthly_credits(payload: dict) -> None: for s in suppliers: try: await monthly_credit_refill(s["id"]) - print(f"[WORKER] Refilled credits for supplier {s['id']}") + logger.info("Refilled credits for supplier %s", s["id"]) except Exception as e: - print(f"[WORKER] Failed to refill credits for supplier {s['id']}: {e}") + logger.error("Failed to refill credits for supplier %s: %s", s["id"], e) @task("generate_business_plan") @@ -651,7 +650,7 @@ async def handle_generate_business_plan(payload: dict) -> None: email_type="business_plan", ) - print(f"[WORKER] Generated business plan PDF: export_id={export_id}") + logger.info("Generated business plan PDF: export_id=%s", export_id) except Exception: await execute( @@ -680,7 +679,7 @@ async def handle_sync_gsc(payload: dict) -> None: from .seo import sync_gsc days_back = payload.get("days_back", 3) rows = await sync_gsc(days_back=days_back) - print(f"[WORKER] GSC sync complete: {rows} rows") + logger.info("GSC sync complete: %s rows", rows) @task("sync_bing") @@ -689,7 +688,7 @@ async def handle_sync_bing(payload: dict) -> None: from .seo import sync_bing days_back = payload.get("days_back", 3) rows = await sync_bing(days_back=days_back) - print(f"[WORKER] Bing sync complete: {rows} rows") + logger.info("Bing sync complete: %s rows", rows) @task("sync_umami") @@ -698,7 +697,7 @@ async def handle_sync_umami(payload: dict) -> None: from .seo import sync_umami days_back = payload.get("days_back", 3) rows = await sync_umami(days_back=days_back) - print(f"[WORKER] Umami sync complete: {rows} rows") + logger.info("Umami sync complete: %s rows", rows) @task("cleanup_seo_metrics") @@ -706,7 +705,7 @@ async def handle_cleanup_seo_metrics(payload: dict) -> None: """Delete SEO metrics older than 12 months.""" from .seo import cleanup_old_metrics deleted = await cleanup_old_metrics(retention_days=365) - print(f"[WORKER] Cleaned up {deleted} old SEO metric rows") + logger.info("Cleaned up %s old SEO metric rows", deleted) @task("generate_articles") @@ -722,7 +721,7 @@ async def handle_generate_articles(payload: dict) -> None: limit = payload.get("limit", 500) count = await generate_articles(slug, start_date, articles_per_day, limit=limit) - print(f"[WORKER] Generated {count} articles for template '{slug}'") + logger.info("Generated %s articles for template '%s'", count, slug) # ============================================================================= @@ -745,20 +744,21 @@ async def process_task(task: dict) -> None: payload = json.loads(task["payload"]) if task["payload"] else {} await handler(payload) await mark_complete(task_id) - print(f"[WORKER] Completed: {task_name} (id={task_id})") + logger.info("Completed: %s (id=%s)", task_name, task_id) except Exception as e: error = f"{e}\n{traceback.format_exc()}" await mark_failed(task_id, error, retries) - print(f"[WORKER] Failed: {task_name} (id={task_id}): {e}") + logger.error("Failed: %s (id=%s): %s", task_name, task_id, e) async def run_worker(poll_interval: float = 1.0) -> None: """Main worker loop.""" - print("[WORKER] Starting...") + setup_logging() + logger.info("Starting...") await init_db() from .analytics import open_analytics_db open_analytics_db() - print("[WORKER] Analytics DB opened.") + logger.info("Analytics DB opened.") while True: try: @@ -771,13 +771,14 @@ async def run_worker(poll_interval: float = 1.0) -> None: await asyncio.sleep(poll_interval) except Exception as e: - print(f"[WORKER] Error: {e}") + logger.error("Error: %s", e) await asyncio.sleep(poll_interval * 5) async def run_scheduler() -> None: """Schedule periodic cleanup tasks.""" - print("[SCHEDULER] Starting...") + setup_logging() + scheduler_logger.info("Starting...") await init_db() last_credit_refill = None @@ -798,7 +799,7 @@ async def run_scheduler() -> None: if today.day == 1 and last_credit_refill != this_month: await enqueue("refill_monthly_credits") last_credit_refill = this_month - print(f"[SCHEDULER] Queued monthly credit refill for {this_month}") + scheduler_logger.info("Queued monthly credit refill for %s", this_month) # Daily SEO metrics sync — run once per day after 6am UTC # (GSC data has ~2 day delay, syncing at 6am ensures data is ready) @@ -809,12 +810,12 @@ async def run_scheduler() -> None: await enqueue("sync_umami") await enqueue("cleanup_seo_metrics") last_seo_sync_date = today_date - print(f"[SCHEDULER] Queued SEO metric syncs for {today_date}") + scheduler_logger.info("Queued SEO metric syncs for %s", today_date) await asyncio.sleep(3600) # 1 hour except Exception as e: - print(f"[SCHEDULER] Error: {e}") + scheduler_logger.error("Error: %s", e) await asyncio.sleep(60)