diff --git a/src/core/streaming.ts b/src/core/streaming.ts index 3ea5f21524..f5dabe55f9 100644 --- a/src/core/streaming.ts +++ b/src/core/streaming.ts @@ -250,12 +250,15 @@ export async function* _iterSSEMessages( } } +const DOUBLE_NEWLINE_DELIMITER_MAX_OVERLAP_BYTES = 3; + /** * Given an async iterable iterator, iterates over it and yields full * SSE chunks, i.e. yields when a double new-line is encountered. */ async function* iterSSEChunks(iterator: AsyncIterableIterator): AsyncGenerator { let data = new Uint8Array(); + let searchStartIndex = 0; for await (const chunk of iterator) { if (chunk == null) { @@ -273,10 +276,17 @@ async function* iterSSEChunks(iterator: AsyncIterableIterator): AsyncGene data = newData; let patternIndex; - while ((patternIndex = findDoubleNewlineIndex(data)) !== -1) { + while ((patternIndex = findDoubleNewlineIndex(data.subarray(searchStartIndex))) !== -1) { + patternIndex += searchStartIndex; yield data.slice(0, patternIndex); data = data.slice(patternIndex); + searchStartIndex = 0; } + + searchStartIndex = Math.max( + 0, + data.length - DOUBLE_NEWLINE_DELIMITER_MAX_OVERLAP_BYTES, + ); } if (data.length > 0) {