From 9d0e6843f4714b9b401a23059be83da175a09dcb Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 10:36:14 +0100 Subject: [PATCH 1/9] feat(secrets): add SOPS+age secret management infrastructure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - .sops.yaml: creation rules matching .env.{dev,prod}.sops (dotenv format) - .env.dev.sops: encrypted dev defaults (blank API keys, local paths) - .env.prod.sops: encrypted prod template (placeholder values to fill in) - Makefile: root Makefile with secrets-decrypt-dev/prod, secrets-edit-dev/prod, css-build/watch - .gitignore: add age-key.txt Dev workflow: make secrets-decrypt-dev → .env (repo root) → web app picks it up. Server: deploy.sh will auto-decrypt .env.prod.sops on each deploy. Co-Authored-By: Claude Opus 4.6 --- .env.dev.sops | 40 ++++++++++++++++++++++++++++++++++++++++ .env.prod.sops | 42 ++++++++++++++++++++++++++++++++++++++++++ .gitignore | 2 ++ .sops.yaml | 5 +++++ Makefile | 32 ++++++++++++++++++++++++++++++++ 5 files changed, 121 insertions(+) create mode 100644 .env.dev.sops create mode 100644 .env.prod.sops create mode 100644 .sops.yaml create mode 100644 Makefile diff --git a/.env.dev.sops b/.env.dev.sops new file mode 100644 index 0000000..b323b06 --- /dev/null +++ b/.env.dev.sops @@ -0,0 +1,40 @@ +#ENC[AES256_GCM,data:Y03dMA==,iv:Bq1MfZ/aVo4raoH/Y7xsIen4s5GIg4eArrI929pqGzo=,tag:u/C1H0R17wq/vfpYdLX5sw==,type:comment] +APP_NAME=ENC[AES256_GCM,data:Qr9bssqHfYQm,iv:cAk645WZat/v4T42Tb88aV8X4hmsfJoigijoFu4VDRw=,tag:QSMpmvYmWCf3LybYvks0LQ==,type:str] +SECRET_KEY=ENC[AES256_GCM,data:66PF8cO7v0q5Vvss63uay+lolTb0z92axWKrprvk+DS/qgukFy7l9/bY5vtxX0wchtgKRJHgqWA0N+m8mmXR4w==,iv:wt/NI1pNE1PqmNiw+KUXd8F68uzgJ3RquinLi4NNsCo=,tag:33QqpgzUGQwSx/wG9/cncQ==,type:str] +BASE_URL=ENC[AES256_GCM,data:B83TzqeS6/cBM0knti+LNQ4BSVq5,iv:PR5UxvFsBKvxfhMzTztwIyrMcp/5YPi69/jmTuC/RXs=,tag:r0ZLyug0pOIbAsXCTRWV3Q==,type:str] +DEBUG=ENC[AES256_GCM,data:1QyyOA==,iv:VeputT2ParZesM5XLealrSbWPfk1uzMV5KdoHUhBuNg=,tag:KuAU9Iqvrw66XvKlF5CDUQ==,type:str] +ADMIN_EMAILS=ENC[AES256_GCM,data:e/kgeIVJS81PiVqU/+JOe1gFL5waw7aOKAqcvh7WYM98zoZOvlhdL8N9xCTOE6Q=,iv:SNZkABWIwxQdts+4N97G4YMcazDPyY5R/S477WceofY=,tag:SyBZ+/zUuo+jIsuZImmr2A==,type:str] +#ENC[AES256_GCM,data:6wzQxW+1Cnbv,iv:kuirtO5MAxDX4V4McgNPyduDZjNDtXKVGXWg8edpJzk=,tag:CGJPiaLtyv78YkYMtnOCjQ==,type:comment] +DATABASE_PATH=ENC[AES256_GCM,data:zzeUqqc8+ArsARg=,iv:IoWSmT9lKLfntTfh4r7XnUWxt3mBDzFXmp+kP0UWO9o=,tag:LeiTpfzCSssFrKwDTNHV+g==,type:str] +DUCKDB_PATH=ENC[AES256_GCM,data:aqAOTsKU7rCV1eDm,iv:MQ+eWfsajjbmkMDzJzqnVDrzHuyK3A5wp5Vu1Wf3Fgc=,tag:D8K5Ir8MrwrRJRgRnXHmXA==,type:str] +SERVING_DUCKDB_PATH=ENC[AES256_GCM,data:i9WF2WVcczBEs3pybXUifw==,iv:Lxjr41YMSzA71QIs9gWZPXK+rGjHTWdIWY+EfcCdWpA=,tag:TfHX1bknieSsHeOB5bXtHA==,type:str] +#ENC[AES256_GCM,data:EHV03/8=,iv:ugMJxjVydjldxo47/wVzMRfkZYeQoSpHKza/WhrZeGw=,tag:LHjHjSn6TJgTvuJyZEckDw==,type:comment] +MAGIC_LINK_EXPIRY_MINUTES=ENC[AES256_GCM,data:TmM=,iv:EDbj/eRhoUI0Z6VGmZG737e+WeMXFKcV4R5PvDadLdI=,tag:s62JFPJqUhW7PwNgnJnudA==,type:str] +SESSION_LIFETIME_DAYS=ENC[AES256_GCM,data:lHA=,iv:scl0hJiJGcDzXC490vbnObdIPgFUHBdNGMg6z88zEzs=,tag:OTQVOJysJ5WmqWOrqcOtOg==,type:str] +#ENC[AES256_GCM,data:a8BGZeTIKeFsSU8CHvhO,iv:mWD87KNtwDDfk0Qz8YZeseBxG4PPpi4y+Ol31wWLw70=,tag:0PhSpVgRvPRqyWfMY3oDUg==,type:comment] +RESEND_API_KEY= +EMAIL_FROM=ENC[AES256_GCM,data:gPTft3EjtqY5eYVIMGhI3QRiYRmrLQ==,iv:opAwBOoeWtJU82EWj7rwUVQMh5adXumoCAnUqq36anQ=,tag:Pt3zt3KLTyxatUz3Ycx++A==,type:str] 
+#ENC[AES256_GCM,data:ueoir67O+Q==,iv:LVVN8NYUYItQ0uVnCQ5DvokL+AUrrodjt+6dPfVXmH8=,tag:/ULp8fiJ8hA9h5ylWbwxrA==,type:comment] +PADDLE_API_KEY= +PADDLE_WEBHOOK_SECRET= +PADDLE_ENVIRONMENT=ENC[AES256_GCM,data:PWCtxtSvFA==,iv:7s1xEJQlHgZ36RluRy/2W6C3YIXyTHoKENNHWCmhkjM=,tag:eAVTxntBqdUU77WHAW0C8g==,type:str] +PADDLE_PRICE_STARTER= +PADDLE_PRICE_PRO= +#ENC[AES256_GCM,data:MT0H2hNYQMJ2zGNnbAw=,iv:v7YbUgEUXBZ2VK5iWdfq6nYG+odfM1sO17W6jBUP1PI=,tag:LrbrLm8qR8hX8aBK70gIRQ==,type:comment] +RATE_LIMIT_REQUESTS=ENC[AES256_GCM,data:JHTa,iv:1XJ36DdmxMC25KdHWHAR1O9kYr4jf/oo9oPUEk52Le0=,tag:yPyu5LCleMe0OljyWzVLUQ==,type:str] +RATE_LIMIT_WINDOW=ENC[AES256_GCM,data:1G8=,iv:c435cmq4kWSLXDa6IZ3giJisj5FTFJ0VeWySB+Qfr+o=,tag:dirUvtSkAkE3gyYRmQBcEw==,type:str] +#ENC[AES256_GCM,data:n0/C4SL7Jf9l,iv:sYUVR07+nelY4nM5JkT9bxWPVLh9FHOUiLAvsu0INIE=,tag:+L0l5KY9sj+Y3hfi8UOEgw==,type:comment] +WAITLIST_MODE=ENC[AES256_GCM,data:b69b3Ws=,iv:Bvc8KJoS8eI/a3w/a6hoEfixNgWrETPM5D+8zKH+Wnw=,tag:bwd5MegmT+1kTMvaWtnmAw==,type:str] +RESEND_AUDIENCE_WAITLIST= +#ENC[AES256_GCM,data:QgdFxg4o2osH9TezpP/18eo2,iv:Ku/qa0Ykn5GkntFelPf3nqWEonisbqiLWbcI71vilN8=,tag:vxDFx6aoLJMhaBkBb62sbQ==,type:comment] +UMAMI_SCRIPT_URL= +UMAMI_WEBSITE_ID= +#ENC[AES256_GCM,data:0yFJzsRAZzgc4sibGIHsXPWiYJgcPw==,iv:kKnxkVjNJTG4Q/Y1J/EXBszowshhqTE0BKxU+3zwJi8=,tag:ENfyo73mQYDqhW9rpyfZAA==,type:comment] +LANDING_DIR=ENC[AES256_GCM,data:VBPmCA0MrYEFWs1T,iv:gZD0iZgxcSghqnUgdIO3XB8p+2HgND6kj2YhTFSPYKE=,tag:lQd7nli93iOL3tIi9j7o7A==,type:str] +ALERT_WEBHOOK_URL= +sops_age__list_0__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSBKenhTSE93QTJDVzMrUEJM\nZ2ladHNKYlRNQnNxempDRmMyQWQ1allmNzBNCm5STXdTZVRlMzVKLzlMdnhrTjBS\ndmoyK25SQ2FUb21FQjJEYXFVM3RiOGsKLS0tIGFMTlpDOXpackFCZ0x4dCtldklv\nUkcvaTl3aDh6bnJWZHhrY2xiUmVBa0EKZrmColawZ+jYQMjvQQRu4h8RaZHY9bMU\nujsQy81VDQk27VtMnG/gURQzz8h0A1BmMC9C7tlBJ+iUaAVZ6JKfoQ==\n-----END AGE ENCRYPTED FILE-----\n +sops_age__list_0__map_recipient=age1f5002gj4s78jju45jd28kuejtcfhn5cdujz885fl7z2p9ym68pnsgky87a +sops_lastmodified=2026-02-26T09:32:40Z +sops_mac=ENC[AES256_GCM,data:aTXxTIvl/yzdws4HM9achusrJdMeXnbf5cqz3u0K0lY/HB1/R/W67DZSDJJ/qs1yu0DdLMq0G2NLFvzbQybzRLhrL8tsxLPFWAZec7o0aCaoopixNiBgzQZWjjZLC3DtJvmVPrcRgTfMV7ced4PPwuCQFCp3/qM5E5EuQFitJPc=,iv:Xz7WAnk92FJFZT2cI8ZeyjCImN8EhQsrFgPBDCoa/Gw=,tag:slFr8zxsvFy8jSyikS+e/w==,type:str] +sops_unencrypted_suffix=_unencrypted +sops_version=3.12.1 diff --git a/.env.prod.sops b/.env.prod.sops new file mode 100644 index 0000000..4bafa0f --- /dev/null +++ b/.env.prod.sops @@ -0,0 +1,42 @@ +#ENC[AES256_GCM,data:tnioMw==,iv:oCs2UJy56trVLbUaqdrqPtUCOBoSgtxTno7vVRYXRF4=,tag:8pm3SK8VDfFklgHXkAhVkQ==,type:comment] +APP_NAME=ENC[AES256_GCM,data:EE99qBVM6sPa,iv:C27vqa2qIha2warNZ+VwhAnh1q9rHFUcgVzhOrpc3fA=,tag:GwPXWe2oH4wbMIk00B+Dew==,type:str] +SECRET_KEY=ENC[AES256_GCM,data:SuPcge43Als8ZqgHm+9nLiwjCv0JqR56,iv:DnV5aEz7QoRN8s4jLuw+1n7esG3DoscuZHZT4YcuucY=,tag:xrSkH48rK1UW+biJrZZAvg==,type:str] +BASE_URL=ENC[AES256_GCM,data:BKdN5lGK1i7j7zZ7HMqarwgHp5AZxd6R,iv:yeXc/48+Zjd6vgKjP+Pe9aIgsB7zprIObpSteMls3fk=,tag:tQiSLSM1dTSHTO2350skUA==,type:str] +DEBUG=ENC[AES256_GCM,data:ntBp/hw=,iv:906FN6bz3SHoEclG7MquCNUhHa9wDD2PyhxTDCVFUGk=,tag:fUgh61rftbmunJwNquyL/A==,type:str] +ADMIN_EMAILS=ENC[AES256_GCM,data:W7kmtrgck47tGpiHy4bIoF7TZouqjNGPHK+zQoZvxT9iz1reuHbP6bXUfuMzsh0=,iv:GXkKRbComRXAVLzif8DV14IySjzRkAg/U9DUj4ytEjE=,tag:6iKYsgbhDgjDQbwZM6hSNg==,type:str] +#ENC[AES256_GCM,data:tIhB0x4AbNjs,iv:fkmVB5Cfa11g4YVXGEXPPnGDznhoMm+J108L/ZRkCn8=,tag:y7tqZ7cQ64A3ArM/MmfTlw==,type:comment] 
+DATABASE_PATH=ENC[AES256_GCM,data:Rzif9KAhrVn/F3U=,iv:VgXwn8b38/dFkiTYHDiKe660eWtGPdbeMPC4Xc2RPHk=,tag:OSlbuCeQHcVigj0zxnH+5Q==,type:str] +DUCKDB_PATH=ENC[AES256_GCM,data:UWMI9RTAHBNgb9EOxnmKUZovyGedu/xz5/yoOFpd,iv:oWVAoDtboVAC+SCTf+b/mQ+zzCGSRTrf3fjt1femqng=,tag:B46K6jTM0iVWQvL1FJlbyg==,type:str] +SERVING_DUCKDB_PATH=ENC[AES256_GCM,data:Y3bouhWcgp3d9v1KGuXuPZIFiIe/WKnVwEVs799T,iv:uTpVqvRYOhUKM2JNiFsX/YK/sfmajWI899vtmuWuozA=,tag:z8ASJTKzG6lSUBLuvzciwQ==,type:str] +#ENC[AES256_GCM,data:E3cNcRc=,iv:GR/I/NNyv/Ha6ZMH8nd0GZstJLI9MNLCutEKefuBDpk=,tag:dHOwKaKKPoWSt2TiVJVXJA==,type:comment] +MAGIC_LINK_EXPIRY_MINUTES=ENC[AES256_GCM,data:w1I=,iv:CGm9QV5OeVaDVBbRXJL/qO7RnOeSemG+zh3QCgww688=,tag:lfv4wxdx4hzFRC8vPu0Txg==,type:str] +SESSION_LIFETIME_DAYS=ENC[AES256_GCM,data:9fA=,iv:uBe1LugrsipQpOQX3wLFf4Er+v1SIQKNEcdglsmDwKM=,tag:g5lyQgBUCpWNWb2bkCmS3Q==,type:str] +#ENC[AES256_GCM,data:Rd7HVrAHuomB78FCbYDB,iv:kxl7/gArMFCkWuQiv+hXWxCzgNkwDbe2WMs7p9/rlXQ=,tag:+IOGQO/HziVl32CDjiI9Pg==,type:comment] +RESEND_API_KEY=ENC[AES256_GCM,data:srgytZ80mgTWF9DePH8QUR6TqrxI,iv:fCttiplfgdso2lKT2wPaS57SZ3npu0r2GIMnZLcAi7Q=,tag:k7OrEr2J5ikDWeDdZ6raRg==,type:str] +EMAIL_FROM=ENC[AES256_GCM,data:oI1SUEpq5lbRT1FmHQ7QecDSj222kQ==,iv:ou981i5Ksx89IzDmudYFVuKWnHqXFXfcMI1jLwBAtPQ=,tag:QYmUIsgcqccmgrOJX+1Kvg==,type:str] +#ENC[AES256_GCM,data:BLQ9NzKrxA==,iv:7Lc0e7NxwMWZ3T405KAdaNXWtGnnHHWcp6oI8m2GJio=,tag:/NMk8DWNjxrRoDcYjDjvPQ==,type:comment] +PADDLE_API_KEY=ENC[AES256_GCM,data:fS/C0Iygf+S1xjss49D2w8/LlcfI,iv:wLNuuqpBGnClizMRTIRtMdsu8SytU5p13zpkLbXEnNI=,tag:4//Cj5GQ/EolpKxOyEMkNg==,type:str] +PADDLE_WEBHOOK_SECRET=ENC[AES256_GCM,data:8Z/ODGntXsms8i+p+enaBVZjJuUa9ZIe,iv:NBr4IlxG60eQf7E43oDCCKKKDYeQSB1zMXL/z4YckP8=,tag:M4bF4y74bdLZgQ5dWkHFnQ==,type:str] +PADDLE_ENVIRONMENT=ENC[AES256_GCM,data:R/ScKVocPj4U2w==,iv:vXLNTdmyL+P2gOCWRr0I/stijTVOkHvHZbFAMHsLMEM=,tag:ov9jXtf5v9r9yLitsKh+YQ==,type:str] +PADDLE_PRICE_STARTER=ENC[AES256_GCM,data:q1PG9iI2ISR2ydOrL7B1agMaeGP9,iv:JSpx0RT+e1ohuy6kyKMfmZqw/Oq9dT8Vs13/e+dZnyk=,tag:AREcvK1Bm2jaunctp0yHWg==,type:str] +PADDLE_PRICE_PRO=ENC[AES256_GCM,data:qk74BtToWDvY32eaYKyB1G3q+znH,iv:TLwWA7erfJPQmuw9L8P3G/pDbkTNJjbbdffYYl4+1kA=,tag:TlJFnC3o7Bwl8/MU5Qkb6g==,type:str] +#ENC[AES256_GCM,data:JeFAjIIPFnY5Jb8xZUA=,iv:OcB3V+3APid4wVIOVJlZQHCEcrkmiduzwaFPzToxEAo=,tag:ogQ8UX2PTc1RqTyAO5B9jw==,type:comment] +RATE_LIMIT_REQUESTS=ENC[AES256_GCM,data:c78c,iv:f7ZIb5n/f4DeMg5WKzVE/lbgfT7RfftnB3amrvuviE8=,tag:nPAI9P9oTV84cHWXOmYacw==,type:str] +RATE_LIMIT_WINDOW=ENC[AES256_GCM,data:rTs=,iv:s4ns8X4FPtOdmNtZ35xwgMk5F+kdiAnz0BKdhf6qN3k=,tag:6RSI4kp9ENb5iNj7jXY86Q==,type:str] +#ENC[AES256_GCM,data:IiDU8DxK2LgK,iv:n0zJ+UixDFs2u1rLSxJ/VnWXYJZ8Vda/BQdyS+RujEE=,tag:GfVtYNoHmy9GX5+ZW7QjPg==,type:comment] +WAITLIST_MODE=ENC[AES256_GCM,data:e0tSBHY=,iv:L83mH2xgqLakaq9wb4RymKeXb7l67MNo38zGmSbhi48=,tag:i0z/OalFlgvj/lP4ipzfYQ==,type:str] +RESEND_AUDIENCE_WAITLIST=ENC[AES256_GCM,data:FcQEW8NGrdY7naM1LZuqaAEllNpMjIV9,iv:v0XxXCsjmk1rigORy8vrf1NNzYfn093x2sNb1JAPXuY=,tag:XjLmhewcV3M+Lk4zUhIWbg==,type:str] +#ENC[AES256_GCM,data:LgHFs0MBe0NfkE0DMJNYUkZh,iv:/C+IKpNQgSbOcwW9+1wN2gfwtY/OT5InkFDyJdPNw/M=,tag:jqEcXMfhowRVNSnrSs3ENg==,type:comment] +UMAMI_SCRIPT_URL=ENC[AES256_GCM,data:85Nyjy8Rho38dyerGD5Mmw==,iv:+MXncm4quelDuV4QTI2Qqgt9G9ZffIkVDYpIdfOVI5Y=,tag:6LVNGEipfo+XWfdA6g7O5w==,type:str] +UMAMI_WEBSITE_ID=ENC[AES256_GCM,data:ArK+fRNSVlXQBnbCOl6+,iv:1nhATMUcBq9m+GLGlkVXaJhFOH9yVfngux7ZPi1bzLM=,tag:SJSSl8G9rztaCbf49e54eQ==,type:str] 
+#ENC[AES256_GCM,data:zx6ieYt6brZX6IrIgGkfGCqDlf0FOw==,iv:3dBgRYc9eI/Dhx109NUMh2yW2Fmqegg0n3rsjcbzJEw=,tag:4lbfJT/n1T53D0peeI4IhQ==,type:comment] +LANDING_DIR=ENC[AES256_GCM,data:3YAGFB10q6g6ZLIHdDuvzMaD59+E,iv:S9NVxU/w+cwU1OPWjOEjnG8ocMdWrqR9VG4rFa4h4uA=,tag:0vq5Cn0Di1cUmbLrv1C1Uw==,type:str] +ALERT_WEBHOOK_URL=ENC[AES256_GCM,data:ARYR45VFPLX37u5UNn9fJeBNXDj8,iv:rWDphUHYX/nLD46fDNfx3ZyFEbYK1hMksHCGqWTI66o=,tag:qE1FR6Sj+k07Yb+SlV3Vgw==,type:str] +#ENC[AES256_GCM,data:ySDq589xP4ZwGD5JTQxh1Lr89h8zoz7RDLYfSl2Up/TSFF1tqA==,iv:oBQMgWLlT+r4TbtdLPSs7q7stg/qnEEbsu65+HjGBqQ=,tag:JiySwKWJIuZbEsY0sWJnQA==,type:comment] +GITLAB_READ_TOKEN=ENC[AES256_GCM,data:JRxX3H9mj3DCa0kyi7aGqvop,iv:W/oqCW7sDv791VclZteW0M+jkab3unGVWJoB//w4FJ4=,tag:3FJbkKPxH/obs67Hcd80+A==,type:str] +sops_age__list_0__map_enc=-----BEGIN AGE ENCRYPTED FILE-----\nYWdlLWVuY3J5cHRpb24ub3JnL3YxCi0+IFgyNTUxOSArZU8rVW8wZW9vd2RwbVV1\ndWlPV3gzSDhsbndQNC9mbnJpejdCWXdIYlU4CmU4MXorYTlwY0krNm4vSytXTGcz\nNTY1UXA2QzFjaENXVTZWME5YZk16eU0KLS0tIDg1YnA3UGhDa1BpK3F4VFN5TFJq\nZXB4eVMvNytWZlFzWGNycDBDOGJ2RWMKvrVwXOWClAjlGT95pm1eDIabbVjLH5Nt\nfTwn0f5aVQ9I40AoUi/qRoCdFtdMupSAEjlCq5P0/A+WvVZfFp45lg==\n-----END AGE ENCRYPTED FILE-----\n +sops_age__list_0__map_recipient=age1f5002gj4s78jju45jd28kuejtcfhn5cdujz885fl7z2p9ym68pnsgky87a +sops_lastmodified=2026-02-26T09:35:35Z +sops_mac=ENC[AES256_GCM,data:nAp6AHWjro8Xv+e1PIH+rGur9N3bRNgVfCE8f8YiLUIuZPWCkTjpN5n+cGTGc/2vw/DB8qSQ0WH72WPcgT8odOz0YAJEpp1ejvvXZfuo8uOYfPZeTiAOByOAS6an9BqkRyMMKR3KTEh0DevvwGKQO+iN4FRT1Ey8CDrWle61Y0U=,iv:3aaJoF5JY8uKnIHOCB2CbxbhbYz1gmB/JNoMTBoZ83Q=,tag:unYD+L7le3CnCgm1Zkz8tQ==,type:str] +sops_unencrypted_suffix=_unencrypted +sops_version=3.12.1 diff --git a/.gitignore b/.gitignore index 4a401c3..7ad3159 100644 --- a/.gitignore +++ b/.gitignore @@ -184,6 +184,8 @@ data/ .claude/worktrees/ +age-key.txt + .bedrock-state .bedrockapikey toggle-bedrock.sh diff --git a/.sops.yaml b/.sops.yaml new file mode 100644 index 0000000..dda8846 --- /dev/null +++ b/.sops.yaml @@ -0,0 +1,5 @@ +creation_rules: + - path_regex: \.env\.(dev|prod)\.sops$ + # Developer workstation key. Add server key after running infra/setup_server.sh. + # To add the server key: update this file, then run: sops updatekeys .env.dev.sops .env.prod.sops + age: age1f5002gj4s78jju45jd28kuejtcfhn5cdujz885fl7z2p9ym68pnsgky87a diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3370002 --- /dev/null +++ b/Makefile @@ -0,0 +1,32 @@ +TAILWIND := web/bin/tailwindcss + +web/bin/tailwindcss: + @mkdir -p web/bin + curl -sLo web/bin/tailwindcss https://github.com/tailwindlabs/tailwindcss/releases/latest/download/tailwindcss-linux-x64 + chmod +x web/bin/tailwindcss + +css-build: web/bin/tailwindcss + $(TAILWIND) -i web/src/beanflows/static/css/input.css -o web/src/beanflows/static/css/output.css --minify + +css-watch: web/bin/tailwindcss + $(TAILWIND) -i web/src/beanflows/static/css/input.css -o web/src/beanflows/static/css/output.css --watch + +# -- Secrets (SOPS + age) -- +# .env.*.sops files use dotenv format but sops can't infer from the extension, +# so we pass --input-type / --output-type explicitly. 
+ +SOPS_DOTENV := sops --input-type dotenv --output-type dotenv + +secrets-decrypt-dev: + $(SOPS_DOTENV) --decrypt .env.dev.sops > .env + +secrets-decrypt-prod: + $(SOPS_DOTENV) --decrypt .env.prod.sops > .env + +secrets-edit-dev: + $(SOPS_DOTENV) .env.dev.sops + +secrets-edit-prod: + $(SOPS_DOTENV) .env.prod.sops + +.PHONY: css-build css-watch secrets-decrypt-dev secrets-decrypt-prod secrets-edit-dev secrets-edit-prod From 6d716a83ae3d3a0f5339b37f6499998cb02502bf Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 10:44:25 +0100 Subject: [PATCH 2/9] feat(secrets): rewrite secrets.py for SOPS, update cli.py secrets.py: replace Pulumi ESC (esc CLI) with SOPS decrypt. Reads .env.prod.sops via `sops --decrypt`, parses dotenv output. Same public API: get_secret(), list_secrets(), test_connection(). cli.py: update secrets subcommand help text and test command messaging. Co-Authored-By: Claude Opus 4.6 --- src/materia/cli.py | 12 ++++----- src/materia/secrets.py | 55 ++++++++++++++++++++++++++++++------------ 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/src/materia/cli.py b/src/materia/cli.py index 0b77e61..8e52517 100644 --- a/src/materia/cli.py +++ b/src/materia/cli.py @@ -106,7 +106,7 @@ def pipeline_list(): typer.echo(f" • {name:<15} (command: {cmd}, timeout: {config['timeout_seconds']}s)") -secrets_app = typer.Typer(help="Manage secrets via Pulumi ESC") +secrets_app = typer.Typer(help="Manage secrets via SOPS + age") app.add_typer(secrets_app, name="secrets") @@ -142,15 +142,15 @@ def secrets_get( @secrets_app.command("test") def secrets_test(): - """Test ESC connection and authentication.""" + """Test sops decryption (verifies sops is installed and age key is present).""" from materia.secrets import test_connection - typer.echo("Testing Pulumi ESC connection...") + typer.echo("Testing SOPS decryption...") if test_connection(): - typer.echo("✓ ESC connection successful") + typer.echo("✓ SOPS decryption successful") else: - typer.echo("✗ ESC connection failed", err=True) - typer.echo("\nMake sure you've run: esc login") + typer.echo("✗ SOPS decryption failed", err=True) + typer.echo("\nMake sure sops is installed and your age key is at ~/.config/sops/age/keys.txt") raise typer.Exit(1) diff --git a/src/materia/secrets.py b/src/materia/secrets.py index f2464eb..691594a 100644 --- a/src/materia/secrets.py +++ b/src/materia/secrets.py @@ -1,44 +1,67 @@ -"""Secrets management via Pulumi ESC.""" +"""Secrets management via SOPS + age.""" -import json import subprocess from functools import lru_cache +from pathlib import Path + +# Default secrets file path (relative to repo root) +_DEFAULT_SECRETS_PATH = Path(__file__).parent.parent.parent / ".env.prod.sops" + + +def _parse_dotenv(text: str) -> dict[str, str]: + """Parse dotenv-format text into a dict, skipping comments and blanks.""" + result = {} + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + continue + key, _, value = line.partition("=") + result[key.strip()] = value.strip() + return result @lru_cache(maxsize=1) -def _load_environment() -> dict[str, str]: - """Load secrets from Pulumi ESC environment.""" +def _load_environment(secrets_path: str = None) -> dict[str, str]: + """Decrypt and load secrets from a SOPS-encrypted dotenv file.""" + path = Path(secrets_path) if secrets_path else _DEFAULT_SECRETS_PATH + assert path.exists(), f"Secrets file not found: {path}" try: result = subprocess.run( - ["esc", "env", "open", "beanflows/prod", 
"--format", "json"], + ["sops", "--input-type", "dotenv", "--output-type", "dotenv", "--decrypt", str(path)], capture_output=True, text=True, check=True, + timeout=30, ) - data = json.loads(result.stdout) - return data.get("environmentVariables", {}) + return _parse_dotenv(result.stdout) except subprocess.CalledProcessError as e: - raise RuntimeError(f"Failed to load ESC environment: {e.stderr}") + raise RuntimeError(f"Failed to decrypt secrets: {e.stderr.strip()}") except FileNotFoundError: - raise RuntimeError("ESC CLI not found. Install with: curl -fsSL https://get.pulumi.com/esc/install.sh | sh") + raise RuntimeError( + "sops not found. Install with: brew install sops " + "or see https://github.com/getsops/sops/releases" + ) -def get_secret(key: str) -> str | None: +def get_secret(key: str, secrets_path: str = None) -> str | None: """Get a secret value by key.""" - env = _load_environment() + env = _load_environment(secrets_path) return env.get(key) -def list_secrets() -> list[str]: +def list_secrets(secrets_path: str = None) -> list[str]: """List all available secret keys.""" - env = _load_environment() + env = _load_environment(secrets_path) return list(env.keys()) -def test_connection() -> bool: - """Test ESC connection.""" +def test_connection(secrets_path: str = None) -> bool: + """Test that sops is available and can decrypt the secrets file.""" try: - _load_environment() + _load_environment.cache_clear() + _load_environment(secrets_path) return True except Exception: return False From 643c0b2db9d2e65bd79d73146ecaebd92914ecea Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 10:47:34 +0100 Subject: [PATCH 3/9] feat(secrets): update core.py dotenv to load from repo root .env Load .env from repo root first (created by `make secrets-decrypt-dev`), falling back to web/.env for legacy setups. Also fixes import sort order and removes unused httpx import. Co-Authored-By: Claude Opus 4.6 --- web/src/beanflows/core.py | 135 ++++++++++++++++++++++++++------------ 1 file changed, 92 insertions(+), 43 deletions(-) diff --git a/web/src/beanflows/core.py b/web/src/beanflows/core.py index e9b5c0d..c404a88 100644 --- a/web/src/beanflows/core.py +++ b/web/src/beanflows/core.py @@ -1,29 +1,33 @@ """ Core infrastructure: database, config, email, and shared utilities. """ + +import hashlib +import hmac import os import random import secrets -import hashlib -import hmac +from contextvars import ContextVar +from datetime import datetime, timedelta +from functools import wraps +from pathlib import Path import aiosqlite import resend -import httpx -from pathlib import Path -from functools import wraps -from datetime import datetime, timedelta -from contextvars import ContextVar -from quart import g, make_response, render_template, request, session from dotenv import load_dotenv +from quart import g, make_response, render_template, request, session -# web/.env is three levels up from web/src/beanflows/core.py -load_dotenv(Path(__file__).parent.parent.parent / ".env", override=False) +# Load .env from repo root first (created by `make secrets-decrypt-dev`), +# fall back to web/.env for legacy local dev setups. 
+_repo_root = Path(__file__).parent.parent.parent.parent +load_dotenv(_repo_root / ".env", override=False) +load_dotenv(_repo_root / "web" / ".env", override=False) # ============================================================================= # Configuration # ============================================================================= + class Config: APP_NAME: str = os.getenv("APP_NAME", "BeanFlows") SECRET_KEY: str = os.getenv("SECRET_KEY", "change-me-in-production") @@ -53,7 +57,9 @@ class Config: ADMIN_EMAILS: list[str] = [ e.strip().lower() - for e in os.getenv("ADMIN_EMAILS", "hendrik@beanflow.coffee,simon@beanflows.coffee").split(",") + for e in os.getenv("ADMIN_EMAILS", "hendrik@beanflow.coffee,simon@beanflows.coffee").split( + "," + ) if e.strip() ] @@ -66,7 +72,14 @@ class Config: PLAN_FEATURES: dict = { "free": ["dashboard", "coffee_only", "limited_history"], "starter": ["dashboard", "coffee_only", "full_history", "export", "api"], - "pro": ["dashboard", "all_commodities", "full_history", "export", "api", "priority_support"], + "pro": [ + "dashboard", + "all_commodities", + "full_history", + "export", + "api", + "priority_support", + ], } PLAN_LIMITS: dict = { @@ -165,6 +178,7 @@ class transaction: await self.db.rollback() return False + # ============================================================================= # Email # ============================================================================= @@ -175,8 +189,12 @@ EMAIL_ADDRESSES = { async def send_email( - to: str, subject: str, html: str, text: str = None, - from_addr: str = None, template: str = None, + to: str, + subject: str, + html: str, + text: str = None, + from_addr: str = None, + template: str = None, ) -> bool: """Send email via Resend SDK and log to email_log table.""" if not config.RESEND_API_KEY: @@ -191,13 +209,15 @@ async def send_email( provider_id = None error_msg = None try: - result = resend.Emails.send({ - "from": from_addr or config.EMAIL_FROM, - "to": to, - "subject": subject, - "html": html, - "text": text or html, - }) + result = resend.Emails.send( + { + "from": from_addr or config.EMAIL_FROM, + "to": to, + "subject": subject, + "html": html, + "text": text or html, + } + ) provider_id = result.get("id") if isinstance(result, dict) else None except Exception as e: error_msg = str(e) @@ -206,15 +226,24 @@ async def send_email( await execute( """INSERT INTO email_log (recipient, subject, template, status, provider_id, error, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)""", - (to, subject, template, "error" if error_msg else "sent", - provider_id, error_msg, datetime.utcnow().isoformat()), + ( + to, + subject, + template, + "error" if error_msg else "sent", + provider_id, + error_msg, + datetime.utcnow().isoformat(), + ), ) return error_msg is None + # ============================================================================= # CSRF Protection # ============================================================================= + def get_csrf_token() -> str: """Get or create CSRF token for current session.""" if "csrf_token" not in session: @@ -229,6 +258,7 @@ def validate_csrf_token(token: str) -> bool: def csrf_protect(f): """Decorator to require valid CSRF token for POST requests.""" + @wraps(f) async def decorated(*args, **kwargs): if request.method == "POST": @@ -237,12 +267,15 @@ def csrf_protect(f): if not validate_csrf_token(token): return {"error": "Invalid CSRF token"}, 403 return await f(*args, **kwargs) + return decorated + # 
============================================================================= # Rate Limiting (SQLite-based) # ============================================================================= + async def check_rate_limit(key: str, limit: int = None, window: int = None) -> tuple[bool, dict]: """ Check if rate limit exceeded. Returns (is_allowed, info). @@ -255,13 +288,12 @@ async def check_rate_limit(key: str, limit: int = None, window: int = None) -> t # Clean old entries and count recent await execute( - "DELETE FROM rate_limits WHERE key = ? AND timestamp < ?", - (key, window_start.isoformat()) + "DELETE FROM rate_limits WHERE key = ? AND timestamp < ?", (key, window_start.isoformat()) ) result = await fetch_one( "SELECT COUNT(*) as count FROM rate_limits WHERE key = ? AND timestamp > ?", - (key, window_start.isoformat()) + (key, window_start.isoformat()), ) count = result["count"] if result else 0 @@ -275,16 +307,14 @@ async def check_rate_limit(key: str, limit: int = None, window: int = None) -> t return False, info # Record this request - await execute( - "INSERT INTO rate_limits (key, timestamp) VALUES (?, ?)", - (key, now.isoformat()) - ) + await execute("INSERT INTO rate_limits (key, timestamp) VALUES (?, ?)", (key, now.isoformat())) return True, info def rate_limit(limit: int = None, window: int = None, key_func=None): """Decorator for rate limiting routes.""" + def decorator(f): @wraps(f) async def decorated(*args, **kwargs): @@ -300,9 +330,12 @@ def rate_limit(limit: int = None, window: int = None, key_func=None): return response, 429 return await f(*args, **kwargs) + return decorated + return decorator + # ============================================================================= # Request ID Tracking # ============================================================================= @@ -317,6 +350,7 @@ def get_request_id() -> str: def setup_request_id(app): """Setup request ID middleware.""" + @app.before_request async def set_request_id(): rid = request.headers.get("X-Request-ID") or secrets.token_hex(8) @@ -328,34 +362,35 @@ def setup_request_id(app): response.headers["X-Request-ID"] = get_request_id() return response + # ============================================================================= # Webhook Signature Verification # ============================================================================= + def verify_hmac_signature(payload: bytes, signature: str, secret: str) -> bool: """Verify HMAC-SHA256 webhook signature.""" expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest() return hmac.compare_digest(signature, expected) + # ============================================================================= # Soft Delete Helpers # ============================================================================= + async def soft_delete(table: str, id: int) -> bool: """Mark record as deleted.""" result = await execute( f"UPDATE {table} SET deleted_at = ? WHERE id = ? 
AND deleted_at IS NULL", - (datetime.utcnow().isoformat(), id) + (datetime.utcnow().isoformat(), id), ) return result > 0 async def restore(table: str, id: int) -> bool: """Restore soft-deleted record.""" - result = await execute( - f"UPDATE {table} SET deleted_at = NULL WHERE id = ?", - (id,) - ) + result = await execute(f"UPDATE {table} SET deleted_at = NULL WHERE id = ?", (id,)) return result > 0 @@ -369,8 +404,7 @@ async def purge_deleted(table: str, days: int = 30) -> int: """Purge records deleted more than X days ago.""" cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat() return await execute( - f"DELETE FROM {table} WHERE deleted_at IS NOT NULL AND deleted_at < ?", - (cutoff,) + f"DELETE FROM {table} WHERE deleted_at IS NOT NULL AND deleted_at < ?", (cutoff,) ) @@ -378,8 +412,10 @@ async def purge_deleted(table: str, days: int = 30) -> int: # Waitlist # ============================================================================= + def waitlist_gate(template: str, **extra_context): """Intercept GET requests when WAITLIST_MODE=true and render the waitlist template.""" + def decorator(f): @wraps(f) async def wrapper(*args, **kwargs): @@ -389,7 +425,9 @@ def waitlist_gate(template: str, **extra_context): ctx[k] = v() if callable(v) else v return await render_template(template, **ctx) return await f(*args, **kwargs) + return wrapper + return decorator @@ -411,16 +449,19 @@ async def capture_waitlist_email( if result: from .worker import enqueue + await enqueue("send_waitlist_confirmation", {"email": email}) if config.RESEND_AUDIENCE_WAITLIST and config.RESEND_API_KEY: try: resend.api_key = config.RESEND_API_KEY - resend.Contacts.create({ - "email": email, - "audience_id": config.RESEND_AUDIENCE_WAITLIST, - "unsubscribed": False, - }) + resend.Contacts.create( + { + "email": email, + "audience_id": config.RESEND_AUDIENCE_WAITLIST, + "unsubscribed": False, + } + ) except Exception as e: print(f"[WAITLIST] Resend audience error: {e}") @@ -432,8 +473,10 @@ async def capture_waitlist_email( # A/B Testing # ============================================================================= + def ab_test(experiment: str, variants: tuple = ("control", "treatment")): """Assign visitor to an A/B test variant via cookie, tag Umami pageviews.""" + def decorator(f): @wraps(f) async def wrapper(*args, **kwargs): @@ -448,7 +491,9 @@ def ab_test(experiment: str, variants: tuple = ("control", "treatment")): response = await make_response(await f(*args, **kwargs)) response.set_cookie(cookie_key, assigned, max_age=30 * 24 * 60 * 60) return response + return wrapper + return decorator @@ -456,6 +501,7 @@ def ab_test(experiment: str, variants: tuple = ("control", "treatment")): # Feature Flags (DB-backed, admin-toggleable) # ============================================================================= + async def is_flag_enabled(name: str, default: bool = False) -> bool: """Check if a feature flag is enabled. 
Falls back to default if not found.""" row = await fetch_one("SELECT enabled FROM feature_flags WHERE name = ?", (name,)) @@ -482,6 +528,7 @@ async def get_all_flags() -> list[dict]: def feature_gate(flag_name: str, fallback_template: str, **extra_context): """Gate a route behind a feature flag; renders fallback on GET, 403 on POST.""" + def decorator(f): @wraps(f) async def decorated(*args, **kwargs): @@ -491,5 +538,7 @@ def feature_gate(flag_name: str, fallback_template: str, **extra_context): return await render_template(fallback_template, **ctx) return {"error": "Feature not available"}, 403 return await f(*args, **kwargs) + return decorated - return decorator \ No newline at end of file + + return decorator From f253e39c2cfa796245e76199afa194b3150239a8 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 10:59:07 +0100 Subject: [PATCH 4/9] feat(deploy): port padelnomics deploy.sh improvements to web/deploy.sh MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Auto-install sops + age binaries to web/bin/ if not present - Generate age keypair at repo root age-key.txt if missing (prints public key with instructions to add to .sops.yaml, then exits) - Decrypt .env.prod.sops → web/.env at deploy time (no CI secrets needed) - Backup SQLite DB before migration (timestamped, keeps last 3) - Rollback on health check failure: dump logs + restore DB backup - Reset nginx router to current slot before --wait to avoid upstream errors - Remove web/scripts/deploy.sh (duplicate) Co-Authored-By: Claude Opus 4.6 --- web/deploy.sh | 126 ++++++++++++++++++++++++++++++++++++------ web/scripts/deploy.sh | 22 -------- 2 files changed, 110 insertions(+), 38 deletions(-) delete mode 100644 web/scripts/deploy.sh diff --git a/web/deploy.sh b/web/deploy.sh index 153c709..4426b69 100644 --- a/web/deploy.sh +++ b/web/deploy.sh @@ -1,6 +1,64 @@ #!/usr/bin/env bash set -euo pipefail +# ── Ensure sops + age are installed ─────────────────────── +APP_DIR="$(cd "$(dirname "$0")" && pwd)" +BIN_DIR="$APP_DIR/bin" +mkdir -p "$BIN_DIR" +export PATH="$BIN_DIR:$PATH" + +ARCH=$(uname -m) +case "$ARCH" in + x86_64) ARCH_SOPS="amd64"; ARCH_AGE="amd64" ;; + aarch64) ARCH_SOPS="arm64"; ARCH_AGE="arm64" ;; + *) echo "Unsupported architecture: $ARCH"; exit 1 ;; +esac + +if ! command -v age &>/dev/null; then + echo "==> Installing age to $BIN_DIR..." + AGE_VERSION="v1.3.1" + curl -fsSL "https://dl.filippo.io/age/${AGE_VERSION}?for=linux/${ARCH_AGE}" -o /tmp/age.tar.gz + tar -xzf /tmp/age.tar.gz -C "$BIN_DIR" --strip-components=1 age/age age/age-keygen + chmod +x "$BIN_DIR/age" "$BIN_DIR/age-keygen" + rm /tmp/age.tar.gz +fi + +if ! command -v sops &>/dev/null; then + echo "==> Installing sops to $BIN_DIR..." + SOPS_VERSION="v3.12.1" + curl -fsSL "https://github.com/getsops/sops/releases/download/${SOPS_VERSION}/sops-${SOPS_VERSION}.linux.${ARCH_SOPS}" -o "$BIN_DIR/sops" + chmod +x "$BIN_DIR/sops" +fi + +# ── Ensure age keypair exists ───────────────────────────── +# Key file lives at repo root (one level up from web/) +AGE_KEY_FILE="${SOPS_AGE_KEY_FILE:-$APP_DIR/../age-key.txt}" +AGE_KEY_FILE="$(realpath "$AGE_KEY_FILE")" +export SOPS_AGE_KEY_FILE="$AGE_KEY_FILE" + +if [ ! -f "$AGE_KEY_FILE" ]; then + echo "==> Generating age keypair at $AGE_KEY_FILE..." + age-keygen -o "$AGE_KEY_FILE" 2>&1 + chmod 600 "$AGE_KEY_FILE" + AGE_PUB=$(grep "public key:" "$AGE_KEY_FILE" | awk '{print $NF}') + echo "" + echo "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + echo "!! 
NEW SERVER — add this public key to .sops.yaml: !!" + echo "!! !!" + echo "!! $AGE_PUB !!" + echo "!! !!" + echo "!! Then run: sops updatekeys .env.prod.sops !!" + echo "!! Commit, push, and re-deploy. !!" + echo "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" + echo "" + exit 1 +fi + +# ── Decrypt secrets ─────────────────────────────────────── +echo "==> Decrypting secrets from .env.prod.sops..." +sops --input-type dotenv --output-type dotenv -d "$APP_DIR/../.env.prod.sops" > "$APP_DIR/.env" +chmod 600 "$APP_DIR/.env" + COMPOSE="docker compose -f docker-compose.prod.yml" LIVE_FILE=".live-slot" ROUTER_CONF="router/default.conf" @@ -22,28 +80,29 @@ echo "==> Current: $CURRENT → Deploying: $TARGET" echo "==> Building $TARGET..." $COMPOSE --profile "$TARGET" build +# ── Backup DB before migration ──────────────────────────────── + +BACKUP_TAG="pre-deploy-$(date +%Y%m%d-%H%M%S)" +echo "==> Backing up database (${BACKUP_TAG})..." +$COMPOSE run --rm --entrypoint "" "${TARGET}-app" \ + sh -c "cp /app/data/app.db /app/data/app.db.${BACKUP_TAG} 2>/dev/null || true" + # ── Migrate ───────────────────────────────────────────────── echo "==> Running migrations..." $COMPOSE --profile "$TARGET" run --rm "${TARGET}-app" \ python -m beanflows.migrations.migrate -# ── Start & health check ─────────────────────────────────── +# ── Ensure router points to current live slot before --wait ── +# nginx resolves upstream hostnames — if config points to a stopped slot, +# the health check fails. Reset router to current slot while target starts. -echo "==> Starting $TARGET (waiting for health check)..." -if ! $COMPOSE --profile "$TARGET" up -d --wait; then - echo "!!! Health check failed — rolling back" - $COMPOSE stop "${TARGET}-app" "${TARGET}-worker" "${TARGET}-scheduler" - exit 1 -fi - -# ── Switch router ─────────────────────────────────────────── - -echo "==> Switching router to $TARGET..." -mkdir -p "$(dirname "$ROUTER_CONF")" -cat > "$ROUTER_CONF" < "$ROUTER_CONF" < Resetting router to current slot ($CURRENT)..." + _write_router_conf "$CURRENT" + $COMPOSE restart router +fi + +# ── Start & health check ─────────────────────────────────── + +echo "==> Starting $TARGET (waiting for health check)..." +if ! $COMPOSE --profile "$TARGET" up -d --wait; then + echo "!!! Health check failed — dumping logs" + echo "--- ${TARGET}-app logs ---" + $COMPOSE --profile "$TARGET" logs --tail=60 "${TARGET}-app" 2>&1 || true + echo "--- router logs ---" + $COMPOSE logs --tail=10 router 2>&1 || true + echo "!!! Rolling back" + $COMPOSE stop "${TARGET}-app" "${TARGET}-worker" "${TARGET}-scheduler" + LATEST=$($COMPOSE run --rm --entrypoint "" "${TARGET}-app" \ + sh -c "ls -t /app/data/app.db.pre-deploy-* 2>/dev/null | head -1") + if [ -n "$LATEST" ]; then + echo "==> Restoring database from ${LATEST}..." + $COMPOSE run --rm --entrypoint "" "${TARGET}-app" \ + sh -c "cp '${LATEST}' /app/data/app.db" + fi + exit 1 +fi + +# ── Write router config and reload (new slot is healthy) ──── + +echo "==> Switching router to $TARGET..." 
+_write_router_conf "$TARGET" $COMPOSE exec router nginx -s reload +# ── Cleanup old pre-deploy backups (keep last 3) ───────────── + +$COMPOSE run --rm --entrypoint "" "${TARGET}-app" \ + sh -c "ls -t /app/data/app.db.pre-deploy-* 2>/dev/null | tail -n +4 | xargs rm -f" || true + # ── Stop old slot ─────────────────────────────────────────── if [ "$CURRENT" != "none" ]; then diff --git a/web/scripts/deploy.sh b/web/scripts/deploy.sh deleted file mode 100644 index cdce459..0000000 --- a/web/scripts/deploy.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/bash -set -e - -# BeanFlows Deployment Script - -echo "🚀 Deploying BeanFlows..." - -# Pull latest code -git pull origin main - -# Build and restart containers -docker compose build -docker compose up -d - -# Run migrations -docker compose exec app uv run python -m beanflows.migrations.migrate - -# Health check -sleep 5 -curl -f http://localhost:5000/health || exit 1 - -echo "✅ Deployment complete!" From 4c7e5208045d711f8e7a09a58336933a6d0d41af Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 10:59:33 +0100 Subject: [PATCH 5/9] fix(deploy): add analytics.duckdb bind-mount to docker-compose.prod.yml App containers need access to the serving DuckDB populated by the pipeline supervisor. Bind-mounts /data/materia/analytics.duckdb as read-only and sets SERVING_DUCKDB_PATH in container environment. Co-Authored-By: Claude Opus 4.6 --- web/docker-compose.prod.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/web/docker-compose.prod.yml b/web/docker-compose.prod.yml index d2a06df..27ce611 100644 --- a/web/docker-compose.prod.yml +++ b/web/docker-compose.prod.yml @@ -33,8 +33,10 @@ services: env_file: ./.env environment: - DATABASE_PATH=/app/data/app.db + - SERVING_DUCKDB_PATH=/data/materia/analytics.duckdb volumes: - app-data:/app/data + - /data/materia/analytics.duckdb:/data/materia/analytics.duckdb:ro networks: - net healthcheck: @@ -82,8 +84,10 @@ services: env_file: ./.env environment: - DATABASE_PATH=/app/data/app.db + - SERVING_DUCKDB_PATH=/data/materia/analytics.duckdb volumes: - app-data:/app/data + - /data/materia/analytics.duckdb:/data/materia/analytics.duckdb:ro networks: - net healthcheck: From 520da2c9207d207de55e9206a3c9b215920153da Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 11:10:06 +0100 Subject: [PATCH 6/9] feat(ci): switch to pull-based deploy via git tags MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace push-based SSH deploy (deploy:web stage with SSH credentials + individual env var injection) with tag-based pull deploy: - Add `tag` stage: creates v${CI_PIPELINE_IID} tag using CI_JOB_TOKEN - Remove all SSH variables (SSH_PRIVATE_KEY, SSH_KNOWN_HOSTS, DEPLOY_USER, DEPLOY_HOST) and all individual secret variables from CI - Zero deploy secrets in CI — only CI_JOB_TOKEN (built-in) needed Deployment is now handled by the on-server supervisor (src/materia/supervisor.py) which polls for new v* tags every 60s and runs web/deploy.sh automatically. Secrets live in .env.prod.sops (git-committed, age-encrypted), decrypted at deploy time by deploy.sh — never stored in GitLab CI variables. 
Co-Authored-By: Claude Opus 4.6 --- .gitlab/.gitlab-ci.yml | 62 ++++++++++-------------------------------- 1 file changed, 14 insertions(+), 48 deletions(-) diff --git a/.gitlab/.gitlab-ci.yml b/.gitlab/.gitlab-ci.yml index 8b5369f..d7303d0 100644 --- a/.gitlab/.gitlab-ci.yml +++ b/.gitlab/.gitlab-ci.yml @@ -1,9 +1,8 @@ image: python:3.13 stages: -# - lint - test - - deploy + - tag variables: UV_CACHE_DIR: "$CI_PROJECT_DIR/.uv-cache" @@ -23,14 +22,6 @@ workflow: - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH - if: $CI_COMMIT_TAG -#lint: -# stage: lint -# before_script: -# - *uv_setup -# script: -# - uv sync -# - uv run ruff check . - # --- Data platform --- test:cli: @@ -71,44 +62,19 @@ test:web: - changes: - web/**/* -deploy:web: - stage: deploy +# --- Deploy (pull-based via supervisor) --- +# The on-server supervisor (src/materia/supervisor.py) polls for new tags +# every 60s and deploys automatically when a new v tag appears. +# No SSH keys, no deploy credentials needed in CI. + +tag: + stage: tag image: alpine:latest - needs: [test:web] + needs: [] + before_script: + - apk add --no-cache git + script: + - git tag "v${CI_PIPELINE_IID}" + - git push "https://gitlab-ci-token:${CI_JOB_TOKEN}@${CI_SERVER_HOST}/${CI_PROJECT_PATH}.git" "v${CI_PIPELINE_IID}" rules: - if: $CI_COMMIT_BRANCH == "master" - before_script: - - apk add --no-cache openssh-client - - eval $(ssh-agent -s) - - echo "$SSH_PRIVATE_KEY" | tr -d '\r' | ssh-add - - - mkdir -p ~/.ssh - - chmod 700 ~/.ssh - - echo "$SSH_KNOWN_HOSTS" >> ~/.ssh/known_hosts - script: - - | - ssh "$DEPLOY_USER@$DEPLOY_HOST" "cat > /opt/beanflows/web/.env" << ENVEOF - APP_NAME=$APP_NAME - SECRET_KEY=$SECRET_KEY - BASE_URL=$BASE_URL - DEBUG=false - DATABASE_PATH=data/app.db - MAGIC_LINK_EXPIRY_MINUTES=$MAGIC_LINK_EXPIRY_MINUTES - SESSION_LIFETIME_DAYS=$SESSION_LIFETIME_DAYS - RESEND_API_KEY=$RESEND_API_KEY - EMAIL_FROM=$EMAIL_FROM - RESEND_AUDIENCE_WAITLIST=$RESEND_AUDIENCE_WAITLIST - ADMIN_EMAILS=$ADMIN_EMAILS - WAITLIST_MODE=$WAITLIST_MODE - RATE_LIMIT_REQUESTS=$RATE_LIMIT_REQUESTS - RATE_LIMIT_WINDOW=$RATE_LIMIT_WINDOW - PADDLE_API_KEY=$PADDLE_API_KEY - PADDLE_WEBHOOK_SECRET=$PADDLE_WEBHOOK_SECRET - PADDLE_ENVIRONMENT=$PADDLE_ENVIRONMENT - PADDLE_PRICE_STARTER=$PADDLE_PRICE_STARTER - PADDLE_PRICE_PRO=$PADDLE_PRICE_PRO - UMAMI_SCRIPT_URL=$UMAMI_SCRIPT_URL - UMAMI_WEBSITE_ID=$UMAMI_WEBSITE_ID - SERVING_DUCKDB_PATH=$SERVING_DUCKDB_PATH - ENVEOF - - ssh "$DEPLOY_USER@$DEPLOY_HOST" "chmod 600 /opt/beanflows/web/.env" - - ssh "$DEPLOY_USER@$DEPLOY_HOST" "cd /opt/beanflows && git pull origin master && cd web && bash deploy.sh" From 5d7d53a260b67de3cb49429d0387bcd812254573 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 11:59:55 +0100 Subject: [PATCH 7/9] feat(supervisor): port Python supervisor from padelnomics + workflows.toml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port padelnomics' schedule-aware Python supervisor to materia: - src/materia/supervisor.py — croniter scheduling, topological wave execution (parallel independent workflows), tag-based git pull + deploy, status CLI subcommand - infra/supervisor/workflows.toml — workflow registry (psd daily, cot weekly, prices daily, ice daily, weather daily) - infra/supervisor/materia-supervisor.service — updated ExecStart to Python supervisor, added SUPERVISOR_GIT_PULL=1 Adaptations from padelnomics: - Uses extract_core.state.open_state_db (not padelnomics_extract.utils) - uv run sqlmesh -p transform/sqlmesh_materia run - uv run materia pipeline 
run export_serving - web/deploy.sh path (materia's deploy.sh is under web/) - Removed proxy_mode (not used in materia) Also: add croniter dependency to src/materia, delete old supervisor.sh. Co-Authored-By: Claude Opus 4.6 --- infra/supervisor/materia-supervisor.service | 3 +- infra/supervisor/supervisor.sh | 69 --- infra/supervisor/workflows.toml | 34 ++ pyproject.toml | 1 + src/materia/supervisor.py | 448 ++++++++++++++++++++ uv.lock | 34 +- 6 files changed, 503 insertions(+), 86 deletions(-) delete mode 100644 infra/supervisor/supervisor.sh create mode 100644 infra/supervisor/workflows.toml create mode 100644 src/materia/supervisor.py diff --git a/infra/supervisor/materia-supervisor.service b/infra/supervisor/materia-supervisor.service index a2521f1..16c0427 100644 --- a/infra/supervisor/materia-supervisor.service +++ b/infra/supervisor/materia-supervisor.service @@ -7,13 +7,14 @@ Wants=network-online.target Type=simple User=root WorkingDirectory=/opt/materia -ExecStart=/opt/materia/infra/supervisor/supervisor.sh +ExecStart=/bin/sh -c 'exec uv run python src/materia/supervisor.py' Restart=always RestartSec=10 EnvironmentFile=/opt/materia/.env Environment=LANDING_DIR=/data/materia/landing Environment=DUCKDB_PATH=/data/materia/lakehouse.duckdb Environment=SERVING_DUCKDB_PATH=/data/materia/analytics.duckdb +Environment=SUPERVISOR_GIT_PULL=1 # Resource limits LimitNOFILE=65536 diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh deleted file mode 100644 index fc8e983..0000000 --- a/infra/supervisor/supervisor.sh +++ /dev/null @@ -1,69 +0,0 @@ -#!/bin/sh -# Materia Supervisor - Continuous pipeline orchestration -# Inspired by TigerBeetle's CFO supervisor: simple, resilient, easy to understand -# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh -# -# Environment variables (set in systemd EnvironmentFile): -# LANDING_DIR — local path for extracted landing data -# DUCKDB_PATH — path to DuckDB lakehouse file (SQLMesh pipeline DB) -# SERVING_DUCKDB_PATH — path to serving-only DuckDB (web app reads from here) -# ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failure alerts - -set -eu - -readonly REPO_DIR="/opt/materia" - -while true -do - ( - # Clone repo if missing - if ! [ -d "$REPO_DIR/.git" ] - then - echo "Repository not found, bootstrap required!" - exit 1 - fi - - cd "$REPO_DIR" - - # Update code from git - git fetch origin master - git switch --discard-changes --detach origin/master - uv sync - - # Extract all data sources - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run extract - - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run extract_cot - - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run extract_prices - - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run extract_ice - - # Transform all data sources - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run transform - - # Export serving tables to analytics.duckdb (atomic swap). 
- # The web app reads from SERVING_DUCKDB_PATH and picks up the new file - # automatically via inode-based connection reopen — no restart needed. - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - SERVING_DUCKDB_PATH="${SERVING_DUCKDB_PATH:-/data/materia/analytics.duckdb}" \ - uv run materia pipeline run export_serving - - ) || { - # Notify on failure if webhook is configured, then sleep to avoid busy-loop - if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then - curl -s -d "Materia pipeline failed at $(date)" "$ALERT_WEBHOOK_URL" 2>/dev/null || true - fi - sleep 600 # Sleep 10 min on failure - } -done diff --git a/infra/supervisor/workflows.toml b/infra/supervisor/workflows.toml new file mode 100644 index 0000000..991df26 --- /dev/null +++ b/infra/supervisor/workflows.toml @@ -0,0 +1,34 @@ +# Workflow registry — the supervisor reads this file on every tick. +# To add a new extractor: add a [section] here and create the Python module. +# +# Fields: +# module — Python module path (must expose an entry function) +# entry — function name in the module (default: "main") +# schedule — named preset ("hourly", "daily", "weekly", "monthly") +# or raw cron expression (e.g. "0 6 * * 1-5") +# depends_on — optional: list of workflow names that must complete first + +[extract_psd] +module = "psdonline.execute" +entry = "extract_psd_dataset" +schedule = "daily" + +[extract_cot] +module = "cftc_cot.execute" +entry = "extract_cot_dataset" +schedule = "weekly" + +[extract_prices] +module = "coffee_prices.execute" +entry = "extract_coffee_prices" +schedule = "daily" + +[extract_ice] +module = "ice_stocks.execute" +entry = "extract_ice_all" +schedule = "daily" + +[extract_weather] +module = "openmeteo.execute" +entry = "extract_weather" +schedule = "daily" diff --git a/pyproject.toml b/pyproject.toml index 1467931..db45dc5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "hcloud>=2.8.0", "prefect>=3.6.15", "msgspec>=0.19", + "croniter>=6.0.0", ] [project.scripts] diff --git a/src/materia/supervisor.py b/src/materia/supervisor.py new file mode 100644 index 0000000..734fe73 --- /dev/null +++ b/src/materia/supervisor.py @@ -0,0 +1,448 @@ +"""Materia Supervisor — schedule-aware pipeline orchestration. + +Reads a TOML workflow registry, runs extractors on cron-based schedules +(with dependency ordering and parallel execution), then runs SQLMesh +transform + export_serving. On production, polls for new git tags and +deploys the web app automatically when a new tag appears. + +Crash safety: the main loop catches all exceptions and backs off, matching +the TigerBeetle CFO supervisor pattern. Combined with systemd Restart=always, +the supervisor is effectively unkillable. 
+ +Usage: + # Run the supervisor loop (production) + LANDING_DIR=data/landing uv run python src/materia/supervisor.py + + # Show workflow status + LANDING_DIR=data/landing uv run python src/materia/supervisor.py status +""" + +import importlib +import logging +import os +import subprocess +import sys +import time +import tomllib +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import UTC, datetime +from pathlib import Path + +from croniter import croniter + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +TICK_INTERVAL_SECONDS = 60 +BACKOFF_SECONDS = 600 # 10 min on tick failure +SUBPROCESS_TIMEOUT_SECONDS = 14400 # 4 hours max per subprocess +REPO_DIR = Path(os.getenv("REPO_DIR", "/opt/materia")) +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) +DUCKDB_PATH = os.getenv("DUCKDB_PATH", "data/lakehouse.duckdb") +SERVING_DUCKDB_PATH = os.getenv("SERVING_DUCKDB_PATH", "analytics.duckdb") +ALERT_WEBHOOK_URL = os.getenv("ALERT_WEBHOOK_URL", "") +WORKFLOWS_PATH = Path(os.getenv("WORKFLOWS_PATH", "infra/supervisor/workflows.toml")) + +NAMED_SCHEDULES = { + "hourly": "0 * * * *", + "daily": "0 5 * * *", + "weekly": "0 3 * * 1", + "monthly": "0 4 1 * *", +} + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("materia.supervisor") + + +# --------------------------------------------------------------------------- +# State DB helpers (reuse extraction state DB) +# --------------------------------------------------------------------------- + +def _open_state_db(): + """Open the extraction state DB at {LANDING_DIR}/.state.sqlite.""" + from extract_core.state import open_state_db + return open_state_db(LANDING_DIR) + + +def _get_last_success_time(conn, workflow_name: str) -> datetime | None: + """Return the finish time of the last successful run, or None.""" + row = conn.execute( + "SELECT MAX(finished_at) AS t FROM extraction_runs " + "WHERE extractor = ? 
AND status = 'success'", + (workflow_name,), + ).fetchone() + if not row or not row["t"]: + return None + return datetime.fromisoformat(row["t"]).replace(tzinfo=UTC) + + +# --------------------------------------------------------------------------- +# Workflow loading + scheduling +# --------------------------------------------------------------------------- + +def load_workflows(path: Path) -> list[dict]: + """Load workflow definitions from TOML file.""" + assert path.exists(), f"Workflows file not found: {path}" + with open(path, "rb") as f: + data = tomllib.load(f) + + workflows = [] + for name, cfg in data.items(): + assert "module" in cfg, f"Workflow '{name}' missing 'module'" + assert "schedule" in cfg, f"Workflow '{name}' missing 'schedule'" + workflows.append({ + "name": name, + "module": cfg["module"], + "entry": cfg.get("entry", "main"), + "schedule": cfg["schedule"], + "depends_on": cfg.get("depends_on", []), + }) + return workflows + + +def resolve_schedule(schedule: str) -> str: + """Resolve a named schedule to a cron expression, or pass through raw cron.""" + return NAMED_SCHEDULES.get(schedule, schedule) + + +def is_due(conn, workflow: dict) -> bool: + """Check if the most recent cron trigger hasn't been served yet.""" + cron_expr = resolve_schedule(workflow["schedule"]) + assert croniter.is_valid(cron_expr), f"Invalid cron: {cron_expr} for {workflow['name']}" + + last_success = _get_last_success_time(conn, workflow["name"]) + if last_success is None: + return True # never ran + + now_naive = datetime.now(UTC).replace(tzinfo=None) + prev_trigger = croniter(cron_expr, now_naive).get_prev(datetime).replace(tzinfo=UTC) + return last_success < prev_trigger + + +# --------------------------------------------------------------------------- +# Topological ordering +# --------------------------------------------------------------------------- + +def topological_waves(workflows: list[dict]) -> list[list[dict]]: + """Group workflows into dependency waves for parallel execution. + + Wave 0: no deps. Wave 1: depends only on wave 0. Etc. 
+ """ + name_to_wf = {w["name"]: w for w in workflows} + due_names = set(name_to_wf.keys()) + + in_degree: dict[str, int] = {} + dependents: dict[str, list[str]] = defaultdict(list) + for w in workflows: + deps_in_scope = [d for d in w["depends_on"] if d in due_names] + in_degree[w["name"]] = len(deps_in_scope) + for d in deps_in_scope: + dependents[d].append(w["name"]) + + waves = [] + remaining = set(due_names) + max_iterations = len(workflows) + 1 + + for _ in range(max_iterations): + if not remaining: + break + wave = [name_to_wf[n] for n in remaining if in_degree[n] == 0] + assert wave, f"Circular dependency detected among: {remaining}" + waves.append(wave) + for w in wave: + remaining.discard(w["name"]) + for dep in dependents[w["name"]]: + if dep in remaining: + in_degree[dep] -= 1 + + return waves + + +# --------------------------------------------------------------------------- +# Workflow execution +# --------------------------------------------------------------------------- + +def run_workflow(conn, workflow: dict) -> None: + """Run a single workflow by importing its module and calling the entry function.""" + module_name = workflow["module"] + entry_name = workflow["entry"] + + logger.info("Running workflow: %s (%s.%s)", workflow["name"], module_name, entry_name) + + try: + module = importlib.import_module(module_name) + entry_fn = getattr(module, entry_name) + entry_fn() + logger.info("Workflow %s completed successfully", workflow["name"]) + except Exception: + logger.exception("Workflow %s failed", workflow["name"]) + send_alert(f"Workflow '{workflow['name']}' failed") + raise + + +def run_due_workflows(conn, workflows: list[dict]) -> bool: + """Run all due workflows in dependency-wave order. Returns True if any ran.""" + due = [w for w in workflows if is_due(conn, w)] + if not due: + logger.info("No workflows due") + return False + + logger.info("Due workflows: %s", [w["name"] for w in due]) + waves = topological_waves(due) + + for i, wave in enumerate(waves): + wave_names = [w["name"] for w in wave] + logger.info("Wave %d: %s", i, wave_names) + + if len(wave) == 1: + try: + run_workflow(conn, wave[0]) + except Exception: + pass # already logged in run_workflow + else: + with ThreadPoolExecutor(max_workers=len(wave)) as pool: + futures = {pool.submit(run_workflow, conn, w): w for w in wave} + for future in as_completed(futures): + try: + future.result() + except Exception: + pass # already logged in run_workflow + + return True + + +# --------------------------------------------------------------------------- +# Transform + Export + Deploy +# --------------------------------------------------------------------------- + +def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool: + """Run a shell command. 
Returns True on success.""" + logger.info("Shell: %s", cmd) + result = subprocess.run( + cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds + ) + if result.returncode != 0: + logger.error( + "Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s", + result.returncode, cmd, result.stdout[-500:], result.stderr[-500:], + ) + return False + return True + + +def run_transform() -> None: + """Run SQLMesh — evaluates model staleness internally.""" + logger.info("Running SQLMesh transform") + ok = run_shell("uv run sqlmesh -p transform/sqlmesh_materia run") + if not ok: + send_alert("SQLMesh transform failed") + + +def run_export() -> None: + """Export serving tables to analytics.duckdb.""" + logger.info("Exporting serving tables") + ok = run_shell( + f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} " + f"uv run materia pipeline run export_serving" + ) + if not ok: + send_alert("Serving export failed") + + +def web_code_changed() -> bool: + """Check if web app code changed since last deploy.""" + result = subprocess.run( + ["git", "diff", "--name-only", "HEAD~1", "HEAD", "--", "web/", "web/Dockerfile"], + capture_output=True, text=True, timeout=30, + ) + return bool(result.stdout.strip()) + + +def current_deployed_tag() -> str | None: + """Return the tag currently checked out, or None if not on a tag.""" + result = subprocess.run( + ["git", "describe", "--tags", "--exact-match", "HEAD"], + capture_output=True, text=True, timeout=10, + ) + return result.stdout.strip() or None + + +def latest_remote_tag() -> str | None: + """Fetch tags from origin and return the latest v tag.""" + subprocess.run( + ["git", "fetch", "--tags", "--prune-tags", "origin"], + capture_output=True, text=True, timeout=30, + ) + result = subprocess.run( + ["git", "tag", "--list", "--sort=-version:refname", "v*"], + capture_output=True, text=True, timeout=10, + ) + tags = result.stdout.strip().splitlines() + return tags[0] if tags else None + + +def git_pull_and_sync() -> None: + """Checkout the latest passing release tag and sync dependencies. + + A tag v is created by CI only after tests pass, so presence of a new + tag implies green CI. Skips if already on the latest tag. 
+ """ + latest = latest_remote_tag() + if not latest: + logger.info("No release tags found — skipping pull") + return + + current = current_deployed_tag() + if current == latest: + logger.info("Already on latest tag %s — skipping pull", latest) + return + + logger.info("New tag %s available (current: %s) — deploying", latest, current) + run_shell(f"git checkout --detach {latest}") + run_shell("uv sync --all-packages") + + +# --------------------------------------------------------------------------- +# Alerting +# --------------------------------------------------------------------------- + +def send_alert(message: str) -> None: + """Send alert via ntfy.sh (or any webhook accepting POST body).""" + if not ALERT_WEBHOOK_URL: + return + timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") + try: + subprocess.run( + ["curl", "-s", "-d", f"[{timestamp}] {message}", ALERT_WEBHOOK_URL], + timeout=10, capture_output=True, + ) + except Exception: + logger.exception("Failed to send alert") + + +# --------------------------------------------------------------------------- +# Main loop +# --------------------------------------------------------------------------- + +def tick() -> None: + """One cycle: git pull, run due extractors, transform, export, maybe deploy.""" + workflows = load_workflows(WORKFLOWS_PATH) + conn = _open_state_db() + + try: + # Git pull + sync (production only — SUPERVISOR_GIT_PULL env var set in systemd service) + if os.getenv("SUPERVISOR_GIT_PULL"): + git_pull_and_sync() + + # Run due extractors + run_due_workflows(conn, workflows) + + # SQLMesh always runs (evaluates model staleness internally) + run_transform() + + # Export serving tables to analytics.duckdb + run_export() + + # Deploy web app if code changed + if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed(): + logger.info("Web code changed — deploying") + ok = run_shell("./web/deploy.sh") + if ok: + send_alert("Deploy succeeded") + else: + send_alert("Deploy FAILED — check journalctl -u materia-supervisor") + finally: + conn.close() + + +def supervisor_loop() -> None: + """Infinite supervisor loop — never exits unless killed.""" + logger.info("Supervisor starting (tick interval: %ds)", TICK_INTERVAL_SECONDS) + logger.info("Workflows: %s", WORKFLOWS_PATH) + logger.info("Landing dir: %s", LANDING_DIR) + + while True: + try: + tick() + except KeyboardInterrupt: + logger.info("Supervisor stopped (KeyboardInterrupt)") + break + except Exception: + logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS) + send_alert("Supervisor tick failed") + time.sleep(BACKOFF_SECONDS) + else: + time.sleep(TICK_INTERVAL_SECONDS) + + +# --------------------------------------------------------------------------- +# Status CLI +# --------------------------------------------------------------------------- + +def print_status() -> None: + """Print workflow status table.""" + workflows = load_workflows(WORKFLOWS_PATH) + conn = _open_state_db() + + now = datetime.now(UTC) + + print(f"{'Workflow':<28} {'Schedule':<18} {'Last Run':<20} {'Status':<8} {'Next'}") + print(f"{'─' * 28} {'─' * 18} {'─' * 20} {'─' * 8} {'─' * 12}") + + for w in workflows: + last_success = _get_last_success_time(conn, w["name"]) + cron_expr = resolve_schedule(w["schedule"]) + + if last_success: + last_str = last_success.strftime("%Y-%m-%d %H:%M") + status = "ok" + else: + last_str = "never" + status = "pending" + + row = conn.execute( + "SELECT MAX(finished_at) AS t FROM extraction_runs " + "WHERE extractor = ? 
AND status = 'failed'", + (w["name"],), + ).fetchone() + if row and row["t"]: + last_fail = datetime.fromisoformat(row["t"]).replace(tzinfo=UTC) + if last_success is None or last_fail > last_success: + status = "FAILED" + + now_naive = now.replace(tzinfo=None) + next_trigger = croniter(cron_expr, now_naive).get_next(datetime) + delta = next_trigger - now_naive + if delta.total_seconds() < 3600: + next_str = f"in {int(delta.total_seconds() / 60)}m" + elif delta.total_seconds() < 86400: + next_str = next_trigger.strftime("%H:%M") + else: + next_str = next_trigger.strftime("%b %d") + + schedule_display = w["schedule"] if w["schedule"] in NAMED_SCHEDULES else cron_expr + print(f"{w['name']:<28} {schedule_display:<18} {last_str:<20} {status:<8} {next_str}") + + conn.close() + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main() -> None: + if len(sys.argv) > 1 and sys.argv[1] == "status": + print_status() + else: + supervisor_loop() + + +if __name__ == "__main__": + main() diff --git a/uv.lock b/uv.lock index 6baaff9..d6f5025 100644 --- a/uv.lock +++ b/uv.lock @@ -14,7 +14,7 @@ members = [ "extract-core", "ice-stocks", "materia", - "openweathermap", + "openmeteo", "psdonline", "sqlmesh-materia", ] @@ -1566,6 +1566,7 @@ name = "materia" version = "0.1.0" source = { editable = "." } dependencies = [ + { name = "croniter" }, { name = "hcloud" }, { name = "msgspec" }, { name = "niquests" }, @@ -1593,6 +1594,7 @@ exploration = [ [package.metadata] requires-dist = [ + { name = "croniter", specifier = ">=6.0.0" }, { name = "hcloud", specifier = ">=2.8.0" }, { name = "msgspec", specifier = ">=0.19" }, { name = "niquests", specifier = ">=3.15.2" }, @@ -1766,6 +1768,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/be/9c/92789c596b8df838baa98fa71844d84283302f7604ed565dafe5a6b5041a/oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1", size = 160065, upload-time = "2025-06-19T22:48:06.508Z" }, ] +[[package]] +name = "openmeteo" +version = "0.1.0" +source = { editable = "extract/openmeteo" } +dependencies = [ + { name = "extract-core" }, + { name = "niquests" }, +] + +[package.metadata] +requires-dist = [ + { name = "extract-core", editable = "extract/extract_core" }, + { name = "niquests", specifier = ">=3.14.1" }, +] + [[package]] name = "opentelemetry-api" version = "1.39.1" @@ -1779,21 +1796,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cf/df/d3f1ddf4bb4cb50ed9b1139cc7b1c54c34a1e7ce8fd1b9a37c0d1551a6bd/opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950", size = 66356, upload-time = "2025-12-11T13:32:17.304Z" }, ] -[[package]] -name = "openweathermap" -version = "0.1.0" -source = { editable = "extract/openweathermap" } -dependencies = [ - { name = "extract-core" }, - { name = "niquests" }, -] - -[package.metadata] -requires-dist = [ - { name = "extract-core", editable = "extract/extract_core" }, - { name = "niquests", specifier = ">=3.14.1" }, -] - [[package]] name = "orjson" version = "3.11.7" From 95f881827e7cc18b4e44dbdc5c4093fa41c2dd21 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 12:03:11 +0100 Subject: [PATCH 8/9] feat(infra): replace Pulumi ESC with SOPS in bootstrap + setup scripts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 
bootstrap_supervisor.sh: remove esc CLI + PULUMI_ACCESS_TOKEN; install sops+age; check age keypair exists; decrypt .env.prod.sops → .env; checkout latest release tag; use uv sync --all-packages - setup_server.sh: add age keypair generation at /opt/materia/age-key.txt; install age binary; print public key with .sops.yaml instructions Co-Authored-By: Claude Opus 4.6 --- infra/bootstrap_supervisor.sh | 149 ++++++++++++++++++++-------------- infra/setup_server.sh | 89 +++++++++++++------- 2 files changed, 148 insertions(+), 90 deletions(-) diff --git a/infra/bootstrap_supervisor.sh b/infra/bootstrap_supervisor.sh index 760405d..a6a018e 100755 --- a/infra/bootstrap_supervisor.sh +++ b/infra/bootstrap_supervisor.sh @@ -1,117 +1,146 @@ #!/bin/bash -# Bootstrap script for Materia supervisor instance -# Run this once on a new supervisor to set it up +# Bootstrap script for Materia supervisor instance. +# Run once on a fresh server after setup_server.sh. # # Usage: -# From CI/CD or locally: -# ssh root@ 'bash -s' < infra/bootstrap_supervisor.sh +# ssh root@ 'bash -s' < infra/bootstrap_supervisor.sh # -# Or on the supervisor itself: -# curl -fsSL | bash +# Prerequisites: +# - age keypair exists at /opt/materia/age-key.txt +# (or SOPS_AGE_KEY_FILE env var pointing elsewhere) +# - The server age public key is already in .sops.yaml and .env.prod.sops +# (run setup_server.sh first, then add the key and re-commit) +# - GITLAB_READ_TOKEN is set (GitLab project access token, read-only) set -euo pipefail echo "=== Materia Supervisor Bootstrap ===" echo "This script will:" -echo " 1. Install dependencies (git, uv, esc)" +echo " 1. Install dependencies (git, uv, sops, age)" echo " 2. Clone the materia repository" -echo " 3. Setup systemd service" -echo " 4. Start the supervisor" +echo " 3. Decrypt secrets from .env.prod.sops" +echo " 4. Set up systemd service" +echo " 5. Start the supervisor" echo "" -# Check if we're root if [ "$EUID" -ne 0 ]; then echo "ERROR: This script must be run as root" exit 1 fi -# Configuration +# ── Configuration ────────────────────────────────────────── REPO_DIR="/opt/materia" GITLAB_PROJECT="deemanone/materia" +AGE_KEY_FILE="${SOPS_AGE_KEY_FILE:-$REPO_DIR/age-key.txt}" -# GITLAB_READ_TOKEN should be set in Pulumi ESC (beanflows/prod) if [ -z "${GITLAB_READ_TOKEN:-}" ]; then - echo "ERROR: GITLAB_READ_TOKEN environment variable not set" - echo "Please add it to Pulumi ESC (beanflows/prod) first" + echo "ERROR: GITLAB_READ_TOKEN not set" + echo " export GITLAB_READ_TOKEN=" exit 1 fi -REPO_URL="https://gitlab-ci-token:${GITLAB_READ_TOKEN}@gitlab.com/${GITLAB_PROJECT}.git" +REPO_URL="https://oauth2:${GITLAB_READ_TOKEN}@gitlab.com/${GITLAB_PROJECT}.git" +# ── System dependencies ──────────────────────────────────── echo "--- Installing system dependencies ---" -apt-get update -apt-get install -y git curl python3-pip +apt-get update -q +apt-get install -y -q git curl ca-certificates +# ── uv ───────────────────────────────────────────────────── echo "--- Installing uv ---" -if ! command -v uv &> /dev/null; then +if ! command -v uv &>/dev/null; then curl -LsSf https://astral.sh/uv/install.sh | sh - export PATH="$HOME/.cargo/bin:$PATH" - echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> /root/.bashrc + export PATH="$HOME/.local/bin:$PATH" + echo 'export PATH="$HOME/.local/bin:$PATH"' >> /root/.bashrc fi -echo "--- Installing Pulumi ESC ---" -if ! 
command -v esc &> /dev/null; then
-    curl -fsSL https://get.pulumi.com/esc/install.sh | sh
-    export PATH="$HOME/.pulumi/bin:$PATH"
-    echo 'export PATH="$HOME/.pulumi/bin:$PATH"' >> /root/.bashrc
+# ── sops + age ─────────────────────────────────────────────
+echo "--- Installing sops + age ---"
+ARCH=$(uname -m)
+case "$ARCH" in
+    x86_64)  ARCH_SOPS="amd64"; ARCH_AGE="amd64" ;;
+    aarch64) ARCH_SOPS="arm64"; ARCH_AGE="arm64" ;;
+    *) echo "Unsupported architecture: $ARCH"; exit 1 ;;
+esac
+
+if ! command -v age &>/dev/null; then
+    AGE_VERSION="v1.3.1"
+    curl -fsSL "https://dl.filippo.io/age/${AGE_VERSION}?for=linux/${ARCH_AGE}" -o /tmp/age.tar.gz
+    tar -xzf /tmp/age.tar.gz -C /usr/local/bin --strip-components=1 age/age age/age-keygen
+    chmod +x /usr/local/bin/age /usr/local/bin/age-keygen
+    rm /tmp/age.tar.gz
 fi
-echo "--- Setting up Pulumi ESC authentication ---"
-if [ -z "${PULUMI_ACCESS_TOKEN:-}" ]; then
-    echo "ERROR: PULUMI_ACCESS_TOKEN environment variable not set"
-    echo "Please set it before running this script:"
-    echo "  export PULUMI_ACCESS_TOKEN="
-    exit 1
+if ! command -v sops &>/dev/null; then
+    SOPS_VERSION="v3.12.1"
+    curl -fsSL "https://github.com/getsops/sops/releases/download/${SOPS_VERSION}/sops-${SOPS_VERSION}.linux.${ARCH_SOPS}" -o /usr/local/bin/sops
+    chmod +x /usr/local/bin/sops
 fi
-esc login --token "$PULUMI_ACCESS_TOKEN"
-
-echo "--- Loading secrets from Pulumi ESC ---"
-eval $(esc env open beanflows/prod --format shell)
-
+# ── Clone repository ───────────────────────────────────────
 echo "--- Cloning repository ---"
-if [ -d "$REPO_DIR" ]; then
-    echo "Repository already exists, pulling latest..."
+if [ -d "$REPO_DIR/.git" ]; then
+    echo "Repository already exists — fetching latest tags..."
     cd "$REPO_DIR"
-    git pull origin master
+    git fetch --tags --prune-tags origin
 else
     git clone "$REPO_URL" "$REPO_DIR"
     cd "$REPO_DIR"
 fi
+# Checkout latest release tag (same logic as supervisor)
+LATEST_TAG=$(git tag --list --sort=-version:refname "v*" | head -1)
+if [ -n "$LATEST_TAG" ]; then
+    echo "Checking out $LATEST_TAG..."
+    git checkout --detach "$LATEST_TAG"
+else
+    echo "No release tags found — staying on current HEAD"
+fi
+
+# ── Check age keypair ──────────────────────────────────────
+echo "--- Checking age keypair ---"
+if [ ! -f "$AGE_KEY_FILE" ]; then
+    echo "ERROR: Age keypair not found at $AGE_KEY_FILE"
+    echo ""
+    echo "Run infra/setup_server.sh first to generate the keypair, then:"
+    echo "  1. Copy the public key from setup_server.sh output"
+    echo "  2. Add it to .sops.yaml on your workstation"
+    echo "  3. Run: sops updatekeys .env.prod.sops"
+    echo "  4. 
Commit + push and re-run this bootstrap" + exit 1 +fi +export SOPS_AGE_KEY_FILE="$AGE_KEY_FILE" + +# ── Decrypt secrets ──────────────────────────────────────── +echo "--- Decrypting secrets from .env.prod.sops ---" +sops --input-type dotenv --output-type dotenv -d "$REPO_DIR/.env.prod.sops" > "$REPO_DIR/.env" +chmod 600 "$REPO_DIR/.env" +echo "Secrets written to $REPO_DIR/.env" + +# ── Data directories ─────────────────────────────────────── echo "--- Creating data directories ---" -mkdir -p /data/materia/landing/psd +mkdir -p /data/materia/landing +# ── Python dependencies ──────────────────────────────────── echo "--- Installing Python dependencies ---" -uv sync - -echo "--- Creating environment file ---" -cat > "$REPO_DIR/.env" </dev/null; then - useradd --system --create-home --shell /usr/sbin/nologin "$APP_USER" - echo "Created user: $APP_USER" -else - echo "User $APP_USER already exists, skipping" +if [ "$EUID" -ne 0 ]; then + echo "ERROR: This script must be run as root" + exit 1 fi -# Create app directory owned by app user -mkdir -p "$APP_DIR" -chown "$APP_USER:$APP_USER" "$APP_DIR" -chmod 750 "$APP_DIR" -echo "Created $APP_DIR (owner: $APP_USER)" +# ── Create data directories ──────────────────────────────── +echo "--- Creating data directories ---" +mkdir -p /data/materia/landing +mkdir -p "$REPO_DIR" +echo "Data dir: /data/materia" -# Generate deploy key if not already present -if [ ! -f "$KEY_PATH" ]; then - mkdir -p "/home/$APP_USER/.ssh" - ssh-keygen -t ed25519 -f "$KEY_PATH" -N "" -C "beanflows-server" - chown -R "$APP_USER:$APP_USER" "/home/$APP_USER/.ssh" - chmod 700 "/home/$APP_USER/.ssh" - chmod 600 "$KEY_PATH" - chmod 644 "$KEY_PATH.pub" - echo "Generated deploy key: $KEY_PATH" -else - echo "Deploy key already exists, skipping" +# ── Install age ──────────────────────────────────────────── +echo "--- Installing age ---" +ARCH=$(uname -m) +case "$ARCH" in + x86_64) ARCH_AGE="amd64" ;; + aarch64) ARCH_AGE="arm64" ;; + *) echo "Unsupported architecture: $ARCH"; exit 1 ;; +esac + +if ! 
command -v age-keygen &>/dev/null; then + AGE_VERSION="v1.3.1" + curl -fsSL "https://dl.filippo.io/age/${AGE_VERSION}?for=linux/${ARCH_AGE}" -o /tmp/age.tar.gz + tar -xzf /tmp/age.tar.gz -C /usr/local/bin --strip-components=1 age/age age/age-keygen + chmod +x /usr/local/bin/age /usr/local/bin/age-keygen + rm /tmp/age.tar.gz + echo "age installed to /usr/local/bin" fi +# ── Generate age keypair ─────────────────────────────────── +echo "--- Setting up age keypair ---" +if [ -f "$AGE_KEY_FILE" ]; then + echo "Keypair already exists at $AGE_KEY_FILE — skipping generation" +else + age-keygen -o "$AGE_KEY_FILE" 2>/dev/null + chmod 600 "$AGE_KEY_FILE" + echo "Generated: $AGE_KEY_FILE" +fi + +AGE_PUB=$(grep "public key:" "$AGE_KEY_FILE" | awk '{print $NF}') + echo "" -echo "=== Add this deploy key to GitLab ===" -echo "GitLab → repo → Settings → Repository → Deploy Keys (read-only)" +echo "==================================================================" +echo " Server age public key:" echo "" -cat "$KEY_PATH.pub" +echo " $AGE_PUB" +echo "" +echo " Add this key to .sops.yaml on your workstation:" +echo "" +echo " creation_rules:" +echo " - path_regex: \\.env\\.(dev|prod)\\.sops\$" +echo " age: >-" +echo " " +echo " + $AGE_PUB" +echo "" +echo " Then re-encrypt the prod secrets file:" +echo " sops updatekeys .env.prod.sops" +echo " git add .sops.yaml .env.prod.sops && git commit -m 'chore: add server age key'" +echo " git push" +echo "" +echo " Then run infra/bootstrap_supervisor.sh to complete setup." +echo "==================================================================" From 518b50d0f57a59e686943fca1dbb63be3bcb6fe3 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 12:04:55 +0100 Subject: [PATCH 9/9] docs(claude+infra): expand CLAUDE.md + infra/readme.md for full architecture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CLAUDE.md additions: - List all 6 extractor packages + extract_core - Full data flow with all sources + dual-DuckDB - Foundation-as-ontology: dim_commodity conforms cross-source identifiers - Two-DuckDB architecture explanation (why not serving.duckdb) - Extraction pattern: one-package-per-source, state SQLite, adding new source - Supervisor: croniter scheduling, topological waves, tag-based deploy - CI/CD: pull-based via git tags, no SSH - Secrets management: SOPS+age section, file table, server key workflow - uv workspace management section - Remove Pulumi ESC references; update env vars table infra/readme.md: - Update architecture diagram (add analytics.duckdb, age-key.txt) - Rewrite setup flow: setup_server.sh → add key to SOPS → bootstrap - Secrets management section with file table - Deploy model: pull-based (no SSH/CI credentials) - Monitoring: add supervisor status + extraction state DB query Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 164 +++++++++++++++++++++++++++++++++++++++--------- infra/readme.md | 118 +++++++++++++++++++++++++--------- 2 files changed, 223 insertions(+), 59 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 1431444..9845bf4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,13 +4,13 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -Materia is a commodity data analytics platform (product: **BeanFlows.coffee**) for coffee traders. It's a uv workspace monorepo with three packages: extraction (USDA PSD data), SQL transformation (SQLMesh + DuckDB), and a CLI for worker management and local pipeline execution. 
+Materia is a commodity data analytics platform (product: **BeanFlows.coffee**) for coffee traders. It's a uv workspace monorepo: multiple extraction packages, a SQL transformation pipeline, a web app, and a CLI for local pipeline execution. ## Commands ```bash # Install dependencies -uv sync +uv sync --all-packages # Lint & format ruff check . # Check @@ -24,9 +24,6 @@ cd transform/sqlmesh_materia && uv run sqlmesh test # SQLMesh model tests # Run a single test uv run pytest tests/test_cli.py::test_name -v -# Extract data -LANDING_DIR=data/landing uv run extract_psd - # SQLMesh (from repo root) uv run sqlmesh -p transform/sqlmesh_materia plan # Plans to dev_ by default uv run sqlmesh -p transform/sqlmesh_materia plan prod # Production @@ -34,45 +31,153 @@ uv run sqlmesh -p transform/sqlmesh_materia test # Run model tests uv run sqlmesh -p transform/sqlmesh_materia format # Format SQL # CLI -uv run materia pipeline run extract|transform +uv run materia pipeline run extract|transform|export_serving uv run materia pipeline list -uv run materia worker create|destroy|list -uv run materia secrets get +uv run materia secrets list +uv run materia secrets test + +# Supervisor status (production) +uv run python src/materia/supervisor.py status + +# CSS (Tailwind) +make css-build # one-shot build +make css-watch # watch mode + +# Secrets +make secrets-decrypt-dev # decrypt .env.dev.sops → .env (local dev) +make secrets-decrypt-prod # decrypt .env.prod.sops → .env +make secrets-edit-dev # edit dev secrets in $EDITOR +make secrets-edit-prod # edit prod secrets in $EDITOR ``` ## Architecture -**Workspace packages** (`pyproject.toml` → `tool.uv.workspace`): -- `extract/psdonline/` — Downloads USDA PSD Online data, normalizes ZIP→gzip CSV, writes to local landing directory -- `extract/openmeteo/` — Daily weather for 12 coffee-growing regions (Open-Meteo, ERA5 reanalysis, no API key) -- `transform/sqlmesh_materia/` — 3-layer SQL transformation pipeline (local DuckDB) -- `src/materia/` — CLI (Typer) for pipeline execution, worker management, secrets -- `web/` — Future web frontend +**Workspace packages** (`pyproject.toml` → `[tool.uv.workspace]`): +- `extract/extract_core/` — Shared extraction utilities: state tracking (SQLite), HTTP helpers, atomic file writes +- `extract/psdonline/` — USDA PSD Online data (ZIP → gzip CSV) +- `extract/cftc_cot/` — CFTC Commitments of Traders (weekly) +- `extract/coffee_prices/` — KC=F futures prices +- `extract/ice_stocks/` — ICE warehouse stocks + aging reports +- `extract/openmeteo/` — Daily weather for 12 coffee-growing regions (Open-Meteo ERA5, no API key) +- `transform/sqlmesh_materia/` — 3-layer SQL transformation pipeline (DuckDB) +- `src/materia/` — CLI (Typer): pipeline execution, secrets, version +- `web/` — Quart + HTMX web app (BeanFlows.coffee dashboard) **Data flow:** ``` -USDA API → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip -Open-Meteo → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz - → rclone cron syncs landing/ to R2 +USDA API → extract → /data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip +CFTC API → extract → /data/materia/landing/cot/{year}/{date}.csv.gz +Yahoo/prices → extract → /data/materia/landing/prices/{symbol}/{date}.json.gz +ICE API → extract → /data/materia/landing/ice_stocks/{date}.csv.gz +Open-Meteo → extract → /data/materia/landing/weather/{location_id}/{year}/{date}.json.gz + → rclone timer syncs landing/ to R2 every 6 hours → SQLMesh staging → foundation → serving → 
/data/materia/lakehouse.duckdb - → Web app reads lakehouse.duckdb (read-only) + → export_serving pipeline → /data/materia/analytics.duckdb (web app) + → Web app reads analytics.duckdb (read-only, per-thread) ``` **SQLMesh 3-layer model structure** (`transform/sqlmesh_materia/models/`): 1. `staging/` — Type casting, lookup joins, basic cleansing (reads landing directly) -2. `foundation/` — Business logic, pivoting, dimensions, facts (also reads landing directly) +2. `foundation/` — Business logic, pivoting, **conformed dimensions** (ontology), facts 3. `serving/` — Analytics-ready aggregates for the web app +**Foundation layer is the ontology.** `dim_commodity` conforms identifiers across all sources: +- Each row = one commodity (e.g. Arabica coffee) +- Columns: `usda_commodity_code`, `cftc_contract_market_code`, `ice_stock_report_code`, `ticker` (KC=F), etc. +- New data sources add columns to existing dims, not new tables +- Facts join to dims via surrogate keys (MD5 hash keys generated in staging) + +**Two-DuckDB architecture:** +- `lakehouse.duckdb` (`DUCKDB_PATH`) — SQLMesh exclusive write; never opened by web app +- `analytics.duckdb` (`SERVING_DUCKDB_PATH`) — read-only serving copy for web app +- Why not `serving.duckdb`: DuckDB derives catalog name from filename stem — "serving" would collide with the "serving" schema inside +- `export_serving` pipeline copies `serving.*` tables via Arrow + atomic rename after each transform +- Web app uses per-thread connections (`threading.local`) with inode-based reopen on rotation + +**Extraction pattern** — one workspace package per data source: +- All packages depend on `extract_core` (shared state tracking, HTTP, file writes) +- Landing zone is immutable and content-addressed: `{LANDING_DIR}/{source}/{partitions}/{hash}.ext` +- State tracked in SQLite at `{LANDING_DIR}/.state.sqlite` (WAL mode, OLTP — not DuckDB) +- Query state: `sqlite3 data/landing/.state.sqlite "SELECT * FROM extraction_runs ORDER BY run_id DESC LIMIT 20"` + +**Adding a new data source:** +```bash +# Create package +uv init --package extract/new_source +uv add --package new_source extract-core niquests + +# Add entry function in extract/new_source/src/new_source/execute.py +# Register in infra/supervisor/workflows.toml +# Add staging + foundation models in transform/sqlmesh_materia/models/ +``` + +**Supervisor** (`src/materia/supervisor.py`): +- Croniter-based scheduling with named presets: `hourly`, `daily`, `weekly`, `monthly` +- Workflow registry: `infra/supervisor/workflows.toml` +- Dependency-wave execution: independent workflows run in parallel (ThreadPoolExecutor) +- Each tick: git pull (tag-based) → due extractors → SQLMesh → export_serving → web deploy if changed +- Crash-safe: systemd `Restart=always` + 10-minute backoff on tick failure + +**CI/CD** (`.gitlab/.gitlab-ci.yml`) — pull-based, no SSH: +- `test` stage: pytest, sqlmesh test, web pytest +- `tag` stage: creates `v${CI_PIPELINE_IID}` tag after tests pass (master branch only) +- Supervisor polls for new tags every 60s, checks out latest, runs `uv sync` +- No SSH keys or deploy credentials in CI — only `CI_JOB_TOKEN` (built-in) + **CLI modules** (`src/materia/`): -- `cli.py` — Typer app with subcommands: worker, pipeline, secrets, version -- `workers.py` — Hetzner cloud instance management (for ad-hoc compute) +- `cli.py` — Typer app with subcommands: pipeline, secrets, version - `pipelines.py` — Local subprocess pipeline execution with bounded timeouts -- `secrets.py` — Pulumi ESC integration for 
environment secrets +- `secrets.py` — SOPS+age integration (decrypts `.env.prod.sops`) **Infrastructure** (`infra/`): -- Pulumi IaC for Cloudflare R2 buckets and Hetzner compute -- Supervisor systemd service for always-on orchestration (pulls git, runs pipelines) +- Pulumi IaC for Cloudflare R2 buckets +- Python supervisor + systemd service - rclone systemd timer for landing data backup to R2 +- `setup_server.sh` — one-time server init (age keypair generation) +- `bootstrap_supervisor.sh` — full server setup from scratch + +## Secrets management (SOPS + age) + +| File | Purpose | +|------|---------| +| `.env.dev.sops` | Dev defaults (safe values, local paths) | +| `.env.prod.sops` | Production secrets (encrypted) | +| `.sops.yaml` | Maps file patterns to age public keys | +| `age-key.txt` | Server age keypair (gitignored, generated by `setup_server.sh`) | + +```bash +make secrets-decrypt-dev # decrypt dev secrets → .env (local dev) +make secrets-edit-prod # edit prod secrets in $EDITOR +``` + +`web/deploy.sh` auto-decrypts `.env.prod.sops` → `web/.env` on each deploy. +`src/materia/secrets.py` decrypts on-demand via subprocess call to `sops`. + +**Adding the server key (new server setup):** +1. Run `infra/setup_server.sh` on the server — prints the age public key +2. Add the public key to `.sops.yaml` on your workstation +3. Run `sops updatekeys .env.prod.sops` +4. Commit + push + +## uv workspace management + +```bash +# Install everything (run from repo root) +uv sync --all-packages --all-groups + +# Create a new extraction package +uv init --package extract/new_source +uv add --package new_source extract-core niquests + +# Add a dependency to an existing package +uv add --package materia croniter +uv add --package beanflows duckdb + +# Run a command in a specific package context +uv run --package new_source python -c "import new_source" +``` + +Always use `uv` CLI to manage dependencies — never edit `pyproject.toml` manually for dependency changes. ## Coding Philosophy @@ -90,9 +195,9 @@ Read `coding_philosophy.md` for the full guide. Key points: - **Python 3.13** (`.python-version`) - **Ruff**: double quotes, spaces, E501 ignored (formatter handles line length) - **SQLMesh**: DuckDB dialect, `@daily` cron, start date `2025-07-07`, default env `dev_{{ user() }}` -- **Storage**: Local NVMe (`LANDING_DIR`, `DUCKDB_PATH`), R2 for backup via rclone -- **Secrets**: Pulumi ESC (`esc run beanflows/prod -- `) -- **CI**: GitLab CI (`.gitlab/.gitlab-ci.yml`) — runs pytest and sqlmesh test on push/MR +- **Storage**: Local NVMe (`LANDING_DIR`, `DUCKDB_PATH`, `SERVING_DUCKDB_PATH`), R2 for backup via rclone +- **Secrets**: SOPS + age (`.env.*.sops` files, Makefile targets) +- **CI**: GitLab CI — test → tag (pull-based deploy, no SSH) - **Pre-commit hooks**: installed via `pre-commit install` ## Environment Variables @@ -100,4 +205,7 @@ Read `coding_philosophy.md` for the full guide. 
Key points: | Variable | Default | Description | |----------|---------|-------------| | `LANDING_DIR` | `data/landing` | Root directory for extracted landing data | -| `DUCKDB_PATH` | `local.duckdb` | Path to the DuckDB lakehouse database | +| `DUCKDB_PATH` | `local.duckdb` | Path to the SQLMesh lakehouse database (exclusive write) | +| `SERVING_DUCKDB_PATH` | `analytics.duckdb` | Path to the serving DB (read by web app) | +| `ALERT_WEBHOOK_URL` | _(empty)_ | ntfy.sh URL for supervisor failure alerts | +| `SUPERVISOR_GIT_PULL` | _(unset)_ | Set to any value to enable tag-based git pull in supervisor | diff --git a/infra/readme.md b/infra/readme.md index 9a8060c..a7319a2 100644 --- a/infra/readme.md +++ b/infra/readme.md @@ -6,63 +6,101 @@ Single-server local-first setup for BeanFlows.coffee on Hetzner NVMe. ``` Hetzner Server (NVMe) -├── /opt/materia/ # Git repo, code, uv environment -├── /data/materia/landing/ # Extracted USDA data (year/month subdirs) -├── /data/materia/lakehouse.duckdb # SQLMesh output database +├── /opt/materia/ # Git repo (checked out at latest release tag) +├── /opt/materia/age-key.txt # Server age keypair (chmod 600, gitignored) +├── /opt/materia/.env # Decrypted from .env.prod.sops at deploy time +├── /data/materia/landing/ # Extracted raw data (immutable, content-addressed) +├── /data/materia/lakehouse.duckdb # SQLMesh exclusive write +├── /data/materia/analytics.duckdb # Read-only serving copy for web app └── systemd services: - ├── materia-supervisor # Pulls git, runs extract + transform daily - └── materia-backup.timer # Syncs landing/ to R2 every 6 hours + ├── materia-supervisor # Python supervisor: extract → transform → export → deploy + └── materia-backup.timer # rclone: syncs landing/ to R2 every 6 hours ``` ## Data Flow -1. **Extract**: USDA API → `/data/materia/landing/psd/{year}/{month}/{etag}.csv.gzip` -2. **Transform**: SQLMesh reads landing CSVs → writes to `/data/materia/lakehouse.duckdb` -3. **Backup**: rclone syncs `/data/materia/landing/` → R2 `materia-raw/landing/` -4. **Web**: Reads `lakehouse.duckdb` (read-only) +1. **Extract** — Supervisor runs due extractors per `infra/supervisor/workflows.toml` +2. **Transform** — SQLMesh reads landing → writes `lakehouse.duckdb` +3. **Export** — `export_serving` copies `serving.*` → `analytics.duckdb` (atomic rename) +4. **Backup** — rclone syncs `/data/materia/landing/` → R2 `materia-raw/landing/` +5. **Web** — Web app reads `analytics.duckdb` read-only (per-thread connections) -## Setup +## Setup (new server) -### Prerequisites - -- Hetzner server with NVMe storage -- Pulumi ESC configured (`beanflows/prod` environment) -- `GITLAB_READ_TOKEN` and `PULUMI_ACCESS_TOKEN` set - -### Bootstrap +### 1. Run setup_server.sh ```bash -# From local machine or CI: +bash infra/setup_server.sh +``` + +This creates data directories, installs age, and generates the server age keypair at `/opt/materia/age-key.txt`. It prints the server's age public key. + +### 2. Add the server key to SOPS + +On your workstation: + +```bash +# Add the server public key to .sops.yaml +# Then re-encrypt prod secrets to include the server key: +sops updatekeys .env.prod.sops +git add .sops.yaml .env.prod.sops +git commit -m "chore: add server age key" +git push +``` + +### 3. 
Bootstrap the supervisor + +```bash +# Requires GITLAB_READ_TOKEN (GitLab project access token, read-only) +export GITLAB_READ_TOKEN= ssh root@ 'bash -s' < infra/bootstrap_supervisor.sh ``` -This installs dependencies, clones the repo, creates data directories, and starts the supervisor service. +This installs uv + sops + age, clones the repo, decrypts secrets, installs Python dependencies, and starts the supervisor service. -### R2 Backup - -1. Install rclone: `apt install rclone` -2. Copy and configure: `cp infra/backup/rclone.conf.example /root/.config/rclone/rclone.conf` -3. Fill in R2 credentials from Pulumi ESC -4. Install systemd units: +### 4. Set up R2 backup ```bash +apt install rclone +cp infra/backup/rclone.conf.example /root/.config/rclone/rclone.conf +# Fill in R2 credentials from .env.prod.sops (ACCESS_KEY_ID, SECRET_ACCESS_KEY, bucket endpoint) cp infra/backup/materia-backup.service /etc/systemd/system/ cp infra/backup/materia-backup.timer /etc/systemd/system/ systemctl daemon-reload systemctl enable --now materia-backup.timer ``` -## Pulumi IaC +## Secrets management -Still manages Cloudflare R2 buckets and can provision Hetzner instances: +Secrets are stored as SOPS-encrypted dotenv files in the repo root: + +| File | Purpose | +|------|---------| +| `.env.dev.sops` | Dev defaults (safe values, local paths) | +| `.env.prod.sops` | Production secrets | +| `.sops.yaml` | Maps file patterns to age public keys | ```bash -cd infra -pulumi login -pulumi stack select prod -pulumi up +# Decrypt for local dev +make secrets-decrypt-dev + +# Edit prod secrets +make secrets-edit-prod ``` +`bootstrap_supervisor.sh` decrypts `.env.prod.sops` → `/opt/materia/.env` during setup. +`web/deploy.sh` re-decrypts on every deploy (so secret rotations take effect automatically). + +## Deploy model (pull-based) + +No SSH keys or deploy credentials in CI. + +1. CI runs tests (`test:cli`, `test:sqlmesh`, `test:web`) +2. On master, CI creates tag `v${CI_PIPELINE_IID}` using built-in `CI_JOB_TOKEN` +3. Supervisor polls for new tags every 60s +4. When a new tag appears: `git checkout --detach ` + `uv sync --all-packages` +5. If `web/` files changed: `./web/deploy.sh` (Docker blue/green + health check) + ## Monitoring ```bash @@ -70,9 +108,27 @@ pulumi up systemctl status materia-supervisor journalctl -u materia-supervisor -f +# Workflow status table +cd /opt/materia && uv run python src/materia/supervisor.py status + # Backup timer status systemctl list-timers materia-backup.timer journalctl -u materia-backup -f + +# Extraction state DB +sqlite3 /data/materia/landing/.state.sqlite \ + "SELECT extractor, status, finished_at FROM extraction_runs ORDER BY run_id DESC LIMIT 20" +``` + +## Pulumi IaC + +Still manages Cloudflare R2 buckets: + +```bash +cd infra +pulumi login +pulumi stack select prod +pulumi up ``` ## Cost