=str follow up review comments on compression PR

This commit is contained in:
Konrad Malawski 2016-11-21 13:00:24 +01:00 committed by Konrad Malawski
parent 7fa7cc1624
commit 6508e1175f
5 changed files with 18 additions and 19 deletions

View file

@@ -111,8 +111,7 @@ abstract class CoderSpec(codecName: String) extends WordSpec with CodecSpecSuppo
}
"shouldn't produce huge ByteStrings for some input" in {
val array = new Array[Byte](10) // FIXME
util.Arrays.fill(array, 1.toByte)
val array = Array.fill(10)(1.toByte)
val compressed = streamEncode(ByteString(array))
val limit = 10000
val resultBs =
@@ -187,8 +186,8 @@ abstract class CoderSpec(codecName: String) extends WordSpec with CodecSpecSuppo
}
def decodeChunks(input: Source[ByteString, NotUsed]): ByteString =
input.via(decoderFlow()).join.awaitResult(3.seconds)
input.via(decoderFlow()).join.awaitResult(3.seconds) // TODO make it use remaining?
def decodeFromIterator(iterator: () ⇒ Iterator[ByteString]): ByteString =
Await.result(Source.fromIterator(iterator).via(decoderFlow()).join, 3.seconds)
}
}

View file

@@ -23,23 +23,23 @@ class CompressionSpec extends StreamSpec {
"Gzip decompression" must {
"be able to decompress a gzipped stream" in {
Source.single(gzip(data))
val source = Source.single(gzip(data))
.via(Compression.gunzip())
.map(_.decodeString(StandardCharsets.UTF_8))
.runWith(TestSink.probe)
.requestNext(data)
.expectComplete()
val res = source.runFold("")(_ + _)
res.futureValue should ===(data)
}
}
"Deflate decompression" must {
"be able to decompress a deflated stream" in {
Source.single(deflate(data))
val source = Source.single(deflate(data))
.via(Compression.inflate())
.map(_.decodeString(StandardCharsets.UTF_8))
.runWith(TestSink.probe)
.requestNext(data)
.expectComplete()
val res = source.runFold("")(_ + _)
res.futureValue should ===(data)
}
}
}

View file

@@ -14,14 +14,14 @@ private[akka] class DeflateDecompressor(maxBytesPerChunk: Int)
override def createLogic(attr: Attributes) = new DecompressorParsingLogic {
override val inflater: Inflater = new Inflater()
override case object Inflating extends Inflate(noPostProcessing = true) {
override case object inflating extends Inflate(noPostProcessing = true) {
override def onTruncation(): Unit = completeStage()
}
override def afterInflate = Inflating
override def afterInflate = inflating
override def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit = {}
startWith(Inflating)
startWith(inflating)
}
}

View file

@@ -17,7 +17,7 @@ private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int)
val inflater: Inflater
def afterInflate: ParseStep[ByteString]
def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit
val Inflating: Inflate
def inflating: Inflate
abstract class Inflate(noPostProcessing: Boolean) extends ParseStep[ByteString] {
override def canWorkWithPartialData = true
@@ -45,4 +45,4 @@ private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int)
}
/** INTERNAL API */
private[akka] object DeflateDecompressorBase
private[akka] object DeflateDecompressorBase

View file

@@ -23,7 +23,7 @@ private[akka] class GzipDecompressor(maxBytesPerChunk: Int)
trait Step extends ParseStep[ByteString] {
override def onTruncation(): Unit = failStage(new ZipException("Truncated GZIP stream"))
}
override case object Inflating extends Inflate(false) with Step
override case object inflating extends Inflate(false) with Step
startWith(ReadHeaders)
/** Reading the header bytes */
@@ -41,7 +41,7 @@ private[akka] class GzipDecompressor(maxBytesPerChunk: Int)
inflater.reset()
crc32.reset()
ParseResult(None, Inflating, acceptUpstreamFinish = false)
ParseResult(None, inflating, acceptUpstreamFinish = false)
}
}
var crc32: CRC32 = new CRC32