diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 35028fdacf..719ac4b459 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -551,23 +551,46 @@ object TestSubscriber { self } - def expectNextPF[T](f: PartialFunction[Any, T]): T = { - expectEventPF { - case OnNext(n) if f.isDefinedAt(n) ⇒ f(n) - } - } + /** + * Expect a stream element and test it with partial function. + * + */ + def expectNextPF[T](f: PartialFunction[Any, T]): T = + expectNextWithTimeoutPF(Duration.Undefined, f) /** - * Expect next element and test it with partial function. + * Expect a stream element and test it with partial function. + * + * @param max wait no more than max time, otherwise throw AssertionError + */ + def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T = + expectEventWithTimeoutPF(max, { + case OnNext(n) if f.isDefinedAt(n) ⇒ f(n) + }) + + /** + * Expect a stream element during specified time or timeout and test it with partial function. + * + * Allows chaining probe methods. + * + * @param max wait no more than max time, otherwise throw AssertionError + */ + def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): Self = + expectNextWithTimeoutPF(max, f.andThen(_ ⇒ self)) + + /** + * Expect a stream element during specified time or timeout and test it with partial function. * * Allows chaining probe methods. */ - def expectNextChainingPF(f: PartialFunction[Any, Any]): Self = { - expectNextPF(f.andThen(_ ⇒ self)) - } + def expectNextChainingPF(f: PartialFunction[Any, Any]): Self = + expectNextChainingPF(Duration.Undefined, f) + + def expectEventWithTimeoutPF[T](max: Duration, f: PartialFunction[SubscriberEvent, T]): T = + probe.expectMsgPF[T](max, hint = "message matching partial function")(f.asInstanceOf[PartialFunction[Any, T]]) def expectEventPF[T](f: PartialFunction[SubscriberEvent, T]): T = - probe.expectMsgPF[T](hint = "message matching partial function")(f.asInstanceOf[PartialFunction[Any, T]]) + expectEventWithTimeoutPF(Duration.Undefined, f) /** * Receive messages for a given duration or until one does not match a given partial function. diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala index fec9fe8ff3..773b3e7646 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestKitSpec.scala @@ -129,6 +129,18 @@ class StreamTestKitSpec extends AkkaSpec { }.getMessage should include("message matching partial function") } + "#expectNextWithTimeoutPF should fail after timeout when element delayed" in { + intercept[AssertionError] { + val timeout = 100 millis + val overTimeout = timeout + (10 millis) + Source.tick(overTimeout, 1 millis, 1).runWith(TestSink.probe) + .request(1) + .expectNextWithTimeoutPF(timeout, { + case 1 ⇒ + }) + }.getMessage should include("timeout") + } + "#expectNextChainingPF should pass with right element" in { Source.single(1).runWith(TestSink.probe) .request(1) @@ -155,6 +167,18 @@ class StreamTestKitSpec extends AkkaSpec { }.getMessage should include("message matching partial function") } + "#expectNextChainingPF should fail after timeout when element delayed" in { + intercept[AssertionError] { + val timeout = 100 millis + val overTimeout = timeout + (10 millis) + Source.tick(overTimeout, 1 millis, 1).runWith(TestSink.probe) + .request(1) + .expectNextChainingPF(timeout, { + case 1 ⇒ + }) + }.getMessage should include("timeout") + } + "#expectNextN given a number of elements" in { Source(1 to 4).runWith(TestSink.probe) .request(4)