diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala index fc58f1895d..a5577e54a5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FramingSpec.scala @@ -11,11 +11,10 @@ import akka.stream._ import akka.stream.scaladsl.Framing.FramingException import akka.stream.stage.{ GraphStage, _ } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } -import akka.testkit.LongRunningTest import akka.util.{ ByteString, ByteStringBuilder } -import org.scalatest.concurrent.PatienceConfiguration.Timeout import scala.collection.immutable +import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Random @@ -88,13 +87,26 @@ class FramingSpec extends StreamSpec { yield delimiter.take(prefix) ++ s "work with various delimiters and test sequences" in { - for (delimiter <- delimiterBytes; _ <- 1 to 5) { + import system.dispatcher + val resultFutures = for { + delimiter <- delimiterBytes + _ <- 1 to 5 + } yield { val testSequence = completeTestSequences(delimiter) val f = Source(testSequence).map(_ ++ delimiter).via(rechunk).via(Framing.delimiter(delimiter, 256)).runWith(Sink.seq) - f.futureValue should ===(testSequence) + f.map(result => (result, testSequence, delimiter)) } + + val futureResults = Future.sequence(resultFutures) + futureResults.futureValue.foreach { + case (result, expected, delimiter) => + withClue(s"delimiter: $delimiter") { + result should ===(expected) + } + } + } "Respect maximum line settings" in { @@ -186,12 +198,13 @@ class FramingSpec extends StreamSpec { offset ++ header ++ payload ++ tail } - "work with various byte orders, frame lengths and offsets" taggedAs LongRunningTest in { - for { + "work with various byte orders, frame lengths and offsets" in { + import system.dispatcher + val resultFutures = for { byteOrder <- byteOrders fieldOffset <- fieldOffsets fieldLength <- fieldLengths - } { + } yield { val encodedFrames = frameLengths.filter(_ < (1L << (fieldLength * 8))).map { length => val payload = referenceChunk.take(length) @@ -203,17 +216,27 @@ class FramingSpec extends StreamSpec { .via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder)) .grouped(10000) .runWith(Sink.head) - .futureValue(Timeout(5.seconds)) should ===(encodedFrames) + .map(result => (result, encodedFrames, (byteOrder, fieldOffset, fieldLength))) + + } + + val futureResults = Future.sequence(resultFutures) + futureResults.futureValue.foreach { + case (result, expected, (byteOrder, fieldOffset, fieldLength)) => + withClue(s"byteOrder: $byteOrder, fieldOffset: $fieldOffset, fieldLength: $fieldLength") { + result should ===(expected) + } } } - "work with various byte orders, frame lengths and offsets using computeFrameSize" taggedAs LongRunningTest in { - for { + "work with various byte orders, frame lengths and offsets using computeFrameSize" in { + import system.dispatcher + val resultFutures = for { byteOrder <- byteOrders fieldOffset <- fieldOffsets fieldLength <- fieldLengths - } { + } yield { def computeFrameSize(offset: Array[Byte], length: Int): Int = { val sizeWithoutTail = offset.length + fieldLength + length @@ -244,7 +267,15 @@ class FramingSpec extends StreamSpec { .via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder, computeFrameSize)) .grouped(10000) .runWith(Sink.head) - .futureValue(Timeout(5.seconds)) should ===(encodedFrames) + .map(result => (result, encodedFrames, (byteOrder, fieldOffset, fieldLength))) + } + + val futureResults = Future.sequence(resultFutures) + futureResults.futureValue.foreach { + case (result, encodedFrames, (byteOrder, fieldOffset, fieldLength)) => + withClue(s"byteOrder: $byteOrder, fieldOffset: $fieldOffset, fieldLength: $fieldLength") { + result should ===(encodedFrames) + } } } @@ -298,14 +329,15 @@ class FramingSpec extends StreamSpec { .futureValue shouldBe a[FramingException] } - "report truncated frames" taggedAs LongRunningTest in { - for { + "report truncated frames" in { + import system.dispatcher + val resultFutures: List[Future[(Throwable, (ByteOrder, Int, Int, Int))]] = for { //_ <- 1 to 10 byteOrder <- byteOrders fieldOffset <- fieldOffsets fieldLength <- fieldLengths frameLength <- frameLengths if frameLength < (1 << (fieldLength * 8)) && (frameLength != 0) - } { + } yield { val fullFrame = encode(referenceChunk.take(frameLength), fieldOffset, fieldLength, byteOrder) val partialFrame = fullFrame.dropRight(1) @@ -316,8 +348,18 @@ class FramingSpec extends StreamSpec { .grouped(10000) .runWith(Sink.head) .failed - .futureValue shouldBe a[FramingException] + .map(ex => (ex, (byteOrder, fieldOffset, fieldLength, frameLength))) } + + val futureResults = Future.sequence(resultFutures) + futureResults.futureValue.foreach { + case (ex, (byteOrder, fieldOffset, fieldLength, frameLength)) => + withClue( + s"byteOrder: $byteOrder, fieldOffset: $fieldOffset, fieldLength: $fieldLength, frameLength: $frameLength") { + ex shouldBe a[FramingException] + } + } + } "support simple framing adapter" in {