use CompletionStage in more Java tests (#2094)

This commit is contained in:
PJ Fanning 2025-08-26 08:58:30 +01:00 committed by GitHub
parent 5a8e161ec5
commit 497c7c1b6c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 16 additions and 17 deletions

View file

@ -220,7 +220,7 @@ public class PatternsTest extends JUnitSuite {
@Test @Test
public void usePipe() throws Exception { public void usePipe() throws Exception {
TestProbe probe = new TestProbe(system); TestProbe probe = new TestProbe(system);
pipe(Futures.successful("ho!"), system.dispatcher()).to(probe.ref()); pipe(CompletableFuture.completedFuture("ho!"), system.dispatcher()).to(probe.ref());
probe.expectMsg("ho!"); probe.expectMsg("ho!");
} }
@ -228,7 +228,7 @@ public class PatternsTest extends JUnitSuite {
public void usePipeWithActorSelection() throws Exception { public void usePipeWithActorSelection() throws Exception {
TestProbe probe = new TestProbe(system); TestProbe probe = new TestProbe(system);
ActorSelection selection = system.actorSelection(probe.ref().path()); ActorSelection selection = system.actorSelection(probe.ref().path());
pipe(Futures.successful("hi!"), system.dispatcher()).to(selection); pipe(CompletableFuture.completedFuture("hi!"), system.dispatcher()).to(selection);
probe.expectMsg("hi!"); probe.expectMsg("hi!");
} }

View file

@ -16,13 +16,14 @@ package jdocs.stream.javadsl.cookbook;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pekko.Done; import org.apache.pekko.Done;
import org.apache.pekko.NotUsed; import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.dispatch.Futures;
import org.apache.pekko.japi.pf.PFBuilder; import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.BackpressureTimeoutException; import org.apache.pekko.stream.BackpressureTimeoutException;
import org.apache.pekko.stream.javadsl.Keep; import org.apache.pekko.stream.javadsl.Keep;
@ -34,8 +35,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Promise;
public class RecipeAdhocSourceTest extends RecipeTest { public class RecipeAdhocSourceTest extends RecipeTest {
static ActorSystem system; static ActorSystem system;
@ -111,12 +110,12 @@ public class RecipeAdhocSourceTest extends RecipeTest {
public void shutdownStream() throws Exception { public void shutdownStream() throws Exception {
new TestKit(system) { new TestKit(system) {
{ {
Promise<Done> shutdown = Futures.promise(); CompletableFuture<Done> shutdown = new CompletableFuture<>();
TestSubscriber.Probe<String> probe = TestSubscriber.Probe<String> probe =
adhocSource( adhocSource(
Source.repeat("a") Source.repeat("a")
.watchTermination( .watchTermination(
(a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))),
duration200mills, duration200mills,
3) 3)
.toMat(TestSink.probe(system), Keep.right()) .toMat(TestSink.probe(system), Keep.right())
@ -124,7 +123,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
probe.requestNext("a"); probe.requestNext("a");
Thread.sleep(300); Thread.sleep(300);
Await.result(shutdown.future(), duration("3 seconds")); shutdown.get(3, TimeUnit.SECONDS);
} }
}; };
} }
@ -134,12 +133,12 @@ public class RecipeAdhocSourceTest extends RecipeTest {
public void notShutDownStream() throws Exception { public void notShutDownStream() throws Exception {
new TestKit(system) { new TestKit(system) {
{ {
Promise<Done> shutdown = Futures.promise(); CompletableFuture<Done> shutdown = new CompletableFuture<>();
TestSubscriber.Probe<String> probe = TestSubscriber.Probe<String> probe =
adhocSource( adhocSource(
Source.repeat("a") Source.repeat("a")
.watchTermination( .watchTermination(
(a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))),
duration200mills, duration200mills,
3) 3)
.toMat(TestSink.probe(system), Keep.right()) .toMat(TestSink.probe(system), Keep.right())
@ -156,7 +155,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
probe.requestNext("a"); probe.requestNext("a");
Thread.sleep(100); Thread.sleep(100);
assertEquals(false, shutdown.isCompleted()); assertEquals(false, shutdown.isDone());
} }
}; };
} }
@ -166,7 +165,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
public void restartUponDemand() throws Exception { public void restartUponDemand() throws Exception {
new TestKit(system) { new TestKit(system) {
{ {
Promise<Done> shutdown = Futures.promise(); CompletableFuture<Done> shutdown = new CompletableFuture<>();
AtomicInteger startedCount = new AtomicInteger(0); AtomicInteger startedCount = new AtomicInteger(0);
Source<String, ?> source = Source<String, ?> source =
@ -177,7 +176,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
TestSubscriber.Probe<String> probe = TestSubscriber.Probe<String> probe =
adhocSource( adhocSource(
source.watchTermination( source.watchTermination(
(a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))),
duration200mills, duration200mills,
3) 3)
.toMat(TestSink.probe(system), Keep.right()) .toMat(TestSink.probe(system), Keep.right())
@ -186,7 +185,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
probe.requestNext("a"); probe.requestNext("a");
assertEquals(1, startedCount.get()); assertEquals(1, startedCount.get());
Thread.sleep(200); Thread.sleep(200);
Await.result(shutdown.future(), duration("3 seconds")); shutdown.get(3, TimeUnit.SECONDS);
} }
}; };
} }
@ -196,7 +195,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
public void restartUptoMaxRetries() throws Exception { public void restartUptoMaxRetries() throws Exception {
new TestKit(system) { new TestKit(system) {
{ {
Promise<Done> shutdown = Futures.promise(); CompletableFuture<Done> shutdown = new CompletableFuture<>();
AtomicInteger startedCount = new AtomicInteger(0); AtomicInteger startedCount = new AtomicInteger(0);
Source<String, ?> source = Source<String, ?> source =
@ -207,7 +206,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
TestSubscriber.Probe<String> probe = TestSubscriber.Probe<String> probe =
adhocSource( adhocSource(
source.watchTermination( source.watchTermination(
(a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))),
duration200mills, duration200mills,
3) 3)
.toMat(TestSink.probe(system), Keep.right()) .toMat(TestSink.probe(system), Keep.right())
@ -217,7 +216,7 @@ public class RecipeAdhocSourceTest extends RecipeTest {
assertEquals(1, startedCount.get()); assertEquals(1, startedCount.get());
Thread.sleep(500); Thread.sleep(500);
assertEquals(true, shutdown.isCompleted()); assertEquals(true, shutdown.isDone());
Thread.sleep(500); Thread.sleep(500);
probe.requestNext("a"); probe.requestNext("a");