Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/workerd/api/node/tests/zlib-zstd-nodejs-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// https://opensource.org/licenses/Apache-2.0
import assert from 'node:assert';
import { Buffer } from 'node:buffer';
import { randomBytes } from 'node:crypto';
import zlib from 'node:zlib';

// Basic sync compress/decompress test
Expand Down Expand Up @@ -99,6 +100,20 @@ export const zstdLargeDataTest = {

const decompressed = zlib.zstdDecompressSync(compressed);
assert(input.equals(decompressed), 'Large data round-trip should match');

// workerd routes randomBytes() through getRandomValues(), which is capped at 64KiB per call.
const randomInput = Buffer.concat([
randomBytes(64 * 1024),
randomBytes(36 * 1024),
]);
const randomCompressed = zlib.zstdCompressSync(randomInput);
assert(Buffer.isBuffer(randomCompressed), 'Random compressed output should be a buffer');

const randomDecompressed = zlib.zstdDecompressSync(randomCompressed);
assert(
randomInput.equals(randomDecompressed),
'Large random data round-trip should match'
);
},
};

Expand Down
11 changes: 5 additions & 6 deletions src/workerd/api/node/zlib-util.c++
Original file line number Diff line number Diff line change
Expand Up @@ -902,11 +902,6 @@ kj::Maybe<CompressionError> ZstdEncoderContext::getError() const {
kj::str("ERR_ZSTD_COMPRESSION_FAILED"), -1);
}

if (flush_ == ZSTD_e_end && lastResult != 0) {
// lastResult > 0 means more output is needed, which shouldn't happen at end
return CompressionError("Unexpected end of file"_kj, "Z_BUF_ERROR"_kj, Z_BUF_ERROR);
}

return kj::none;
}

Expand All @@ -915,6 +910,10 @@ bool ZstdEncoderContext::isStreamEnd() const {
return !ZSTD_isError(lastResult) && lastResult == 0;
}

bool ZstdEncoderContext::hasPendingOutput() const {
return flush_ == ZSTD_e_end && !ZSTD_isError(lastResult) && lastResult != 0;
}

ZstdDecoderContext::ZstdDecoderContext(ZlibMode _mode)
: ZstdContext(_mode),
dctx_(kj::disposeWith<zstdFreeDCtx>(ZSTD_createDCtx())) {}
Expand Down Expand Up @@ -1088,7 +1087,7 @@ static kj::Array<kj::byte> syncProcessBuffer(Context& ctx, GrowableBuffer& resul
JSG_REQUIRE(ctx.isStreamEnd(), RangeError, "Memory limit exceeded");
break;
}
} while (ctx.getAvailOut() == 0);
} while (ctx.getAvailOut() == 0 || ctx.hasPendingInput() || ctx.hasPendingOutput());

return result.releaseAsArray();
}
Expand Down
36 changes: 36 additions & 0 deletions src/workerd/api/node/zlib-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ class ZlibContext final {
return err == Z_STREAM_END;
}

bool hasPendingOutput() const {
return false;
}

bool hasPendingInput() const {
return false;
}

uint getAvailIn() const {
return stream.avail_in;
};
Expand Down Expand Up @@ -237,6 +245,9 @@ class BrotliContext {
void setBuffers(kj::ArrayPtr<kj::byte> input, kj::ArrayPtr<kj::byte> output);
void setInputBuffer(kj::ArrayPtr<const kj::byte> input);
void setOutputBuffer(kj::ArrayPtr<kj::byte> output);
kj::uint getAvailIn() const {
return availIn;
}
void setFlush(int flush);
kj::uint getAvailOut() const;
void getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) const;
Expand Down Expand Up @@ -291,6 +302,12 @@ class BrotliEncoderContext final: public BrotliContext {
kj::Maybe<CompressionError> setParams(int key, uint32_t value);
kj::Maybe<CompressionError> getError() const;
bool isStreamEnd() const;
bool hasPendingOutput() const {
return false;
}
bool hasPendingInput() const {
return false;
}

private:
bool lastResult = false;
Expand All @@ -313,6 +330,12 @@ class BrotliDecoderContext final: public BrotliContext {
kj::Maybe<CompressionError> setParams(int key, uint32_t value);
kj::Maybe<CompressionError> getError() const;
bool isStreamEnd() const;
bool hasPendingOutput() const {
return false;
}
bool hasPendingInput() const {
return false;
}

private:
BrotliDecoderResult lastResult = BROTLI_DECODER_RESULT_SUCCESS;
Expand All @@ -329,6 +352,12 @@ class ZstdContext {
void setBuffers(kj::ArrayPtr<kj::byte> input, kj::ArrayPtr<kj::byte> output);
void setInputBuffer(kj::ArrayPtr<const kj::byte> input);
void setOutputBuffer(kj::ArrayPtr<kj::byte> output);
kj::uint getAvailIn() const {
return input_.size - input_.pos;
}
bool hasPendingInput() const {
return false;
}
void setFlush(int flush);
kj::uint getAvailOut() const;
void getAfterWriteResult(uint32_t* availIn, uint32_t* availOut) const;
Expand Down Expand Up @@ -370,6 +399,10 @@ class ZstdEncoderContext final: public ZstdContext {
kj::Maybe<CompressionError> setParams(int key, int value);
kj::Maybe<CompressionError> getError() const;
bool isStreamEnd() const;
bool hasPendingOutput() const;
bool hasPendingInput() const {
return getAvailIn() != 0;
}

private:
size_t lastResult = 0;
Expand All @@ -389,6 +422,9 @@ class ZstdDecoderContext final: public ZstdContext {
kj::Maybe<CompressionError> setParams(int key, int value);
kj::Maybe<CompressionError> getError() const;
bool isStreamEnd() const;
bool hasPendingOutput() const {
return false;
}

private:
size_t lastResult = 0;
Expand Down
Loading