Adds [product:slug] and [product-group:category] marker replacement. Templates: product_card.html (horizontal editorial callout) and product_group.html (responsive comparison grid). Chained after bake_scenario_cards() in generate_articles(), preview_article(), article_new(), article_edit(), and _rebuild_article(). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
309 lines
11 KiB
Python
309 lines
11 KiB
Python
"""
|
|
Content domain: public article serving, markets hub, scenario widget rendering.
|
|
"""
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from markupsafe import Markup
|
|
from quart import Blueprint, abort, g, redirect, render_template, request
|
|
|
|
from ..core import capture_waitlist_email, csrf_protect, feature_gate, fetch_all, fetch_one
|
|
from ..i18n import get_translations
|
|
|
|
bp = Blueprint(
|
|
"content",
|
|
__name__,
|
|
template_folder=str(Path(__file__).parent / "templates"),
|
|
)
|
|
|
|
BUILD_DIR = Path("data/content/_build")
|
|
|
|
RESERVED_PREFIXES = (
|
|
"/admin", "/auth", "/planner", "/billing", "/dashboard",
|
|
"/directory", "/leads", "/suppliers", "/reports", "/health",
|
|
"/sitemap", "/static", "/features", "/feedback",
|
|
)
|
|
|
|
SCENARIO_RE = re.compile(r'\[scenario:([a-z0-9_-]+)(?::([a-z]+))?\]')
|
|
PRODUCT_RE = re.compile(r'\[product:([a-z0-9_-]+)\]')
|
|
PRODUCT_GROUP_RE = re.compile(r'\[product-group:([a-z0-9_-]+)\]')
|
|
|
|
SECTION_TEMPLATES = {
|
|
None: "partials/scenario_summary.html",
|
|
"capex": "partials/scenario_capex.html",
|
|
"operating": "partials/scenario_operating.html",
|
|
"cashflow": "partials/scenario_cashflow.html",
|
|
"returns": "partials/scenario_returns.html",
|
|
"full": "partials/scenario_full.html",
|
|
}
|
|
|
|
# Standalone Jinja2 env for baking scenario cards into static HTML.
|
|
# Does not use a Quart request context, so url_for and t are injected
|
|
# explicitly. Baked content is always EN (admin operation).
|
|
_TEMPLATE_DIR = Path(__file__).parent / "templates"
|
|
_bake_env = Environment(loader=FileSystemLoader(str(_TEMPLATE_DIR)), autoescape=True)
|
|
_bake_env.filters["tformat"] = lambda s, **kw: s.format_map(kw)
|
|
|
|
# Hardcoded EN URL stubs — the bake env has no request context so Quart's
|
|
# url_for cannot be used. Only endpoints referenced by scenario card templates
|
|
# need to be listed here.
|
|
_BAKE_URLS: dict[str, str] = {
|
|
"planner.index": "/en/planner/",
|
|
"directory.index": "/en/directory/",
|
|
}
|
|
_bake_env.globals["url_for"] = lambda endpoint, **kw: _BAKE_URLS.get(endpoint, f"/{endpoint}")
|
|
|
|
|
|
def is_reserved_path(url_path: str) -> bool:
|
|
"""Check if a url_path starts with a reserved prefix."""
|
|
clean = "/" + url_path.strip("/")
|
|
return any(clean.startswith(p) for p in RESERVED_PREFIXES)
|
|
|
|
|
|
async def bake_scenario_cards(
|
|
html: str,
|
|
lang: str = "en",
|
|
scenario_overrides: dict[str, dict] | None = None,
|
|
) -> str:
|
|
"""Replace [scenario:slug] and [scenario:slug:section] markers with rendered HTML.
|
|
|
|
scenario_overrides: optional {slug: {calc_json, state_json, ...}} dict
|
|
that bypasses the DB lookup (used by preview_article).
|
|
"""
|
|
matches = list(SCENARIO_RE.finditer(html))
|
|
if not matches:
|
|
return html
|
|
|
|
# Batch-fetch all referenced scenarios from DB
|
|
slugs = list({m.group(1) for m in matches})
|
|
placeholders = ",".join("?" * len(slugs))
|
|
rows = await fetch_all(
|
|
f"SELECT * FROM published_scenarios WHERE slug IN ({placeholders})",
|
|
tuple(slugs),
|
|
)
|
|
scenarios = {row["slug"]: row for row in rows}
|
|
|
|
# Merge in any overrides (preview mode — no DB row exists yet)
|
|
if scenario_overrides:
|
|
scenarios.update(scenario_overrides)
|
|
|
|
for match in reversed(matches):
|
|
slug = match.group(1)
|
|
section = match.group(2)
|
|
|
|
scenario = scenarios.get(slug)
|
|
if not scenario:
|
|
continue
|
|
|
|
template_name = SECTION_TEMPLATES.get(section)
|
|
if not template_name:
|
|
continue
|
|
|
|
calc_data = json.loads(scenario["calc_json"])
|
|
state_data = json.loads(scenario["state_json"])
|
|
|
|
tmpl = _bake_env.get_template(template_name)
|
|
card_html = tmpl.render(
|
|
scenario=scenario, d=calc_data, s=state_data,
|
|
lang=lang, t=get_translations(lang),
|
|
)
|
|
html = html[:match.start()] + card_html + html[match.end():]
|
|
|
|
return html
|
|
|
|
|
|
async def bake_product_cards(html: str, lang: str = "de") -> str:
|
|
"""Replace [product:slug] and [product-group:category] markers with rendered HTML.
|
|
|
|
Processes markers in two passes (product first, then groups) to keep logic
|
|
clear. Reverse iteration preserves string offsets when splicing.
|
|
"""
|
|
from ..affiliate import get_product, get_products_by_category
|
|
|
|
t = get_translations(lang)
|
|
|
|
# ── Pass 1: [product:slug] ────────────────────────────────────────────────
|
|
product_matches = list(PRODUCT_RE.finditer(html))
|
|
if product_matches:
|
|
slugs = list({m.group(1) for m in product_matches})
|
|
products: dict[str, dict | None] = {}
|
|
for slug in slugs:
|
|
products[slug] = await get_product(slug, lang)
|
|
|
|
tmpl = _bake_env.get_template("partials/product_card.html")
|
|
for match in reversed(product_matches):
|
|
slug = match.group(1)
|
|
product = products.get(slug)
|
|
if not product:
|
|
continue
|
|
card_html = tmpl.render(product=product, lang=lang, t=t)
|
|
html = html[:match.start()] + card_html + html[match.end():]
|
|
|
|
# ── Pass 2: [product-group:category] ─────────────────────────────────────
|
|
group_matches = list(PRODUCT_GROUP_RE.finditer(html))
|
|
if group_matches:
|
|
categories = list({m.group(1) for m in group_matches})
|
|
groups: dict[str, list] = {}
|
|
for cat in categories:
|
|
groups[cat] = await get_products_by_category(cat, lang)
|
|
|
|
tmpl = _bake_env.get_template("partials/product_group.html")
|
|
for match in reversed(group_matches):
|
|
cat = match.group(1)
|
|
group_products = groups.get(cat, [])
|
|
if not group_products:
|
|
continue
|
|
grid_html = tmpl.render(products=group_products, category=cat, lang=lang, t=t)
|
|
html = html[:match.start()] + grid_html + html[match.end():]
|
|
|
|
return html
|
|
|
|
|
|
# =============================================================================
|
|
# Markets Hub
|
|
# =============================================================================
|
|
|
|
@bp.route("/markets", methods=["GET", "POST"])
|
|
@csrf_protect
|
|
@feature_gate("markets", "markets_waitlist.html")
|
|
async def markets():
|
|
"""Hub page: search + country/region filter for articles."""
|
|
from ..core import is_flag_enabled
|
|
if not await is_flag_enabled("markets") and request.method == "POST":
|
|
form = await request.form
|
|
email = form.get("email", "").strip().lower()
|
|
if email and "@" in email:
|
|
await capture_waitlist_email(email, intent="markets")
|
|
return await render_template("markets_waitlist.html", confirmed=True)
|
|
|
|
q = request.args.get("q", "").strip()
|
|
country = request.args.get("country", "")
|
|
region = request.args.get("region", "")
|
|
|
|
countries = await fetch_all(
|
|
"""SELECT DISTINCT country FROM articles
|
|
WHERE country IS NOT NULL AND country != ''
|
|
AND status = 'published' AND published_at <= datetime('now')
|
|
ORDER BY country"""
|
|
)
|
|
regions = await fetch_all(
|
|
"""SELECT DISTINCT region FROM articles
|
|
WHERE region IS NOT NULL AND region != ''
|
|
AND status = 'published' AND published_at <= datetime('now')
|
|
ORDER BY region"""
|
|
)
|
|
|
|
articles = await _filter_articles(q, country, region)
|
|
|
|
return await render_template(
|
|
"markets.html",
|
|
articles=articles,
|
|
countries=[c["country"] for c in countries],
|
|
regions=[r["region"] for r in regions],
|
|
current_q=q,
|
|
current_country=country,
|
|
current_region=region,
|
|
)
|
|
|
|
|
|
@bp.route("/markets/results")
|
|
@feature_gate("markets", "markets_waitlist.html")
|
|
async def market_results():
|
|
"""HTMX partial: filtered article cards."""
|
|
q = request.args.get("q", "").strip()
|
|
country = request.args.get("country", "")
|
|
region = request.args.get("region", "")
|
|
|
|
articles = await _filter_articles(q, country, region)
|
|
return await render_template("partials/market_results.html", articles=articles)
|
|
|
|
|
|
async def _filter_articles(q: str, country: str, region: str) -> list[dict]:
|
|
"""Query published articles for the current language."""
|
|
lang = g.get("lang", "en")
|
|
if q:
|
|
# FTS query
|
|
wheres = ["articles_fts MATCH ?"]
|
|
params: list = [q]
|
|
wheres.append("a.language = ?")
|
|
params.append(lang)
|
|
if country:
|
|
wheres.append("a.country = ?")
|
|
params.append(country)
|
|
if region:
|
|
wheres.append("a.region = ?")
|
|
params.append(region)
|
|
where = " AND ".join(wheres)
|
|
return await fetch_all(
|
|
f"""SELECT a.* FROM articles a
|
|
JOIN articles_fts ON articles_fts.rowid = a.id
|
|
WHERE {where}
|
|
AND a.status = 'published' AND a.published_at <= datetime('now')
|
|
ORDER BY a.published_at DESC
|
|
LIMIT 100""",
|
|
tuple(params),
|
|
)
|
|
else:
|
|
wheres = ["status = 'published'", "published_at <= datetime('now')", "language = ?"]
|
|
params = [lang]
|
|
if country:
|
|
wheres.append("country = ?")
|
|
params.append(country)
|
|
if region:
|
|
wheres.append("region = ?")
|
|
params.append(region)
|
|
where = " AND ".join(wheres)
|
|
return await fetch_all(
|
|
f"""SELECT * FROM articles WHERE {where}
|
|
ORDER BY published_at DESC LIMIT 100""",
|
|
tuple(params),
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Catch-all Article Serving (must be registered LAST)
|
|
# =============================================================================
|
|
|
|
@bp.route("/<path:url_path>")
|
|
async def article_page(url_path: str):
|
|
"""Serve a published article by its url_path."""
|
|
clean_path = "/" + url_path.strip("/")
|
|
lang = g.get("lang", "en")
|
|
|
|
article = await fetch_one(
|
|
"""SELECT * FROM articles
|
|
WHERE url_path = ? AND language = ? AND status = 'published'
|
|
AND published_at <= datetime('now')""",
|
|
(clean_path, lang),
|
|
)
|
|
if not article:
|
|
# If a scheduled (not yet live) article exists at this URL, redirect to
|
|
# the nearest parent path rather than showing a bare 404.
|
|
scheduled = await fetch_one(
|
|
"SELECT 1 FROM articles WHERE url_path = ? AND language = ?",
|
|
(clean_path, lang),
|
|
)
|
|
if scheduled:
|
|
parent = clean_path.rsplit("/", 1)[0] or f"/{lang}/markets"
|
|
return redirect(parent, 302)
|
|
abort(404)
|
|
|
|
# SSG articles: language-prefixed build path
|
|
lang = article["language"] if article.get("language") else "en"
|
|
build_path = BUILD_DIR / lang / f"{article['slug']}.html"
|
|
if not build_path.exists():
|
|
# Fallback: flat build path (legacy manual articles)
|
|
build_path = BUILD_DIR / f"{article['slug']}.html"
|
|
if not build_path.exists():
|
|
abort(404)
|
|
|
|
body_html = build_path.read_text()
|
|
|
|
return await render_template(
|
|
"article_detail.html",
|
|
article=article,
|
|
body_html=Markup(body_html),
|
|
)
|