feat(ilp): add gzip support to ILP/HTTP server #6165
Conversation
Walkthrough

Adds gzip decompression support for Line HTTP ingestion: a new native inflater init, a Zip JNI method, gzip-aware processing and lifecycle in LineHttpProcessor/State, an adaptive buffer accessor, a logging tweak, and expanded gzip-focused tests plus compatibility test renames.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant C as Client
    participant HP as LineHttpProcessorImpl
    participant ST as LineHttpProcessorState
    participant Z as Zip (JNI)
    rect #F2F8FF
        C->>HP: Send HTTP headers
        HP->>HP: Detect Content-Encoding: gzip?
        alt gzip
            HP->>Z: inflateInitGzip()
            alt init OK
                HP->>ST: setInflateStream(stream)
            else init error
                HP-->>C: 415 Encoding Not Supported
            end
        end
    end
    loop for each chunk
        C->>HP: onChunk(lo, hi)
        alt gzip
            HP->>ST: inflateAndParse(lo, hi)
            ST->>Z: inflate(stream, input)
            ST->>ST: process decompressed bytes
            alt inflate error
                ST-->>C: 415 Encoding Not Supported
            end
        else plain
            HP->>ST: parse(lo, hi)
        end
    end
    rect #F5FFF5
        C->>HP: onRequestComplete
        HP->>ST: cleanupGzip()
        HP-->>C: Complete (e.g., 204)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks and finishing touches: ❌ 1 failed check (warning), ✅ 2 passed checks
@nwoolmer have you considered alternatives like Snappy or zstd? Both should be a few times faster in compression/decompression while providing a decent compression ratio.
Indeed, there are better options from a compression-ratio and speed perspective. However, there is a non-functional requirement: it must integrate with other tools, including Telegraf. Telegraf output can be configured for gzip, but I am not sure whether it supports other encodings. Also, we have
We should have zstd and lz4 as dependencies in the Rust code already. As for Telegraf, that's a fair argument, but we could support zlib along with faster alternatives; our clients would then use something more efficient, while zlib remains for 3rd-party compat.
For now, this is a server-only change to fulfill an internal requirement. Let's follow up with more improvements when we roll out the next client-wide changes.
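To make the trade-off discussed above concrete, here is a minimal, hypothetical Java sketch (JDK classes only, not QuestDB code) that gzip-compresses a repetitive ILP payload — the same gzip framing Telegraf emits — and checks that the compression ratio is substantial:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class IlpGzipRatio {
    // Gzip-compress a byte array using the JDK's streaming gzip encoder.
    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Repetitive, text-heavy ILP lines compress very well.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10_000; i++) {
            sb.append("cpu,host=server-").append(i % 8)
              .append(" usage_user=12.5,usage_system=3.1 1700000000000000000\n");
        }
        byte[] raw = sb.toString().getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(raw);
        // Expect well over 5x reduction on this kind of payload.
        System.out.println(raw.length > compressed.length * 5);
    }
}
```

This illustrates why text protocols like ILP benefit so much from transport compression even when the codec itself is not the fastest available.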
@CodeRabbit review

✅ Actions performed: Review triggered.
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
core/src/main/c/share/zip.c (1)
47-60: Add null-pointer guards in native Zip methods.

In core/src/main/c/share/zip.c, for each JNI entry (Java_io_questdb_std_Zip_setInput, Java_io_questdb_std_Zip_deflate, and similarly in inflate, deflateEnd, availOut, getAvailOut), add at the top:

```c
if (ptr == 0) {
    jclass npe = (*e)->FindClass(e, "java/lang/NullPointerException");
    (*e)->ThrowNew(e, npe, "Null z_stream pointer");
    return; /* or an appropriate jint, e.g. 0 */
}
```

to fail fast instead of dereferencing a null pointer.
♻️ Duplicate comments (1)
core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessorState.java (1)
185-217: Fix gzip inflate handling when the buffer fills up.

Zip.inflate can return Z_BUF_ERROR after writing bytes once the receive buffer runs out of space. Because bufPos is only advanced after the negative-return check, this branch always falls through to MESSAGE_TOO_LARGE, dropping valid gzip payloads while the parser never sees the bytes that were already produced. Move the bufPos update ahead of the status check (compute produced = len - Zip.getAvailOut(...)), process that chunk, and only reject after trying to compact/grow the buffer, mirroring the uncompressed path. This is the same issue I raised earlier; it still needs to be fixed.

```diff
-        int ret = Zip.inflate(inflateStream, p, len, false);
-
-        if (ret < 0) {
-            if (ret != Zip.Z_BUF_ERROR) {
+        int status = Zip.inflate(inflateStream, p, len, false);
+        int produced = len - Zip.getAvailOut(inflateStream);
+        if (produced > 0) {
+            recvBuffer.setBufPos(p + produced);
+        }
+
+        if (status < 0) {
+            if (status != Zip.Z_BUF_ERROR) {
                 reject(ENCODING_NOT_SUPPORTED, "gzip decompression error", fd);
                 cleanupGzip();
                 return;
             }
-            if (recvBuffer.getBufPos() > pp) {
-                // when inflate fails with Z_BUF_ERROR - this means either
-                // have not processed buffer yet, or it is too small. Trigger buffer processing
-                // to make space in the buffer
-                currentStatus = processLocalBuffer();
-                continue;
-            }
-
-            reject(MESSAGE_TOO_LARGE, "server buffer is too small", fd);
-            cleanupGzip();
-            return;
+            if (produced > 0) {
+                currentStatus = processLocalBuffer();
+                pp = recvBuffer.getBufPos();
+                continue;
+            }
+            if (!recvBuffer.tryCompactOrGrowBuffer()) {
+                reject(MESSAGE_TOO_LARGE, "server buffer is too small", fd);
+                cleanupGzip();
+                return;
+            }
+            continue;
         }
-        recvBuffer.setBufPos(p + (len - Zip.getAvailOut(inflateStream)));
-
-        if (ret == Zip.Z_STREAM_END) {
-            currentStatus = processLocalBuffer();
-            cleanupGzip();
-            break;
-        }
+        if (produced > 0) {
+            currentStatus = processLocalBuffer();
+            if (stopParse()) {
+                return;
+            }
+            pp = recvBuffer.getBufPos();
+        }
+
+        if (status == Zip.Z_STREAM_END) {
+            cleanupGzip();
+            break;
+        }
```
🧹 Nitpick comments (4)
core/src/main/c/share/zip.c (2)
136-140: Duplicate API: getAvailOut duplicates availOut.

Zip_availOut(ptr) already exposes avail_out. Consider reusing it (or aliasing it on the Java side) to keep a single native symbol and avoid drift.

Option A: remove getAvailOut and use availOut on the Java side.
Option B: keep both, but document one as preferred to avoid confusion.
90-107: Optional: minor JNI hygiene.
- Returning -1 on OOM is fine but collides with Z_ERRNO; consider a distinct sentinel like INT_MIN for clearer differentiation on the Java side.
- Consider factoring stream allocation into a tiny static helper to reduce repetition in inflateInit/inflateInitGzip.
Also applies to: 128-134
compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java (2)
225-235: Add negative-path gzip tests.

Consider adding:
- Corrupted gzip body → 400 with parse error.
- Mismatched header (Content-Encoding: gzip, uncompressed body) → 400/415.
- “Gzip bomb” expansion exceeding line.http.max.recv.buffer.size → request too large.
I can draft these tests with minimal scaffolding if helpful.
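As a hedged sketch of the corrupted-body case above, using only JDK classes (the real test would go through the HTTP server; this just demonstrates that flipped bytes in a gzip stream surface as a java.util.zip.ZipException, which the server can map to a 4xx response):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipException;

public class CorruptGzipDemo {
    public static void main(String[] args) throws IOException {
        // Build a valid gzip body from one ILP line.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write("m1,tag1=value1 f1=1i\n".getBytes(StandardCharsets.UTF_8));
        }
        byte[] body = out.toByteArray();
        body[0] ^= 0xFF; // corrupt the gzip magic header

        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
            in.readAllBytes();
            System.out.println("no error");
        } catch (ZipException e) {
            // Decoder rejects the stream up front; the server should reject the request.
            System.out.println("rejected");
        }
    }
}
```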
776-785: Avoid potential barrier hangs: add timeout.

Use a bounded await to fail fast on thread errors and avoid indefinite waits.

```diff
-            startBarrier.await();
+            startBarrier.await(60, java.util.concurrent.TimeUnit.SECONDS);
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (5)
- core/src/main/resources/io/questdb/bin/darwin-aarch64/libquestdb.dylib is excluded by !**/*.dylib
- core/src/main/resources/io/questdb/bin/darwin-x86-64/libquestdb.dylib is excluded by !**/*.dylib
- core/src/main/resources/io/questdb/bin/linux-aarch64/libquestdb.so is excluded by !**/*.so
- core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdb.so is excluded by !**/*.so
- core/src/main/resources/io/questdb/bin/windows-x86-64/libquestdb.dll is excluded by !**/*.dll
📒 Files selected for processing (7)
- compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java (4 hunks)
- core/src/main/c/share/zip.c (1 hunk)
- core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessorImpl.java (4 hunks)
- core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessorState.java (8 hunks)
- core/src/main/java/io/questdb/cutlass/line/tcp/LineWalAppender.java (1 hunk)
- core/src/main/java/io/questdb/std/Zip.java (2 hunks)
- core/src/test/java/io/questdb/test/cutlass/http/line/LineHttpFailureTest.java (13 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessorImpl.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: New pull request (Check Changes)
- GitHub Check: build
- GitHub Check: build
- GitHub Check: aarch64
🔇 Additional comments (6)
compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java (4)
215-218: Good coverage: gzip happy-path. New gzip test reuses the helper and mirrors the uncompressed case. LGTM.
221-224: Helper refactor for the parallel case looks good. Switching to the helper with per-thread gzip control simplifies reuse.
702-710: Per-call gzip enable is fine. Enabling gzip per InfluxDB client instance is clear and isolated. LGTM.
749-764: Helper encapsulation reads well. Single-table gzip vs plain path validated identically. LGTM.
core/src/main/java/io/questdb/cutlass/line/tcp/LineWalAppender.java (1)
475-476: LGTM: cleaner throwable logging. Passing the Throwable directly is correct and idiomatic.
core/src/main/c/share/zip.c (1)
133-134: Confirm inflate flush semantics. Only one call site invokes Zip.inflate with flush=false (LineHttpProcessorState.java:187). No flush=true usage detected; ensure any future flush=true calls are confined to the final chunk to prevent premature Z_BUF_ERROR/Z_STREAM_END.
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
compat/src/test/java/io/questdb/compat/InfluxDBClientFailureTest.java (1)
298-306: This will likely OOM or stall CI: avoid building a 10M-String list.

10,000,000 String objects plus their backing char arrays will consume multiple GB and can time out. Stream or batch instead.

Apply a bounded-memory batching approach:

```diff
-        for (int i = 0; i < 10_000_000; i++) {
-            points.add("m1,tag1=value1 f1=1i,x=12i");
-        }
-        InfluxDBUtils.assertRequestOk(influxDB, points, "m1,tag1=value1 f1=1i,x=12i");
+        final String line = "m1,tag1=value1 f1=1i,x=12i\n";
+        // ~100k lines total in 100 chunks to exercise gzip with large payloads without exhausting heap
+        final int batches = 100, perBatch = 1_000;
+        for (int b = 0; b < batches; b++) {
+            StringBuilder sb = new StringBuilder(line.length() * perBatch);
+            for (int i = 0; i < perBatch; i++) sb.append(line);
+            points.add(sb.toString());
+        }
+        InfluxDBUtils.assertRequestOk(influxDB, points, line);
```

Optionally gate a truly huge test behind a system property and mark it nightly.
♻️ Duplicate comments (2)
core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessorState.java (2)
178-228: Decompress more before parsing to reduce parser fragmentation. Inflate until either stream end or the local buffer can't accept more bytes, then call processLocalBuffer() once. This reduces parse calls and improves throughput.
178-228: Handle Z_BUF_ERROR by compacting/growing the buffer before rejecting.

Currently, if the output buffer is full and inflate returns Z_BUF_ERROR with newBytes == 0, the code immediately returns 413. This breaks valid requests whose decompressed size fits within line.http.max.recv.buffer.size. Mirror the uncompressed path: try recvBuffer.tryCompactOrGrowBuffer() and retry before failing.

```diff
-            if (ret < 0) {
-                if (ret != Zip.Z_BUF_ERROR) {
-                    reject(ENCODING_NOT_SUPPORTED, "gzip decompression error", fd);
-                    cleanupGzip();
-                    return;
-                }
-
-                // inflate can return Z_BUF_ERROR after writing bytes once the recv buffer runs out of space
-                if (newBytes > 0) {
-                    currentStatus = processLocalBuffer();
-                    pp = recvBuffer.getBufPos();
-                    continue;
-                }
-
-                reject(MESSAGE_TOO_LARGE, "server buffer is too small", fd);
-                cleanupGzip();
-                return;
-            }
+            if (ret == Zip.Z_BUF_ERROR) {
+                // Output buffer exhausted.
+                if (newBytes > 0) {
+                    currentStatus = processLocalBuffer();
+                    pp = recvBuffer.getBufPos();
+                    if (stopParse()) {
+                        cleanupGzip();
+                        return;
+                    }
+                    continue;
+                }
+                if (recvBuffer.tryCompactOrGrowBuffer()) {
+                    // Retry inflate with more space.
+                    continue;
+                }
+                reject(MESSAGE_TOO_LARGE, "transaction is too large, either flush more frequently or increase buffer size \"line.http.max.recv.buffer.size\"", fd);
+                cleanupGzip();
+                return;
+            } else if (ret < 0) {
+                // Other zlib errors
+                reject(ENCODING_NOT_SUPPORTED, "gzip decompression error", fd);
+                cleanupGzip();
+                return;
+            }
```
🧹 Nitpick comments (5)
compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java (2)
733-734: Use locale-independent uppercasing to avoid surprises.

Case transforms can differ by default locale (e.g., Turkish). Prefer ROOT.

Apply:

```diff
-        String tableNameUpper = tableName.toUpperCase();
+        String tableNameUpper = tableName.toUpperCase(java.util.Locale.ROOT);
```

If you prefer an import:

```java
import java.util.Locale;
// ...
String tableNameUpper = tableName.toUpperCase(Locale.ROOT);
```
787-836: Parallel helper is solid; barrier start avoids head-of-line bias.

Minimal suggestion: name the threads for easier log triage.

Example:

```diff
 threadList.add(new Thread(() -> {
     // ...
-}));
+}, "ilp-http-" + threadNo));
```

core/src/test/java/io/questdb/test/cutlass/http/line/LineHttpFailureTest.java (2)
492-496: Close GZIPOutputStream with try-with-resources to guarantee trailer flush.

Use try-with-resources instead of a manual finish(); it's safer and clearer.

```diff
-ByteArrayOutputStream out = new ByteArrayOutputStream();
-GZIPOutputStream strm = new GZIPOutputStream(out);
-strm.write(b);
-strm.finish();
-byte[] outBytes = out.toByteArray();
+ByteArrayOutputStream out = new ByteArrayOutputStream();
+byte[] outBytes;
+try (GZIPOutputStream strm = new GZIPOutputStream(out)) {
+    strm.write(b);
+}
+outBytes = out.toByteArray();
```
473-522: Add coverage for Content-Encoding edge cases.

Please add tests for:
- Case‑insensitive header (e.g., “GZip”).
- Multiple codings (e.g., “gzip, deflate” and “deflate, gzip”) → expect 415 until multi‑step decoding is supported.
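For reference, a minimal sketch of tolerant header handling (a hypothetical helper, not the server's actual parser): per the HTTP specification, content-coding names are case-insensitive, and multiple codings are comma-separated in application order:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class ContentEncodingCheck {
    // Split a Content-Encoding header value into normalized coding tokens.
    static List<String> codings(String headerValue) {
        return Arrays.stream(headerValue.split(","))
                .map(s -> s.trim().toLowerCase(Locale.ROOT))
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Accept only a body encoded with gzip and nothing else.
    static boolean isPlainGzip(String headerValue) {
        return codings(headerValue).equals(List.of("gzip"));
    }

    public static void main(String[] args) {
        System.out.println(isPlainGzip("GZip"));          // case-insensitive match
        System.out.println(isPlainGzip("gzip, deflate")); // multi-coding: unsupported
    }
}
```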
core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessorState.java (1)
117-122: Optionally reset the gzip flag on cleanup to avoid accidental reuse.

Not required, but harmless, and it clarifies the lifecycle.

```diff
 public void cleanupGzip() {
     if (inflateStream != 0) {
         Zip.inflateEnd(inflateStream);
         inflateStream = 0;
+        isGzipEncoded = false;
     }
 }
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (5)
- core/src/main/resources/io/questdb/bin/darwin-aarch64/libquestdb.dylib is excluded by !**/*.dylib
- core/src/main/resources/io/questdb/bin/darwin-x86-64/libquestdb.dylib is excluded by !**/*.dylib
- core/src/main/resources/io/questdb/bin/linux-aarch64/libquestdb.so is excluded by !**/*.so
- core/src/main/resources/io/questdb/bin/linux-x86-64/libquestdb.so is excluded by !**/*.so
- core/src/main/resources/io/questdb/bin/windows-x86-64/libquestdb.dll is excluded by !**/*.dll
📒 Files selected for processing (7)
- compat/src/test/java/io/questdb/compat/InfluxDBClientFailureTest.java (3 hunks)
- compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java (5 hunks)
- core/src/main/c/share/zip.c (1 hunk)
- core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessorImpl.java (4 hunks)
- core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessorState.java (9 hunks)
- core/src/main/java/io/questdb/std/Zip.java (2 hunks)
- core/src/test/java/io/questdb/test/cutlass/http/line/LineHttpFailureTest.java (13 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- core/src/main/java/io/questdb/std/Zip.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: New pull request (Check Changes)
- GitHub Check: build
- GitHub Check: aarch64
- GitHub Check: build
🔇 Additional comments (15)
compat/src/test/java/io/questdb/compat/InfluxDBClientFailureTest.java (2)
35-41: Good swap to explicit imports. Reduces accidental package bleed and speeds incremental builds.

271-286: Gzip enable/disable happy path looks good. Covers toggling on the same client; assertions are appropriate.
compat/src/test/java/io/questdb/compat/InfluxDBClientTest.java (11)
45-46: Import for CyclicBarrier is appropriate. Matches the new parallel helper usage.
210-213: Helper extraction improves reuse. A single flag parameter keeps gzip/no-gzip flows unified.
215-218: Added gzip variant increases coverage. Mirrors the uncompressed path; good.
221-242: Nice coverage: gzip then raw on the same connection. Validates the mid-connection toggle; the txn=4 rationale is sound (2 writes × 2 calls).
If flakes appear under load, consider asserting the HTTP keep-alive state via server metrics/logs to ensure reuse.
245-247: Parallel many tables (no gzip) reads well. Thread fan-out + per-table asserts are clear.
250-253: All-gzip parallel variant OK. Complements the raw test; good matrix coverage.
256-259: Mixed gzip/non-gzip parallel variant OK. Exercises interleaving; good addition.
277-279: Per-thread connection with shared helper is fine. No shared state in sendIlp; safe for concurrency.
314-316: Keep-alive off + helper usage looks correct. Validates ingest without persistent connections.
765-785: Helper abstraction for the gzip toggle is clean. Awaits 2 txns (two writes in sendIlp); assertions match; LGTM.
838-841: Functional interface for per-thread gzip toggle is a good fit. Keeps call sites concise.
core/src/main/java/io/questdb/cutlass/http/processors/LineHttpProcessorImpl.java (2)
97-101: LGTM: correct routing of chunks to gzip vs plain parser. Straightforward and clear.

199-200: LGTM: gzip resources are cleaned up at request end. Ensures the native inflater is not leaked.
[PR Coverage check] 😍 pass: 49 / 61 (80.33%) (file detail)
ILP can be fairly inefficient with bandwidth: a 2x-3x increase in bandwidth usage would not be unusual for ILP streaming, due to its text-based protocol.

Upgrading the clients to a binary protocol is a WIP. In the meantime, this change allows the server to accept gzip-encoded ILP.

In a local test, this reduced overall throughput, due to the overhead of encoding and decoding the compressed buffer:

TSBS, cpu-only:

- 568,582 rows/sec (uncompressed) vs 295,322 rows/sec (gzip)
- 1,333,824 rows/sec (uncompressed) vs 785,233 rows/sec (gzip)

However, in aggregate, this can be an advantage; for example, when receiving text-heavy data from multiple sources (logs, documents), the encoding CPU overhead is absorbed on the client side, and the reduction in bandwidth usage raises the ceiling for maximum throughput.

When we next roll out client changes, we can review this and see if we'd like to introduce alternative encodings that may perform better, for example zstd.
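To make the server-side change concrete, here is a hedged sketch of what a client (or a tool like Telegraf) effectively sends: a POST with a gzip body and a Content-Encoding: gzip header. The /write path and port 9000 are placeholder assumptions, and the request is only built here, not sent:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipIlpRequest {
    public static void main(String[] args) throws IOException {
        String ilp = "weather,city=London temperature=11.5 1700000000000000000\n";

        // Gzip-compress the ILP payload.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(ilp.getBytes(StandardCharsets.UTF_8));
        }
        byte[] body = out.toByteArray();

        // Placeholder endpoint; adjust host/port/path for the actual server.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:9000/write"))
                .header("Content-Encoding", "gzip")
                .POST(HttpRequest.BodyPublishers.ofByteArray(body))
                .build();

        System.out.println(request.headers().firstValue("Content-Encoding").orElse("none"));
        System.out.println(request.method());
    }
}
```

The server detects the Content-Encoding header, initializes the native inflater, and feeds decompressed bytes to the same ILP parser used for plain requests.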