feat(logging): convert worker.py print() to logging
- Add module logger (padelnomics.worker) and scheduler_logger (padelnomics.worker.scheduler) - Call setup_logging() at start of run_worker() and run_scheduler() - Convert all 26 print() calls — drop manual [WORKER]/[SCHEDULER] prefixes - Magic link + quote verification debug prints → logger.debug() (only shown when LOG_LEVEL=DEBUG) - Errors with exception context use logger.error() with %s formatting Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -7,9 +7,14 @@ import json
|
|||||||
import traceback
|
import traceback
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from .core import EMAIL_ADDRESSES, config, execute, fetch_all, fetch_one, init_db, send_email
|
import logging
|
||||||
|
|
||||||
|
from .core import EMAIL_ADDRESSES, config, execute, fetch_all, fetch_one, init_db, send_email, setup_logging
|
||||||
from .i18n import get_translations
|
from .i18n import get_translations
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
scheduler_logger = logging.getLogger(f"{__name__}.scheduler")
|
||||||
|
|
||||||
# Task handlers registry
|
# Task handlers registry
|
||||||
HANDLERS: dict[str, callable] = {}
|
HANDLERS: dict[str, callable] = {}
|
||||||
|
|
||||||
@@ -208,10 +213,7 @@ async def handle_send_magic_link(payload: dict) -> None:
|
|||||||
link = f"{config.BASE_URL}/auth/verify?token={payload['token']}"
|
link = f"{config.BASE_URL}/auth/verify?token={payload['token']}"
|
||||||
|
|
||||||
if config.DEBUG:
|
if config.DEBUG:
|
||||||
print(f"\n{'=' * 60}")
|
logger.debug("MAGIC LINK for %s: %s", payload["email"], link)
|
||||||
print(f" MAGIC LINK for {payload['email']}")
|
|
||||||
print(f" {link}")
|
|
||||||
print(f"{'=' * 60}\n")
|
|
||||||
|
|
||||||
expiry_minutes = config.MAGIC_LINK_EXPIRY_MINUTES
|
expiry_minutes = config.MAGIC_LINK_EXPIRY_MINUTES
|
||||||
body = (
|
body = (
|
||||||
@@ -243,10 +245,7 @@ async def handle_send_quote_verification(payload: dict) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if config.DEBUG:
|
if config.DEBUG:
|
||||||
print(f"\n{'=' * 60}")
|
logger.debug("QUOTE VERIFICATION for %s: %s", payload["email"], link)
|
||||||
print(f" QUOTE VERIFICATION for {payload['email']}")
|
|
||||||
print(f" {link}")
|
|
||||||
print(f"{'=' * 60}\n")
|
|
||||||
|
|
||||||
first_name = (
|
first_name = (
|
||||||
payload.get("contact_name", "").split()[0] if payload.get("contact_name") else "there"
|
payload.get("contact_name", "").split()[0] if payload.get("contact_name") else "there"
|
||||||
@@ -485,7 +484,7 @@ async def handle_send_lead_forward_email(payload: dict) -> None:
|
|||||||
# Send to supplier contact email or general contact
|
# Send to supplier contact email or general contact
|
||||||
to_email = supplier.get("contact_email") or supplier.get("contact") or ""
|
to_email = supplier.get("contact_email") or supplier.get("contact") or ""
|
||||||
if not to_email:
|
if not to_email:
|
||||||
print(f"[WORKER] No email for supplier {supplier_id}, skipping lead forward")
|
logger.warning("No email for supplier %s, skipping lead forward", supplier_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
await send_email(
|
await send_email(
|
||||||
@@ -588,9 +587,9 @@ async def handle_refill_monthly_credits(payload: dict) -> None:
|
|||||||
for s in suppliers:
|
for s in suppliers:
|
||||||
try:
|
try:
|
||||||
await monthly_credit_refill(s["id"])
|
await monthly_credit_refill(s["id"])
|
||||||
print(f"[WORKER] Refilled credits for supplier {s['id']}")
|
logger.info("Refilled credits for supplier %s", s["id"])
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[WORKER] Failed to refill credits for supplier {s['id']}: {e}")
|
logger.error("Failed to refill credits for supplier %s: %s", s["id"], e)
|
||||||
|
|
||||||
|
|
||||||
@task("generate_business_plan")
|
@task("generate_business_plan")
|
||||||
@@ -651,7 +650,7 @@ async def handle_generate_business_plan(payload: dict) -> None:
|
|||||||
email_type="business_plan",
|
email_type="business_plan",
|
||||||
)
|
)
|
||||||
|
|
||||||
print(f"[WORKER] Generated business plan PDF: export_id={export_id}")
|
logger.info("Generated business plan PDF: export_id=%s", export_id)
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
await execute(
|
await execute(
|
||||||
@@ -680,7 +679,7 @@ async def handle_sync_gsc(payload: dict) -> None:
|
|||||||
from .seo import sync_gsc
|
from .seo import sync_gsc
|
||||||
days_back = payload.get("days_back", 3)
|
days_back = payload.get("days_back", 3)
|
||||||
rows = await sync_gsc(days_back=days_back)
|
rows = await sync_gsc(days_back=days_back)
|
||||||
print(f"[WORKER] GSC sync complete: {rows} rows")
|
logger.info("GSC sync complete: %s rows", rows)
|
||||||
|
|
||||||
|
|
||||||
@task("sync_bing")
|
@task("sync_bing")
|
||||||
@@ -689,7 +688,7 @@ async def handle_sync_bing(payload: dict) -> None:
|
|||||||
from .seo import sync_bing
|
from .seo import sync_bing
|
||||||
days_back = payload.get("days_back", 3)
|
days_back = payload.get("days_back", 3)
|
||||||
rows = await sync_bing(days_back=days_back)
|
rows = await sync_bing(days_back=days_back)
|
||||||
print(f"[WORKER] Bing sync complete: {rows} rows")
|
logger.info("Bing sync complete: %s rows", rows)
|
||||||
|
|
||||||
|
|
||||||
@task("sync_umami")
|
@task("sync_umami")
|
||||||
@@ -698,7 +697,7 @@ async def handle_sync_umami(payload: dict) -> None:
|
|||||||
from .seo import sync_umami
|
from .seo import sync_umami
|
||||||
days_back = payload.get("days_back", 3)
|
days_back = payload.get("days_back", 3)
|
||||||
rows = await sync_umami(days_back=days_back)
|
rows = await sync_umami(days_back=days_back)
|
||||||
print(f"[WORKER] Umami sync complete: {rows} rows")
|
logger.info("Umami sync complete: %s rows", rows)
|
||||||
|
|
||||||
|
|
||||||
@task("cleanup_seo_metrics")
|
@task("cleanup_seo_metrics")
|
||||||
@@ -706,7 +705,7 @@ async def handle_cleanup_seo_metrics(payload: dict) -> None:
|
|||||||
"""Delete SEO metrics older than 12 months."""
|
"""Delete SEO metrics older than 12 months."""
|
||||||
from .seo import cleanup_old_metrics
|
from .seo import cleanup_old_metrics
|
||||||
deleted = await cleanup_old_metrics(retention_days=365)
|
deleted = await cleanup_old_metrics(retention_days=365)
|
||||||
print(f"[WORKER] Cleaned up {deleted} old SEO metric rows")
|
logger.info("Cleaned up %s old SEO metric rows", deleted)
|
||||||
|
|
||||||
|
|
||||||
@task("generate_articles")
|
@task("generate_articles")
|
||||||
@@ -722,7 +721,7 @@ async def handle_generate_articles(payload: dict) -> None:
|
|||||||
limit = payload.get("limit", 500)
|
limit = payload.get("limit", 500)
|
||||||
|
|
||||||
count = await generate_articles(slug, start_date, articles_per_day, limit=limit)
|
count = await generate_articles(slug, start_date, articles_per_day, limit=limit)
|
||||||
print(f"[WORKER] Generated {count} articles for template '{slug}'")
|
logger.info("Generated %s articles for template '%s'", count, slug)
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
@@ -745,20 +744,21 @@ async def process_task(task: dict) -> None:
|
|||||||
payload = json.loads(task["payload"]) if task["payload"] else {}
|
payload = json.loads(task["payload"]) if task["payload"] else {}
|
||||||
await handler(payload)
|
await handler(payload)
|
||||||
await mark_complete(task_id)
|
await mark_complete(task_id)
|
||||||
print(f"[WORKER] Completed: {task_name} (id={task_id})")
|
logger.info("Completed: %s (id=%s)", task_name, task_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error = f"{e}\n{traceback.format_exc()}"
|
error = f"{e}\n{traceback.format_exc()}"
|
||||||
await mark_failed(task_id, error, retries)
|
await mark_failed(task_id, error, retries)
|
||||||
print(f"[WORKER] Failed: {task_name} (id={task_id}): {e}")
|
logger.error("Failed: %s (id=%s): %s", task_name, task_id, e)
|
||||||
|
|
||||||
|
|
||||||
async def run_worker(poll_interval: float = 1.0) -> None:
|
async def run_worker(poll_interval: float = 1.0) -> None:
|
||||||
"""Main worker loop."""
|
"""Main worker loop."""
|
||||||
print("[WORKER] Starting...")
|
setup_logging()
|
||||||
|
logger.info("Starting...")
|
||||||
await init_db()
|
await init_db()
|
||||||
from .analytics import open_analytics_db
|
from .analytics import open_analytics_db
|
||||||
open_analytics_db()
|
open_analytics_db()
|
||||||
print("[WORKER] Analytics DB opened.")
|
logger.info("Analytics DB opened.")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -771,13 +771,14 @@ async def run_worker(poll_interval: float = 1.0) -> None:
|
|||||||
await asyncio.sleep(poll_interval)
|
await asyncio.sleep(poll_interval)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[WORKER] Error: {e}")
|
logger.error("Error: %s", e)
|
||||||
await asyncio.sleep(poll_interval * 5)
|
await asyncio.sleep(poll_interval * 5)
|
||||||
|
|
||||||
|
|
||||||
async def run_scheduler() -> None:
|
async def run_scheduler() -> None:
|
||||||
"""Schedule periodic cleanup tasks."""
|
"""Schedule periodic cleanup tasks."""
|
||||||
print("[SCHEDULER] Starting...")
|
setup_logging()
|
||||||
|
scheduler_logger.info("Starting...")
|
||||||
await init_db()
|
await init_db()
|
||||||
|
|
||||||
last_credit_refill = None
|
last_credit_refill = None
|
||||||
@@ -798,7 +799,7 @@ async def run_scheduler() -> None:
|
|||||||
if today.day == 1 and last_credit_refill != this_month:
|
if today.day == 1 and last_credit_refill != this_month:
|
||||||
await enqueue("refill_monthly_credits")
|
await enqueue("refill_monthly_credits")
|
||||||
last_credit_refill = this_month
|
last_credit_refill = this_month
|
||||||
print(f"[SCHEDULER] Queued monthly credit refill for {this_month}")
|
scheduler_logger.info("Queued monthly credit refill for %s", this_month)
|
||||||
|
|
||||||
# Daily SEO metrics sync — run once per day after 6am UTC
|
# Daily SEO metrics sync — run once per day after 6am UTC
|
||||||
# (GSC data has ~2 day delay, syncing at 6am ensures data is ready)
|
# (GSC data has ~2 day delay, syncing at 6am ensures data is ready)
|
||||||
@@ -809,12 +810,12 @@ async def run_scheduler() -> None:
|
|||||||
await enqueue("sync_umami")
|
await enqueue("sync_umami")
|
||||||
await enqueue("cleanup_seo_metrics")
|
await enqueue("cleanup_seo_metrics")
|
||||||
last_seo_sync_date = today_date
|
last_seo_sync_date = today_date
|
||||||
print(f"[SCHEDULER] Queued SEO metric syncs for {today_date}")
|
scheduler_logger.info("Queued SEO metric syncs for %s", today_date)
|
||||||
|
|
||||||
await asyncio.sleep(3600) # 1 hour
|
await asyncio.sleep(3600) # 1 hour
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[SCHEDULER] Error: {e}")
|
scheduler_logger.error("Error: %s", e)
|
||||||
await asyncio.sleep(60)
|
await asyncio.sleep(60)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user