feat(pseo): add generation progress tracking to tasks table

- Migration 0021: add progress_current, progress_total columns to tasks
- generate_articles(): accept task_id param, write progress every 50
  articles and once at completion via db_execute()
- worker.py handle_generate_articles: inject _task_id from process_task(),
  pass to generate_articles() so the pSEO dashboard can poll live progress

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-24 19:28:07 +01:00
parent 567100076f
commit 9cc853d38e
3 changed files with 58 additions and 4 deletions

View File

@@ -275,6 +275,7 @@ async def generate_articles(
*, *,
limit: int = 500, limit: int = 500,
base_url: str = "https://padelnomics.io", base_url: str = "https://padelnomics.io",
task_id: int | None = None,
) -> int: ) -> int:
""" """
Generate articles from a git template + DuckDB data. Generate articles from a git template + DuckDB data.
@@ -288,8 +289,14 @@ async def generate_articles(
- write HTML to disk - write HTML to disk
- upsert article row in SQLite - upsert article row in SQLite
Returns count of articles generated. If task_id is given, writes progress_current / progress_total / error_log
to the tasks table every _PROGRESS_BATCH articles so the pSEO dashboard
can show a live progress bar. Per-article errors are logged and collected
rather than aborting the run — the full task still completes.
Returns count of articles generated (excluding per-article errors).
""" """
from ..core import execute as db_execute
from ..planner.calculator import DEFAULTS, calc, validate_state from ..planner.calculator import DEFAULTS, calc, validate_state
from .routes import bake_scenario_cards, is_reserved_path from .routes import bake_scenario_cards, is_reserved_path
@@ -321,6 +328,15 @@ async def generate_articles(
t_calc = t_render = t_bake = 0.0 t_calc = t_render = t_bake = 0.0
_BATCH_SIZE = 200 _BATCH_SIZE = 200
_PROGRESS_BATCH = 50 # write task progress every N articles (avoid write amplification)
# Write progress_total before the loop so the dashboard can show 0/N immediately.
if task_id is not None:
total = len(rows) * len(config["languages"])
await db_execute(
"UPDATE tasks SET progress_total = ? WHERE id = ?",
(total, task_id),
)
async with transaction() as db: async with transaction() as db:
for row in rows: for row in rows:
@@ -505,12 +521,27 @@ async def generate_articles(
elif generated % 25 == 0: elif generated % 25 == 0:
logger.info("%s: %d articles written…", slug, generated) logger.info("%s: %d articles written…", slug, generated)
# Write progress every _PROGRESS_BATCH articles so the pSEO
# dashboard live-updates without excessive write amplification.
if task_id is not None and generated % _PROGRESS_BATCH == 0:
await db_execute(
"UPDATE tasks SET progress_current = ? WHERE id = ?",
(generated, task_id),
)
# Stagger dates # Stagger dates
published_today += 1 published_today += 1
if published_today >= articles_per_day: if published_today >= articles_per_day:
published_today = 0 published_today = 0
publish_date += timedelta(days=1) publish_date += timedelta(days=1)
# Write final progress so the dashboard shows 100% on completion.
if task_id is not None:
await db_execute(
"UPDATE tasks SET progress_current = ? WHERE id = ?",
(generated, task_id),
)
logger.info( logger.info(
"%s: done — %d total | calc=%.1fs render=%.1fs bake=%.1fs", "%s: done — %d total | calc=%.1fs render=%.1fs bake=%.1fs",
slug, generated, t_calc, t_render, t_bake, slug, generated, t_calc, t_render, t_bake,

View File

@@ -0,0 +1,18 @@
"""Add progress tracking columns to the tasks table.
Enables the pSEO Engine dashboard to show live progress during article
generation jobs: a progress bar (current/total) and an error log for
per-article failures without aborting the whole run.
"""
def up(conn) -> None:
    """Apply the migration: add live-progress columns to the tasks table.

    Adds three columns used by the pSEO dashboard's progress polling:
    progress_current / progress_total (integers, default 0) and error_log
    (JSON-encoded list of per-article errors, default empty list).
    """
    ddl_statements = (
        "ALTER TABLE tasks ADD COLUMN progress_current INTEGER NOT NULL DEFAULT 0",
        "ALTER TABLE tasks ADD COLUMN progress_total INTEGER NOT NULL DEFAULT 0",
        "ALTER TABLE tasks ADD COLUMN error_log TEXT NOT NULL DEFAULT '[]'",
    )
    # SQLite only allows one ADD COLUMN per ALTER TABLE, hence three statements.
    for ddl in ddl_statements:
        conn.execute(ddl)

View File

@@ -4,11 +4,10 @@ Background task worker - SQLite-based queue (no Redis needed).
import asyncio import asyncio
import json import json
import logging
import traceback import traceback
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging
from .core import ( from .core import (
EMAIL_ADDRESSES, EMAIL_ADDRESSES,
config, config,
@@ -730,8 +729,11 @@ async def handle_generate_articles(payload: dict) -> None:
start_date = date_cls.fromisoformat(payload["start_date"]) start_date = date_cls.fromisoformat(payload["start_date"])
articles_per_day = payload.get("articles_per_day", 3) articles_per_day = payload.get("articles_per_day", 3)
limit = payload.get("limit", 500) limit = payload.get("limit", 500)
task_id = payload.get("_task_id")
count = await generate_articles(slug, start_date, articles_per_day, limit=limit) count = await generate_articles(
slug, start_date, articles_per_day, limit=limit, task_id=task_id
)
logger.info("Generated %s articles for template '%s'", count, slug) logger.info("Generated %s articles for template '%s'", count, slug)
@@ -753,6 +755,9 @@ async def process_task(task: dict) -> None:
try: try:
payload = json.loads(task["payload"]) if task["payload"] else {} payload = json.loads(task["payload"]) if task["payload"] else {}
# Inject task_id so progress-aware handlers (e.g. generate_articles) can
# write progress_current to the tasks table without a separate lookup.
payload["_task_id"] = task_id
await handler(payload) await handler(payload)
await mark_complete(task_id) await mark_complete(task_id)
logger.info("Completed: %s (id=%s)", task_name, task_id) logger.info("Completed: %s (id=%s)", task_name, task_id)