merge: semantic-compression — add compression helpers, macros, and coding philosophy
All checks were successful
CI / test (push) Successful in 50s
CI / tag (push) Successful in 3s

Applies Casey Muratori's semantic compression across all three packages:
- count_where() helper: 30+ COUNT(*) call sites compressed
- _forward_lead(): deduplicates lead forward routes
- 5 SQLMesh macros for country code patterns (7 models)
- skip_if_current() + write_jsonl_atomic() extract helpers
Net: -118 lines (272 added, 390 removed)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-03-02 08:00:15 +01:00
23 changed files with 269 additions and 378 deletions

View File

@@ -6,6 +6,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
## [Unreleased] ## [Unreleased]
### Changed
- **Semantic compression pass** — applied Casey Muratori's compression workflow (write concrete → observe patterns → compress genuine repetitions) across all three packages. Net result: ~200 lines removed, codebase simpler.
- **`count_where()` helper** (`web/core.py`): compresses the `fetch_one("SELECT COUNT(*) ...") + null-check` pattern. Applied across 30+ call sites in admin, suppliers, directory, dashboard, public, and planner routes. Dashboard stats function shrinks from 75 to 25 lines.
- **`_forward_lead()` helper** (`web/admin/routes.py`): extracts shared DB logic from `lead_forward` and `lead_forward_htmx` — both routes now call the helper and differ only in response format.
- **SQLMesh macros** (`transform/macros/__init__.py`): 5 new macros compress repeated country code patterns across 7 SQL models: `@country_name`, `@country_slug`, `@normalize_eurostat_country`, `@normalize_eurostat_nuts`, `@infer_country_from_coords`.
- **Extract helpers** (`extract/utils.py`): `skip_if_current()` compresses cursor-check + early-return pattern (3 extractors); `write_jsonl_atomic()` compresses working-file → JSONL → compress pattern (2 extractors).
- **Coding philosophy updated** (`~/.claude/coding_philosophy.md`): added `<compression>` section documenting the workflow, the test ("Did this abstraction make the total codebase smaller?"), and distinction from premature DRY.
### Fixed ### Fixed
- **Admin: empty confirm dialog on auto-poll** — `htmx:confirm` handler now guards with `if (!evt.detail.question) return` so auto-poll requests (`hx-trigger="every 5s"`, no `hx-confirm` attribute) no longer trigger an empty dialog every 5 seconds. - **Admin: empty confirm dialog on auto-poll** — `htmx:confirm` handler now guards with `if (!evt.detail.question) return` so auto-poll requests (`hx-trigger="every 5s"`, no `hx-confirm` attribute) no longer trigger an empty dialog every 5 seconds.

View File

@@ -19,7 +19,7 @@ from pathlib import Path
import niquests import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import get_last_cursor, landing_path, write_gzip_atomic from .utils import landing_path, skip_if_current, write_gzip_atomic
logger = setup_logging("padelnomics.extract.census_usa") logger = setup_logging("padelnomics.extract.census_usa")
@@ -73,10 +73,10 @@ def extract(
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
# Skip if we already have data for this month (annual data, monthly cursor) # Skip if we already have data for this month (annual data, monthly cursor)
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) skip = skip_if_current(conn, EXTRACTOR_NAME, year_month)
if last_cursor == year_month: if skip:
logger.info("already have data for %s — skipping", year_month) logger.info("already have data for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} return skip
year, month = year_month.split("/") year, month = year_month.split("/")
url = f"{ACS_URL}&key={api_key}" url = f"{ACS_URL}&key={api_key}"

View File

@@ -19,7 +19,6 @@ Output: one JSON object per line, e.g.:
import gzip import gzip
import io import io
import json
import os import os
import sqlite3 import sqlite3
import zipfile import zipfile
@@ -28,7 +27,7 @@ from pathlib import Path
import niquests import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import compress_jsonl_atomic, get_last_cursor, landing_path from .utils import landing_path, skip_if_current, write_jsonl_atomic
logger = setup_logging("padelnomics.extract.geonames") logger = setup_logging("padelnomics.extract.geonames")
@@ -139,10 +138,10 @@ def extract(
tmp.rename(dest) tmp.rename(dest)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} return {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) skip = skip_if_current(conn, EXTRACTOR_NAME, year_month)
if last_cursor == year_month: if skip:
logger.info("already have data for %s — skipping", year_month) logger.info("already have data for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} return skip
year, month = year_month.split("/") year, month = year_month.split("/")
@@ -168,11 +167,7 @@ def extract(
dest_dir = landing_path(landing_dir, "geonames", year, month) dest_dir = landing_path(landing_dir, "geonames", year, month)
dest = dest_dir / "cities_global.jsonl.gz" dest = dest_dir / "cities_global.jsonl.gz"
working_path = dest.with_suffix(".working.jsonl") bytes_written = write_jsonl_atomic(dest, rows)
with open(working_path, "w") as f:
for row in rows:
f.write(json.dumps(row, separators=(",", ":")) + "\n")
bytes_written = compress_jsonl_atomic(working_path, dest)
logger.info("written %s bytes compressed", f"{bytes_written:,}") logger.info("written %s bytes compressed", f"{bytes_written:,}")
return { return {

View File

@@ -17,7 +17,7 @@ from pathlib import Path
import niquests import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging
from .utils import get_last_cursor from .utils import skip_if_current
logger = setup_logging("padelnomics.extract.gisco") logger = setup_logging("padelnomics.extract.gisco")
@@ -45,10 +45,10 @@ def extract(
session: niquests.Session, session: niquests.Session,
) -> dict: ) -> dict:
"""Download NUTS-2 GeoJSON. Skips if already run this month or file exists.""" """Download NUTS-2 GeoJSON. Skips if already run this month or file exists."""
last_cursor = get_last_cursor(conn, EXTRACTOR_NAME) skip = skip_if_current(conn, EXTRACTOR_NAME, year_month)
if last_cursor == year_month: if skip:
logger.info("already ran for %s — skipping", year_month) logger.info("already ran for %s — skipping", year_month)
return {"files_written": 0, "files_skipped": 1, "bytes_written": 0} return skip
dest = landing_dir / DEST_REL dest = landing_dir / DEST_REL
if dest.exists(): if dest.exists():

View File

@@ -21,7 +21,6 @@ Rate: 1 req / 2 s per IP (see docs/data-sources-inventory.md §1.2).
Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz Landing: {LANDING_DIR}/playtomic/{year}/{month}/tenants.jsonl.gz
""" """
import json
import os import os
import sqlite3 import sqlite3
import time import time
@@ -33,7 +32,7 @@ import niquests
from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy from ._shared import HTTP_TIMEOUT_SECONDS, run_extractor, setup_logging, ua_for_proxy
from .proxy import load_proxy_tiers, make_tiered_cycler from .proxy import load_proxy_tiers, make_tiered_cycler
from .utils import compress_jsonl_atomic, landing_path from .utils import landing_path, write_jsonl_atomic
logger = setup_logging("padelnomics.extract.playtomic_tenants") logger = setup_logging("padelnomics.extract.playtomic_tenants")
@@ -215,11 +214,7 @@ def extract(
time.sleep(THROTTLE_SECONDS) time.sleep(THROTTLE_SECONDS)
# Write each tenant as a JSONL line, then compress atomically # Write each tenant as a JSONL line, then compress atomically
working_path = dest.with_suffix(".working.jsonl") bytes_written = write_jsonl_atomic(dest, all_tenants)
with open(working_path, "w") as f:
for tenant in all_tenants:
f.write(json.dumps(tenant, separators=(",", ":")) + "\n")
bytes_written = compress_jsonl_atomic(working_path, dest)
logger.info("%d unique venues -> %s", len(all_tenants), dest) logger.info("%d unique venues -> %s", len(all_tenants), dest)
return { return {

View File

@@ -101,6 +101,19 @@ def get_last_cursor(conn: sqlite3.Connection, extractor: str) -> str | None:
return row["cursor_value"] if row else None return row["cursor_value"] if row else None
_SKIP_RESULT = {"files_written": 0, "files_skipped": 1, "bytes_written": 0}
def skip_if_current(conn: sqlite3.Connection, extractor: str, year_month: str) -> dict | None:
"""Return an early-exit result dict if this extractor already ran for year_month.
Returns None when the extractor should proceed with extraction.
"""
if get_last_cursor(conn, extractor) == year_month:
return _SKIP_RESULT
return None
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# File I/O helpers # File I/O helpers
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -176,6 +189,20 @@ def write_gzip_atomic(path: Path, data: bytes) -> int:
return len(compressed) return len(compressed)
def write_jsonl_atomic(dest: Path, items: list[dict]) -> int:
"""Write items as JSONL, then compress atomically to dest (.jsonl.gz).
Compresses the working-file → JSONL → gzip pattern into one call.
Returns compressed bytes written.
"""
assert items, "items must not be empty"
working_path = dest.with_suffix(".working.jsonl")
with open(working_path, "w") as f:
for item in items:
f.write(json.dumps(item, separators=(",", ":")) + "\n")
return compress_jsonl_atomic(working_path, dest)
def compress_jsonl_atomic(jsonl_path: Path, dest_path: Path) -> int: def compress_jsonl_atomic(jsonl_path: Path, dest_path: Path) -> int:
"""Compress a JSONL working file to .jsonl.gz atomically, then delete the source. """Compress a JSONL working file to .jsonl.gz atomically, then delete the source.

View File

@@ -16,5 +16,92 @@ def padelnomics_glob(evaluator) -> str:
return f"'{landing_dir}/padelnomics/**/*.csv.gz'" return f"'{landing_dir}/padelnomics/**/*.csv.gz'"
# Add one macro per landing zone subdirectory you create. # ── Country code helpers ─────────────────────────────────────────────────────
# Pattern: def {source}_glob(evaluator) → f"'{landing_dir}/{source}/**/*.csv.gz'" # Shared lookup used by dim_cities and dim_locations.
_COUNTRY_NAMES = {
"DE": "Germany", "ES": "Spain", "GB": "United Kingdom",
"FR": "France", "IT": "Italy", "PT": "Portugal",
"AT": "Austria", "CH": "Switzerland", "NL": "Netherlands",
"BE": "Belgium", "SE": "Sweden", "NO": "Norway",
"DK": "Denmark", "FI": "Finland", "US": "United States",
"AR": "Argentina", "MX": "Mexico", "AE": "UAE",
"AU": "Australia", "IE": "Ireland",
}
def _country_case(col: str) -> str:
"""Build a CASE expression mapping ISO 3166-1 alpha-2 → English name."""
whens = "\n ".join(
f"WHEN '{code}' THEN '{name}'" for code, name in _COUNTRY_NAMES.items()
)
return f"CASE {col}\n {whens}\n ELSE {col}\n END"
@macro()
def country_name(evaluator, code_col) -> str:
"""CASE expression: country code → English name.
Usage in SQL: @country_name(vc.country_code) AS country_name_en
"""
return _country_case(str(code_col))
@macro()
def country_slug(evaluator, code_col) -> str:
"""CASE expression: country code → URL-safe slug (lowercased, spaces → dashes).
Usage in SQL: @country_slug(vc.country_code) AS country_slug
"""
return f"LOWER(REGEXP_REPLACE({_country_case(str(code_col))}, '[^a-zA-Z0-9]+', '-'))"
@macro()
def normalize_eurostat_country(evaluator, code_col) -> str:
"""Normalize Eurostat country codes to ISO 3166-1 alpha-2: EL→GR, UK→GB.
Usage in SQL: @normalize_eurostat_country(geo_code) AS country_code
"""
col = str(code_col)
return f"CASE {col} WHEN 'EL' THEN 'GR' WHEN 'UK' THEN 'GB' ELSE {col} END"
@macro()
def normalize_eurostat_nuts(evaluator, code_col) -> str:
"""Normalize NUTS code prefix: EL→GR, UK→GB, preserving the suffix.
Usage in SQL: @normalize_eurostat_nuts(geo_code) AS nuts_code
"""
col = str(code_col)
return (
f"CASE"
f" WHEN {col} LIKE 'EL%' THEN 'GR' || SUBSTR({col}, 3)"
f" WHEN {col} LIKE 'UK%' THEN 'GB' || SUBSTR({col}, 3)"
f" ELSE {col}"
f" END"
)
@macro()
def infer_country_from_coords(evaluator, lat_col, lon_col) -> str:
"""Infer ISO country code from lat/lon using bounding boxes for 8 European markets.
Usage in SQL:
COALESCE(NULLIF(TRIM(UPPER(country_code)), ''),
@infer_country_from_coords(lat, lon)) AS country_code
"""
lat = str(lat_col)
lon = str(lon_col)
return (
f"CASE"
f" WHEN {lat} BETWEEN 47.27 AND 55.06 AND {lon} BETWEEN 5.87 AND 15.04 THEN 'DE'"
f" WHEN {lat} BETWEEN 35.95 AND 43.79 AND {lon} BETWEEN -9.39 AND 4.33 THEN 'ES'"
f" WHEN {lat} BETWEEN 49.90 AND 60.85 AND {lon} BETWEEN -8.62 AND 1.77 THEN 'GB'"
f" WHEN {lat} BETWEEN 41.36 AND 51.09 AND {lon} BETWEEN -5.14 AND 9.56 THEN 'FR'"
f" WHEN {lat} BETWEEN 45.46 AND 47.80 AND {lon} BETWEEN 5.96 AND 10.49 THEN 'CH'"
f" WHEN {lat} BETWEEN 46.37 AND 49.02 AND {lon} BETWEEN 9.53 AND 17.16 THEN 'AT'"
f" WHEN {lat} BETWEEN 36.35 AND 47.09 AND {lon} BETWEEN 6.62 AND 18.51 THEN 'IT'"
f" WHEN {lat} BETWEEN 37.00 AND 42.15 AND {lon} BETWEEN -9.50 AND -6.19 THEN 'PT'"
f" ELSE NULL"
f" END"
)

View File

@@ -110,55 +110,9 @@ SELECT
vc.city_slug, vc.city_slug,
vc.city_name, vc.city_name,
-- Human-readable country name for pSEO templates and internal linking -- Human-readable country name for pSEO templates and internal linking
CASE vc.country_code @country_name(vc.country_code) AS country_name_en,
WHEN 'DE' THEN 'Germany'
WHEN 'ES' THEN 'Spain'
WHEN 'GB' THEN 'United Kingdom'
WHEN 'FR' THEN 'France'
WHEN 'IT' THEN 'Italy'
WHEN 'PT' THEN 'Portugal'
WHEN 'AT' THEN 'Austria'
WHEN 'CH' THEN 'Switzerland'
WHEN 'NL' THEN 'Netherlands'
WHEN 'BE' THEN 'Belgium'
WHEN 'SE' THEN 'Sweden'
WHEN 'NO' THEN 'Norway'
WHEN 'DK' THEN 'Denmark'
WHEN 'FI' THEN 'Finland'
WHEN 'US' THEN 'United States'
WHEN 'AR' THEN 'Argentina'
WHEN 'MX' THEN 'Mexico'
WHEN 'AE' THEN 'UAE'
WHEN 'AU' THEN 'Australia'
WHEN 'IE' THEN 'Ireland'
ELSE vc.country_code
END AS country_name_en,
-- URL-safe country slug -- URL-safe country slug
LOWER(REGEXP_REPLACE( @country_slug(vc.country_code) AS country_slug,
CASE vc.country_code
WHEN 'DE' THEN 'Germany'
WHEN 'ES' THEN 'Spain'
WHEN 'GB' THEN 'United Kingdom'
WHEN 'FR' THEN 'France'
WHEN 'IT' THEN 'Italy'
WHEN 'PT' THEN 'Portugal'
WHEN 'AT' THEN 'Austria'
WHEN 'CH' THEN 'Switzerland'
WHEN 'NL' THEN 'Netherlands'
WHEN 'BE' THEN 'Belgium'
WHEN 'SE' THEN 'Sweden'
WHEN 'NO' THEN 'Norway'
WHEN 'DK' THEN 'Denmark'
WHEN 'FI' THEN 'Finland'
WHEN 'US' THEN 'United States'
WHEN 'AR' THEN 'Argentina'
WHEN 'MX' THEN 'Mexico'
WHEN 'AE' THEN 'UAE'
WHEN 'AU' THEN 'Australia'
WHEN 'IE' THEN 'Ireland'
ELSE vc.country_code
END, '[^a-zA-Z0-9]+', '-'
)) AS country_slug,
vc.centroid_lat AS lat, vc.centroid_lat AS lat,
vc.centroid_lon AS lon, vc.centroid_lon AS lon,
-- Population cascade: Eurostat EU > US Census > ONS UK > GeoNames string > GeoNames spatial > 0. -- Population cascade: Eurostat EU > US Census > ONS UK > GeoNames string > GeoNames spatial > 0.

View File

@@ -215,55 +215,9 @@ SELECT
l.geoname_id, l.geoname_id,
l.country_code, l.country_code,
-- Human-readable country name (consistent with dim_cities) -- Human-readable country name (consistent with dim_cities)
CASE l.country_code @country_name(l.country_code) AS country_name_en,
WHEN 'DE' THEN 'Germany'
WHEN 'ES' THEN 'Spain'
WHEN 'GB' THEN 'United Kingdom'
WHEN 'FR' THEN 'France'
WHEN 'IT' THEN 'Italy'
WHEN 'PT' THEN 'Portugal'
WHEN 'AT' THEN 'Austria'
WHEN 'CH' THEN 'Switzerland'
WHEN 'NL' THEN 'Netherlands'
WHEN 'BE' THEN 'Belgium'
WHEN 'SE' THEN 'Sweden'
WHEN 'NO' THEN 'Norway'
WHEN 'DK' THEN 'Denmark'
WHEN 'FI' THEN 'Finland'
WHEN 'US' THEN 'United States'
WHEN 'AR' THEN 'Argentina'
WHEN 'MX' THEN 'Mexico'
WHEN 'AE' THEN 'UAE'
WHEN 'AU' THEN 'Australia'
WHEN 'IE' THEN 'Ireland'
ELSE l.country_code
END AS country_name_en,
-- URL-safe country slug -- URL-safe country slug
LOWER(REGEXP_REPLACE( @country_slug(l.country_code) AS country_slug,
CASE l.country_code
WHEN 'DE' THEN 'Germany'
WHEN 'ES' THEN 'Spain'
WHEN 'GB' THEN 'United Kingdom'
WHEN 'FR' THEN 'France'
WHEN 'IT' THEN 'Italy'
WHEN 'PT' THEN 'Portugal'
WHEN 'AT' THEN 'Austria'
WHEN 'CH' THEN 'Switzerland'
WHEN 'NL' THEN 'Netherlands'
WHEN 'BE' THEN 'Belgium'
WHEN 'SE' THEN 'Sweden'
WHEN 'NO' THEN 'Norway'
WHEN 'DK' THEN 'Denmark'
WHEN 'FI' THEN 'Finland'
WHEN 'US' THEN 'United States'
WHEN 'AR' THEN 'Argentina'
WHEN 'MX' THEN 'Mexico'
WHEN 'AE' THEN 'UAE'
WHEN 'AU' THEN 'Australia'
WHEN 'IE' THEN 'Ireland'
ELSE l.country_code
END, '[^a-zA-Z0-9]+', '-'
)) AS country_slug,
l.location_name, l.location_name,
l.location_slug, l.location_slug,
l.lat, l.lat,

View File

@@ -30,11 +30,7 @@ parsed AS (
) )
SELECT SELECT
-- Normalise to ISO 3166-1 alpha-2: EL→GR, UK→GB -- Normalise to ISO 3166-1 alpha-2: EL→GR, UK→GB
CASE geo_code @normalize_eurostat_country(geo_code) AS country_code,
WHEN 'EL' THEN 'GR'
WHEN 'UK' THEN 'GB'
ELSE geo_code
END AS country_code,
ref_year, ref_year,
median_income_pps, median_income_pps,
extracted_date extracted_date

View File

@@ -28,11 +28,7 @@ WITH raw AS (
SELECT SELECT
NUTS_ID AS nuts2_code, NUTS_ID AS nuts2_code,
-- Normalise country prefix to ISO 3166-1 alpha-2: EL→GR, UK→GB -- Normalise country prefix to ISO 3166-1 alpha-2: EL→GR, UK→GB
CASE CNTR_CODE @normalize_eurostat_country(CNTR_CODE) AS country_code,
WHEN 'EL' THEN 'GR'
WHEN 'UK' THEN 'GB'
ELSE CNTR_CODE
END AS country_code,
NAME_LATN AS region_name, NAME_LATN AS region_name,
geom AS geometry, geom AS geometry,
-- Pre-compute bounding box for efficient spatial pre-filter in dim_locations. -- Pre-compute bounding box for efficient spatial pre-filter in dim_locations.

View File

@@ -48,17 +48,8 @@ deduped AS (
with_country AS ( with_country AS (
SELECT SELECT
osm_id, lat, lon, osm_id, lat, lon,
COALESCE(NULLIF(TRIM(UPPER(country_code)), ''), CASE COALESCE(NULLIF(TRIM(UPPER(country_code)), ''),
WHEN lat BETWEEN 47.27 AND 55.06 AND lon BETWEEN 5.87 AND 15.04 THEN 'DE' @infer_country_from_coords(lat, lon)) AS country_code,
WHEN lat BETWEEN 35.95 AND 43.79 AND lon BETWEEN -9.39 AND 4.33 THEN 'ES'
WHEN lat BETWEEN 49.90 AND 60.85 AND lon BETWEEN -8.62 AND 1.77 THEN 'GB'
WHEN lat BETWEEN 41.36 AND 51.09 AND lon BETWEEN -5.14 AND 9.56 THEN 'FR'
WHEN lat BETWEEN 45.46 AND 47.80 AND lon BETWEEN 5.96 AND 10.49 THEN 'CH'
WHEN lat BETWEEN 46.37 AND 49.02 AND lon BETWEEN 9.53 AND 17.16 THEN 'AT'
WHEN lat BETWEEN 36.35 AND 47.09 AND lon BETWEEN 6.62 AND 18.51 THEN 'IT'
WHEN lat BETWEEN 37.00 AND 42.15 AND lon BETWEEN -9.50 AND -6.19 THEN 'PT'
ELSE NULL
END) AS country_code,
NULLIF(TRIM(name), '') AS name, NULLIF(TRIM(name), '') AS name,
NULLIF(TRIM(city_tag), '') AS city, NULLIF(TRIM(city_tag), '') AS city,
postcode, operator_name, opening_hours, fee, extracted_date postcode, operator_name, opening_hours, fee, extracted_date

View File

@@ -30,11 +30,7 @@ parsed AS (
) )
SELECT SELECT
-- Normalise to ISO 3166-1 alpha-2 prefix: EL→GR, UK→GB -- Normalise to ISO 3166-1 alpha-2 prefix: EL→GR, UK→GB
CASE @normalize_eurostat_nuts(geo_code) AS nuts_code,
WHEN geo_code LIKE 'EL%' THEN 'GR' || SUBSTR(geo_code, 3)
WHEN geo_code LIKE 'UK%' THEN 'GB' || SUBSTR(geo_code, 3)
ELSE geo_code
END AS nuts_code,
-- NUTS level: 3-char = NUTS-1, 4-char = NUTS-2 -- NUTS level: 3-char = NUTS-1, 4-char = NUTS-2
LENGTH(geo_code) - 2 AS nuts_level, LENGTH(geo_code) - 2 AS nuts_level,
ref_year, ref_year,

View File

@@ -54,17 +54,8 @@ deduped AS (
with_country AS ( with_country AS (
SELECT SELECT
osm_id, lat, lon, osm_id, lat, lon,
COALESCE(NULLIF(TRIM(UPPER(country_code)), ''), CASE COALESCE(NULLIF(TRIM(UPPER(country_code)), ''),
WHEN lat BETWEEN 47.27 AND 55.06 AND lon BETWEEN 5.87 AND 15.04 THEN 'DE' @infer_country_from_coords(lat, lon)) AS country_code,
WHEN lat BETWEEN 35.95 AND 43.79 AND lon BETWEEN -9.39 AND 4.33 THEN 'ES'
WHEN lat BETWEEN 49.90 AND 60.85 AND lon BETWEEN -8.62 AND 1.77 THEN 'GB'
WHEN lat BETWEEN 41.36 AND 51.09 AND lon BETWEEN -5.14 AND 9.56 THEN 'FR'
WHEN lat BETWEEN 45.46 AND 47.80 AND lon BETWEEN 5.96 AND 10.49 THEN 'CH'
WHEN lat BETWEEN 46.37 AND 49.02 AND lon BETWEEN 9.53 AND 17.16 THEN 'AT'
WHEN lat BETWEEN 36.35 AND 47.09 AND lon BETWEEN 6.62 AND 18.51 THEN 'IT'
WHEN lat BETWEEN 37.00 AND 42.15 AND lon BETWEEN -9.50 AND -6.19 THEN 'PT'
ELSE NULL
END) AS country_code,
NULLIF(TRIM(name), '') AS name, NULLIF(TRIM(name), '') AS name,
NULLIF(TRIM(city_tag), '') AS city, NULLIF(TRIM(city_tag), '') AS city,
extracted_date extracted_date

View File

@@ -35,7 +35,7 @@ from pathlib import Path
from quart import Blueprint, flash, redirect, render_template, request, url_for from quart import Blueprint, flash, redirect, render_template, request, url_for
from ..auth.routes import role_required from ..auth.routes import role_required
from ..core import csrf_protect from ..core import count_where, csrf_protect
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -298,11 +298,8 @@ async def _inject_sidebar_data():
"""Load unread inbox count for the admin sidebar badge.""" """Load unread inbox count for the admin sidebar badge."""
from quart import g from quart import g
from ..core import fetch_one
try: try:
row = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0") g.admin_unread_count = await count_where("inbound_emails WHERE is_read = 0")
g.admin_unread_count = row["cnt"] if row else 0
except Exception: except Exception:
g.admin_unread_count = 0 g.admin_unread_count = 0

View File

@@ -25,7 +25,7 @@ from ..content.health import (
get_template_freshness, get_template_freshness,
get_template_stats, get_template_stats,
) )
from ..core import csrf_protect, fetch_all, fetch_one from ..core import count_where, csrf_protect, fetch_all, fetch_one
bp = Blueprint( bp = Blueprint(
"pseo", "pseo",
@@ -41,8 +41,7 @@ async def _inject_sidebar_data():
from quart import g from quart import g
try: try:
row = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0") g.admin_unread_count = await count_where("inbound_emails WHERE is_read = 0")
g.admin_unread_count = row["cnt"] if row else 0
except Exception: except Exception:
g.admin_unread_count = 0 g.admin_unread_count = 0
@@ -80,8 +79,7 @@ async def pseo_dashboard():
total_published = sum(r["stats"]["published"] for r in template_rows) total_published = sum(r["stats"]["published"] for r in template_rows)
stale_count = sum(1 for f in freshness if f["status"] == "stale") stale_count = sum(1 for f in freshness if f["status"] == "stale")
noindex_row = await fetch_one("SELECT COUNT(*) as cnt FROM articles WHERE noindex = 1") noindex_count = await count_where("articles WHERE noindex = 1")
noindex_count = noindex_row["cnt"] if noindex_row else 0
# Recent generation jobs — enough for the dashboard summary. # Recent generation jobs — enough for the dashboard summary.
jobs = await fetch_all( jobs = await fetch_all(

View File

@@ -28,6 +28,7 @@ from ..auth.routes import role_required
from ..core import ( from ..core import (
EMAIL_ADDRESSES, EMAIL_ADDRESSES,
config, config,
count_where,
csrf_protect, csrf_protect,
execute, execute,
fetch_all, fetch_all,
@@ -91,8 +92,7 @@ async def _inject_admin_sidebar_data():
"""Load unread inbox count for sidebar badge on every admin page.""" """Load unread inbox count for sidebar badge on every admin page."""
from quart import g from quart import g
try: try:
row = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0") g.admin_unread_count = await count_where("inbound_emails WHERE is_read = 0")
g.admin_unread_count = row["cnt"] if row else 0
except Exception: except Exception:
logger.exception("Failed to load admin sidebar unread count") logger.exception("Failed to load admin sidebar unread count")
g.admin_unread_count = 0 g.admin_unread_count = 0
@@ -114,76 +114,32 @@ async def get_dashboard_stats() -> dict:
now = utcnow() now = utcnow()
today = now.date().isoformat() today = now.date().isoformat()
week_ago = (now - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S") week_ago = (now - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
users_total = await fetch_one("SELECT COUNT(*) as count FROM users WHERE deleted_at IS NULL")
users_today = await fetch_one(
"SELECT COUNT(*) as count FROM users WHERE created_at >= ? AND deleted_at IS NULL",
(today,)
)
users_week = await fetch_one(
"SELECT COUNT(*) as count FROM users WHERE created_at >= ? AND deleted_at IS NULL",
(week_ago,)
)
subs = await fetch_one( # Two queries that aren't simple COUNT(*) — keep as fetch_one
"SELECT COUNT(*) as count FROM subscriptions WHERE status = 'active'" planner_row = await fetch_one(
"SELECT COUNT(DISTINCT user_id) AS n FROM scenarios WHERE deleted_at IS NULL"
) )
credits_row = await fetch_one(
tasks_pending = await fetch_one("SELECT COUNT(*) as count FROM tasks WHERE status = 'pending'") "SELECT COALESCE(SUM(ABS(delta)), 0) AS n FROM credit_ledger WHERE delta < 0"
tasks_failed = await fetch_one("SELECT COUNT(*) as count FROM tasks WHERE status = 'failed'")
# Lead funnel stats
leads_total = await fetch_one(
"SELECT COUNT(*) as count FROM lead_requests WHERE lead_type = 'quote'"
)
leads_new = await fetch_one(
"SELECT COUNT(*) as count FROM lead_requests WHERE status = 'new' AND lead_type = 'quote'"
)
leads_verified = await fetch_one(
"SELECT COUNT(*) as count FROM lead_requests WHERE verified_at IS NOT NULL AND lead_type = 'quote'"
)
leads_unlocked = await fetch_one(
"SELECT COUNT(*) as count FROM lead_requests WHERE unlock_count > 0 AND lead_type = 'quote'"
)
# Planner users
planner_users = await fetch_one(
"SELECT COUNT(DISTINCT user_id) as count FROM scenarios WHERE deleted_at IS NULL"
)
# Supplier stats
suppliers_claimed = await fetch_one(
"SELECT COUNT(*) as count FROM suppliers WHERE claimed_by IS NOT NULL"
)
suppliers_growth = await fetch_one(
"SELECT COUNT(*) as count FROM suppliers WHERE tier = 'growth'"
)
suppliers_pro = await fetch_one(
"SELECT COUNT(*) as count FROM suppliers WHERE tier = 'pro'"
)
total_credits_spent = await fetch_one(
"SELECT COALESCE(SUM(ABS(delta)), 0) as total FROM credit_ledger WHERE delta < 0"
)
leads_unlocked_by_suppliers = await fetch_one(
"SELECT COUNT(*) as count FROM lead_forwards"
) )
return { return {
"users_total": users_total["count"] if users_total else 0, "users_total": await count_where("users WHERE deleted_at IS NULL"),
"users_today": users_today["count"] if users_today else 0, "users_today": await count_where("users WHERE created_at >= ? AND deleted_at IS NULL", (today,)),
"users_week": users_week["count"] if users_week else 0, "users_week": await count_where("users WHERE created_at >= ? AND deleted_at IS NULL", (week_ago,)),
"active_subscriptions": subs["count"] if subs else 0, "active_subscriptions": await count_where("subscriptions WHERE status = 'active'"),
"tasks_pending": tasks_pending["count"] if tasks_pending else 0, "tasks_pending": await count_where("tasks WHERE status = 'pending'"),
"tasks_failed": tasks_failed["count"] if tasks_failed else 0, "tasks_failed": await count_where("tasks WHERE status = 'failed'"),
"leads_total": leads_total["count"] if leads_total else 0, "leads_total": await count_where("lead_requests WHERE lead_type = 'quote'"),
"leads_new": leads_new["count"] if leads_new else 0, "leads_new": await count_where("lead_requests WHERE status = 'new' AND lead_type = 'quote'"),
"leads_verified": leads_verified["count"] if leads_verified else 0, "leads_verified": await count_where("lead_requests WHERE verified_at IS NOT NULL AND lead_type = 'quote'"),
"leads_unlocked": leads_unlocked["count"] if leads_unlocked else 0, "leads_unlocked": await count_where("lead_requests WHERE unlock_count > 0 AND lead_type = 'quote'"),
"planner_users": planner_users["count"] if planner_users else 0, "planner_users": planner_row["n"] if planner_row else 0,
"suppliers_claimed": suppliers_claimed["count"] if suppliers_claimed else 0, "suppliers_claimed": await count_where("suppliers WHERE claimed_by IS NOT NULL"),
"suppliers_growth": suppliers_growth["count"] if suppliers_growth else 0, "suppliers_growth": await count_where("suppliers WHERE tier = 'growth'"),
"suppliers_pro": suppliers_pro["count"] if suppliers_pro else 0, "suppliers_pro": await count_where("suppliers WHERE tier = 'pro'"),
"total_credits_spent": total_credits_spent["total"] if total_credits_spent else 0, "total_credits_spent": credits_row["n"] if credits_row else 0,
"leads_unlocked_by_suppliers": leads_unlocked_by_suppliers["count"] if leads_unlocked_by_suppliers else 0, "leads_unlocked_by_suppliers": await count_where("lead_forwards WHERE 1=1"),
} }
@@ -446,10 +402,7 @@ async def get_leads(
params.append(f"-{days} days") params.append(f"-{days} days")
where = " AND ".join(wheres) where = " AND ".join(wheres)
count_row = await fetch_one( total = await count_where(f"lead_requests WHERE {where}", tuple(params))
f"SELECT COUNT(*) as cnt FROM lead_requests WHERE {where}", tuple(params)
)
total = count_row["cnt"] if count_row else 0
offset = (page - 1) * per_page offset = (page - 1) * per_page
rows = await fetch_all( rows = await fetch_all(
@@ -679,26 +632,14 @@ async def lead_new():
return await render_template("admin/lead_form.html", data={}, statuses=LEAD_STATUSES) return await render_template("admin/lead_form.html", data={}, statuses=LEAD_STATUSES)
@bp.route("/leads/<int:lead_id>/forward", methods=["POST"]) async def _forward_lead(lead_id: int, supplier_id: int) -> str | None:
@role_required("admin") """Forward a lead to a supplier. Returns error message or None on success."""
@csrf_protect
async def lead_forward(lead_id: int):
"""Manually forward a lead to a supplier (no credit cost)."""
form = await request.form
supplier_id = int(form.get("supplier_id", 0))
if not supplier_id:
await flash("Select a supplier.", "error")
return redirect(url_for("admin.lead_detail", lead_id=lead_id))
# Check if already forwarded
existing = await fetch_one( existing = await fetch_one(
"SELECT 1 FROM lead_forwards WHERE lead_id = ? AND supplier_id = ?", "SELECT 1 FROM lead_forwards WHERE lead_id = ? AND supplier_id = ?",
(lead_id, supplier_id), (lead_id, supplier_id),
) )
if existing: if existing:
await flash("Already forwarded to this supplier.", "warning") return "Already forwarded to this supplier."
return redirect(url_for("admin.lead_detail", lead_id=lead_id))
now = utcnow_iso() now = utcnow_iso()
await execute( await execute(
@@ -710,15 +651,27 @@ async def lead_forward(lead_id: int):
"UPDATE lead_requests SET unlock_count = unlock_count + 1, status = 'forwarded' WHERE id = ?", "UPDATE lead_requests SET unlock_count = unlock_count + 1, status = 'forwarded' WHERE id = ?",
(lead_id,), (lead_id,),
) )
# Enqueue forward email
from ..worker import enqueue from ..worker import enqueue
await enqueue("send_lead_forward_email", { await enqueue("send_lead_forward_email", {"lead_id": lead_id, "supplier_id": supplier_id})
"lead_id": lead_id, return None
"supplier_id": supplier_id,
})
await flash("Lead forwarded to supplier.", "success")
@bp.route("/leads/<int:lead_id>/forward", methods=["POST"])
@role_required("admin")
@csrf_protect
async def lead_forward(lead_id: int):
"""Manually forward a lead to a supplier (no credit cost)."""
form = await request.form
supplier_id = int(form.get("supplier_id", 0))
if not supplier_id:
await flash("Select a supplier.", "error")
return redirect(url_for("admin.lead_detail", lead_id=lead_id))
error = await _forward_lead(lead_id, supplier_id)
if error:
await flash(error, "warning")
else:
await flash("Lead forwarded to supplier.", "success")
return redirect(url_for("admin.lead_detail", lead_id=lead_id)) return redirect(url_for("admin.lead_detail", lead_id=lead_id))
@@ -751,25 +704,9 @@ async def lead_forward_htmx(lead_id: int):
return Response("Select a supplier.", status=422) return Response("Select a supplier.", status=422)
supplier_id = int(supplier_id_str) supplier_id = int(supplier_id_str)
existing = await fetch_one( error = await _forward_lead(lead_id, supplier_id)
"SELECT 1 FROM lead_forwards WHERE lead_id = ? AND supplier_id = ?", if error:
(lead_id, supplier_id), return Response(error, status=422)
)
if existing:
return Response("Already forwarded to this supplier.", status=422)
now = utcnow_iso()
await execute(
"""INSERT INTO lead_forwards (lead_id, supplier_id, credit_cost, status, created_at)
VALUES (?, ?, 0, 'sent', ?)""",
(lead_id, supplier_id, now),
)
await execute(
"UPDATE lead_requests SET unlock_count = unlock_count + 1, status = 'forwarded' WHERE id = ?",
(lead_id,),
)
from ..worker import enqueue
await enqueue("send_lead_forward_email", {"lead_id": lead_id, "supplier_id": supplier_id})
lead = await get_lead_detail(lead_id) lead = await get_lead_detail(lead_id)
return await render_template( return await render_template(
@@ -929,13 +866,10 @@ async def get_suppliers_list(
async def get_supplier_stats() -> dict: async def get_supplier_stats() -> dict:
"""Get aggregate supplier stats for the admin list header.""" """Get aggregate supplier stats for the admin list header."""
claimed = await fetch_one("SELECT COUNT(*) as cnt FROM suppliers WHERE claimed_by IS NOT NULL")
growth = await fetch_one("SELECT COUNT(*) as cnt FROM suppliers WHERE tier = 'growth'")
pro = await fetch_one("SELECT COUNT(*) as cnt FROM suppliers WHERE tier = 'pro'")
return { return {
"claimed": claimed["cnt"] if claimed else 0, "claimed": await count_where("suppliers WHERE claimed_by IS NOT NULL"),
"growth": growth["cnt"] if growth else 0, "growth": await count_where("suppliers WHERE tier = 'growth'"),
"pro": pro["cnt"] if pro else 0, "pro": await count_where("suppliers WHERE tier = 'pro'"),
} }
@@ -1017,11 +951,7 @@ async def supplier_detail(supplier_id: int):
(supplier_id,), (supplier_id,),
) )
enquiry_row = await fetch_one( enquiry_count = await count_where("supplier_enquiries WHERE supplier_id = ?", (supplier_id,))
"SELECT COUNT(*) as cnt FROM supplier_enquiries WHERE supplier_id = ?",
(supplier_id,),
)
enquiry_count = enquiry_row["cnt"] if enquiry_row else 0
# Email activity timeline — correlate by contact_email (no FK) # Email activity timeline — correlate by contact_email (no FK)
timeline = [] timeline = []
@@ -1239,7 +1169,6 @@ _PRODUCT_CATEGORIES = [
@role_required("admin") @role_required("admin")
async def billing_products(): async def billing_products():
"""Read-only overview of Paddle products, subscriptions, and revenue proxies.""" """Read-only overview of Paddle products, subscriptions, and revenue proxies."""
active_subs_row = await fetch_one("SELECT COUNT(*) as cnt FROM subscriptions WHERE status = 'active'")
mrr_row = await fetch_one( mrr_row = await fetch_one(
"""SELECT COALESCE(SUM( """SELECT COALESCE(SUM(
CASE WHEN pp.key LIKE '%_yearly' THEN pp.price_cents / 12 CASE WHEN pp.key LIKE '%_yearly' THEN pp.price_cents / 12
@@ -1249,14 +1178,12 @@ async def billing_products():
JOIN paddle_products pp ON s.plan = pp.key JOIN paddle_products pp ON s.plan = pp.key
WHERE s.status = 'active' AND pp.billing_type = 'subscription'""" WHERE s.status = 'active' AND pp.billing_type = 'subscription'"""
) )
active_boosts_row = await fetch_one("SELECT COUNT(*) as cnt FROM supplier_boosts WHERE status = 'active'")
bp_exports_row = await fetch_one("SELECT COUNT(*) as cnt FROM business_plan_exports WHERE status = 'completed'")
stats = { stats = {
"active_subs": (active_subs_row or {}).get("cnt", 0), "active_subs": await count_where("subscriptions WHERE status = 'active'"),
"mrr_cents": (mrr_row or {}).get("total_cents", 0), "mrr_cents": (mrr_row or {}).get("total_cents", 0),
"active_boosts": (active_boosts_row or {}).get("cnt", 0), "active_boosts": await count_where("supplier_boosts WHERE status = 'active'"),
"bp_exports": (bp_exports_row or {}).get("cnt", 0), "bp_exports": await count_where("business_plan_exports WHERE status = 'completed'"),
} }
products_rows = await fetch_all("SELECT * FROM paddle_products ORDER BY key") products_rows = await fetch_all("SELECT * FROM paddle_products ORDER BY key")
@@ -1342,23 +1269,18 @@ async def get_email_log(
async def get_email_stats() -> dict: async def get_email_stats() -> dict:
"""Aggregate email stats for the list header.""" """Aggregate email stats for the list header."""
total = await fetch_one("SELECT COUNT(*) as cnt FROM email_log")
delivered = await fetch_one("SELECT COUNT(*) as cnt FROM email_log WHERE last_event = 'delivered'")
bounced = await fetch_one("SELECT COUNT(*) as cnt FROM email_log WHERE last_event = 'bounced'")
today = utcnow().date().isoformat() today = utcnow().date().isoformat()
sent_today = await fetch_one("SELECT COUNT(*) as cnt FROM email_log WHERE created_at >= ?", (today,))
return { return {
"total": total["cnt"] if total else 0, "total": await count_where("email_log WHERE 1=1"),
"delivered": delivered["cnt"] if delivered else 0, "delivered": await count_where("email_log WHERE last_event = 'delivered'"),
"bounced": bounced["cnt"] if bounced else 0, "bounced": await count_where("email_log WHERE last_event = 'bounced'"),
"sent_today": sent_today["cnt"] if sent_today else 0, "sent_today": await count_where("email_log WHERE created_at >= ?", (today,)),
} }
async def get_unread_count() -> int: async def get_unread_count() -> int:
"""Count unread inbound emails.""" """Count unread inbound emails."""
row = await fetch_one("SELECT COUNT(*) as cnt FROM inbound_emails WHERE is_read = 0") return await count_where("inbound_emails WHERE is_read = 0")
return row["cnt"] if row else 0
@bp.route("/emails") @bp.route("/emails")
@@ -1824,11 +1746,7 @@ async def template_detail(slug: str):
columns = await get_table_columns(config["data_table"]) columns = await get_table_columns(config["data_table"])
sample_rows = await fetch_template_data(config["data_table"], limit=10) sample_rows = await fetch_template_data(config["data_table"], limit=10)
# Count generated articles generated_count = await count_where("articles WHERE template_slug = ?", (slug,))
row = await fetch_one(
"SELECT COUNT(*) as cnt FROM articles WHERE template_slug = ?", (slug,),
)
generated_count = row["cnt"] if row else 0
return await render_template( return await render_template(
"admin/template_detail.html", "admin/template_detail.html",
@@ -1959,8 +1877,8 @@ async def _query_scenarios(search: str, country: str, venue_type: str) -> tuple[
f"SELECT * FROM published_scenarios WHERE {where} ORDER BY created_at DESC LIMIT 500", f"SELECT * FROM published_scenarios WHERE {where} ORDER BY created_at DESC LIMIT 500",
tuple(params), tuple(params),
) )
total_row = await fetch_one("SELECT COUNT(*) as cnt FROM published_scenarios") total = await count_where("published_scenarios WHERE 1=1")
return rows, (total_row["cnt"] if total_row else 0) return rows, total
@bp.route("/scenarios") @bp.route("/scenarios")
@@ -2927,11 +2845,9 @@ _CSV_IMPORT_LIMIT = 500 # guard against huge uploads
async def get_follow_up_due_count() -> int: async def get_follow_up_due_count() -> int:
"""Count pipeline suppliers with follow_up_at <= today.""" """Count pipeline suppliers with follow_up_at <= today."""
row = await fetch_one( return await count_where(
"""SELECT COUNT(*) as cnt FROM suppliers "suppliers WHERE outreach_status IS NOT NULL AND follow_up_at <= date('now')"
WHERE outreach_status IS NOT NULL AND follow_up_at <= date('now')"""
) )
return row["cnt"] if row else 0
async def get_outreach_pipeline() -> dict: async def get_outreach_pipeline() -> dict:

View File

@@ -192,6 +192,15 @@ async def fetch_all(sql: str, params: tuple = ()) -> list[dict]:
return [dict(row) for row in rows] return [dict(row) for row in rows]
async def count_where(table_where: str, params: tuple = ()) -> int:
"""Count rows matching a condition. Compresses the fetch_one + null-check pattern.
Usage: await count_where("users WHERE deleted_at IS NULL")
"""
row = await fetch_one(f"SELECT COUNT(*) AS n FROM {table_where}", params)
return row["n"] if row else 0
async def execute(sql: str, params: tuple = ()) -> int: async def execute(sql: str, params: tuple = ()) -> int:
"""Execute SQL and return lastrowid.""" """Execute SQL and return lastrowid."""
db = await get_db() db = await get_db()

View File

@@ -6,7 +6,7 @@ from pathlib import Path
from quart import Blueprint, flash, g, redirect, render_template, request, url_for from quart import Blueprint, flash, g, redirect, render_template, request, url_for
from ..auth.routes import login_required, update_user from ..auth.routes import login_required, update_user
from ..core import csrf_protect, fetch_one, soft_delete, utcnow_iso from ..core import count_where, csrf_protect, fetch_one, soft_delete, utcnow_iso
from ..i18n import get_translations from ..i18n import get_translations
bp = Blueprint( bp = Blueprint(
@@ -18,17 +18,13 @@ bp = Blueprint(
async def get_user_stats(user_id: int) -> dict: async def get_user_stats(user_id: int) -> dict:
scenarios = await fetch_one(
"SELECT COUNT(*) as count FROM scenarios WHERE user_id = ? AND deleted_at IS NULL",
(user_id,),
)
leads = await fetch_one(
"SELECT COUNT(*) as count FROM lead_requests WHERE user_id = ?",
(user_id,),
)
return { return {
"scenarios": scenarios["count"] if scenarios else 0, "scenarios": await count_where(
"leads": leads["count"] if leads else 0, "scenarios WHERE user_id = ? AND deleted_at IS NULL", (user_id,)
),
"leads": await count_where(
"lead_requests WHERE user_id = ?", (user_id,)
),
} }

View File

@@ -6,7 +6,7 @@ from pathlib import Path
from quart import Blueprint, g, make_response, redirect, render_template, request, url_for from quart import Blueprint, g, make_response, redirect, render_template, request, url_for
from ..core import csrf_protect, execute, fetch_all, fetch_one, utcnow_iso from ..core import count_where, csrf_protect, execute, fetch_all, fetch_one, utcnow_iso
from ..i18n import COUNTRY_LABELS, get_translations from ..i18n import COUNTRY_LABELS, get_translations
bp = Blueprint( bp = Blueprint(
@@ -79,11 +79,7 @@ async def _build_directory_query(q, country, category, region, page, per_page=24
where = " AND ".join(wheres) if wheres else "1=1" where = " AND ".join(wheres) if wheres else "1=1"
count_row = await fetch_one( total = await count_where(f"suppliers s WHERE {where}", tuple(params))
f"SELECT COUNT(*) as cnt FROM suppliers s WHERE {where}",
tuple(params),
)
total = count_row["cnt"] if count_row else 0
offset = (page - 1) * per_page offset = (page - 1) * per_page
# Tier-based ordering: sticky first, then pro > growth > free, then name # Tier-based ordering: sticky first, then pro > growth > free, then name
@@ -159,16 +155,16 @@ async def index():
"SELECT category, COUNT(*) as cnt FROM suppliers GROUP BY category ORDER BY cnt DESC" "SELECT category, COUNT(*) as cnt FROM suppliers GROUP BY category ORDER BY cnt DESC"
) )
total_suppliers = await fetch_one("SELECT COUNT(*) as cnt FROM suppliers") total_suppliers = await count_where("suppliers")
total_countries = await fetch_one("SELECT COUNT(DISTINCT country_code) as cnt FROM suppliers") total_countries = await count_where("(SELECT DISTINCT country_code FROM suppliers)")
return await render_template( return await render_template(
"directory.html", "directory.html",
**ctx, **ctx,
country_counts=country_counts, country_counts=country_counts,
category_counts=category_counts, category_counts=category_counts,
total_suppliers=total_suppliers["cnt"] if total_suppliers else 0, total_suppliers=total_suppliers,
total_countries=total_countries["cnt"] if total_countries else 0, total_countries=total_countries,
) )
@@ -204,11 +200,9 @@ async def supplier_detail(slug: str):
# Enquiry count (Basic+) # Enquiry count (Basic+)
enquiry_count = 0 enquiry_count = 0
if supplier.get("tier") in ("basic", "growth", "pro"): if supplier.get("tier") in ("basic", "growth", "pro"):
row = await fetch_one( enquiry_count = await count_where(
"SELECT COUNT(*) as cnt FROM supplier_enquiries WHERE supplier_id = ?", "supplier_enquiries WHERE supplier_id = ?", (supplier["id"],)
(supplier["id"],),
) )
enquiry_count = row["cnt"] if row else 0
lang = g.get("lang", "en") lang = g.get("lang", "en")
cat_labels, country_labels, region_labels = get_directory_labels(lang) cat_labels, country_labels, region_labels = get_directory_labels(lang)

View File

@@ -12,6 +12,7 @@ from quart import Blueprint, Response, g, jsonify, render_template, request
from ..auth.routes import login_required from ..auth.routes import login_required
from ..core import ( from ..core import (
config, config,
count_where,
csrf_protect, csrf_protect,
execute, execute,
feature_gate, feature_gate,
@@ -50,11 +51,9 @@ COUNTRY_PRESETS = {
async def count_scenarios(user_id: int) -> int: async def count_scenarios(user_id: int) -> int:
row = await fetch_one( return await count_where(
"SELECT COUNT(*) as cnt FROM scenarios WHERE user_id = ? AND deleted_at IS NULL", "scenarios WHERE user_id = ? AND deleted_at IS NULL", (user_id,)
(user_id,),
) )
return row["cnt"] if row else 0
async def get_default_scenario(user_id: int) -> dict | None: async def get_default_scenario(user_id: int) -> dict | None:

View File

@@ -5,7 +5,7 @@ from pathlib import Path
from quart import Blueprint, g, render_template, request, session from quart import Blueprint, g, render_template, request, session
from ..core import check_rate_limit, csrf_protect, execute, fetch_all, fetch_one from ..core import check_rate_limit, count_where, csrf_protect, execute, fetch_all, fetch_one
from ..i18n import get_translations from ..i18n import get_translations
bp = Blueprint( bp = Blueprint(
@@ -17,13 +17,9 @@ bp = Blueprint(
async def _supplier_counts(): async def _supplier_counts():
"""Fetch aggregate supplier stats for landing/marketing pages.""" """Fetch aggregate supplier stats for landing/marketing pages."""
total = await fetch_one("SELECT COUNT(*) as cnt FROM suppliers")
countries = await fetch_one(
"SELECT COUNT(DISTINCT country_code) as cnt FROM suppliers"
)
return ( return (
total["cnt"] if total else 0, await count_where("suppliers"),
countries["cnt"] if countries else 0, await count_where("(SELECT DISTINCT country_code FROM suppliers)"),
) )
@@ -75,15 +71,15 @@ async def suppliers():
total_suppliers, total_countries = await _supplier_counts() total_suppliers, total_countries = await _supplier_counts()
# Live stats # Live stats
calc_requests = await fetch_one("SELECT COUNT(*) as cnt FROM scenarios WHERE deleted_at IS NULL") calc_requests = await count_where("scenarios WHERE deleted_at IS NULL")
avg_budget = await fetch_one( avg_budget = await fetch_one(
"SELECT AVG(budget_estimate) as avg FROM lead_requests WHERE budget_estimate > 0 AND lead_type = 'quote'" "SELECT AVG(budget_estimate) as avg FROM lead_requests WHERE budget_estimate > 0 AND lead_type = 'quote'"
) )
active_suppliers = await fetch_one( active_suppliers = await count_where(
"SELECT COUNT(*) as cnt FROM suppliers WHERE tier IN ('growth', 'pro') AND claimed_by IS NOT NULL" "suppliers WHERE tier IN ('growth', 'pro') AND claimed_by IS NOT NULL"
) )
monthly_leads = await fetch_one( monthly_leads = await count_where(
"SELECT COUNT(*) as cnt FROM lead_requests WHERE lead_type = 'quote' AND created_at >= date('now', '-30 days')" "lead_requests WHERE lead_type = 'quote' AND created_at >= date('now', '-30 days')"
) )
# Lead feed preview — 3 recent verified hot/warm leads, anonymized # Lead feed preview — 3 recent verified hot/warm leads, anonymized
@@ -100,10 +96,10 @@ async def suppliers():
"suppliers.html", "suppliers.html",
total_suppliers=total_suppliers, total_suppliers=total_suppliers,
total_countries=total_countries, total_countries=total_countries,
calc_requests=calc_requests["cnt"] if calc_requests else 0, calc_requests=calc_requests,
avg_budget=int(avg_budget["avg"]) if avg_budget and avg_budget["avg"] else 0, avg_budget=int(avg_budget["avg"]) if avg_budget and avg_budget["avg"] else 0,
active_suppliers=active_suppliers["cnt"] if active_suppliers else 0, active_suppliers=active_suppliers,
monthly_leads=monthly_leads["cnt"] if monthly_leads else 0, monthly_leads=monthly_leads,
preview_leads=preview_leads, preview_leads=preview_leads,
) )

View File

@@ -11,6 +11,7 @@ from werkzeug.utils import secure_filename
from ..core import ( from ..core import (
capture_waitlist_email, capture_waitlist_email,
config, config,
count_where,
csrf_protect, csrf_protect,
execute, execute,
feature_gate, feature_gate,
@@ -776,9 +777,8 @@ async def dashboard_overview():
supplier = g.supplier supplier = g.supplier
# Leads unlocked count # Leads unlocked count
unlocked = await fetch_one( leads_unlocked = await count_where(
"SELECT COUNT(*) as cnt FROM lead_forwards WHERE supplier_id = ?", "lead_forwards WHERE supplier_id = ?", (supplier["id"],)
(supplier["id"],),
) )
# New leads matching supplier's area since last login # New leads matching supplier's area since last login
@@ -787,22 +787,20 @@ async def dashboard_overview():
new_leads_count = 0 new_leads_count = 0
if service_area: if service_area:
placeholders = ",".join("?" * len(service_area)) placeholders = ",".join("?" * len(service_area))
row = await fetch_one( new_leads_count = await count_where(
f"""SELECT COUNT(*) as cnt FROM lead_requests f"""lead_requests
WHERE lead_type = 'quote' AND status = 'new' AND verified_at IS NOT NULL WHERE lead_type = 'quote' AND status = 'new' AND verified_at IS NOT NULL
AND country IN ({placeholders}) AND country IN ({placeholders})
AND NOT EXISTS (SELECT 1 FROM lead_forwards WHERE lead_id = lead_requests.id AND supplier_id = ?)""", AND NOT EXISTS (SELECT 1 FROM lead_forwards WHERE lead_id = lead_requests.id AND supplier_id = ?)""",
(*service_area, supplier["id"]), (*service_area, supplier["id"]),
) )
new_leads_count = row["cnt"] if row else 0
else: else:
row = await fetch_one( new_leads_count = await count_where(
"""SELECT COUNT(*) as cnt FROM lead_requests """lead_requests
WHERE lead_type = 'quote' AND status = 'new' AND verified_at IS NOT NULL WHERE lead_type = 'quote' AND status = 'new' AND verified_at IS NOT NULL
AND NOT EXISTS (SELECT 1 FROM lead_forwards WHERE lead_id = lead_requests.id AND supplier_id = ?)""", AND NOT EXISTS (SELECT 1 FROM lead_forwards WHERE lead_id = lead_requests.id AND supplier_id = ?)""",
(supplier["id"],), (supplier["id"],),
) )
new_leads_count = row["cnt"] if row else 0
# Recent activity (last 10 events from credit_ledger + lead_forwards) # Recent activity (last 10 events from credit_ledger + lead_forwards)
recent_activity = await fetch_all( recent_activity = await fetch_all(
@@ -825,16 +823,14 @@ async def dashboard_overview():
# Enquiry count for Basic tier # Enquiry count for Basic tier
enquiry_count = 0 enquiry_count = 0
if supplier.get("tier") == "basic": if supplier.get("tier") == "basic":
eq_row = await fetch_one( enquiry_count = await count_where(
"SELECT COUNT(*) as cnt FROM supplier_enquiries WHERE supplier_id = ?", "supplier_enquiries WHERE supplier_id = ?", (supplier["id"],)
(supplier["id"],),
) )
enquiry_count = eq_row["cnt"] if eq_row else 0
return await render_template( return await render_template(
"suppliers/partials/dashboard_overview.html", "suppliers/partials/dashboard_overview.html",
supplier=supplier, supplier=supplier,
leads_unlocked=unlocked["cnt"] if unlocked else 0, leads_unlocked=leads_unlocked,
new_leads_count=new_leads_count, new_leads_count=new_leads_count,
recent_activity=recent_activity, recent_activity=recent_activity,
active_boosts=active_boosts, active_boosts=active_boosts,