diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/compression/CoderSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/compression/CoderSpec.scala index 844789e56b..466570a777 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/compression/CoderSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/compression/CoderSpec.scala @@ -158,7 +158,7 @@ abstract class CoderSpec(codecName: String) extends WordSpec with CodecSpecSuppo lazy val corruptContent = { val content = encode(largeText).toArray - content(14) = 26.toByte + content(14) = 36.toByte ByteString(content) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/compression/GzipWithCustomCompressionLevelSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/compression/GzipWithCustomCompressionLevelSpec.scala new file mode 100644 index 0000000000..31ab69e0b5 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/compression/GzipWithCustomCompressionLevelSpec.scala @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package akka.stream.io.compression + +import java.util.zip.{ Deflater, ZipException } + +import akka.stream.impl.io.compression.{ Compressor, GzipCompressor } +import akka.stream.scaladsl.{ Compression, Flow } +import akka.util.ByteString + +class GzipWithCustomCompressionLevelSpec extends GzipSpec { + + import CompressionTestingTools._ + + override protected def newCompressor(): Compressor = + new GzipCompressor(Deflater.BEST_SPEED) + + override protected val encoderFlow: Flow[ByteString, ByteString, Any] = + Compression.gzip(Deflater.BEST_SPEED) + + override def extraTests(): Unit = { + "decode concatenated compressions" in { + ourDecode(Seq(encode("Hello, "), encode("dear "), encode("User!")).join) should readAs("Hello, dear User!") + } + "throw an error on truncated input" in { + val ex = the[RuntimeException] thrownBy ourDecode(streamEncode(smallTextBytes).dropRight(5)) + ex.ultimateCause.getMessage should equal("Truncated GZIP stream") + } + "throw an error if compressed data is just missing the trailer at the end" in { + def brokenCompress(payload: String) = newCompressor().compress(ByteString(payload, "UTF-8")) + val ex = the[RuntimeException] thrownBy ourDecode(brokenCompress("abcdefghijkl")) + ex.ultimateCause.getMessage should equal("Truncated GZIP stream") + } + "throw early if header is corrupt" in { + val cause = (the[RuntimeException] thrownBy ourDecode(ByteString(0, 1, 2, 3, 4))).ultimateCause + cause should (be(a[ZipException]) and have message "Not in GZIP format") + } + } + +} diff --git a/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes new file mode 100644 index 0000000000..0077a82945 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes @@ -0,0 +1,2 @@ +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.GzipCompressor.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.io.compression.DeflateCompressor.this") diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala index 91e5539cac..4a5f427588 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala @@ -11,10 +11,10 @@ import akka.util.{ ByteString, ByteStringBuilder } import scala.annotation.tailrec /** INTERNAL API */ -@InternalApi private[akka] class DeflateCompressor extends Compressor { +@InternalApi private[akka] class DeflateCompressor(level: Int = Deflater.BEST_COMPRESSION, nowrap: Boolean = false) extends Compressor { import DeflateCompressor._ - protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, false) + protected lazy val deflater = new Deflater(level, nowrap) override final def compressAndFlush(input: ByteString): ByteString = { val buffer = newTempBuffer(input.size) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala index 27fb04711e..8e3e45ccad 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala @@ -9,8 +9,8 @@ import akka.annotation.InternalApi import akka.util.ByteString /** INTERNAL API */ -@InternalApi private[akka] class GzipCompressor extends DeflateCompressor { - override protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, true) +@InternalApi private[akka] class GzipCompressor(compressionLevel: Int = Deflater.BEST_COMPRESSION) extends DeflateCompressor(compressionLevel, true) { + override protected lazy val deflater = new Deflater(compressionLevel, true) private val checkSum = new CRC32 // CRC32 of uncompressed data private var headerSent = false private var bytesRead = 0L diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Compression.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Compression.scala index af754bfc8c..760dbc6f50 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Compression.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Compression.scala @@ -33,6 +33,14 @@ object Compression { def gzip: Flow[ByteString, ByteString, NotUsed] = scaladsl.Compression.gzip.asJava + /** + * Same as [[gzip]] with a custom level. + * + * @param level Compression level (0-9) + */ + def gzip(level: Int): Flow[ByteString, ByteString, NotUsed] = + scaladsl.Compression.gzip(level).asJava + /** * Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor * will SYNC_FLUSH after every [[ByteString]] so that it is guaranteed that every [[ByteString]] @@ -41,4 +49,14 @@ object Compression { */ def deflate: Flow[ByteString, ByteString, NotUsed] = scaladsl.Compression.deflate.asJava + + /** + * Same as [[deflate]] with configurable level and nowrap + * + * @param level Compression level (0-9) + * @param nowrap if true then use GZIP compatible compression + */ + def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] = + scaladsl.Compression.deflate(level, nowrap).asJava + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala index b2ff7389a1..2200d590de 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Compression.scala @@ -3,6 +3,8 @@ */ package akka.stream.scaladsl +import java.util.zip.Deflater + import akka.NotUsed import akka.stream.impl.io.compression._ import akka.util.ByteString @@ -16,10 +18,17 @@ object Compression { * coming out of the flow can be fully decompressed without waiting for additional data. This may * come at a compression performance cost for very small chunks. * - * FIXME: should compression level / strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849 + * FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849 */ - def gzip: Flow[ByteString, ByteString, NotUsed] = - CompressionUtils.compressorFlow(() ⇒ new GzipCompressor) + def gzip: Flow[ByteString, ByteString, NotUsed] = gzip(Deflater.BEST_COMPRESSION) + + /** + * Same as [[gzip]] with a custom level. + * + * @param level Compression level (0-9) + */ + def gzip(level: Int): Flow[ByteString, ByteString, NotUsed] = + CompressionUtils.compressorFlow(() ⇒ new GzipCompressor(level)) /** * Creates a Flow that decompresses a gzip-compressed stream of data. @@ -36,10 +45,19 @@ object Compression { * coming out of the flow can be fully decompressed without waiting for additional data. This may * come at a compression performance cost for very small chunks. * - * FIXME: should compression level / strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849 + * FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849 */ - def deflate: Flow[ByteString, ByteString, NotUsed] = - CompressionUtils.compressorFlow(() ⇒ new DeflateCompressor) + def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, false) + + /** + * Same as [[deflate]] with configurable level and nowrap + * + * @param level Compression level (0-9) + * @param nowrap if true then use GZIP compatible compression + * + */ + def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] = + CompressionUtils.compressorFlow(() ⇒ new DeflateCompressor(level, nowrap)) /** * Creates a Flow that decompresses a deflate-compressed stream of data.