stream: reuse maxBytesPerChunk buffer for inflate / gunzip (#30239)

Before, every round of `parse` allocated a new buffer that was then
copied again by `ByteString.fromArray`. This effectively more than doubled
the allocation rate while inflating. For bulk data, as is typical for
compressed streams, this can make a big difference in throughput.
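
To make the before/after pattern concrete, here is a minimal standalone sketch
of the two allocation strategies (a simplified illustration, not the actual stage
code; `Decompressor`, `parseBefore` and `parseAfter` are hypothetical names):

import java.util.zip.Inflater
import akka.util.ByteString

class Decompressor(maxBytesPerChunk: Int) {
  private val inflater = new Inflater()

  // Before: a fresh temporary buffer per round, whose contents
  // ByteString.fromArray then copies again, i.e. two allocations per round.
  def parseBefore(input: ByteString): ByteString = {
    inflater.setInput(input.toArray)
    val buffer = new Array[Byte](maxBytesPerChunk) // allocated every round
    val read = inflater.inflate(buffer)
    ByteString.fromArray(buffer, 0, read) // copies the read bytes again
  }

  // After: one long-lived buffer; only the ByteString copy remains.
  private[this] val buffer = new Array[Byte](maxBytesPerChunk)
  def parseAfter(input: ByteString): ByteString = {
    inflater.setInput(input.toArray)
    val read = inflater.inflate(buffer)
    ByteString.fromArray(buffer, 0, read)
  }
}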

The slight downside of keeping the buffer is that the stage now uses more memory
by default, even while idle. deflate/gzip's window is 64kb, which also happens to be
the default `maxBytesPerChunk` setting. The additional buffer is therefore expected to
less than double the existing memory footprint while cutting the allocation rate
by more than half, which seems like a good trade-off.
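
As a rough back-of-envelope illustration of that claim (the 64kb figure is the
default mentioned above; the per-round allocation counts are an assumption for
illustration, not measured numbers):

object AllocationEstimate {
  // Assumed allocations per `parse` round at the default 64kb chunk size;
  // illustrative only, real counts depend on the workload.
  val chunk: Int = 64 * 1024
  val perRoundBefore: Int = chunk /* temp buffer */ + chunk /* ByteString copy */ // 128kb
  val perRoundAfter: Int  = chunk /* ByteString copy only */                      // 64kb
  val extraIdleCost: Int  = chunk /* one persistent buffer per stage */           // 64kb
}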

Johannes Rudolph, 2021-05-17 16:08:26 +02:00 (committed by GitHub)
parent 2dde4b6b51
commit 18e78816c3

@@ -21,12 +21,19 @@ import akka.util.ByteString
     def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit
 
     def inflating: Inflate
 
+    /**
+     * Pre-allocated buffer to read from inflater. ByteString.fromArray below
+     * will always create a copy of the read data. Keeping this fixed
+     * buffer around avoids reallocating a buffer that may be too big in many
+     * cases for every call of `parse`.
+     */
+    private[this] val buffer = new Array[Byte](maxBytesPerChunk)
+
     abstract class Inflate(noPostProcessing: Boolean) extends ParseStep[ByteString] {
       override def canWorkWithPartialData = true
       override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = {
         inflater.setInput(reader.remainingData.toArray)
-        val buffer = new Array[Byte](maxBytesPerChunk)
         val read = inflater.inflate(buffer)
 
         reader.skip(reader.remainingSize - inflater.getRemaining)
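
For context, the reused buffer's size is whatever user code selects through the
public API. A minimal usage sketch of the affected stage, assuming the standard
`akka.stream.scaladsl.Compression` entry points and Akka 2.6-style materialization
from an implicit `ActorSystem`:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Compression, Source }
import akka.util.ByteString

object GunzipRoundTrip extends App {
  implicit val system: ActorSystem = ActorSystem("gunzip-example")

  Source.single(ByteString("some payload to compress"))
    .via(Compression.gzip)                                  // compress
    .via(Compression.gunzip(maxBytesPerChunk = 64 * 1024))  // decompress via the patched stage
    .runForeach(chunk => println(chunk.utf8String))
    .onComplete(_ => system.terminate())(system.dispatcher)
}

`Compression.gunzip`'s default `maxBytesPerChunk` is 64 * 1024, matching the
deflate/gzip window mentioned in the commit message, so the persistent buffer
costs one 64kb array per materialized stage.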