feat(admin): add _sync_static_articles + group_key grouping

- _sync_static_articles(): auto-upserts data/content/articles/*.md into
  DB on every /admin/articles load; reads cornerstone → group_key
- _get_article_list_grouped(): now groups by COALESCE(group_key, url_path)
  so static EN/DE cornerstone articles pair into one row
- articles() route: calls _sync_static_articles() before listing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-27 07:44:04 +01:00
parent 250139598c
commit aea80f2541

View File

@@ -5,6 +5,8 @@ import csv
import io
import json
import logging
import os
import re
from datetime import date, timedelta
from pathlib import Path
@@ -2200,6 +2202,82 @@ async def scenario_pdf(scenario_id: int):
# Article Management
# =============================================================================
_ARTICLES_DIR = Path(__file__).parent.parent.parent.parent.parent / "data" / "content" / "articles"
_FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n", re.DOTALL)


async def _sync_static_articles() -> None:
    """Upsert static .md articles from data/content/articles/ into the DB.

    Reads YAML frontmatter from each file, renders the body markdown to
    HTML, and upserts into the articles table keyed on slug.  A file is
    skipped when the DB row's ``updated_at`` is at least as new as the
    file's mtime, so unchanged files are a no-op on every call.

    Frontmatter fields consumed: ``slug`` (required — files without it are
    ignored), ``title``, ``url_path``, ``language``, ``meta_description``,
    ``template_slug``, and ``cornerstone`` (stored as ``group_key``).
    """
    # Local imports: yaml is only needed on this admin path, and the
    # file-level datetime import only brings in date/timedelta.
    import yaml
    from datetime import datetime, timezone

    if not _ARTICLES_DIR.is_dir():
        return
    for md_path in sorted(_ARTICLES_DIR.glob("*.md")):
        raw = md_path.read_text(encoding="utf-8")
        m = _FRONTMATTER_RE.match(raw)
        if not m:
            # Not a frontmatter-bearing article file; ignore it.
            continue
        try:
            fm = yaml.safe_load(m.group(1)) or {}
        except Exception:
            # Best-effort sync: a malformed file must not break the admin
            # listing, but it should not fail silently either.
            logging.getLogger(__name__).warning(
                "Skipping static article %s: invalid YAML frontmatter", md_path.name
            )
            continue
        slug = fm.get("slug")
        if not slug:
            continue
        # Skip if the DB record is newer than the file on disk.  The mtime
        # is rendered in the same naive-UTC "YYYY-MM-DD HH:MM:SS" shape the
        # DB stores, so a plain string comparison is chronological.
        # (Replaces the deprecated datetime.utcfromtimestamp.)
        file_mtime_iso = datetime.fromtimestamp(
            md_path.stat().st_mtime, tz=timezone.utc
        ).strftime("%Y-%m-%d %H:%M:%S")
        existing = await fetch_one(
            "SELECT updated_at FROM articles WHERE slug = ?", (slug,)
        )
        if existing and existing["updated_at"] and existing["updated_at"] >= file_mtime_iso:
            continue
        body_md = raw[m.end():]
        body_html = mistune.html(body_md)
        title = fm.get("title", slug)
        url_path = fm.get("url_path", f"/{slug}")
        language = fm.get("language", "en")
        meta_description = fm.get("meta_description", "")
        template_slug = fm.get("template_slug") or None
        # "cornerstone" in frontmatter becomes group_key, which the grouped
        # listing uses to pair EN/DE variants of the same cornerstone.
        group_key = fm.get("cornerstone") or None
        now_iso = utcnow_iso()
        await execute(
            """INSERT INTO articles
(slug, title, url_path, language, meta_description, body_html,
status, template_slug, group_key, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, 'draft', ?, ?, ?, ?)
ON CONFLICT(slug) DO UPDATE SET
title = excluded.title,
url_path = excluded.url_path,
language = excluded.language,
meta_description = excluded.meta_description,
body_html = excluded.body_html,
template_slug = excluded.template_slug,
group_key = excluded.group_key,
updated_at = excluded.updated_at""",
            (slug, title, url_path, language, meta_description, body_html,
             template_slug, group_key, now_iso, now_iso),
        )
async def _get_article_list(
status: str = None,
template_slug: str = None,
@@ -2251,7 +2329,12 @@ async def _get_article_list_grouped(
page: int = 1,
per_page: int = 50,
) -> list[dict]:
"""Get articles grouped by slug; each item has a 'variants' list (one per language)."""
"""Get articles grouped by COALESCE(group_key, url_path).
pSEO articles (group_key NULL) group by url_path — EN/DE share the same url_path.
Static cornerstones (group_key e.g. 'C2') group by cornerstone key regardless of url_path.
Each returned item has a 'variants' list (one dict per language variant).
"""
wheres = ["1=1"]
params: list = []
@@ -2271,43 +2354,48 @@ async def _get_article_list_grouped(
where = " AND ".join(wheres)
offset = (page - 1) * per_page
# Group by url_path — language variants share the same url_path (no lang prefix stored)
path_rows = await fetch_all(
f"""SELECT url_path, MAX(created_at) AS latest_created
# First pass: paginate over distinct group keys
group_rows = await fetch_all(
f"""SELECT COALESCE(group_key, url_path) AS group_id,
MAX(created_at) AS latest_created
FROM articles WHERE {where}
GROUP BY url_path
GROUP BY COALESCE(group_key, url_path)
ORDER BY latest_created DESC
LIMIT ? OFFSET ?""",
tuple(params + [per_page, offset]),
)
if not path_rows:
if not group_rows:
return []
url_paths = [r["url_path"] for r in path_rows]
placeholders = ",".join("?" * len(url_paths))
group_ids = [r["group_id"] for r in group_rows]
placeholders = ",".join("?" * len(group_ids))
# Second pass: fetch all variants for the paginated groups
variants = await fetch_all(
f"""SELECT *,
CASE WHEN status = 'published' AND published_at > datetime('now')
THEN 'scheduled'
WHEN status = 'published' THEN 'live'
ELSE status END AS display_status
FROM articles WHERE url_path IN ({placeholders})
ORDER BY url_path, language""",
tuple(url_paths),
COALESCE(group_key, url_path) AS group_id,
CASE WHEN status = 'published' AND published_at > datetime('now')
THEN 'scheduled'
WHEN status = 'published' THEN 'live'
ELSE status END AS display_status
FROM articles
WHERE COALESCE(group_key, url_path) IN ({placeholders})
ORDER BY COALESCE(group_key, url_path), language""",
tuple(group_ids),
)
by_path: dict[str, list] = {}
by_group: dict[str, list] = {}
for v in variants:
by_path.setdefault(v["url_path"], []).append(dict(v))
by_group.setdefault(v["group_id"], []).append(dict(v))
groups = []
for url_path in url_paths:
variant_list = by_path.get(url_path, [])
for gid in group_ids:
variant_list = by_group.get(gid, [])
if not variant_list:
continue
primary = next((v for v in variant_list if v["language"] == "en"), variant_list[0])
groups.append({
"url_path": url_path,
"url_path": primary["url_path"],
"title": primary["title"],
"published_at": primary["published_at"],
"template_slug": primary["template_slug"],
@@ -2341,7 +2429,8 @@ async def _is_generating() -> bool:
@bp.route("/articles")
@role_required("admin")
async def articles():
"""List all articles with filters."""
"""List all articles with filters. Syncs static .md files on every load."""
await _sync_static_articles()
search = request.args.get("search", "").strip()
status_filter = request.args.get("status", "")
template_filter = request.args.get("template", "")