Compression level for gzip flow (#23845)

Excluded binary compay check of Compressors as they are InternalApis
This commit is contained in:
Christopher Batey 2017-10-30 08:40:20 -05:00 committed by Konrad `ktoso` Malawski
parent a2cbff1061
commit 2ee51c602a
7 changed files with 91 additions and 11 deletions

View file

@ -158,7 +158,7 @@ abstract class CoderSpec(codecName: String) extends WordSpec with CodecSpecSuppo
lazy val corruptContent = {
val content = encode(largeText).toArray
content(14) = 26.toByte
content(14) = 36.toByte
ByteString(content)
}

View file

@ -0,0 +1,42 @@
/*
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.io.compression
import java.util.zip.{ Deflater, ZipException }
import akka.stream.impl.io.compression.{ Compressor, GzipCompressor }
import akka.stream.scaladsl.{ Compression, Flow }
import akka.util.ByteString
class GzipWithCustomCompressionLevelSpec extends GzipSpec {
import CompressionTestingTools._
override protected def newCompressor(): Compressor =
new GzipCompressor(Deflater.BEST_SPEED)
override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
Compression.gzip(Deflater.BEST_SPEED)
override def extraTests(): Unit = {
"decode concatenated compressions" in {
ourDecode(Seq(encode("Hello, "), encode("dear "), encode("User!")).join) should readAs("Hello, dear User!")
}
"throw an error on truncated input" in {
val ex = the[RuntimeException] thrownBy ourDecode(streamEncode(smallTextBytes).dropRight(5))
ex.ultimateCause.getMessage should equal("Truncated GZIP stream")
}
"throw an error if compressed data is just missing the trailer at the end" in {
def brokenCompress(payload: String) = newCompressor().compress(ByteString(payload, "UTF-8"))
val ex = the[RuntimeException] thrownBy ourDecode(brokenCompress("abcdefghijkl"))
ex.ultimateCause.getMessage should equal("Truncated GZIP stream")
}
"throw early if header is corrupt" in {
val cause = (the[RuntimeException] thrownBy ourDecode(ByteString(0, 1, 2, 3, 4))).ultimateCause
cause should (be(a[ZipException]) and have message "Not in GZIP format")
}
}
}

View file

@ -0,0 +1,2 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.GzipCompressor.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.DeflateCompressor.this")

View file

@ -11,10 +11,10 @@ import akka.util.{ ByteString, ByteStringBuilder }
import scala.annotation.tailrec
/** INTERNAL API */
@InternalApi private[akka] class DeflateCompressor extends Compressor {
@InternalApi private[akka] class DeflateCompressor(level: Int = Deflater.BEST_COMPRESSION, nowrap: Boolean = false) extends Compressor {
import DeflateCompressor._
protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, false)
protected lazy val deflater = new Deflater(level, nowrap)
override final def compressAndFlush(input: ByteString): ByteString = {
val buffer = newTempBuffer(input.size)

View file

@ -9,8 +9,8 @@ import akka.annotation.InternalApi
import akka.util.ByteString
/** INTERNAL API */
@InternalApi private[akka] class GzipCompressor extends DeflateCompressor {
override protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, true)
@InternalApi private[akka] class GzipCompressor(compressionLevel: Int = Deflater.BEST_COMPRESSION) extends DeflateCompressor(compressionLevel, true) {
override protected lazy val deflater = new Deflater(compressionLevel, true)
private val checkSum = new CRC32 // CRC32 of uncompressed data
private var headerSent = false
private var bytesRead = 0L

View file

@ -33,6 +33,14 @@ object Compression {
def gzip: Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.gzip.asJava
/**
* Same as [[gzip]] with a custom level.
*
* @param level Compression level (0-9)
*/
def gzip(level: Int): Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.gzip(level).asJava
/**
* Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor
* will SYNC_FLUSH after every [[ByteString]] so that it is guaranteed that every [[ByteString]]
@ -41,4 +49,14 @@ object Compression {
*/
def deflate: Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.deflate.asJava
/**
* Same as [[deflate]] with configurable level and nowrap
*
* @param level Compression level (0-9)
* @param nowrap if true then use GZIP compatible compression
*/
def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.deflate(level, nowrap).asJava
}

View file

@ -3,6 +3,8 @@
*/
package akka.stream.scaladsl
import java.util.zip.Deflater
import akka.NotUsed
import akka.stream.impl.io.compression._
import akka.util.ByteString
@ -16,10 +18,17 @@ object Compression {
* coming out of the flow can be fully decompressed without waiting for additional data. This may
* come at a compression performance cost for very small chunks.
*
* FIXME: should compression level / strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
* FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
*/
def gzip: Flow[ByteString, ByteString, NotUsed] =
CompressionUtils.compressorFlow(() new GzipCompressor)
def gzip: Flow[ByteString, ByteString, NotUsed] = gzip(Deflater.BEST_COMPRESSION)
/**
* Same as [[gzip]] with a custom level.
*
* @param level Compression level (0-9)
*/
def gzip(level: Int): Flow[ByteString, ByteString, NotUsed] =
CompressionUtils.compressorFlow(() new GzipCompressor(level))
/**
* Creates a Flow that decompresses a gzip-compressed stream of data.
@ -36,10 +45,19 @@ object Compression {
* coming out of the flow can be fully decompressed without waiting for additional data. This may
* come at a compression performance cost for very small chunks.
*
* FIXME: should compression level / strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
* FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
*/
def deflate: Flow[ByteString, ByteString, NotUsed] =
CompressionUtils.compressorFlow(() new DeflateCompressor)
def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, false)
/**
* Same as [[deflate]] with configurable level and nowrap
*
* @param level Compression level (0-9)
* @param nowrap if true then use GZIP compatible compression
*
*/
def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] =
CompressionUtils.compressorFlow(() new DeflateCompressor(level, nowrap))
/**
* Creates a Flow that decompresses a deflate-compressed stream of data.