Speed up of framing spec #26679

From 1m to a couple of seconds on my machine. Concurrency FTW!
This commit is contained in:
Johan Andrén 2019-04-24 09:28:25 +02:00 committed by GitHub
parent 8f693998ee
commit 98d7ee8928
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -11,11 +11,10 @@ import akka.stream._
import akka.stream.scaladsl.Framing.FramingException import akka.stream.scaladsl.Framing.FramingException
import akka.stream.stage.{ GraphStage, _ } import akka.stream.stage.{ GraphStage, _ }
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.testkit.LongRunningTest
import akka.util.{ ByteString, ByteStringBuilder } import akka.util.{ ByteString, ByteStringBuilder }
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Random import scala.util.Random
@ -88,13 +87,26 @@ class FramingSpec extends StreamSpec {
yield delimiter.take(prefix) ++ s yield delimiter.take(prefix) ++ s
"work with various delimiters and test sequences" in { "work with various delimiters and test sequences" in {
for (delimiter <- delimiterBytes; _ <- 1 to 5) { import system.dispatcher
val resultFutures = for {
delimiter <- delimiterBytes
_ <- 1 to 5
} yield {
val testSequence = completeTestSequences(delimiter) val testSequence = completeTestSequences(delimiter)
val f = val f =
Source(testSequence).map(_ ++ delimiter).via(rechunk).via(Framing.delimiter(delimiter, 256)).runWith(Sink.seq) Source(testSequence).map(_ ++ delimiter).via(rechunk).via(Framing.delimiter(delimiter, 256)).runWith(Sink.seq)
f.futureValue should ===(testSequence) f.map(result => (result, testSequence, delimiter))
} }
val futureResults = Future.sequence(resultFutures)
futureResults.futureValue.foreach {
case (result, expected, delimiter) =>
withClue(s"delimiter: $delimiter") {
result should ===(expected)
}
}
} }
"Respect maximum line settings" in { "Respect maximum line settings" in {
@ -186,12 +198,13 @@ class FramingSpec extends StreamSpec {
offset ++ header ++ payload ++ tail offset ++ header ++ payload ++ tail
} }
"work with various byte orders, frame lengths and offsets" taggedAs LongRunningTest in { "work with various byte orders, frame lengths and offsets" in {
for { import system.dispatcher
val resultFutures = for {
byteOrder <- byteOrders byteOrder <- byteOrders
fieldOffset <- fieldOffsets fieldOffset <- fieldOffsets
fieldLength <- fieldLengths fieldLength <- fieldLengths
} { } yield {
val encodedFrames = frameLengths.filter(_ < (1L << (fieldLength * 8))).map { length => val encodedFrames = frameLengths.filter(_ < (1L << (fieldLength * 8))).map { length =>
val payload = referenceChunk.take(length) val payload = referenceChunk.take(length)
@ -203,17 +216,27 @@ class FramingSpec extends StreamSpec {
.via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder)) .via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder))
.grouped(10000) .grouped(10000)
.runWith(Sink.head) .runWith(Sink.head)
.futureValue(Timeout(5.seconds)) should ===(encodedFrames) .map(result => (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)))
}
val futureResults = Future.sequence(resultFutures)
futureResults.futureValue.foreach {
case (result, expected, (byteOrder, fieldOffset, fieldLength)) =>
withClue(s"byteOrder: $byteOrder, fieldOffset: $fieldOffset, fieldLength: $fieldLength") {
result should ===(expected)
}
} }
} }
"work with various byte orders, frame lengths and offsets using computeFrameSize" taggedAs LongRunningTest in { "work with various byte orders, frame lengths and offsets using computeFrameSize" in {
for { import system.dispatcher
val resultFutures = for {
byteOrder <- byteOrders byteOrder <- byteOrders
fieldOffset <- fieldOffsets fieldOffset <- fieldOffsets
fieldLength <- fieldLengths fieldLength <- fieldLengths
} { } yield {
def computeFrameSize(offset: Array[Byte], length: Int): Int = { def computeFrameSize(offset: Array[Byte], length: Int): Int = {
val sizeWithoutTail = offset.length + fieldLength + length val sizeWithoutTail = offset.length + fieldLength + length
@ -244,7 +267,15 @@ class FramingSpec extends StreamSpec {
.via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder, computeFrameSize)) .via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder, computeFrameSize))
.grouped(10000) .grouped(10000)
.runWith(Sink.head) .runWith(Sink.head)
.futureValue(Timeout(5.seconds)) should ===(encodedFrames) .map(result => (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)))
}
val futureResults = Future.sequence(resultFutures)
futureResults.futureValue.foreach {
case (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) =>
withClue(s"byteOrder: $byteOrder, fieldOffset: $fieldOffset, fieldLength: $fieldLength") {
result should ===(encodedFrames)
}
} }
} }
@ -298,14 +329,15 @@ class FramingSpec extends StreamSpec {
.futureValue shouldBe a[FramingException] .futureValue shouldBe a[FramingException]
} }
"report truncated frames" taggedAs LongRunningTest in { "report truncated frames" in {
for { import system.dispatcher
val resultFutures: List[Future[(Throwable, (ByteOrder, Int, Int, Int))]] = for {
//_ <- 1 to 10 //_ <- 1 to 10
byteOrder <- byteOrders byteOrder <- byteOrders
fieldOffset <- fieldOffsets fieldOffset <- fieldOffsets
fieldLength <- fieldLengths fieldLength <- fieldLengths
frameLength <- frameLengths if frameLength < (1 << (fieldLength * 8)) && (frameLength != 0) frameLength <- frameLengths if frameLength < (1 << (fieldLength * 8)) && (frameLength != 0)
} { } yield {
val fullFrame = encode(referenceChunk.take(frameLength), fieldOffset, fieldLength, byteOrder) val fullFrame = encode(referenceChunk.take(frameLength), fieldOffset, fieldLength, byteOrder)
val partialFrame = fullFrame.dropRight(1) val partialFrame = fullFrame.dropRight(1)
@ -316,8 +348,18 @@ class FramingSpec extends StreamSpec {
.grouped(10000) .grouped(10000)
.runWith(Sink.head) .runWith(Sink.head)
.failed .failed
.futureValue shouldBe a[FramingException] .map(ex => (ex, (byteOrder, fieldOffset, fieldLength, frameLength)))
} }
val futureResults = Future.sequence(resultFutures)
futureResults.futureValue.foreach {
case (ex, (byteOrder, fieldOffset, fieldLength, frameLength)) =>
withClue(
s"byteOrder: $byteOrder, fieldOffset: $fieldOffset, fieldLength: $fieldLength, frameLength: $frameLength") {
ex shouldBe a[FramingException]
}
}
} }
"support simple framing adapter" in { "support simple framing adapter" in {