diff --git a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java index 317bbe85b1..449f56c6e4 100644 --- a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java +++ b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java @@ -220,7 +220,7 @@ public class PatternsTest extends JUnitSuite { @Test public void usePipe() throws Exception { TestProbe probe = new TestProbe(system); - pipe(Futures.successful("ho!"), system.dispatcher()).to(probe.ref()); + pipe(CompletableFuture.completedFuture("ho!"), system.dispatcher()).to(probe.ref()); probe.expectMsg("ho!"); } @@ -228,7 +228,7 @@ public class PatternsTest extends JUnitSuite { public void usePipeWithActorSelection() throws Exception { TestProbe probe = new TestProbe(system); ActorSelection selection = system.actorSelection(probe.ref().path()); - pipe(Futures.successful("hi!"), system.dispatcher()).to(selection); + pipe(CompletableFuture.completedFuture("hi!"), system.dispatcher()).to(selection); probe.expectMsg("hi!"); } diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java index 0d216479a7..b319a3554a 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java @@ -16,13 +16,14 @@ package jdocs.stream.javadsl.cookbook; import static org.junit.Assert.assertEquals; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.dispatch.Futures; import org.apache.pekko.japi.pf.PFBuilder; import org.apache.pekko.stream.BackpressureTimeoutException; import org.apache.pekko.stream.javadsl.Keep; @@ -34,8 +35,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Promise; public class RecipeAdhocSourceTest extends RecipeTest { static ActorSystem system; @@ -111,12 +110,12 @@ public class RecipeAdhocSourceTest extends RecipeTest { public void shutdownStream() throws Exception { new TestKit(system) { { - Promise shutdown = Futures.promise(); + CompletableFuture shutdown = new CompletableFuture<>(); TestSubscriber.Probe probe = adhocSource( Source.repeat("a") .watchTermination( - (a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), + (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) .toMat(TestSink.probe(system), Keep.right()) @@ -124,7 +123,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { probe.requestNext("a"); Thread.sleep(300); - Await.result(shutdown.future(), duration("3 seconds")); + shutdown.get(3, TimeUnit.SECONDS); } }; } @@ -134,12 +133,12 @@ public class RecipeAdhocSourceTest extends RecipeTest { public void notShutDownStream() throws Exception { new TestKit(system) { { - Promise shutdown = Futures.promise(); + CompletableFuture shutdown = new CompletableFuture<>(); TestSubscriber.Probe probe = adhocSource( Source.repeat("a") .watchTermination( - (a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), + (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) .toMat(TestSink.probe(system), Keep.right()) @@ -156,7 +155,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { probe.requestNext("a"); Thread.sleep(100); - assertEquals(false, shutdown.isCompleted()); + assertEquals(false, shutdown.isDone()); } }; } @@ -166,7 +165,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { public void restartUponDemand() throws Exception { new TestKit(system) { { - Promise shutdown = Futures.promise(); + CompletableFuture shutdown = new CompletableFuture<>(); AtomicInteger startedCount = new AtomicInteger(0); Source source = @@ -177,7 +176,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { TestSubscriber.Probe probe = adhocSource( source.watchTermination( - (a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), + (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) .toMat(TestSink.probe(system), Keep.right()) @@ -186,7 +185,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { probe.requestNext("a"); assertEquals(1, startedCount.get()); Thread.sleep(200); - Await.result(shutdown.future(), duration("3 seconds")); + shutdown.get(3, TimeUnit.SECONDS); } }; } @@ -196,7 +195,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { public void restartUptoMaxRetries() throws Exception { new TestKit(system) { { - Promise shutdown = Futures.promise(); + CompletableFuture shutdown = new CompletableFuture<>(); AtomicInteger startedCount = new AtomicInteger(0); Source source = @@ -207,7 +206,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { TestSubscriber.Probe probe = adhocSource( source.watchTermination( - (a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), + (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) .toMat(TestSink.probe(system), Keep.right()) @@ -217,7 +216,7 @@ public class RecipeAdhocSourceTest extends RecipeTest { assertEquals(1, startedCount.get()); Thread.sleep(500); - assertEquals(true, shutdown.isCompleted()); + assertEquals(true, shutdown.isDone()); Thread.sleep(500); probe.requestNext("a");