From 5d7d53a260b67de3cb49429d0387bcd812254573 Mon Sep 17 00:00:00 2001 From: Deeman Date: Thu, 26 Feb 2026 11:59:55 +0100 Subject: [PATCH] feat(supervisor): port Python supervisor from padelnomics + workflows.toml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port padelnomics' schedule-aware Python supervisor to materia: - src/materia/supervisor.py — croniter scheduling, topological wave execution (parallel independent workflows), tag-based git pull + deploy, status CLI subcommand - infra/supervisor/workflows.toml — workflow registry (psd daily, cot weekly, prices daily, ice daily, weather daily) - infra/supervisor/materia-supervisor.service — updated ExecStart to Python supervisor, added SUPERVISOR_GIT_PULL=1 Adaptations from padelnomics: - Uses extract_core.state.open_state_db (not padelnomics_extract.utils) - uv run sqlmesh -p transform/sqlmesh_materia run - uv run materia pipeline run export_serving - web/deploy.sh path (materia's deploy.sh is under web/) - Removed proxy_mode (not used in materia) Also: add croniter dependency to src/materia, delete old supervisor.sh. Co-Authored-By: Claude Opus 4.6 --- infra/supervisor/materia-supervisor.service | 3 +- infra/supervisor/supervisor.sh | 69 --- infra/supervisor/workflows.toml | 34 ++ pyproject.toml | 1 + src/materia/supervisor.py | 448 ++++++++++++++++++++ uv.lock | 34 +- 6 files changed, 503 insertions(+), 86 deletions(-) delete mode 100644 infra/supervisor/supervisor.sh create mode 100644 infra/supervisor/workflows.toml create mode 100644 src/materia/supervisor.py diff --git a/infra/supervisor/materia-supervisor.service b/infra/supervisor/materia-supervisor.service index a2521f1..16c0427 100644 --- a/infra/supervisor/materia-supervisor.service +++ b/infra/supervisor/materia-supervisor.service @@ -7,13 +7,14 @@ Wants=network-online.target Type=simple User=root WorkingDirectory=/opt/materia -ExecStart=/opt/materia/infra/supervisor/supervisor.sh +ExecStart=/bin/sh -c 'exec uv run python src/materia/supervisor.py' Restart=always RestartSec=10 EnvironmentFile=/opt/materia/.env Environment=LANDING_DIR=/data/materia/landing Environment=DUCKDB_PATH=/data/materia/lakehouse.duckdb Environment=SERVING_DUCKDB_PATH=/data/materia/analytics.duckdb +Environment=SUPERVISOR_GIT_PULL=1 # Resource limits LimitNOFILE=65536 diff --git a/infra/supervisor/supervisor.sh b/infra/supervisor/supervisor.sh deleted file mode 100644 index fc8e983..0000000 --- a/infra/supervisor/supervisor.sh +++ /dev/null @@ -1,69 +0,0 @@ -#!/bin/sh -# Materia Supervisor - Continuous pipeline orchestration -# Inspired by TigerBeetle's CFO supervisor: simple, resilient, easy to understand -# https://github.com/tigerbeetle/tigerbeetle/blob/main/src/scripts/cfo_supervisor.sh -# -# Environment variables (set in systemd EnvironmentFile): -# LANDING_DIR — local path for extracted landing data -# DUCKDB_PATH — path to DuckDB lakehouse file (SQLMesh pipeline DB) -# SERVING_DUCKDB_PATH — path to serving-only DuckDB (web app reads from here) -# ALERT_WEBHOOK_URL — optional ntfy.sh / Slack / Telegram webhook for failure alerts - -set -eu - -readonly REPO_DIR="/opt/materia" - -while true -do - ( - # Clone repo if missing - if ! [ -d "$REPO_DIR/.git" ] - then - echo "Repository not found, bootstrap required!" - exit 1 - fi - - cd "$REPO_DIR" - - # Update code from git - git fetch origin master - git switch --discard-changes --detach origin/master - uv sync - - # Extract all data sources - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run extract - - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run extract_cot - - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run extract_prices - - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run extract_ice - - # Transform all data sources - LANDING_DIR="${LANDING_DIR:-/data/materia/landing}" \ - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - uv run materia pipeline run transform - - # Export serving tables to analytics.duckdb (atomic swap). - # The web app reads from SERVING_DUCKDB_PATH and picks up the new file - # automatically via inode-based connection reopen — no restart needed. - DUCKDB_PATH="${DUCKDB_PATH:-/data/materia/lakehouse.duckdb}" \ - SERVING_DUCKDB_PATH="${SERVING_DUCKDB_PATH:-/data/materia/analytics.duckdb}" \ - uv run materia pipeline run export_serving - - ) || { - # Notify on failure if webhook is configured, then sleep to avoid busy-loop - if [ -n "${ALERT_WEBHOOK_URL:-}" ]; then - curl -s -d "Materia pipeline failed at $(date)" "$ALERT_WEBHOOK_URL" 2>/dev/null || true - fi - sleep 600 # Sleep 10 min on failure - } -done diff --git a/infra/supervisor/workflows.toml b/infra/supervisor/workflows.toml new file mode 100644 index 0000000..991df26 --- /dev/null +++ b/infra/supervisor/workflows.toml @@ -0,0 +1,34 @@ +# Workflow registry — the supervisor reads this file on every tick. +# To add a new extractor: add a [section] here and create the Python module. +# +# Fields: +# module — Python module path (must expose an entry function) +# entry — function name in the module (default: "main") +# schedule — named preset ("hourly", "daily", "weekly", "monthly") +# or raw cron expression (e.g. "0 6 * * 1-5") +# depends_on — optional: list of workflow names that must complete first + +[extract_psd] +module = "psdonline.execute" +entry = "extract_psd_dataset" +schedule = "daily" + +[extract_cot] +module = "cftc_cot.execute" +entry = "extract_cot_dataset" +schedule = "weekly" + +[extract_prices] +module = "coffee_prices.execute" +entry = "extract_coffee_prices" +schedule = "daily" + +[extract_ice] +module = "ice_stocks.execute" +entry = "extract_ice_all" +schedule = "daily" + +[extract_weather] +module = "openmeteo.execute" +entry = "extract_weather" +schedule = "daily" diff --git a/pyproject.toml b/pyproject.toml index 1467931..db45dc5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "hcloud>=2.8.0", "prefect>=3.6.15", "msgspec>=0.19", + "croniter>=6.0.0", ] [project.scripts] diff --git a/src/materia/supervisor.py b/src/materia/supervisor.py new file mode 100644 index 0000000..734fe73 --- /dev/null +++ b/src/materia/supervisor.py @@ -0,0 +1,448 @@ +"""Materia Supervisor — schedule-aware pipeline orchestration. + +Reads a TOML workflow registry, runs extractors on cron-based schedules +(with dependency ordering and parallel execution), then runs SQLMesh +transform + export_serving. On production, polls for new git tags and +deploys the web app automatically when a new tag appears. + +Crash safety: the main loop catches all exceptions and backs off, matching +the TigerBeetle CFO supervisor pattern. Combined with systemd Restart=always, +the supervisor is effectively unkillable. + +Usage: + # Run the supervisor loop (production) + LANDING_DIR=data/landing uv run python src/materia/supervisor.py + + # Show workflow status + LANDING_DIR=data/landing uv run python src/materia/supervisor.py status +""" + +import importlib +import logging +import os +import subprocess +import sys +import time +import tomllib +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import UTC, datetime +from pathlib import Path + +from croniter import croniter + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +TICK_INTERVAL_SECONDS = 60 +BACKOFF_SECONDS = 600 # 10 min on tick failure +SUBPROCESS_TIMEOUT_SECONDS = 14400 # 4 hours max per subprocess +REPO_DIR = Path(os.getenv("REPO_DIR", "/opt/materia")) +LANDING_DIR = Path(os.getenv("LANDING_DIR", "data/landing")) +DUCKDB_PATH = os.getenv("DUCKDB_PATH", "data/lakehouse.duckdb") +SERVING_DUCKDB_PATH = os.getenv("SERVING_DUCKDB_PATH", "analytics.duckdb") +ALERT_WEBHOOK_URL = os.getenv("ALERT_WEBHOOK_URL", "") +WORKFLOWS_PATH = Path(os.getenv("WORKFLOWS_PATH", "infra/supervisor/workflows.toml")) + +NAMED_SCHEDULES = { + "hourly": "0 * * * *", + "daily": "0 5 * * *", + "weekly": "0 3 * * 1", + "monthly": "0 4 1 * *", +} + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[logging.StreamHandler(sys.stdout)], +) +logger = logging.getLogger("materia.supervisor") + + +# --------------------------------------------------------------------------- +# State DB helpers (reuse extraction state DB) +# --------------------------------------------------------------------------- + +def _open_state_db(): + """Open the extraction state DB at {LANDING_DIR}/.state.sqlite.""" + from extract_core.state import open_state_db + return open_state_db(LANDING_DIR) + + +def _get_last_success_time(conn, workflow_name: str) -> datetime | None: + """Return the finish time of the last successful run, or None.""" + row = conn.execute( + "SELECT MAX(finished_at) AS t FROM extraction_runs " + "WHERE extractor = ? AND status = 'success'", + (workflow_name,), + ).fetchone() + if not row or not row["t"]: + return None + return datetime.fromisoformat(row["t"]).replace(tzinfo=UTC) + + +# --------------------------------------------------------------------------- +# Workflow loading + scheduling +# --------------------------------------------------------------------------- + +def load_workflows(path: Path) -> list[dict]: + """Load workflow definitions from TOML file.""" + assert path.exists(), f"Workflows file not found: {path}" + with open(path, "rb") as f: + data = tomllib.load(f) + + workflows = [] + for name, cfg in data.items(): + assert "module" in cfg, f"Workflow '{name}' missing 'module'" + assert "schedule" in cfg, f"Workflow '{name}' missing 'schedule'" + workflows.append({ + "name": name, + "module": cfg["module"], + "entry": cfg.get("entry", "main"), + "schedule": cfg["schedule"], + "depends_on": cfg.get("depends_on", []), + }) + return workflows + + +def resolve_schedule(schedule: str) -> str: + """Resolve a named schedule to a cron expression, or pass through raw cron.""" + return NAMED_SCHEDULES.get(schedule, schedule) + + +def is_due(conn, workflow: dict) -> bool: + """Check if the most recent cron trigger hasn't been served yet.""" + cron_expr = resolve_schedule(workflow["schedule"]) + assert croniter.is_valid(cron_expr), f"Invalid cron: {cron_expr} for {workflow['name']}" + + last_success = _get_last_success_time(conn, workflow["name"]) + if last_success is None: + return True # never ran + + now_naive = datetime.now(UTC).replace(tzinfo=None) + prev_trigger = croniter(cron_expr, now_naive).get_prev(datetime).replace(tzinfo=UTC) + return last_success < prev_trigger + + +# --------------------------------------------------------------------------- +# Topological ordering +# --------------------------------------------------------------------------- + +def topological_waves(workflows: list[dict]) -> list[list[dict]]: + """Group workflows into dependency waves for parallel execution. + + Wave 0: no deps. Wave 1: depends only on wave 0. Etc. + """ + name_to_wf = {w["name"]: w for w in workflows} + due_names = set(name_to_wf.keys()) + + in_degree: dict[str, int] = {} + dependents: dict[str, list[str]] = defaultdict(list) + for w in workflows: + deps_in_scope = [d for d in w["depends_on"] if d in due_names] + in_degree[w["name"]] = len(deps_in_scope) + for d in deps_in_scope: + dependents[d].append(w["name"]) + + waves = [] + remaining = set(due_names) + max_iterations = len(workflows) + 1 + + for _ in range(max_iterations): + if not remaining: + break + wave = [name_to_wf[n] for n in remaining if in_degree[n] == 0] + assert wave, f"Circular dependency detected among: {remaining}" + waves.append(wave) + for w in wave: + remaining.discard(w["name"]) + for dep in dependents[w["name"]]: + if dep in remaining: + in_degree[dep] -= 1 + + return waves + + +# --------------------------------------------------------------------------- +# Workflow execution +# --------------------------------------------------------------------------- + +def run_workflow(conn, workflow: dict) -> None: + """Run a single workflow by importing its module and calling the entry function.""" + module_name = workflow["module"] + entry_name = workflow["entry"] + + logger.info("Running workflow: %s (%s.%s)", workflow["name"], module_name, entry_name) + + try: + module = importlib.import_module(module_name) + entry_fn = getattr(module, entry_name) + entry_fn() + logger.info("Workflow %s completed successfully", workflow["name"]) + except Exception: + logger.exception("Workflow %s failed", workflow["name"]) + send_alert(f"Workflow '{workflow['name']}' failed") + raise + + +def run_due_workflows(conn, workflows: list[dict]) -> bool: + """Run all due workflows in dependency-wave order. Returns True if any ran.""" + due = [w for w in workflows if is_due(conn, w)] + if not due: + logger.info("No workflows due") + return False + + logger.info("Due workflows: %s", [w["name"] for w in due]) + waves = topological_waves(due) + + for i, wave in enumerate(waves): + wave_names = [w["name"] for w in wave] + logger.info("Wave %d: %s", i, wave_names) + + if len(wave) == 1: + try: + run_workflow(conn, wave[0]) + except Exception: + pass # already logged in run_workflow + else: + with ThreadPoolExecutor(max_workers=len(wave)) as pool: + futures = {pool.submit(run_workflow, conn, w): w for w in wave} + for future in as_completed(futures): + try: + future.result() + except Exception: + pass # already logged in run_workflow + + return True + + +# --------------------------------------------------------------------------- +# Transform + Export + Deploy +# --------------------------------------------------------------------------- + +def run_shell(cmd: str, timeout_seconds: int = SUBPROCESS_TIMEOUT_SECONDS) -> bool: + """Run a shell command. Returns True on success.""" + logger.info("Shell: %s", cmd) + result = subprocess.run( + cmd, shell=True, capture_output=True, text=True, timeout=timeout_seconds + ) + if result.returncode != 0: + logger.error( + "Shell failed (rc=%d): %s\nstdout: %s\nstderr: %s", + result.returncode, cmd, result.stdout[-500:], result.stderr[-500:], + ) + return False + return True + + +def run_transform() -> None: + """Run SQLMesh — evaluates model staleness internally.""" + logger.info("Running SQLMesh transform") + ok = run_shell("uv run sqlmesh -p transform/sqlmesh_materia run") + if not ok: + send_alert("SQLMesh transform failed") + + +def run_export() -> None: + """Export serving tables to analytics.duckdb.""" + logger.info("Exporting serving tables") + ok = run_shell( + f"DUCKDB_PATH={DUCKDB_PATH} SERVING_DUCKDB_PATH={SERVING_DUCKDB_PATH} " + f"uv run materia pipeline run export_serving" + ) + if not ok: + send_alert("Serving export failed") + + +def web_code_changed() -> bool: + """Check if web app code changed since last deploy.""" + result = subprocess.run( + ["git", "diff", "--name-only", "HEAD~1", "HEAD", "--", "web/", "web/Dockerfile"], + capture_output=True, text=True, timeout=30, + ) + return bool(result.stdout.strip()) + + +def current_deployed_tag() -> str | None: + """Return the tag currently checked out, or None if not on a tag.""" + result = subprocess.run( + ["git", "describe", "--tags", "--exact-match", "HEAD"], + capture_output=True, text=True, timeout=10, + ) + return result.stdout.strip() or None + + +def latest_remote_tag() -> str | None: + """Fetch tags from origin and return the latest v tag.""" + subprocess.run( + ["git", "fetch", "--tags", "--prune-tags", "origin"], + capture_output=True, text=True, timeout=30, + ) + result = subprocess.run( + ["git", "tag", "--list", "--sort=-version:refname", "v*"], + capture_output=True, text=True, timeout=10, + ) + tags = result.stdout.strip().splitlines() + return tags[0] if tags else None + + +def git_pull_and_sync() -> None: + """Checkout the latest passing release tag and sync dependencies. + + A tag v is created by CI only after tests pass, so presence of a new + tag implies green CI. Skips if already on the latest tag. + """ + latest = latest_remote_tag() + if not latest: + logger.info("No release tags found — skipping pull") + return + + current = current_deployed_tag() + if current == latest: + logger.info("Already on latest tag %s — skipping pull", latest) + return + + logger.info("New tag %s available (current: %s) — deploying", latest, current) + run_shell(f"git checkout --detach {latest}") + run_shell("uv sync --all-packages") + + +# --------------------------------------------------------------------------- +# Alerting +# --------------------------------------------------------------------------- + +def send_alert(message: str) -> None: + """Send alert via ntfy.sh (or any webhook accepting POST body).""" + if not ALERT_WEBHOOK_URL: + return + timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") + try: + subprocess.run( + ["curl", "-s", "-d", f"[{timestamp}] {message}", ALERT_WEBHOOK_URL], + timeout=10, capture_output=True, + ) + except Exception: + logger.exception("Failed to send alert") + + +# --------------------------------------------------------------------------- +# Main loop +# --------------------------------------------------------------------------- + +def tick() -> None: + """One cycle: git pull, run due extractors, transform, export, maybe deploy.""" + workflows = load_workflows(WORKFLOWS_PATH) + conn = _open_state_db() + + try: + # Git pull + sync (production only — SUPERVISOR_GIT_PULL env var set in systemd service) + if os.getenv("SUPERVISOR_GIT_PULL"): + git_pull_and_sync() + + # Run due extractors + run_due_workflows(conn, workflows) + + # SQLMesh always runs (evaluates model staleness internally) + run_transform() + + # Export serving tables to analytics.duckdb + run_export() + + # Deploy web app if code changed + if os.getenv("SUPERVISOR_GIT_PULL") and web_code_changed(): + logger.info("Web code changed — deploying") + ok = run_shell("./web/deploy.sh") + if ok: + send_alert("Deploy succeeded") + else: + send_alert("Deploy FAILED — check journalctl -u materia-supervisor") + finally: + conn.close() + + +def supervisor_loop() -> None: + """Infinite supervisor loop — never exits unless killed.""" + logger.info("Supervisor starting (tick interval: %ds)", TICK_INTERVAL_SECONDS) + logger.info("Workflows: %s", WORKFLOWS_PATH) + logger.info("Landing dir: %s", LANDING_DIR) + + while True: + try: + tick() + except KeyboardInterrupt: + logger.info("Supervisor stopped (KeyboardInterrupt)") + break + except Exception: + logger.exception("Supervisor tick failed — backing off %ds", BACKOFF_SECONDS) + send_alert("Supervisor tick failed") + time.sleep(BACKOFF_SECONDS) + else: + time.sleep(TICK_INTERVAL_SECONDS) + + +# --------------------------------------------------------------------------- +# Status CLI +# --------------------------------------------------------------------------- + +def print_status() -> None: + """Print workflow status table.""" + workflows = load_workflows(WORKFLOWS_PATH) + conn = _open_state_db() + + now = datetime.now(UTC) + + print(f"{'Workflow':<28} {'Schedule':<18} {'Last Run':<20} {'Status':<8} {'Next'}") + print(f"{'─' * 28} {'─' * 18} {'─' * 20} {'─' * 8} {'─' * 12}") + + for w in workflows: + last_success = _get_last_success_time(conn, w["name"]) + cron_expr = resolve_schedule(w["schedule"]) + + if last_success: + last_str = last_success.strftime("%Y-%m-%d %H:%M") + status = "ok" + else: + last_str = "never" + status = "pending" + + row = conn.execute( + "SELECT MAX(finished_at) AS t FROM extraction_runs " + "WHERE extractor = ? AND status = 'failed'", + (w["name"],), + ).fetchone() + if row and row["t"]: + last_fail = datetime.fromisoformat(row["t"]).replace(tzinfo=UTC) + if last_success is None or last_fail > last_success: + status = "FAILED" + + now_naive = now.replace(tzinfo=None) + next_trigger = croniter(cron_expr, now_naive).get_next(datetime) + delta = next_trigger - now_naive + if delta.total_seconds() < 3600: + next_str = f"in {int(delta.total_seconds() / 60)}m" + elif delta.total_seconds() < 86400: + next_str = next_trigger.strftime("%H:%M") + else: + next_str = next_trigger.strftime("%b %d") + + schedule_display = w["schedule"] if w["schedule"] in NAMED_SCHEDULES else cron_expr + print(f"{w['name']:<28} {schedule_display:<18} {last_str:<20} {status:<8} {next_str}") + + conn.close() + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main() -> None: + if len(sys.argv) > 1 and sys.argv[1] == "status": + print_status() + else: + supervisor_loop() + + +if __name__ == "__main__": + main() diff --git a/uv.lock b/uv.lock index 6baaff9..d6f5025 100644 --- a/uv.lock +++ b/uv.lock @@ -14,7 +14,7 @@ members = [ "extract-core", "ice-stocks", "materia", - "openweathermap", + "openmeteo", "psdonline", "sqlmesh-materia", ] @@ -1566,6 +1566,7 @@ name = "materia" version = "0.1.0" source = { editable = "." } dependencies = [ + { name = "croniter" }, { name = "hcloud" }, { name = "msgspec" }, { name = "niquests" }, @@ -1593,6 +1594,7 @@ exploration = [ [package.metadata] requires-dist = [ + { name = "croniter", specifier = ">=6.0.0" }, { name = "hcloud", specifier = ">=2.8.0" }, { name = "msgspec", specifier = ">=0.19" }, { name = "niquests", specifier = ">=3.15.2" }, @@ -1766,6 +1768,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/be/9c/92789c596b8df838baa98fa71844d84283302f7604ed565dafe5a6b5041a/oauthlib-3.3.1-py3-none-any.whl", hash = "sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1", size = 160065, upload-time = "2025-06-19T22:48:06.508Z" }, ] +[[package]] +name = "openmeteo" +version = "0.1.0" +source = { editable = "extract/openmeteo" } +dependencies = [ + { name = "extract-core" }, + { name = "niquests" }, +] + +[package.metadata] +requires-dist = [ + { name = "extract-core", editable = "extract/extract_core" }, + { name = "niquests", specifier = ">=3.14.1" }, +] + [[package]] name = "opentelemetry-api" version = "1.39.1" @@ -1779,21 +1796,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cf/df/d3f1ddf4bb4cb50ed9b1139cc7b1c54c34a1e7ce8fd1b9a37c0d1551a6bd/opentelemetry_api-1.39.1-py3-none-any.whl", hash = "sha256:2edd8463432a7f8443edce90972169b195e7d6a05500cd29e6d13898187c9950", size = 66356, upload-time = "2025-12-11T13:32:17.304Z" }, ] -[[package]] -name = "openweathermap" -version = "0.1.0" -source = { editable = "extract/openweathermap" } -dependencies = [ - { name = "extract-core" }, - { name = "niquests" }, -] - -[package.metadata] -requires-dist = [ - { name = "extract-core", editable = "extract/extract_core" }, - { name = "niquests", specifier = ">=3.14.1" }, -] - [[package]] name = "orjson" version = "3.11.7"