From d6eebd22effb04b925d1ef2173ce7d64222625ab Mon Sep 17 00:00:00 2001 From: kbrowder Date: Wed, 22 Jun 2016 15:27:09 -0400 Subject: [PATCH] +htt #20815 Add expectNext(d: FiniteDuration) and requestNext(d: FiniteDuration) (#20819) * Add expectNext(d: FiniteDuration) and requestNext(d: FiniteDuration) for issue 20815 * add some comments on requestNext --- .../akka/stream/testkit/StreamTestKit.scala | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 9fd8966d1c..b01e48dd2f 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -252,7 +252,14 @@ object TestSubscriber { * Expect and return a stream element. */ def expectNext(): I = { - val t = probe.remainingOr(probe.testKitSettings.SingleExpectDefaultTimeout.dilated) + expectNext(probe.testKitSettings.SingleExpectDefaultTimeout.dilated) + } + + /** + * Expect and return a stream element during specified time or timeout. + */ + def expectNext(d: FiniteDuration): I = { + val t = probe.remainingOr(d) probe.receiveOne(t) match { case null ⇒ throw new AssertionError(s"Expected OnNext(_), yet no element signaled during $t") case OnNext(elem) ⇒ elem.asInstanceOf[I] @@ -598,6 +605,9 @@ object TestSubscriber { this } + /** + * Request and expect a stream element. + */ def requestNext(element: T): Self = { subscription.request(1) expectNext(element) @@ -609,10 +619,21 @@ object TestSubscriber { this } + /** + * Request and expect a stream element. + */ def requestNext(): T = { subscription.request(1) expectNext() } + + /** + * Request and expect a stream element during the specified time or timeout. + */ + def requestNext(d: FiniteDuration): T = { + subscription.request(1) + expectNext(d) + } } }