diff --git a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java index 41c53b4284..bcd1cb0206 100644 --- a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java +++ b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java @@ -8,17 +8,20 @@ import akka.actor.*; import akka.dispatch.Futures; import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaSpec; +import akka.testkit.TestLatch; import akka.testkit.TestProbe; import akka.util.Timeout; import org.junit.ClassRule; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import scala.concurrent.Await; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.CompletionStage; +import java.util.Arrays; +import java.util.concurrent.*; import static akka.pattern.Patterns.ask; import static akka.pattern.Patterns.pipe; @@ -50,12 +53,24 @@ public class PatternsTest extends JUnitSuite { } } + public static final class StopActor extends AbstractActor { + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(String.class, message -> sender().tell("Pong", getSelf())) + .build(); + } + } + @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("JavaAPI", AkkaSpec.testConf()); private final ActorSystem system = actorSystemResource.getSystem(); + private final ExecutionContext ec = system.dispatcher(); + @Test public void useAsk() throws Exception { @@ -72,6 +87,28 @@ public class PatternsTest extends JUnitSuite { assertEquals("Ask (Identify) should return the proper ActorIdentity", testActor, id.getActorRef().get()); } + @Test + public void testCSAsk() throws Exception { + ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class)); + CompletionStage result = PatternsCS.ask(target, "hello", 3000).thenApply(o -> (String)o); + + String actual = result.toCompletableFuture().get(3, SECONDS); + assertEquals(JavaAPITestActor.ANSWER, actual); + } + + @Test + public void testCSAskWithActorSelection() throws Exception { + ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class), "test3"); + + ActorSelection selection = system.actorSelection("/user/test3"); + ActorIdentity id = PatternsCS.ask(selection, new Identify("hello"), 3000) + .toCompletableFuture() + .thenApply(o -> (ActorIdentity)o) + .get(3, SECONDS); + + assertEquals(target, id.getActorRef().get()); + } + @Test public void testCSAskWithReplyToTimeout() throws Exception { final String expected = "hello"; @@ -186,4 +223,250 @@ public class PatternsTest extends JUnitSuite { pipe(Futures.successful("hi!"), system.dispatcher()).to(selection); probe.expectMsg("hi!"); } + + @Test + public void testCSPipeToActorRef() throws Exception { + TestProbe probe = new TestProbe(system); + CompletableFuture f = new CompletableFuture<>(); + f.complete("ho!"); + PatternsCS.pipe(f, ec).to(probe.ref()); + probe.expectMsg("ho!"); + } + + @Test + public void testCSPipeToActorSelection() throws Exception { + TestProbe probe = new TestProbe(system); + ActorSelection selection = system.actorSelection(probe.ref().path()); + CompletableFuture f = new CompletableFuture<>(); + f.complete("hi!"); + PatternsCS.pipe(f, ec).to(selection); + probe.expectMsg("hi!"); + } + + @Test + public void testRetry() throws Exception { + final String expected = "hello"; + + Future retriedFuture = + Patterns.retry( + () -> Futures.successful(expected), + 3, + Duration.apply(200, "millis"), + system.scheduler(), ec); + + String actual = Await.result(retriedFuture, FiniteDuration.apply(3, SECONDS)); + assertEquals(expected, actual); + } + + @Test + public void testCSRetry() throws Exception { + final String expected = "hello"; + + Callable> attempt = () -> CompletableFuture.completedFuture(expected); + + CompletionStage retriedStage = + PatternsCS.retry( + attempt, + 3, + java.time.Duration.ofMillis(200), + system.scheduler(), ec); + + final String actual = retriedStage.toCompletableFuture().get(3, SECONDS); + assertEquals(expected, actual); + } + + @Test(expected = IllegalStateException.class) + public void testAfterFailedCallable() throws Exception { + Callable> failedCallable = () -> Futures.failed(new IllegalStateException("Illegal!")); + + Future delayedFuture = Patterns + .after( + Duration.create(200, "millis"), + system.scheduler(), + ec, + failedCallable); + + Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); + Await.result(resultFuture, FiniteDuration.apply(3, SECONDS)); + } + + @Test(expected = IllegalStateException.class) + public void testAfterFailedFuture() throws Exception { + Future failedFuture = Futures.failed(new IllegalStateException("Illegal!")); + + Future delayedFuture = Patterns + .after( + Duration.create(200, "millis"), + system.scheduler(), + ec, + failedFuture); + + Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); + Await.result(resultFuture, FiniteDuration.apply(3, SECONDS)); + } + + @Test + public void testAfterSuccessfulCallable() throws Exception { + final String expected = "Hello"; + + Future delayedFuture = Patterns + .after( + Duration.create(200, "millis"), + system.scheduler(), + ec, + () -> Futures.successful(expected)); + + Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); + final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS)); + + assertEquals(expected, actual); + } + + @Test + public void testAfterSuccessfulFuture() throws Exception { + final String expected = "Hello"; + + Future delayedFuture = Patterns + .after( + Duration.create(200, "millis"), + system.scheduler(), + ec, + Futures.successful(expected)); + + Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); + + final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS)); + assertEquals(expected, actual); + } + + @Test + public void testAfterFiniteDuration() throws Exception { + final String expected = "Hello"; + + Future delayedFuture = Patterns + .after( + Duration.create(200, "millis"), + system.scheduler(), + ec, + Futures.successful("world")); + + Future immediateFuture = Futures.future(() -> expected, ec); + + Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture, immediateFuture), ec); + + final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS)); + assertEquals(expected, actual); + } + + @Test(expected = ExecutionException.class) + public void testCSAfterFailedCallable() throws Exception { + Callable> failedCallable = () -> { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new IllegalStateException("Illegal!")); + return f; + }; + + CompletionStage delayedStage = PatternsCS + .after( + java.time.Duration.ofMillis(200), + system.scheduler(), + ec, + failedCallable); + + delayedStage.toCompletableFuture().get(3, SECONDS); + } + + @Test(expected = ExecutionException.class) + public void testCSAfterFailedFuture() throws Exception { + Callable> failedFuture = () -> { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new IllegalStateException("Illegal!")); + return f; + }; + + CompletionStage delayedStage = PatternsCS + .after( + Duration.create(200, "millis"), + system.scheduler(), + ec, + failedFuture); + + String result = delayedStage.toCompletableFuture().get(3, SECONDS); + } + + @Test + public void testCSAfterSuccessfulCallable() throws Exception { + final String expected = "Hello"; + + final Callable> cf = () -> { + CompletableFuture f = CompletableFuture.completedFuture(expected); + return f; + }; + + CompletionStage delayedStage = PatternsCS + .after( + java.time.Duration.ofMillis(200), + system.scheduler(), + ec, + cf); + + final String actual = delayedStage.toCompletableFuture().get(3, SECONDS); + assertEquals(expected, actual); + } + + @Test + public void testCSAfterSuccessfulFuture() throws Exception { + final String expected = "Hello"; + + final CompletionStage f = CompletableFuture.completedFuture(expected); + + CompletionStage delayedStage = PatternsCS + .after( + Duration.create(200, "millis"), + system.scheduler(), + ec, + f); + + final String actual = delayedStage.toCompletableFuture().get(3, SECONDS); + assertEquals(expected, actual); + } + + @Test + public void testCSAfterDuration() throws Exception { + final String expected = "Hello"; + + final CompletionStage f = CompletableFuture.completedFuture("world!"); + + CompletionStage delayedStage = PatternsCS + .after( + Duration.create(200, "millis"), + system.scheduler(), + ec, + f); + + CompletableFuture immediateStage = CompletableFuture.completedFuture(expected); + CompletableFuture resultStage = CompletableFuture.anyOf(delayedStage.toCompletableFuture(), immediateStage); + + final String actual = (String) resultStage.get(3, SECONDS); + assertEquals(expected, actual); + } + + @Test + public void testGracefulStop() throws Exception { + ActorRef target = system.actorOf(Props.create(StopActor.class)); + Future result = Patterns.gracefulStop(target, FiniteDuration.apply(200, TimeUnit.MILLISECONDS)); + + Boolean actual = Await.result(result, FiniteDuration.apply(3, SECONDS)); + assertEquals(true, actual); + } + + @Test + public void testCSGracefulStop() throws Exception { + ActorRef target = system.actorOf(Props.create(StopActor.class)); + CompletionStage result = PatternsCS.gracefulStop(target, java.time.Duration.ofMillis(200)); + + Boolean actual = result.toCompletableFuture().get(3, SECONDS); + assertEquals(true, actual); + } + } diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 766b6a537e..886a429ba0 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -219,7 +219,7 @@ object Patterns { * // apply some transformation (i.e. enrich with request info) * final Future transformed = f.map(new akka.japi.Function() { ... }); * // send it on to the next stage - * Patterns.pipe(transformed).to(nextActor); + * Patterns.pipe(transformed, context).to(nextActor); * }}} */ def pipe[T](future: Future[T], context: ExecutionContext): PipeableFuture[T] = scalaPipe(future)(context) @@ -314,12 +314,8 @@ object PatternsCS { * Recommended usage: * * {{{ - * final CompletionStage f = Patterns.ask(worker, request, timeout); - * f.onSuccess(new Procedure() { - * public void apply(Object o) { - * nextActor.tell(new EnrichedResult(request, o)); - * } - * }); + * final CompletionStage f = PatternsCS.ask(worker, request, timeout); + * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] = @@ -365,11 +361,7 @@ object PatternsCS { * * {{{ * final CompletionStage f = PatternsCS.ask(worker, request, timeout); - * f.onSuccess(new Procedure() { - * public void apply(Object o) { - * nextActor.tell(new EnrichedResult(request, o)); - * } - * }); + * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ def ask(actor: ActorRef, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] = @@ -414,12 +406,8 @@ object PatternsCS { * Recommended usage: * * {{{ - * final CompletionStage f = Patterns.ask(selection, request, timeout); - * f.onSuccess(new Procedure() { - * public void apply(Object o) { - * nextActor.tell(new EnrichedResult(request, o)); - * } - * }); + * final CompletionStage f = PatternsCS.ask(selection, request, timeout); + * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] = @@ -446,12 +434,8 @@ object PatternsCS { * Recommended usage: * * {{{ - * final CompletionStage f = Patterns.ask(selection, request, timeout); - * f.onSuccess(new Procedure() { - * public void apply(Object o) { - * nextActor.tell(new EnrichedResult(request, o)); - * } - * }); + * final CompletionStage f = PatternsCS.ask(selection, request, timeout); + * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] = @@ -472,8 +456,8 @@ object PatternsCS { extended.ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)).toJava.asInstanceOf[CompletionStage[AnyRef]] /** - * Register an onComplete callback on this [[java.util.concurrent.CompletionStage]] to send - * the result to the given [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]]. + * When this [[java.util.concurrent.CompletionStage]] finishes, send its result to the given + * [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]]. * Returns the original CompletionStage to allow method chaining. * If the future was completed with failure it is sent as a [[akka.actor.Status.Failure]] * to the recipient. @@ -481,11 +465,11 @@ object PatternsCS { * Recommended usage example: * * {{{ - * final CompletionStage f = Patterns.ask(worker, request, timeout); + * final CompletionStage f = PatternsCS.ask(worker, request, timeout); * // apply some transformation (i.e. enrich with request info) - * final CompletionStage transformed = f.map(new akka.japi.Function() { ... }); + * final CompletionStage transformed = f.thenApply(result -> { ... }); * // send it on to the next stage - * Patterns.pipe(transformed).to(nextActor); + * PatternsCS.pipe(transformed, context).to(nextActor); * }}} */ def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context)