diff --git a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js index 3fcff0eb5d2..fcf20c7b974 100644 --- a/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js +++ b/src/workerd/api/node/tests/zlib-zstd-nodejs-test.js @@ -3,6 +3,7 @@ // https://opensource.org/licenses/Apache-2.0 import assert from 'node:assert'; import { Buffer } from 'node:buffer'; +import { randomBytes } from 'node:crypto'; import zlib from 'node:zlib'; // Basic sync compress/decompress test @@ -99,6 +100,20 @@ export const zstdLargeDataTest = { const decompressed = zlib.zstdDecompressSync(compressed); assert(input.equals(decompressed), 'Large data round-trip should match'); + + // workerd routes randomBytes() through getRandomValues(), which is capped at 64KiB per call. + const randomInput = Buffer.concat([ + randomBytes(64 * 1024), + randomBytes(36 * 1024), + ]); + const randomCompressed = zlib.zstdCompressSync(randomInput); + assert(Buffer.isBuffer(randomCompressed), 'Random compressed output should be a buffer'); + + const randomDecompressed = zlib.zstdDecompressSync(randomCompressed); + assert( + randomInput.equals(randomDecompressed), + 'Large random data round-trip should match' + ); }, }; diff --git a/src/workerd/api/node/zlib-util.c++ b/src/workerd/api/node/zlib-util.c++ index 26316394b00..ceac1efb901 100644 --- a/src/workerd/api/node/zlib-util.c++ +++ b/src/workerd/api/node/zlib-util.c++ @@ -902,11 +902,6 @@ kj::Maybe ZstdEncoderContext::getError() const { kj::str("ERR_ZSTD_COMPRESSION_FAILED"), -1); } - if (flush_ == ZSTD_e_end && lastResult != 0) { - // lastResult > 0 means more output is needed, which shouldn't happen at end - return CompressionError("Unexpected end of file"_kj, "Z_BUF_ERROR"_kj, Z_BUF_ERROR); - } - return kj::none; } @@ -915,6 +910,10 @@ bool ZstdEncoderContext::isStreamEnd() const { return !ZSTD_isError(lastResult) && lastResult == 0; } +bool ZstdEncoderContext::hasPendingOutput() const { + return flush_ == ZSTD_e_end && !ZSTD_isError(lastResult) && lastResult != 0; +} + ZstdDecoderContext::ZstdDecoderContext(ZlibMode _mode) : ZstdContext(_mode), dctx_(kj::disposeWith(ZSTD_createDCtx())) {} @@ -1088,7 +1087,7 @@ static kj::Array syncProcessBuffer(Context& ctx, GrowableBuffer& resul JSG_REQUIRE(ctx.isStreamEnd(), RangeError, "Memory limit exceeded"); break; } - } while (ctx.getAvailOut() == 0); + } while (ctx.getAvailOut() == 0 || ctx.hasPendingInput() || ctx.hasPendingOutput()); return result.releaseAsArray(); } diff --git a/src/workerd/api/node/zlib-util.h b/src/workerd/api/node/zlib-util.h index 35121558a60..02275ebee7f 100644 --- a/src/workerd/api/node/zlib-util.h +++ b/src/workerd/api/node/zlib-util.h @@ -155,6 +155,14 @@ class ZlibContext final { return err == Z_STREAM_END; } + bool hasPendingOutput() const { + return false; + } + + bool hasPendingInput() const { + return false; + } + uint getAvailIn() const { return stream.avail_in; }; @@ -237,6 +245,9 @@ class BrotliContext { void setBuffers(kj::ArrayPtr input, kj::ArrayPtr output); void setInputBuffer(kj::ArrayPtr input); void setOutputBuffer(kj::ArrayPtr output); + kj::uint getAvailIn() const { + return availIn; + } void setFlush(int flush); kj::uint getAvailOut() const; void getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) const; @@ -291,6 +302,12 @@ class BrotliEncoderContext final: public BrotliContext { kj::Maybe setParams(int key, uint32_t value); kj::Maybe getError() const; bool isStreamEnd() const; + bool hasPendingOutput() const { + return false; + } + bool hasPendingInput() const { + return false; + } private: bool lastResult = false; @@ -313,6 +330,12 @@ class BrotliDecoderContext final: public BrotliContext { kj::Maybe setParams(int key, uint32_t value); kj::Maybe getError() const; bool isStreamEnd() const; + bool hasPendingOutput() const { + return false; + } + bool hasPendingInput() const { + return false; + } private: BrotliDecoderResult lastResult = BROTLI_DECODER_RESULT_SUCCESS; @@ -329,6 +352,12 @@ class ZstdContext { void setBuffers(kj::ArrayPtr input, kj::ArrayPtr output); void setInputBuffer(kj::ArrayPtr input); void setOutputBuffer(kj::ArrayPtr output); + kj::uint getAvailIn() const { + return input_.size - input_.pos; + } + bool hasPendingInput() const { + return false; + } void setFlush(int flush); kj::uint getAvailOut() const; void getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) const; @@ -370,6 +399,10 @@ class ZstdEncoderContext final: public ZstdContext { kj::Maybe setParams(int key, int value); kj::Maybe getError() const; bool isStreamEnd() const; + bool hasPendingOutput() const; + bool hasPendingInput() const { + return getAvailIn() != 0; + } private: size_t lastResult = 0; @@ -389,6 +422,9 @@ class ZstdDecoderContext final: public ZstdContext { kj::Maybe setParams(int key, int value); kj::Maybe getError() const; bool isStreamEnd() const; + bool hasPendingOutput() const { + return false; + } private: size_t lastResult = 0;