diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index d682362c33..d1f5111496 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -123,7 +123,7 @@ object TestPublisher { probe.receiveWhile(max, idle, messages)(f.asInstanceOf[PartialFunction[AnyRef, T]]) def expectEventPF[T](f: PartialFunction[PublisherEvent, T]): T = - probe.expectMsgPF[T](probe.remaining)(f.asInstanceOf[PartialFunction[Any, T]]) + probe.expectMsgPF[T]()(f.asInstanceOf[PartialFunction[Any, T]]) def getPublisher: Publisher[I] = this } @@ -252,7 +252,7 @@ object TestSubscriber { * Expect and return a stream element. */ def expectNext(): I = { - val t = probe.remaining + val t = probe.remainingOr(probe.testKitSettings.SingleExpectDefaultTimeout.dilated) probe.receiveOne(t) match { case null ⇒ throw new AssertionError(s"Expected OnNext(_), yet no element signaled during $t") case OnNext(elem) ⇒ elem.asInstanceOf[I] @@ -526,7 +526,7 @@ object TestSubscriber { } def expectEventPF[T](f: PartialFunction[SubscriberEvent, T]): T = - probe.expectMsgPF[T](probe.remaining)(f.asInstanceOf[PartialFunction[Any, T]]) + probe.expectMsgPF[T]()(f.asInstanceOf[PartialFunction[Any, T]]) /** * Receive messages for a given duration or until one does not match a given partial function.