Compare commits
6 Commits
v202603011
...
v202603011
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b54f2d544 | ||
|
|
08bd2b2989 | ||
|
|
81a57db272 | ||
|
|
f92d863781 | ||
|
|
a3dd37b1be | ||
|
|
e5cbcf462e |
@@ -58,7 +58,7 @@ NTFY_TOKEN=
|
||||
#ENC[AES256_GCM,data:BCyQYjRnTx8yW9A=,iv:4OPCP+xzRLUJrpoFewVnbZRKnZH4sAbV76SM//2k5wU=,tag:HxwEp7VFVZUN/VjPiL/+Vw==,type:comment]
|
||||
RECHECK_WINDOW_MINUTES=ENC[AES256_GCM,data:YWM=,iv:iY5+uMazLAFdwyLT7Gr7MaF1QHBIgHuoi6nF2VbSsOA=,tag:dc6AmuJdTQ55gVe16uzs6A==,type:str]
|
||||
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:lfmlsjXFtL+zo40SNFLiFKaZiYvE7CNH+zRwjMK5pqPfCs0TlMX+Y9e1KmzAS+y/cI69TP5sgMPRBzER0Jn7RvH0KA==,iv:jBN/4/K5L5886G4rSzxt8V8u/57tAuj3R76haltzqeU=,tag:Xe6o9eg2PodfktDqmLgVNA==,type:str]
|
||||
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:X6xpxz5u8Xh3OXjkIz3UwqH847qLvY9cVWVktW5B+lqhmXAKTzoTzHds8vlRGJf5Up9Yx44XcigbvuK33ZJDSq9ovkAIbY55OK4=,iv:3hHyFD+H9HMzQ/27bPjGr59+7yWmEneUdN9XPQasCig=,tag:oBXsSuV5idB7HqNrNOruwg==,type:str]
|
||||
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:Eec0X65EMsV2PD3Qvn+JjGqYaHtLupn0k99H918vmuRuAinP3rv/pwEoyKHmygazrUExg7U2PUELycyzq3lU6RIGtO+r0pRAn/n0S8RwdoZS,iv:T+bfbvULwSLRVD/hyW7rDN8tLLBf1FQkwCEbpiuBB+0=,tag:W/YHfl5U2yaA7ZOXgAFw+Q==,type:str]
|
||||
WEBSHARE_DOWNLOAD_URL=ENC[AES256_GCM,data:1D9VRZ3MCXPQWfiMH8+CLcrxeYnVVcQgZDvt5kltvbSTuSHQ2hHDmZpBkTOMIBJnw4JLZ2JQKHgG4OaYDtsM2VltFPnfwaRgVI9G5PSenR3o4PeQmYO1AqWOmjn19jPxNXRhEXdupP9UT+xQNXoBJsl6RR20XOpMA5AipUHmSjD0UIKXoZLU,iv:uWUkAydac//qrOTPUThuOLKAKXK4xcZmK9qBVFwpqt4=,tag:1vYhukBW9kEuSXCLAiZZmQ==,type:str]
|
||||
CIRCUIT_BREAKER_THRESHOLD=
|
||||
#ENC[AES256_GCM,data:ZcX/OEbrMfKizIQYq3CYGnvzeTEX7KsmQaz2+Jj1rG5tbTy2aljQBIEkjtiwuo8NsNAD+FhIGRGVfBmKe1CAKME1MuiCbgSG,iv:4BSkeD3jZFawP09qECcqyuiWcDnCNSgbIjBATYhazq4=,tag:Ep1d2Uk700MOlWcLWaQ/ig==,type:comment]
|
||||
@@ -71,7 +71,7 @@ GEONAMES_USERNAME=ENC[AES256_GCM,data:aSkVdLNrhiF6tlg=,iv:eemFGwDIv3EG/P3lVHGZj9
|
||||
CENSUS_API_KEY=ENC[AES256_GCM,data:qqG971573aGq9MiHI2xLlanKKFwjfcNNoMXtm8LNbyh0rMbQN2XukQ==,iv:az2i0ldH75nHGah4DeOxaXmDbVYqmC1c77ptZqFA9BI=,tag:zoDdKj9bR7fgIDo1/dEU2g==,type:str]
|
||||
sops_age__list_0__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBxNWNmUzVNUGdWRnE0ZFpF\nM0JQZWZ3UDdEVzlwTmIxakxOZXBkT2x2ZlNrClRtV2M3S2daSGxUZmFDSWQ2Nmh4\neU51QndFcUxlSE00RFovOVJTcDZmUUUKLS0tIDcvL3hRMDRoMWZZSXljNzA3WG5o\nMWFic21MV0krMzlIaldBTVU0ZDdlTE0K7euGQtA+9lHNws+x7TMCArZamm9att96\nL8cXoUDWe5fNI5+M1bXReqVfNwPTwZsV6j/+ZtYKybklIzWz02Ex4A==\n-----END AGE ENCRYPTED FILE-----\n
|
||||
sops_age__list_0__map_recipient=age1f5002gj4s78jju45jd28kuejtcfhn5cdujz885fl7z2p9ym68pnsgky87a
|
||||
sops_lastmodified=2026-02-28T15:50:46Z
|
||||
sops_mac=ENC[AES256_GCM,data:HiLZTLa+p3mqa4hw+tKOK27F/bsJOy4jmDi8MHToi6S7tRfBA/TzcEzXvXUIkkwAixN73NQHvBVeRnbcEsApVpkaxH1OqnjvvyT+B3YFkTEtxczaKGWlCvbqFZNmXYsFvGR9njaWYWsTQPkRIjrroXrSrhr7uxC8F40v7ByxJKo=,iv:qj2IpzWRIh/mM1HtjjkNbyFuhtORKXslVnf/vdEC9Uw=,tag:fr9CZsL74HxRJLXn9eS0xQ==,type:str]
|
||||
sops_lastmodified=2026-03-01T13:26:08Z
|
||||
sops_mac=ENC[AES256_GCM,data:WmbT6tCUEoCDyKu673NQoJNzmCiilpG8yDVGl6ObxTOYleWt+1DVdPS+XUV+0Wd4bfkEhGTEfXAyy+wfoCVfYnenMuDGjXUUdsvqrOX6nnNCJ8nIntL46LfbRsbVrU6eeYGu/TaTyfouWjkk6pqlxffNSS6rrEFNZE4Q+v58+EI=,iv:TuCEmK6YJXsYISbN4mbuVbS6OvUNuhPRLstjjNkkrPk=,tag:hWLS036q7H5lMNpR6gZBVA==,type:str]
|
||||
sops_unencrypted_suffix=_unencrypted
|
||||
sops_version=3.12.1
|
||||
|
||||
@@ -39,8 +39,8 @@ ALERT_WEBHOOK_URL=ENC[AES256_GCM,data:4sXQk8zklruC525J279TUUatdDJQ43qweuoPhtpI82
|
||||
NTFY_TOKEN=ENC[AES256_GCM,data:YlOxhsRJ8P1y4kk6ugWm41iyRCsM6oAWjvbU9lGcD0A=,iv:JZXOvi3wTOPV9A46c7fMiqbszNCvXkOgh9i/H1hob24=,tag:8xnPimgy7sesOAnxhaXmpg==,type:str]
|
||||
SUPERVISOR_GIT_PULL=ENC[AES256_GCM,data:mg==,iv:KgqMVYj12FjOzWxtA1T0r0pqCDJ6MtHzMjE+4W/W+s4=,tag:czFaOqhHG8nqrQ8AZ8QiGw==,type:str]
|
||||
#ENC[AES256_GCM,data:hzAZvCWc4RTk290=,iv:RsSI4OpAOQGcFVpfXDZ6t705yWmlO0JEWwWF5uQu9As=,tag:UPqFtA2tXiSa0vzJAv8qXg==,type:comment]
|
||||
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:x/F0toXDc8stsUNxaepCmxq1+WuacqqPtdc+R5mxTwcAzsKxCdwt8KpBZWMvz7ku4tHDGsKD949QAX2ANXP9oCMTgW0=,iv:6G9gE9/v7GaYj8aqVTmMrpw6AcQK9yMSCAohNdAD1Ws=,tag:2Jimr1ldVSfkh8LPEwdN3w==,type:str]
|
||||
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:6BfXBYmyHpgZU/kJWpZLf8eH5VowVK1n0r6GzFTNAx/OmyaaS1RZVPC1JPkPBnTwEmo0WHYRW8uiUdkABmH9F5ZqqlsAesyfW7zvU9r7yD+D7w==,iv:3CBn2qCoTueQy8xVcQqZS4E3F0qoFYnNbzTZTpJ1veo=,tag:wC3Ecl4uNTwPiT23ATvRZg==,type:str]
|
||||
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:vxRcXQ/8TUTCtr6hKWBD1zVF47GFSfluIHZ8q0tt8SqQOWDdDe2D7Of6boy/kG3lqlpl7TjqMGJ7fLORcr0klKCykQ==,iv:YjegXXtIXm2qr0a3ZHRHxj3L1JoGZ1iQXkVXQupGQ2E=,tag:kahoHRskXbzplZasWOeiig==,type:str]
|
||||
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:23TgU6oUeO7J+MFkraALQ5/RO38DZ3ib5oYYJr7Lj3KXQSlRsgwA+bJlweI5gcUpFphnPXvmwFGiuL6AeY8LzAQ3bx46dcZa5w9LfKw2PMFt,iv:AGXwYLqWjT5VmU02qqada3PbdjfC0mLK2sPruO0uru8=,tag:Z2IS/JPOqWX+x0LZYwyArA==,type:str]
|
||||
WEBSHARE_DOWNLOAD_URL=ENC[AES256_GCM,data:/N77CFf6tJWCk7HrnBOm2Q1ynx7XoblzfbzJySeCjrxqiu4r+CB90aDkaPahlQKI00DUZih3pcy7WhnjdAwI30G5kJZ3P8H8/R0tP7OBK1wPVbsJq8prQJPFOAWewsS4KWNtSURZPYSCxslcBb7DHLX6ZAjv6A5KFOjRK2N8usR9sIabrCWh,iv:G3Ropu/JGytZK/zKsNGFjjSu3Wt6fvHaAqI9RpUHvlI=,tag:fv6xuS94OR+4xfiyKrYELA==,type:str]
|
||||
PROXY_CONCURRENCY=ENC[AES256_GCM,data:vdEZ,iv:+eTNQO+s/SsVDBLg1/+fneMzEEsFkuEFxo/FcVV+mWc=,tag:i/EPwi/jOoWl3xW8H0XMdw==,type:str]
|
||||
RECHECK_WINDOW_MINUTES=ENC[AES256_GCM,data:L2s=,iv:fV3mCKmK5fxUmIWRePELBDAPTb8JZqasVIhnAl55kYw=,tag:XL+PO6sblz/7WqHC3dtk1w==,type:str]
|
||||
@@ -58,7 +58,7 @@ sops_age__list_1__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb2
|
||||
sops_age__list_1__map_recipient=age1wjepykv3glvsrtegu25tevg7vyn3ngpl607u3yjc9ucay04s045s796msw
|
||||
sops_age__list_2__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBFeHhaOURNZnRVMEwxNThu\nUjF4Q0kwUXhTUE1QSzZJbmpubnh3RnpQTmdvCjRmWWxpNkxFUmVGb3NRbnlydW5O\nWEg3ZXJQTU4vcndzS2pUQXY3Q0ttYjAKLS0tIE9IRFJ1c2ZxbGVHa2xTL0swbGN1\nTzgwMThPUDRFTWhuZHJjZUYxOTZrU00KY62qrNBCUQYxwcLMXFEnLkwncxq3BPJB\nKm4NzeHBU87XmPWVrgrKuf+PH1mxJlBsl7Hev8xBTy7l6feiZjLIvQ==\n-----END AGE ENCRYPTED FILE-----\n
|
||||
sops_age__list_2__map_recipient=age1c783ym2q5x9tv7py5d28uc4k44aguudjn03g97l9nzs00dd9tsrqum8h4d
|
||||
sops_lastmodified=2026-03-01T00:26:54Z
|
||||
sops_mac=ENC[AES256_GCM,data:DdcABGVm9KbAcFrF0iuZlAaugsouNs7Hon2mZISaHs15/2H/Pd9FniXW3KeQ0+/NdZFQkz/h3i3bVFampcpFS1AxuOE5+1/IgWn8sKtaqPc7E9y8g6lxMnwTkUX2z+n/Q2nR8KAcO9IyE0GNjIluMWkxPWQuLzlRYDOjRN4/1e0=,iv:rm+6lXhYu6VUmrdCIrU0BRN2/ooa21Fw1ESWxr7vATg=,tag:GZmLLZf/LQaNeNNAAEg5bA==,type:str]
|
||||
sops_lastmodified=2026-03-01T13:25:41Z
|
||||
sops_mac=ENC[AES256_GCM,data:EL9Bgo0pWWECeHaaM1bHtkvwBgBmS3P2cX+6oahHKmLEJLI7P7fiomP7G8SdrfUyNpZaP9d4LlfwZSuCPqH6rP8jzF67oNkfXfd/xK4OW2U2TqSvouCMzlhqVQgS4HHl5EgvOI488WEIZko7KK2A1rxnpkm8C29WG9d9G64LKvw=,iv:XzsNm3CXnlC6SIef63BdddALjGustp8czHQCWOtjXBQ=,tag:zll0db6K1+M4brOpfVWnhg==,type:str]
|
||||
sops_unencrypted_suffix=_unencrypted
|
||||
sops_version=3.12.1
|
||||
|
||||
10
CHANGELOG.md
10
CHANGELOG.md
@@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Fixed
|
||||
- **Proxy URL scheme validation in `load_proxy_tiers()`** — URLs in `PROXY_URLS_DATACENTER` / `PROXY_URLS_RESIDENTIAL` that are missing an `http://` or `https://` scheme are now logged as a warning and skipped, rather than being passed through and causing SSL handshake failures or connection errors at request time. Also fixed a missing `http://` prefix in the dev `.env` `PROXY_URLS_DATACENTER` entry.
|
||||
|
||||
### Changed
|
||||
- **Per-proxy dead tracking in tiered cycler** — `make_tiered_cycler` now accepts a `proxy_failure_limit` parameter (default 3). Individual proxies that hit the limit are marked dead and permanently skipped by `next_proxy()`. If all proxies in the active tier are dead, `next_proxy()` auto-escalates to the next tier without needing the tier-level threshold. `record_failure(proxy_url)` and `record_success(proxy_url)` accept an optional `proxy_url` argument for per-proxy tracking; callers without `proxy_url` are fully backward-compatible. New `dead_proxy_count()` callable exposed for monitoring.
|
||||
- `extract/padelnomics_extract/src/padelnomics_extract/proxy.py`: added per-proxy state (`proxy_failure_counts`, `dead_proxies`), updated `next_proxy`/`record_failure`/`record_success`, added `dead_proxy_count`
|
||||
@@ -14,6 +17,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
|
||||
- `web/tests/test_supervisor.py`: 11 new tests in `TestTieredCyclerDeadProxyTracking` covering dead proxy skipping, auto-escalation, `dead_proxy_count`, backward compat, and thread safety
|
||||
|
||||
### Added
|
||||
- **Pipeline Transform tab + live extraction status** — new "Transform" tab in the pipeline admin with status cards for SQLMesh transform and export-serving tasks, a "Run Full Pipeline" button, and a recent run history table. The Overview tab now auto-polls every 5 s while an extraction task is pending and stops automatically when quiet. Per-extractor "Run" buttons use HTMX in-place updates instead of redirects. The header "Run Pipeline" button now enqueues the full ELT pipeline (extract → transform → export) instead of extraction only. Three new worker task handlers: `run_transform` (sqlmesh plan prod --auto-apply, 2 h timeout), `run_export` (export_serving.py, 10 min timeout), `run_pipeline` (sequential, stops on first failure). Concurrency guard prevents double-enqueuing the same step.
|
||||
- `web/src/padelnomics/worker.py`: `handle_run_transform`, `handle_run_export`, `handle_run_pipeline`
|
||||
- `web/src/padelnomics/admin/pipeline_routes.py`: `_render_overview_partial()`, `_fetch_pipeline_tasks()`, `_format_duration()`, `pipeline_transform()`, `pipeline_trigger_transform()`; `pipeline_trigger_extract()` now HTMX-aware
|
||||
- `web/src/padelnomics/admin/templates/admin/pipeline.html`: pulse animation on `.status-dot.running`, Transform tab button, rewired header button
|
||||
- `web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html`: self-polling wrapper, HTMX Run buttons
|
||||
- `web/src/padelnomics/admin/templates/admin/partials/pipeline_transform.html`: new file
|
||||
|
||||
- **Affiliate programs management** — centralised retailer config (`affiliate_programs` table) with URL template + tracking tag + commission %. Products now use a program dropdown + product identifier (e.g. ASIN) instead of manually baking full URLs. URL is assembled at redirect time via `build_affiliate_url()`, so changing a tag propagates instantly to all products. Legacy products (baked `affiliate_url`) continue to work via fallback. Amazon OneLink configured in the Associates dashboard handles geo-redirect to local marketplaces — no per-country programs needed.
|
||||
- `web/src/padelnomics/migrations/versions/0027_affiliate_programs.py`: `affiliate_programs` table, nullable `program_id` + `product_identifier` columns on `affiliate_products`, seeds "Amazon" program, backfills ASINs from existing URLs
|
||||
- `web/src/padelnomics/affiliate.py`: `get_all_programs()`, `get_program()`, `get_program_by_slug()`, `build_affiliate_url()`; `get_product()` JOINs program for redirect assembly; `_parse_product()` extracts `_program` sub-dict
|
||||
|
||||
@@ -88,8 +88,14 @@ def load_proxy_tiers() -> list[list[str]]:
|
||||
for var in ("PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"):
|
||||
raw = os.environ.get(var, "")
|
||||
urls = [u.strip() for u in raw.split(",") if u.strip()]
|
||||
if urls:
|
||||
tiers.append(urls)
|
||||
valid = []
|
||||
for url in urls:
|
||||
if not url.startswith(("http://", "https://")):
|
||||
logger.warning("%s contains URL without scheme, skipping: %s", var, url[:60])
|
||||
continue
|
||||
valid.append(url)
|
||||
if valid:
|
||||
tiers.append(valid)
|
||||
|
||||
return tiers
|
||||
|
||||
|
||||
@@ -6,7 +6,9 @@ Operational visibility for the data extraction and transformation pipeline:
|
||||
/admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats
|
||||
/admin/pipeline/extractions → HTMX tab: filterable extraction run history
|
||||
/admin/pipeline/extractions/<id>/mark-stale → POST: mark stuck "running" row as failed
|
||||
/admin/pipeline/extract/trigger → POST: enqueue full extraction run
|
||||
/admin/pipeline/extract/trigger → POST: enqueue extraction run (HTMX-aware)
|
||||
/admin/pipeline/transform → HTMX tab: SQLMesh + export status, run history
|
||||
/admin/pipeline/transform/trigger → POST: enqueue transform/export/pipeline step
|
||||
/admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data)
|
||||
/admin/pipeline/catalog/<table> → HTMX partial: table detail (columns + sample)
|
||||
/admin/pipeline/query → HTMX tab: SQL query editor
|
||||
@@ -18,6 +20,7 @@ Data sources:
|
||||
- analytics.duckdb (DuckDB read-only via analytics.execute_user_query)
|
||||
- LANDING_DIR/ (filesystem scan for file sizes + dates)
|
||||
- infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib)
|
||||
- app.db tasks table (run_transform, run_export, run_pipeline task rows)
|
||||
"""
|
||||
import asyncio
|
||||
import json
|
||||
@@ -626,10 +629,8 @@ async def pipeline_dashboard():
|
||||
# ── Overview tab ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@bp.route("/overview")
|
||||
@role_required("admin")
|
||||
async def pipeline_overview():
|
||||
"""HTMX tab: extraction status per source, serving freshness, landing zone."""
|
||||
async def _render_overview_partial():
|
||||
"""Build and render the pipeline overview partial (shared by GET and POST triggers)."""
|
||||
latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather(
|
||||
asyncio.to_thread(_fetch_latest_per_extractor_sync),
|
||||
asyncio.to_thread(_get_landing_zone_stats_sync),
|
||||
@@ -650,6 +651,13 @@ async def pipeline_overview():
|
||||
"stale": _is_stale(run) if run else False,
|
||||
})
|
||||
|
||||
# Treat pending extraction tasks as "running" (queued or active).
|
||||
from ..core import fetch_all as _fetch_all # noqa: PLC0415
|
||||
pending_extraction = await _fetch_all(
|
||||
"SELECT id FROM tasks WHERE task_name = 'run_extraction' AND status = 'pending' LIMIT 1"
|
||||
)
|
||||
any_running = bool(pending_extraction)
|
||||
|
||||
# Compute landing zone totals
|
||||
total_landing_bytes = sum(s["total_bytes"] for s in landing_stats)
|
||||
|
||||
@@ -677,10 +685,18 @@ async def pipeline_overview():
|
||||
total_landing_bytes=total_landing_bytes,
|
||||
serving_tables=serving_tables,
|
||||
last_export=last_export,
|
||||
any_running=any_running,
|
||||
format_bytes=_format_bytes,
|
||||
)
|
||||
|
||||
|
||||
@bp.route("/overview")
|
||||
@role_required("admin")
|
||||
async def pipeline_overview():
|
||||
"""HTMX tab: extraction status per source, serving freshness, landing zone."""
|
||||
return await _render_overview_partial()
|
||||
|
||||
|
||||
# ── Extractions tab ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -745,7 +761,11 @@ async def pipeline_mark_stale(run_id: int):
|
||||
@role_required("admin")
|
||||
@csrf_protect
|
||||
async def pipeline_trigger_extract():
|
||||
"""Enqueue an extraction run — all extractors, or a single named one."""
|
||||
"""Enqueue an extraction run — all extractors, or a single named one.
|
||||
|
||||
HTMX-aware: if the HX-Request header is present, returns the overview partial
|
||||
directly so the UI can update in-place without a redirect.
|
||||
"""
|
||||
from ..worker import enqueue
|
||||
|
||||
form = await request.form
|
||||
@@ -757,11 +777,15 @@ async def pipeline_trigger_extract():
|
||||
await flash(f"Unknown extractor '{extractor}'.", "warning")
|
||||
return redirect(url_for("pipeline.pipeline_dashboard"))
|
||||
await enqueue("run_extraction", {"extractor": extractor})
|
||||
await flash(f"Extractor '{extractor}' queued. Check the task queue for progress.", "success")
|
||||
else:
|
||||
await enqueue("run_extraction")
|
||||
await flash("Extraction run queued. Check the task queue for progress.", "success")
|
||||
|
||||
is_htmx = request.headers.get("HX-Request") == "true"
|
||||
if is_htmx:
|
||||
return await _render_overview_partial()
|
||||
|
||||
msg = f"Extractor '{extractor}' queued." if extractor else "Extraction run queued."
|
||||
await flash(f"{msg} Check the task queue for progress.", "success")
|
||||
return redirect(url_for("pipeline.pipeline_dashboard"))
|
||||
|
||||
|
||||
@@ -847,6 +871,156 @@ async def pipeline_lineage_schema(model: str):
|
||||
)
|
||||
|
||||
|
||||
# ── Transform tab ─────────────────────────────────────────────────────────────
|
||||
|
||||
_TRANSFORM_TASK_NAMES = ("run_transform", "run_export", "run_pipeline")
|
||||
|
||||
|
||||
async def _fetch_pipeline_tasks() -> dict:
|
||||
"""Fetch the latest task row for each transform task type, plus recent run history.
|
||||
|
||||
Returns:
|
||||
{
|
||||
"latest": {"run_transform": row|None, "run_export": row|None, "run_pipeline": row|None},
|
||||
"history": [row, ...], # last 20 rows across all three task types, newest first
|
||||
}
|
||||
"""
|
||||
from ..core import fetch_all as _fetch_all # noqa: PLC0415
|
||||
|
||||
# Latest row per task type (may be pending, complete, or failed)
|
||||
latest_rows = await _fetch_all(
|
||||
"""
|
||||
SELECT t.*
|
||||
FROM tasks t
|
||||
INNER JOIN (
|
||||
SELECT task_name, MAX(id) AS max_id
|
||||
FROM tasks
|
||||
WHERE task_name IN ('run_transform', 'run_export', 'run_pipeline')
|
||||
GROUP BY task_name
|
||||
) latest ON t.id = latest.max_id
|
||||
"""
|
||||
)
|
||||
latest: dict = {"run_transform": None, "run_export": None, "run_pipeline": None}
|
||||
for row in latest_rows:
|
||||
latest[row["task_name"]] = dict(row)
|
||||
|
||||
history = await _fetch_all(
|
||||
"""
|
||||
SELECT id, task_name, status, created_at, completed_at, error
|
||||
FROM tasks
|
||||
WHERE task_name IN ('run_transform', 'run_export', 'run_pipeline')
|
||||
ORDER BY id DESC
|
||||
LIMIT 20
|
||||
"""
|
||||
)
|
||||
return {"latest": latest, "history": [dict(r) for r in history]}
|
||||
|
||||
|
||||
def _format_duration(created_at: str | None, completed_at: str | None) -> str:
|
||||
"""Human-readable duration between created_at and completed_at, or '' if unavailable."""
|
||||
if not created_at or not completed_at:
|
||||
return ""
|
||||
try:
|
||||
fmt = "%Y-%m-%d %H:%M:%S"
|
||||
start = datetime.strptime(created_at, fmt)
|
||||
end = datetime.strptime(completed_at, fmt)
|
||||
delta = int((end - start).total_seconds())
|
||||
if delta < 0:
|
||||
return ""
|
||||
if delta < 60:
|
||||
return f"{delta}s"
|
||||
return f"{delta // 60}m {delta % 60}s"
|
||||
except ValueError:
|
||||
return ""
|
||||
|
||||
|
||||
async def _render_transform_partial():
|
||||
"""Build and render the transform tab partial."""
|
||||
task_data = await _fetch_pipeline_tasks()
|
||||
latest = task_data["latest"]
|
||||
history = task_data["history"]
|
||||
|
||||
# Enrich history rows with duration
|
||||
for row in history:
|
||||
row["duration"] = _format_duration(row.get("created_at"), row.get("completed_at"))
|
||||
# Truncate error for display
|
||||
if row.get("error"):
|
||||
row["error_short"] = row["error"][:120]
|
||||
else:
|
||||
row["error_short"] = None
|
||||
|
||||
any_running = any(
|
||||
t is not None and t["status"] == "pending" for t in latest.values()
|
||||
)
|
||||
|
||||
serving_meta = await asyncio.to_thread(_load_serving_meta)
|
||||
|
||||
return await render_template(
|
||||
"admin/partials/pipeline_transform.html",
|
||||
latest=latest,
|
||||
history=history,
|
||||
any_running=any_running,
|
||||
serving_meta=serving_meta,
|
||||
format_duration=_format_duration,
|
||||
)
|
||||
|
||||
|
||||
@bp.route("/transform")
|
||||
@role_required("admin")
|
||||
async def pipeline_transform():
|
||||
"""HTMX tab: SQLMesh transform + export status, run history."""
|
||||
return await _render_transform_partial()
|
||||
|
||||
|
||||
@bp.route("/transform/trigger", methods=["POST"])
|
||||
@role_required("admin")
|
||||
@csrf_protect
|
||||
async def pipeline_trigger_transform():
|
||||
"""Enqueue a transform, export, or full pipeline task.
|
||||
|
||||
form field `step`: 'transform' | 'export' | 'pipeline'
|
||||
Concurrency guard: rejects if the same task type is already pending.
|
||||
HTMX-aware: returns the transform partial for HTMX requests.
|
||||
"""
|
||||
from ..core import fetch_one as _fetch_one # noqa: PLC0415
|
||||
from ..worker import enqueue
|
||||
|
||||
form = await request.form
|
||||
step = (form.get("step") or "").strip()
|
||||
|
||||
step_to_task = {
|
||||
"transform": "run_transform",
|
||||
"export": "run_export",
|
||||
"pipeline": "run_pipeline",
|
||||
}
|
||||
if step not in step_to_task:
|
||||
await flash(f"Unknown step '{step}'.", "warning")
|
||||
return redirect(url_for("pipeline.pipeline_dashboard"))
|
||||
|
||||
task_name = step_to_task[step]
|
||||
|
||||
# Concurrency guard: reject if same task type is already pending
|
||||
existing = await _fetch_one(
|
||||
"SELECT id FROM tasks WHERE task_name = ? AND status = 'pending' LIMIT 1",
|
||||
(task_name,),
|
||||
)
|
||||
if existing:
|
||||
is_htmx = request.headers.get("HX-Request") == "true"
|
||||
if is_htmx:
|
||||
return await _render_transform_partial()
|
||||
await flash(f"A '{step}' task is already queued (task #{existing['id']}).", "warning")
|
||||
return redirect(url_for("pipeline.pipeline_dashboard"))
|
||||
|
||||
await enqueue(task_name)
|
||||
|
||||
is_htmx = request.headers.get("HX-Request") == "true"
|
||||
if is_htmx:
|
||||
return await _render_transform_partial()
|
||||
|
||||
await flash(f"'{step}' task queued. Check the task queue for progress.", "success")
|
||||
return redirect(url_for("pipeline.pipeline_dashboard"))
|
||||
|
||||
|
||||
# ── Catalog tab ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,11 @@
|
||||
<!-- Pipeline Overview Tab: extraction status, serving freshness, landing zone -->
|
||||
<!-- Pipeline Overview Tab: extraction status, serving freshness, landing zone
|
||||
Self-polls every 5s while any extraction task is pending, stops when quiet. -->
|
||||
|
||||
<div id="pipeline-overview-content"
|
||||
hx-get="{{ url_for('pipeline.pipeline_overview') }}"
|
||||
hx-target="this"
|
||||
hx-swap="outerHTML"
|
||||
{% if any_running %}hx-trigger="every 5s"{% endif %}>
|
||||
|
||||
<!-- Extraction Status Grid -->
|
||||
<div class="card mb-4">
|
||||
@@ -26,12 +33,14 @@
|
||||
{% if stale %}
|
||||
<span class="badge-warning" style="font-size:10px;padding:1px 6px;margin-left:auto">stale</span>
|
||||
{% endif %}
|
||||
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0 ml-auto">
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
|
||||
<input type="hidden" name="extractor" value="{{ wf.name }}">
|
||||
<button type="button" class="btn btn-sm" style="padding:2px 8px;font-size:11px"
|
||||
onclick="confirmAction('Run {{ wf.name }} extractor?', this.closest('form'))">Run</button>
|
||||
</form>
|
||||
<button type="button"
|
||||
class="btn btn-sm ml-auto"
|
||||
style="padding:2px 8px;font-size:11px"
|
||||
hx-post="{{ url_for('pipeline.pipeline_trigger_extract') }}"
|
||||
hx-target="#pipeline-overview-content"
|
||||
hx-swap="outerHTML"
|
||||
hx-vals='{"extractor": "{{ wf.name }}", "csrf_token": "{{ csrf_token() }}"}'
|
||||
onclick="if (!confirm('Run {{ wf.name }} extractor?')) return false;">Run</button>
|
||||
</div>
|
||||
<p class="text-xs text-slate">{{ wf.schedule_label }}</p>
|
||||
{% if run %}
|
||||
@@ -131,3 +140,5 @@
|
||||
</div>
|
||||
|
||||
</div>
|
||||
|
||||
</div>{# end #pipeline-overview-content #}
|
||||
|
||||
@@ -0,0 +1,197 @@
|
||||
<!-- Pipeline Transform Tab: SQLMesh + export status, run history
|
||||
Self-polls every 5s while any transform/export task is pending. -->
|
||||
|
||||
<div id="pipeline-transform-content"
|
||||
hx-get="{{ url_for('pipeline.pipeline_transform') }}"
|
||||
hx-target="this"
|
||||
hx-swap="outerHTML"
|
||||
{% if any_running %}hx-trigger="every 5s"{% endif %}>
|
||||
|
||||
<!-- Status Cards: Transform + Export -->
|
||||
<div class="pipeline-two-col mb-4">
|
||||
|
||||
<!-- SQLMesh Transform -->
|
||||
{% set tx = latest['run_transform'] %}
|
||||
<div class="card">
|
||||
<p class="card-header">SQLMesh Transform</p>
|
||||
<div class="flex items-center gap-2 mb-3">
|
||||
{% if tx is none %}
|
||||
<span class="status-dot pending"></span>
|
||||
<span class="text-sm text-slate">Never run</span>
|
||||
{% elif tx.status == 'pending' %}
|
||||
<span class="status-dot running"></span>
|
||||
<span class="text-sm text-slate">Running…</span>
|
||||
{% elif tx.status == 'complete' %}
|
||||
<span class="status-dot ok"></span>
|
||||
<span class="text-sm text-slate">Complete</span>
|
||||
{% else %}
|
||||
<span class="status-dot failed"></span>
|
||||
<span class="text-sm text-danger">Failed</span>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% if tx %}
|
||||
<p class="text-xs text-slate mono">
|
||||
Started: {{ (tx.created_at or '')[:19] or '—' }}
|
||||
</p>
|
||||
{% if tx.completed_at %}
|
||||
<p class="text-xs text-slate mono">
|
||||
Finished: {{ tx.completed_at[:19] }}
|
||||
</p>
|
||||
{% endif %}
|
||||
{% if tx.status == 'failed' and tx.error %}
|
||||
<details class="mt-2">
|
||||
<summary class="text-xs text-danger cursor-pointer">Error</summary>
|
||||
<pre class="text-xs mt-1 p-2 bg-gray-50 rounded overflow-auto" style="max-height:8rem;white-space:pre-wrap">{{ tx.error[:400] }}</pre>
|
||||
</details>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
<div class="mt-3">
|
||||
<button type="button"
|
||||
class="btn btn-sm"
|
||||
{% if any_running %}disabled{% endif %}
|
||||
hx-post="{{ url_for('pipeline.pipeline_trigger_transform') }}"
|
||||
hx-target="#pipeline-transform-content"
|
||||
hx-swap="outerHTML"
|
||||
hx-vals='{"step": "transform", "csrf_token": "{{ csrf_token() }}"}'
|
||||
onclick="if (!confirm('Run SQLMesh transform (prod --auto-apply)?')) return false;">
|
||||
Run Transform
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Export Serving -->
|
||||
{% set ex = latest['run_export'] %}
|
||||
<div class="card">
|
||||
<p class="card-header">Export Serving</p>
|
||||
<div class="flex items-center gap-2 mb-3">
|
||||
{% if ex is none %}
|
||||
<span class="status-dot pending"></span>
|
||||
<span class="text-sm text-slate">Never run</span>
|
||||
{% elif ex.status == 'pending' %}
|
||||
<span class="status-dot running"></span>
|
||||
<span class="text-sm text-slate">Running…</span>
|
||||
{% elif ex.status == 'complete' %}
|
||||
<span class="status-dot ok"></span>
|
||||
<span class="text-sm text-slate">Complete</span>
|
||||
{% else %}
|
||||
<span class="status-dot failed"></span>
|
||||
<span class="text-sm text-danger">Failed</span>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% if ex %}
|
||||
<p class="text-xs text-slate mono">
|
||||
Started: {{ (ex.created_at or '')[:19] or '—' }}
|
||||
</p>
|
||||
{% if ex.completed_at %}
|
||||
<p class="text-xs text-slate mono">
|
||||
Finished: {{ ex.completed_at[:19] }}
|
||||
</p>
|
||||
{% endif %}
|
||||
{% if serving_meta %}
|
||||
<p class="text-xs text-slate mt-1">
|
||||
Last export: <span class="font-semibold mono">{{ (serving_meta.exported_at_utc or '')[:19].replace('T', ' ') or '—' }}</span>
|
||||
</p>
|
||||
{% endif %}
|
||||
{% if ex.status == 'failed' and ex.error %}
|
||||
<details class="mt-2">
|
||||
<summary class="text-xs text-danger cursor-pointer">Error</summary>
|
||||
<pre class="text-xs mt-1 p-2 bg-gray-50 rounded overflow-auto" style="max-height:8rem;white-space:pre-wrap">{{ ex.error[:400] }}</pre>
|
||||
</details>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
<div class="mt-3">
|
||||
<button type="button"
|
||||
class="btn btn-sm"
|
||||
{% if any_running %}disabled{% endif %}
|
||||
hx-post="{{ url_for('pipeline.pipeline_trigger_transform') }}"
|
||||
hx-target="#pipeline-transform-content"
|
||||
hx-swap="outerHTML"
|
||||
hx-vals='{"step": "export", "csrf_token": "{{ csrf_token() }}"}'
|
||||
onclick="if (!confirm('Export serving tables (lakehouse → analytics.duckdb)?')) return false;">
|
||||
Run Export
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
</div>
|
||||
|
||||
<!-- Run Full Pipeline -->
|
||||
{% set pl = latest['run_pipeline'] %}
|
||||
<div class="card mb-4">
|
||||
<div class="flex items-center justify-between flex-wrap gap-3">
|
||||
<div>
|
||||
<p class="font-semibold text-navy text-sm">Full Pipeline</p>
|
||||
<p class="text-xs text-slate mt-1">Runs extract → transform → export sequentially</p>
|
||||
{% if pl %}
|
||||
<p class="text-xs text-slate mono mt-1">
|
||||
Last: {{ (pl.created_at or '')[:19] or '—' }}
|
||||
{% if pl.status == 'complete' %}<span class="badge-success ml-2">Complete</span>{% endif %}
|
||||
{% if pl.status == 'pending' %}<span class="badge-warning ml-2">Running…</span>{% endif %}
|
||||
{% if pl.status == 'failed' %}<span class="badge-danger ml-2">Failed</span>{% endif %}
|
||||
</p>
|
||||
{% endif %}
|
||||
</div>
|
||||
<button type="button"
|
||||
class="btn btn-sm"
|
||||
{% if any_running %}disabled{% endif %}
|
||||
hx-post="{{ url_for('pipeline.pipeline_trigger_transform') }}"
|
||||
hx-target="#pipeline-transform-content"
|
||||
hx-swap="outerHTML"
|
||||
hx-vals='{"step": "pipeline", "csrf_token": "{{ csrf_token() }}"}'
|
||||
onclick="if (!confirm('Run full ELT pipeline (extract → transform → export)?')) return false;">
|
||||
Run Full Pipeline
|
||||
</button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Recent Runs -->
|
||||
<div class="card">
|
||||
<p class="card-header">Recent Runs</p>
|
||||
{% if history %}
|
||||
<div style="overflow-x:auto">
|
||||
<table class="table" style="font-size:0.8125rem">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>#</th>
|
||||
<th>Step</th>
|
||||
<th>Started</th>
|
||||
<th>Duration</th>
|
||||
<th>Status</th>
|
||||
<th>Error</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for row in history %}
|
||||
<tr>
|
||||
<td class="text-xs text-slate">{{ row.id }}</td>
|
||||
<td class="mono text-xs">{{ row.task_name | replace('run_', '') }}</td>
|
||||
<td class="mono text-xs text-slate">{{ (row.created_at or '')[:19] or '—' }}</td>
|
||||
<td class="mono text-xs text-slate">{{ row.duration or '—' }}</td>
|
||||
<td>
|
||||
{% if row.status == 'complete' %}
|
||||
<span class="badge-success">Complete</span>
|
||||
{% elif row.status == 'failed' %}
|
||||
<span class="badge-danger">Failed</span>
|
||||
{% else %}
|
||||
<span class="badge-warning">Running…</span>
|
||||
{% endif %}
|
||||
</td>
|
||||
<td>
|
||||
{% if row.error_short %}
|
||||
<details>
|
||||
<summary class="text-xs text-danger cursor-pointer">Error</summary>
|
||||
<pre class="text-xs mt-1 p-2 bg-gray-50 rounded overflow-auto" style="max-width:24rem;white-space:pre-wrap">{{ row.error_short }}</pre>
|
||||
</details>
|
||||
{% else %}—{% endif %}
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
{% else %}
|
||||
<p class="text-sm text-slate">No transform runs yet.</p>
|
||||
{% endif %}
|
||||
</div>
|
||||
|
||||
</div>{# end #pipeline-transform-content #}
|
||||
@@ -33,6 +33,9 @@
|
||||
.status-dot.failed { background: #EF4444; }
|
||||
.status-dot.stale { background: #D97706; }
|
||||
.status-dot.running { background: #3B82F6; }
|
||||
|
||||
@keyframes pulse-dot { 0%,100%{opacity:1} 50%{opacity:0.4} }
|
||||
.status-dot.running { animation: pulse-dot 1.5s ease-in-out infinite; }
|
||||
.status-dot.pending { background: #CBD5E1; }
|
||||
|
||||
.pipeline-two-col {
|
||||
@@ -53,10 +56,11 @@
|
||||
<p class="text-sm text-slate mt-1">Extraction status, data catalog, and ad-hoc query editor</p>
|
||||
</div>
|
||||
<div class="flex gap-2">
|
||||
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0">
|
||||
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_transform') }}" class="m-0">
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
|
||||
<input type="hidden" name="step" value="pipeline">
|
||||
<button type="button" class="btn btn-sm"
|
||||
onclick="confirmAction('Enqueue a full extraction run? This will run all extractors in the background.', this.closest('form'))">
|
||||
onclick="confirmAction('Run full ELT pipeline (extract → transform → export)? This runs in the background.', this.closest('form'))">
|
||||
Run Pipeline
|
||||
</button>
|
||||
</form>
|
||||
@@ -116,6 +120,10 @@
|
||||
hx-get="{{ url_for('pipeline.pipeline_lineage') }}"
|
||||
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
|
||||
hx-trigger="click">Lineage</button>
|
||||
<button data-tab="transform"
|
||||
hx-get="{{ url_for('pipeline.pipeline_transform') }}"
|
||||
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
|
||||
hx-trigger="click">Transform</button>
|
||||
</div>
|
||||
|
||||
<!-- Tab content (Overview loads on page load) -->
|
||||
|
||||
@@ -735,6 +735,107 @@ async def handle_run_extraction(payload: dict) -> None:
|
||||
logger.info("Extraction completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
|
||||
|
||||
|
||||
@task("run_transform")
|
||||
async def handle_run_transform(payload: dict) -> None:
|
||||
"""Run SQLMesh transform (prod plan --auto-apply) in the background.
|
||||
|
||||
Shells out to `uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply`.
|
||||
2-hour absolute timeout — same as extraction.
|
||||
"""
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
repo_root = Path(__file__).resolve().parents[4]
|
||||
result = await asyncio.to_thread(
|
||||
subprocess.run,
|
||||
["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "plan", "prod", "--auto-apply"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=7200,
|
||||
cwd=str(repo_root),
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(
|
||||
f"SQLMesh transform failed (exit {result.returncode}): {result.stderr[:500]}"
|
||||
)
|
||||
logger.info("SQLMesh transform completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
|
||||
|
||||
|
||||
@task("run_export")
|
||||
async def handle_run_export(payload: dict) -> None:
|
||||
"""Export serving tables from lakehouse.duckdb → analytics.duckdb.
|
||||
|
||||
Shells out to `uv run python src/padelnomics/export_serving.py`.
|
||||
10-minute absolute timeout.
|
||||
"""
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
repo_root = Path(__file__).resolve().parents[4]
|
||||
result = await asyncio.to_thread(
|
||||
subprocess.run,
|
||||
["uv", "run", "python", "src/padelnomics/export_serving.py"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=600,
|
||||
cwd=str(repo_root),
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(
|
||||
f"Export failed (exit {result.returncode}): {result.stderr[:500]}"
|
||||
)
|
||||
logger.info("Export completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
|
||||
|
||||
|
||||
@task("run_pipeline")
|
||||
async def handle_run_pipeline(payload: dict) -> None:
|
||||
"""Run full ELT pipeline: extract → transform → export, stopping on first failure."""
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
repo_root = Path(__file__).resolve().parents[4]
|
||||
|
||||
steps = [
|
||||
(
|
||||
"extraction",
|
||||
["uv", "run", "--package", "padelnomics_extract", "extract"],
|
||||
7200,
|
||||
),
|
||||
(
|
||||
"transform",
|
||||
["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "plan", "prod", "--auto-apply"],
|
||||
7200,
|
||||
),
|
||||
(
|
||||
"export",
|
||||
["uv", "run", "python", "src/padelnomics/export_serving.py"],
|
||||
600,
|
||||
),
|
||||
]
|
||||
|
||||
for step_name, cmd, timeout_seconds in steps:
|
||||
logger.info("Pipeline step starting: %s", step_name)
|
||||
result = await asyncio.to_thread(
|
||||
subprocess.run,
|
||||
cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=timeout_seconds,
|
||||
cwd=str(repo_root),
|
||||
)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(
|
||||
f"Pipeline failed at {step_name} (exit {result.returncode}): {result.stderr[:500]}"
|
||||
)
|
||||
logger.info(
|
||||
"Pipeline step complete: %s — %s",
|
||||
step_name,
|
||||
result.stdout[-200:] if result.stdout else "(no output)",
|
||||
)
|
||||
|
||||
logger.info("Full pipeline complete (extract → transform → export)")
|
||||
|
||||
|
||||
@task("generate_articles")
|
||||
async def handle_generate_articles(payload: dict) -> None:
|
||||
"""Generate articles from a template in the background."""
|
||||
|
||||
Reference in New Issue
Block a user