diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/compression/CoderSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/compression/CoderSpec.scala
index c157c97d10..f3269bb210 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/io/compression/CoderSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/io/compression/CoderSpec.scala
@@ -111,8 +111,7 @@ abstract class CoderSpec(codecName: String) extends WordSpec with CodecSpecSuppo
     }
 
     "shouldn't produce huge ByteStrings for some input" in {
-      val array = new Array[Byte](10) // FIXME
-      util.Arrays.fill(array, 1.toByte)
+      val array = Array.fill(10)(1.toByte)
       val compressed = streamEncode(ByteString(array))
       val limit = 10000
       val resultBs =
@@ -187,8 +186,8 @@ abstract class CoderSpec(codecName: String) extends WordSpec with CodecSpecSuppo
   }
 
   def decodeChunks(input: Source[ByteString, NotUsed]): ByteString =
-    input.via(decoderFlow()).join.awaitResult(3.seconds)
+    input.via(decoderFlow()).join.awaitResult(3.seconds) // TODO make it use remaining?
 
   def decodeFromIterator(iterator: () ⇒ Iterator[ByteString]): ByteString =
     Await.result(Source.fromIterator(iterator).via(decoderFlow()).join, 3.seconds)
-}
\ No newline at end of file
+}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CompressionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CompressionSpec.scala
index 43dbb04dee..66b4ee27ec 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CompressionSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CompressionSpec.scala
@@ -23,23 +23,23 @@ class CompressionSpec extends StreamSpec {
 
   "Gzip decompression" must {
     "be able to decompress a gzipped stream" in {
-      Source.single(gzip(data))
+      val source = Source.single(gzip(data))
         .via(Compression.gunzip())
         .map(_.decodeString(StandardCharsets.UTF_8))
-        .runWith(TestSink.probe)
-        .requestNext(data)
-        .expectComplete()
+
+      val res = source.runFold("")(_ + _)
+      res.futureValue should ===(data)
     }
   }
 
   "Deflate decompression" must {
     "be able to decompress a deflated stream" in {
-      Source.single(deflate(data))
+      val source = Source.single(deflate(data))
         .via(Compression.inflate())
         .map(_.decodeString(StandardCharsets.UTF_8))
-        .runWith(TestSink.probe)
-        .requestNext(data)
-        .expectComplete()
+
+      val res = source.runFold("")(_ + _)
+      res.futureValue should ===(data)
     }
   }
 }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala
index cecc6f893b..547f1fa901 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala
@@ -14,14 +14,14 @@ private[akka] class DeflateDecompressor(maxBytesPerChunk: Int)
 
   override def createLogic(attr: Attributes) = new DecompressorParsingLogic {
     override val inflater: Inflater = new Inflater()
 
-    override case object Inflating extends Inflate(noPostProcessing = true) {
+    override case object inflating extends Inflate(noPostProcessing = true) {
       override def onTruncation(): Unit = completeStage()
     }
 
-    override def afterInflate = Inflating
+    override def afterInflate = inflating
     override def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit = {}
 
-    startWith(Inflating)
+    startWith(inflating)
   }
 }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala
index b18af2b5c9..38bf8a2816 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala
@@ -17,7 +17,7 @@ private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int)
     val inflater: Inflater
     def afterInflate: ParseStep[ByteString]
     def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit
-    val Inflating: Inflate
+    def inflating: Inflate
 
     abstract class Inflate(noPostProcessing: Boolean) extends ParseStep[ByteString] {
       override def canWorkWithPartialData = true
@@ -45,4 +45,4 @@ private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int)
 }
 
 /** INTERNAL API */
-private[akka] object DeflateDecompressorBase
\ No newline at end of file
+private[akka] object DeflateDecompressorBase
diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala
index eddbd8eae5..9be420c056 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala
@@ -23,7 +23,7 @@ private[akka] class GzipDecompressor(maxBytesPerChunk: Int)
     trait Step extends ParseStep[ByteString] {
      override def onTruncation(): Unit = failStage(new ZipException("Truncated GZIP stream"))
     }
-    override case object Inflating extends Inflate(false) with Step
+    override case object inflating extends Inflate(false) with Step
 
     startWith(ReadHeaders)
     /** Reading the header bytes */
@@ -41,7 +41,7 @@ private[akka] class GzipDecompressor(maxBytesPerChunk: Int)
         inflater.reset()
         crc32.reset()
 
-        ParseResult(None, Inflating, acceptUpstreamFinish = false)
+        ParseResult(None, inflating, acceptUpstreamFinish = false)
       }
     }
     var crc32: CRC32 = new CRC32