Compare commits

..

15 Commits

Author SHA1 Message Date
Deeman
9b54f2d544 fix(secrets): add http:// scheme to proxy URLs in dev + prod SOPS
All checks were successful
CI / test (push) Successful in 51s
CI / tag (push) Successful in 3s
PROXY_URLS_DATACENTER was missing the scheme prefix, causing SSL
handshake failures on the Rayobyte HTTP-only proxy.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 14:28:35 +01:00
Deeman
08bd2b2989 chore(changelog): document proxy URL scheme validation fix
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 14:26:57 +01:00
Deeman
81a57db272 fix(proxy): skip URLs without scheme in load_proxy_tiers()
Validates each URL in PROXY_URLS_DATACENTER / PROXY_URLS_RESIDENTIAL:
logs a warning and skips any entry missing an http:// or https:// scheme
instead of passing malformed URLs that cause SSL or connection errors.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 14:26:41 +01:00
Deeman
f92d863781 feat(pipeline): live extraction status + Transform tab
Adds HTMX live polling to the Overview tab (stops when quiet) and a new
Transform tab for managing the SQLMesh + export steps of the ELT pipeline.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 13:47:17 +01:00
Deeman
a3dd37b1be chore(changelog): document pipeline transform tab + live status feature
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 13:47:07 +01:00
Deeman
e5cbcf462e feat(pipeline): live extraction status + Transform tab
- worker: add run_transform, run_export, run_pipeline task handlers
  - run_transform: sqlmesh plan prod --auto-apply, 2h timeout
  - run_export: export_serving.py, 10min timeout
  - run_pipeline: sequential extract → transform → export, stops on first failure

- pipeline_routes: refactor overview into _render_overview_partial() helper,
  make pipeline_trigger_extract() HTMX-aware (returns partial on HX-Request),
  add _fetch_pipeline_tasks(), _format_duration() helpers,
  add pipeline_transform() + pipeline_trigger_transform() with concurrency guard

- pipeline_overview.html: wrap in self-polling div (every 5s while any_running),
  convert Run buttons to hx-post targeting #pipeline-overview-content

- pipeline.html: add pulse animation for .status-dot.running, add Transform tab
  button, rewire header "Run Pipeline" button to enqueue run_pipeline task

- pipeline_transform.html: new partial — status cards for transform + export,
  "Run Full Pipeline" card, recent runs table with duration + error details

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 13:46:11 +01:00
Deeman
169092c8ea fix(admin): make pipeline data view responsive on mobile
All checks were successful
CI / test (push) Successful in 50s
CI / tag (push) Successful in 2s
- Tab bar: add overflow-x:auto so 5 tabs scroll on narrow screens
- Overview grid: replace hardcoded 1fr 1fr with .pipeline-two-col (stacks below 640px)
- Overview tables: wrap Serving Tables + Landing Zone in overflow-x:auto divs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 13:16:58 +01:00
Deeman
6ae16f6c1f feat(proxy): per-proxy dead tracking in tiered cycler
All checks were successful
CI / test (push) Successful in 51s
CI / tag (push) Successful in 3s
2026-03-01 12:37:00 +01:00
Deeman
8b33daa4f3 feat(content): remove artificial 500-article generation cap
- fetch_template_data: default limit=0 (all rows); skip LIMIT clause when 0
- generate_articles: default limit=0
- worker handle_generate_articles: default to 0 instead of 500
- Remove "limit": 500 from all 4 enqueue payloads
- template_generate GET handler: use count_template_data() instead of fetch(limit=501) probe

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 12:33:58 +01:00
Deeman
a898a06575 feat(proxy): per-proxy dead tracking in tiered cycler
Add proxy_failure_limit param to make_tiered_cycler (default 3).
Individual proxies hitting the limit are marked dead and permanently
skipped. next_proxy() auto-escalates when all proxies in the active
tier are dead. Both mechanisms coexist: per-proxy dead tracking removes
broken individuals; tier-level threshold catches systemic failure.

- proxy.py: dead_proxies set + proxy_failure_counts dict in state;
  next_proxy skips dead proxies with bounded loop; record_failure/
  record_success accept optional proxy_url; dead_proxy_count() added
- playtomic_tenants.py: pass proxy_url to record_success/record_failure
- playtomic_availability.py: _worker returns (proxy_url, result);
  serial loops in extract + extract_recheck capture proxy_url
- test_supervisor.py: 11 new tests in TestTieredCyclerDeadProxyTracking

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 12:28:54 +01:00
Deeman
219554b7cb fix(extract): use tiered cycler in playtomic_tenants
Previously the tenants extractor flattened all proxy tiers into a single
round-robin list, bypassing the circuit breaker entirely. When the free
Webshare tier runs out of bandwidth (402), all 20 free proxies fail and
the batch crashes — the paid datacenter/residential proxies are never tried.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 12:13:58 +01:00
Deeman
1aedf78ec6 fix(extract): use tiered cycler in playtomic_tenants
Previously the tenants extractor flattened all proxy tiers into a single
round-robin list, bypassing the circuit breaker entirely. When the free
Webshare tier runs out of bandwidth (402), all 20 free proxies fail and
the batch crashes — the paid datacenter/residential proxies are never tried.

Changes:
- Replace make_round_robin_cycler with make_tiered_cycler (same as availability)
- Add _fetch_page_via_cycler: retries per page across tiers, records
  success/failure in cycler so circuit breaker can escalate
- Fix batch_size to BATCH_SIZE=20 constant (was len(all_proxies) ≈ 22)
- Check cycler.is_exhausted() before each batch; catch RuntimeError mid-batch
  and write partial results rather than crashing with nothing
- CIRCUIT_BREAKER_THRESHOLD from env (default 10), matching availability

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 12:13:50 +01:00
Deeman
8f2ffd432b fix(admin): correct docker volume mount + pipeline_routes repo root
All checks were successful
CI / test (push) Successful in 50s
CI / tag (push) Successful in 2s
- docker-compose.prod.yml: fix volume mount for all 6 web containers
  from /opt/padelnomics/data (stale) → /data/padelnomics (live supervisor output);
  add LANDING_DIR=/app/data/pipeline/landing so extraction/landing stats work
- pipeline_routes.py: fix _REPO_ROOT parents[5] → parents[4] so workflows.toml
  is found in dev and pipeline overview shows workflow schedules

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 11:41:29 +01:00
Deeman
c9dec066f7 fix(admin): mobile UX fixes — contrast, scroll, responsive grids
- CSS: `.nav-mobile a` → `.nav-mobile a:not(.nav-auth-btn)` to fix Sign
  Out button showing slate text instead of white on mobile
- base_admin.html: add `overflow-y: hidden` + `scrollbar-width: none` to
  `.admin-subnav` to eliminate ghost 1px scrollbar on Content tab row
- routes.py: pass `outreach_email=EMAIL_ADDRESSES["outreach"]` to outreach
  template so sending domain is no longer hardcoded
- outreach.html: display dynamic `outreach_email`; replace inline
  `repeat(6,1fr)` grid with responsive `.pipeline-status-grid` (2→3→6 cols)
- index.html: replace inline `repeat(5,1fr)` Lead/Supplier Funnel grids
  with responsive `.funnel-grid` class (2 cols mobile, 5 cols md+)
- pipeline.html: replace inline `repeat(4,1fr)` stat grid with responsive
  `.pipeline-stat-grid` (2 cols mobile, 4 cols md+)
- 4 partials (lead/email/supplier/outreach results): wrap `<table>` in
  `<div style="overflow-x:auto">` so tables scroll on narrow screens

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 11:20:46 +01:00
Deeman
fea4f85da3 perf(transform): optimize dim_locations spatial joins via IEJoin + country filters
All checks were successful
CI / test (push) Successful in 51s
CI / tag (push) Successful in 2s
Replace ABS() bbox predicates with BETWEEN in all three spatial CTEs
(nearest_padel, padel_local, tennis_nearby). BETWEEN enables DuckDB's
IEJoin (interval join) which is O((N+M) log M) vs the previous O(N×M)
nested-loop cross-join.

Add country pre-filters to restrict the left side from ~140K global
locations to ~20K rows for padel/tennis CTEs (~8 countries each).

Expected: ~50-200x speedup on the spatial CTE portion of the model.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 02:57:05 +01:00
25 changed files with 973 additions and 115 deletions

View File

@@ -58,7 +58,7 @@ NTFY_TOKEN=
#ENC[AES256_GCM,data:BCyQYjRnTx8yW9A=,iv:4OPCP+xzRLUJrpoFewVnbZRKnZH4sAbV76SM//2k5wU=,tag:HxwEp7VFVZUN/VjPiL/+Vw==,type:comment]
RECHECK_WINDOW_MINUTES=ENC[AES256_GCM,data:YWM=,iv:iY5+uMazLAFdwyLT7Gr7MaF1QHBIgHuoi6nF2VbSsOA=,tag:dc6AmuJdTQ55gVe16uzs6A==,type:str]
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:lfmlsjXFtL+zo40SNFLiFKaZiYvE7CNH+zRwjMK5pqPfCs0TlMX+Y9e1KmzAS+y/cI69TP5sgMPRBzER0Jn7RvH0KA==,iv:jBN/4/K5L5886G4rSzxt8V8u/57tAuj3R76haltzqeU=,tag:Xe6o9eg2PodfktDqmLgVNA==,type:str]
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:X6xpxz5u8Xh3OXjkIz3UwqH847qLvY9cVWVktW5B+lqhmXAKTzoTzHds8vlRGJf5Up9Yx44XcigbvuK33ZJDSq9ovkAIbY55OK4=,iv:3hHyFD+H9HMzQ/27bPjGr59+7yWmEneUdN9XPQasCig=,tag:oBXsSuV5idB7HqNrNOruwg==,type:str]
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:Eec0X65EMsV2PD3Qvn+JjGqYaHtLupn0k99H918vmuRuAinP3rv/pwEoyKHmygazrUExg7U2PUELycyzq3lU6RIGtO+r0pRAn/n0S8RwdoZS,iv:T+bfbvULwSLRVD/hyW7rDN8tLLBf1FQkwCEbpiuBB+0=,tag:W/YHfl5U2yaA7ZOXgAFw+Q==,type:str]
WEBSHARE_DOWNLOAD_URL=ENC[AES256_GCM,data:1D9VRZ3MCXPQWfiMH8+CLcrxeYnVVcQgZDvt5kltvbSTuSHQ2hHDmZpBkTOMIBJnw4JLZ2JQKHgG4OaYDtsM2VltFPnfwaRgVI9G5PSenR3o4PeQmYO1AqWOmjn19jPxNXRhEXdupP9UT+xQNXoBJsl6RR20XOpMA5AipUHmSjD0UIKXoZLU,iv:uWUkAydac//qrOTPUThuOLKAKXK4xcZmK9qBVFwpqt4=,tag:1vYhukBW9kEuSXCLAiZZmQ==,type:str]
CIRCUIT_BREAKER_THRESHOLD=
#ENC[AES256_GCM,data:ZcX/OEbrMfKizIQYq3CYGnvzeTEX7KsmQaz2+Jj1rG5tbTy2aljQBIEkjtiwuo8NsNAD+FhIGRGVfBmKe1CAKME1MuiCbgSG,iv:4BSkeD3jZFawP09qECcqyuiWcDnCNSgbIjBATYhazq4=,tag:Ep1d2Uk700MOlWcLWaQ/ig==,type:comment]
@@ -71,7 +71,7 @@ GEONAMES_USERNAME=ENC[AES256_GCM,data:aSkVdLNrhiF6tlg=,iv:eemFGwDIv3EG/P3lVHGZj9
CENSUS_API_KEY=ENC[AES256_GCM,data:qqG971573aGq9MiHI2xLlanKKFwjfcNNoMXtm8LNbyh0rMbQN2XukQ==,iv:az2i0ldH75nHGah4DeOxaXmDbVYqmC1c77ptZqFA9BI=,tag:zoDdKj9bR7fgIDo1/dEU2g==,type:str]
sops_age__list_0__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBxNWNmUzVNUGdWRnE0ZFpF\nM0JQZWZ3UDdEVzlwTmIxakxOZXBkT2x2ZlNrClRtV2M3S2daSGxUZmFDSWQ2Nmh4\neU51QndFcUxlSE00RFovOVJTcDZmUUUKLS0tIDcvL3hRMDRoMWZZSXljNzA3WG5o\nMWFic21MV0krMzlIaldBTVU0ZDdlTE0K7euGQtA+9lHNws+x7TMCArZamm9att96\nL8cXoUDWe5fNI5+M1bXReqVfNwPTwZsV6j/+ZtYKybklIzWz02Ex4A==\n-----END AGE ENCRYPTED FILE-----\n
sops_age__list_0__map_recipient=age1f5002gj4s78jju45jd28kuejtcfhn5cdujz885fl7z2p9ym68pnsgky87a
sops_lastmodified=2026-02-28T15:50:46Z
sops_mac=ENC[AES256_GCM,data:HiLZTLa+p3mqa4hw+tKOK27F/bsJOy4jmDi8MHToi6S7tRfBA/TzcEzXvXUIkkwAixN73NQHvBVeRnbcEsApVpkaxH1OqnjvvyT+B3YFkTEtxczaKGWlCvbqFZNmXYsFvGR9njaWYWsTQPkRIjrroXrSrhr7uxC8F40v7ByxJKo=,iv:qj2IpzWRIh/mM1HtjjkNbyFuhtORKXslVnf/vdEC9Uw=,tag:fr9CZsL74HxRJLXn9eS0xQ==,type:str]
sops_lastmodified=2026-03-01T13:26:08Z
sops_mac=ENC[AES256_GCM,data:WmbT6tCUEoCDyKu673NQoJNzmCiilpG8yDVGl6ObxTOYleWt+1DVdPS+XUV+0Wd4bfkEhGTEfXAyy+wfoCVfYnenMuDGjXUUdsvqrOX6nnNCJ8nIntL46LfbRsbVrU6eeYGu/TaTyfouWjkk6pqlxffNSS6rrEFNZE4Q+v58+EI=,iv:TuCEmK6YJXsYISbN4mbuVbS6OvUNuhPRLstjjNkkrPk=,tag:hWLS036q7H5lMNpR6gZBVA==,type:str]
sops_unencrypted_suffix=_unencrypted
sops_version=3.12.1

View File

@@ -39,8 +39,8 @@ ALERT_WEBHOOK_URL=ENC[AES256_GCM,data:4sXQk8zklruC525J279TUUatdDJQ43qweuoPhtpI82
NTFY_TOKEN=ENC[AES256_GCM,data:YlOxhsRJ8P1y4kk6ugWm41iyRCsM6oAWjvbU9lGcD0A=,iv:JZXOvi3wTOPV9A46c7fMiqbszNCvXkOgh9i/H1hob24=,tag:8xnPimgy7sesOAnxhaXmpg==,type:str]
SUPERVISOR_GIT_PULL=ENC[AES256_GCM,data:mg==,iv:KgqMVYj12FjOzWxtA1T0r0pqCDJ6MtHzMjE+4W/W+s4=,tag:czFaOqhHG8nqrQ8AZ8QiGw==,type:str]
#ENC[AES256_GCM,data:hzAZvCWc4RTk290=,iv:RsSI4OpAOQGcFVpfXDZ6t705yWmlO0JEWwWF5uQu9As=,tag:UPqFtA2tXiSa0vzJAv8qXg==,type:comment]
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:x/F0toXDc8stsUNxaepCmxq1+WuacqqPtdc+R5mxTwcAzsKxCdwt8KpBZWMvz7ku4tHDGsKD949QAX2ANXP9oCMTgW0=,iv:6G9gE9/v7GaYj8aqVTmMrpw6AcQK9yMSCAohNdAD1Ws=,tag:2Jimr1ldVSfkh8LPEwdN3w==,type:str]
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:6BfXBYmyHpgZU/kJWpZLf8eH5VowVK1n0r6GzFTNAx/OmyaaS1RZVPC1JPkPBnTwEmo0WHYRW8uiUdkABmH9F5ZqqlsAesyfW7zvU9r7yD+D7w==,iv:3CBn2qCoTueQy8xVcQqZS4E3F0qoFYnNbzTZTpJ1veo=,tag:wC3Ecl4uNTwPiT23ATvRZg==,type:str]
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:vxRcXQ/8TUTCtr6hKWBD1zVF47GFSfluIHZ8q0tt8SqQOWDdDe2D7Of6boy/kG3lqlpl7TjqMGJ7fLORcr0klKCykQ==,iv:YjegXXtIXm2qr0a3ZHRHxj3L1JoGZ1iQXkVXQupGQ2E=,tag:kahoHRskXbzplZasWOeiig==,type:str]
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:23TgU6oUeO7J+MFkraALQ5/RO38DZ3ib5oYYJr7Lj3KXQSlRsgwA+bJlweI5gcUpFphnPXvmwFGiuL6AeY8LzAQ3bx46dcZa5w9LfKw2PMFt,iv:AGXwYLqWjT5VmU02qqada3PbdjfC0mLK2sPruO0uru8=,tag:Z2IS/JPOqWX+x0LZYwyArA==,type:str]
WEBSHARE_DOWNLOAD_URL=ENC[AES256_GCM,data:/N77CFf6tJWCk7HrnBOm2Q1ynx7XoblzfbzJySeCjrxqiu4r+CB90aDkaPahlQKI00DUZih3pcy7WhnjdAwI30G5kJZ3P8H8/R0tP7OBK1wPVbsJq8prQJPFOAWewsS4KWNtSURZPYSCxslcBb7DHLX6ZAjv6A5KFOjRK2N8usR9sIabrCWh,iv:G3Ropu/JGytZK/zKsNGFjjSu3Wt6fvHaAqI9RpUHvlI=,tag:fv6xuS94OR+4xfiyKrYELA==,type:str]
PROXY_CONCURRENCY=ENC[AES256_GCM,data:vdEZ,iv:+eTNQO+s/SsVDBLg1/+fneMzEEsFkuEFxo/FcVV+mWc=,tag:i/EPwi/jOoWl3xW8H0XMdw==,type:str]
RECHECK_WINDOW_MINUTES=ENC[AES256_GCM,data:L2s=,iv:fV3mCKmK5fxUmIWRePELBDAPTb8JZqasVIhnAl55kYw=,tag:XL+PO6sblz/7WqHC3dtk1w==,type:str]
@@ -58,7 +58,7 @@ sops_age__list_1__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb2
sops_age__list_1__map_recipient=age1wjepykv3glvsrtegu25tevg7vyn3ngpl607u3yjc9ucay04s045s796msw
sops_age__list_2__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBFeHhaOURNZnRVMEwxNThu\nUjF4Q0kwUXhTUE1QSzZJbmpubnh3RnpQTmdvCjRmWWxpNkxFUmVGb3NRbnlydW5O\nWEg3ZXJQTU4vcndzS2pUQXY3Q0ttYjAKLS0tIE9IRFJ1c2ZxbGVHa2xTL0swbGN1\nTzgwMThPUDRFTWhuZHJjZUYxOTZrU00KY62qrNBCUQYxwcLMXFEnLkwncxq3BPJB\nKm4NzeHBU87XmPWVrgrKuf+PH1mxJlBsl7Hev8xBTy7l6feiZjLIvQ==\n-----END AGE ENCRYPTED FILE-----\n
sops_age__list_2__map_recipient=age1c783ym2q5x9tv7py5d28uc4k44aguudjn03g97l9nzs00dd9tsrqum8h4d
sops_lastmodified=2026-03-01T00:26:54Z
sops_mac=ENC[AES256_GCM,data:DdcABGVm9KbAcFrF0iuZlAaugsouNs7Hon2mZISaHs15/2H/Pd9FniXW3KeQ0+/NdZFQkz/h3i3bVFampcpFS1AxuOE5+1/IgWn8sKtaqPc7E9y8g6lxMnwTkUX2z+n/Q2nR8KAcO9IyE0GNjIluMWkxPWQuLzlRYDOjRN4/1e0=,iv:rm+6lXhYu6VUmrdCIrU0BRN2/ooa21Fw1ESWxr7vATg=,tag:GZmLLZf/LQaNeNNAAEg5bA==,type:str]
sops_lastmodified=2026-03-01T13:25:41Z
sops_mac=ENC[AES256_GCM,data:EL9Bgo0pWWECeHaaM1bHtkvwBgBmS3P2cX+6oahHKmLEJLI7P7fiomP7G8SdrfUyNpZaP9d4LlfwZSuCPqH6rP8jzF67oNkfXfd/xK4OW2U2TqSvouCMzlhqVQgS4HHl5EgvOI488WEIZko7KK2A1rxnpkm8C29WG9d9G64LKvw=,iv:XzsNm3CXnlC6SIef63BdddALjGustp8czHQCWOtjXBQ=,tag:zll0db6K1+M4brOpfVWnhg==,type:str]
sops_unencrypted_suffix=_unencrypted
sops_version=3.12.1

View File

@@ -6,7 +6,24 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]
### Fixed
- **Proxy URL scheme validation in `load_proxy_tiers()`** — URLs in `PROXY_URLS_DATACENTER` / `PROXY_URLS_RESIDENTIAL` that are missing an `http://` or `https://` scheme are now logged as a warning and skipped, rather than being passed through and causing SSL handshake failures or connection errors at request time. Also fixed a missing `http://` prefix in the dev `.env` `PROXY_URLS_DATACENTER` entry.
### Changed
- **Per-proxy dead tracking in tiered cycler** — `make_tiered_cycler` now accepts a `proxy_failure_limit` parameter (default 3). Individual proxies that hit the limit are marked dead and permanently skipped by `next_proxy()`. If all proxies in the active tier are dead, `next_proxy()` auto-escalates to the next tier without needing the tier-level threshold. `record_failure(proxy_url)` and `record_success(proxy_url)` accept an optional `proxy_url` argument for per-proxy tracking; callers without `proxy_url` are fully backward-compatible. New `dead_proxy_count()` callable exposed for monitoring.
- `extract/padelnomics_extract/src/padelnomics_extract/proxy.py`: added per-proxy state (`proxy_failure_counts`, `dead_proxies`), updated `next_proxy`/`record_failure`/`record_success`, added `dead_proxy_count`
- `extract/padelnomics_extract/src/padelnomics_extract/playtomic_tenants.py`: `_fetch_page_via_cycler` passes `proxy_url` to `record_success`/`record_failure`
- `extract/padelnomics_extract/src/padelnomics_extract/playtomic_availability.py`: `_worker` returns `(proxy_url, result)` tuple; serial loops in `extract` and `extract_recheck` capture `proxy_url` before passing to `record_success`/`record_failure`
- `web/tests/test_supervisor.py`: 11 new tests in `TestTieredCyclerDeadProxyTracking` covering dead proxy skipping, auto-escalation, `dead_proxy_count`, backward compat, and thread safety
### Added
- **Pipeline Transform tab + live extraction status** — new "Transform" tab in the pipeline admin with status cards for SQLMesh transform and export-serving tasks, a "Run Full Pipeline" button, and a recent run history table. The Overview tab now auto-polls every 5 s while an extraction task is pending and stops automatically when quiet. Per-extractor "Run" buttons use HTMX in-place updates instead of redirects. The header "Run Pipeline" button now enqueues the full ELT pipeline (extract → transform → export) instead of extraction only. Three new worker task handlers: `run_transform` (sqlmesh plan prod --auto-apply, 2 h timeout), `run_export` (export_serving.py, 10 min timeout), `run_pipeline` (sequential, stops on first failure). Concurrency guard prevents double-enqueuing the same step.
- `web/src/padelnomics/worker.py`: `handle_run_transform`, `handle_run_export`, `handle_run_pipeline`
- `web/src/padelnomics/admin/pipeline_routes.py`: `_render_overview_partial()`, `_fetch_pipeline_tasks()`, `_format_duration()`, `pipeline_transform()`, `pipeline_trigger_transform()`; `pipeline_trigger_extract()` now HTMX-aware
- `web/src/padelnomics/admin/templates/admin/pipeline.html`: pulse animation on `.status-dot.running`, Transform tab button, rewired header button
- `web/src/padelnomics/admin/templates/admin/partials/pipeline_overview.html`: self-polling wrapper, HTMX Run buttons
- `web/src/padelnomics/admin/templates/admin/partials/pipeline_transform.html`: new file
- **Affiliate programs management** — centralised retailer config (`affiliate_programs` table) with URL template + tracking tag + commission %. Products now use a program dropdown + product identifier (e.g. ASIN) instead of manually baking full URLs. URL is assembled at redirect time via `build_affiliate_url()`, so changing a tag propagates instantly to all products. Legacy products (baked `affiliate_url`) continue to work via fallback. Amazon OneLink configured in the Associates dashboard handles geo-redirect to local marketplaces — no per-country programs needed.
- `web/src/padelnomics/migrations/versions/0027_affiliate_programs.py`: `affiliate_programs` table, nullable `program_id` + `product_identifier` columns on `affiliate_products`, seeds "Amazon" program, backfills ASINs from existing URLs
- `web/src/padelnomics/affiliate.py`: `get_all_programs()`, `get_program()`, `get_program_by_slug()`, `build_affiliate_url()`; `get_product()` JOINs program for redirect assembly; `_parse_product()` extracts `_program` sub-dict
@@ -17,6 +34,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- 15 new tests in `web/tests/test_affiliate.py` (41 total)
### Fixed
- **Data Platform admin view showing stale/zero row counts** — Docker web containers were mounting `/opt/padelnomics/data` (stale copy) instead of `/data/padelnomics` (live supervisor output). Fixed volume mount in all 6 containers (blue/green × app/worker/scheduler) and added `LANDING_DIR=/app/data/pipeline/landing` so extraction stats and landing zone file stats are visible to the web app.
- **`workflows.toml` never found in dev** — `_REPO_ROOT` in `pipeline_routes.py` used `parents[5]` (one level too far up) instead of `parents[4]`. Workflow schedules now display correctly on the pipeline overview tab in dev.
- **Article preview frontmatter bug** — `_rebuild_article()` in `admin/routes.py` now strips YAML frontmatter before passing markdown to `mistune.html()`, preventing raw `title:`, `slug:` etc. from appearing as visible text in article previews.
### Added

View File

@@ -60,9 +60,10 @@ services:
environment:
- DATABASE_PATH=/app/data/app.db
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
- LANDING_DIR=/app/data/pipeline/landing
volumes:
- app-data:/app/data
- /opt/padelnomics/data:/app/data/pipeline:ro
- /data/padelnomics:/app/data/pipeline:ro
networks:
- net
healthcheck:
@@ -82,9 +83,10 @@ services:
environment:
- DATABASE_PATH=/app/data/app.db
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
- LANDING_DIR=/app/data/pipeline/landing
volumes:
- app-data:/app/data
- /opt/padelnomics/data:/app/data/pipeline:ro
- /data/padelnomics:/app/data/pipeline:ro
networks:
- net
@@ -98,9 +100,10 @@ services:
environment:
- DATABASE_PATH=/app/data/app.db
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
- LANDING_DIR=/app/data/pipeline/landing
volumes:
- app-data:/app/data
- /opt/padelnomics/data:/app/data/pipeline:ro
- /data/padelnomics:/app/data/pipeline:ro
networks:
- net
@@ -115,9 +118,10 @@ services:
environment:
- DATABASE_PATH=/app/data/app.db
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
- LANDING_DIR=/app/data/pipeline/landing
volumes:
- app-data:/app/data
- /opt/padelnomics/data:/app/data/pipeline:ro
- /data/padelnomics:/app/data/pipeline:ro
networks:
- net
healthcheck:
@@ -137,9 +141,10 @@ services:
environment:
- DATABASE_PATH=/app/data/app.db
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
- LANDING_DIR=/app/data/pipeline/landing
volumes:
- app-data:/app/data
- /opt/padelnomics/data:/app/data/pipeline:ro
- /data/padelnomics:/app/data/pipeline:ro
networks:
- net
@@ -153,9 +158,10 @@ services:
environment:
- DATABASE_PATH=/app/data/app.db
- SERVING_DUCKDB_PATH=/app/data/pipeline/analytics.duckdb
- LANDING_DIR=/app/data/pipeline/landing
volumes:
- app-data:/app/data
- /opt/padelnomics/data:/app/data/pipeline:ro
- /data/padelnomics:/app/data/pipeline:ro
networks:
- net

View File

@@ -213,9 +213,10 @@ def _fetch_venues_parallel(
completed_count = 0
lock = threading.Lock()
def _worker(tenant_id: str) -> dict | None:
def _worker(tenant_id: str) -> tuple[str | None, dict | None]:
proxy_url = cycler["next_proxy"]()
return _fetch_venue_availability(tenant_id, start_min_str, start_max_str, proxy_url)
result = _fetch_venue_availability(tenant_id, start_min_str, start_max_str, proxy_url)
return proxy_url, result
with ThreadPoolExecutor(max_workers=worker_count) as pool:
for batch_start in range(0, len(tenant_ids), PARALLEL_BATCH_SIZE):
@@ -231,17 +232,17 @@ def _fetch_venues_parallel(
batch_futures = {pool.submit(_worker, tid): tid for tid in batch}
for future in as_completed(batch_futures):
result = future.result()
proxy_url, result = future.result()
with lock:
completed_count += 1
if result is not None:
venues_data.append(result)
cycler["record_success"]()
cycler["record_success"](proxy_url)
if on_result is not None:
on_result(result)
else:
venues_errored += 1
cycler["record_failure"]()
cycler["record_failure"](proxy_url)
if completed_count % 500 == 0:
logger.info(
@@ -336,16 +337,17 @@ def extract(
else:
logger.info("Serial mode: 1 worker, %d venues", len(venues_to_process))
for i, tenant_id in enumerate(venues_to_process):
proxy_url = cycler["next_proxy"]()
result = _fetch_venue_availability(
tenant_id, start_min_str, start_max_str, cycler["next_proxy"](),
tenant_id, start_min_str, start_max_str, proxy_url,
)
if result is not None:
new_venues_data.append(result)
cycler["record_success"]()
cycler["record_success"](proxy_url)
_on_result(result)
else:
venues_errored += 1
cycler["record_failure"]()
cycler["record_failure"](proxy_url)
if cycler["is_exhausted"]():
logger.error("All proxy tiers exhausted — writing partial results")
break
@@ -500,13 +502,14 @@ def extract_recheck(
venues_data = []
venues_errored = 0
for tid in venues_to_recheck:
result = _fetch_venue_availability(tid, start_min_str, start_max_str, cycler["next_proxy"]())
proxy_url = cycler["next_proxy"]()
result = _fetch_venue_availability(tid, start_min_str, start_max_str, proxy_url)
if result is not None:
venues_data.append(result)
cycler["record_success"]()
cycler["record_success"](proxy_url)
else:
venues_errored += 1
cycler["record_failure"]()
cycler["record_failure"](proxy_url)
if cycler["is_exhausted"]():
logger.error("All proxy tiers exhausted — writing partial recheck results")
break

View File

@@ -10,11 +10,11 @@ API notes (discovered 2026-02):
- `size=100` is the maximum effective page size
- ~14K venues globally as of Feb 2026
Parallel mode: when PROXY_URLS is set, fires batch_size = len(proxy_urls)
pages concurrently. Each page gets its own fresh session + proxy. Pages beyond
the last one return empty lists (safe — just triggers the done condition).
Without proxies, falls back to single-threaded with THROTTLE_SECONDS between
pages.
Parallel mode: when proxy tiers are configured, fires BATCH_SIZE pages
concurrently. Each page gets its own fresh session + proxy from the tiered
cycler. On failure the cycler escalates through free → datacenter →
residential tiers. Without proxies, falls back to single-threaded with
THROTTLE_SECONDS between pages.
Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2).
@@ -22,6 +22,7 @@ Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz
"""
import json
import os
import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -31,7 +32,7 @@ from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
from .proxy import load_proxy_tiers, make_round_robin_cycler
from .proxy import load_proxy_tiers, make_tiered_cycler
from .utils import compress_jsonl_atomic, landing_path
logger = setup_logging("padelnomics.extract.playtomic_tenants")
@@ -42,6 +43,9 @@ PLAYTOMIC_TENANTS_URL = "https://api.playtomic.io/v1/tenants"
THROTTLE_SECONDS = 2
PAGE_SIZE = 100
MAX_PAGES = 500 # safety bound — ~50K venues max, well above current ~14K
BATCH_SIZE = 20 # concurrent pages per batch (fixed, independent of proxy count)
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10")
MAX_PAGE_ATTEMPTS = 5 # max retries per individual page before giving up
def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]:
@@ -61,22 +65,57 @@ def _fetch_one_page(proxy_url: str | None, page: int) -> tuple[int, list[dict]]:
return (page, tenants)
def _fetch_pages_parallel(pages: list[int], next_proxy) -> list[tuple[int, list[dict]]]:
"""Fetch multiple pages concurrently. Returns [(page_num, tenants_list), ...]."""
def _fetch_page_via_cycler(cycler: dict, page: int) -> tuple[int, list[dict]]:
"""Fetch a single page, retrying across proxy tiers via the circuit breaker.
On each attempt, pulls the next proxy from the active tier. Records
success/failure so the circuit breaker can escalate tiers. Raises
RuntimeError if all tiers are exhausted or MAX_PAGE_ATTEMPTS is exceeded.
"""
last_exc: Exception | None = None
for attempt in range(MAX_PAGE_ATTEMPTS):
proxy_url = cycler["next_proxy"]()
if proxy_url is None: # all tiers exhausted
raise RuntimeError(f"All proxy tiers exhausted fetching page {page}")
try:
result = _fetch_one_page(proxy_url, page)
cycler["record_success"](proxy_url)
return result
except Exception as exc:
last_exc = exc
logger.warning(
"Page %d attempt %d/%d failed (proxy=%s): %s",
page,
attempt + 1,
MAX_PAGE_ATTEMPTS,
proxy_url,
exc,
)
cycler["record_failure"](proxy_url)
if cycler["is_exhausted"]():
raise RuntimeError(f"All proxy tiers exhausted fetching page {page}") from exc
raise RuntimeError(f"Page {page} failed after {MAX_PAGE_ATTEMPTS} attempts") from last_exc
def _fetch_pages_parallel(pages: list[int], cycler: dict) -> list[tuple[int, list[dict]]]:
"""Fetch multiple pages concurrently using the tiered cycler.
Returns [(page_num, tenants_list), ...]. Raises if any page exhausts all tiers.
"""
with ThreadPoolExecutor(max_workers=len(pages)) as pool:
futures = [pool.submit(_fetch_one_page, next_proxy(), p) for p in pages]
futures = [pool.submit(_fetch_page_via_cycler, cycler, p) for p in pages]
return [f.result() for f in as_completed(futures)]
def extract(
landing_dir: Path,
year_month: str, # noqa: ARG001 — unused; tenants uses ISO week partition instead
year_month: str, # noqa: ARG001 — unused; tenants uses daily partition instead
conn: sqlite3.Connection,
session: niquests.Session,
) -> dict:
"""Fetch all Playtomic venues via global pagination. Returns run metrics.
Partitioned by ISO week (e.g. 2026/W09) so each weekly run produces a
Partitioned by day (e.g. 2026/03/01) so each daily run produces a
fresh file. _load_tenant_ids() in playtomic_availability globs across all
partitions and picks the most recent one.
"""
@@ -89,12 +128,16 @@ def extract(
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
tiers = load_proxy_tiers()
all_proxies = [url for tier in tiers for url in tier]
next_proxy = make_round_robin_cycler(all_proxies) if all_proxies else None
batch_size = len(all_proxies) if all_proxies else 1
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD) if tiers else None
batch_size = BATCH_SIZE if cycler else 1
if next_proxy:
logger.info("Parallel mode: %d pages per batch (%d proxies across %d tier(s))", batch_size, len(all_proxies), len(tiers))
if cycler:
logger.info(
"Parallel mode: %d pages/batch, %d tier(s), threshold=%d",
batch_size,
cycler["tier_count"](),
CIRCUIT_BREAKER_THRESHOLD,
)
else:
logger.info("Serial mode: 1 page at a time (no proxies)")
@@ -104,15 +147,33 @@ def extract(
done = False
while not done and page < MAX_PAGES:
if cycler and cycler["is_exhausted"]():
logger.error(
"All proxy tiers exhausted — stopping at page %d (%d venues collected)",
page,
len(all_tenants),
)
break
batch_end = min(page + batch_size, MAX_PAGES)
pages_to_fetch = list(range(page, batch_end))
if next_proxy and len(pages_to_fetch) > 1:
if cycler and len(pages_to_fetch) > 1:
logger.info(
"Fetching pages %d-%d in parallel (%d workers, total so far: %d)",
page, batch_end - 1, len(pages_to_fetch), len(all_tenants),
page,
batch_end - 1,
len(pages_to_fetch),
len(all_tenants),
)
results = _fetch_pages_parallel(pages_to_fetch, next_proxy)
try:
results = _fetch_pages_parallel(pages_to_fetch, cycler)
except RuntimeError:
logger.error(
"Proxy tiers exhausted mid-batch — writing partial results (%d venues)",
len(all_tenants),
)
break
else:
# Serial: reuse the shared session, throttle between pages
page_num = pages_to_fetch[0]
@@ -126,7 +187,7 @@ def extract(
)
results = [(page_num, tenants)]
# Process pages in order so the done-detection on < PAGE_SIZE is deterministic
# Process pages in order so done-detection on < PAGE_SIZE is deterministic
for p, tenants in sorted(results):
new_count = 0
for tenant in tenants:
@@ -137,7 +198,11 @@ def extract(
new_count += 1
logger.info(
"page=%d got=%d new=%d total=%d", p, len(tenants), new_count, len(all_tenants),
"page=%d got=%d new=%d total=%d",
p,
len(tenants),
new_count,
len(all_tenants),
)
# Last page — fewer than PAGE_SIZE results means we've exhausted the list
@@ -146,7 +211,7 @@ def extract(
break
page = batch_end
if not next_proxy:
if not cycler:
time.sleep(THROTTLE_SECONDS)
# Write each tenant as a JSONL line, then compress atomically

View File

@@ -88,8 +88,14 @@ def load_proxy_tiers() -> list[list[str]]:
for var in ("PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"):
raw = os.environ.get(var, "")
urls = [u.strip() for u in raw.split(",") if u.strip()]
if urls:
tiers.append(urls)
valid = []
for url in urls:
if not url.startswith(("http://", "https://")):
logger.warning("%s contains URL without scheme, skipping: %s", var, url[:60])
continue
valid.append(url)
if valid:
tiers.append(valid)
return tiers
@@ -134,8 +140,8 @@ def make_sticky_selector(proxy_urls: list[str]):
return select_proxy
def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict:
"""Thread-safe N-tier proxy cycler with circuit breaker.
def make_tiered_cycler(tiers: list[list[str]], threshold: int, proxy_failure_limit: int = 3) -> dict:
"""Thread-safe N-tier proxy cycler with circuit breaker and per-proxy dead tracking.
Uses tiers[0] until consecutive failures >= threshold, then escalates
to tiers[1], then tiers[2], etc. Once all tiers are exhausted,
@@ -144,13 +150,21 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict:
Failure counter resets on each escalation — the new tier gets a fresh start.
Once exhausted, further record_failure() calls are no-ops.
Per-proxy dead tracking (when proxy_failure_limit > 0):
Individual proxies are marked dead after proxy_failure_limit failures and
skipped by next_proxy(). If all proxies in the active tier are dead,
next_proxy() auto-escalates to the next tier. Both mechanisms coexist:
per-proxy dead tracking removes broken individuals; tier-level threshold
catches systemic failure even before any single proxy hits the limit.
Returns a dict of callables:
next_proxy() -> str | None — URL from the active tier, or None
record_success() -> None — resets consecutive failure counter
record_failure() -> bool — True if just escalated to next tier
next_proxy() -> str | None — URL from active tier (skips dead), or None
record_success(proxy_url=None) -> None — resets consecutive failure counter
record_failure(proxy_url=None) -> bool — True if just escalated to next tier
is_exhausted() -> bool — True if all tiers exhausted
active_tier_index() -> int — 0-based index of current tier
tier_count() -> int — total number of tiers
dead_proxy_count() -> int — number of individual proxies marked dead
Edge cases:
Empty tiers list: next_proxy() always returns None, is_exhausted() True.
@@ -158,28 +172,75 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict:
"""
assert threshold > 0, f"threshold must be positive, got {threshold}"
assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}"
assert proxy_failure_limit >= 0, f"proxy_failure_limit must be >= 0, got {proxy_failure_limit}"
lock = threading.Lock()
cycles = [itertools.cycle(t) for t in tiers]
state = {
"active_tier": 0,
"consecutive_failures": 0,
"proxy_failure_counts": {}, # proxy_url -> int
"dead_proxies": set(), # proxy URLs marked dead
}
def next_proxy() -> str | None:
with lock:
idx = state["active_tier"]
if idx >= len(cycles):
return None
return next(cycles[idx])
# Try each remaining tier (bounded: at most len(tiers) escalations)
for _ in range(len(tiers) + 1):
idx = state["active_tier"]
if idx >= len(cycles):
return None
def record_success() -> None:
tier_proxies = tiers[idx]
tier_len = len(tier_proxies)
# Find a live proxy in this tier (bounded: try each proxy at most once)
for _ in range(tier_len):
candidate = next(cycles[idx])
if candidate not in state["dead_proxies"]:
return candidate
# All proxies in this tier are dead — auto-escalate
state["consecutive_failures"] = 0
state["active_tier"] += 1
new_idx = state["active_tier"]
if new_idx < len(tiers):
logger.warning(
"All proxies in tier %d are dead — auto-escalating to tier %d/%d",
idx + 1,
new_idx + 1,
len(tiers),
)
else:
logger.error(
"All proxies in all %d tier(s) are dead — no more fallbacks",
len(tiers),
)
return None # safety fallback
def record_success(proxy_url: str | None = None) -> None:
with lock:
state["consecutive_failures"] = 0
if proxy_url is not None:
state["proxy_failure_counts"][proxy_url] = 0
def record_failure() -> bool:
def record_failure(proxy_url: str | None = None) -> bool:
"""Increment failure counter. Returns True if just escalated to next tier."""
with lock:
# Per-proxy dead tracking (additional to tier-level circuit breaker)
if proxy_url is not None and proxy_failure_limit > 0:
count = state["proxy_failure_counts"].get(proxy_url, 0) + 1
state["proxy_failure_counts"][proxy_url] = count
if count >= proxy_failure_limit and proxy_url not in state["dead_proxies"]:
state["dead_proxies"].add(proxy_url)
logger.warning(
"Proxy %s marked dead after %d consecutive failures",
proxy_url,
count,
)
# Tier-level circuit breaker (existing behavior)
idx = state["active_tier"]
if idx >= len(tiers):
# Already exhausted — no-op
@@ -219,6 +280,10 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict:
def tier_count() -> int:
return len(tiers)
def dead_proxy_count() -> int:
with lock:
return len(state["dead_proxies"])
return {
"next_proxy": next_proxy,
"record_success": record_success,
@@ -226,4 +291,5 @@ def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict:
"is_exhausted": is_exhausted,
"active_tier_index": active_tier_index,
"tier_count": tier_count,
"dead_proxy_count": dead_proxy_count,
}

View File

@@ -19,8 +19,10 @@
-- 4. Country-level income (global fallback from stg_income / ilc_di03)
--
-- Distance calculations use ST_Distance_Sphere (DuckDB spatial extension).
-- A bounding-box pre-filter (~0.5°, ≈55km) reduces the cross-join before the
-- exact sphere distance is computed.
-- Spatial joins use BETWEEN predicates (not ABS()) to enable DuckDB's IEJoin
-- (interval join) optimization: O((N+M) log M) vs O(N×M) nested-loop.
-- Country pre-filters restrict the left side to ~20K rows for padel/tennis CTEs
-- (~8 countries each), down from ~140K global locations.
MODEL (
name foundation.dim_locations,
@@ -147,6 +149,8 @@ padel_courts AS (
WHERE lat IS NOT NULL AND lon IS NOT NULL
),
-- Nearest padel court distance per location (bbox pre-filter → exact sphere distance)
-- BETWEEN enables DuckDB IEJoin (O((N+M) log M)) vs ABS() nested-loop (O(N×M)).
-- Country pre-filter reduces left side from ~140K to ~20K rows (padel is ~8 countries).
nearest_padel AS (
SELECT
l.geoname_id,
@@ -158,9 +162,12 @@ nearest_padel AS (
) AS nearest_padel_court_km
FROM locations l
JOIN padel_courts p
-- ~55km bounding box pre-filter to limit cross-join before sphere calc
ON ABS(l.lat - p.lat) < 0.5
AND ABS(l.lon - p.lon) < 0.5
-- ~55km bounding box pre-filter; BETWEEN triggers IEJoin optimization
ON l.lat BETWEEN p.lat - 0.5 AND p.lat + 0.5
AND l.lon BETWEEN p.lon - 0.5 AND p.lon + 0.5
WHERE l.country_code IN (
SELECT DISTINCT country_code FROM padel_courts WHERE country_code IS NOT NULL
)
GROUP BY l.geoname_id
),
-- Padel venues within 5km of each location (counts as "local padel supply")
@@ -170,24 +177,35 @@ padel_local AS (
COUNT(*) AS padel_venue_count
FROM locations l
JOIN padel_courts p
ON ABS(l.lat - p.lat) < 0.05 -- ~5km bbox pre-filter
AND ABS(l.lon - p.lon) < 0.05
WHERE ST_Distance_Sphere(
-- ~5km bbox pre-filter; BETWEEN triggers IEJoin optimization
ON l.lat BETWEEN p.lat - 0.05 AND p.lat + 0.05
AND l.lon BETWEEN p.lon - 0.05 AND p.lon + 0.05
WHERE l.country_code IN (
SELECT DISTINCT country_code FROM padel_courts WHERE country_code IS NOT NULL
)
AND ST_Distance_Sphere(
ST_Point(l.lon, l.lat),
ST_Point(p.lon, p.lat)
) / 1000.0 <= 5.0
GROUP BY l.geoname_id
),
-- Tennis courts within 25km of each location (sports culture proxy)
-- Country pre-filter reduces left side from ~140K to ~20K rows (tennis courts are European only).
tennis_nearby AS (
SELECT
l.geoname_id,
COUNT(*) AS tennis_courts_within_25km
FROM locations l
JOIN staging.stg_tennis_courts t
ON ABS(l.lat - t.lat) < 0.23 -- ~25km bbox pre-filter
AND ABS(l.lon - t.lon) < 0.23
WHERE ST_Distance_Sphere(
-- ~25km bbox pre-filter; BETWEEN triggers IEJoin optimization
ON l.lat BETWEEN t.lat - 0.23 AND t.lat + 0.23
AND l.lon BETWEEN t.lon - 0.23 AND t.lon + 0.23
WHERE l.country_code IN (
SELECT DISTINCT country_code
FROM staging.stg_tennis_courts
WHERE country_code IS NOT NULL
)
AND ST_Distance_Sphere(
ST_Point(l.lon, l.lat),
ST_Point(t.lon, t.lat)
) / 1000.0 <= 25.0

View File

@@ -6,7 +6,9 @@ Operational visibility for the data extraction and transformation pipeline:
/admin/pipeline/overview → HTMX tab: extraction status, serving freshness, landing stats
/admin/pipeline/extractions → HTMX tab: filterable extraction run history
/admin/pipeline/extractions/<id>/mark-stale → POST: mark stuck "running" row as failed
/admin/pipeline/extract/trigger → POST: enqueue full extraction run
/admin/pipeline/extract/trigger → POST: enqueue extraction run (HTMX-aware)
/admin/pipeline/transform → HTMX tab: SQLMesh + export status, run history
/admin/pipeline/transform/trigger → POST: enqueue transform/export/pipeline step
/admin/pipeline/catalog → HTMX tab: data catalog (tables, columns, sample data)
/admin/pipeline/catalog/<table> → HTMX partial: table detail (columns + sample)
/admin/pipeline/query → HTMX tab: SQL query editor
@@ -18,6 +20,7 @@ Data sources:
- analytics.duckdb (DuckDB read-only via analytics.execute_user_query)
- LANDING_DIR/ (filesystem scan for file sizes + dates)
- infra/supervisor/workflows.toml (schedule definitions — tomllib, stdlib)
- app.db tasks table (run_transform, run_export, run_pipeline task rows)
"""
import asyncio
import json
@@ -49,7 +52,7 @@ _LANDING_DIR = os.environ.get("LANDING_DIR", "data/landing")
_SERVING_DUCKDB_PATH = os.environ.get("SERVING_DUCKDB_PATH", "data/analytics.duckdb")
# Repo root: web/src/padelnomics/admin/ → up 4 levels
_REPO_ROOT = Path(__file__).resolve().parents[5]
_REPO_ROOT = Path(__file__).resolve().parents[4]
_WORKFLOWS_TOML = _REPO_ROOT / "infra" / "supervisor" / "workflows.toml"
# A "running" row older than this is considered stale/crashed.
@@ -626,10 +629,8 @@ async def pipeline_dashboard():
# ── Overview tab ─────────────────────────────────────────────────────────────
@bp.route("/overview")
@role_required("admin")
async def pipeline_overview():
"""HTMX tab: extraction status per source, serving freshness, landing zone."""
async def _render_overview_partial():
"""Build and render the pipeline overview partial (shared by GET and POST triggers)."""
latest_runs, landing_stats, workflows, serving_meta = await asyncio.gather(
asyncio.to_thread(_fetch_latest_per_extractor_sync),
asyncio.to_thread(_get_landing_zone_stats_sync),
@@ -650,6 +651,13 @@ async def pipeline_overview():
"stale": _is_stale(run) if run else False,
})
# Treat pending extraction tasks as "running" (queued or active).
from ..core import fetch_all as _fetch_all # noqa: PLC0415
pending_extraction = await _fetch_all(
"SELECT id FROM tasks WHERE task_name = 'run_extraction' AND status = 'pending' LIMIT 1"
)
any_running = bool(pending_extraction)
# Compute landing zone totals
total_landing_bytes = sum(s["total_bytes"] for s in landing_stats)
@@ -677,10 +685,18 @@ async def pipeline_overview():
total_landing_bytes=total_landing_bytes,
serving_tables=serving_tables,
last_export=last_export,
any_running=any_running,
format_bytes=_format_bytes,
)
@bp.route("/overview")
@role_required("admin")
async def pipeline_overview():
"""HTMX tab: extraction status per source, serving freshness, landing zone."""
return await _render_overview_partial()
# ── Extractions tab ────────────────────────────────────────────────────────────
@@ -745,7 +761,11 @@ async def pipeline_mark_stale(run_id: int):
@role_required("admin")
@csrf_protect
async def pipeline_trigger_extract():
"""Enqueue an extraction run — all extractors, or a single named one."""
"""Enqueue an extraction run — all extractors, or a single named one.
HTMX-aware: if the HX-Request header is present, returns the overview partial
directly so the UI can update in-place without a redirect.
"""
from ..worker import enqueue
form = await request.form
@@ -757,11 +777,15 @@ async def pipeline_trigger_extract():
await flash(f"Unknown extractor '{extractor}'.", "warning")
return redirect(url_for("pipeline.pipeline_dashboard"))
await enqueue("run_extraction", {"extractor": extractor})
await flash(f"Extractor '{extractor}' queued. Check the task queue for progress.", "success")
else:
await enqueue("run_extraction")
await flash("Extraction run queued. Check the task queue for progress.", "success")
is_htmx = request.headers.get("HX-Request") == "true"
if is_htmx:
return await _render_overview_partial()
msg = f"Extractor '{extractor}' queued." if extractor else "Extraction run queued."
await flash(f"{msg} Check the task queue for progress.", "success")
return redirect(url_for("pipeline.pipeline_dashboard"))
@@ -847,6 +871,156 @@ async def pipeline_lineage_schema(model: str):
)
# ── Transform tab ─────────────────────────────────────────────────────────────
_TRANSFORM_TASK_NAMES = ("run_transform", "run_export", "run_pipeline")
async def _fetch_pipeline_tasks() -> dict:
"""Fetch the latest task row for each transform task type, plus recent run history.
Returns:
{
"latest": {"run_transform": row|None, "run_export": row|None, "run_pipeline": row|None},
"history": [row, ...], # last 20 rows across all three task types, newest first
}
"""
from ..core import fetch_all as _fetch_all # noqa: PLC0415
# Latest row per task type (may be pending, complete, or failed)
latest_rows = await _fetch_all(
"""
SELECT t.*
FROM tasks t
INNER JOIN (
SELECT task_name, MAX(id) AS max_id
FROM tasks
WHERE task_name IN ('run_transform', 'run_export', 'run_pipeline')
GROUP BY task_name
) latest ON t.id = latest.max_id
"""
)
latest: dict = {"run_transform": None, "run_export": None, "run_pipeline": None}
for row in latest_rows:
latest[row["task_name"]] = dict(row)
history = await _fetch_all(
"""
SELECT id, task_name, status, created_at, completed_at, error
FROM tasks
WHERE task_name IN ('run_transform', 'run_export', 'run_pipeline')
ORDER BY id DESC
LIMIT 20
"""
)
return {"latest": latest, "history": [dict(r) for r in history]}
def _format_duration(created_at: str | None, completed_at: str | None) -> str:
"""Human-readable duration between created_at and completed_at, or '' if unavailable."""
if not created_at or not completed_at:
return ""
try:
fmt = "%Y-%m-%d %H:%M:%S"
start = datetime.strptime(created_at, fmt)
end = datetime.strptime(completed_at, fmt)
delta = int((end - start).total_seconds())
if delta < 0:
return ""
if delta < 60:
return f"{delta}s"
return f"{delta // 60}m {delta % 60}s"
except ValueError:
return ""
async def _render_transform_partial():
"""Build and render the transform tab partial."""
task_data = await _fetch_pipeline_tasks()
latest = task_data["latest"]
history = task_data["history"]
# Enrich history rows with duration
for row in history:
row["duration"] = _format_duration(row.get("created_at"), row.get("completed_at"))
# Truncate error for display
if row.get("error"):
row["error_short"] = row["error"][:120]
else:
row["error_short"] = None
any_running = any(
t is not None and t["status"] == "pending" for t in latest.values()
)
serving_meta = await asyncio.to_thread(_load_serving_meta)
return await render_template(
"admin/partials/pipeline_transform.html",
latest=latest,
history=history,
any_running=any_running,
serving_meta=serving_meta,
format_duration=_format_duration,
)
@bp.route("/transform")
@role_required("admin")
async def pipeline_transform():
"""HTMX tab: SQLMesh transform + export status, run history."""
return await _render_transform_partial()
@bp.route("/transform/trigger", methods=["POST"])
@role_required("admin")
@csrf_protect
async def pipeline_trigger_transform():
"""Enqueue a transform, export, or full pipeline task.
form field `step`: 'transform' | 'export' | 'pipeline'
Concurrency guard: rejects if the same task type is already pending.
HTMX-aware: returns the transform partial for HTMX requests.
"""
from ..core import fetch_one as _fetch_one # noqa: PLC0415
from ..worker import enqueue
form = await request.form
step = (form.get("step") or "").strip()
step_to_task = {
"transform": "run_transform",
"export": "run_export",
"pipeline": "run_pipeline",
}
if step not in step_to_task:
await flash(f"Unknown step '{step}'.", "warning")
return redirect(url_for("pipeline.pipeline_dashboard"))
task_name = step_to_task[step]
# Concurrency guard: reject if same task type is already pending
existing = await _fetch_one(
"SELECT id FROM tasks WHERE task_name = ? AND status = 'pending' LIMIT 1",
(task_name,),
)
if existing:
is_htmx = request.headers.get("HX-Request") == "true"
if is_htmx:
return await _render_transform_partial()
await flash(f"A '{step}' task is already queued (task #{existing['id']}).", "warning")
return redirect(url_for("pipeline.pipeline_dashboard"))
await enqueue(task_name)
is_htmx = request.headers.get("HX-Request") == "true"
if is_htmx:
return await _render_transform_partial()
await flash(f"'{step}' task queued. Check the task queue for progress.", "success")
return redirect(url_for("pipeline.pipeline_dashboard"))
# ── Catalog tab ───────────────────────────────────────────────────────────────

View File

@@ -169,7 +169,6 @@ async def pseo_generate_gaps(slug: str):
"template_slug": slug,
"start_date": date.today().isoformat(),
"articles_per_day": 500,
"limit": 500,
})
await flash(
f"Queued generation for {len(gaps)} missing articles in '{config['name']}'.",

View File

@@ -1865,7 +1865,7 @@ async def template_preview(slug: str, row_key: str):
@csrf_protect
async def template_generate(slug: str):
"""Generate articles from template + DuckDB data."""
from ..content import fetch_template_data, load_template
from ..content import count_template_data, load_template
try:
config = load_template(slug)
@@ -1873,8 +1873,7 @@ async def template_generate(slug: str):
await flash("Template not found.", "error")
return redirect(url_for("admin.templates"))
data_rows = await fetch_template_data(config["data_table"], limit=501)
row_count = len(data_rows)
row_count = await count_template_data(config["data_table"])
if request.method == "POST":
form = await request.form
@@ -1888,7 +1887,6 @@ async def template_generate(slug: str):
"template_slug": slug,
"start_date": start_date.isoformat(),
"articles_per_day": articles_per_day,
"limit": 500,
})
await flash(
f"Article generation queued for '{config['name']}'. "
@@ -1923,7 +1921,6 @@ async def template_regenerate(slug: str):
"template_slug": slug,
"start_date": date.today().isoformat(),
"articles_per_day": 500,
"limit": 500,
})
await flash("Regeneration queued. The worker will process it in the background.", "success")
return redirect(url_for("admin.template_detail", slug=slug))
@@ -2729,7 +2726,6 @@ async def rebuild_all():
"template_slug": t["slug"],
"start_date": date.today().isoformat(),
"articles_per_day": 500,
"limit": 500,
})
# Manual articles still need inline rebuild
@@ -3037,6 +3033,7 @@ async def outreach():
current_search=search,
current_follow_up=follow_up,
page=page,
outreach_email=EMAIL_ADDRESSES["outreach"],
)

View File

@@ -40,8 +40,10 @@
.admin-subnav {
display: flex; align-items: stretch; padding: 0 2rem;
background: #fff; border-bottom: 1px solid #E2E8F0;
flex-shrink: 0; overflow-x: auto; gap: 0;
flex-shrink: 0; overflow-x: auto; overflow-y: hidden; gap: 0;
scrollbar-width: none;
}
.admin-subnav::-webkit-scrollbar { display: none; }
.admin-subnav a {
display: flex; align-items: center; gap: 5px;
padding: 0 1px; margin: 0 13px 0 0; height: 42px;

View File

@@ -3,6 +3,19 @@
{% block title %}Admin Dashboard - {{ config.APP_NAME }}{% endblock %}
{% block admin_head %}
<style>
.funnel-grid {
display: grid;
grid-template-columns: repeat(2, 1fr);
gap: 0.75rem;
}
@media (min-width: 768px) {
.funnel-grid { grid-template-columns: repeat(5, 1fr); }
}
</style>
{% endblock %}
{% block admin_content %}
<header class="flex justify-between items-center mb-8">
<div>
@@ -47,7 +60,7 @@
<!-- Lead Funnel -->
<p class="text-xs font-semibold text-slate uppercase tracking-wider mb-2">Lead Funnel</p>
<div style="display:grid;grid-template-columns:repeat(5,1fr);gap:0.75rem" class="mb-8">
<div class="funnel-grid mb-8">
<div class="card text-center border-l-4 border-l-electric" style="padding:0.75rem">
<p class="text-xs text-slate">Planner Users</p>
<p class="text-xl font-bold text-navy">{{ stats.planner_users }}</p>
@@ -72,7 +85,7 @@
<!-- Supplier Stats -->
<p class="text-xs font-semibold text-slate uppercase tracking-wider mb-2">Supplier Funnel</p>
<div style="display:grid;grid-template-columns:repeat(5,1fr);gap:0.75rem" class="mb-8">
<div class="funnel-grid mb-8">
<div class="card text-center border-l-4 border-l-accent" style="padding:0.75rem">
<p class="text-xs text-slate">Claimed Suppliers</p>
<p class="text-xl font-bold text-navy">{{ stats.suppliers_claimed }}</p>

View File

@@ -2,13 +2,30 @@
{% set admin_page = "outreach" %}
{% block title %}Outreach Pipeline - Admin - {{ config.APP_NAME }}{% endblock %}
{% block admin_head %}
<style>
.pipeline-status-grid {
display: grid;
grid-template-columns: repeat(2, 1fr);
gap: 0.75rem;
margin-bottom: 1.5rem;
}
@media (min-width: 640px) {
.pipeline-status-grid { grid-template-columns: repeat(3, 1fr); }
}
@media (min-width: 1024px) {
.pipeline-status-grid { grid-template-columns: repeat(6, 1fr); }
}
</style>
{% endblock %}
{% block admin_content %}
<header class="flex justify-between items-center mb-6">
<div>
<h1 class="text-2xl">Outreach</h1>
<p class="text-sm text-slate mt-1">
{{ pipeline.total }} supplier{{ 's' if pipeline.total != 1 else '' }} in pipeline
&middot; Sending domain: <span class="mono text-xs">hello.padelnomics.io</span>
&middot; Sending from: <span class="mono text-xs">{{ outreach_email }}</span>
</p>
</div>
<div class="flex gap-2">
@@ -18,7 +35,7 @@
</header>
<!-- Pipeline cards -->
<div style="display:grid;grid-template-columns:repeat(6,1fr);gap:0.75rem;margin-bottom:1.5rem">
<div class="pipeline-status-grid">
{% set status_colors = {
'prospect': '#E2E8F0',
'contacted': '#DBEAFE',

View File

@@ -1,5 +1,6 @@
{% if emails %}
<div class="card">
<div style="overflow-x:auto">
<table class="table">
<thead>
<tr>
@@ -38,6 +39,7 @@
{% endfor %}
</tbody>
</table>
</div>
</div>
{% else %}
<div class="card text-center" style="padding:2rem">

View File

@@ -25,6 +25,7 @@
{% if leads %}
<div class="card">
<div style="overflow-x:auto">
<table class="table">
<thead>
<tr>
@@ -58,6 +59,7 @@
{% endfor %}
</tbody>
</table>
</div>
</div>
<!-- Pagination -->

View File

@@ -1,5 +1,6 @@
{% if suppliers %}
<div class="card">
<div style="overflow-x:auto">
<table class="table">
<thead>
<tr>
@@ -19,6 +20,7 @@
{% endfor %}
</tbody>
</table>
</div>
</div>
{% else %}
<div class="card text-center" style="padding:2rem">

View File

@@ -1,4 +1,11 @@
<!-- Pipeline Overview Tab: extraction status, serving freshness, landing zone -->
<!-- Pipeline Overview Tab: extraction status, serving freshness, landing zone
Self-polls every 5s while any extraction task is pending, stops when quiet. -->
<div id="pipeline-overview-content"
hx-get="{{ url_for('pipeline.pipeline_overview') }}"
hx-target="this"
hx-swap="outerHTML"
{% if any_running %}hx-trigger="every 5s"{% endif %}>
<!-- Extraction Status Grid -->
<div class="card mb-4">
@@ -26,12 +33,14 @@
{% if stale %}
<span class="badge-warning" style="font-size:10px;padding:1px 6px;margin-left:auto">stale</span>
{% endif %}
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0 ml-auto">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="extractor" value="{{ wf.name }}">
<button type="button" class="btn btn-sm" style="padding:2px 8px;font-size:11px"
onclick="confirmAction('Run {{ wf.name }} extractor?', this.closest('form'))">Run</button>
</form>
<button type="button"
class="btn btn-sm ml-auto"
style="padding:2px 8px;font-size:11px"
hx-post="{{ url_for('pipeline.pipeline_trigger_extract') }}"
hx-target="#pipeline-overview-content"
hx-swap="outerHTML"
hx-vals='{"extractor": "{{ wf.name }}", "csrf_token": "{{ csrf_token() }}"}'
onclick="if (!confirm('Run {{ wf.name }} extractor?')) return false;">Run</button>
</div>
<p class="text-xs text-slate">{{ wf.schedule_label }}</p>
{% if run %}
@@ -57,7 +66,7 @@
</div>
<!-- Two-column row: Serving Freshness + Landing Zone -->
<div style="display:grid;grid-template-columns:1fr 1fr;gap:1rem">
<div class="pipeline-two-col">
<!-- Serving Freshness -->
<div class="card">
@@ -68,6 +77,7 @@
</p>
{% endif %}
{% if serving_tables %}
<div style="overflow-x:auto">
<table class="table" style="font-size:0.8125rem">
<thead>
<tr>
@@ -86,6 +96,7 @@
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<p class="text-sm text-slate">No serving tables found — run the pipeline first.</p>
{% endif %}
@@ -99,6 +110,7 @@
</span>
</p>
{% if landing_stats %}
<div style="overflow-x:auto">
<table class="table" style="font-size:0.8125rem">
<thead>
<tr>
@@ -119,6 +131,7 @@
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<p class="text-sm text-slate">
Landing zone empty or not found at <code>data/landing</code>.
@@ -127,3 +140,5 @@
</div>
</div>
</div>{# end #pipeline-overview-content #}

View File

@@ -0,0 +1,197 @@
<!-- Pipeline Transform Tab: SQLMesh + export status, run history
Self-polls every 5s while any transform/export task is pending. -->
<div id="pipeline-transform-content"
hx-get="{{ url_for('pipeline.pipeline_transform') }}"
hx-target="this"
hx-swap="outerHTML"
{% if any_running %}hx-trigger="every 5s"{% endif %}>
<!-- Status Cards: Transform + Export -->
<div class="pipeline-two-col mb-4">
<!-- SQLMesh Transform -->
{% set tx = latest['run_transform'] %}
<div class="card">
<p class="card-header">SQLMesh Transform</p>
<div class="flex items-center gap-2 mb-3">
{% if tx is none %}
<span class="status-dot pending"></span>
<span class="text-sm text-slate">Never run</span>
{% elif tx.status == 'pending' %}
<span class="status-dot running"></span>
<span class="text-sm text-slate">Running…</span>
{% elif tx.status == 'complete' %}
<span class="status-dot ok"></span>
<span class="text-sm text-slate">Complete</span>
{% else %}
<span class="status-dot failed"></span>
<span class="text-sm text-danger">Failed</span>
{% endif %}
</div>
{% if tx %}
<p class="text-xs text-slate mono">
Started: {{ (tx.created_at or '')[:19] or '—' }}
</p>
{% if tx.completed_at %}
<p class="text-xs text-slate mono">
Finished: {{ tx.completed_at[:19] }}
</p>
{% endif %}
{% if tx.status == 'failed' and tx.error %}
<details class="mt-2">
<summary class="text-xs text-danger cursor-pointer">Error</summary>
<pre class="text-xs mt-1 p-2 bg-gray-50 rounded overflow-auto" style="max-height:8rem;white-space:pre-wrap">{{ tx.error[:400] }}</pre>
</details>
{% endif %}
{% endif %}
<div class="mt-3">
<button type="button"
class="btn btn-sm"
{% if any_running %}disabled{% endif %}
hx-post="{{ url_for('pipeline.pipeline_trigger_transform') }}"
hx-target="#pipeline-transform-content"
hx-swap="outerHTML"
hx-vals='{"step": "transform", "csrf_token": "{{ csrf_token() }}"}'
onclick="if (!confirm('Run SQLMesh transform (prod --auto-apply)?')) return false;">
Run Transform
</button>
</div>
</div>
<!-- Export Serving -->
{% set ex = latest['run_export'] %}
<div class="card">
<p class="card-header">Export Serving</p>
<div class="flex items-center gap-2 mb-3">
{% if ex is none %}
<span class="status-dot pending"></span>
<span class="text-sm text-slate">Never run</span>
{% elif ex.status == 'pending' %}
<span class="status-dot running"></span>
<span class="text-sm text-slate">Running…</span>
{% elif ex.status == 'complete' %}
<span class="status-dot ok"></span>
<span class="text-sm text-slate">Complete</span>
{% else %}
<span class="status-dot failed"></span>
<span class="text-sm text-danger">Failed</span>
{% endif %}
</div>
{% if ex %}
<p class="text-xs text-slate mono">
Started: {{ (ex.created_at or '')[:19] or '—' }}
</p>
{% if ex.completed_at %}
<p class="text-xs text-slate mono">
Finished: {{ ex.completed_at[:19] }}
</p>
{% endif %}
{% if serving_meta %}
<p class="text-xs text-slate mt-1">
Last export: <span class="font-semibold mono">{{ (serving_meta.exported_at_utc or '')[:19].replace('T', ' ') or '—' }}</span>
</p>
{% endif %}
{% if ex.status == 'failed' and ex.error %}
<details class="mt-2">
<summary class="text-xs text-danger cursor-pointer">Error</summary>
<pre class="text-xs mt-1 p-2 bg-gray-50 rounded overflow-auto" style="max-height:8rem;white-space:pre-wrap">{{ ex.error[:400] }}</pre>
</details>
{% endif %}
{% endif %}
<div class="mt-3">
<button type="button"
class="btn btn-sm"
{% if any_running %}disabled{% endif %}
hx-post="{{ url_for('pipeline.pipeline_trigger_transform') }}"
hx-target="#pipeline-transform-content"
hx-swap="outerHTML"
hx-vals='{"step": "export", "csrf_token": "{{ csrf_token() }}"}'
onclick="if (!confirm('Export serving tables (lakehouse → analytics.duckdb)?')) return false;">
Run Export
</button>
</div>
</div>
</div>
<!-- Run Full Pipeline -->
{% set pl = latest['run_pipeline'] %}
<div class="card mb-4">
<div class="flex items-center justify-between flex-wrap gap-3">
<div>
<p class="font-semibold text-navy text-sm">Full Pipeline</p>
<p class="text-xs text-slate mt-1">Runs extract → transform → export sequentially</p>
{% if pl %}
<p class="text-xs text-slate mono mt-1">
Last: {{ (pl.created_at or '')[:19] or '—' }}
{% if pl.status == 'complete' %}<span class="badge-success ml-2">Complete</span>{% endif %}
{% if pl.status == 'pending' %}<span class="badge-warning ml-2">Running…</span>{% endif %}
{% if pl.status == 'failed' %}<span class="badge-danger ml-2">Failed</span>{% endif %}
</p>
{% endif %}
</div>
<button type="button"
class="btn btn-sm"
{% if any_running %}disabled{% endif %}
hx-post="{{ url_for('pipeline.pipeline_trigger_transform') }}"
hx-target="#pipeline-transform-content"
hx-swap="outerHTML"
hx-vals='{"step": "pipeline", "csrf_token": "{{ csrf_token() }}"}'
onclick="if (!confirm('Run full ELT pipeline (extract → transform → export)?')) return false;">
Run Full Pipeline
</button>
</div>
</div>
<!-- Recent Runs -->
<div class="card">
<p class="card-header">Recent Runs</p>
{% if history %}
<div style="overflow-x:auto">
<table class="table" style="font-size:0.8125rem">
<thead>
<tr>
<th>#</th>
<th>Step</th>
<th>Started</th>
<th>Duration</th>
<th>Status</th>
<th>Error</th>
</tr>
</thead>
<tbody>
{% for row in history %}
<tr>
<td class="text-xs text-slate">{{ row.id }}</td>
<td class="mono text-xs">{{ row.task_name | replace('run_', '') }}</td>
<td class="mono text-xs text-slate">{{ (row.created_at or '')[:19] or '—' }}</td>
<td class="mono text-xs text-slate">{{ row.duration or '—' }}</td>
<td>
{% if row.status == 'complete' %}
<span class="badge-success">Complete</span>
{% elif row.status == 'failed' %}
<span class="badge-danger">Failed</span>
{% else %}
<span class="badge-warning">Running…</span>
{% endif %}
</td>
<td>
{% if row.error_short %}
<details>
<summary class="text-xs text-danger cursor-pointer">Error</summary>
<pre class="text-xs mt-1 p-2 bg-gray-50 rounded overflow-auto" style="max-width:24rem;white-space:pre-wrap">{{ row.error_short }}</pre>
</details>
{% else %}—{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<p class="text-sm text-slate">No transform runs yet.</p>
{% endif %}
</div>
</div>{# end #pipeline-transform-content #}

View File

@@ -1,5 +1,6 @@
{% if suppliers %}
<div class="card">
<div style="overflow-x:auto">
<table class="table">
<thead>
<tr>
@@ -47,6 +48,7 @@
{% endfor %}
</tbody>
</table>
</div>
</div>
{% else %}
<div class="card text-center" style="padding:2rem">

View File

@@ -4,8 +4,18 @@
{% block admin_head %}
<style>
.pipeline-stat-grid {
display: grid;
grid-template-columns: repeat(2, 1fr);
gap: 0.75rem;
}
@media (min-width: 768px) {
.pipeline-stat-grid { grid-template-columns: repeat(4, 1fr); }
}
.pipeline-tabs {
display: flex; gap: 0; border-bottom: 2px solid #E2E8F0; margin-bottom: 1.5rem;
overflow-x: auto; -webkit-overflow-scrolling: touch;
}
.pipeline-tabs button {
padding: 0.625rem 1.25rem; font-size: 0.8125rem; font-weight: 600;
@@ -23,7 +33,19 @@
.status-dot.failed { background: #EF4444; }
.status-dot.stale { background: #D97706; }
.status-dot.running { background: #3B82F6; }
@keyframes pulse-dot { 0%,100%{opacity:1} 50%{opacity:0.4} }
.status-dot.running { animation: pulse-dot 1.5s ease-in-out infinite; }
.status-dot.pending { background: #CBD5E1; }
.pipeline-two-col {
display: grid;
grid-template-columns: 1fr;
gap: 1rem;
}
@media (min-width: 640px) {
.pipeline-two-col { grid-template-columns: 1fr 1fr; }
}
</style>
{% endblock %}
@@ -34,10 +56,11 @@
<p class="text-sm text-slate mt-1">Extraction status, data catalog, and ad-hoc query editor</p>
</div>
<div class="flex gap-2">
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_extract') }}" class="m-0">
<form method="post" action="{{ url_for('pipeline.pipeline_trigger_transform') }}" class="m-0">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="step" value="pipeline">
<button type="button" class="btn btn-sm"
onclick="confirmAction('Enqueue a full extraction run? This will run all extractors in the background.', this.closest('form'))">
onclick="confirmAction('Run full ELT pipeline (extract → transform → export)? This runs in the background.', this.closest('form'))">
Run Pipeline
</button>
</form>
@@ -46,7 +69,7 @@
</header>
<!-- Health stat cards -->
<div style="display:grid;grid-template-columns:repeat(4,1fr);gap:0.75rem" class="mb-6">
<div class="pipeline-stat-grid mb-6">
<div class="card text-center" style="padding:0.875rem">
<p class="text-xs text-slate">Total Runs</p>
<p class="text-2xl font-bold text-navy metric">{{ summary.total | default(0) }}</p>
@@ -97,6 +120,10 @@
hx-get="{{ url_for('pipeline.pipeline_lineage') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Lineage</button>
<button data-tab="transform"
hx-get="{{ url_for('pipeline.pipeline_transform') }}"
hx-target="#pipeline-tab-content" hx-swap="innerHTML"
hx-trigger="click">Transform</button>
</div>
<!-- Tab content (Overview loads on page load) -->

View File

@@ -123,17 +123,19 @@ async def get_table_columns(data_table: str) -> list[dict]:
async def fetch_template_data(
data_table: str,
order_by: str | None = None,
limit: int = 500,
limit: int = 0,
) -> list[dict]:
"""Fetch all rows from a DuckDB serving table."""
"""Fetch rows from a DuckDB serving table. limit=0 means all rows."""
assert "." in data_table, "data_table must be schema-qualified"
_validate_table_name(data_table)
order_clause = f"ORDER BY {order_by} DESC" if order_by else ""
return await fetch_analytics(
f"SELECT * FROM {data_table} {order_clause} LIMIT ?",
[limit],
)
if limit:
return await fetch_analytics(
f"SELECT * FROM {data_table} {order_clause} LIMIT ?",
[limit],
)
return await fetch_analytics(f"SELECT * FROM {data_table} {order_clause}")
async def count_template_data(data_table: str) -> int:
@@ -290,7 +292,7 @@ async def generate_articles(
start_date: date,
articles_per_day: int,
*,
limit: int = 500,
limit: int = 0,
base_url: str = "https://padelnomics.io",
task_id: int | None = None,
) -> int:

View File

@@ -218,9 +218,7 @@
.nav-bar[data-navopen="true"] .nav-mobile {
display: flex;
}
.nav-mobile a,
.nav-mobile button.nav-auth-btn,
.nav-mobile a.nav-auth-btn {
.nav-mobile a:not(.nav-auth-btn) {
display: block;
padding: 0.625rem 0;
border-bottom: 1px solid #F1F5F9;
@@ -230,15 +228,18 @@
text-decoration: none;
transition: color 0.15s;
}
.nav-mobile a:last-child { border-bottom: none; }
.nav-mobile a:hover { color: #1D4ED8; }
.nav-mobile a:not(.nav-auth-btn):last-child { border-bottom: none; }
.nav-mobile a:not(.nav-auth-btn):hover { color: #1D4ED8; }
/* nav-auth-btn in mobile menu: override block style, keep button colours */
.nav-mobile a.nav-auth-btn,
.nav-mobile button.nav-auth-btn {
display: inline-flex;
margin-top: 0.5rem;
padding: 6px 16px;
border-bottom: none;
width: auto;
align-self: flex-start;
color: #fff;
}
.nav-mobile .nav-mobile-section {
font-size: 0.6875rem;

View File

@@ -735,6 +735,107 @@ async def handle_run_extraction(payload: dict) -> None:
logger.info("Extraction completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
@task("run_transform")
async def handle_run_transform(payload: dict) -> None:
"""Run SQLMesh transform (prod plan --auto-apply) in the background.
Shells out to `uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply`.
2-hour absolute timeout — same as extraction.
"""
import subprocess
from pathlib import Path
repo_root = Path(__file__).resolve().parents[4]
result = await asyncio.to_thread(
subprocess.run,
["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "plan", "prod", "--auto-apply"],
capture_output=True,
text=True,
timeout=7200,
cwd=str(repo_root),
)
if result.returncode != 0:
raise RuntimeError(
f"SQLMesh transform failed (exit {result.returncode}): {result.stderr[:500]}"
)
logger.info("SQLMesh transform completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
@task("run_export")
async def handle_run_export(payload: dict) -> None:
"""Export serving tables from lakehouse.duckdb → analytics.duckdb.
Shells out to `uv run python src/padelnomics/export_serving.py`.
10-minute absolute timeout.
"""
import subprocess
from pathlib import Path
repo_root = Path(__file__).resolve().parents[4]
result = await asyncio.to_thread(
subprocess.run,
["uv", "run", "python", "src/padelnomics/export_serving.py"],
capture_output=True,
text=True,
timeout=600,
cwd=str(repo_root),
)
if result.returncode != 0:
raise RuntimeError(
f"Export failed (exit {result.returncode}): {result.stderr[:500]}"
)
logger.info("Export completed: %s", result.stdout[-300:] if result.stdout else "(no output)")
@task("run_pipeline")
async def handle_run_pipeline(payload: dict) -> None:
"""Run full ELT pipeline: extract → transform → export, stopping on first failure."""
import subprocess
from pathlib import Path
repo_root = Path(__file__).resolve().parents[4]
steps = [
(
"extraction",
["uv", "run", "--package", "padelnomics_extract", "extract"],
7200,
),
(
"transform",
["uv", "run", "sqlmesh", "-p", "transform/sqlmesh_padelnomics", "plan", "prod", "--auto-apply"],
7200,
),
(
"export",
["uv", "run", "python", "src/padelnomics/export_serving.py"],
600,
),
]
for step_name, cmd, timeout_seconds in steps:
logger.info("Pipeline step starting: %s", step_name)
result = await asyncio.to_thread(
subprocess.run,
cmd,
capture_output=True,
text=True,
timeout=timeout_seconds,
cwd=str(repo_root),
)
if result.returncode != 0:
raise RuntimeError(
f"Pipeline failed at {step_name} (exit {result.returncode}): {result.stderr[:500]}"
)
logger.info(
"Pipeline step complete: %s%s",
step_name,
result.stdout[-200:] if result.stdout else "(no output)",
)
logger.info("Full pipeline complete (extract → transform → export)")
@task("generate_articles")
async def handle_generate_articles(payload: dict) -> None:
"""Generate articles from a template in the background."""
@@ -745,7 +846,7 @@ async def handle_generate_articles(payload: dict) -> None:
slug = payload["template_slug"]
start_date = date_cls.fromisoformat(payload["start_date"])
articles_per_day = payload.get("articles_per_day", 3)
limit = payload.get("limit", 500)
limit = payload.get("limit", 0)
task_id = payload.get("_task_id")
count = await generate_articles(

View File

@@ -500,3 +500,131 @@ class TestTieredCyclerNTier:
t.join()
assert errors == [], f"Thread safety errors: {errors}"
class TestTieredCyclerDeadProxyTracking:
"""Per-proxy dead tracking: individual proxies marked dead are skipped."""
def test_dead_proxy_skipped_in_next_proxy(self):
"""After a proxy hits the failure limit it is never returned again."""
tiers = [["http://dead", "http://live"]]
cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1)
# Mark http://dead as dead
cycler["record_failure"]("http://dead")
# next_proxy must always return the live one
for _ in range(6):
assert cycler["next_proxy"]() == "http://live"
def test_dead_proxy_count_increments(self):
tiers = [["http://a", "http://b", "http://c"]]
cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=2)
assert cycler["dead_proxy_count"]() == 0
cycler["record_failure"]("http://a")
assert cycler["dead_proxy_count"]() == 0 # only 1 failure, limit is 2
cycler["record_failure"]("http://a")
assert cycler["dead_proxy_count"]() == 1
cycler["record_failure"]("http://b")
cycler["record_failure"]("http://b")
assert cycler["dead_proxy_count"]() == 2
def test_auto_escalates_when_all_proxies_in_tier_dead(self):
"""If all proxies in the active tier are dead, next_proxy auto-escalates."""
tiers = [["http://t0a", "http://t0b"], ["http://t1"]]
cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1)
# Kill all proxies in tier 0
cycler["record_failure"]("http://t0a")
cycler["record_failure"]("http://t0b")
# next_proxy should transparently escalate and return tier 1 proxy
assert cycler["next_proxy"]() == "http://t1"
def test_auto_escalates_updates_active_tier_index(self):
"""Auto-escalation via dead proxies bumps active_tier_index."""
tiers = [["http://t0a", "http://t0b"], ["http://t1"]]
cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1)
cycler["record_failure"]("http://t0a")
cycler["record_failure"]("http://t0b")
cycler["next_proxy"]() # triggers auto-escalation
assert cycler["active_tier_index"]() == 1
def test_returns_none_when_all_tiers_exhausted_by_dead_proxies(self):
tiers = [["http://t0"], ["http://t1"]]
cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1)
cycler["record_failure"]("http://t0")
cycler["record_failure"]("http://t1")
assert cycler["next_proxy"]() is None
def test_record_success_resets_per_proxy_counter(self):
"""Success resets the failure count so proxy is not marked dead."""
tiers = [["http://a", "http://b"]]
cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=3)
# Two failures — not dead yet
cycler["record_failure"]("http://a")
cycler["record_failure"]("http://a")
assert cycler["dead_proxy_count"]() == 0
# Success resets the counter
cycler["record_success"]("http://a")
# Two more failures — still not dead (counter was reset)
cycler["record_failure"]("http://a")
cycler["record_failure"]("http://a")
assert cycler["dead_proxy_count"]() == 0
# Third failure after reset — now dead
cycler["record_failure"]("http://a")
assert cycler["dead_proxy_count"]() == 1
def test_dead_proxy_stays_dead_after_success(self):
"""Once marked dead, a proxy is not revived by record_success."""
tiers = [["http://a", "http://b"]]
cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=1)
cycler["record_failure"]("http://a")
assert cycler["dead_proxy_count"]() == 1
cycler["record_success"]("http://a")
assert cycler["dead_proxy_count"]() == 1
# http://a is still skipped
for _ in range(6):
assert cycler["next_proxy"]() == "http://b"
def test_backward_compat_no_proxy_url(self):
"""Calling record_failure/record_success without proxy_url still works."""
tiers = [["http://t0"], ["http://t1"]]
cycler = make_tiered_cycler(tiers, threshold=2)
cycler["record_failure"]()
cycler["record_failure"]() # escalates
assert cycler["active_tier_index"]() == 1
cycler["record_success"]()
assert cycler["dead_proxy_count"]() == 0 # no per-proxy tracking happened
def test_proxy_failure_limit_zero_disables_per_proxy_tracking(self):
"""proxy_failure_limit=0 disables per-proxy dead tracking entirely."""
tiers = [["http://a", "http://b"]]
cycler = make_tiered_cycler(tiers, threshold=10, proxy_failure_limit=0)
for _ in range(100):
cycler["record_failure"]("http://a")
assert cycler["dead_proxy_count"]() == 0
def test_thread_safety_with_per_proxy_tracking(self):
"""Concurrent record_failure(proxy_url) calls don't corrupt state."""
import threading as _threading
tiers = [["http://t0a", "http://t0b", "http://t0c"], ["http://t1a"]]
cycler = make_tiered_cycler(tiers, threshold=50, proxy_failure_limit=5)
errors = []
lock = _threading.Lock()
def worker():
try:
for _ in range(30):
p = cycler["next_proxy"]()
if p is not None:
cycler["record_failure"](p)
cycler["record_success"](p)
except Exception as e:
with lock:
errors.append(e)
threads = [_threading.Thread(target=worker) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
assert errors == [], f"Thread safety errors: {errors}"