feat(logging): convert scripts and migrations from print() to logging

- migrations/migrate.py: module logger, basicConfig in __main__
- scripts/seed_dev_data.py: module logger, convert all 19 prints
- scripts/seed_content.py: module logger, convert all 13 prints
- scripts/refresh_from_daas.py: module logger, convert all 11 prints
- scripts/setup_paddle.py: module logger, convert all 20 prints

All scripts use basicConfig(level=INFO, format='%(levelname)-8s %(message)s')
in their __main__ blocks for clean CLI output without timestamps.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-24 11:00:07 +01:00
parent ac4ad3179d
commit 77ca817925
5 changed files with 92 additions and 69 deletions

View File

@@ -34,12 +34,15 @@ Design decisions
""" """
import importlib import importlib
import logging
import os import os
import re import re
import sqlite3 import sqlite3
import sys import sys
from pathlib import Path from pathlib import Path
logger = logging.getLogger(__name__)
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -89,7 +92,7 @@ def migrate(db_path=None):
if pending: if pending:
for name in pending: for name in pending:
print(f" Applying {name}...") logger.info("Applying %s...", name)
mod = importlib.import_module( mod = importlib.import_module(
f"padelnomics.migrations.versions.{name}" f"padelnomics.migrations.versions.{name}"
) )
@@ -98,9 +101,9 @@ def migrate(db_path=None):
"INSERT INTO _migrations (name) VALUES (?)", (name,) "INSERT INTO _migrations (name) VALUES (?)", (name,)
) )
conn.commit() conn.commit()
print(f"Applied {len(pending)} migration(s): {db_path}") logger.info("Applied %s migration(s): %s", len(pending), db_path)
else: else:
print(f"All migrations already applied: {db_path}") logger.info("All migrations already applied: %s", db_path)
# Show tables (excluding internal sqlite/fts tables) # Show tables (excluding internal sqlite/fts tables)
cursor = conn.execute( cursor = conn.execute(
@@ -109,10 +112,11 @@ def migrate(db_path=None):
" ORDER BY name" " ORDER BY name"
) )
tables = [row[0] for row in cursor.fetchall()] tables = [row[0] for row in cursor.fetchall()]
print(f" Tables: {', '.join(tables)}") logger.info("Tables: %s", ", ".join(tables))
conn.close() conn.close()
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
migrate() migrate()

View File

@@ -34,12 +34,15 @@ Fields mapped (DuckDB → data_json camelCase key):
import argparse import argparse
import json import json
import logging
import os import os
import sqlite3 import sqlite3
from pathlib import Path from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
logger = logging.getLogger(__name__)
load_dotenv() load_dotenv()
DATABASE_PATH = os.getenv("DATABASE_PATH", "data/app.db") DATABASE_PATH = os.getenv("DATABASE_PATH", "data/app.db")
@@ -67,13 +70,13 @@ def _load_analytics(city_slugs: list[str]) -> dict[str, dict]:
""" """
path = Path(DUCKDB_PATH) path = Path(DUCKDB_PATH)
if not path.exists(): if not path.exists():
print(f" [analytics] DuckDB not found at {path} — skipping analytics refresh.") logger.warning("DuckDB not found at %s — skipping analytics refresh.", path)
return {} return {}
try: try:
import duckdb import duckdb
except ImportError: except ImportError:
print(" [analytics] duckdb not installed — skipping analytics refresh.") logger.warning("duckdb not installed — skipping analytics refresh.")
return {} return {}
result: dict[str, dict] = {} result: dict[str, dict] = {}
@@ -98,7 +101,7 @@ def _load_analytics(city_slugs: list[str]) -> dict[str, dict]:
result[slug] = overrides result[slug] = overrides
except Exception as exc: except Exception as exc:
print(f" [analytics] DuckDB query failed: {exc}") logger.error("DuckDB query failed: %s", exc)
return result return result
@@ -124,13 +127,13 @@ def refresh(dry_run: bool = False) -> int:
city_slug_to_ids.setdefault(slug, []).append(row["id"]) city_slug_to_ids.setdefault(slug, []).append(row["id"])
if not city_slug_to_ids: if not city_slug_to_ids:
print("No template_data rows with city_slug found.") logger.info("No template_data rows with city_slug found.")
conn.close() conn.close()
return 0 return 0
analytics = _load_analytics(list(city_slug_to_ids.keys())) analytics = _load_analytics(list(city_slug_to_ids.keys()))
if not analytics: if not analytics:
print("No analytics data found — nothing to update.") logger.info("No analytics data found — nothing to update.")
conn.close() conn.close()
return 0 return 0
@@ -154,13 +157,13 @@ def refresh(dry_run: bool = False) -> int:
data.update(overrides) data.update(overrides)
if dry_run: if dry_run:
print(f" [dry-run] id={row_id} city_slug={slug}: {changed}") logger.info("[dry-run] id=%s city_slug=%s: %s", row_id, slug, changed)
else: else:
conn.execute( conn.execute(
"UPDATE template_data SET data_json = ?, updated_at = datetime('now') WHERE id = ?", "UPDATE template_data SET data_json = ?, updated_at = datetime('now') WHERE id = ?",
(json.dumps(data), row_id), (json.dumps(data), row_id),
) )
print(f" Updated id={row_id} city_slug={slug}: {list(changed.keys())}") logger.info("Updated id=%s city_slug=%s: %s", row_id, slug, list(changed.keys()))
updated += 1 updated += 1
if not dry_run: if not dry_run:
@@ -184,7 +187,7 @@ def _trigger_generation() -> None:
headers={"X-Admin-Key": admin_key}, headers={"X-Admin-Key": admin_key},
) )
with urllib.request.urlopen(req, timeout=120) as resp: with urllib.request.urlopen(req, timeout=120) as resp:
print(f" Generation triggered: HTTP {resp.status}") logger.info("Generation triggered: HTTP %s", resp.status)
def main() -> None: def main() -> None:
@@ -195,14 +198,17 @@ def main() -> None:
help="Trigger article re-generation after updating") help="Trigger article re-generation after updating")
args = parser.parse_args() args = parser.parse_args()
print(f"{'[DRY RUN] ' if args.dry_run else ''}Refreshing template_data from DuckDB…") prefix = "[DRY RUN] " if args.dry_run else ""
logger.info("%sRefreshing template_data from DuckDB...", prefix)
count = refresh(dry_run=args.dry_run) count = refresh(dry_run=args.dry_run)
print(f"{'Would update' if args.dry_run else 'Updated'} {count} rows.") action = "Would update" if args.dry_run else "Updated"
logger.info("%s %s rows.", action, count)
if args.generate and count > 0 and not args.dry_run: if args.generate and count > 0 and not args.dry_run:
print("Triggering article generation") logger.info("Triggering article generation...")
_trigger_generation() _trigger_generation()
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
main() main()

View File

@@ -15,6 +15,7 @@ Usage:
import asyncio import asyncio
import json import json
import logging
import os import os
import sqlite3 import sqlite3
import sys import sys
@@ -23,6 +24,8 @@ from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
logger = logging.getLogger(__name__)
load_dotenv() load_dotenv()
DATABASE_PATH = os.getenv("DATABASE_PATH", "data/app.db") DATABASE_PATH = os.getenv("DATABASE_PATH", "data/app.db")
@@ -1363,7 +1366,7 @@ def seed_templates(conn: sqlite3.Connection) -> dict[str, int]:
).fetchone() ).fetchone()
if existing: if existing:
print(f" Template '{tmpl['slug']}' already exists (id={existing[0]}), skipping.") logger.info(" Template '%s' already exists (id=%s), skipping.", tmpl["slug"], existing[0])
template_ids[tmpl["slug"]] = existing[0] template_ids[tmpl["slug"]] = existing[0]
else: else:
cur = conn.execute( cur = conn.execute(
@@ -1383,7 +1386,7 @@ def seed_templates(conn: sqlite3.Connection) -> dict[str, int]:
), ),
) )
template_ids[tmpl["slug"]] = cur.lastrowid template_ids[tmpl["slug"]] = cur.lastrowid
print(f" Created template '{tmpl['slug']}' (id={cur.lastrowid})") logger.info(" Created template '%s' (id=%s)", tmpl["slug"], cur.lastrowid)
return template_ids return template_ids
@@ -1411,7 +1414,7 @@ def seed_data_rows(conn: sqlite3.Connection, template_ids: dict[str, int]) -> in
).fetchone() ).fetchone()
if existing: if existing:
print(f" Data row '{city_slug}' ({lang}) already exists, skipping.") logger.info(" Data row '%s' (%s) already exists, skipping.", city_slug, lang)
else: else:
conn.execute( conn.execute(
"""INSERT INTO template_data (template_id, data_json, created_at) """INSERT INTO template_data (template_id, data_json, created_at)
@@ -1419,7 +1422,7 @@ def seed_data_rows(conn: sqlite3.Connection, template_ids: dict[str, int]) -> in
(tmpl_id, data_json, now), (tmpl_id, data_json, now),
) )
inserted += 1 inserted += 1
print(f" Inserted data row '{city_slug}' ({lang})") logger.info(" Inserted data row '%s' (%s)", city_slug, lang)
return inserted return inserted
@@ -1432,7 +1435,7 @@ async def generate_articles(template_ids: dict[str, int]) -> None:
from padelnomics.admin.routes import _generate_from_template # noqa: PLC0415 from padelnomics.admin.routes import _generate_from_template # noqa: PLC0415
from padelnomics.core import close_db, fetch_one, init_db from padelnomics.core import close_db, fetch_one, init_db
print("\nInitialising database connection...") logger.info("Initialising database connection...")
await init_db(DATABASE_PATH) await init_db(DATABASE_PATH)
start_date = date.today() - timedelta(days=30) # backdate so all are immediately live start_date = date.today() - timedelta(days=30) # backdate so all are immediately live
@@ -1441,9 +1444,9 @@ async def generate_articles(template_ids: dict[str, int]) -> None:
template = await fetch_one("SELECT * FROM article_templates WHERE id = ?", (tmpl_id,)) template = await fetch_one("SELECT * FROM article_templates WHERE id = ?", (tmpl_id,))
assert template is not None, f"Template '{slug}' not found in DB" assert template is not None, f"Template '{slug}' not found in DB"
print(f"\nGenerating articles for template '{slug}'...") logger.info("Generating articles for template '%s'...", slug)
count = await _generate_from_template(template, start_date, articles_per_day=3) count = await _generate_from_template(template, start_date, articles_per_day=3)
print(f" Generated {count} articles.") logger.info(" Generated %s articles.", count)
await close_db() await close_db()
@@ -1463,28 +1466,29 @@ def main() -> None:
conn.execute("PRAGMA foreign_keys=ON") conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
print("Seeding article templates...") logger.info("Seeding article templates...")
template_ids = seed_templates(conn) template_ids = seed_templates(conn)
print("\nSeeding city data rows...") logger.info("Seeding city data rows...")
inserted = seed_data_rows(conn, template_ids) inserted = seed_data_rows(conn, template_ids)
conn.commit() conn.commit()
conn.close() conn.close()
print(f"\nDone. {inserted} data rows inserted.") logger.info("Done. %s data rows inserted.", inserted)
print("Templates and data rows are visible in admin Templates.") logger.info("Templates and data rows are visible in admin -> Templates.")
if "--generate" in sys.argv: if "--generate" in sys.argv:
print("\nRunning article generation pipeline...") logger.info("Running article generation pipeline...")
asyncio.run(generate_articles(template_ids)) asyncio.run(generate_articles(template_ids))
print("\nGeneration complete. Check admin Articles.") logger.info("Generation complete. Check admin -> Articles.")
else: else:
print( logger.info(
"\nTo generate articles, either:\n" "To generate articles, either:\n"
" 1. Run: uv run python -m padelnomics.scripts.seed_content --generate\n" " 1. Run: uv run python -m padelnomics.scripts.seed_content --generate\n"
" 2. Or visit admin Templates (template) Generate" " 2. Or visit admin -> Templates -> (template) -> Generate"
) )
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
main() main()

View File

@@ -7,6 +7,7 @@ Usage:
uv run python -m padelnomics.scripts.seed_dev_data uv run python -m padelnomics.scripts.seed_dev_data
""" """
import logging
import os import os
import sqlite3 import sqlite3
import sys import sys
@@ -15,6 +16,8 @@ from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
logger = logging.getLogger(__name__)
load_dotenv() load_dotenv()
DATABASE_PATH = os.getenv("DATABASE_PATH", "data/app.db") DATABASE_PATH = os.getenv("DATABASE_PATH", "data/app.db")
@@ -284,7 +287,7 @@ LEADS = [
def main(): def main():
db_path = DATABASE_PATH db_path = DATABASE_PATH
if not Path(db_path).exists(): if not Path(db_path).exists():
print(f"ERROR: Database not found at {db_path}. Run migrations first.") logger.error("Database not found at %s. Run migrations first.", db_path)
sys.exit(1) sys.exit(1)
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
@@ -295,34 +298,34 @@ def main():
now = datetime.utcnow() now = datetime.utcnow()
# 1. Create dev user # 1. Create dev user
print("Creating dev user (dev@localhost)...") logger.info("Creating dev user (dev@localhost)...")
existing = conn.execute("SELECT id FROM users WHERE email = 'dev@localhost'").fetchone() existing = conn.execute("SELECT id FROM users WHERE email = 'dev@localhost'").fetchone()
if existing: if existing:
dev_user_id = existing["id"] dev_user_id = existing["id"]
print(f" Already exists (id={dev_user_id})") logger.info(" Already exists (id=%s)", dev_user_id)
else: else:
cursor = conn.execute( cursor = conn.execute(
"INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)", "INSERT INTO users (email, name, created_at) VALUES (?, ?, ?)",
("dev@localhost", "Dev User", now.isoformat()), ("dev@localhost", "Dev User", now.isoformat()),
) )
dev_user_id = cursor.lastrowid dev_user_id = cursor.lastrowid
print(f" Created (id={dev_user_id})") logger.info(" Created (id=%s)", dev_user_id)
# Grant admin role to dev user # Grant admin role to dev user
conn.execute( conn.execute(
"INSERT OR IGNORE INTO user_roles (user_id, role) VALUES (?, 'admin')", "INSERT OR IGNORE INTO user_roles (user_id, role) VALUES (?, 'admin')",
(dev_user_id,), (dev_user_id,),
) )
print(" Admin role granted") logger.info(" Admin role granted")
# 2. Seed suppliers # 2. Seed suppliers
print(f"\nSeeding {len(SUPPLIERS)} suppliers...") logger.info("Seeding %s suppliers...", len(SUPPLIERS))
supplier_ids = {} supplier_ids = {}
for s in SUPPLIERS: for s in SUPPLIERS:
existing = conn.execute("SELECT id FROM suppliers WHERE slug = ?", (s["slug"],)).fetchone() existing = conn.execute("SELECT id FROM suppliers WHERE slug = ?", (s["slug"],)).fetchone()
if existing: if existing:
supplier_ids[s["slug"]] = existing["id"] supplier_ids[s["slug"]] = existing["id"]
print(f" {s['name']} already exists (id={existing['id']})") logger.info(" %s already exists (id=%s)", s["name"], existing["id"])
continue continue
cursor = conn.execute( cursor = conn.execute(
@@ -340,10 +343,10 @@ def main():
), ),
) )
supplier_ids[s["slug"]] = cursor.lastrowid supplier_ids[s["slug"]] = cursor.lastrowid
print(f" {s['name']} -> id={cursor.lastrowid}") logger.info(" %s -> id=%s", s["name"], cursor.lastrowid)
# 3. Claim paid suppliers — each gets its own owner user + subscription # 3. Claim paid suppliers — each gets its own owner user + subscription
print("\nClaiming paid suppliers with owner accounts...") logger.info("Claiming paid suppliers with owner accounts...")
claimed_suppliers = [ claimed_suppliers = [
("padeltech-gmbh", "supplier_pro", "hans@padeltech.example.com", "Hans Weber"), ("padeltech-gmbh", "supplier_pro", "hans@padeltech.example.com", "Hans Weber"),
("courtbuild-spain", "supplier_growth", "maria@courtbuild.example.com", "Maria Garcia"), ("courtbuild-spain", "supplier_growth", "maria@courtbuild.example.com", "Maria Garcia"),
@@ -398,10 +401,10 @@ def main():
(owner_id, plan, f"sub_dev_{slug}", (owner_id, plan, f"sub_dev_{slug}",
period_end, now.isoformat()), period_end, now.isoformat()),
) )
print(f" {slug} -> owner {email} ({plan})") logger.info(" %s -> owner %s (%s)", slug, email, plan)
# 4. Seed leads # 4. Seed leads
print(f"\nSeeding {len(LEADS)} leads...") logger.info("Seeding %s leads...", len(LEADS))
lead_ids = [] lead_ids = []
for i, lead in enumerate(LEADS): for i, lead in enumerate(LEADS):
from padelnomics.credits import HEAT_CREDIT_COSTS from padelnomics.credits import HEAT_CREDIT_COSTS
@@ -426,10 +429,10 @@ def main():
), ),
) )
lead_ids.append(cursor.lastrowid) lead_ids.append(cursor.lastrowid)
print(f" Lead #{cursor.lastrowid}: {lead['contact_name']} ({lead['heat_score']}, {lead['country']})") logger.info(" Lead #%s: %s (%s, %s)", cursor.lastrowid, lead["contact_name"], lead["heat_score"], lead["country"])
# 5. Add credit ledger entries for claimed suppliers # 5. Add credit ledger entries for claimed suppliers
print("\nAdding credit ledger entries...") logger.info("Adding credit ledger entries...")
for slug in ("padeltech-gmbh", "courtbuild-spain", "desert-padel-fze"): for slug in ("padeltech-gmbh", "courtbuild-spain", "desert-padel-fze"):
sid = supplier_ids.get(slug) sid = supplier_ids.get(slug)
if not sid: if not sid:
@@ -448,10 +451,10 @@ def main():
VALUES (?, ?, ?, 'admin_adjustment', 'Welcome bonus', ?)""", VALUES (?, ?, ?, 'admin_adjustment', 'Welcome bonus', ?)""",
(sid, 10, monthly + 10, (now - timedelta(days=25)).isoformat()), (sid, 10, monthly + 10, (now - timedelta(days=25)).isoformat()),
) )
print(f" {slug}: 2 ledger entries") logger.info(" %s: 2 ledger entries", slug)
# 6. Add lead forwards for testing # 6. Add lead forwards for testing
print("\nAdding lead forwards...") logger.info("Adding lead forwards...")
padeltech_id = supplier_ids.get("padeltech-gmbh") padeltech_id = supplier_ids.get("padeltech-gmbh")
if padeltech_id and len(lead_ids) >= 2: if padeltech_id and len(lead_ids) >= 2:
for lead_id in lead_ids[:2]: for lead_id in lead_ids[:2]:
@@ -476,15 +479,16 @@ def main():
(padeltech_id, 80, lead_id, f"Unlocked lead #{lead_id}", (padeltech_id, 80, lead_id, f"Unlocked lead #{lead_id}",
(now - timedelta(hours=6)).isoformat()), (now - timedelta(hours=6)).isoformat()),
) )
print(f" PadelTech unlocked lead #{lead_id}") logger.info(" PadelTech unlocked lead #%s", lead_id)
conn.commit() conn.commit()
conn.close() conn.close()
print(f"\nDone! Seed data written to {db_path}") logger.info("Done! Seed data written to %s", db_path)
print(" Login: /auth/dev-login?email=dev@localhost") logger.info(" Login: /auth/dev-login?email=dev@localhost")
print(" Admin: set ADMIN_EMAILS=dev@localhost in .env, then dev-login grants admin role") logger.info(" Admin: set ADMIN_EMAILS=dev@localhost in .env, then dev-login grants admin role")
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
main() main()

View File

@@ -6,6 +6,7 @@ Commands:
uv run python -m padelnomics.scripts.setup_paddle --sync # re-populate DB from existing Paddle products uv run python -m padelnomics.scripts.setup_paddle --sync # re-populate DB from existing Paddle products
""" """
import logging
import os import os
import re import re
import sqlite3 import sqlite3
@@ -13,6 +14,8 @@ import sys
from pathlib import Path from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
logger = logging.getLogger(__name__)
from paddle_billing import Client as PaddleClient from paddle_billing import Client as PaddleClient
from paddle_billing import Environment, Options from paddle_billing import Environment, Options
from paddle_billing.Entities.Events.EventTypeName import EventTypeName from paddle_billing.Entities.Events.EventTypeName import EventTypeName
@@ -33,7 +36,8 @@ DATABASE_PATH = os.getenv("DATABASE_PATH", "data/app.db")
BASE_URL = os.getenv("BASE_URL", "http://localhost:5000") BASE_URL = os.getenv("BASE_URL", "http://localhost:5000")
if not PADDLE_API_KEY: if not PADDLE_API_KEY:
print("ERROR: Set PADDLE_API_KEY in .env first") logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
logger.error("Set PADDLE_API_KEY in .env first")
sys.exit(1) sys.exit(1)
@@ -202,7 +206,7 @@ _PRODUCT_BY_NAME = {p["name"]: p for p in PRODUCTS}
def _open_db(): def _open_db():
db_path = DATABASE_PATH db_path = DATABASE_PATH
if not Path(db_path).exists(): if not Path(db_path).exists():
print(f"ERROR: Database not found at {db_path}. Run migrations first.") logger.error("Database not found at %s. Run migrations first.", db_path)
sys.exit(1) sys.exit(1)
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
@@ -221,7 +225,7 @@ def _write_product(conn, key, product_id, price_id, name, price_cents, billing_t
def sync(paddle, conn): def sync(paddle, conn):
"""Fetch existing products from Paddle and re-populate paddle_products table.""" """Fetch existing products from Paddle and re-populate paddle_products table."""
print(f"Syncing products from Paddle ({PADDLE_ENVIRONMENT})...\n") logger.info("Syncing products from Paddle (%s)...", PADDLE_ENVIRONMENT)
products = paddle.products.list(ListProducts(includes=[Includes.Prices])) products = paddle.products.list(ListProducts(includes=[Includes.Prices]))
@@ -231,7 +235,7 @@ def sync(paddle, conn):
if not spec: if not spec:
continue continue
if not product.prices or len(product.prices) == 0: if not product.prices or len(product.prices) == 0:
print(f" SKIP {spec['key']}: no prices on {product.id}") logger.warning(" SKIP %s: no prices on %s", spec["key"], product.id)
continue continue
# Use the first active price # Use the first active price
@@ -241,26 +245,26 @@ def sync(paddle, conn):
spec["name"], spec["price"], spec["billing_type"], spec["name"], spec["price"], spec["billing_type"],
) )
matched += 1 matched += 1
print(f" {spec['key']}: {product.id} / {price.id}") logger.info(" %s: %s / %s", spec["key"], product.id, price.id)
conn.commit() conn.commit()
if matched == 0: if matched == 0:
print("\nNo matching products found in Paddle. Run without --sync first.") logger.warning("No matching products found in Paddle. Run without --sync first.")
else: else:
print(f"\n{matched}/{len(PRODUCTS)} products synced to DB") logger.info("%s/%s products synced to DB", matched, len(PRODUCTS))
def create(paddle, conn): def create(paddle, conn):
"""Create new products and prices in Paddle, write to DB, set up webhook.""" """Create new products and prices in Paddle, write to DB, set up webhook."""
print(f"Creating products in {PADDLE_ENVIRONMENT}...\n") logger.info("Creating products in %s...", PADDLE_ENVIRONMENT)
for spec in PRODUCTS: for spec in PRODUCTS:
product = paddle.products.create(CreateProduct( product = paddle.products.create(CreateProduct(
name=spec["name"], name=spec["name"],
tax_category=TaxCategory.Standard, tax_category=TaxCategory.Standard,
)) ))
print(f" Product: {spec['name']} -> {product.id}") logger.info(" Product: %s -> %s", spec["name"], product.id)
price_kwargs = { price_kwargs = {
"description": spec["name"], "description": spec["name"],
@@ -276,7 +280,7 @@ def create(paddle, conn):
price_kwargs["billing_cycle"] = Duration(interval=Interval.Month, frequency=1) price_kwargs["billing_cycle"] = Duration(interval=Interval.Month, frequency=1)
price = paddle.prices.create(CreatePrice(**price_kwargs)) price = paddle.prices.create(CreatePrice(**price_kwargs))
print(f" Price: {spec['key']} = {price.id}") logger.info(" Price: %s = %s", spec["key"], price.id)
_write_product( _write_product(
conn, spec["key"], product.id, price.id, conn, spec["key"], product.id, price.id,
@@ -284,7 +288,7 @@ def create(paddle, conn):
) )
conn.commit() conn.commit()
print("\nAll products written to DB") logger.info("All products written to DB")
# -- Notification destination (webhook) ----------------------------------- # -- Notification destination (webhook) -----------------------------------
@@ -298,8 +302,8 @@ def create(paddle, conn):
EventTypeName.TransactionCompleted, EventTypeName.TransactionCompleted,
] ]
print("\nCreating webhook notification destination...") logger.info("Creating webhook notification destination...")
print(f" URL: {webhook_url}") logger.info(" URL: %s", webhook_url)
notification_setting = paddle.notification_settings.create( notification_setting = paddle.notification_settings.create(
CreateNotificationSetting( CreateNotificationSetting(
@@ -313,8 +317,8 @@ def create(paddle, conn):
) )
webhook_secret = notification_setting.endpoint_secret_key webhook_secret = notification_setting.endpoint_secret_key
print(f" ID: {notification_setting.id}") logger.info(" ID: %s", notification_setting.id)
print(f" Secret: {webhook_secret}") logger.info(" Secret: %s", webhook_secret)
env_path = Path(".env") env_path = Path(".env")
env_vars = { env_vars = {
@@ -331,13 +335,13 @@ def create(paddle, conn):
else: else:
env_text = env_text.rstrip("\n") + f"\n{replacement}\n" env_text = env_text.rstrip("\n") + f"\n{replacement}\n"
env_path.write_text(env_text) env_path.write_text(env_text)
print("\nPADDLE_WEBHOOK_SECRET and PADDLE_NOTIFICATION_SETTING_ID written to .env") logger.info("PADDLE_WEBHOOK_SECRET and PADDLE_NOTIFICATION_SETTING_ID written to .env")
else: else:
print("\n Add to .env:") logger.info("Add to .env:")
for key, value in env_vars.items(): for key, value in env_vars.items():
print(f" {key}={value}") logger.info(" %s=%s", key, value)
print("\nDone. dev_run.sh will start ngrok and update the webhook URL automatically.") logger.info("Done. dev_run.sh will start ngrok and update the webhook URL automatically.")
def main(): def main():
@@ -355,4 +359,5 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
main() main()