Add BeanFlows MVP: coffee analytics dashboard, API, and web app

- Fix pipeline granularity: add market_year to cleaned/serving SQL models
- Add DuckDB data access layer with async query functions (analytics.py)
- Build Chart.js dashboard: supply/demand, STU ratio, top producers, YoY table
- Add country comparison page with multi-select picker
- Replace items CRUD with read-only commodity API (list, metrics, countries, CSV)
- Configure BeanFlows plan tiers (Free/Starter/Pro) with feature gating
- Rewrite public pages for coffee market intelligence positioning
- Remove boilerplate items schema, update health check for DuckDB
- Add test suite: 139 tests passing (dashboard, API, billing)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-18 16:11:50 +01:00
parent b222c01828
commit 2748c606e9
59 changed files with 6272 additions and 2 deletions

View File

@@ -0,0 +1,3 @@
"""BeanFlows - Commodity analytics for coffee traders"""
__version__ = "0.1.0"

View File

@@ -0,0 +1,340 @@
"""
Admin domain: password-protected admin panel for managing users, tasks, etc.
"""
import secrets
from datetime import datetime, timedelta
from functools import wraps
from pathlib import Path
from quart import Blueprint, flash, redirect, render_template, request, session, url_for
from ..core import config, csrf_protect, execute, fetch_all, fetch_one
# Blueprint with its own template folder
bp = Blueprint(
"admin",
__name__,
template_folder=str(Path(__file__).parent / "templates"),
url_prefix="/admin",
)
# =============================================================================
# Config
# =============================================================================
def get_admin_password() -> str:
"""Get admin password from env. Generate one if not set (dev only)."""
import os
password = os.getenv("ADMIN_PASSWORD", "")
if not password and config.DEBUG:
# In dev, use a default password
return "admin"
return password
# =============================================================================
# SQL Queries
# =============================================================================
async def get_dashboard_stats() -> dict:
"""Get admin dashboard statistics."""
now = datetime.utcnow()
today = now.date().isoformat()
week_ago = (now - timedelta(days=7)).isoformat()
users_total = await fetch_one("SELECT COUNT(*) as count FROM users WHERE deleted_at IS NULL")
users_today = await fetch_one(
"SELECT COUNT(*) as count FROM users WHERE created_at >= ? AND deleted_at IS NULL",
(today,),
)
users_week = await fetch_one(
"SELECT COUNT(*) as count FROM users WHERE created_at >= ? AND deleted_at IS NULL",
(week_ago,),
)
subs = await fetch_one(
"SELECT COUNT(*) as count FROM subscriptions WHERE status = 'active'"
)
tasks_pending = await fetch_one("SELECT COUNT(*) as count FROM tasks WHERE status = 'pending'")
tasks_failed = await fetch_one("SELECT COUNT(*) as count FROM tasks WHERE status = 'failed'")
# Analytics stats (DuckDB)
analytics = {"commodity_count": 0, "min_year": None, "max_year": None}
try:
from ..analytics import _conn as duckdb_conn
from ..analytics import fetch_analytics
if duckdb_conn is not None:
rows = await fetch_analytics(
"""
SELECT COUNT(DISTINCT commodity_code) as commodity_count,
MIN(market_year) as min_year,
MAX(market_year) as max_year
FROM serving.commodity_metrics
WHERE country_code IS NOT NULL
"""
)
if rows:
analytics = rows[0]
except Exception:
pass
return {
"users_total": users_total["count"] if users_total else 0,
"users_today": users_today["count"] if users_today else 0,
"users_week": users_week["count"] if users_week else 0,
"active_subscriptions": subs["count"] if subs else 0,
"tasks_pending": tasks_pending["count"] if tasks_pending else 0,
"tasks_failed": tasks_failed["count"] if tasks_failed else 0,
"commodity_count": analytics.get("commodity_count", 0),
"data_year_range": f"{analytics.get('min_year', '?')}{analytics.get('max_year', '?')}",
}
async def get_users(limit: int = 50, offset: int = 0, search: str = None) -> list[dict]:
"""Get users with optional search."""
if search:
return await fetch_all(
"""
SELECT u.*, s.plan, s.status as sub_status
FROM users u
LEFT JOIN subscriptions s ON s.user_id = u.id AND s.status = 'active'
WHERE u.deleted_at IS NULL AND u.email LIKE ?
ORDER BY u.created_at DESC
LIMIT ? OFFSET ?
""",
(f"%{search}%", limit, offset)
)
return await fetch_all(
"""
SELECT u.*, s.plan, s.status as sub_status
FROM users u
LEFT JOIN subscriptions s ON s.user_id = u.id AND s.status = 'active'
WHERE u.deleted_at IS NULL
ORDER BY u.created_at DESC
LIMIT ? OFFSET ?
""",
(limit, offset)
)
async def get_user_by_id(user_id: int) -> dict | None:
"""Get user by ID with subscription info."""
return await fetch_one(
"""
SELECT u.*, s.plan, s.status as sub_status, s.stripe_customer_id
FROM users u
LEFT JOIN subscriptions s ON s.user_id = u.id
WHERE u.id = ?
""",
(user_id,)
)
async def get_recent_tasks(limit: int = 50) -> list[dict]:
"""Get recent tasks."""
return await fetch_all(
"""
SELECT * FROM tasks
ORDER BY created_at DESC
LIMIT ?
""",
(limit,)
)
async def get_failed_tasks() -> list[dict]:
"""Get failed tasks."""
return await fetch_all(
"SELECT * FROM tasks WHERE status = 'failed' ORDER BY created_at DESC"
)
async def retry_task(task_id: int) -> bool:
"""Retry a failed task."""
result = await execute(
"""
UPDATE tasks
SET status = 'pending', run_at = ?, error = NULL
WHERE id = ? AND status = 'failed'
""",
(datetime.utcnow().isoformat(), task_id)
)
return result > 0
async def delete_task(task_id: int) -> bool:
"""Delete a task."""
result = await execute("DELETE FROM tasks WHERE id = ?", (task_id,))
return result > 0
# =============================================================================
# Decorators
# =============================================================================
def admin_required(f):
"""Require admin authentication."""
@wraps(f)
async def decorated(*args, **kwargs):
if not session.get("is_admin"):
return redirect(url_for("admin.login"))
return await f(*args, **kwargs)
return decorated
# =============================================================================
# Routes
# =============================================================================
@bp.route("/login", methods=["GET", "POST"])
@csrf_protect
async def login():
"""Admin login page."""
admin_password = get_admin_password()
if not admin_password:
await flash("Admin access not configured. Set ADMIN_PASSWORD env var.", "error")
return redirect(url_for("public.landing"))
if session.get("is_admin"):
return redirect(url_for("admin.index"))
if request.method == "POST":
form = await request.form
password = form.get("password", "")
if secrets.compare_digest(password, admin_password):
session["is_admin"] = True
await flash("Welcome, admin!", "success")
return redirect(url_for("admin.index"))
else:
await flash("Invalid password.", "error")
return await render_template("login.html")
@bp.route("/logout", methods=["POST"])
@csrf_protect
async def logout():
"""Admin logout."""
session.pop("is_admin", None)
await flash("Logged out of admin.", "info")
return redirect(url_for("admin.login"))
@bp.route("/")
@admin_required
async def index():
"""Admin dashboard."""
stats = await get_dashboard_stats()
recent_users = await get_users(limit=10)
failed_tasks = await get_failed_tasks()
return await render_template(
"index.html",
stats=stats,
recent_users=recent_users,
failed_tasks=failed_tasks,
)
@bp.route("/users")
@admin_required
async def users():
"""User list."""
search = request.args.get("search", "").strip()
page = int(request.args.get("page", 1))
per_page = 50
offset = (page - 1) * per_page
user_list = await get_users(limit=per_page, offset=offset, search=search or None)
return await render_template(
"users.html",
users=user_list,
search=search,
page=page,
)
@bp.route("/users/<int:user_id>")
@admin_required
async def user_detail(user_id: int):
"""User detail page."""
user = await get_user_by_id(user_id)
if not user:
await flash("User not found.", "error")
return redirect(url_for("admin.users"))
return await render_template("user_detail.html", user=user)
@bp.route("/users/<int:user_id>/impersonate", methods=["POST"])
@admin_required
@csrf_protect
async def impersonate(user_id: int):
"""Impersonate a user (login as them)."""
user = await get_user_by_id(user_id)
if not user:
await flash("User not found.", "error")
return redirect(url_for("admin.users"))
# Store admin session so we can return
session["admin_impersonating"] = True
session["user_id"] = user_id
await flash(f"Now impersonating {user['email']}. Return to admin to stop.", "warning")
return redirect(url_for("dashboard.index"))
@bp.route("/stop-impersonating", methods=["POST"])
@csrf_protect
async def stop_impersonating():
"""Stop impersonating and return to admin."""
session.pop("user_id", None)
session.pop("admin_impersonating", None)
await flash("Stopped impersonating.", "info")
return redirect(url_for("admin.index"))
@bp.route("/tasks")
@admin_required
async def tasks():
"""Task queue management."""
task_list = await get_recent_tasks(limit=100)
failed = await get_failed_tasks()
return await render_template(
"tasks.html",
tasks=task_list,
failed_tasks=failed,
)
@bp.route("/tasks/<int:task_id>/retry", methods=["POST"])
@admin_required
@csrf_protect
async def task_retry(task_id: int):
"""Retry a failed task."""
success = await retry_task(task_id)
if success:
await flash("Task queued for retry.", "success")
else:
await flash("Could not retry task.", "error")
return redirect(url_for("admin.tasks"))
@bp.route("/tasks/<int:task_id>/delete", methods=["POST"])
@admin_required
@csrf_protect
async def task_delete(task_id: int):
"""Delete a task."""
success = await delete_task(task_id)
if success:
await flash("Task deleted.", "success")
else:
await flash("Could not delete task.", "error")
return redirect(url_for("admin.tasks"))

View File

@@ -0,0 +1,124 @@
{% extends "base.html" %}
{% block title %}Admin Dashboard - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<header style="display: flex; justify-content: space-between; align-items: center;">
<div>
<h1>Admin Dashboard</h1>
{% if session.get('admin_impersonating') %}
<mark>Currently impersonating a user</mark>
<form method="post" action="{{ url_for('admin.stop_impersonating') }}" style="display: inline;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="secondary outline" style="padding: 0.25rem 0.5rem;">Stop</button>
</form>
{% endif %}
</div>
<form method="post" action="{{ url_for('admin.logout') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="secondary outline">Logout</button>
</form>
</header>
<!-- Stats Grid -->
<div class="grid">
<article>
<header><small>Total Users</small></header>
<p style="font-size: 2rem; margin: 0;"><strong>{{ stats.users_total }}</strong></p>
<small>+{{ stats.users_today }} today, +{{ stats.users_week }} this week</small>
</article>
<article>
<header><small>Active Subscriptions</small></header>
<p style="font-size: 2rem; margin: 0;"><strong>{{ stats.active_subscriptions }}</strong></p>
</article>
<article>
<header><small>Task Queue</small></header>
<p style="font-size: 2rem; margin: 0;"><strong>{{ stats.tasks_pending }}</strong> pending</p>
{% if stats.tasks_failed > 0 %}
<small style="color: var(--del-color);">{{ stats.tasks_failed }} failed</small>
{% else %}
<small style="color: var(--ins-color);">0 failed</small>
{% endif %}
</article>
</div>
<!-- Quick Links -->
<div class="grid" style="margin-bottom: 2rem;">
<a href="{{ url_for('admin.users') }}" role="button" class="secondary outline">All Users</a>
<a href="{{ url_for('admin.tasks') }}" role="button" class="secondary outline">Task Queue</a>
<a href="{{ url_for('dashboard.index') }}" role="button" class="secondary outline">View as User</a>
</div>
<div class="grid">
<!-- Recent Users -->
<section>
<h2>Recent Users</h2>
<article>
{% if recent_users %}
<table>
<thead>
<tr>
<th>Email</th>
<th>Plan</th>
<th>Joined</th>
</tr>
</thead>
<tbody>
{% for u in recent_users %}
<tr>
<td>
<a href="{{ url_for('admin.user_detail', user_id=u.id) }}">{{ u.email }}</a>
</td>
<td>{{ u.plan or 'free' }}</td>
<td>{{ u.created_at[:10] }}</td>
</tr>
{% endfor %}
</tbody>
</table>
<a href="{{ url_for('admin.users') }}">View all →</a>
{% else %}
<p>No users yet.</p>
{% endif %}
</article>
</section>
<!-- Failed Tasks -->
<section>
<h2>Failed Tasks</h2>
<article>
{% if failed_tasks %}
<table>
<thead>
<tr>
<th>Task</th>
<th>Error</th>
<th></th>
</tr>
</thead>
<tbody>
{% for task in failed_tasks[:5] %}
<tr>
<td>{{ task.task_name }}</td>
<td><small>{{ task.error[:50] }}...</small></td>
<td>
<form method="post" action="{{ url_for('admin.task_retry', task_id=task.id) }}" style="margin: 0;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="outline" style="padding: 0.25rem 0.5rem; margin: 0;">Retry</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
<a href="{{ url_for('admin.tasks') }}">View all →</a>
{% else %}
<p style="color: var(--ins-color);">✓ No failed tasks</p>
{% endif %}
</article>
</section>
</div>
</main>
{% endblock %}

View File

@@ -0,0 +1,30 @@
{% extends "base.html" %}
{% block title %}Admin Login - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<article style="max-width: 400px; margin: 4rem auto;">
<header>
<h1>Admin Login</h1>
</header>
<form method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<label for="password">
Password
<input
type="password"
id="password"
name="password"
required
autofocus
>
</label>
<button type="submit">Login</button>
</form>
</article>
</main>
{% endblock %}

View File

@@ -0,0 +1,106 @@
{% extends "base.html" %}
{% block title %}Tasks - Admin - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<header style="display: flex; justify-content: space-between; align-items: center;">
<h1>Task Queue</h1>
<a href="{{ url_for('admin.index') }}" role="button" class="secondary outline">← Dashboard</a>
</header>
<!-- Failed Tasks -->
{% if failed_tasks %}
<section>
<h2 style="color: var(--del-color);">Failed Tasks ({{ failed_tasks | length }})</h2>
<article style="border-color: var(--del-color);">
<table>
<thead>
<tr>
<th>ID</th>
<th>Task</th>
<th>Error</th>
<th>Retries</th>
<th>Created</th>
<th></th>
</tr>
</thead>
<tbody>
{% for task in failed_tasks %}
<tr>
<td>{{ task.id }}</td>
<td><code>{{ task.task_name }}</code></td>
<td>
<details>
<summary>{{ task.error[:40] if task.error else 'No error' }}...</summary>
<pre style="font-size: 0.75rem; white-space: pre-wrap;">{{ task.error }}</pre>
</details>
</td>
<td>{{ task.retries }}</td>
<td>{{ task.created_at[:16] }}</td>
<td>
<div style="display: flex; gap: 0.5rem;">
<form method="post" action="{{ url_for('admin.task_retry', task_id=task.id) }}" style="margin: 0;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="outline" style="padding: 0.25rem 0.5rem; margin: 0;">Retry</button>
</form>
<form method="post" action="{{ url_for('admin.task_delete', task_id=task.id) }}" style="margin: 0;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="outline secondary" style="padding: 0.25rem 0.5rem; margin: 0;">Delete</button>
</form>
</div>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</article>
</section>
{% endif %}
<!-- All Tasks -->
<section>
<h2>Recent Tasks</h2>
<article>
{% if tasks %}
<table>
<thead>
<tr>
<th>ID</th>
<th>Task</th>
<th>Status</th>
<th>Run At</th>
<th>Created</th>
<th>Completed</th>
</tr>
</thead>
<tbody>
{% for task in tasks %}
<tr>
<td>{{ task.id }}</td>
<td><code>{{ task.task_name }}</code></td>
<td>
{% if task.status == 'complete' %}
<span style="color: var(--ins-color);">✓ complete</span>
{% elif task.status == 'failed' %}
<span style="color: var(--del-color);">✗ failed</span>
{% elif task.status == 'pending' %}
<span style="color: var(--mark-background-color);">○ pending</span>
{% else %}
{{ task.status }}
{% endif %}
</td>
<td>{{ task.run_at[:16] if task.run_at else '-' }}</td>
<td>{{ task.created_at[:16] }}</td>
<td>{{ task.completed_at[:16] if task.completed_at else '-' }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p>No tasks in queue.</p>
{% endif %}
</article>
</section>
</main>
{% endblock %}

View File

@@ -0,0 +1,73 @@
{% extends "base.html" %}
{% block title %}User: {{ user.email }} - Admin - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<header style="display: flex; justify-content: space-between; align-items: center;">
<h1>{{ user.email }}</h1>
<a href="{{ url_for('admin.users') }}" role="button" class="secondary outline">← Users</a>
</header>
<div class="grid">
<!-- User Info -->
<article>
<header><h3>User Info</h3></header>
<dl>
<dt>ID</dt>
<dd>{{ user.id }}</dd>
<dt>Email</dt>
<dd>{{ user.email }}</dd>
<dt>Name</dt>
<dd>{{ user.name or '-' }}</dd>
<dt>Created</dt>
<dd>{{ user.created_at }}</dd>
<dt>Last Login</dt>
<dd>{{ user.last_login_at or 'Never' }}</dd>
</dl>
</article>
<!-- Subscription -->
<article>
<header><h3>Subscription</h3></header>
<dl>
<dt>Plan</dt>
<dd>
{% if user.plan %}
<mark>{{ user.plan }}</mark>
{% else %}
free
{% endif %}
</dd>
<dt>Status</dt>
<dd>{{ user.sub_status or 'N/A' }}</dd>
{% if user.stripe_customer_id %}
<dt>Stripe Customer</dt>
<dd>
<a href="https://dashboard.stripe.com/customers/{{ user.stripe_customer_id }}" target="_blank">
{{ user.stripe_customer_id }}
</a>
</dd>
{% endif %}
</dl>
</article>
</div>
<!-- Actions -->
<article>
<header><h3>Actions</h3></header>
<div style="display: flex; gap: 1rem;">
<form method="post" action="{{ url_for('admin.impersonate', user_id=user.id) }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="secondary">Impersonate User</button>
</form>
</div>
</article>
</main>
{% endblock %}

View File

@@ -0,0 +1,83 @@
{% extends "base.html" %}
{% block title %}Users - Admin - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<header style="display: flex; justify-content: space-between; align-items: center;">
<h1>Users</h1>
<a href="{{ url_for('admin.index') }}" role="button" class="secondary outline">← Dashboard</a>
</header>
<!-- Search -->
<form method="get" style="margin-bottom: 2rem;">
<div class="grid">
<input
type="search"
name="search"
placeholder="Search by email..."
value="{{ search }}"
>
<button type="submit">Search</button>
</div>
</form>
<!-- User Table -->
<article>
{% if users %}
<table>
<thead>
<tr>
<th>ID</th>
<th>Email</th>
<th>Name</th>
<th>Plan</th>
<th>Joined</th>
<th>Last Login</th>
<th></th>
</tr>
</thead>
<tbody>
{% for u in users %}
<tr>
<td>{{ u.id }}</td>
<td><a href="{{ url_for('admin.user_detail', user_id=u.id) }}">{{ u.email }}</a></td>
<td>{{ u.name or '-' }}</td>
<td>
{% if u.plan %}
<mark>{{ u.plan }}</mark>
{% else %}
free
{% endif %}
</td>
<td>{{ u.created_at[:10] }}</td>
<td>{{ u.last_login_at[:10] if u.last_login_at else 'Never' }}</td>
<td>
<form method="post" action="{{ url_for('admin.impersonate', user_id=u.id) }}" style="margin: 0;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="outline secondary" style="padding: 0.25rem 0.5rem; margin: 0;">
Impersonate
</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
<!-- Pagination -->
<div style="display: flex; gap: 1rem; justify-content: center; margin-top: 1rem;">
{% if page > 1 %}
<a href="?page={{ page - 1 }}{% if search %}&search={{ search }}{% endif %}">← Previous</a>
{% endif %}
<span>Page {{ page }}</span>
{% if users | length == 50 %}
<a href="?page={{ page + 1 }}{% if search %}&search={{ search }}{% endif %}">Next →</a>
{% endif %}
</div>
{% else %}
<p>No users found.</p>
{% endif %}
</article>
</main>
{% endblock %}

View File

@@ -0,0 +1,220 @@
"""
DuckDB analytics data access layer.
Bridge between the async Quart app and sync DuckDB reads.
All queries run via asyncio.to_thread() against a read-only connection.
"""
import asyncio
import os
import duckdb
# Coffee (Green) commodity code in USDA PSD
COFFEE_COMMODITY_CODE = 711100
# Metrics safe for user-facing queries (prevents SQL injection in dynamic column refs)
ALLOWED_METRICS = frozenset({
"Production",
"Imports",
"Exports",
"Total_Distribution",
"Ending_Stocks",
"Beginning_Stocks",
"Total_Supply",
"Domestic_Consumption",
"Net_Supply",
"Trade_Balance",
"Supply_Demand_Balance",
"Stock_to_Use_Ratio_pct",
"Production_YoY_pct",
})
_conn: duckdb.DuckDBPyConnection | None = None
def open_analytics_db() -> None:
"""Open read-only DuckDB connection."""
global _conn
db_path = os.getenv("DUCKDB_PATH", "")
assert db_path, "DUCKDB_PATH environment variable must be set"
_conn = duckdb.connect(db_path, read_only=True)
def close_analytics_db() -> None:
"""Close DuckDB connection."""
global _conn
if _conn:
_conn.close()
_conn = None
async def fetch_analytics(sql: str, params: list | None = None) -> list[dict]:
"""Run a read-only DuckDB query off the event loop. Returns list of dicts."""
assert _conn is not None, "Analytics DB not initialized — call open_analytics_db() first"
def _query():
result = _conn.execute(sql, params or [])
columns = [desc[0] for desc in result.description]
return [dict(zip(columns, row)) for row in result.fetchall()]
return await asyncio.to_thread(_query)
def _validate_metrics(metrics: list[str]) -> list[str]:
"""Filter metrics to allowed set. Returns validated list."""
valid = [m for m in metrics if m in ALLOWED_METRICS]
assert valid, f"No valid metrics in {metrics}. Allowed: {sorted(ALLOWED_METRICS)}"
return valid
# =============================================================================
# Query Functions
# =============================================================================
async def get_available_commodities() -> list[dict]:
"""Distinct commodity list from serving layer."""
return await fetch_analytics(
"""
SELECT DISTINCT commodity_code, commodity_name
FROM serving.commodity_metrics
WHERE country_code IS NOT NULL
ORDER BY commodity_name
"""
)
async def get_global_time_series(
commodity_code: int,
metrics: list[str],
start_year: int | None = None,
end_year: int | None = None,
) -> list[dict]:
"""Global supply/demand time series by market_year for a commodity."""
metrics = _validate_metrics(metrics)
cols = ", ".join(metrics)
where_parts = ["country_name = 'Global'", "commodity_code = ?"]
params: list = [commodity_code]
if start_year is not None:
where_parts.append("market_year >= ?")
params.append(start_year)
if end_year is not None:
where_parts.append("market_year <= ?")
params.append(end_year)
where_clause = " AND ".join(where_parts)
return await fetch_analytics(
f"""
SELECT market_year, {cols}
FROM serving.commodity_metrics
WHERE {where_clause}
ORDER BY market_year
""",
params,
)
async def get_top_countries(
commodity_code: int,
metric: str,
limit: int = 10,
) -> list[dict]:
"""Country ranking for latest market year by a single metric."""
metric = _validate_metrics([metric])[0]
return await fetch_analytics(
f"""
WITH latest AS (
SELECT MAX(market_year) AS max_year
FROM serving.commodity_metrics
WHERE commodity_code = ? AND country_code IS NOT NULL
)
SELECT country_name, country_code, market_year, {metric}
FROM serving.commodity_metrics, latest
WHERE commodity_code = ?
AND country_code IS NOT NULL
AND market_year = latest.max_year
ORDER BY {metric} DESC
LIMIT ?
""",
[commodity_code, commodity_code, limit],
)
async def get_stock_to_use_trend(commodity_code: int) -> list[dict]:
"""Global stock-to-use ratio over time."""
return await fetch_analytics(
"""
SELECT market_year, Stock_to_Use_Ratio_pct
FROM serving.commodity_metrics
WHERE commodity_code = ?
AND country_name = 'Global'
ORDER BY market_year
""",
[commodity_code],
)
async def get_supply_demand_balance(commodity_code: int) -> list[dict]:
"""Global supply-demand balance trend."""
return await fetch_analytics(
"""
SELECT market_year, Production, Total_Distribution, Supply_Demand_Balance
FROM serving.commodity_metrics
WHERE commodity_code = ?
AND country_name = 'Global'
ORDER BY market_year
""",
[commodity_code],
)
async def get_production_yoy_by_country(
commodity_code: int,
limit: int = 15,
) -> list[dict]:
"""Latest YoY production changes by country."""
return await fetch_analytics(
"""
WITH latest AS (
SELECT MAX(market_year) AS max_year
FROM serving.commodity_metrics
WHERE commodity_code = ? AND country_code IS NOT NULL
)
SELECT country_name, country_code, market_year,
Production, Production_YoY_pct
FROM serving.commodity_metrics, latest
WHERE commodity_code = ?
AND country_code IS NOT NULL
AND market_year = latest.max_year
AND Production > 0
ORDER BY ABS(Production_YoY_pct) DESC
LIMIT ?
""",
[commodity_code, commodity_code, limit],
)
async def get_country_comparison(
commodity_code: int,
country_codes: list[str],
metric: str,
) -> list[dict]:
"""Multi-country time series for a single metric."""
assert len(country_codes) <= 10, "Maximum 10 countries for comparison"
metric = _validate_metrics([metric])[0]
placeholders = ", ".join(["?"] * len(country_codes))
return await fetch_analytics(
f"""
SELECT country_name, country_code, market_year, {metric}
FROM serving.commodity_metrics
WHERE commodity_code = ?
AND country_code IN ({placeholders})
ORDER BY country_name, market_year
""",
[commodity_code, *country_codes],
)

View File

@@ -0,0 +1,191 @@
"""
API domain: REST API for commodity analytics with key authentication and rate limiting.
"""
import csv
import hashlib
import io
from datetime import datetime
from functools import wraps
from quart import Blueprint, Response, g, jsonify, request
from .. import analytics
from ..core import check_rate_limit, execute, fetch_one
bp = Blueprint("api", __name__)
# =============================================================================
# SQL Queries
# =============================================================================
async def verify_api_key(raw_key: str) -> dict | None:
"""Verify API key and return key data with user info."""
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
result = await fetch_one(
"""
SELECT ak.*, u.email, u.id as user_id,
COALESCE(s.plan, 'free') as plan
FROM api_keys ak
JOIN users u ON u.id = ak.user_id
LEFT JOIN subscriptions s ON s.user_id = u.id AND s.status = 'active'
WHERE ak.key_hash = ? AND ak.deleted_at IS NULL AND u.deleted_at IS NULL
""",
(key_hash,),
)
if result:
await execute(
"UPDATE api_keys SET last_used_at = ? WHERE id = ?",
(datetime.utcnow().isoformat(), result["id"]),
)
return result
async def log_api_request(user_id: int, endpoint: str, method: str) -> None:
"""Log API request for analytics and rate limiting."""
await execute(
"""
INSERT INTO api_requests (user_id, endpoint, method, created_at)
VALUES (?, ?, ?, ?)
""",
(user_id, endpoint, method, datetime.utcnow().isoformat()),
)
# =============================================================================
# Decorators
# =============================================================================
def api_key_required(scopes: list[str] = None):
"""Require valid API key with optional scope check."""
def decorator(f):
@wraps(f)
async def decorated(*args, **kwargs):
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return jsonify({"error": "Missing API key"}), 401
raw_key = auth[7:]
key_data = await verify_api_key(raw_key)
if not key_data:
return jsonify({"error": "Invalid API key"}), 401
# Check scopes
if scopes:
key_scopes = (key_data.get("scopes") or "").split(",")
if not any(s in key_scopes for s in scopes):
return jsonify({"error": "Insufficient permissions"}), 403
# Check plan allows API access
plan = key_data.get("plan", "free")
if plan == "free":
return jsonify({"error": "API access requires a Starter or Pro plan"}), 403
# Rate limiting
rate_key = f"api:{key_data['id']}"
allowed, info = await check_rate_limit(rate_key, limit=1000, window=3600)
if not allowed:
response = jsonify({"error": "Rate limit exceeded", **info})
response.headers["X-RateLimit-Limit"] = str(info["limit"])
response.headers["X-RateLimit-Remaining"] = str(info["remaining"])
response.headers["X-RateLimit-Reset"] = str(info["reset"])
return response, 429
await log_api_request(key_data["user_id"], request.path, request.method)
g.api_key = key_data
g.user_id = key_data["user_id"]
g.plan = plan
return await f(*args, **kwargs)
return decorated
return decorator
# =============================================================================
# Routes
# =============================================================================
@bp.route("/me")
@api_key_required()
async def me():
"""Get current user info."""
return jsonify({
"user_id": g.user_id,
"email": g.api_key["email"],
"plan": g.plan,
"key_name": g.api_key["name"],
"scopes": g.api_key["scopes"].split(","),
})
@bp.route("/commodities")
@api_key_required(scopes=["read"])
async def list_commodities():
"""List available commodities."""
commodities = await analytics.get_available_commodities()
return jsonify({"commodities": commodities})
@bp.route("/commodities/<int:code>/metrics")
@api_key_required(scopes=["read"])
async def commodity_metrics(code: int):
"""Time series metrics for a commodity. Query params: metrics, start_year, end_year."""
raw_metrics = request.args.getlist("metrics") or ["Production", "Exports", "Imports", "Ending_Stocks"]
metrics = [m for m in raw_metrics if m in analytics.ALLOWED_METRICS]
if not metrics:
return jsonify({"error": f"No valid metrics. Allowed: {sorted(analytics.ALLOWED_METRICS)}"}), 400
start_year = request.args.get("start_year", type=int)
end_year = request.args.get("end_year", type=int)
data = await analytics.get_global_time_series(code, metrics, start_year, end_year)
return jsonify({"commodity_code": code, "metrics": metrics, "data": data})
@bp.route("/commodities/<int:code>/countries")
@api_key_required(scopes=["read"])
async def commodity_countries(code: int):
"""Country ranking for a commodity. Query params: metric, limit."""
metric = request.args.get("metric", "Production")
if metric not in analytics.ALLOWED_METRICS:
return jsonify({"error": f"Invalid metric. Allowed: {sorted(analytics.ALLOWED_METRICS)}"}), 400
limit = min(int(request.args.get("limit", 20)), 100)
data = await analytics.get_top_countries(code, metric, limit)
return jsonify({"commodity_code": code, "metric": metric, "data": data})
@bp.route("/commodities/<int:code>/metrics.csv")
@api_key_required(scopes=["read"])
async def commodity_metrics_csv(code: int):
"""CSV export of time series metrics."""
if g.plan == "free":
return jsonify({"error": "CSV export requires a Starter or Pro plan"}), 403
raw_metrics = request.args.getlist("metrics") or [
"Production", "Exports", "Imports", "Ending_Stocks", "Total_Distribution",
]
metrics = [m for m in raw_metrics if m in analytics.ALLOWED_METRICS]
if not metrics:
return jsonify({"error": "No valid metrics"}), 400
data = await analytics.get_global_time_series(code, metrics)
output = io.StringIO()
if data:
writer = csv.DictWriter(output, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
return Response(
output.getvalue(),
mimetype="text/csv",
headers={"Content-Disposition": f"attachment; filename=commodity_{code}_metrics.csv"},
)

123
web/src/beanflows/app.py Normal file
View File

@@ -0,0 +1,123 @@
"""
BeanFlows - Application factory and entry point.
"""
import os
from pathlib import Path
from quart import Quart, g, session
from .analytics import close_analytics_db, open_analytics_db
from .core import close_db, config, get_csrf_token, init_db, setup_request_id
def create_app() -> Quart:
"""Create and configure the Quart application."""
# Get package directory for templates
pkg_dir = Path(__file__).parent
app = Quart(
__name__,
template_folder=str(pkg_dir / "templates"),
static_folder=str(pkg_dir / "static"),
)
app.secret_key = config.SECRET_KEY
# Session config
app.config["SESSION_COOKIE_SECURE"] = not config.DEBUG
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
app.config["PERMANENT_SESSION_LIFETIME"] = 60 * 60 * 24 * config.SESSION_LIFETIME_DAYS
# Database lifecycle
@app.before_serving
async def startup():
await init_db()
if os.getenv("DUCKDB_PATH"):
open_analytics_db()
@app.after_serving
async def shutdown():
close_analytics_db()
await close_db()
# Security headers
@app.after_request
async def add_security_headers(response):
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
if not config.DEBUG:
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
return response
# Load current user before each request
@app.before_request
async def load_user():
g.user = None
user_id = session.get("user_id")
if user_id:
from .auth.routes import get_user_by_id
g.user = await get_user_by_id(user_id)
# Template context globals
@app.context_processor
def inject_globals():
from datetime import datetime
return {
"config": config,
"user": g.get("user"),
"now": datetime.utcnow(),
"csrf_token": get_csrf_token,
}
# Health check
@app.route("/health")
async def health():
from .analytics import _conn as duckdb_conn
from .analytics import fetch_analytics
from .core import fetch_one
result = {"status": "healthy", "sqlite": "ok", "duckdb": "ok"}
try:
await fetch_one("SELECT 1")
except Exception as e:
result["sqlite"] = str(e)
result["status"] = "unhealthy"
if duckdb_conn is not None:
try:
await fetch_analytics("SELECT 1")
except Exception as e:
result["duckdb"] = str(e)
result["status"] = "unhealthy"
else:
result["duckdb"] = "not configured"
status_code = 200 if result["status"] == "healthy" else 500
return result, status_code
# Register blueprints
from .admin.routes import bp as admin_bp
from .api.routes import bp as api_bp
from .auth.routes import bp as auth_bp
from .billing.routes import bp as billing_bp
from .dashboard.routes import bp as dashboard_bp
from .public.routes import bp as public_bp
app.register_blueprint(public_bp)
app.register_blueprint(auth_bp)
app.register_blueprint(dashboard_bp)
app.register_blueprint(billing_bp)
app.register_blueprint(api_bp, url_prefix="/api/v1")
app.register_blueprint(admin_bp)
# Request ID tracking
setup_request_id(app)
return app
app = create_app()
if __name__ == "__main__":
app.run(debug=config.DEBUG, port=5000)

View File

@@ -0,0 +1,314 @@
"""
Auth domain: magic link authentication, user management, decorators.
"""
import secrets
from functools import wraps
from datetime import datetime, timedelta
from pathlib import Path
from quart import Blueprint, render_template, request, redirect, url_for, session, flash, g
from ..core import config, fetch_one, fetch_all, execute, csrf_protect
# Blueprint with its own template folder
bp = Blueprint(
"auth",
__name__,
template_folder=str(Path(__file__).parent / "templates"),
url_prefix="/auth",
)
# =============================================================================
# SQL Queries
# =============================================================================
async def get_user_by_id(user_id: int) -> dict | None:
"""Get user by ID."""
return await fetch_one(
"SELECT * FROM users WHERE id = ? AND deleted_at IS NULL",
(user_id,)
)
async def get_user_by_email(email: str) -> dict | None:
"""Get user by email."""
return await fetch_one(
"SELECT * FROM users WHERE email = ? AND deleted_at IS NULL",
(email.lower(),)
)
async def create_user(email: str) -> int:
"""Create new user, return ID."""
now = datetime.utcnow().isoformat()
return await execute(
"INSERT INTO users (email, created_at) VALUES (?, ?)",
(email.lower(), now)
)
async def update_user(user_id: int, **fields) -> None:
"""Update user fields."""
if not fields:
return
sets = ", ".join(f"{k} = ?" for k in fields.keys())
values = list(fields.values()) + [user_id]
await execute(f"UPDATE users SET {sets} WHERE id = ?", tuple(values))
async def create_auth_token(user_id: int, token: str, minutes: int = None) -> int:
"""Create auth token for user."""
minutes = minutes or config.MAGIC_LINK_EXPIRY_MINUTES
expires = datetime.utcnow() + timedelta(minutes=minutes)
return await execute(
"INSERT INTO auth_tokens (user_id, token, expires_at) VALUES (?, ?, ?)",
(user_id, token, expires.isoformat())
)
async def get_valid_token(token: str) -> dict | None:
"""Get token if valid and not expired."""
return await fetch_one(
"""
SELECT at.*, u.email
FROM auth_tokens at
JOIN users u ON u.id = at.user_id
WHERE at.token = ? AND at.expires_at > ? AND at.used_at IS NULL
""",
(token, datetime.utcnow().isoformat())
)
async def mark_token_used(token_id: int) -> None:
"""Mark token as used."""
await execute(
"UPDATE auth_tokens SET used_at = ? WHERE id = ?",
(datetime.utcnow().isoformat(), token_id)
)
async def get_user_with_subscription(user_id: int) -> dict | None:
"""Get user with their active subscription info."""
return await fetch_one(
"""
SELECT u.*, s.plan, s.status as sub_status, s.current_period_end
FROM users u
LEFT JOIN subscriptions s ON s.user_id = u.id AND s.status = 'active'
WHERE u.id = ? AND u.deleted_at IS NULL
""",
(user_id,)
)
# =============================================================================
# Decorators
# =============================================================================
def login_required(f):
"""Require authenticated user."""
@wraps(f)
async def decorated(*args, **kwargs):
if not g.get("user"):
await flash("Please sign in to continue.", "warning")
return redirect(url_for("auth.login", next=request.path))
return await f(*args, **kwargs)
return decorated
def subscription_required(plans: list[str] = None):
"""Require active subscription, optionally of specific plan(s)."""
def decorator(f):
@wraps(f)
async def decorated(*args, **kwargs):
if not g.get("user"):
await flash("Please sign in to continue.", "warning")
return redirect(url_for("auth.login"))
user = await get_user_with_subscription(g.user["id"])
if not user or not user.get("plan"):
await flash("Please subscribe to access this feature.", "warning")
return redirect(url_for("billing.pricing"))
if plans and user["plan"] not in plans:
await flash(f"This feature requires a {' or '.join(plans)} plan.", "warning")
return redirect(url_for("billing.pricing"))
return await f(*args, **kwargs)
return decorated
return decorator
# =============================================================================
# Routes
# =============================================================================
@bp.route("/login", methods=["GET", "POST"])
@csrf_protect
async def login():
"""Login page - request magic link."""
if g.get("user"):
return redirect(url_for("dashboard.index"))
if request.method == "POST":
form = await request.form
email = form.get("email", "").strip().lower()
if not email or "@" not in email:
await flash("Please enter a valid email address.", "error")
return redirect(url_for("auth.login"))
# Get or create user
user = await get_user_by_email(email)
if not user:
user_id = await create_user(email)
else:
user_id = user["id"]
# Create magic link token
token = secrets.token_urlsafe(32)
await create_auth_token(user_id, token)
# Queue email
from ..worker import enqueue
await enqueue("send_magic_link", {"email": email, "token": token})
await flash("Check your email for the sign-in link!", "success")
return redirect(url_for("auth.magic_link_sent", email=email))
return await render_template("login.html")
@bp.route("/signup", methods=["GET", "POST"])
@csrf_protect
async def signup():
"""Signup page - same as login but with different messaging."""
if g.get("user"):
return redirect(url_for("dashboard.index"))
plan = request.args.get("plan", "free")
if request.method == "POST":
form = await request.form
email = form.get("email", "").strip().lower()
selected_plan = form.get("plan", "free")
if not email or "@" not in email:
await flash("Please enter a valid email address.", "error")
return redirect(url_for("auth.signup", plan=selected_plan))
# Check if user exists
user = await get_user_by_email(email)
if user:
await flash("Account already exists. Please sign in.", "info")
return redirect(url_for("auth.login"))
# Create user
user_id = await create_user(email)
# Create magic link token
token = secrets.token_urlsafe(32)
await create_auth_token(user_id, token)
# Queue emails
from ..worker import enqueue
await enqueue("send_magic_link", {"email": email, "token": token})
await enqueue("send_welcome", {"email": email})
await flash("Check your email to complete signup!", "success")
return redirect(url_for("auth.magic_link_sent", email=email))
return await render_template("signup.html", plan=plan)
@bp.route("/verify")
async def verify():
"""Verify magic link token."""
token = request.args.get("token")
if not token:
await flash("Invalid or expired link.", "error")
return redirect(url_for("auth.login"))
token_data = await get_valid_token(token)
if not token_data:
await flash("Invalid or expired link. Please request a new one.", "error")
return redirect(url_for("auth.login"))
# Mark token as used
await mark_token_used(token_data["id"])
# Update last login
await update_user(token_data["user_id"], last_login_at=datetime.utcnow().isoformat())
# Set session
session.permanent = True
session["user_id"] = token_data["user_id"]
await flash("Successfully signed in!", "success")
# Redirect to intended page or dashboard
next_url = request.args.get("next", url_for("dashboard.index"))
return redirect(next_url)
@bp.route("/logout", methods=["POST"])
@csrf_protect
async def logout():
"""Log out user."""
session.clear()
await flash("You have been signed out.", "info")
return redirect(url_for("public.landing"))
@bp.route("/magic-link-sent")
async def magic_link_sent():
"""Confirmation page after magic link sent."""
email = request.args.get("email", "")
return await render_template("magic_link_sent.html", email=email)
@bp.route("/dev-login")
async def dev_login():
"""Instant login for development. Only works in DEBUG mode."""
if not config.DEBUG:
return "Not available", 404
email = request.args.get("email", "dev@localhost")
user = await get_user_by_email(email)
if not user:
user_id = await create_user(email)
else:
user_id = user["id"]
session.permanent = True
session["user_id"] = user_id
await flash(f"Dev login as {email}", "success")
return redirect(url_for("dashboard.index"))
@bp.route("/resend", methods=["POST"])
@csrf_protect
async def resend():
"""Resend magic link."""
form = await request.form
email = form.get("email", "").strip().lower()
if not email:
await flash("Email address required.", "error")
return redirect(url_for("auth.login"))
user = await get_user_by_email(email)
if user:
token = secrets.token_urlsafe(32)
await create_auth_token(user["id"], token)
from ..worker import enqueue
await enqueue("send_magic_link", {"email": email, "token": token})
# Always show success (don't reveal if email exists)
await flash("If that email is registered, we've sent a new link.", "success")
return redirect(url_for("auth.magic_link_sent", email=email))

View File

@@ -0,0 +1,39 @@
{% extends "base.html" %}
{% block title %}Sign In - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<article style="max-width: 400px; margin: 4rem auto;">
<header>
<h1>Sign In</h1>
<p>Enter your email to receive a sign-in link.</p>
</header>
<form method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<label for="email">
Email
<input
type="email"
id="email"
name="email"
placeholder="you@example.com"
required
autofocus
>
</label>
<button type="submit">Send Sign-In Link</button>
</form>
<footer style="text-align: center; margin-top: 1rem;">
<small>
Don't have an account?
<a href="{{ url_for('auth.signup') }}">Sign up</a>
</small>
</footer>
</article>
</main>
{% endblock %}

View File

@@ -0,0 +1,35 @@
{% extends "base.html" %}
{% block title %}Check Your Email - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<article style="max-width: 400px; margin: 4rem auto; text-align: center;">
<header>
<h1>Check Your Email</h1>
</header>
<p>We've sent a sign-in link to:</p>
<p><strong>{{ email }}</strong></p>
<p>Click the link in the email to sign in. The link expires in {{ config.MAGIC_LINK_EXPIRY_MINUTES }} minutes.</p>
<hr>
<details>
<summary>Didn't receive the email?</summary>
<ul style="text-align: left;">
<li>Check your spam folder</li>
<li>Make sure the email address is correct</li>
<li>Wait a minute and try again</li>
</ul>
<form method="post" action="{{ url_for('auth.resend') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="email" value="{{ email }}">
<button type="submit" class="secondary outline">Resend Link</button>
</form>
</details>
</article>
</main>
{% endblock %}

View File

@@ -0,0 +1,44 @@
{% extends "base.html" %}
{% block title %}Sign Up - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<article style="max-width: 400px; margin: 4rem auto;">
<header>
<h1>Create Account</h1>
<p>Enter your email to get started.</p>
</header>
<form method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" name="plan" value="{{ plan }}">
<label for="email">
Email
<input
type="email"
id="email"
name="email"
placeholder="you@example.com"
required
autofocus
>
</label>
{% if plan and plan != 'free' %}
<small>You'll be able to subscribe to the <strong>{{ plan | title }}</strong> plan after signing up.</small>
{% endif %}
<button type="submit">Create Account</button>
</form>
<footer style="text-align: center; margin-top: 1rem;">
<small>
Already have an account?
<a href="{{ url_for('auth.login') }}">Sign in</a>
</small>
</footer>
</article>
</main>
{% endblock %}

View File

@@ -0,0 +1,275 @@
"""
Billing domain: checkout, webhooks, subscription management.
Payment provider: paddle
"""
import json
from datetime import datetime
from functools import wraps
from pathlib import Path
from quart import Blueprint, render_template, request, redirect, url_for, flash, g, jsonify, session
import httpx
from ..core import config, fetch_one, fetch_all, execute
from ..core import verify_hmac_signature
from ..auth.routes import login_required
# Blueprint with its own template folder
bp = Blueprint(
"billing",
__name__,
template_folder=str(Path(__file__).parent / "templates"),
url_prefix="/billing",
)
# =============================================================================
# SQL Queries
# =============================================================================
async def get_subscription(user_id: int) -> dict | None:
"""Get user's subscription."""
return await fetch_one(
"SELECT * FROM subscriptions WHERE user_id = ? ORDER BY created_at DESC LIMIT 1",
(user_id,)
)
async def upsert_subscription(
user_id: int,
plan: str,
status: str,
provider_customer_id: str,
provider_subscription_id: str,
current_period_end: str = None,
) -> int:
"""Create or update subscription."""
now = datetime.utcnow().isoformat()
customer_col = "paddle_customer_id"
subscription_col = "paddle_subscription_id"
existing = await fetch_one("SELECT id FROM subscriptions WHERE user_id = ?", (user_id,))
if existing:
await execute(
f"""UPDATE subscriptions
SET plan = ?, status = ?, {customer_col} = ?, {subscription_col} = ?,
current_period_end = ?, updated_at = ?
WHERE user_id = ?""",
(plan, status, provider_customer_id, provider_subscription_id,
current_period_end, now, user_id),
)
return existing["id"]
else:
return await execute(
f"""INSERT INTO subscriptions
(user_id, plan, status, {customer_col}, {subscription_col},
current_period_end, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(user_id, plan, status, provider_customer_id, provider_subscription_id,
current_period_end, now, now),
)
async def get_subscription_by_provider_id(subscription_id: str) -> dict | None:
return await fetch_one(
"SELECT * FROM subscriptions WHERE paddle_subscription_id = ?",
(subscription_id,)
)
async def update_subscription_status(provider_subscription_id: str, status: str, **extra) -> None:
"""Update subscription status by provider subscription ID."""
extra["updated_at"] = datetime.utcnow().isoformat()
extra["status"] = status
sets = ", ".join(f"{k} = ?" for k in extra)
values = list(extra.values())
values.append(provider_subscription_id)
await execute(f"UPDATE subscriptions SET {sets} WHERE paddle_subscription_id = ?", tuple(values))
async def can_access_feature(user_id: int, feature: str) -> bool:
"""Check if user can access a feature based on their plan."""
sub = await get_subscription(user_id)
plan = sub["plan"] if sub and sub["status"] in ("active", "on_trial", "cancelled") else "free"
return feature in config.PLAN_FEATURES.get(plan, [])
async def is_within_limits(user_id: int, resource: str, current_count: int) -> bool:
"""Check if user is within their plan limits."""
sub = await get_subscription(user_id)
plan = sub["plan"] if sub and sub["status"] in ("active", "on_trial", "cancelled") else "free"
limit = config.PLAN_LIMITS.get(plan, {}).get(resource, 0)
if limit == -1:
return True
return current_count < limit
# =============================================================================
# Access Gating
# =============================================================================
def subscription_required(allowed=("active", "on_trial", "cancelled")):
"""Decorator to gate routes behind active subscription."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
if "user_id" not in session:
return redirect(url_for("auth.login"))
sub = await get_subscription(session["user_id"])
if not sub or sub["status"] not in allowed:
return redirect(url_for("billing.pricing"))
return await func(*args, **kwargs)
return wrapper
return decorator
# =============================================================================
# Routes
# =============================================================================
@bp.route("/pricing")
async def pricing():
"""Pricing page."""
user_sub = None
if "user_id" in session:
user_sub = await get_subscription(session["user_id"])
return await render_template("pricing.html", subscription=user_sub)
@bp.route("/success")
@login_required
async def success():
"""Checkout success page."""
return await render_template("success.html")
# =============================================================================
# Paddle Implementation
# =============================================================================
@bp.route("/checkout/<plan>", methods=["POST"])
@login_required
async def checkout(plan: str):
"""Create Paddle checkout via API."""
price_id = config.PADDLE_PRICES.get(plan)
if not price_id:
await flash("Invalid plan selected.", "error")
return redirect(url_for("billing.pricing"))
async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.paddle.com/transactions",
headers={
"Authorization": f"Bearer {config.PADDLE_API_KEY}",
"Content-Type": "application/json",
},
json={
"items": [{"price_id": price_id, "quantity": 1}],
"custom_data": {"user_id": str(g.user["id"]), "plan": plan},
"checkout": {
"url": f"{config.BASE_URL}/billing/success",
},
},
)
response.raise_for_status()
checkout_url = response.json()["data"]["checkout"]["url"]
return redirect(checkout_url)
@bp.route("/manage", methods=["POST"])
@login_required
async def manage():
"""Redirect to Paddle customer portal."""
sub = await get_subscription(g.user["id"])
if not sub or not sub.get("paddle_subscription_id"):
await flash("No active subscription found.", "error")
return redirect(url_for("dashboard.settings"))
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.paddle.com/subscriptions/{sub['paddle_subscription_id']}",
headers={"Authorization": f"Bearer {config.PADDLE_API_KEY}"},
)
response.raise_for_status()
portal_url = response.json()["data"]["management_urls"]["update_payment_method"]
return redirect(portal_url)
@bp.route("/cancel", methods=["POST"])
@login_required
async def cancel():
"""Cancel subscription via Paddle API."""
sub = await get_subscription(g.user["id"])
if sub and sub.get("paddle_subscription_id"):
async with httpx.AsyncClient() as client:
await client.post(
f"https://api.paddle.com/subscriptions/{sub['paddle_subscription_id']}/cancel",
headers={
"Authorization": f"Bearer {config.PADDLE_API_KEY}",
"Content-Type": "application/json",
},
json={"effective_from": "next_billing_period"},
)
return redirect(url_for("dashboard.settings"))
@bp.route("/webhook/paddle", methods=["POST"])
async def webhook():
"""Handle Paddle webhooks."""
payload = await request.get_data()
sig = request.headers.get("Paddle-Signature", "")
if not verify_hmac_signature(payload, sig, config.PADDLE_WEBHOOK_SECRET):
return jsonify({"error": "Invalid signature"}), 400
event = json.loads(payload)
event_type = event.get("event_type")
data = event.get("data", {})
custom_data = data.get("custom_data", {})
user_id = custom_data.get("user_id")
if event_type == "subscription.activated":
plan = custom_data.get("plan", "starter")
await upsert_subscription(
user_id=int(user_id) if user_id else 0,
plan=plan,
status="active",
provider_customer_id=str(data.get("customer_id", "")),
provider_subscription_id=data.get("id", ""),
current_period_end=data.get("current_billing_period", {}).get("ends_at"),
)
elif event_type == "subscription.updated":
await update_subscription_status(
data.get("id", ""),
status=data.get("status", "active"),
current_period_end=data.get("current_billing_period", {}).get("ends_at"),
)
elif event_type == "subscription.canceled":
await update_subscription_status(data.get("id", ""), status="cancelled")
elif event_type == "subscription.past_due":
await update_subscription_status(data.get("id", ""), status="past_due")
return jsonify({"received": True})

View File

@@ -0,0 +1,120 @@
{% extends "base.html" %}
{% block title %}Pricing - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<header style="text-align: center; margin-bottom: 3rem;">
<h1>Simple, Transparent Pricing</h1>
<p>Start free with coffee data. Upgrade when you need more.</p>
</header>
<div class="grid">
<!-- Free Plan -->
<article>
<header>
<h3>Free</h3>
<p><strong style="font-size: 2rem;">$0</strong> <small>/month</small></p>
</header>
<ul>
<li>Coffee dashboard</li>
<li>Last 5 years of data</li>
<li>Global &amp; country charts</li>
<li>Community support</li>
</ul>
<footer>
{% if user %}
{% if (user.plan or 'free') == 'free' %}
<button class="secondary" disabled>Current Plan</button>
{% else %}
<button class="secondary" disabled>Free</button>
{% endif %}
{% else %}
<a href="{{ url_for('auth.signup', plan='free') }}" role="button" class="secondary">Get Started</a>
{% endif %}
</footer>
</article>
<!-- Starter Plan -->
<article>
<header>
<h3>Starter</h3>
<p><strong style="font-size: 2rem;">TBD</strong> <small>/month</small></p>
</header>
<ul>
<li>Full coffee history (18+ years)</li>
<li>CSV data export</li>
<li>REST API access (10k calls/mo)</li>
<li>Email support</li>
</ul>
<footer>
{% if user %}
{% if (user.plan or 'free') == 'starter' %}
<button class="secondary" disabled>Current Plan</button>
{% else %}
<form method="post" action="{{ url_for('billing.checkout', plan='starter') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit">Upgrade</button>
</form>
{% endif %}
{% else %}
<a href="{{ url_for('auth.signup', plan='starter') }}" role="button">Get Started</a>
{% endif %}
</footer>
</article>
<!-- Pro Plan -->
<article>
<header>
<h3>Pro</h3>
<p><strong style="font-size: 2rem;">TBD</strong> <small>/month</small></p>
</header>
<ul>
<li>All 65 USDA commodities</li>
<li>Unlimited API calls</li>
<li>CSV &amp; API export</li>
<li>Priority support</li>
</ul>
<footer>
{% if user %}
{% if (user.plan or 'free') == 'pro' %}
<button class="secondary" disabled>Current Plan</button>
{% else %}
<form method="post" action="{{ url_for('billing.checkout', plan='pro') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit">Upgrade</button>
</form>
{% endif %}
{% else %}
<a href="{{ url_for('auth.signup', plan='pro') }}" role="button">Get Started</a>
{% endif %}
</footer>
</article>
</div>
<!-- FAQ -->
<section style="margin-top: 4rem; max-width: 600px; margin-left: auto; margin-right: auto;">
<h2>Frequently Asked Questions</h2>
<details>
<summary>Where does the data come from?</summary>
<p>All data comes from the USDA Production, Supply &amp; Distribution (PSD) Online database, which is freely available. We process and transform it daily into analytics-ready metrics.</p>
</details>
<details>
<summary>Can I change plans later?</summary>
<p>Yes. Upgrade or downgrade at any time. Changes take effect immediately with prorated billing.</p>
</details>
<details>
<summary>What commodities are available on Pro?</summary>
<p>All 65 commodities tracked by USDA PSD, including coffee, cocoa, sugar, cotton, grains, oilseeds, and more.</p>
</details>
<details>
<summary>How do I cancel?</summary>
<p>Cancel anytime from your dashboard settings. You keep access until the end of your billing period.</p>
</details>
</section>
</main>
{% endblock %}

View File

@@ -0,0 +1,26 @@
{% extends "base.html" %}
{% block title %}Success! - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<article style="max-width: 500px; margin: 4rem auto; text-align: center;">
<header>
<h1>🎉 Welcome Aboard!</h1>
</header>
<p>Your subscription is now active. You have full access to all features included in your plan.</p>
<p>
<a href="{{ url_for('dashboard.index') }}" role="button">Go to Dashboard</a>
</p>
<hr>
<p><small>
Need to manage your subscription? Visit
<a href="{{ url_for('dashboard.settings') }}">account settings</a>.
</small></p>
</article>
</main>
{% endblock %}

334
web/src/beanflows/core.py Normal file
View File

@@ -0,0 +1,334 @@
"""
Core infrastructure: database, config, email, and shared utilities.
"""
import os
import secrets
import hashlib
import hmac
import aiosqlite
import httpx
from pathlib import Path
from functools import wraps
from datetime import datetime, timedelta
from contextvars import ContextVar
from quart import request, session, g
# =============================================================================
# Configuration
# =============================================================================
class Config:
APP_NAME: str = os.getenv("APP_NAME", "BeanFlows")
SECRET_KEY: str = os.getenv("SECRET_KEY", "change-me-in-production")
BASE_URL: str = os.getenv("BASE_URL", "http://localhost:5000")
DEBUG: bool = os.getenv("DEBUG", "false").lower() == "true"
DATABASE_PATH: str = os.getenv("DATABASE_PATH", "data/app.db")
MAGIC_LINK_EXPIRY_MINUTES: int = int(os.getenv("MAGIC_LINK_EXPIRY_MINUTES", "15"))
SESSION_LIFETIME_DAYS: int = int(os.getenv("SESSION_LIFETIME_DAYS", "30"))
PAYMENT_PROVIDER: str = "paddle"
PADDLE_API_KEY: str = os.getenv("PADDLE_API_KEY", "")
PADDLE_WEBHOOK_SECRET: str = os.getenv("PADDLE_WEBHOOK_SECRET", "")
PADDLE_PRICES: dict = {
"starter": os.getenv("PADDLE_PRICE_STARTER", ""),
"pro": os.getenv("PADDLE_PRICE_PRO", ""),
}
RESEND_API_KEY: str = os.getenv("RESEND_API_KEY", "")
EMAIL_FROM: str = os.getenv("EMAIL_FROM", "hello@example.com")
RATE_LIMIT_REQUESTS: int = int(os.getenv("RATE_LIMIT_REQUESTS", "100"))
RATE_LIMIT_WINDOW: int = int(os.getenv("RATE_LIMIT_WINDOW", "60"))
PLAN_FEATURES: dict = {
"free": ["dashboard", "coffee_only", "limited_history"],
"starter": ["dashboard", "coffee_only", "full_history", "export", "api"],
"pro": ["dashboard", "all_commodities", "full_history", "export", "api", "priority_support"],
}
PLAN_LIMITS: dict = {
"free": {"commodities": 1, "history_years": 5, "api_calls": 0},
"starter": {"commodities": 1, "history_years": -1, "api_calls": 10000},
"pro": {"commodities": -1, "history_years": -1, "api_calls": -1}, # -1 = unlimited
}
config = Config()
# =============================================================================
# Database
# =============================================================================
_db: aiosqlite.Connection | None = None
async def init_db(path: str = None) -> None:
"""Initialize database connection with WAL mode."""
global _db
db_path = path or config.DATABASE_PATH
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
_db = await aiosqlite.connect(db_path)
_db.row_factory = aiosqlite.Row
await _db.execute("PRAGMA journal_mode=WAL")
await _db.execute("PRAGMA foreign_keys=ON")
await _db.execute("PRAGMA busy_timeout=5000")
await _db.execute("PRAGMA synchronous=NORMAL")
await _db.execute("PRAGMA cache_size=-64000")
await _db.execute("PRAGMA temp_store=MEMORY")
await _db.execute("PRAGMA mmap_size=268435456")
await _db.commit()
async def close_db() -> None:
"""Close database connection."""
global _db
if _db:
await _db.execute("PRAGMA wal_checkpoint(TRUNCATE)")
await _db.close()
_db = None
async def get_db() -> aiosqlite.Connection:
"""Get database connection."""
if _db is None:
await init_db()
return _db
async def fetch_one(sql: str, params: tuple = ()) -> dict | None:
"""Fetch a single row as dict."""
db = await get_db()
async with db.execute(sql, params) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def fetch_all(sql: str, params: tuple = ()) -> list[dict]:
"""Fetch all rows as list of dicts."""
db = await get_db()
async with db.execute(sql, params) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def execute(sql: str, params: tuple = ()) -> int:
"""Execute SQL and return lastrowid."""
db = await get_db()
async with db.execute(sql, params) as cursor:
await db.commit()
return cursor.lastrowid
async def execute_many(sql: str, params_list: list[tuple]) -> None:
"""Execute SQL for multiple parameter sets."""
db = await get_db()
await db.executemany(sql, params_list)
await db.commit()
class transaction:
"""Async context manager for transactions."""
async def __aenter__(self):
self.db = await get_db()
return self.db
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
await self.db.commit()
else:
await self.db.rollback()
return False
# =============================================================================
# Email
# =============================================================================
async def send_email(to: str, subject: str, html: str, text: str = None) -> bool:
"""Send email via Resend API."""
if not config.RESEND_API_KEY:
print(f"[EMAIL] Would send to {to}: {subject}")
return True
async with httpx.AsyncClient() as client:
response = await client.post(
"https://api.resend.com/emails",
headers={"Authorization": f"Bearer {config.RESEND_API_KEY}"},
json={
"from": config.EMAIL_FROM,
"to": to,
"subject": subject,
"html": html,
"text": text or html,
},
)
return response.status_code == 200
# =============================================================================
# CSRF Protection
# =============================================================================
def get_csrf_token() -> str:
"""Get or create CSRF token for current session."""
if "csrf_token" not in session:
session["csrf_token"] = secrets.token_urlsafe(32)
return session["csrf_token"]
def validate_csrf_token(token: str) -> bool:
"""Validate CSRF token."""
return token and secrets.compare_digest(token, session.get("csrf_token", ""))
def csrf_protect(f):
"""Decorator to require valid CSRF token for POST requests."""
@wraps(f)
async def decorated(*args, **kwargs):
if request.method == "POST":
form = await request.form
token = form.get("csrf_token") or request.headers.get("X-CSRF-Token")
if not validate_csrf_token(token):
return {"error": "Invalid CSRF token"}, 403
return await f(*args, **kwargs)
return decorated
# =============================================================================
# Rate Limiting (SQLite-based)
# =============================================================================
async def check_rate_limit(key: str, limit: int = None, window: int = None) -> tuple[bool, dict]:
"""
Check if rate limit exceeded. Returns (is_allowed, info).
Uses SQLite for storage - no Redis needed.
"""
limit = limit or config.RATE_LIMIT_REQUESTS
window = window or config.RATE_LIMIT_WINDOW
now = datetime.utcnow()
window_start = now - timedelta(seconds=window)
# Clean old entries and count recent
await execute(
"DELETE FROM rate_limits WHERE key = ? AND timestamp < ?",
(key, window_start.isoformat())
)
result = await fetch_one(
"SELECT COUNT(*) as count FROM rate_limits WHERE key = ? AND timestamp > ?",
(key, window_start.isoformat())
)
count = result["count"] if result else 0
info = {
"limit": limit,
"remaining": max(0, limit - count - 1),
"reset": int((window_start + timedelta(seconds=window)).timestamp()),
}
if count >= limit:
return False, info
# Record this request
await execute(
"INSERT INTO rate_limits (key, timestamp) VALUES (?, ?)",
(key, now.isoformat())
)
return True, info
def rate_limit(limit: int = None, window: int = None, key_func=None):
"""Decorator for rate limiting routes."""
def decorator(f):
@wraps(f)
async def decorated(*args, **kwargs):
if key_func:
key = key_func()
else:
key = f"ip:{request.remote_addr}"
allowed, info = await check_rate_limit(key, limit, window)
if not allowed:
response = {"error": "Rate limit exceeded", **info}
return response, 429
return await f(*args, **kwargs)
return decorated
return decorator
# =============================================================================
# Request ID Tracking
# =============================================================================
request_id_var: ContextVar[str] = ContextVar("request_id", default="")
def get_request_id() -> str:
"""Get current request ID."""
return request_id_var.get()
def setup_request_id(app):
"""Setup request ID middleware."""
@app.before_request
async def set_request_id():
rid = request.headers.get("X-Request-ID") or secrets.token_hex(8)
request_id_var.set(rid)
g.request_id = rid
@app.after_request
async def add_request_id_header(response):
response.headers["X-Request-ID"] = get_request_id()
return response
# =============================================================================
# Webhook Signature Verification
# =============================================================================
def verify_hmac_signature(payload: bytes, signature: str, secret: str) -> bool:
"""Verify HMAC-SHA256 webhook signature."""
expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
return hmac.compare_digest(signature, expected)
# =============================================================================
# Soft Delete Helpers
# =============================================================================
async def soft_delete(table: str, id: int) -> bool:
"""Mark record as deleted."""
result = await execute(
f"UPDATE {table} SET deleted_at = ? WHERE id = ? AND deleted_at IS NULL",
(datetime.utcnow().isoformat(), id)
)
return result > 0
async def restore(table: str, id: int) -> bool:
"""Restore soft-deleted record."""
result = await execute(
f"UPDATE {table} SET deleted_at = NULL WHERE id = ?",
(id,)
)
return result > 0
async def hard_delete(table: str, id: int) -> bool:
"""Permanently delete record."""
result = await execute(f"DELETE FROM {table} WHERE id = ?", (id,))
return result > 0
async def purge_deleted(table: str, days: int = 30) -> int:
"""Purge records deleted more than X days ago."""
cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat()
return await execute(
f"DELETE FROM {table} WHERE deleted_at IS NOT NULL AND deleted_at < ?",
(cutoff,)
)

View File

@@ -0,0 +1,246 @@
"""
Dashboard domain: coffee analytics dashboard, settings, API keys.
"""
import asyncio
import hashlib
import secrets
from datetime import datetime
from pathlib import Path
from quart import Blueprint, flash, g, jsonify, redirect, render_template, request, url_for
from .. import analytics
from ..auth.routes import get_user_with_subscription, login_required, update_user
from ..billing.routes import get_subscription
from ..core import csrf_protect, execute, fetch_all, fetch_one, soft_delete
# Blueprint with its own template folder
bp = Blueprint(
"dashboard",
__name__,
template_folder=str(Path(__file__).parent / "templates"),
url_prefix="/dashboard",
)
# =============================================================================
# SQL Queries
# =============================================================================
async def get_user_stats(user_id: int) -> dict:
"""Get dashboard stats for user."""
api_calls = await fetch_one(
"""
SELECT COUNT(*) as count FROM api_requests
WHERE user_id = ? AND created_at > date('now', '-30 days')
""",
(user_id,),
)
return {
"api_calls": api_calls["count"] if api_calls else 0,
}
# API Key queries
async def create_api_key(user_id: int, name: str, scopes: list[str] = None) -> tuple[str, int]:
"""Create API key. Returns (raw_key, key_id) - raw_key shown only once."""
raw_key = f"sk_{secrets.token_urlsafe(32)}"
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
prefix = raw_key[:12]
now = datetime.utcnow().isoformat()
key_id = await execute(
"""
INSERT INTO api_keys (user_id, name, key_hash, key_prefix, scopes, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(user_id, name, key_hash, prefix, ",".join(scopes or ["read"]), now),
)
return raw_key, key_id
async def get_user_api_keys(user_id: int) -> list[dict]:
"""Get all API keys for user (without hashes)."""
return await fetch_all(
"""
SELECT id, name, key_prefix, scopes, created_at, last_used_at
FROM api_keys
WHERE user_id = ? AND deleted_at IS NULL
ORDER BY created_at DESC
""",
(user_id,),
)
async def delete_api_key(key_id: int, user_id: int) -> bool:
"""Delete API key (soft delete)."""
result = await execute(
"""
UPDATE api_keys
SET deleted_at = ?
WHERE id = ? AND user_id = ? AND deleted_at IS NULL
""",
(datetime.utcnow().isoformat(), key_id, user_id),
)
return result > 0
# =============================================================================
# Routes
# =============================================================================
@bp.route("/")
@login_required
async def index():
"""Coffee analytics dashboard."""
user = await get_user_with_subscription(g.user["id"])
stats = await get_user_stats(g.user["id"])
plan = user.get("plan") or "free"
# Fetch all analytics data in parallel
time_series, top_producers, stu_trend, balance, yoy = await asyncio.gather(
analytics.get_global_time_series(
analytics.COFFEE_COMMODITY_CODE,
["Production", "Exports", "Imports", "Ending_Stocks", "Total_Distribution"],
),
analytics.get_top_countries(analytics.COFFEE_COMMODITY_CODE, "Production", limit=10),
analytics.get_stock_to_use_trend(analytics.COFFEE_COMMODITY_CODE),
analytics.get_supply_demand_balance(analytics.COFFEE_COMMODITY_CODE),
analytics.get_production_yoy_by_country(analytics.COFFEE_COMMODITY_CODE, limit=15),
)
# Latest global snapshot for key metric cards
latest = time_series[-1] if time_series else {}
# Apply free plan history limit (last 5 years)
if plan == "free" and time_series:
max_year = time_series[-1]["market_year"]
cutoff_year = max_year - 5
time_series = [r for r in time_series if r["market_year"] >= cutoff_year]
stu_trend = [r for r in stu_trend if r["market_year"] >= cutoff_year]
balance = [r for r in balance if r["market_year"] >= cutoff_year]
return await render_template(
"index.html",
user=user,
stats=stats,
plan=plan,
latest=latest,
time_series=time_series,
top_producers=top_producers,
stu_trend=stu_trend,
balance=balance,
yoy=yoy,
)
@bp.route("/countries")
@login_required
async def countries():
"""Country comparison page."""
user = await get_user_with_subscription(g.user["id"])
plan = user.get("plan") or "free"
# Get available countries for coffee
all_countries = await analytics.get_top_countries(analytics.COFFEE_COMMODITY_CODE, "Production", limit=50)
# Parse query params
selected_codes = request.args.getlist("country")
metric = request.args.get("metric", "Production")
comparison_data = []
if selected_codes:
selected_codes = selected_codes[:10] # cap at 10
comparison_data = await analytics.get_country_comparison(
analytics.COFFEE_COMMODITY_CODE, selected_codes, metric
)
# HTMX partial: return just the chart data as JSON
if request.headers.get("HX-Request"):
return jsonify({"data": comparison_data, "metric": metric})
return await render_template(
"countries.html",
user=user,
plan=plan,
all_countries=all_countries,
selected_codes=selected_codes,
metric=metric,
comparison_data=comparison_data,
)
@bp.route("/settings", methods=["GET", "POST"])
@login_required
@csrf_protect
async def settings():
"""User settings page."""
if request.method == "POST":
form = await request.form
# Update user settings
await update_user(
g.user["id"],
name=form.get("name", "").strip() or None,
updated_at=datetime.utcnow().isoformat(),
)
await flash("Settings saved!", "success")
return redirect(url_for("dashboard.settings"))
user = await get_user_with_subscription(g.user["id"])
subscription = await get_subscription(g.user["id"])
api_keys = await get_user_api_keys(g.user["id"])
return await render_template(
"settings.html",
user=user,
subscription=subscription,
api_keys=api_keys,
)
@bp.route("/api-keys", methods=["POST"])
@login_required
@csrf_protect
async def create_key():
"""Create new API key."""
form = await request.form
name = form.get("name", "").strip() or "Untitled Key"
scopes = form.getlist("scopes") or ["read"]
raw_key, key_id = await create_api_key(g.user["id"], name, scopes)
await flash(f"API key created! Copy it now, it won't be shown again: {raw_key}", "success")
return redirect(url_for("dashboard.settings") + "#api-keys")
@bp.route("/api-keys/<int:key_id>/delete", methods=["POST"])
@login_required
@csrf_protect
async def delete_key(key_id: int):
"""Delete API key."""
success = await delete_api_key(key_id, g.user["id"])
if success:
await flash("API key deleted.", "success")
else:
await flash("Could not delete API key.", "error")
return redirect(url_for("dashboard.settings") + "#api-keys")
@bp.route("/delete-account", methods=["POST"])
@login_required
@csrf_protect
async def delete_account():
"""Delete user account (soft delete)."""
from quart import session
await soft_delete("users", g.user["id"])
session.clear()
await flash("Your account has been deleted.", "info")
return redirect(url_for("public.landing"))

View File

@@ -0,0 +1,101 @@
{% extends "base.html" %}
{% block title %}Country Comparison - {{ config.APP_NAME }}{% endblock %}
{% block head %}
<script src="https://cdn.jsdelivr.net/npm/chart.js@4"></script>
{% endblock %}
{% block content %}
<main class="container">
<header>
<h1>Country Comparison</h1>
<p>Compare coffee metrics across producing and consuming countries.</p>
</header>
<!-- Filters -->
<form id="country-form" method="get" action="{{ url_for('dashboard.countries') }}">
<div class="grid">
<label>
Metric
<select name="metric" onchange="this.form.submit()">
{% for m in ["Production", "Exports", "Imports", "Ending_Stocks"] %}
<option value="{{ m }}" {{ "selected" if metric == m }}>{{ m | replace("_", " ") }}</option>
{% endfor %}
</select>
</label>
<label>
Countries (select up to 10)
<select name="country" multiple size="8" onchange="this.form.submit()">
{% for c in all_countries %}
<option value="{{ c.country_code }}" {{ "selected" if c.country_code in selected_codes }}>
{{ c.country_name }}
</option>
{% endfor %}
</select>
</label>
</div>
</form>
<!-- Chart -->
{% if comparison_data %}
<section>
<canvas id="comparisonChart" style="max-height: 500px;"></canvas>
</section>
{% else %}
<article style="text-align: center; color: var(--muted-color);">
<p>Select countries above to see the comparison chart.</p>
</article>
{% endif %}
<a href="{{ url_for('dashboard.index') }}" role="button" class="secondary outline">Back to Dashboard</a>
</main>
{% endblock %}
{% block scripts %}
<script>
const COLORS = [
'#2563eb', '#dc2626', '#16a34a', '#ca8a04', '#9333ea',
'#0891b2', '#e11d48', '#65a30d', '#d97706', '#7c3aed'
];
const rawData = {{ comparison_data | tojson }};
const metric = {{ metric | tojson }};
if (rawData.length > 0) {
// Group by country
const byCountry = {};
for (const row of rawData) {
if (!byCountry[row.country_name]) byCountry[row.country_name] = [];
byCountry[row.country_name].push(row);
}
// Collect all years
const allYears = [...new Set(rawData.map(r => r.market_year))].sort();
const datasets = Object.entries(byCountry).map(([name, rows], i) => {
const yearMap = Object.fromEntries(rows.map(r => [r.market_year, r[metric]]));
return {
label: name,
data: allYears.map(y => yearMap[y] ?? null),
borderColor: COLORS[i % COLORS.length],
tension: 0.3,
spanGaps: true
};
});
new Chart(document.getElementById('comparisonChart'), {
type: 'line',
data: { labels: allYears, datasets },
options: {
responsive: true,
plugins: {
legend: { position: 'bottom' },
title: { display: true, text: metric.replace(/_/g, ' ') + ' by Country' }
},
scales: { y: { beginAtZero: false } }
}
});
}
</script>
{% endblock %}

View File

@@ -0,0 +1,211 @@
{% extends "base.html" %}
{% block title %}Dashboard - {{ config.APP_NAME }}{% endblock %}
{% block head %}
<script src="https://cdn.jsdelivr.net/npm/chart.js@4"></script>
{% endblock %}
{% block content %}
<main class="container">
<header>
<h1>Coffee Dashboard</h1>
<p>Welcome back{% if user.name %}, {{ user.name }}{% endif %}! Global coffee market data from USDA PSD.</p>
</header>
<!-- Key Metric Cards -->
<div class="grid">
<article>
<header><small>Global Production (latest year)</small></header>
<p style="font-size: 2rem; margin: 0;">
<strong>{{ "{:,.0f}".format(latest.get("Production", 0)) }}</strong>
</p>
<small>1,000 60-kg bags</small>
</article>
<article>
<header><small>Stock-to-Use Ratio</small></header>
<p style="font-size: 2rem; margin: 0;">
{% if stu_trend %}
<strong>{{ "{:.1f}".format(stu_trend[-1].get("Stock_to_Use_Ratio_pct", 0)) }}%</strong>
{% else %}
<strong>--</strong>
{% endif %}
</p>
<small>Ending stocks / consumption</small>
</article>
<article>
<header><small>Trade Balance</small></header>
<p style="font-size: 2rem; margin: 0;">
<strong>{{ "{:,.0f}".format(latest.get("Exports", 0) - latest.get("Imports", 0)) }}</strong>
</p>
<small>Exports minus imports</small>
</article>
<article>
<header><small>Your Plan</small></header>
<p style="font-size: 2rem; margin: 0;"><strong>{{ plan | title }}</strong></p>
<small>
{% if plan == "free" %}
<a href="{{ url_for('billing.pricing') }}">Upgrade for full history</a>
{% else %}
{{ stats.api_calls }} API calls (30d)
{% endif %}
</small>
</article>
</div>
<!-- Global Supply/Demand Time Series -->
<section>
<h2>Global Supply &amp; Demand</h2>
{% if plan == "free" %}
<p><small>Showing last 5 years. <a href="{{ url_for('billing.pricing') }}">Upgrade</a> for full 18+ year history.</small></p>
{% endif %}
<canvas id="supplyDemandChart" style="max-height: 400px;"></canvas>
</section>
<!-- Stock-to-Use Ratio -->
<section>
<h2>Stock-to-Use Ratio Trend</h2>
<canvas id="stuChart" style="max-height: 300px;"></canvas>
</section>
<!-- Two-column: Top Producers + YoY Table -->
<div class="grid">
<section>
<h2>Top Producing Countries</h2>
<canvas id="topProducersChart" style="max-height: 400px;"></canvas>
</section>
<section>
<h2>Year-over-Year Production Change</h2>
<div style="overflow-x: auto;">
<table>
<thead>
<tr>
<th>Country</th>
<th style="text-align: right;">Production</th>
<th style="text-align: right;">YoY %</th>
</tr>
</thead>
<tbody>
{% for row in yoy %}
<tr>
<td>{{ row.country_name }}</td>
<td style="text-align: right;">{{ "{:,.0f}".format(row.Production) }}</td>
<td style="text-align: right; color: {{ 'var(--ins-color)' if row.Production_YoY_pct and row.Production_YoY_pct > 0 else 'var(--del-color)' if row.Production_YoY_pct and row.Production_YoY_pct < 0 else 'inherit' }};">
{% if row.Production_YoY_pct is not none %}
{{ "{:+.1f}%".format(row.Production_YoY_pct) }}
{% else %}
--
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</section>
</div>
<!-- CSV Export (plan-gated) -->
{% if plan != "free" %}
<section>
<a href="{{ url_for('api.commodity_metrics_csv', code=711100) }}" role="button" class="secondary outline">Export CSV</a>
</section>
{% else %}
<section>
<p><small>CSV export available on Starter and Pro plans. <a href="{{ url_for('billing.pricing') }}">Upgrade</a></small></p>
</section>
{% endif %}
<!-- Quick Actions -->
<section>
<div class="grid">
<a href="{{ url_for('dashboard.countries') }}" role="button" class="secondary outline">Country Comparison</a>
<a href="{{ url_for('dashboard.settings') }}" role="button" class="secondary outline">Settings</a>
<a href="{{ url_for('dashboard.settings') }}#api-keys" role="button" class="secondary outline">API Keys</a>
</div>
</section>
</main>
{% endblock %}
{% block scripts %}
<script>
// Chart colors
const COLORS = [
'#2563eb', '#dc2626', '#16a34a', '#ca8a04', '#9333ea',
'#0891b2', '#e11d48', '#65a30d', '#d97706', '#7c3aed'
];
// -- Supply/Demand Chart --
const tsData = {{ time_series | tojson }};
if (tsData.length > 0) {
new Chart(document.getElementById('supplyDemandChart'), {
type: 'line',
data: {
labels: tsData.map(r => r.market_year),
datasets: [
{label: 'Production', data: tsData.map(r => r.Production), borderColor: COLORS[0], tension: 0.3},
{label: 'Exports', data: tsData.map(r => r.Exports), borderColor: COLORS[1], tension: 0.3},
{label: 'Imports', data: tsData.map(r => r.Imports), borderColor: COLORS[2], tension: 0.3},
{label: 'Ending Stocks', data: tsData.map(r => r.Ending_Stocks), borderColor: COLORS[3], tension: 0.3},
{label: 'Total Distribution', data: tsData.map(r => r.Total_Distribution), borderColor: COLORS[4], tension: 0.3},
]
},
options: {
responsive: true,
plugins: {legend: {position: 'bottom'}},
scales: {y: {beginAtZero: false}}
}
});
}
// -- Stock-to-Use Chart --
const stuData = {{ stu_trend | tojson }};
if (stuData.length > 0) {
new Chart(document.getElementById('stuChart'), {
type: 'line',
data: {
labels: stuData.map(r => r.market_year),
datasets: [{
label: 'Stock-to-Use Ratio (%)',
data: stuData.map(r => r.Stock_to_Use_Ratio_pct),
borderColor: COLORS[0],
backgroundColor: 'rgba(37,99,235,0.1)',
fill: true,
tension: 0.3
}]
},
options: {
responsive: true,
plugins: {legend: {display: false}},
scales: {y: {beginAtZero: false}}
}
});
}
// -- Top Producers Horizontal Bar --
const topData = {{ top_producers | tojson }};
if (topData.length > 0) {
new Chart(document.getElementById('topProducersChart'), {
type: 'bar',
data: {
labels: topData.map(r => r.country_name),
datasets: [{
label: 'Production',
data: topData.map(r => r.Production),
backgroundColor: COLORS[0]
}]
},
options: {
indexAxis: 'y',
responsive: true,
plugins: {legend: {display: false}},
scales: {x: {beginAtZero: true}}
}
});
}
</script>
{% endblock %}

View File

@@ -0,0 +1,155 @@
{% extends "base.html" %}
{% block title %}Settings - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<header>
<h1>Settings</h1>
</header>
<!-- Profile Section -->
<section>
<h2>Profile</h2>
<article>
<form method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<label for="email">
Email
<input type="email" id="email" value="{{ user.email }}" disabled>
<small>Email cannot be changed</small>
</label>
<label for="name">
Name
<input type="text" id="name" name="name" value="{{ user.name or '' }}" placeholder="Your name">
</label>
<button type="submit">Save Changes</button>
</form>
</article>
</section>
<!-- Subscription Section -->
<section>
<h2>Subscription</h2>
<article>
<div class="grid">
<div>
<strong>Current Plan:</strong> {{ (user.plan or 'free') | title }}
</div>
<div>
<strong>Status:</strong> {{ (user.sub_status or 'active') | title }}
</div>
{% if user.current_period_end %}
<div>
<strong>Renews:</strong> {{ user.current_period_end[:10] }}
</div>
{% endif %}
</div>
<div style="margin-top: 1rem;">
{% if subscription %}
<form method="post" action="{{ url_for('billing.portal') }}" style="display: inline;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="secondary">Manage Subscription</button>
</form>
{% else %}
<a href="{{ url_for('billing.pricing') }}" role="button">Upgrade Plan</a>
{% endif %}
</div>
</article>
</section>
<!-- API Keys Section -->
<section id="api-keys">
<h2>API Keys</h2>
<article>
<p>API keys allow you to access the API programmatically.</p>
{% if api_keys %}
<table>
<thead>
<tr>
<th>Name</th>
<th>Key</th>
<th>Scopes</th>
<th>Created</th>
<th></th>
</tr>
</thead>
<tbody>
{% for key in api_keys %}
<tr>
<td>{{ key.name }}</td>
<td><code>{{ key.key_prefix }}...</code></td>
<td>{{ key.scopes }}</td>
<td>{{ key.created_at[:10] }}</td>
<td>
<form method="post" action="{{ url_for('dashboard.delete_key', key_id=key.id) }}" style="margin: 0;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="secondary outline" style="padding: 0.25rem 0.5rem; margin: 0;">Delete</button>
</form>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<p><em>No API keys yet.</em></p>
{% endif %}
<details>
<summary>Create New API Key</summary>
<form method="post" action="{{ url_for('dashboard.create_key') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<label for="key-name">
Key Name
<input type="text" id="key-name" name="name" placeholder="My API Key" required>
</label>
<fieldset>
<legend>Scopes</legend>
<label>
<input type="checkbox" name="scopes" value="read" checked>
Read
</label>
<label>
<input type="checkbox" name="scopes" value="write">
Write
</label>
</fieldset>
<button type="submit">Create Key</button>
</form>
</details>
</article>
</section>
<!-- Danger Zone -->
<section>
<h2>Danger Zone</h2>
<article style="border-color: var(--del-color);">
<p>Once you delete your account, there is no going back. Please be certain.</p>
<details>
<summary role="button" class="secondary outline" style="--pico-color: var(--del-color);">Delete Account</summary>
<p>Are you sure? This will:</p>
<ul>
<li>Delete all your data</li>
<li>Cancel your subscription</li>
<li>Remove your API keys</li>
</ul>
<form method="post" action="{{ url_for('dashboard.delete_account') }}">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="secondary" style="--pico-background-color: var(--del-color);">
Yes, Delete My Account
</button>
</form>
</details>
</article>
</section>
</main>
{% endblock %}

View File

@@ -0,0 +1,53 @@
"""
Simple migration runner. Runs schema.sql against the database.
"""
import sqlite3
from pathlib import Path
import os
import sys
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
from dotenv import load_dotenv
load_dotenv()
def migrate():
"""Run migrations."""
# Get database path from env or default
db_path = os.getenv("DATABASE_PATH", "data/app.db")
# Ensure directory exists
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
# Read schema
schema_path = Path(__file__).parent / "schema.sql"
schema = schema_path.read_text()
# Connect and execute
conn = sqlite3.connect(db_path)
# Enable WAL mode
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
# Run schema
conn.executescript(schema)
conn.commit()
print(f"✓ Migrations complete: {db_path}")
# Show tables
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
)
tables = [row[0] for row in cursor.fetchall()]
print(f" Tables: {', '.join(tables)}")
conn.close()
if __name__ == "__main__":
migrate()

View File

@@ -0,0 +1,101 @@
-- BeanFlows Database Schema
-- Run with: python -m beanflows.migrations.migrate
-- Users
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE NOT NULL,
name TEXT,
created_at TEXT NOT NULL,
updated_at TEXT,
last_login_at TEXT,
deleted_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
CREATE INDEX IF NOT EXISTS idx_users_deleted ON users(deleted_at);
-- Auth Tokens (magic links)
CREATE TABLE IF NOT EXISTS auth_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id),
token TEXT UNIQUE NOT NULL,
expires_at TEXT NOT NULL,
used_at TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_auth_tokens_token ON auth_tokens(token);
CREATE INDEX IF NOT EXISTS idx_auth_tokens_user ON auth_tokens(user_id);
-- Subscriptions
CREATE TABLE IF NOT EXISTS subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL UNIQUE REFERENCES users(id),
plan TEXT NOT NULL DEFAULT 'free',
status TEXT NOT NULL DEFAULT 'free',
paddle_customer_id TEXT,
paddle_subscription_id TEXT,
current_period_end TEXT,
created_at TEXT NOT NULL,
updated_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_subscriptions_user ON subscriptions(user_id);
CREATE INDEX IF NOT EXISTS idx_subscriptions_provider ON subscriptions(paddle_subscription_id);
-- API Keys
CREATE TABLE IF NOT EXISTS api_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id),
name TEXT NOT NULL,
key_hash TEXT UNIQUE NOT NULL,
key_prefix TEXT NOT NULL,
scopes TEXT DEFAULT 'read',
created_at TEXT NOT NULL,
last_used_at TEXT,
deleted_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_api_keys_hash ON api_keys(key_hash);
CREATE INDEX IF NOT EXISTS idx_api_keys_user ON api_keys(user_id);
-- API Request Log
CREATE TABLE IF NOT EXISTS api_requests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id),
endpoint TEXT NOT NULL,
method TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_requests_user ON api_requests(user_id);
CREATE INDEX IF NOT EXISTS idx_api_requests_date ON api_requests(created_at);
-- Rate Limits
CREATE TABLE IF NOT EXISTS rate_limits (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL,
timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_rate_limits_key ON rate_limits(key, timestamp);
-- Background Tasks
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_name TEXT NOT NULL,
payload TEXT,
status TEXT NOT NULL DEFAULT 'pending',
run_at TEXT NOT NULL,
retries INTEGER DEFAULT 0,
error TEXT,
created_at TEXT NOT NULL,
completed_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status, run_at);

View File

@@ -0,0 +1,45 @@
"""
Public domain: landing page, marketing pages, legal pages.
"""
from pathlib import Path
from quart import Blueprint, render_template
from ..core import config
# Blueprint with its own template folder
bp = Blueprint(
"public",
__name__,
template_folder=str(Path(__file__).parent / "templates"),
)
@bp.route("/")
async def landing():
"""Landing page."""
return await render_template("landing.html")
@bp.route("/features")
async def features():
"""Features page."""
return await render_template("features.html")
@bp.route("/terms")
async def terms():
"""Terms of service."""
return await render_template("terms.html")
@bp.route("/privacy")
async def privacy():
"""Privacy policy."""
return await render_template("privacy.html")
@bp.route("/about")
async def about():
"""About page."""
return await render_template("about.html")

View File

@@ -0,0 +1,34 @@
{% extends "base.html" %}
{% block title %}About - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<article style="max-width: 800px; margin: 0 auto;">
<header style="text-align: center;">
<h1>About {{ config.APP_NAME }}</h1>
</header>
<section>
<p>{{ config.APP_NAME }} was built with a simple philosophy: ship fast, stay simple.</p>
<p>Too many SaaS boilerplates are over-engineered. They use PostgreSQL when SQLite would do. They add Redis for a job queue that runs 10 tasks a day. They have 50 npm dependencies for a landing page.</p>
<p>We took a different approach:</p>
<ul>
<li><strong>SQLite for everything</strong> It handles more than you think.</li>
<li><strong>Server-rendered HTML</strong> No build step, no hydration, no complexity.</li>
<li><strong>Minimal dependencies</strong> Fewer things to break.</li>
<li><strong>Flat structure</strong> Find things where you expect them.</li>
</ul>
<p>The result is a codebase you can understand in an afternoon and deploy for $5/month.</p>
</section>
<section style="text-align: center; margin-top: 3rem;">
<a href="{{ url_for('auth.signup') }}" role="button">Get Started</a>
</section>
</article>
</main>
{% endblock %}

View File

@@ -0,0 +1,73 @@
{% extends "base.html" %}
{% block title %}Features - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<header style="text-align: center; margin-bottom: 3rem;">
<h1>Features</h1>
<p>Coffee market intelligence built on USDA Production, Supply &amp; Distribution data.</p>
</header>
<section>
<article>
<h2>Supply &amp; Demand Dashboard</h2>
<p>Interactive charts showing global coffee production, exports, imports, ending stocks, and total distribution by market year. Spot surplus and deficit years at a glance.</p>
<ul>
<li>18+ years of historical data (2006&ndash;present)</li>
<li>Line charts for production, trade, and consumption trends</li>
<li>Key metric cards for quick orientation</li>
<li>Auto-refreshed daily from USDA PSD Online</li>
</ul>
</article>
<article>
<h2>Country Analysis &amp; Comparison</h2>
<p>Rank the world's coffee producers and consumers. Compare up to 10 countries side-by-side on any metric.</p>
<ul>
<li>Top-N country rankings (production, exports, imports, stocks)</li>
<li>Year-over-year production change table with directional coloring</li>
<li>Multi-country overlay charts</li>
<li>65 commodity-country combinations from USDA data</li>
</ul>
</article>
<article>
<h2>Stock-to-Use Ratio</h2>
<p>The ratio traders watch most closely. Track the global coffee stock-to-use ratio over time to gauge market tightness and anticipate price moves.</p>
<ul>
<li>Global ratio trend chart</li>
<li>Ending stocks vs. total distribution breakdown</li>
<li>Historical context spanning two decades</li>
</ul>
</article>
<article>
<h2>Data Export &amp; API</h2>
<p>Download CSV files or integrate directly with your trading systems via REST API.</p>
<ul>
<li>CSV export of any metric series</li>
<li>RESTful JSON API with Bearer token auth</li>
<li>Rate-limited and logged for security</li>
<li>Commodity listing, time series, and country endpoints</li>
</ul>
</article>
<article>
<h2>Daily Data Pipeline</h2>
<p>Our pipeline extracts data from the USDA PSD Online database, transforms it through a 4-layer SQL pipeline (raw &rarr; staging &rarr; cleaned &rarr; serving), and delivers analytics-ready metrics every day.</p>
<ul>
<li>Automated daily extraction from USDA</li>
<li>SQLMesh + DuckDB transformation pipeline</li>
<li>Incremental processing (only new data each day)</li>
<li>Auditable data lineage</li>
</ul>
</article>
</section>
<section style="text-align: center; margin-top: 3rem;">
<a href="{{ url_for('auth.signup') }}" role="button">Start Free</a>
<a href="{{ url_for('billing.pricing') }}" role="button" class="secondary outline" style="margin-left: 1rem;">View Pricing</a>
</section>
</main>
{% endblock %}

View File

@@ -0,0 +1,91 @@
{% extends "base.html" %}
{% block title %}{{ config.APP_NAME }} - Coffee Market Intelligence for Independent Traders{% endblock %}
{% block content %}
<main class="container">
<!-- Hero -->
<header style="text-align: center; padding: 4rem 0;">
<h1>Coffee Market Intelligence<br>for Independent Traders</h1>
<p style="font-size: 1.25rem; max-width: 640px; margin: 0 auto;">
Track global supply and demand, compare producing countries, and spot trends
with 18+ years of USDA data. No expensive terminal required.
</p>
<div style="margin-top: 2rem;">
<a href="{{ url_for('auth.signup') }}" role="button" style="margin-right: 1rem;">Start Free</a>
<a href="{{ url_for('public.features') }}" role="button" class="secondary outline">See Features</a>
</div>
</header>
<!-- Value Props -->
<section style="padding: 4rem 0;">
<h2 style="text-align: center;">What You Get</h2>
<div class="grid">
<article>
<h3>Supply &amp; Demand Charts</h3>
<p>Global production, exports, imports, ending stocks, and consumption visualized by market year.</p>
</article>
<article>
<h3>Country Analysis</h3>
<p>Compare up to 10 producing countries side-by-side. See who's growing, who's shrinking.</p>
</article>
<article>
<h3>Stock-to-Use Ratio</h3>
<p>The key indicator traders watch. Track the global ratio over time to gauge tightness.</p>
</article>
</div>
<div class="grid">
<article>
<h3>CSV &amp; API Export</h3>
<p>Download data for your own models. Integrate with your trading tools via REST API.</p>
</article>
<article>
<h3>Daily Refresh</h3>
<p>Data pipeline runs daily against USDA PSD Online. Always current, always reliable.</p>
</article>
<article>
<h3>No Lock-in</h3>
<p>Public USDA data, open methodology. You own your exports. Cancel anytime.</p>
</article>
</div>
</section>
<!-- How It Works -->
<section style="background: var(--card-background-color); border-radius: var(--border-radius); padding: 2rem;">
<h2 style="text-align: center;">How It Works</h2>
<div class="grid">
<div style="text-align: center;">
<p style="font-size: 2rem;">1</p>
<h4>Sign Up</h4>
<p>Enter your email, click the magic link. No password needed.</p>
</div>
<div style="text-align: center;">
<p style="font-size: 2rem;">2</p>
<h4>Explore the Dashboard</h4>
<p>Instant access to coffee supply/demand charts and country rankings.</p>
</div>
<div style="text-align: center;">
<p style="font-size: 2rem;">3</p>
<h4>Go Deeper</h4>
<p>Upgrade for full history, CSV exports, and API access for your own models.</p>
</div>
</div>
</section>
<!-- CTA -->
<section style="text-align: center; padding: 4rem 0;">
<h2>Ready to See the Data?</h2>
<p>Free plan includes the last 5 years of coffee market data. No credit card required.</p>
<a href="{{ url_for('auth.signup') }}" role="button">Start Free</a>
</section>
</main>
{% endblock %}

View File

@@ -0,0 +1,92 @@
{% extends "base.html" %}
{% block title %}Privacy Policy - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<article style="max-width: 800px; margin: 0 auto;">
<header>
<h1>Privacy Policy</h1>
<p><small>Last updated: January 2024</small></p>
</header>
<section>
<h2>1. Information We Collect</h2>
<p>We collect information you provide directly:</p>
<ul>
<li>Email address (required for account creation)</li>
<li>Name (optional)</li>
<li>Payment information (processed by Stripe)</li>
</ul>
<p>We automatically collect:</p>
<ul>
<li>IP address</li>
<li>Browser type</li>
<li>Usage data</li>
</ul>
</section>
<section>
<h2>2. How We Use Information</h2>
<p>We use your information to:</p>
<ul>
<li>Provide and maintain the service</li>
<li>Process payments</li>
<li>Send transactional emails</li>
<li>Improve the service</li>
<li>Respond to support requests</li>
</ul>
</section>
<section>
<h2>3. Information Sharing</h2>
<p>We do not sell your personal information. We may share information with:</p>
<ul>
<li>Service providers (Stripe for payments, Resend for email)</li>
<li>Law enforcement when required by law</li>
</ul>
</section>
<section>
<h2>4. Data Retention</h2>
<p>We retain your data as long as your account is active. Upon deletion, we remove your data within 30 days.</p>
</section>
<section>
<h2>5. Security</h2>
<p>We implement industry-standard security measures including encryption, secure sessions, and regular backups.</p>
</section>
<section>
<h2>6. Cookies</h2>
<p>We use essential cookies for session management. We do not use tracking or advertising cookies.</p>
</section>
<section>
<h2>7. Your Rights</h2>
<p>You have the right to:</p>
<ul>
<li>Access your data</li>
<li>Correct inaccurate data</li>
<li>Delete your account and data</li>
<li>Export your data</li>
</ul>
</section>
<section>
<h2>8. GDPR Compliance</h2>
<p>For EU users: We process data based on consent and legitimate interest. You may contact us to exercise your GDPR rights.</p>
</section>
<section>
<h2>9. Changes</h2>
<p>We may update this policy. We will notify you of significant changes via email.</p>
</section>
<section>
<h2>10. Contact</h2>
<p>For privacy inquiries: {{ config.EMAIL_FROM }}</p>
</section>
</article>
</main>
{% endblock %}

View File

@@ -0,0 +1,71 @@
{% extends "base.html" %}
{% block title %}Terms of Service - {{ config.APP_NAME }}{% endblock %}
{% block content %}
<main class="container">
<article style="max-width: 800px; margin: 0 auto;">
<header>
<h1>Terms of Service</h1>
<p><small>Last updated: January 2024</small></p>
</header>
<section>
<h2>1. Acceptance of Terms</h2>
<p>By accessing or using {{ config.APP_NAME }}, you agree to be bound by these Terms of Service. If you do not agree, do not use the service.</p>
</section>
<section>
<h2>2. Description of Service</h2>
<p>{{ config.APP_NAME }} provides a software-as-a-service platform. Features and functionality may change over time.</p>
</section>
<section>
<h2>3. User Accounts</h2>
<p>You are responsible for maintaining the security of your account. You must provide accurate information and keep it updated.</p>
</section>
<section>
<h2>4. Acceptable Use</h2>
<p>You agree not to:</p>
<ul>
<li>Violate any laws or regulations</li>
<li>Infringe on intellectual property rights</li>
<li>Transmit harmful code or malware</li>
<li>Attempt to gain unauthorized access</li>
<li>Interfere with service operation</li>
</ul>
</section>
<section>
<h2>5. Payment Terms</h2>
<p>Paid plans are billed in advance. Refunds are handled on a case-by-case basis. We may change pricing with 30 days notice.</p>
</section>
<section>
<h2>6. Termination</h2>
<p>We may terminate or suspend your account for violations of these terms. You may cancel your account at any time.</p>
</section>
<section>
<h2>7. Disclaimer of Warranties</h2>
<p>The service is provided "as is" without warranties of any kind. We do not guarantee uninterrupted or error-free operation.</p>
</section>
<section>
<h2>8. Limitation of Liability</h2>
<p>We shall not be liable for any indirect, incidental, special, or consequential damages arising from use of the service.</p>
</section>
<section>
<h2>9. Changes to Terms</h2>
<p>We may modify these terms at any time. Continued use after changes constitutes acceptance of the new terms.</p>
</section>
<section>
<h2>10. Contact</h2>
<p>For questions about these terms, please contact us at {{ config.EMAIL_FROM }}.</p>
</section>
</article>
</main>
{% endblock %}

View File

@@ -0,0 +1,40 @@
/* BeanFlows Custom Styles */
article {
margin-bottom: 1.5rem;
}
code {
background: var(--code-background-color);
padding: 0.125rem 0.25rem;
border-radius: var(--border-radius);
}
table {
width: 100%;
}
/* HTMX loading indicators */
.htmx-indicator {
display: none;
}
.htmx-request .htmx-indicator {
display: inline;
}
.htmx-request.htmx-indicator {
display: inline;
}
/* Dashboard chart sections */
section canvas {
width: 100% !important;
}
/* Key metric cards */
article header small {
text-transform: uppercase;
letter-spacing: 0.05em;
color: var(--muted-color);
}

View File

@@ -0,0 +1,97 @@
<!DOCTYPE html>
<html lang="en" data-theme="light">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}{{ config.APP_NAME }}{% endblock %}</title>
<!-- Pico CSS -->
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css">
<!-- Custom styles -->
<link rel="stylesheet" href="{{ url_for('static', filename='css/custom.css') }}">
{% block head %}{% endblock %}
</head>
<body>
<!-- Navigation -->
<nav class="container">
<ul>
<li><a href="{{ url_for('public.landing') }}"><strong>{{ config.APP_NAME }}</strong></a></li>
</ul>
<ul>
<li><a href="{{ url_for('public.features') }}">Features</a></li>
<li><a href="{{ url_for('billing.pricing') }}">Pricing</a></li>
{% if user %}
<li><a href="{{ url_for('dashboard.index') }}">Dashboard</a></li>
{% if session.get('is_admin') %}
<li><a href="{{ url_for('admin.index') }}"><mark>Admin</mark></a></li>
{% endif %}
<li>
<form method="post" action="{{ url_for('auth.logout') }}" style="margin: 0;">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<button type="submit" class="outline secondary" style="padding: 0.5rem 1rem; margin: 0;">Sign Out</button>
</form>
</li>
{% else %}
<li><a href="{{ url_for('auth.login') }}">Sign In</a></li>
<li><a href="{{ url_for('auth.signup') }}" role="button">Get Started</a></li>
{% endif %}
</ul>
</nav>
<!-- Flash messages -->
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
<div class="container">
{% for category, message in messages %}
<article
style="padding: 1rem; margin-bottom: 1rem;
{% if category == 'error' %}border-left: 4px solid var(--del-color);
{% elif category == 'success' %}border-left: 4px solid var(--ins-color);
{% elif category == 'warning' %}border-left: 4px solid var(--mark-background-color);
{% else %}border-left: 4px solid var(--primary);{% endif %}"
>
{{ message }}
</article>
{% endfor %}
</div>
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
<!-- Footer -->
<footer class="container" style="margin-top: 4rem; padding: 2rem 0; border-top: 1px solid var(--muted-border-color);">
<div class="grid">
<div>
<strong>{{ config.APP_NAME }}</strong>
<p><small>Coffee market intelligence for independent traders.</small></p>
</div>
<div>
<strong>Product</strong>
<ul style="list-style: none; padding: 0;">
<li><a href="{{ url_for('public.features') }}">Features</a></li>
<li><a href="{{ url_for('billing.pricing') }}">Pricing</a></li>
<li><a href="{{ url_for('public.about') }}">About</a></li>
</ul>
</div>
<div>
<strong>Legal</strong>
<ul style="list-style: none; padding: 0;">
<li><a href="{{ url_for('public.terms') }}">Terms</a></li>
<li><a href="{{ url_for('public.privacy') }}">Privacy</a></li>
</ul>
</div>
</div>
<p style="text-align: center; margin-top: 2rem;">
<small>&copy; {{ now.year }} {{ config.APP_NAME }}. All rights reserved.</small>
</p>
</footer>
<!-- HTMX (optional) -->
<script src="https://unpkg.com/htmx.org@2.0.4"></script>
{% block scripts %}{% endblock %}
</body>
</html>

238
web/src/beanflows/worker.py Normal file
View File

@@ -0,0 +1,238 @@
"""
Background task worker - SQLite-based queue (no Redis needed).
"""
import asyncio
import json
import traceback
from datetime import datetime, timedelta
from .core import config, init_db, fetch_one, fetch_all, execute, send_email
# Task handlers registry
HANDLERS: dict[str, callable] = {}
def task(name: str):
"""Decorator to register a task handler."""
def decorator(f):
HANDLERS[name] = f
return f
return decorator
# =============================================================================
# Task Queue Operations
# =============================================================================
async def enqueue(task_name: str, payload: dict = None, run_at: datetime = None) -> int:
"""Add a task to the queue."""
return await execute(
"""
INSERT INTO tasks (task_name, payload, status, run_at, created_at)
VALUES (?, ?, 'pending', ?, ?)
""",
(
task_name,
json.dumps(payload or {}),
(run_at or datetime.utcnow()).isoformat(),
datetime.utcnow().isoformat(),
)
)
async def get_pending_tasks(limit: int = 10) -> list[dict]:
"""Get pending tasks ready to run."""
now = datetime.utcnow().isoformat()
return await fetch_all(
"""
SELECT * FROM tasks
WHERE status = 'pending' AND run_at <= ?
ORDER BY run_at ASC
LIMIT ?
""",
(now, limit)
)
async def mark_complete(task_id: int) -> None:
"""Mark task as completed."""
await execute(
"UPDATE tasks SET status = 'complete', completed_at = ? WHERE id = ?",
(datetime.utcnow().isoformat(), task_id)
)
async def mark_failed(task_id: int, error: str, retries: int) -> None:
"""Mark task as failed, schedule retry if attempts remain."""
max_retries = 3
if retries < max_retries:
# Exponential backoff: 1min, 5min, 25min
delay = timedelta(minutes=5 ** retries)
run_at = datetime.utcnow() + delay
await execute(
"""
UPDATE tasks
SET status = 'pending', error = ?, retries = ?, run_at = ?
WHERE id = ?
""",
(error, retries + 1, run_at.isoformat(), task_id)
)
else:
await execute(
"UPDATE tasks SET status = 'failed', error = ? WHERE id = ?",
(error, task_id)
)
# =============================================================================
# Built-in Task Handlers
# =============================================================================
@task("send_email")
async def handle_send_email(payload: dict) -> None:
"""Send an email."""
await send_email(
to=payload["to"],
subject=payload["subject"],
html=payload["html"],
text=payload.get("text"),
)
@task("send_magic_link")
async def handle_send_magic_link(payload: dict) -> None:
"""Send magic link email."""
link = f"{config.BASE_URL}/auth/verify?token={payload['token']}"
html = f"""
<h2>Sign in to {config.APP_NAME}</h2>
<p>Click the link below to sign in:</p>
<p><a href="{link}">{link}</a></p>
<p>This link expires in {config.MAGIC_LINK_EXPIRY_MINUTES} minutes.</p>
<p>If you didn't request this, you can safely ignore this email.</p>
"""
await send_email(
to=payload["email"],
subject=f"Sign in to {config.APP_NAME}",
html=html,
)
@task("send_welcome")
async def handle_send_welcome(payload: dict) -> None:
"""Send welcome email to new user."""
html = f"""
<h2>Welcome to {config.APP_NAME}!</h2>
<p>Thanks for signing up. We're excited to have you.</p>
<p><a href="{config.BASE_URL}/dashboard">Go to your dashboard</a></p>
"""
await send_email(
to=payload["email"],
subject=f"Welcome to {config.APP_NAME}",
html=html,
)
@task("cleanup_expired_tokens")
async def handle_cleanup_tokens(payload: dict) -> None:
"""Clean up expired auth tokens."""
await execute(
"DELETE FROM auth_tokens WHERE expires_at < ?",
(datetime.utcnow().isoformat(),)
)
@task("cleanup_rate_limits")
async def handle_cleanup_rate_limits(payload: dict) -> None:
"""Clean up old rate limit entries."""
cutoff = (datetime.utcnow() - timedelta(hours=1)).isoformat()
await execute("DELETE FROM rate_limits WHERE timestamp < ?", (cutoff,))
@task("cleanup_old_tasks")
async def handle_cleanup_tasks(payload: dict) -> None:
"""Clean up completed/failed tasks older than 7 days."""
cutoff = (datetime.utcnow() - timedelta(days=7)).isoformat()
await execute(
"DELETE FROM tasks WHERE status IN ('complete', 'failed') AND created_at < ?",
(cutoff,)
)
# =============================================================================
# Worker Loop
# =============================================================================
async def process_task(task: dict) -> None:
"""Process a single task."""
task_name = task["task_name"]
task_id = task["id"]
retries = task.get("retries", 0)
handler = HANDLERS.get(task_name)
if not handler:
await mark_failed(task_id, f"Unknown task: {task_name}", retries)
return
try:
payload = json.loads(task["payload"]) if task["payload"] else {}
await handler(payload)
await mark_complete(task_id)
print(f"[WORKER] Completed: {task_name} (id={task_id})")
except Exception as e:
error = f"{e}\n{traceback.format_exc()}"
await mark_failed(task_id, error, retries)
print(f"[WORKER] Failed: {task_name} (id={task_id}): {e}")
async def run_worker(poll_interval: float = 1.0) -> None:
"""Main worker loop."""
print("[WORKER] Starting...")
await init_db()
while True:
try:
tasks = await get_pending_tasks(limit=10)
for task in tasks:
await process_task(task)
if not tasks:
await asyncio.sleep(poll_interval)
except Exception as e:
print(f"[WORKER] Error: {e}")
await asyncio.sleep(poll_interval * 5)
async def run_scheduler() -> None:
"""Schedule periodic cleanup tasks."""
print("[SCHEDULER] Starting...")
await init_db()
while True:
try:
# Schedule cleanup tasks every hour
await enqueue("cleanup_expired_tokens")
await enqueue("cleanup_rate_limits")
await enqueue("cleanup_old_tasks")
await asyncio.sleep(3600) # 1 hour
except Exception as e:
print(f"[SCHEDULER] Error: {e}")
await asyncio.sleep(60)
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "scheduler":
asyncio.run(run_scheduler())
else:
asyncio.run(run_worker())