Add missing java api for StreamTestKit (#1186)

* Add missing java api for StreamTestKit

* polish

* chore: Add more methods to streamKit

---------

Co-authored-by: naosense <pingao777@gmail.com>
This commit is contained in:
He-Pin(kerr) 2024-03-20 23:31:25 +08:00 committed by GitHub
parent b4ff0fcabf
commit 55477ac6eb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 290 additions and 44 deletions

View file

@ -30,7 +30,6 @@ import org.junit.Ignore;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
@ -235,7 +234,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
Thread.sleep(500);
assertEquals(BackpressureTimeoutException.class, probe.expectError().getClass());
probe.request(1); // send demand
probe.expectNoMessage(FiniteDuration.create(200, "milliseconds")); // but no more restart
probe.expectNoMessage(Duration.ofMillis(200)); // but no more restart
}
};
}

View file

@ -15,11 +15,17 @@ package jdocs.stream.javadsl.cookbook;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.stream.stage.*;
import org.apache.pekko.stream.stage.AbstractInHandler;
import org.apache.pekko.stream.stage.AbstractOutHandler;
import org.apache.pekko.stream.stage.GraphStage;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.testkit.TestPublisher;
import org.apache.pekko.stream.testkit.TestSubscriber;
import org.apache.pekko.stream.testkit.javadsl.TestSink;
@ -28,9 +34,8 @@ import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
public class RecipeHold extends RecipeTest {
static ActorSystem system;
@ -188,10 +193,8 @@ public class RecipeHold extends RecipeTest {
TestPublisher.Probe<Integer> pub = pubSub.first();
TestSubscriber.Probe<Integer> sub = pubSub.second();
FiniteDuration timeout = FiniteDuration.create(200, TimeUnit.MILLISECONDS);
sub.request(1);
sub.expectNoMessage(timeout);
sub.expectNoMessage(Duration.ofMillis(200));
pub.sendNext(1);
sub.expectNext(1);

View file

@ -15,7 +15,10 @@ package jdocs.stream.javadsl.cookbook;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.stream.*;
import org.apache.pekko.stream.ClosedShape;
import org.apache.pekko.stream.FanInShape2;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.stream.testkit.TestPublisher;
import org.apache.pekko.stream.testkit.TestSubscriber;
@ -25,10 +28,9 @@ import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class RecipeManualTrigger extends RecipeTest {
static ActorSystem system;
@ -85,7 +87,7 @@ public class RecipeManualTrigger extends RecipeTest {
TestPublisher.Probe<Trigger> pub = pubSub.first();
TestSubscriber.Probe<Message> sub = pubSub.second();
FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS);
Duration timeout = Duration.ofMillis(100);
sub.expectSubscription().request(1000);
sub.expectNoMessage(timeout);
@ -140,7 +142,7 @@ public class RecipeManualTrigger extends RecipeTest {
TestPublisher.Probe<Trigger> pub = pubSub.first();
TestSubscriber.Probe<Message> sub = pubSub.second();
FiniteDuration timeout = FiniteDuration.create(100, TimeUnit.MILLISECONDS);
Duration timeout = Duration.ofMillis(100);
sub.expectSubscription().request(1000);
sub.expectNoMessage(timeout);

View file

@ -31,6 +31,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
public class RecipeMissedTicks extends RecipeTest {
@ -83,8 +84,7 @@ public class RecipeMissedTicks extends RecipeTest {
pub.sendNext(Tick);
pub.sendNext(Tick);
scala.concurrent.duration.FiniteDuration timeout =
scala.concurrent.duration.FiniteDuration.create(200, TimeUnit.MILLISECONDS);
Duration timeout = Duration.ofMillis(200);
Await.ready(latch, scala.concurrent.duration.Duration.create(1, TimeUnit.SECONDS));

View file

@ -13,8 +13,7 @@
package org.apache.pekko.stream.testkit
import java.io.PrintWriter
import java.io.StringWriter
import java.io.{ PrintWriter, StringWriter }
import java.util.concurrent.CountDownLatch
import scala.annotation.tailrec
@ -23,14 +22,21 @@ import scala.concurrent.duration._
import scala.reflect.ClassTag
import org.apache.pekko
import pekko.actor.{ ActorRef, ActorSystem, DeadLetterSuppression, NoSerializationVerificationNeeded }
import pekko.actor.ClassicActorSystemProvider
import pekko.actor.{
ActorRef,
ActorSystem,
ClassicActorSystemProvider,
DeadLetterSuppression,
NoSerializationVerificationNeeded
}
import pekko.japi._
import pekko.stream._
import pekko.stream.impl._
import pekko.testkit.{ TestActor, TestProbe }
import pekko.testkit.TestActor.AutoPilot
import pekko.util.JavaDurationConverters
import pekko.util.JavaDurationConverters._
import pekko.util.ccompat._
import pekko.util.ccompat.JavaConverters._
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
@ -93,6 +99,15 @@ object TestPublisher {
*/
def apply[T](autoOnSubscribe: Boolean = true)(implicit system: ClassicActorSystemProvider): ManualProbe[T] =
new ManualProbe(autoOnSubscribe)(system.classicSystem)
/**
* JAVA API
*
* Probe that implements [[org.reactivestreams.Publisher]] interface.
* @since 1.1.0
*/
def create[T](autoOnSubscribe: Boolean, system: ClassicActorSystemProvider): ManualProbe[T] =
new ManualProbe(autoOnSubscribe)(system.classicSystem)
}
/**
@ -136,6 +151,14 @@ object TestPublisher {
f
}
/**
* JAVA API
* @since 1.1.0
*/
def executeAfterSubscription[T](f: function.Creator[T]): T = {
executeAfterSubscription(f.create())
}
/**
* Expect a subscription.
*/
@ -189,17 +212,40 @@ object TestPublisher {
self
}
/**
* JAVA API
*
* Expect no messages for a given duration.
* @since 1.1.0
*/
def expectNoMessage(max: java.time.Duration): Self = expectNoMessage(max.asScala)
/**
* Receive messages for a given duration or until one does not match a given partial function.
*/
def receiveWhile[T](
max: Duration = Duration.Undefined,
def receiveWhile[T](max: Duration = Duration.Undefined,
idle: Duration = Duration.Inf,
messages: Int = Int.MaxValue)(f: PartialFunction[PublisherEvent, T]): immutable.Seq[T] =
executeAfterSubscription { probe.receiveWhile(max, idle, messages)(f.asInstanceOf[PartialFunction[AnyRef, T]]) }
executeAfterSubscription {
probe.receiveWhile(max, idle, messages)(f.asInstanceOf[PartialFunction[AnyRef, T]])
}
/**
* JAVA API
*
* Receive messages for a given duration or until one does not match a given partial function.
* @since 1.1.0
*/
def receiveWhile[T](max: java.time.Duration,
idle: java.time.Duration,
messages: Int,
f: PartialFunction[PublisherEvent, T]): java.util.List[T] =
receiveWhile(max.asScala, idle.asScala, messages)(f).asJava
def expectEventPF[T](f: PartialFunction[PublisherEvent, T]): T =
executeAfterSubscription { probe.expectMsgPF[T]()(f.asInstanceOf[PartialFunction[Any, T]]) }
executeAfterSubscription {
probe.expectMsgPF[T]()(f.asInstanceOf[PartialFunction[Any, T]])
}
def getPublisher: Publisher[I] = this
@ -223,15 +269,58 @@ object TestPublisher {
probe.within(min, max)(f)
}
/**
* JAVA API
*
* Execute code block while bounding its execution time between `min` and
* `max`. `within` blocks may be nested. All methods in this trait which
* take maximum wait times are available in a version which implicitly uses
* the remaining time governed by the innermost enclosing `within` block.
*
* Note that the timeout is scaled using Duration.dilated, which uses the
* configuration entry "pekko.test.timefactor", while the min Duration is not.
*
* {{{
* val ret = within(Duration.ofMillis(50)) {
* test ! "ping"
* expectMsgClass(classOf[String])
* }
* }}}
*
* @since 1.1.0
*/
def within[T](min: java.time.Duration,
max: java.time.Duration,
creator: function.Creator[T]): T =
within(min.asScala, max.asScala)(creator.create())
/**
* Same as calling `within(0 seconds, max)(f)`.
*/
def within[T](max: FiniteDuration)(f: => T): T = executeAfterSubscription { probe.within(max)(f) }
def within[T](max: FiniteDuration)(f: => T): T = executeAfterSubscription {
probe.within(max)(f)
}
/**
* JAVA API
*
* Same as calling `within(Duration.ofSeconds(0), max)(f)`.
* @since 1.1.0
*/
def within[T](max: java.time.Duration,
creator: function.Creator[T]): T = within(max.asScala)(creator.create())
}
object Probe {
def apply[T](initialPendingRequests: Long = 0)(implicit system: ClassicActorSystemProvider): Probe[T] =
new Probe(initialPendingRequests)(system.classicSystem)
/**
* JAVA API
* @since 1.1.0
*/
def create[T](initialPendingRequests: Long, system: ClassicActorSystemProvider): Probe[T] =
apply(initialPendingRequests)(system.classicSystem)
}
/**
@ -291,6 +380,7 @@ object TestPublisher {
assert(cause == expectedCause, s"Expected cancellation cause to be $expectedCause but was $cause")
this
}
def expectCancellationWithCause[E <: Throwable: ClassTag](): E = subscription.expectCancellation() match {
case e: E => e
case cause =>
@ -334,6 +424,13 @@ object TestSubscriber {
object ManualProbe {
def apply[T]()(implicit system: ClassicActorSystemProvider): ManualProbe[T] =
new ManualProbe()(system.classicSystem)
/**
* JAVA API
* @since 1.1.0
*/
def create[T]()(system: ClassicActorSystemProvider): ManualProbe[T] =
apply()(system.classicSystem)
}
/**
@ -372,6 +469,14 @@ object TestSubscriber {
def expectEvent(max: FiniteDuration): SubscriberEvent =
probe.expectMsgType[SubscriberEvent](max)
/**
* JAVA API
*
* Expect and return [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` or `OnComplete`).
* @since 1.1.0
*/
def expectEvent(max: java.time.Duration): SubscriberEvent = expectEvent(max.asScala)
/**
* Fluent DSL
*
@ -401,6 +506,14 @@ object TestSubscriber {
}
}
/**
* JAVA API
*
* Expect and return a stream element during specified time or timeout.
* @since 1.1.0
*/
def expectNext(d: java.time.Duration): I = expectNext(d.asScala)
/**
* Fluent DSL
*
@ -421,6 +534,16 @@ object TestSubscriber {
self
}
/**
* JAVA PAI
*
* Fluent DSL
*
* Expect a stream element during specified time or timeout.
* @since 1.1.0
*/
def expectNext(d: java.time.Duration, element: I): Self = expectNext(d.asScala, element)
/**
* Fluent DSL
*
@ -491,6 +614,15 @@ object TestSubscriber {
self
}
/**
* JAVA API
*
* Fluent DSL
* Expect the given elements to be signalled in any order.
* @since 1.1.0
*/
def expectNextUnorderedN(all: java.util.List[I]): Self = expectNextUnorderedN(Util.immutableSeq(all))
/**
* Fluent DSL
*
@ -704,7 +836,6 @@ object TestSubscriber {
* Java API: Assert that no message is received for the specified time.
*/
def expectNoMessage(remaining: java.time.Duration): Self = {
import JavaDurationConverters._
probe.expectNoMessage(remaining.asScala)
self
}
@ -720,11 +851,23 @@ object TestSubscriber {
*
* @param max wait no more than max time, otherwise throw AssertionError
*/
def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T =
expectEventWithTimeoutPF(max,
{
case OnNext(n) if f.isDefinedAt(n) => f(n)
})
def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T = {
val pf: PartialFunction[SubscriberEvent, Any] = {
case OnNext(n) => n
}
expectEventWithTimeoutPF(max, pf.andThen(f))
}
/**
* JAVA API
*
* Expect a stream element and test it with partial function.
*
* @param max wait no more than max time, otherwise throw AssertionError
* @since 1.1.0
*/
def expectNextWithTimeoutPF[T](max: java.time.Duration, f: PartialFunction[Any, T]): T =
expectEventWithTimeoutPF(max.asScala, f)
/**
* Expect a stream element during specified time or timeout and test it with partial function.
@ -736,6 +879,19 @@ object TestSubscriber {
def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): Self =
expectNextWithTimeoutPF(max, f.andThen(_ => self))
/**
* JAVA API
*
* Expect a stream element during specified time or timeout and test it with partial function.
*
* Allows chaining probe methods.
*
* @param max wait no more than max time, otherwise throw AssertionError
* @since 1.1.0
*/
def expectNextChainingPF(max: java.time.Duration, f: PartialFunction[Any, Any]): Self =
expectNextChainingPF(max.asScala, f)
/**
* Expect a stream element during specified time or timeout and test it with partial function.
*
@ -747,6 +903,13 @@ object TestSubscriber {
def expectEventWithTimeoutPF[T](max: Duration, f: PartialFunction[SubscriberEvent, T]): T =
probe.expectMsgPF[T](max, hint = "message matching partial function")(f.asInstanceOf[PartialFunction[Any, T]])
/**
* JAVA API
* @since 1.1.0
*/
def expectEventWithTimeoutPF[T](max: java.time.Duration, f: PartialFunction[SubscriberEvent, T]): T =
expectEventWithTimeoutPF(max.asScala, f)
def expectEventPF[T](f: PartialFunction[SubscriberEvent, T]): T =
expectEventWithTimeoutPF(Duration.Undefined, f)
@ -759,6 +922,19 @@ object TestSubscriber {
messages: Int = Int.MaxValue)(f: PartialFunction[SubscriberEvent, T]): immutable.Seq[T] =
probe.receiveWhile(max, idle, messages)(f.asInstanceOf[PartialFunction[AnyRef, T]])
/**
* JAVA API
*
* Receive messages for a given duration or until one does not match a given partial function.
* @since 1.1.0
*/
def receiveWhile[T](
max: java.time.Duration,
idle: java.time.Duration,
messages: Int,
f: PartialFunction[SubscriberEvent, T]): java.util.List[T] =
receiveWhile(max.asScala, idle.asScala, messages)(f).asJava
/**
* Drains a given number of messages
*/
@ -770,6 +946,15 @@ object TestSubscriber {
}
.flatten
/**
* JAVA API
*
* Drains a given number of messages
* @since 1.1.0
*/
def receiveWithin(max: java.time.Duration, messages: Int): java.util.List[I] =
receiveWithin(max.asScala, messages).asJava
/**
* Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements).
*
@ -800,6 +985,17 @@ object TestSubscriber {
drain()
}
/**
* JAVA API
*
* Attempt to drain the stream into a strict collection (by requesting `Long.MaxValue` elements).
*
* '''Use with caution: Be warned that this may not be a good idea if the stream is infinite or its elements are very large!'''
* @since 1.1.0
*/
def toStrict(atMost: java.time.Duration): java.util.List[I] =
toStrict(atMost.asScala).asJava
/**
* Execute code block while bounding its execution time between `min` and
* `max`. `within` blocks may be nested. All methods in this trait which
@ -818,11 +1014,43 @@ object TestSubscriber {
*/
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T = probe.within(min, max)(f)
/**
* JAVA API
*
* Execute code block while bounding its execution time between `min` and
* `max`. `within` blocks may be nested. All methods in this trait which
* take maximum wait times are available in a version which implicitly uses
* the remaining time governed by the innermost enclosing `within` block.
*
* Note that the timeout is scaled using Duration.dilated, which uses the
* configuration entry "pekko.test.timefactor", while the min Duration is not.
*
* {{{
* val ret = within(Duration.ofMillis(50)) {
* test ! "ping"
* expectMsgClass(classOf[String])
* }
* }}}
*
* @since 1.1.0
*/
def within[T](min: java.time.Duration,
max: java.time.Duration,
creator: function.Creator[T]): T = within(min.asScala, max.asScala)(creator.create())
/**
* Same as calling `within(0 seconds, max)(f)`.
*/
def within[T](max: FiniteDuration)(f: => T): T = probe.within(max)(f)
/**
* JAVA API
*
* Same as calling `within(Duration.ofSeconds(0), max)(f)`.
* @since 1.1.0
*/
def within[T](max: java.time.Duration)(creator: function.Creator[T]): T = within(max.asScala)(creator.create())
def onSubscribe(subscription: Subscription): Unit = probe.ref ! OnSubscribe(subscription)
def onNext(element: I): Unit = probe.ref ! OnNext(element)
def onComplete(): Unit = probe.ref ! OnComplete
@ -831,6 +1059,13 @@ object TestSubscriber {
object Probe {
def apply[T]()(implicit system: ClassicActorSystemProvider): Probe[T] = new Probe()(system.classicSystem)
/**
* JAVA API
*
* @since 1.1.0
*/
def create[T]()(implicit system: ClassicActorSystemProvider): Probe[T] = apply()(system)
}
/**
@ -891,6 +1126,14 @@ object TestSubscriber {
subscription.request(1)
expectNext(d)
}
/**
* JAVA API
*
* Request and expect a stream element during the specified time or timeout.
* @since 1.1.0
*/
def requestNext(d: java.time.Duration): T = requestNext(d.asScala)
}
}

View file

@ -136,15 +136,15 @@ class StreamTestKitSpec extends PekkoSpec {
// It also needs to be dilated since the testkit will dilate the timeout
// accordingly to `-Dpekko.test.timefactor` value.
val initialDelay = (timeout * 2).dilated
val pf: PartialFunction[Any, Unit] = {
case 1 =>
system.log.info("Message received :(")
}
Source
.tick(initialDelay, 1.millis, 1)
.runWith(TestSink.probe)
.request(1)
.expectNextWithTimeoutPF(timeout,
{
case 1 =>
system.log.info("Message received :(")
})
.expectNextWithTimeoutPF(timeout, pf)
}.getMessage should include("timeout")
}
@ -180,15 +180,15 @@ class StreamTestKitSpec extends PekkoSpec {
// It also needs to be dilated since the testkit will dilate the timeout
// accordingly to `-Dpekko.test.timefactor` value.
val initialDelay = (timeout * 2).dilated
val pf: PartialFunction[Any, Unit] = {
case 1 =>
system.log.info("Message received :(")
}
Source
.tick(initialDelay, 1.millis, 1)
.runWith(TestSink.probe)
.request(1)
.expectNextChainingPF(timeout,
{
case 1 =>
system.log.info("Message received :(")
})
.expectNextChainingPF(timeout, pf)
}.getMessage should include("timeout")
}

View file

@ -1242,8 +1242,7 @@ public class SourceTest extends StreamTest {
.mergeAll(Arrays.asList(sourceB, sourceC), false)
.runWith(TestSink.probe(system), system);
sub.expectSubscription().request(9);
sub.expectNextUnorderedN(Util.immutableSeq(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)))
.expectComplete();
sub.expectNextUnorderedN(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)).expectComplete();
}
@Test