23 Commits

Author SHA1 Message Date
Deeman
51d9aab4a0 fix(supervisor): use version-sorted tag list for current_deployed_tag
All checks were successful
CI / test (push) Successful in 48s
CI / tag (push) Successful in 2s
git describe --exact-match returns the first tag alphabetically when multiple
tags point to the same commit. This caused an infinite redeploy loop when
Gitea CI created a sequential tag (v11) on the same commit as our date-based
tag (v202602281745) — v11 < v202602281745 alphabetically but the deploy check
uses version sort where v202602281745 > v11.

Fix: use git tag --points-at HEAD --sort=-version:refname to pick the
highest-version tag at HEAD, matching the sort order of latest_remote_tag().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 20:55:44 +01:00
Deeman
85b6aa0d0a fix(seeds): update init_landing_seeds.py to write JSONL format
All checks were successful
CI / test (push) Successful in 48s
CI / tag (push) Successful in 2s
Old script wrote blob json.gz seeds; staging models now only read jsonl.gz.
Seeds are empty JSONL gzip files — zero rows, satisfies DuckDB file-not-found check.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 18:50:51 +01:00
Deeman
e62aad148b fix(transform): remove blob CTE from stg_population_geonames
All checks were successful
CI / test (push) Successful in 49s
CI / tag (push) Successful in 2s
Server has cities_global.jsonl.gz (JSONL), not cities_global.json.gz (blob).
TigerStyle clean break — removed blob_rows CTE and UNION ALL.
Simplified to a single SELECT directly from read_json.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 18:40:15 +01:00
Deeman
6fb1e990e3 merge: three-tier proxy + daily tenants + staging model cleanup
All checks were successful
CI / test (push) Successful in 48s
CI / tag (push) Successful in 3s
2026-02-28 18:26:50 +01:00
Deeman
6edf8ba65e fix(transform): remove blob fallback CTEs, update tenants glob to daily partition depth
TigerStyle clean break — no backwards-compat shims for old file formats:

- stg_playtomic_{venues,opening_hours,resources}: glob updated from
  */*/tenants.jsonl.gz (2-level, old weekly) to */*/*/tenants.jsonl.gz
  (3-level, new daily YYYY/MM/DD partition); blob tenants.json.gz CTE removed
- stg_playtomic_availability: morning_blob and recheck_blob CTEs removed;
  only JSONL format (availability_*.jsonl.gz) is read going forward

Verified locally: stg_playtomic_venues evaluates to 14231 venues from
2026/02/28/tenants.jsonl.gz with 0 errors.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 18:26:44 +01:00
Deeman
ed0a578050 add concurrency var
All checks were successful
CI / test (push) Successful in 49s
CI / tag (push) Successful in 3s
2026-02-28 18:20:52 +01:00
Deeman
c1cdeec6be fix(extract): default worker count to 200 when proxies configured
All checks were successful
CI / test (push) Successful in 49s
CI / tag (push) Successful in 3s
Previously fell back to len(tiers[0]) (e.g. 10 for Webshare) when
PROXY_CONCURRENCY was unset. Default is now MAX_PROXY_CONCURRENCY=200
so single-URL rotating proxies (DC/residential) run at full concurrency
without needing an explicit env var.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 18:06:55 +01:00
Deeman
710624f417 fix(supervisor): re-decrypt .env.prod.sops on tag deploy
All checks were successful
CI / test (push) Successful in 49s
CI / tag (push) Successful in 3s
git_pull_and_sync() was missing the sops decrypt step, so .env on the
server was never updated when secrets changed. Now decrypts after
checkout, before uv sync.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 17:57:32 +01:00
Deeman
6cf98f44d4 fix(transform): remove blob compat CTE from stg_tennis_courts
All checks were successful
CI / test (push) Successful in 49s
CI / tag (push) Successful in 3s
The overpass_tennis extractor has written JSONL-only since it was added.
The dual-format UNION ALL was backwards-compat debt that broke the
transform once no courts.json.gz files exist on the server:

  IO Error: No files found that match the pattern
  "data/landing/overpass_tennis/*/*/courts.json.gz"

Remove blob_elements CTE and the UNION ALL. Only read JSONL.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 17:39:11 +01:00
Deeman
60659a5ec5 merge: daily tenant snapshots with date-based partition 2026-02-28 17:30:33 +01:00
Deeman
beb4195f16 feat(extract): daily tenant snapshots with date-based partition
- playtomic_tenants: partition by YYYY/MM/DD instead of ISO week;
  schedule changed from weekly to daily in workflows.toml
- playtomic_availability: _load_tenant_ids now tries 3-level glob
  (*/*/*/tenants.jsonl.gz) first for daily files, falls back to
  2-level for old monthly/weekly data

Alphabetical sort would rank old monthly files above daily ones
('t' > '2' in ASCII), so the explicit fallback chain is required.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 17:27:16 +01:00
Deeman
88cc857f3a merge: weekly tenant snapshots via ISO week partition 2026-02-28 17:19:25 +01:00
Deeman
9116625884 feat(extract): weekly tenant snapshots via ISO week partition
Tenants extractor now partitions by ISO week (e.g. 2026/W09) instead of
month (2026/02), so each weekly run writes a fresh file rather than
skipping for the rest of the month.

_load_tenant_ids() in playtomic_availability already globs */*/tenants.jsonl.gz
and sorts reverse — 'W09' > '02' alphabetically so weekly files take priority
over old monthly ones automatically.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 17:19:19 +01:00
Deeman
1af65bb46f feat(extract): add PROXY_CONCURRENCY override for rotating single-URL proxies
When DC/residential tiers have a single rotating endpoint, worker_count
defaulted to 1 (one URL = one worker). PROXY_CONCURRENCY lets you set
an explicit thread count (e.g. 100) for providers that handle concurrent
connections on a single URL.

Capped at MAX_PROXY_CONCURRENCY=200 to avoid overloading the endpoint.
Falls back to len(tiers[0]) when unset (existing behaviour).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 17:06:53 +01:00
Deeman
9b0bfc478d merge: three-tier proxy system with Webshare auto-fetch 2026-02-28 17:00:10 +01:00
Deeman
adf22924f6 feat(extract): three-tier proxy system with Webshare auto-fetch
Replace two-tier proxy setup (PROXY_URLS / PROXY_URLS_FALLBACK) with
N-tier escalation: free → datacenter → residential.

- proxy.py: fetch_webshare_proxies() auto-fetches the Webshare download
  API on each run (no more stale manually-copied lists). load_proxy_tiers()
  assembles tiers from WEBSHARE_DOWNLOAD_URL, PROXY_URLS_DATACENTER,
  PROXY_URLS_RESIDENTIAL. make_tiered_cycler() generalised to list[list[str]]
  with N-level escalation; is_fallback_active() replaced by is_exhausted().
  Old load_proxy_urls() / load_fallback_proxy_urls() deleted.

- playtomic_availability.py: both extract() and extract_recheck() use
  load_proxy_tiers() + generalised cycler. _fetch_venues_parallel fallback_urls
  param removed. All is_fallback_active() checks → is_exhausted().

- playtomic_tenants.py: flattens tiers for simple round-robin.

- test_supervisor.py: TestLoadProxyUrls removed (function deleted).
  Added TestFetchWebshareProxies, TestLoadProxyTiers, TestTieredCyclerNTier
  (11 tests covering parse format, error handling, escalation, thread safety).

47 tests pass, ruff clean.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 16:57:07 +01:00
Deeman
09665b7786 update proxies 2026-02-28 16:51:40 +01:00
Deeman
93349923bd merge(better-alerts): improve supervisor alert messages 2026-02-28 12:27:14 +01:00
Deeman
642041b32b fix(supervisor): improve alert messages with category prefix and error snippet
Each alert now includes a neutral category tag ([extract], [transform],
[export], [deploy], [supervisor]) and the first line of the error, so
notifications are actionable without revealing tech stack details on the
public free ntfy tier.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 12:27:11 +01:00
Deeman
bb70a5372b docs: replace GitLab CI/CD section with Gitea pull-based deployment
All checks were successful
CI / test (push) Successful in 48s
CI / tag (push) Successful in 3s
Remove outdated SSH-push model referencing GitLab variables. Document
the actual pull-based flow: Gitea Actions → tag → supervisor polls.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 01:58:11 +01:00
Deeman
bc28d93662 fix: remove duplicate age key in .sops.yaml
All checks were successful
CI / test (push) Successful in 47s
CI / tag (push) Successful in 3s
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 18:30:31 +01:00
Deeman
81ce1d277a update key
Some checks failed
CI / test (push) Has been cancelled
CI / tag (push) Has been cancelled
2026-02-27 18:26:14 +01:00
Deeman
2012894eeb chore: migrate from GitLab to self-hosted Gitea
Some checks failed
CI / test (push) Has been cancelled
CI / tag (push) Has been cancelled
Update bootstrap_supervisor.sh and setup_server.sh to use
git.padelnomics.io:2222 instead of gitlab.com.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-27 18:23:20 +01:00
19 changed files with 541 additions and 477 deletions

View File

@@ -56,9 +56,10 @@ WORKFLOWS_PATH=ENC[AES256_GCM,data:PehxEUMb1K3F1557BY3IqKD7sbJcoaIjnQvboBRJ1g==,
ALERT_WEBHOOK_URL=
NTFY_TOKEN=
#ENC[AES256_GCM,data:BCyQYjRnTx8yW9A=,iv:4OPCP+xzRLUJrpoFewVnbZRKnZH4sAbV76SM//2k5wU=,tag:HxwEp7VFVZUN/VjPiL/+Vw==,type:comment]
PROXY_URLS=ENC[AES256_GCM,data:CzRaK0piUQfvuYYsdz0i2MEQIphKi0BhNvHw9alo46aTH+kqEKvoS7dKEKzyU9VJ4TyNweInlVMxB962DsvRoBtnHwo/pUmYtVeEr2881clNgEiZVYRDFRdEbpULcLPDJa3ey1leqAAHlmiL0RQ6Qa57gPCOVBzVG6npGLKO+K8XVIb+BZMs9kEUOlw7iuqTJW5xPN/t4X/jHidEqfTSAl9b4vU4bsYVuY3yQrL+/V5QpTbyXlf+cMq3flpA3zE2Fxhalzg+c/wHMTrCksFwrCkrInW0kY9yPkA7usUWr1xwwaV3wIDoNQsLXpMd/3RztipNvKtOMRhRJOmjzP7BKhCJvvvKTV5p+mBCulFijbMQgArg3BqcFanfw3YZ4wPd4hp8q/vOhE/U9Wu0yrMmyWYFHYGQnFVARlBH7pwn/ez8W4KqRFveEAuev9CE7K7s5RqzPLelSkoa9UuiiULJ+t0LFgKlgxuLtQ8GdFdgsmBCxY/4U/xzvNdC82hD549z5nMWWlaUJm4onPWirT/RYm7j3v6z4mmNImI2W6rCNbvEvsXwWsciquVaBIgReA47p6/GTzZ9VZMyGr4PdzB87BJGAgX1W57WNdPAsRIF49XP2BU72RtRFxsUG8Ha2dc=,iv:a10Vpk7Zv8QqORuEcMlpcvtHO/zjBLaFphWPYBXwysc=,tag:8N66/R+CLqEZ45wj+tCt6w==,type:str]
RECHECK_WINDOW_MINUTES=ENC[AES256_GCM,data:YWM=,iv:iY5+uMazLAFdwyLT7Gr7MaF1QHBIgHuoi6nF2VbSsOA=,tag:dc6AmuJdTQ55gVe16uzs6A==,type:str]
PROXY_URLS_FALLBACK=ENC[AES256_GCM,data:95rwI7kKUj1YxLpjChtrM4f2EFUDzQdAg1e1MOHnLwQ9ZY54UNH7v4JcqTsvDk9D+0N/BIdwFSDi7pnCSd6BWFV+cQ==,iv:rm9HdBsibSne7JR6vWl+ao/GHb1rbuVdZZDUWhVbTnE=,tag:NJ2STxmFZPvFayfTrEEYbg==,type:str]
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:lfmlsjXFtL+zo40SNFLiFKaZiYvE7CNH+zRwjMK5pqPfCs0TlMX+Y9e1KmzAS+y/cI69TP5sgMPRBzER0Jn7RvH0KA==,iv:jBN/4/K5L5886G4rSzxt8V8u/57tAuj3R76haltzqeU=,tag:Xe6o9eg2PodfktDqmLgVNA==,type:str]
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:X6xpxz5u8Xh3OXjkIz3UwqH847qLvY9cVWVktW5B+lqhmXAKTzoTzHds8vlRGJf5Up9Yx44XcigbvuK33ZJDSq9ovkAIbY55OK4=,iv:3hHyFD+H9HMzQ/27bPjGr59+7yWmEneUdN9XPQasCig=,tag:oBXsSuV5idB7HqNrNOruwg==,type:str]
WEBSHARE_DOWNLOAD_URL=ENC[AES256_GCM,data:1D9VRZ3MCXPQWfiMH8+CLcrxeYnVVcQgZDvt5kltvbSTuSHQ2hHDmZpBkTOMIBJnw4JLZ2JQKHgG4OaYDtsM2VltFPnfwaRgVI9G5PSenR3o4PeQmYO1AqWOmjn19jPxNXRhEXdupP9UT+xQNXoBJsl6RR20XOpMA5AipUHmSjD0UIKXoZLU,iv:uWUkAydac//qrOTPUThuOLKAKXK4xcZmK9qBVFwpqt4=,tag:1vYhukBW9kEuSXCLAiZZmQ==,type:str]
CIRCUIT_BREAKER_THRESHOLD=
#ENC[AES256_GCM,data:ZcX/OEbrMfKizIQYq3CYGnvzeTEX7KsmQaz2+Jj1rG5tbTy2aljQBIEkjtiwuo8NsNAD+FhIGRGVfBmKe1CAKME1MuiCbgSG,iv:4BSkeD3jZFawP09qECcqyuiWcDnCNSgbIjBATYhazq4=,tag:Ep1d2Uk700MOlWcLWaQ/ig==,type:comment]
GSC_SERVICE_ACCOUNT_PATH=
@@ -70,7 +71,7 @@ GEONAMES_USERNAME=ENC[AES256_GCM,data:aSkVdLNrhiF6tlg=,iv:eemFGwDIv3EG/P3lVHGZj9
CENSUS_API_KEY=ENC[AES256_GCM,data:qqG971573aGq9MiHI2xLlanKKFwjfcNNoMXtm8LNbyh0rMbQN2XukQ==,iv:az2i0ldH75nHGah4DeOxaXmDbVYqmC1c77ptZqFA9BI=,tag:zoDdKj9bR7fgIDo1/dEU2g==,type:str]
sops_age__list_0__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBxNWNmUzVNUGdWRnE0ZFpF\nM0JQZWZ3UDdEVzlwTmIxakxOZXBkT2x2ZlNrClRtV2M3S2daSGxUZmFDSWQ2Nmh4\neU51QndFcUxlSE00RFovOVJTcDZmUUUKLS0tIDcvL3hRMDRoMWZZSXljNzA3WG5o\nMWFic21MV0krMzlIaldBTVU0ZDdlTE0K7euGQtA+9lHNws+x7TMCArZamm9att96\nL8cXoUDWe5fNI5+M1bXReqVfNwPTwZsV6j/+ZtYKybklIzWz02Ex4A==\n-----END AGE ENCRYPTED FILE-----\n
sops_age__list_0__map_recipient=age1f5002gj4s78jju45jd28kuejtcfhn5cdujz885fl7z2p9ym68pnsgky87a
sops_lastmodified=2026-02-26T14:31:14Z
sops_mac=ENC[AES256_GCM,data:iqFuTexTS9U/Nv8xoTpHljTNQTGX9ITcJ3AjhDEtxrh0Z9/lngfBvGtjiKmpwFGlobQw/x+/YLM+u3MhciwXF7qNwFfJ/StN2Y66uF71SxWotbL70Dxl4oWSVL3sU+2NYbw5yP0p+xCbE+rEd5SqAe6K5yyq5X25hz8fIapxlYA=,iv:foqoWQVMipuOAQ0Kp799PaIhCIrxV8T5cC811wIzxR8=,tag:yNfxSV3R21XEXksjmdsKBw==,type:str]
sops_lastmodified=2026-02-28T15:50:46Z
sops_mac=ENC[AES256_GCM,data:HiLZTLa+p3mqa4hw+tKOK27F/bsJOy4jmDi8MHToi6S7tRfBA/TzcEzXvXUIkkwAixN73NQHvBVeRnbcEsApVpkaxH1OqnjvvyT+B3YFkTEtxczaKGWlCvbqFZNmXYsFvGR9njaWYWsTQPkRIjrroXrSrhr7uxC8F40v7ByxJKo=,iv:qj2IpzWRIh/mM1HtjjkNbyFuhtORKXslVnf/vdEC9Uw=,tag:fr9CZsL74HxRJLXn9eS0xQ==,type:str]
sops_unencrypted_suffix=_unencrypted
sops_version=3.12.1

View File

@@ -43,7 +43,10 @@ ALERT_WEBHOOK_URL=ENC[AES256_GCM,data:4sXQk8zklruC525J279TUUatdDJQ43qweuoPhtpI82
NTFY_TOKEN=ENC[AES256_GCM,data:YlOxhsRJ8P1y4kk6ugWm41iyRCsM6oAWjvbU9lGcD0A=,iv:JZXOvi3wTOPV9A46c7fMiqbszNCvXkOgh9i/H1hob24=,tag:8xnPimgy7sesOAnxhaXmpg==,type:str]
SUPERVISOR_GIT_PULL=ENC[AES256_GCM,data:mg==,iv:KgqMVYj12FjOzWxtA1T0r0pqCDJ6MtHzMjE+4W/W+s4=,tag:czFaOqhHG8nqrQ8AZ8QiGw==,type:str]
#ENC[AES256_GCM,data:hzAZvCWc4RTk290=,iv:RsSI4OpAOQGcFVpfXDZ6t705yWmlO0JEWwWF5uQu9As=,tag:UPqFtA2tXiSa0vzJAv8qXg==,type:comment]
PROXY_URLS=ENC[AES256_GCM,data:nm4B++SkZZgN3p2xru3WrpVA0X6O8yvb45tH/ovF4006zBy28xqVxbsd44Mz6b5FMinjOXRmGwoI/GDWmdJLzBYdpryQ/FhpbzSUpr1ZOjOz+7P0vn2jfBGAB8ksU3i5kuYglud3EyQGFL+v+uooxwrIUCjfzmmB4vCmf7phssKDsK1CqzmdZ1c54ehSu4bRRdmGp9d0+r+j1SpXb/JbZ8LTqUIhLlZXrHFqkCfN1czhFK9IwMVgR00Q4v2YkjaRBME4lVqwk1NwwatbS9Fq8LlzwuT1uKk+T6ZDkFKC8ZoPW1YRqF13X7hFGFXCNRqABRDZ45lqxYQbBoRrWmH2tfMiAmTrIuRsdPM8bZ/Ol5mXSDhs0HyWX2urX+LD65rIOO0zN/lwjXSwh5mwwBdB61akdzsWRyLZsdafuQUmgGul8y0eGMEbFWaty3bdrtAmqtsvHwxD/Dp/gQWScESXvPd1arn55zaXmefOy+ZLwcmx+FAJPpTMXRaq6Y/Z+D1PZZ+Uhu2D6tsAR4VvqqwlUgpsrAFXk6chJzOry8rmmxoMuIj9mXfjG+BqPFhV2oQsKSuIqFQqd/ZidJLO8ZSxA7L+h1eH4cQjcUd2nfzroG8nnKZ+cA8hQMfLuFiMY1I=,iv:nTaNQlC3px/lnodLphnILWbPVnelaUKKOZAFAaHi8MU=,tag:TYkIX1nrc+PKbvvnWYcvbg==,type:str]
PROXY_URLS_RESIDENTIAL=ENC[AES256_GCM,data:x/F0toXDc8stsUNxaepCmxq1+WuacqqPtdc+R5mxTwcAzsKxCdwt8KpBZWMvz7ku4tHDGsKD949QAX2ANXP9oCMTgW0=,iv:6G9gE9/v7GaYj8aqVTmMrpw6AcQK9yMSCAohNdAD1Ws=,tag:2Jimr1ldVSfkh8LPEwdN3w==,type:str]
PROXY_URLS_DATACENTER=ENC[AES256_GCM,data:6BfXBYmyHpgZU/kJWpZLf8eH5VowVK1n0r6GzFTNAx/OmyaaS1RZVPC1JPkPBnTwEmo0WHYRW8uiUdkABmH9F5ZqqlsAesyfW7zvU9r7yD+D7w==,iv:3CBn2qCoTueQy8xVcQqZS4E3F0qoFYnNbzTZTpJ1veo=,tag:wC3Ecl4uNTwPiT23ATvRZg==,type:str]
WEBSHARE_DOWNLOAD_URL=ENC[AES256_GCM,data:/N77CFf6tJWCk7HrnBOm2Q1ynx7XoblzfbzJySeCjrxqiu4r+CB90aDkaPahlQKI00DUZih3pcy7WhnjdAwI30G5kJZ3P8H8/R0tP7OBK1wPVbsJq8prQJPFOAWewsS4KWNtSURZPYSCxslcBb7DHLX6ZAjv6A5KFOjRK2N8usR9sIabrCWh,iv:G3Ropu/JGytZK/zKsNGFjjSu3Wt6fvHaAqI9RpUHvlI=,tag:fv6xuS94OR+4xfiyKrYELA==,type:str]
PROXY_CONCURRENCY=ENC[AES256_GCM,data:vdEZ,iv:+eTNQO+s/SsVDBLg1/+fneMzEEsFkuEFxo/FcVV+mWc=,tag:i/EPwi/jOoWl3xW8H0XMdw==,type:str]
RECHECK_WINDOW_MINUTES=ENC[AES256_GCM,data:L2s=,iv:fV3mCKmK5fxUmIWRePELBDAPTb8JZqasVIhnAl55kYw=,tag:XL+PO6sblz/7WqHC3dtk1w==,type:str]
#ENC[AES256_GCM,data:RC+t2vqLwLjapdAUql8rQls=,iv:Kkiz3ND0g0MRAgcPJysIYMzSQS96Rq+3YP5yO7yWfIY=,tag:Y6TbZd81ihIwn+U515qd1g==,type:comment]
GSC_SERVICE_ACCOUNT_PATH=ENC[AES256_GCM,data:Vki6yHk+gd4n,iv:rxzKvwrGnAkLcpS41EZ097E87NrIpNZGFfl4iXFvr40=,tag:EZkBJpCq5rSpKYVC4H3JHQ==,type:str]
@@ -59,7 +62,7 @@ sops_age__list_1__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb2
sops_age__list_1__map_recipient=age1wjepykv3glvsrtegu25tevg7vyn3ngpl607u3yjc9ucay04s045s796msw
sops_age__list_2__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBFeHhaOURNZnRVMEwxNThu\nUjF4Q0kwUXhTUE1QSzZJbmpubnh3RnpQTmdvCjRmWWxpNkxFUmVGb3NRbnlydW5O\nWEg3ZXJQTU4vcndzS2pUQXY3Q0ttYjAKLS0tIE9IRFJ1c2ZxbGVHa2xTL0swbGN1\nTzgwMThPUDRFTWhuZHJjZUYxOTZrU00KY62qrNBCUQYxwcLMXFEnLkwncxq3BPJB\nKm4NzeHBU87XmPWVrgrKuf+PH1mxJlBsl7Hev8xBTy7l6feiZjLIvQ==\n-----END AGE ENCRYPTED FILE-----\n
sops_age__list_2__map_recipient=age1c783ym2q5x9tv7py5d28uc4k44aguudjn03g97l9nzs00dd9tsrqum8h4d
sops_lastmodified=2026-02-26T14:32:28Z
sops_mac=ENC[AES256_GCM,data:pyHQHwTtjh7OLiMqbqhUjfrmetEtYS7yB342C/TWfDCwEotWLVwnGWlC4+HIl53pw9+3AgoBVRnW0t86e4kG9O8KyHnk68S9qBcpUsybW3lyGPNXmBydv1W9gQHuK8f/4WGIbkhNxyIToKg9ZAmYWFxNhRKSoYKm5P9Uh7B7CF4=,iv:syrX8VdL3JsDsawvFWbX04Ygcr18hjSSHfEwHkyKETk=,tag:qrhWkh/e+21OKGU2+rCeyg==,type:str]
sops_lastmodified=2026-02-28T17:03:44Z
sops_mac=ENC[AES256_GCM,data:IQ9jpRxVUssaMK+qFcM3nPdzXHkiqp6E+DhEey1TfqUu5GCBNsWeVy9m9A6p9RWhu2NtJV7aKdUeqneuMtD1q5Tnm6L96zuyot2ESnx2N2ssD9ilrDauQxoBJcrJVnGV61CgaCz9458w8BuVUZydn3MoHeRaU7bOBBzQlTI6vZk=,iv:qHqdt3av/KZRQHr/OS/9KdAJUgKlKEDgan7qI3Zzkck=,tag:fOvdO9iRTTF1Siobu2mLqg==,type:str]
sops_unencrypted_suffix=_unencrypted
sops_version=3.12.1

View File

@@ -6,6 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased]
### Added
- **Three-tier proxy system** for extraction pipeline: free (Webshare auto-fetched) → datacenter (`PROXY_URLS_DATACENTER`) → residential (`PROXY_URLS_RESIDENTIAL`). Webshare free proxies are now auto-fetched from their download API on each run — no more manually copying stale proxy lists.
- `proxy.py`: added `fetch_webshare_proxies()` (stdlib urllib, bounded read + timeout), `load_proxy_tiers()` (assembles N tiers from env), generalised `make_tiered_cycler()` to accept `list[list[str]]` with N-level escalation. Exposes `is_exhausted()`, `active_tier_index()`, `tier_count()`.
- `playtomic_availability.py`: both `extract()` and `extract_recheck()` now use `load_proxy_tiers()` + N-tier cycler. `_fetch_venues_parallel` `fallback_urls` param removed. `is_fallback_active()` replaced by `is_exhausted()`.
- `playtomic_tenants.py`: uses `load_proxy_tiers()` flattened for simple round-robin.
### Changed
- **Env vars renamed** (breaking): `PROXY_URLS` → removed, `PROXY_URLS_FALLBACK` → removed. New vars: `WEBSHARE_DOWNLOAD_URL`, `PROXY_URLS_DATACENTER`, `PROXY_URLS_RESIDENTIAL`.
### Added
- **Phase 2a — NUTS-1 regional income differentiation** (`opportunity_score`): Munich and Berlin no longer share the same income figure as Chemnitz.
- `eurostat.py`: added `nama_10r_2hhinc` dataset config (NUTS-2 cube with NUTS-1 entries); filter params now appended to API URL so the server pre-filters the large cube before download (also makes `ilc_di03` requests smaller).

View File

@@ -396,18 +396,19 @@ docker compose logs -f app # tail logs
## CI/CD
Go to GitLab → padelnomics → Settings → CI/CD → Variables and add:
Pull-based deployment via Gitea Actions — no SSH keys or deploy credentials in CI.
| Variable | Value | Notes |
|----------|-------|-------|
| SSH_PRIVATE_KEY | Your ed25519 private key | Mask it, type "Variable" |
| DEPLOY_HOST | Your Hetzner server IP | e.g. 1.2.3.4 |
| DEPLOY_USER | SSH username on the server | e.g. deploy or root |
| SSH_KNOWN_HOSTS | Server host key | Run `ssh-keyscan $YOUR_SERVER_IP` |
1. Push to master → Gitea Actions runs tests (`.gitea/workflows/ci.yaml`)
2. On success, CI creates tag `v<run_number>` using the built-in `github.token`
3. On-server supervisor polls for new tags every 60s and deploys automatically
Server-side one-time setup:
1. Add the matching public key to `~/.ssh/authorized_keys` for the deploy user
2. Clone the repo to `/opt/padelnomics`
3. Create `.env` from `padelnomics/.env.example` with production values
4. `chmod +x deploy.sh && ./deploy.sh` for the first deploy
5. Point reverse proxy to port 5000
**Server-side one-time setup:**
```bash
bash infra/setup_server.sh # creates padelnomics_service user, keys, dirs
ssh root@<server> 'bash -s' < infra/bootstrap_supervisor.sh
```
1. `setup_server.sh` generates an ed25519 SSH deploy key — add the printed public key to Gitea:
`git.padelnomics.io → padelnomics → Settings → Deploy Keys → Add key (read-only)`
2. Add the printed age public key to `.sops.yaml`, re-encrypt, commit + push
3. Run `bootstrap_supervisor.sh` — clones from `git.padelnomics.io:2222`, decrypts secrets, starts systemd supervisor

View File

@@ -33,7 +33,7 @@ from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
from .proxy import load_fallback_proxy_urls, load_proxy_urls, make_tiered_cycler
from .proxy import load_proxy_tiers, make_tiered_cycler
from .utils import (
compress_jsonl_atomic,
flush_partial_batch,
@@ -52,6 +52,9 @@ MAX_VENUES_PER_RUN = 20_000
MAX_RETRIES_PER_VENUE = 2
RECHECK_WINDOW_MINUTES = int(os.environ.get("RECHECK_WINDOW_MINUTES", "30"))
CIRCUIT_BREAKER_THRESHOLD = int(os.environ.get("CIRCUIT_BREAKER_THRESHOLD") or "10")
# Worker count: defaults to MAX_PROXY_CONCURRENCY (200). Override via PROXY_CONCURRENCY env var.
_PROXY_CONCURRENCY = os.environ.get("PROXY_CONCURRENCY", "").strip()
MAX_PROXY_CONCURRENCY = 200
# Parallel mode submits futures in batches so the circuit breaker can stop
# new submissions after it opens. Already-inflight futures in the current
@@ -76,8 +79,10 @@ def _load_tenant_ids(landing_dir: Path) -> list[str]:
if not playtomic_dir.exists():
return []
# Prefer JSONL (new format), fall back to blob (old format)
tenant_files = sorted(playtomic_dir.glob("*/*/tenants.jsonl.gz"), reverse=True)
# Prefer daily partition (YYYY/MM/DD), fall back to older monthly/weekly partitions
tenant_files = sorted(playtomic_dir.glob("*/*/*/tenants.jsonl.gz"), reverse=True)
if not tenant_files:
tenant_files = sorted(playtomic_dir.glob("*/*/tenants.jsonl.gz"), reverse=True)
if not tenant_files:
tenant_files = sorted(playtomic_dir.glob("*/*/tenants.json.gz"), reverse=True)
if not tenant_files:
@@ -190,14 +195,13 @@ def _fetch_venues_parallel(
start_max_str: str,
worker_count: int,
cycler: dict,
fallback_urls: list[str],
on_result=None,
) -> tuple[list[dict], int]:
"""Fetch availability for multiple venues in parallel.
Submits futures in batches of PARALLEL_BATCH_SIZE. After each batch
completes, checks the circuit breaker: if it opened and there is no
fallback configured, stops submitting further batches.
completes, checks the circuit breaker: if all proxy tiers are exhausted,
stops submitting further batches.
on_result: optional callable(result: dict) invoked inside the lock for
each successful result — used for incremental partial-file flushing.
@@ -215,10 +219,10 @@ def _fetch_venues_parallel(
with ThreadPoolExecutor(max_workers=worker_count) as pool:
for batch_start in range(0, len(tenant_ids), PARALLEL_BATCH_SIZE):
# Stop submitting new work if circuit is open with no fallback
if cycler["is_fallback_active"]() and not fallback_urls:
# Stop submitting new work if all proxy tiers are exhausted
if cycler["is_exhausted"]():
logger.error(
"Circuit open with no fallback — stopping after %d/%d venues",
"All proxy tiers exhausted — stopping after %d/%d venues",
completed_count, len(tenant_ids),
)
break
@@ -294,10 +298,9 @@ def extract(
venues_to_process = [tid for tid in all_venues_to_process if tid not in already_done]
# Set up tiered proxy cycler with circuit breaker
proxy_urls = load_proxy_urls()
fallback_urls = load_fallback_proxy_urls()
worker_count = len(proxy_urls) if proxy_urls else 1
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
tiers = load_proxy_tiers()
worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else (MAX_PROXY_CONCURRENCY if tiers else 1)
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
start_min_str = start_min.strftime("%Y-%m-%dT%H:%M:%S")
start_max_str = start_max.strftime("%Y-%m-%dT%H:%M:%S")
@@ -325,9 +328,9 @@ def extract(
venues_errored = 0
if worker_count > 1:
logger.info("Parallel mode: %d workers, %d proxies", worker_count, len(proxy_urls))
logger.info("Parallel mode: %d workers, %d tier(s)", worker_count, len(tiers))
new_venues_data, venues_errored = _fetch_venues_parallel(
venues_to_process, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
venues_to_process, start_min_str, start_max_str, worker_count, cycler,
on_result=_on_result,
)
else:
@@ -342,9 +345,9 @@ def extract(
_on_result(result)
else:
venues_errored += 1
circuit_opened = cycler["record_failure"]()
if circuit_opened and not fallback_urls:
logger.error("Circuit open with no fallback — writing partial results")
cycler["record_failure"]()
if cycler["is_exhausted"]():
logger.error("All proxy tiers exhausted — writing partial results")
break
if (i + 1) % 100 == 0:
@@ -485,14 +488,13 @@ def extract_recheck(
start_max_str = window_end.strftime("%Y-%m-%dT%H:%M:%S")
# Set up tiered proxy cycler with circuit breaker
proxy_urls = load_proxy_urls()
fallback_urls = load_fallback_proxy_urls()
worker_count = len(proxy_urls) if proxy_urls else 1
cycler = make_tiered_cycler(proxy_urls, fallback_urls, CIRCUIT_BREAKER_THRESHOLD)
tiers = load_proxy_tiers()
worker_count = min(int(_PROXY_CONCURRENCY), MAX_PROXY_CONCURRENCY) if _PROXY_CONCURRENCY else (MAX_PROXY_CONCURRENCY if tiers else 1)
cycler = make_tiered_cycler(tiers, CIRCUIT_BREAKER_THRESHOLD)
if worker_count > 1 and len(venues_to_recheck) > 10:
venues_data, venues_errored = _fetch_venues_parallel(
venues_to_recheck, start_min_str, start_max_str, worker_count, cycler, fallback_urls,
venues_to_recheck, start_min_str, start_max_str, worker_count, cycler,
)
else:
venues_data = []
@@ -504,9 +506,9 @@ def extract_recheck(
cycler["record_success"]()
else:
venues_errored += 1
circuit_opened = cycler["record_failure"]()
if circuit_opened and not fallback_urls:
logger.error("Circuit open with no fallback — writing partial recheck results")
cycler["record_failure"]()
if cycler["is_exhausted"]():
logger.error("All proxy tiers exhausted — writing partial recheck results")
break
# Write recheck file as JSONL — one venue per line with metadata injected

View File

@@ -25,12 +25,13 @@ import json
import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import UTC, datetime
from pathlib import Path
import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
from .proxy import load_proxy_urls, make_round_robin_cycler
from .proxy import load_proxy_tiers, make_round_robin_cycler
from .utils import compress_jsonl_atomic, landing_path
logger = setup_logging("padelnomics.extract.playtomic_tenants")
@@ -69,25 +70,31 @@ def _fetch_pages_parallel(pages: list[int], next_proxy) -> list[tuple[int, list[
def extract(
landing_dir: Path,
year_month: str,
year_month: str, # noqa: ARG001 — unused; tenants uses ISO week partition instead
conn: sqlite3.Connection,
session: niquests.Session,
) -> dict:
"""Fetch all Playtomic venues via global pagination. Returns run metrics."""
year, month = year_month.split("/")
dest_dir = landing_path(landing_dir, "playtomic", year, month)
"""Fetch all Playtomic venues via global pagination. Returns run metrics.
Partitioned by ISO week (e.g. 2026/W09) so each weekly run produces a
fresh file. _load_tenant_ids() in playtomic_availability globs across all
partitions and picks the most recent one.
"""
today = datetime.now(UTC)
year, month, day = today.strftime("%Y"), today.strftime("%m"), today.strftime("%d")
dest_dir = landing_path(landing_dir, "playtomic", year, month, day)
dest = dest_dir / "tenants.jsonl.gz"
old_blob = dest_dir / "tenants.json.gz"
if dest.exists() or old_blob.exists():
logger.info("Already have tenants for %s/%s — skipping", year, month)
if dest.exists():
logger.info("Already have tenants for %s/%s/%s — skipping", year, month, day)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
proxy_urls = load_proxy_urls()
next_proxy = make_round_robin_cycler(proxy_urls) if proxy_urls else None
batch_size = len(proxy_urls) if proxy_urls else 1
tiers = load_proxy_tiers()
all_proxies = [url for tier in tiers for url in tier]
next_proxy = make_round_robin_cycler(all_proxies) if all_proxies else None
batch_size = len(all_proxies) if all_proxies else 1
if next_proxy:
logger.info("Parallel mode: %d pages per batch (%d proxies)", batch_size, len(proxy_urls))
logger.info("Parallel mode: %d pages per batch (%d proxies across %d tier(s))", batch_size, len(all_proxies), len(tiers))
else:
logger.info("Serial mode: 1 page at a time (no proxies)")
@@ -154,7 +161,7 @@ def extract(
"files_written": 1,
"files_skipped": 0,
"bytes_written": bytes_written,
"cursor_value": year_month,
"cursor_value": f"{year}/{month}/{day}",
}

View File

@@ -1,41 +1,97 @@
"""Optional proxy rotation for parallel HTTP fetching.
Proxies are configured via the PROXY_URLS environment variable (comma-separated).
When unset, all functions return None/no-op — extractors fall back to direct requests.
Proxies are configured via environment variables. When unset, all functions
return None/no-op — extractors fall back to direct requests.
Tiered proxy with circuit breaker:
Primary tier (PROXY_URLS) is used by default — typically cheap datacenter proxies.
Fallback tier (PROXY_URLS_FALLBACK) activates once consecutive failures >= threshold.
Once the circuit opens it stays open for the duration of the run (no auto-recovery).
Three-tier escalation: free → datacenter → residential.
Tier 1 (free): WEBSHARE_DOWNLOAD_URL — auto-fetched from Webshare API
Tier 2 (datacenter): PROXY_URLS_DATACENTER — comma-separated paid DC proxies
Tier 3 (residential): PROXY_URLS_RESIDENTIAL — comma-separated paid residential proxies
Tiered circuit breaker:
Active tier is used until consecutive failures >= threshold, then escalates
to the next tier. Once all tiers are exhausted, is_exhausted() returns True.
Escalation is permanent for the duration of the run — no auto-recovery.
"""
import itertools
import logging
import os
import threading
import urllib.error
import urllib.request
logger = logging.getLogger(__name__)
MAX_WEBSHARE_PROXIES = 20
WEBSHARE_FETCH_TIMEOUT_SECONDS = 10
WEBSHARE_MAX_RESPONSE_BYTES = 1024 * 1024 # 1MB
def load_proxy_urls() -> list[str]:
"""Read PROXY_URLS env var (comma-separated). Returns [] if unset.
Format: http://user:pass@host:port or socks5://host:port
def fetch_webshare_proxies(download_url: str, max_proxies: int = MAX_WEBSHARE_PROXIES) -> list[str]:
"""Fetch proxy list from the Webshare download API. Returns [] on any error.
Expected line format: ip:port:username:password
Converts to: http://username:password@ip:port
Bounded: reads at most WEBSHARE_MAX_RESPONSE_BYTES, returns at most max_proxies.
"""
raw = os.environ.get("PROXY_URLS", "")
urls = [u.strip() for u in raw.split(",") if u.strip()]
assert max_proxies > 0, f"max_proxies must be positive, got {max_proxies}"
assert download_url, "download_url must not be empty"
try:
req = urllib.request.Request(
download_url,
headers={"User-Agent": "padelnomics-extract/1.0"},
)
with urllib.request.urlopen(req, timeout=WEBSHARE_FETCH_TIMEOUT_SECONDS) as resp:
raw = resp.read(WEBSHARE_MAX_RESPONSE_BYTES).decode("utf-8")
except Exception as e:
logger.warning("Failed to fetch Webshare proxies: %s", e)
return []
urls = []
for line in raw.splitlines():
line = line.strip()
if not line:
continue
parts = line.split(":")
if len(parts) != 4:
logger.debug("Skipping malformed proxy line: %r", line)
continue
ip, port, username, password = parts
urls.append(f"http://{username}:{password}@{ip}:{port}")
if len(urls) >= max_proxies:
break
logger.info("Fetched %d proxies from Webshare", len(urls))
return urls
def load_fallback_proxy_urls() -> list[str]:
"""Read PROXY_URLS_FALLBACK env var (comma-separated). Returns [] if unset.
def load_proxy_tiers() -> list[list[str]]:
"""Assemble proxy tiers in escalation order: free → datacenter → residential.
Used as the residential/reliable fallback tier when the primary tier fails.
Format: http://user:pass@host:port or socks5://host:port
Tier 1 (free): fetched from WEBSHARE_DOWNLOAD_URL if set.
Tier 2 (datacenter): PROXY_URLS_DATACENTER (comma-separated).
Tier 3 (residential): PROXY_URLS_RESIDENTIAL (comma-separated).
Empty tiers are omitted. Returns [] if no proxies configured anywhere.
"""
raw = os.environ.get("PROXY_URLS_FALLBACK", "")
urls = [u.strip() for u in raw.split(",") if u.strip()]
return urls
tiers: list[list[str]] = []
webshare_url = os.environ.get("WEBSHARE_DOWNLOAD_URL", "").strip()
if webshare_url:
free_proxies = fetch_webshare_proxies(webshare_url)
if free_proxies:
tiers.append(free_proxies)
for var in ("PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"):
raw = os.environ.get(var, "")
urls = [u.strip() for u in raw.split(",") if u.strip()]
if urls:
tiers.append(urls)
return tiers
def make_round_robin_cycler(proxy_urls: list[str]):
@@ -78,83 +134,96 @@ def make_sticky_selector(proxy_urls: list[str]):
return select_proxy
def make_tiered_cycler(
primary_urls: list[str],
fallback_urls: list[str],
threshold: int,
) -> dict:
"""Thread-safe tiered proxy cycler with circuit breaker.
def make_tiered_cycler(tiers: list[list[str]], threshold: int) -> dict:
"""Thread-safe N-tier proxy cycler with circuit breaker.
Uses primary_urls until consecutive failures >= threshold, then switches
permanently to fallback_urls for the rest of the run. No auto-recovery —
once the circuit opens it stays open to avoid flapping.
Uses tiers[0] until consecutive failures >= threshold, then escalates
to tiers[1], then tiers[2], etc. Once all tiers are exhausted,
is_exhausted() returns True and next_proxy() returns None.
Failure counter resets on each escalation — the new tier gets a fresh start.
Once exhausted, further record_failure() calls are no-ops.
Returns a dict of callables:
next_proxy() -> str | None — returns URL from the active tier
record_success() — resets consecutive failure counter
record_failure() -> bool — increments counter; True if circuit just opened
is_fallback_active() -> bool — whether fallback tier is currently active
next_proxy() -> str | None — URL from the active tier, or None
record_success() -> None — resets consecutive failure counter
record_failure() -> bool — True if just escalated to next tier
is_exhausted() -> bool — True if all tiers exhausted
active_tier_index() -> int — 0-based index of current tier
tier_count() -> int — total number of tiers
If primary_urls is empty: always returns from fallback_urls (no circuit breaker needed).
If both are empty: next_proxy() always returns None.
Edge cases:
Empty tiers list: next_proxy() always returns None, is_exhausted() True.
Single tier: behaves like the primary-only case, is_exhausted() after threshold.
"""
assert threshold > 0, f"threshold must be positive, got {threshold}"
assert isinstance(tiers, list), f"tiers must be a list, got {type(tiers)}"
lock = threading.Lock()
cycles = [itertools.cycle(t) for t in tiers]
state = {
"active_tier": 0,
"consecutive_failures": 0,
"fallback_active": False,
}
primary_cycle = itertools.cycle(primary_urls) if primary_urls else None
fallback_cycle = itertools.cycle(fallback_urls) if fallback_urls else None
# No primary proxies — skip circuit breaker, use fallback directly
if not primary_urls:
state["fallback_active"] = True
def next_proxy() -> str | None:
with lock:
if state["fallback_active"]:
return next(fallback_cycle) if fallback_cycle else None
return next(primary_cycle) if primary_cycle else None
idx = state["active_tier"]
if idx >= len(cycles):
return None
return next(cycles[idx])
def record_success() -> None:
with lock:
state["consecutive_failures"] = 0
def record_failure() -> bool:
"""Increment failure counter. Returns True if circuit just opened."""
"""Increment failure counter. Returns True if just escalated to next tier."""
with lock:
if state["fallback_active"]:
# Already on fallback — don't trip the circuit again
idx = state["active_tier"]
if idx >= len(tiers):
# Already exhausted — no-op
return False
state["consecutive_failures"] += 1
if state["consecutive_failures"] >= threshold:
state["fallback_active"] = True
if fallback_urls:
logger.warning(
"Circuit open after %d consecutive failures — "
"switching to fallback residential proxies",
state["consecutive_failures"],
)
else:
logger.error(
"Circuit open after %d consecutive failures — "
"no fallback configured, aborting run",
state["consecutive_failures"],
)
return True
return False
if state["consecutive_failures"] < threshold:
return False
# Threshold reached — escalate
state["consecutive_failures"] = 0
state["active_tier"] += 1
new_idx = state["active_tier"]
if new_idx < len(tiers):
logger.warning(
"Circuit open after %d consecutive failures — "
"escalating to proxy tier %d/%d",
threshold,
new_idx + 1,
len(tiers),
)
else:
logger.error(
"All %d proxy tier(s) exhausted after %d consecutive failures — "
"no more fallbacks",
len(tiers),
threshold,
)
return True
def is_fallback_active() -> bool:
def is_exhausted() -> bool:
with lock:
return state["fallback_active"]
return state["active_tier"] >= len(tiers)
def active_tier_index() -> int:
with lock:
return state["active_tier"]
def tier_count() -> int:
return len(tiers)
return {
"next_proxy": next_proxy,
"record_success": record_success,
"record_failure": record_failure,
"is_fallback_active": is_fallback_active,
"is_exhausted": is_exhausted,
"active_tier_index": active_tier_index,
"tier_count": tier_count,
}

View File

@@ -15,7 +15,7 @@ set -euo pipefail
SERVICE_USER="padelnomics_service"
REPO_DIR="/opt/padelnomics"
GITLAB_PROJECT="deemanone/padelnomics"
GITEA_REPO="ssh://git@git.padelnomics.io:2222/deemanone/padelnomics.git"
UV="/home/${SERVICE_USER}/.local/bin/uv"
[ "$(id -u)" = "0" ] || { echo "ERROR: Run as root"; exit 1; }
@@ -35,7 +35,7 @@ if [ -d "${REPO_DIR}/.git" ]; then
sudo -u "${SERVICE_USER}" git -C "${REPO_DIR}" fetch --tags --prune-tags origin
else
sudo -u "${SERVICE_USER}" git clone \
"git@gitlab.com:${GITLAB_PROJECT}.git" "${REPO_DIR}"
"${GITEA_REPO}" "${REPO_DIR}"
fi
LATEST_TAG=$(sudo -u "${SERVICE_USER}" \

View File

@@ -75,7 +75,8 @@ fi
if [ ! -f "${SSH_DIR}/config" ]; then
cat > "${SSH_DIR}/config" <<EOF
Host gitlab.com
Host git.padelnomics.io
Port 2222
IdentityFile ${DEPLOY_KEY}
IdentitiesOnly yes
EOF
@@ -83,7 +84,7 @@ EOF
chmod 600 "${SSH_DIR}/config"
fi
ssh-keyscan -H gitlab.com >> "${SSH_DIR}/known_hosts" 2>/dev/null
ssh-keyscan -H -p 2222 git.padelnomics.io >> "${SSH_DIR}/known_hosts" 2>/dev/null
sort -u "${SSH_DIR}/known_hosts" -o "${SSH_DIR}/known_hosts"
chown "${SERVICE_USER}:${SERVICE_USER}" "${SSH_DIR}/known_hosts"
chmod 644 "${SSH_DIR}/known_hosts"

View File

@@ -23,7 +23,7 @@ schedule = "monthly"
[playtomic_tenants]
module = "padelnomics_extract.playtomic_tenants"
schedule = "weekly"
schedule = "daily"
[playtomic_availability]
module = "padelnomics_extract.playtomic_availability"

View File

@@ -192,9 +192,9 @@ def run_workflow(conn, workflow: dict) -> None:
entry_fn = getattr(module, entry_name)
entry_fn()
logger.info("Workflow %s completed successfully", workflow["name"])
except Exception:
except Exception as exc:
logger.exception("Workflow %s failed", workflow["name"])
send_alert(f"Workflow '{workflow['name']}' failed")
send_alert(f"[extract] {type(exc).__name__}: {str(exc)[:100]}")
raise
@@ -233,8 +233,8 @@ def run_due_workflows(conn, workflows: list[dict]) -> bool:
# Transform + Export + Deploy
# ---------------------------------------------------------------------------
def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool:
"""Run a shell command. Returns True on success."""
def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> tuple[bool, str]:
"""Run a shell command. Returns (success, error_snippet)."""
logger.info("Shell: %s", cmd)
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds
@@ -242,29 +242,31 @@ def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bo
if result.returncode != 0:
logger.error("Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s",
result.returncode, cmd, result.stdout[-500:], result.stderr[-500:])
return False
return True
raw = (result.stderr or result.stdout).strip()
snippet = next((ln.strip() for ln in raw.splitlines() if ln.strip()), raw)[:120]
return False, snippet
return True, ""
def run_transform() -> None:
"""Run SQLMesh — it evaluates model staleness internally."""
logger.info("Running SQLMesh transform")
ok = run_shell(
ok, err = run_shell(
"uv run sqlmesh -p transform/sqlmesh_padelnomics plan prod --auto-apply",
)
if not ok:
send_alert("SQLMesh transform failed")
send_alert(f"[transform] {err}")
def run_export() -> None:
"""Export serving tables to analytics.duckdb."""
logger.info("Exporting serving tables")
ok = run_shell(
ok, err = run_shell(
f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} "
f"uv run python src/padelnomics/export_serving.py"
)
if not ok:
send_alert("Serving export failed")
send_alert(f"[export] {err}")
def web_code_changed() -> bool:
@@ -277,12 +279,18 @@ def web_code_changed() -> bool:
def current_deployed_tag() -> str | None:
"""Return the tag currently checked out, or None if not on a tag."""
"""Return the highest-version tag pointing at HEAD, or None.
Uses the same sort order as latest_remote_tag() so that when multiple
tags point to the same commit (e.g. a date-based tag and a CI integer
tag), we always compare apples-to-apples.
"""
result = subprocess.run(
["git", "describe", "--tags", "--exact-match", "HEAD"],
["git", "tag", "--list", "--sort=-version:refname", "--points-at", "HEAD", "v*"],
capture_output=True, text=True, timeout=10,
)
return result.stdout.strip() or None
tags = result.stdout.strip().splitlines()
return tags[0] if tags else None
def latest_remote_tag() -> str | None:
@@ -317,6 +325,7 @@ def git_pull_and_sync() -> None:
logger.info("New tag %s available (current: %s) — deploying", latest, current)
run_shell(f"git checkout --detach {latest}")
run_shell("sops --input-type dotenv --output-type dotenv -d .env.prod.sops > .env")
run_shell("uv sync --all-packages")
@@ -365,11 +374,11 @@ def tick() -> None:
# Deploy web app if code changed
if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed():
logger.info("Web code changed — deploying")
ok = run_shell("./deploy.sh")
ok, err = run_shell("./deploy.sh")
if ok:
send_alert("Deploy succeeded")
send_alert("[deploy] ok")
else:
send_alert("Deploy FAILED — check journalctl -u padelnomics-supervisor")
send_alert(f"[deploy] failed: {err}")
finally:
conn.close()
@@ -386,9 +395,9 @@ def supervisor_loop() -> None:
except KeyboardInterrupt:
logger.info("Supervisor stopped (KeyboardInterrupt)")
break
except Exception:
except Exception as exc:
logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS)
send_alert("Supervisor tick failed")
send_alert(f"[supervisor] {type(exc).__name__}: {str(exc)[:100]}")
time.sleep(BACKOFF_SECONDS)
else:
time.sleep(TICK_INTERVAL_SECONDS)

View File

@@ -2,22 +2,14 @@
-- One row per available 60-minute booking slot per court per venue per day.
-- "Available" = the slot was NOT booked at capture time. Missing slots = booked.
--
-- Reads BOTH morning snapshots and recheck files:
-- Morning (new): availability_{date}.jsonl.gz → snapshot_type = 'morning'
-- Morning (old): availability_{date}.json.gz → snapshot_type = 'morning'
-- Recheck (new): availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck'
-- Recheck (old): availability_{date}_recheck_{HH}.json.gz → snapshot_type = 'recheck'
-- Reads morning snapshots and recheck files (JSONL format):
-- Morning: availability_{date}.jsonl.gz → snapshot_type = 'morning'
-- Recheck: availability_{date}_recheck_{HH}.jsonl.gz → snapshot_type = 'recheck'
--
-- Only 60-min duration slots are kept (canonical hourly rate + occupancy unit).
-- Price parsed from strings like "14.56 EUR" or "48 GBP".
--
-- Supports two morning landing formats (UNION ALL during migration):
-- New: availability_{date}.jsonl.gz — one venue per line, columns: tenant_id, slots, date, captured_at_utc
-- Old: availability_{date}.json.gz — {"date":..., "venues": [...]} blob (UNNEST required)
--
-- Requires: at least one availability file in the landing zone.
-- A seed file (data/landing/playtomic/1970/01/availability_1970-01-01.json.gz)
-- with empty venues[] ensures this model runs before real data arrives.
-- Source: data/landing/playtomic/{year}/{month}/availability_*.jsonl.gz
MODEL (
name staging.stg_playtomic_availability,
@@ -27,7 +19,6 @@ MODEL (
);
WITH
-- New format: one venue per JSONL line — no outer UNNEST needed
morning_jsonl AS (
SELECT
date AS snapshot_date,
@@ -50,35 +41,6 @@ morning_jsonl AS (
WHERE filename NOT LIKE '%_recheck_%'
AND tenant_id IS NOT NULL
),
-- Old format: {"date":..., "venues": [...]} blob — kept for transition
morning_blob AS (
SELECT
af.date AS snapshot_date,
af.captured_at_utc,
'morning' AS snapshot_type,
NULL::INTEGER AS recheck_hour,
venue_json ->> 'tenant_id' AS tenant_id,
venue_json -> 'slots' AS slots_json
FROM (
SELECT date, captured_at_utc, venues
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/availability_*.json.gz',
format = 'auto',
columns = {
date: 'VARCHAR',
captured_at_utc: 'VARCHAR',
venues: 'JSON[]'
},
filename = true,
maximum_object_size = 134217728 -- 128 MB; daily files grow with venue count
)
WHERE filename NOT LIKE '%_recheck_%'
AND venues IS NOT NULL
AND json_array_length(venues) > 0
) af,
LATERAL UNNEST(af.venues) AS t(venue_json)
),
-- Recheck snapshots (new JSONL format — one venue per line)
recheck_jsonl AS (
SELECT
date AS snapshot_date,
@@ -101,43 +63,10 @@ recheck_jsonl AS (
)
WHERE tenant_id IS NOT NULL
),
-- Recheck snapshots (old blob format, kept for transition)
recheck_blob AS (
SELECT
rf.date AS snapshot_date,
rf.captured_at_utc,
'recheck' AS snapshot_type,
TRY_CAST(
regexp_extract(rf.filename, '_recheck_(\d+)', 1) AS INTEGER
) AS recheck_hour,
venue_json ->> 'tenant_id' AS tenant_id,
venue_json -> 'slots' AS slots_json
FROM (
SELECT date, captured_at_utc, venues, filename
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/availability_*_recheck_*.json.gz',
format = 'auto',
columns = {
date: 'VARCHAR',
captured_at_utc: 'VARCHAR',
venues: 'JSON[]'
},
filename = true,
maximum_object_size = 134217728 -- 128 MB; matches morning snapshot limit
)
WHERE venues IS NOT NULL
AND json_array_length(venues) > 0
) rf,
LATERAL UNNEST(rf.venues) AS t(venue_json)
),
all_venues AS (
SELECT * FROM morning_jsonl
UNION ALL
SELECT * FROM morning_blob
UNION ALL
SELECT * FROM recheck_jsonl
UNION ALL
SELECT * FROM recheck_blob
),
raw_resources AS (
SELECT

View File

@@ -5,11 +5,7 @@
-- DuckDB auto-infers opening_hours as STRUCT, so we access each day by literal
-- key (no dynamic access) and UNION ALL to unpivot.
--
-- Supports two landing formats (UNION ALL during migration):
-- New: tenants.jsonl.gz — one tenant per line, opening_hours is a top-level JSON column
-- Old: tenants.json.gz — {"tenants": [...]} blob (UNNEST required)
--
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz
MODEL (
name staging.stg_playtomic_opening_hours,
@@ -19,40 +15,18 @@ MODEL (
);
WITH
-- New format: one tenant per JSONL line
jsonl_venues AS (
venues AS (
SELECT
tenant_id,
opening_hours AS oh
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
@LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz',
format = 'newline_delimited',
columns = {tenant_id: 'VARCHAR', opening_hours: 'JSON'}
)
WHERE tenant_id IS NOT NULL
AND opening_hours IS NOT NULL
),
-- Old format: blob
blob_venues AS (
SELECT
tenant ->> 'tenant_id' AS tenant_id,
tenant -> 'opening_hours' AS oh
FROM (
SELECT UNNEST(tenants) AS tenant
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
format = 'auto',
maximum_object_size = 134217728
)
)
WHERE (tenant ->> 'tenant_id') IS NOT NULL
AND (tenant -> 'opening_hours') IS NOT NULL
),
venues AS (
SELECT * FROM jsonl_venues
UNION ALL
SELECT * FROM blob_venues
),
-- Unpivot by UNION ALL — 7 literal key accesses
unpivoted AS (
SELECT tenant_id, 'MONDAY' AS day_of_week, 1 AS day_number,
@@ -104,6 +78,4 @@ SELECT
FROM unpivoted
WHERE opening_time IS NOT NULL
AND closing_time IS NOT NULL
-- Enforce grain: if both old blob and new JSONL exist for the same month,
-- the UNION ALL produces duplicate (tenant_id, day_of_week) pairs — deduplicate.
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, day_of_week ORDER BY tenant_id) = 1

View File

@@ -2,11 +2,7 @@
-- Reads resources array from the landing zone to extract court type, size,
-- surface, and booking config.
--
-- Supports two landing formats (UNION ALL during migration):
-- New: tenants.jsonl.gz — one tenant per line, resources is a top-level JSON column
-- Old: tenants.json.gz — {"tenants": [...]} blob (double UNNEST: tenants → resources)
--
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz
MODEL (
name staging.stg_playtomic_resources,
@@ -16,41 +12,18 @@ MODEL (
);
WITH
-- New format: one tenant per JSONL line — single UNNEST for resources
jsonl_unnested AS (
unnested AS (
SELECT
tenant_id,
UPPER(address ->> 'country_code') AS country_code,
UNNEST(from_json(resources, '["JSON"]')) AS resource_json
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
@LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz',
format = 'newline_delimited',
columns = {tenant_id: 'VARCHAR', address: 'JSON', resources: 'JSON'}
)
WHERE tenant_id IS NOT NULL
AND resources IS NOT NULL
),
-- Old format: blob — double UNNEST (tenants → resources)
blob_unnested AS (
SELECT
tenant ->> 'tenant_id' AS tenant_id,
UPPER(tenant -> 'address' ->> 'country_code') AS country_code,
UNNEST(from_json(tenant -> 'resources', '["JSON"]')) AS resource_json
FROM (
SELECT UNNEST(tenants) AS tenant
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
format = 'auto',
maximum_object_size = 134217728
)
)
WHERE (tenant ->> 'tenant_id') IS NOT NULL
AND (tenant -> 'resources') IS NOT NULL
),
unnested AS (
SELECT * FROM jsonl_unnested
UNION ALL
SELECT * FROM blob_unnested
)
SELECT
tenant_id,
@@ -68,6 +41,4 @@ SELECT
FROM unnested
WHERE (resource_json ->> 'resource_id') IS NOT NULL
AND (resource_json ->> 'sport_id') = 'PADEL'
-- Enforce grain: if both old blob and new JSONL exist for the same month,
-- the UNION ALL produces duplicate (tenant_id, resource_id) pairs — deduplicate.
QUALIFY ROW_NUMBER() OVER (PARTITION BY tenant_id, resource_json ->> 'resource_id' ORDER BY tenant_id) = 1

View File

@@ -3,11 +3,7 @@
-- including address, opening hours, court resources, VAT rate, and facilities.
-- Deduplicates on tenant_id (keeps most recent extraction).
--
-- Supports two landing formats (UNION ALL during migration):
-- New: tenants.jsonl.gz — one tenant JSON object per line (no UNNEST needed)
-- Old: tenants.json.gz — {"tenants": [{...}]} blob (UNNEST required)
--
-- Source: data/landing/playtomic/{year}/{month}/tenants.{jsonl,json}.gz
-- Source: data/landing/playtomic/{year}/{month}/{day}/tenants.jsonl.gz
MODEL (
name staging.stg_playtomic_venues,
@@ -17,8 +13,7 @@ MODEL (
);
WITH
-- New format: one tenant per JSONL line — no UNNEST, access columns directly
jsonl_parsed AS (
parsed AS (
SELECT
tenant_id,
tenant_name,
@@ -45,7 +40,7 @@ jsonl_parsed AS (
filename AS source_file,
CURRENT_DATE AS extracted_date
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.jsonl.gz',
@LANDING_DIR || '/playtomic/*/*/*/tenants.jsonl.gz',
format = 'newline_delimited',
filename = true,
columns = {
@@ -59,49 +54,6 @@ jsonl_parsed AS (
)
WHERE tenant_id IS NOT NULL
),
-- Old format: {"tenants": [...]} blob — keep for transition until old files rotate out
blob_parsed AS (
SELECT
tenant ->> 'tenant_id' AS tenant_id,
tenant ->> 'tenant_name' AS tenant_name,
tenant ->> 'slug' AS slug,
tenant ->> 'tenant_type' AS tenant_type,
tenant ->> 'tenant_status' AS tenant_status,
tenant ->> 'playtomic_status' AS playtomic_status,
tenant ->> 'booking_type' AS booking_type,
tenant -> 'address' ->> 'street' AS street,
tenant -> 'address' ->> 'city' AS city,
tenant -> 'address' ->> 'postal_code' AS postal_code,
UPPER(tenant -> 'address' ->> 'country_code') AS country_code,
tenant -> 'address' ->> 'timezone' AS timezone,
tenant -> 'address' ->> 'administrative_area' AS administrative_area,
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lat' AS DOUBLE) AS lat,
TRY_CAST(tenant -> 'address' -> 'coordinate' ->> 'lon' AS DOUBLE) AS lon,
TRY_CAST(tenant ->> 'vat_rate' AS DOUBLE) AS vat_rate,
tenant ->> 'default_currency' AS default_currency,
TRY_CAST(tenant -> 'booking_settings' ->> 'booking_ahead_limit' AS INTEGER) AS booking_ahead_limit_minutes,
tenant -> 'opening_hours' AS opening_hours_json,
tenant -> 'resources' AS resources_json,
tenant ->> 'created_at' AS created_at,
tenant ->> 'is_playtomic_partner' AS is_playtomic_partner_raw,
filename AS source_file,
CURRENT_DATE AS extracted_date
FROM (
SELECT UNNEST(tenants) AS tenant, filename
FROM read_json(
@LANDING_DIR || '/playtomic/*/*/tenants.json.gz',
format = 'auto',
filename = true,
maximum_object_size = 134217728
)
)
WHERE (tenant ->> 'tenant_id') IS NOT NULL
),
parsed AS (
SELECT * FROM jsonl_parsed
UNION ALL
SELECT * FROM blob_parsed
),
deduped AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY tenant_id ORDER BY source_file DESC) AS rn

View File

@@ -3,11 +3,7 @@
-- Broad coverage (140K+ locations) enables Gemeinde-level market intelligence.
-- One row per geoname_id (GeoNames stable numeric identifier).
--
-- Supports two landing formats (UNION ALL during migration):
-- New: cities_global.jsonl.gz — one city per line, columns directly accessible
-- Old: cities_global.json.gz — {"rows": [...]} blob (UNNEST required)
--
-- Source: data/landing/geonames/{year}/{month}/cities_global.{jsonl,json}.gz
-- Source: data/landing/geonames/{year}/{month}/cities_global.jsonl.gz
MODEL (
name staging.stg_population_geonames,
@@ -16,74 +12,29 @@ MODEL (
grain geoname_id
);
WITH
-- New format: one city per JSONL line
jsonl_rows AS (
SELECT
TRY_CAST(geoname_id AS INTEGER) AS geoname_id,
city_name,
country_code,
TRY_CAST(lat AS DOUBLE) AS lat,
TRY_CAST(lon AS DOUBLE) AS lon,
admin1_code,
admin2_code,
TRY_CAST(population AS BIGINT) AS population,
TRY_CAST(ref_year AS INTEGER) AS ref_year,
CURRENT_DATE AS extracted_date
FROM read_json(
@LANDING_DIR || '/geonames/*/*/cities_global.jsonl.gz',
format = 'newline_delimited',
columns = {
geoname_id: 'INTEGER', city_name: 'VARCHAR', country_code: 'VARCHAR',
lat: 'DOUBLE', lon: 'DOUBLE', admin1_code: 'VARCHAR', admin2_code: 'VARCHAR',
population: 'BIGINT', ref_year: 'INTEGER'
}
)
WHERE geoname_id IS NOT NULL
),
-- Old format: {"rows": [...]} blob — kept for transition
blob_rows AS (
SELECT
TRY_CAST(row ->> 'geoname_id' AS INTEGER) AS geoname_id,
row ->> 'city_name' AS city_name,
row ->> 'country_code' AS country_code,
TRY_CAST(row ->> 'lat' AS DOUBLE) AS lat,
TRY_CAST(row ->> 'lon' AS DOUBLE) AS lon,
row ->> 'admin1_code' AS admin1_code,
row ->> 'admin2_code' AS admin2_code,
TRY_CAST(row ->> 'population' AS BIGINT) AS population,
TRY_CAST(row ->> 'ref_year' AS INTEGER) AS ref_year,
CURRENT_DATE AS extracted_date
FROM (
SELECT UNNEST(rows) AS row
FROM read_json(
@LANDING_DIR || '/geonames/*/*/cities_global.json.gz',
auto_detect = true,
maximum_object_size = 40000000
)
)
WHERE (row ->> 'geoname_id') IS NOT NULL
),
all_rows AS (
SELECT * FROM jsonl_rows
UNION ALL
SELECT * FROM blob_rows
)
SELECT
geoname_id,
TRIM(city_name) AS city_name,
UPPER(country_code) AS country_code,
lat,
lon,
NULLIF(TRIM(admin1_code), '') AS admin1_code,
NULLIF(TRIM(admin2_code), '') AS admin2_code,
population,
ref_year,
extracted_date
FROM all_rows
WHERE population IS NOT NULL
TRY_CAST(geoname_id AS INTEGER) AS geoname_id,
TRIM(city_name) AS city_name,
UPPER(country_code) AS country_code,
TRY_CAST(lat AS DOUBLE) AS lat,
TRY_CAST(lon AS DOUBLE) AS lon,
NULLIF(TRIM(admin1_code), '') AS admin1_code,
NULLIF(TRIM(admin2_code), '') AS admin2_code,
TRY_CAST(population AS BIGINT) AS population,
TRY_CAST(ref_year AS INTEGER) AS ref_year,
CURRENT_DATE AS extracted_date
FROM read_json(
@LANDING_DIR || '/geonames/*/*/cities_global.jsonl.gz',
format = 'newline_delimited',
columns = {
geoname_id: 'INTEGER', city_name: 'VARCHAR', country_code: 'VARCHAR',
lat: 'DOUBLE', lon: 'DOUBLE', admin1_code: 'VARCHAR', admin2_code: 'VARCHAR',
population: 'BIGINT', ref_year: 'INTEGER'
}
)
WHERE geoname_id IS NOT NULL
AND population IS NOT NULL
AND population > 0
AND geoname_id IS NOT NULL
AND city_name IS NOT NULL
AND lat IS NOT NULL
AND lon IS NOT NULL

View File

@@ -2,12 +2,9 @@
-- Used as a "racket sport culture" signal in the opportunity score:
-- areas with high tennis court density are prime padel adoption markets.
--
-- Supports two landing formats (UNION ALL during migration):
-- New: courts.jsonl.gz — one OSM element per line; nodes have lat/lon directly,
-- ways/relations have center.lat/center.lon (Overpass out center)
-- Old: courts.json.gz — {"elements": [...]} blob (UNNEST required)
--
-- Source: data/landing/overpass_tennis/{year}/{month}/courts.{jsonl,json}.gz
-- Source: data/landing/overpass_tennis/{year}/{month}/courts.jsonl.gz
-- Format: one OSM element per line; nodes have lat/lon directly,
-- ways/relations have center.lat/center.lon (Overpass out center)
MODEL (
name staging.stg_tennis_courts,
@@ -17,8 +14,7 @@ MODEL (
);
WITH
-- New format: one OSM element per JSONL line
jsonl_elements AS (
parsed AS (
SELECT
type AS osm_type,
TRY_CAST(id AS BIGINT) AS osm_id,
@@ -47,33 +43,6 @@ jsonl_elements AS (
)
WHERE type IS NOT NULL
),
-- Old format: {"elements": [...]} blob — kept for transition
blob_elements AS (
SELECT
elem ->> 'type' AS osm_type,
(elem ->> 'id')::BIGINT AS osm_id,
TRY_CAST(elem ->> 'lat' AS DOUBLE) AS lat,
TRY_CAST(elem ->> 'lon' AS DOUBLE) AS lon,
elem -> 'tags' ->> 'name' AS name,
elem -> 'tags' ->> 'addr:country' AS country_code,
elem -> 'tags' ->> 'addr:city' AS city_tag,
filename AS source_file,
CURRENT_DATE AS extracted_date
FROM (
SELECT UNNEST(elements) AS elem, filename
FROM read_json(
@LANDING_DIR || '/overpass_tennis/*/*/courts.json.gz',
format = 'auto',
filename = true
)
)
WHERE (elem ->> 'type') IS NOT NULL
),
parsed AS (
SELECT * FROM jsonl_elements
UNION ALL
SELECT * FROM blob_elements
),
deduped AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY osm_id ORDER BY extracted_date DESC) AS rn

View File

@@ -1,22 +1,19 @@
"""Create minimal seed files for SQLMesh staging models that require landing data."""
"""Create minimal seed files for SQLMesh staging models that require landing data.
Seeds are empty JSONL gzip files — they satisfy DuckDB's file-not-found check
while contributing zero rows to the staging models.
"""
import gzip
import json
from pathlib import Path
seed = {
"date": "1970-01-01",
"captured_at_utc": "1970-01-01T00:00:00Z",
"venue_count": 0,
"venues_errored": 0,
"venues": [],
}
morning = Path("data/landing/playtomic/1970/01/availability_1970-01-01.json.gz")
recheck = Path("data/landing/playtomic/1970/01/availability_1970-01-01_recheck_00.json.gz")
# stg_playtomic_availability requires at least one morning and one recheck file
morning = Path("data/landing/playtomic/1970/01/availability_1970-01-01.jsonl.gz")
recheck = Path("data/landing/playtomic/1970/01/availability_1970-01-01_recheck_00.jsonl.gz")
morning.parent.mkdir(parents=True, exist_ok=True)
for p in [morning, recheck]:
if not p.exists():
with gzip.open(p, "wt") as f:
json.dump(seed, f)
with gzip.open(p, "wb") as f:
pass # empty JSONL — 0 rows, no error
print("created", p)
else:
print("exists ", p)

View File

@@ -24,9 +24,11 @@ sup = _ilu.module_from_spec(_spec)
_spec.loader.exec_module(sup)
from padelnomics_extract.proxy import ( # noqa: E402
load_proxy_urls,
fetch_webshare_proxies,
load_proxy_tiers,
make_round_robin_cycler,
make_sticky_selector,
make_tiered_cycler,
)
# ── load_workflows ────────────────────────────────────────────────
@@ -198,28 +200,112 @@ class TestTopologicalWaves:
# ── proxy.py ─────────────────────────────────────────────────────
class TestLoadProxyUrls:
def test_returns_empty_when_unset(self, monkeypatch):
monkeypatch.delenv("PROXY_URLS", raising=False)
assert load_proxy_urls() == []
class TestFetchWebshareProxies:
def test_parses_ip_port_user_pass_format(self):
raw = "1.2.3.4:1080:user1:pass1\n5.6.7.8:1080:user2:pass2\n"
with patch("urllib.request.urlopen") as mock_open:
mock_resp = MagicMock()
mock_resp.read.return_value = raw.encode("utf-8")
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_open.return_value = mock_resp
urls = fetch_webshare_proxies("http://example.com/proxy-list")
assert urls == [
"http://user1:pass1@1.2.3.4:1080",
"http://user2:pass2@5.6.7.8:1080",
]
def test_parses_comma_separated_urls(self, monkeypatch):
monkeypatch.setenv(
"PROXY_URLS",
"http://p1:8080,http://p2:8080,http://p3:8080",
)
urls = load_proxy_urls()
assert urls == ["http://p1:8080", "http://p2:8080", "http://p3:8080"]
def test_network_error_returns_empty(self):
import urllib.error
with patch("urllib.request.urlopen", side_effect=urllib.error.URLError("timeout")):
result = fetch_webshare_proxies("http://example.com/proxy-list")
assert result == []
def test_strips_whitespace(self, monkeypatch):
monkeypatch.setenv("PROXY_URLS", " http://p1:8080 , http://p2:8080 ")
urls = load_proxy_urls()
assert urls == ["http://p1:8080", "http://p2:8080"]
def test_malformed_lines_are_skipped(self):
raw = "bad_line\n1.2.3.4:1080:user:pass\nonly:three:parts\n"
with patch("urllib.request.urlopen") as mock_open:
mock_resp = MagicMock()
mock_resp.read.return_value = raw.encode("utf-8")
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_open.return_value = mock_resp
urls = fetch_webshare_proxies("http://example.com/proxy-list")
assert urls == ["http://user:pass@1.2.3.4:1080"]
def test_ignores_empty_segments(self, monkeypatch):
monkeypatch.setenv("PROXY_URLS", "http://p1:8080,,http://p2:8080,")
urls = load_proxy_urls()
assert urls == ["http://p1:8080", "http://p2:8080"]
def test_max_proxies_respected(self):
lines = "\n".join(f"10.0.0.{i}:1080:u{i}:p{i}" for i in range(10))
with patch("urllib.request.urlopen") as mock_open:
mock_resp = MagicMock()
mock_resp.read.return_value = lines.encode("utf-8")
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_open.return_value = mock_resp
urls = fetch_webshare_proxies("http://example.com/proxy-list", max_proxies=3)
assert len(urls) == 3
def test_empty_lines_skipped(self):
raw = "\n\n1.2.3.4:1080:user:pass\n\n"
with patch("urllib.request.urlopen") as mock_open:
mock_resp = MagicMock()
mock_resp.read.return_value = raw.encode("utf-8")
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_open.return_value = mock_resp
urls = fetch_webshare_proxies("http://example.com/proxy-list")
assert urls == ["http://user:pass@1.2.3.4:1080"]
class TestLoadProxyTiers:
def _clear_proxy_env(self, monkeypatch):
for var in ("WEBSHARE_DOWNLOAD_URL", "PROXY_URLS_DATACENTER", "PROXY_URLS_RESIDENTIAL"):
monkeypatch.delenv(var, raising=False)
def test_returns_empty_when_all_unset(self, monkeypatch):
self._clear_proxy_env(monkeypatch)
assert load_proxy_tiers() == []
def test_single_datacenter_tier(self, monkeypatch):
self._clear_proxy_env(monkeypatch)
monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080,http://dc2:8080")
tiers = load_proxy_tiers()
assert len(tiers) == 1
assert tiers[0] == ["http://dc1:8080", "http://dc2:8080"]
def test_residential_only(self, monkeypatch):
self._clear_proxy_env(monkeypatch)
monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080")
tiers = load_proxy_tiers()
assert len(tiers) == 1
assert tiers[0] == ["http://res1:8080"]
def test_empty_tiers_skipped(self, monkeypatch):
self._clear_proxy_env(monkeypatch)
monkeypatch.setenv("PROXY_URLS_DATACENTER", "")
monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080")
tiers = load_proxy_tiers()
assert len(tiers) == 1
assert tiers[0] == ["http://res1:8080"]
def test_three_tiers_correct_order(self, monkeypatch):
self._clear_proxy_env(monkeypatch)
with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=["http://user:pass@1.2.3.4:1080"]):
monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list")
monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080")
monkeypatch.setenv("PROXY_URLS_RESIDENTIAL", "http://res1:8080")
tiers = load_proxy_tiers()
assert len(tiers) == 3
assert tiers[0] == ["http://user:pass@1.2.3.4:1080"] # free
assert tiers[1] == ["http://dc1:8080"] # datacenter
assert tiers[2] == ["http://res1:8080"] # residential
def test_webshare_fetch_failure_skips_tier(self, monkeypatch):
self._clear_proxy_env(monkeypatch)
with patch("padelnomics_extract.proxy.fetch_webshare_proxies", return_value=[]):
monkeypatch.setenv("WEBSHARE_DOWNLOAD_URL", "http://example.com/list")
monkeypatch.setenv("PROXY_URLS_DATACENTER", "http://dc1:8080")
tiers = load_proxy_tiers()
assert len(tiers) == 1
assert tiers[0] == ["http://dc1:8080"]
class TestRoundRobinCycler:
@@ -279,3 +365,138 @@ class TestStickySelectorProxy:
fn = make_sticky_selector(urls)
for i in range(20):
assert fn(f"key_{i}") in urls
class TestTieredCyclerNTier:
def test_starts_on_first_tier(self):
tiers = [["http://t0a", "http://t0b"], ["http://t1a"]]
cycler = make_tiered_cycler(tiers, threshold=3)
assert cycler["active_tier_index"]() == 0
assert not cycler["is_exhausted"]()
assert cycler["next_proxy"]() in tiers[0]
def test_escalates_after_threshold(self):
tiers = [["http://t0"], ["http://t1"]]
cycler = make_tiered_cycler(tiers, threshold=3)
# Two failures — stays on tier 0
cycler["record_failure"]()
cycler["record_failure"]()
assert cycler["active_tier_index"]() == 0
# Third failure — escalates
escalated = cycler["record_failure"]()
assert escalated is True
assert cycler["active_tier_index"]() == 1
assert cycler["next_proxy"]() == "http://t1"
def test_escalates_through_all_tiers(self):
tiers = [["http://t0"], ["http://t1"], ["http://t2"]]
cycler = make_tiered_cycler(tiers, threshold=2)
# Exhaust tier 0
cycler["record_failure"]()
cycler["record_failure"]()
assert cycler["active_tier_index"]() == 1
# Exhaust tier 1
cycler["record_failure"]()
cycler["record_failure"]()
assert cycler["active_tier_index"]() == 2
# Exhaust tier 2
cycler["record_failure"]()
cycler["record_failure"]()
assert cycler["is_exhausted"]()
assert cycler["next_proxy"]() is None
def test_success_resets_counter(self):
tiers = [["http://t0"], ["http://t1"]]
cycler = make_tiered_cycler(tiers, threshold=3)
cycler["record_failure"]()
cycler["record_failure"]()
cycler["record_success"]()
# Counter reset — need threshold more failures to escalate
cycler["record_failure"]()
cycler["record_failure"]()
assert cycler["active_tier_index"]() == 0 # still on tier 0
cycler["record_failure"]()
assert cycler["active_tier_index"]() == 1 # now escalated
def test_counter_resets_on_escalation(self):
"""After escalating, failure counter resets so new tier gets a fresh start."""
tiers = [["http://t0"], ["http://t1"], ["http://t2"]]
cycler = make_tiered_cycler(tiers, threshold=2)
# Exhaust tier 0
cycler["record_failure"]()
cycler["record_failure"]()
assert cycler["active_tier_index"]() == 1
# One failure on tier 1 — should NOT escalate yet (counter reset)
cycler["record_failure"]()
assert cycler["active_tier_index"]() == 1
# Second failure on tier 1 — escalates to tier 2
cycler["record_failure"]()
assert cycler["active_tier_index"]() == 2
def test_is_exhausted_false_when_tiers_remain(self):
tiers = [["http://t0"], ["http://t1"]]
cycler = make_tiered_cycler(tiers, threshold=1)
assert not cycler["is_exhausted"]()
cycler["record_failure"]() # escalates to tier 1
assert not cycler["is_exhausted"]()
def test_is_exhausted_true_after_all_tiers_fail(self):
tiers = [["http://t0"]]
cycler = make_tiered_cycler(tiers, threshold=1)
assert not cycler["is_exhausted"]()
cycler["record_failure"]()
assert cycler["is_exhausted"]()
assert cycler["next_proxy"]() is None
def test_empty_tiers_immediately_exhausted(self):
cycler = make_tiered_cycler([], threshold=3)
assert cycler["is_exhausted"]()
assert cycler["next_proxy"]() is None
assert cycler["tier_count"]() == 0
def test_single_tier_cycles_within_tier(self):
tiers = [["http://p1", "http://p2", "http://p3"]]
cycler = make_tiered_cycler(tiers, threshold=10)
results = [cycler["next_proxy"]() for _ in range(6)]
assert results == ["http://p1", "http://p2", "http://p3"] * 2
def test_tier_count_reflects_input(self):
assert make_tiered_cycler([], threshold=1)["tier_count"]() == 0
assert make_tiered_cycler([["a"]], threshold=1)["tier_count"]() == 1
assert make_tiered_cycler([["a"], ["b"], ["c"]], threshold=1)["tier_count"]() == 3
def test_record_failure_noop_when_exhausted(self):
tiers = [["http://t0"]]
cycler = make_tiered_cycler(tiers, threshold=1)
cycler["record_failure"]() # exhausts
assert cycler["is_exhausted"]()
# Further failures are no-ops, not exceptions
result = cycler["record_failure"]()
assert result is False
assert cycler["is_exhausted"]()
def test_thread_safety(self):
"""Concurrent next_proxy and record calls do not raise or corrupt state."""
import threading
tiers = [["http://t0a", "http://t0b"], ["http://t1a", "http://t1b"]]
cycler = make_tiered_cycler(tiers, threshold=5)
errors = []
lock = threading.Lock()
def worker():
try:
for _ in range(20):
cycler["next_proxy"]()
cycler["record_failure"]()
cycler["record_success"]()
except Exception as e:
with lock:
errors.append(e)
threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
t.start()
for t in threads:
t.join()
assert errors == [], f"Thread safety errors: {errors}"