diff --git a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala index a8525b03ba..9486e0d11a 100644 --- a/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala +++ b/stream-testkit/src/main/scala/org/apache/pekko/stream/testkit/StreamTestKit.scala @@ -852,10 +852,10 @@ object TestSubscriber { * @param max wait no more than max time, otherwise throw AssertionError */ def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T = { - val pf: PartialFunction[SubscriberEvent, Any] = { - case OnNext(n) => n + val pf: PartialFunction[SubscriberEvent, T] = { + case OnNext(n) if f.isDefinedAt(n) => f(n) } - expectEventWithTimeoutPF(max, pf.andThen(f)) + expectEventWithTimeoutPF[T](max, pf) } /**