From 18e78816c39bfc209e0f305db47338e27215b498 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 17 May 2021 16:08:26 +0200 Subject: [PATCH] stream: reuse `maxBytesPerChunk` buffer for inflate / gunzip (#30239) Before, for every round of `parse` a new buffer was allocated that was then copied again by `ByteString.fromArray`. This effectively more than doubled the allocation rate while inflating. For bulk data like expected for compressed data this can make a big difference in throughput. The slight downside of keeping the buffer is that the stage now uses more memory by default even while idle. deflate/gzip's window is 64kb which happens to be also the default `maxBytesPerChunk` setting. It is therefore expected that the additional buffer will less than double the existing memory footprint while dividing the allocation rate by more than two which seems like a good trade-off. --- .../impl/io/compression/DeflateDecompressorBase.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala index 98d05f5b83..2091b72252 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala @@ -21,12 +21,19 @@ import akka.util.ByteString def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit def inflating: Inflate + /** + * Pre-allocated buffer to read from inflater. ByteString.fromArray below + * will always create a copy of the read data. Keeping this fixed + * buffer around avoids reallocating a buffer that may be too big in many + * cases for every call of `parse`. + */ + private[this] val buffer = new Array[Byte](maxBytesPerChunk) + abstract class Inflate(noPostProcessing: Boolean) extends ParseStep[ByteString] { override def canWorkWithPartialData = true override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = { inflater.setInput(reader.remainingData.toArray) - val buffer = new Array[Byte](maxBytesPerChunk) val read = inflater.inflate(buffer) reader.skip(reader.remainingSize - inflater.getRemaining)