feat(extract): add compress_jsonl_atomic() utility

Streams a JSONL working file to .jsonl.gz in 1MB chunks (constant memory),
atomic rename via .tmp sibling, deletes source on success. Companion to
write_gzip_atomic() for extractors that stream records incrementally.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Deeman
2026-02-25 11:50:17 +01:00
parent e5960c08ff
commit 6bede60ef8

View File

@@ -174,3 +174,23 @@ def write_gzip_atomic(path: Path, data: bytes) -> int:
    tmp.write_bytes(compressed)
    tmp.rename(path)
    return len(compressed)
def compress_jsonl_atomic(jsonl_path: Path, dest_path: Path) -> int:
    """Compress a JSONL working file to .jsonl.gz atomically, then delete the source.

    Streams compression in 1MB chunks (constant memory regardless of file size).
    Atomic via a .tmp sibling + rename — readers never see a partial .jsonl.gz.
    Deletes the uncompressed working file only after successful compression.

    Args:
        jsonl_path: Existing, non-empty uncompressed JSONL file.
        dest_path: Final ``.jsonl.gz`` destination path.

    Returns:
        Number of compressed bytes written to ``dest_path``.

    Raises:
        FileNotFoundError: If ``jsonl_path`` does not exist.
        ValueError: If ``jsonl_path`` is empty.
    """
    # Real exceptions, not `assert`: asserts are stripped under `python -O`,
    # which would let an empty/missing source slip through silently.
    if not jsonl_path.exists():
        raise FileNotFoundError(f"source must exist: {jsonl_path}")
    if jsonl_path.stat().st_size == 0:
        raise ValueError(f"source must not be empty: {jsonl_path}")

    tmp = dest_path.with_suffix(dest_path.suffix + ".tmp")
    try:
        with open(jsonl_path, "rb") as f_in, gzip.open(tmp, "wb") as f_out:
            while chunk := f_in.read(1_048_576):  # 1 MB chunks — constant memory
                f_out.write(chunk)
        # Measure before rename; rename is atomic on POSIX, so readers see
        # either no dest file or the complete one.
        bytes_written = tmp.stat().st_size
        tmp.rename(dest_path)
    except BaseException:
        # Don't leave a stale .tmp sibling behind on a failed/interrupted run.
        tmp.unlink(missing_ok=True)
        raise

    # Source is deleted only after the compressed file is fully in place.
    jsonl_path.unlink()
    return bytes_written