* Increased timeout in long running test FramingSpec #21673 * Increase default polling interval for futureValue in our tests
This commit is contained in:
parent
50370c69a3
commit
c7163c2d0c
2 changed files with 59 additions and 71 deletions
|
|
@ -5,16 +5,15 @@ package akka.stream.scaladsl
|
|||
|
||||
import java.nio.ByteOrder
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl.Framing.FramingException
|
||||
import akka.stream.stage.{ GraphStage, _ }
|
||||
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
|
||||
import akka.stream._
|
||||
import akka.util.{ ByteString, ByteStringBuilder }
|
||||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Random
|
||||
|
||||
|
|
@ -94,53 +93,49 @@ class FramingSpec extends StreamSpec {
|
|||
.via(Framing.delimiter(delimiter, 256))
|
||||
.runWith(Sink.seq)
|
||||
|
||||
Await.result(f, 3.seconds) should ===(testSequence)
|
||||
f.futureValue should ===(testSequence)
|
||||
}
|
||||
}
|
||||
|
||||
"Respect maximum line settings" in {
|
||||
// The buffer will contain more than 1 bytes, but the individual frames are less
|
||||
Await.result(
|
||||
Source.single(ByteString("a\nb\nc\nd\n")).via(simpleLines("\n", 1)).limit(100).runWith(Sink.seq),
|
||||
3.seconds) should ===(List("a", "b", "c", "d"))
|
||||
Source.single(ByteString("a\nb\nc\nd\n"))
|
||||
.via(simpleLines("\n", 1))
|
||||
.limit(100)
|
||||
.runWith(Sink.seq).futureValue should ===(List("a", "b", "c", "d"))
|
||||
|
||||
an[FramingException] should be thrownBy {
|
||||
Await.result(
|
||||
Source.single(ByteString("ab\n")).via(simpleLines("\n", 1)).limit(100).runWith(Sink.seq),
|
||||
3.seconds)
|
||||
}
|
||||
Source.single(ByteString("ab\n"))
|
||||
.via(simpleLines("\n", 1))
|
||||
.limit(100)
|
||||
.runWith(Sink.seq).failed.futureValue shouldBe a[FramingException]
|
||||
|
||||
Source.single(ByteString("aaa"))
|
||||
.via(simpleLines("\n", 2))
|
||||
.limit(100)
|
||||
.runWith(Sink.seq).failed.futureValue shouldBe a[FramingException]
|
||||
|
||||
an[FramingException] should be thrownBy {
|
||||
Await.result(
|
||||
Source.single(ByteString("aaa")).via(simpleLines("\n", 2)).limit(100).runWith(Sink.seq),
|
||||
3.seconds)
|
||||
}
|
||||
}
|
||||
|
||||
"work with empty streams" in {
|
||||
Await.result(
|
||||
Source.empty.via(simpleLines("\n", 256)).runFold(Vector.empty[String])(_ :+ _),
|
||||
3.seconds) should ===(Vector.empty)
|
||||
Source.empty.via(simpleLines("\n", 256))
|
||||
.runFold(Vector.empty[String])(_ :+ _)
|
||||
.futureValue should ===(Vector.empty)
|
||||
}
|
||||
|
||||
"report truncated frames" in {
|
||||
an[FramingException] should be thrownBy {
|
||||
Await.result(
|
||||
Source.single(ByteString("I have no end"))
|
||||
.via(simpleLines("\n", 256, allowTruncation = false))
|
||||
.grouped(1000)
|
||||
.runWith(Sink.head),
|
||||
3.seconds)
|
||||
}
|
||||
Source.single(ByteString("I have no end"))
|
||||
.via(simpleLines("\n", 256, allowTruncation = false))
|
||||
.grouped(1000)
|
||||
.runWith(Sink.head)
|
||||
.failed.futureValue shouldBe a[FramingException]
|
||||
}
|
||||
|
||||
"allow truncated frames if configured so" in {
|
||||
Await.result(
|
||||
Source.single(ByteString("I have no end"))
|
||||
.via(simpleLines("\n", 256, allowTruncation = true))
|
||||
.grouped(1000)
|
||||
.runWith(Sink.head),
|
||||
3.seconds) should ===(List("I have no end"))
|
||||
Source.single(ByteString("I have no end"))
|
||||
.via(simpleLines("\n", 256, allowTruncation = true))
|
||||
.grouped(1000)
|
||||
.runWith(Sink.head)
|
||||
.futureValue should ===(List("I have no end"))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -168,7 +163,7 @@ class FramingSpec extends StreamSpec {
|
|||
|
||||
"work with various byte orders, frame lengths and offsets" in {
|
||||
for {
|
||||
_ ← 1 to 10
|
||||
_ ← (1 to 10)
|
||||
byteOrder ← byteOrders
|
||||
fieldOffset ← fieldOffsets
|
||||
fieldLength ← fieldLengths
|
||||
|
|
@ -179,21 +174,20 @@ class FramingSpec extends StreamSpec {
|
|||
encode(payload, fieldOffset, fieldLength, byteOrder)
|
||||
}
|
||||
|
||||
Await.result(
|
||||
Source(encodedFrames)
|
||||
.via(rechunk)
|
||||
.via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder))
|
||||
.grouped(10000)
|
||||
.runWith(Sink.head),
|
||||
3.seconds) should ===(encodedFrames)
|
||||
Source(encodedFrames)
|
||||
.via(rechunk)
|
||||
.via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder))
|
||||
.grouped(10000)
|
||||
.runWith(Sink.head)
|
||||
.futureValue(Timeout(5.seconds)) should ===(encodedFrames)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"work with empty streams" in {
|
||||
Await.result(
|
||||
Source.empty.via(Framing.lengthField(4, 0, Int.MaxValue, ByteOrder.BIG_ENDIAN)).runFold(Vector.empty[ByteString])(_ :+ _),
|
||||
3.seconds) should ===(Vector.empty)
|
||||
Source.empty.via(Framing.lengthField(4, 0, Int.MaxValue, ByteOrder.BIG_ENDIAN))
|
||||
.runFold(Vector.empty[ByteString])(_ :+ _)
|
||||
.futureValue should ===(Vector.empty)
|
||||
}
|
||||
|
||||
"work with grouped frames" in {
|
||||
|
|
@ -224,19 +218,15 @@ class FramingSpec extends StreamSpec {
|
|||
}
|
||||
|
||||
"report oversized frames" in {
|
||||
an[FramingException] should be thrownBy {
|
||||
Await.result(
|
||||
Source.single(encode(referenceChunk.take(100), 0, 1, ByteOrder.BIG_ENDIAN))
|
||||
.via(Framing.lengthField(1, 0, 99, ByteOrder.BIG_ENDIAN)).runFold(Vector.empty[ByteString])(_ :+ _),
|
||||
3.seconds)
|
||||
}
|
||||
Source.single(encode(referenceChunk.take(100), 0, 1, ByteOrder.BIG_ENDIAN))
|
||||
.via(Framing.lengthField(1, 0, 99, ByteOrder.BIG_ENDIAN))
|
||||
.runFold(Vector.empty[ByteString])(_ :+ _)
|
||||
.failed.futureValue shouldBe a[FramingException]
|
||||
|
||||
an[FramingException] should be thrownBy {
|
||||
Await.result(
|
||||
Source.single(encode(referenceChunk.take(100), 49, 1, ByteOrder.BIG_ENDIAN))
|
||||
.via(Framing.lengthField(1, 0, 100, ByteOrder.BIG_ENDIAN)).runFold(Vector.empty[ByteString])(_ :+ _),
|
||||
3.seconds)
|
||||
}
|
||||
Source.single(encode(referenceChunk.take(100), 49, 1, ByteOrder.BIG_ENDIAN))
|
||||
.via(Framing.lengthField(1, 0, 100, ByteOrder.BIG_ENDIAN))
|
||||
.runFold(Vector.empty[ByteString])(_ :+ _)
|
||||
.failed.futureValue shouldBe a[FramingException]
|
||||
}
|
||||
|
||||
"report truncated frames" in {
|
||||
|
|
@ -251,15 +241,12 @@ class FramingSpec extends StreamSpec {
|
|||
val fullFrame = encode(referenceChunk.take(frameLength), fieldOffset, fieldLength, byteOrder)
|
||||
val partialFrame = fullFrame.dropRight(1)
|
||||
|
||||
an[FramingException] should be thrownBy {
|
||||
Await.result(
|
||||
Source(List(fullFrame, partialFrame))
|
||||
.via(rechunk)
|
||||
.via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder))
|
||||
.grouped(10000)
|
||||
.runWith(Sink.head),
|
||||
3.seconds)
|
||||
}
|
||||
Source(List(fullFrame, partialFrame))
|
||||
.via(rechunk)
|
||||
.via(Framing.lengthField(fieldLength, fieldOffset, Int.MaxValue, byteOrder))
|
||||
.grouped(10000)
|
||||
.runWith(Sink.head)
|
||||
.failed.futureValue shouldBe a[FramingException]
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -272,9 +259,8 @@ class FramingSpec extends StreamSpec {
|
|||
.join(Flow[ByteString]) // Loopback
|
||||
|
||||
val testMessages = List.fill(100)(referenceChunk.take(Random.nextInt(1024)))
|
||||
Await.result(
|
||||
Source(testMessages).via(codecFlow).limit(1000).runWith(Sink.seq),
|
||||
3.seconds) should ===(testMessages)
|
||||
Source(testMessages).via(codecFlow).limit(1000).runWith(Sink.seq)
|
||||
.futureValue should ===(testMessages)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,10 +6,11 @@ package akka.testkit
|
|||
import org.scalactic.Constraint
|
||||
|
||||
import language.postfixOps
|
||||
import org.scalatest.{ WordSpecLike, BeforeAndAfterAll }
|
||||
import org.scalatest.{ BeforeAndAfterAll, WordSpecLike }
|
||||
import org.scalatest.Matchers
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.{ Logging, LoggingAdapter }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Future
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
|
|
@ -17,6 +18,7 @@ import akka.dispatch.Dispatchers
|
|||
import akka.testkit.TestEvent._
|
||||
import org.scalactic.ConversionCheckedTripleEquals
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalatest.time.Span
|
||||
|
||||
object AkkaSpec {
|
||||
val testConf: Config = ConfigFactory.parseString("""
|
||||
|
|
@ -58,7 +60,7 @@ abstract class AkkaSpec(_system: ActorSystem)
|
|||
extends TestKit(_system) with WordSpecLike with Matchers with BeforeAndAfterAll with WatchedByCoroner
|
||||
with ConversionCheckedTripleEquals with ScalaFutures {
|
||||
|
||||
implicit val patience = PatienceConfig(testKitSettings.DefaultTimeout.duration)
|
||||
implicit val patience = PatienceConfig(testKitSettings.DefaultTimeout.duration, Span(100, org.scalatest.time.Millis))
|
||||
|
||||
def this(config: Config) = this(ActorSystem(
|
||||
AkkaSpec.getCallerName(getClass),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue