diff --git a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java index 2e316fb5df..591ae15c4c 100644 --- a/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java +++ b/akka-actor-tests/src/test/java/akka/pattern/PatternsTest.java @@ -91,7 +91,7 @@ public class PatternsTest extends JUnitSuite { @Test public void testCSAsk() throws Exception { ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class)); - CompletionStage result = PatternsCS.ask(target, "hello", 3000).thenApply(o -> (String)o); + CompletionStage result = Patterns.ask(target, "hello", Duration.ofSeconds(3)).thenApply(o -> (String)o); String actual = result.toCompletableFuture().get(3, SECONDS); assertEquals(JavaAPITestActor.ANSWER, actual); @@ -102,7 +102,7 @@ public class PatternsTest extends JUnitSuite { ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class), "test3"); ActorSelection selection = system.actorSelection("/user/test3"); - ActorIdentity id = PatternsCS.ask(selection, new Identify("hello"), 3000) + ActorIdentity id = Patterns.ask(selection, new Identify("hello"), Duration.ofSeconds(3)) .toCompletableFuture() .thenApply(o -> (ActorIdentity)o) .get(3, SECONDS); @@ -115,7 +115,7 @@ public class PatternsTest extends JUnitSuite { final String expected = "hello"; final ActorRef echo = system.actorOf(Props.create(ExplicitAskTestActor.class)); - final CompletionStage response = PatternsCS + final CompletionStage response = Patterns .askWithReplyTo( echo, replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), @@ -132,11 +132,11 @@ public class PatternsTest extends JUnitSuite { final String expected = "hello"; final ActorRef echo = system.actorOf(Props.create(ExplicitAskTestActor.class)); - final CompletionStage response = PatternsCS + final CompletionStage response = Patterns .askWithReplyTo( echo, replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), - 3000 + Duration.ofSeconds(3) ) .thenApply(o -> (String)o); @@ -150,11 +150,11 @@ public class PatternsTest extends JUnitSuite { final ActorRef echo = system.actorOf(Props.create(ExplicitAskTestActor.class)); final ActorSelection selection = system.actorSelection(echo.path()); - final CompletionStage response = PatternsCS + final CompletionStage response = Patterns .askWithReplyTo( selection, replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), - 3000 + Duration.ofSeconds(3) ) .thenApply(o -> (String)o); @@ -268,7 +268,7 @@ public class PatternsTest extends JUnitSuite { Callable> attempt = () -> CompletableFuture.completedFuture(expected); CompletionStage retriedStage = - PatternsCS.retry( + Patterns.retry( attempt, 3, Duration.ofMillis(200), @@ -369,7 +369,7 @@ public class PatternsTest extends JUnitSuite { return f; }; - CompletionStage delayedStage = PatternsCS + CompletionStage delayedStage = Patterns .after( Duration.ofMillis(200), system.scheduler(), @@ -387,7 +387,7 @@ public class PatternsTest extends JUnitSuite { return f; }; - CompletionStage delayedStage = PatternsCS + CompletionStage delayedStage = Patterns .after( Duration.ofMillis(200), system.scheduler(), @@ -406,7 +406,7 @@ public class PatternsTest extends JUnitSuite { return f; }; - CompletionStage delayedStage = PatternsCS + CompletionStage delayedStage = Patterns .after( Duration.ofMillis(200), system.scheduler(), @@ -423,7 +423,7 @@ public class PatternsTest extends JUnitSuite { final CompletionStage f = CompletableFuture.completedFuture(expected); - CompletionStage delayedStage = PatternsCS + CompletionStage delayedStage = Patterns .after( Duration.ofMillis(200), system.scheduler(), @@ -440,7 +440,7 @@ public class PatternsTest extends JUnitSuite { final CompletionStage f = CompletableFuture.completedFuture("world!"); - CompletionStage delayedStage = PatternsCS + CompletionStage delayedStage = Patterns .after( Duration.ofMillis(200), system.scheduler(), @@ -466,7 +466,7 @@ public class PatternsTest extends JUnitSuite { @Test public void testCSGracefulStop() throws Exception { ActorRef target = system.actorOf(Props.create(StopActor.class)); - CompletionStage result = PatternsCS.gracefulStop(target, Duration.ofMillis(200)); + CompletionStage result = Patterns.gracefulStop(target, Duration.ofMillis(200)); Boolean actual = result.toCompletableFuture().get(3, SECONDS); assertEquals(true, actual); diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index fde836919c..784bade9ca 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -13,13 +13,7 @@ import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext /** - * "Pre Java 8" Java API for Akka patterns such as `ask`, `pipe` and others. - * - * These methods are possible to call from Java however work with the Scala [[scala.concurrent.Future]], - * due to the lack of non-blocking reactive Future implementation before Java 8. - * - * For Java applications developed with Java 8 and later, you might want to use [[akka.pattern.PatternsCS]] instead, - * which provide alternatives for these patterns which work with [[java.util.concurrent.CompletionStage]]. + * Java API: for Akka patterns such as `ask`, `pipe` and others which work with [[java.util.concurrent.CompletionStage]]. */ object Patterns { import akka.actor.ActorRef @@ -92,7 +86,7 @@ object Patterns { * Recommended usage: * * {{{ - * final CompletionStage f = PatternsCS.ask(worker, request, duration); + * final CompletionStage f = Patterns.ask(worker, request, duration); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ @@ -118,7 +112,7 @@ object Patterns { * sender reference in message. * * {{{ - * final CompletionStage f = PatternsCS.askWithReplyTo( + * final CompletionStage f = Patterns.askWithReplyTo( * worker, * askSender -> new Request(askSender), * timeout); @@ -236,7 +230,7 @@ object Patterns { * Recommended usage: * * {{{ - * final CompletionStage f = PatternsCS.ask(selection, request, duration); + * final CompletionStage f = Patterns.ask(selection, request, duration); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * }}} */ @@ -334,11 +328,11 @@ object Patterns { * Recommended usage example: * * {{{ - * final CompletionStage f = PatternsCS.ask(worker, request, timeout); + * final CompletionStage f = Patterns.ask(worker, request, timeout); * // apply some transformation (i.e. enrich with request info) * final CompletionStage transformed = f.thenApply(result -> { ... }); * // send it on to the next operator - * PatternsCS.pipe(transformed, context).to(nextActor); + * Patterns.pipe(transformed, context).to(nextActor); * }}} */ def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context) diff --git a/akka-docs/src/main/paradox/actors.md b/akka-docs/src/main/paradox/actors.md index 7f2901d471..d0c91faaec 100644 --- a/akka-docs/src/main/paradox/actors.md +++ b/akka-docs/src/main/paradox/actors.md @@ -662,7 +662,7 @@ Messages are sent to an Actor through one of the following methods. * @scala[`!`] @java[`tell` ] means “fire-and-forget”, e.g. send a message asynchronously and return immediately. @scala[Also known as `tell`.] - * @scala[`?`] @java[`ask`] sends a message asynchronously and returns a `Future` + * @scala[`?`] @java[`ask`] sends a message asynchronously and returns a @scala:[`Future`]@java:[`CompletionStage`] representing a possible reply. @scala[Also known as `ask`.] Message ordering is guaranteed on a per-sender basis. @@ -737,32 +737,25 @@ Java This example demonstrates `ask` together with the `pipeTo` pattern on futures, because this is likely to be a common combination. Please note that all of the above is completely non-blocking and asynchronous: `ask` produces -a `Future`, @scala[three] @java[two] of which are composed into a new future using the -@scala[for-comprehension and then `pipeTo` installs an `onComplete`-handler on the future to affect] -@java[`Futures.sequence` and `map` methods and then `pipe` installs an `onComplete`-handler on the future to effect] +a @scala:[`Future`]@java:[`CompletionStage`], @scala[three] @java[two] of which are composed into a new future using the +@scala[for-comprehension and then `pipeTo` installs an `onComplete`-handler on the `Future` to affect] +@java[`CompletableFuture.allOf` and `thenApply` methods and then `pipe` installs a handler on the `CompletionStage` to effect] the submission of the aggregated `Result` to another actor. Using `ask` will send a message to the receiving Actor as with `tell`, and the receiving actor must reply with @scala[`sender() ! reply`] @java[`getSender().tell(reply, getSelf())` ] in order to -complete the returned `Future` with a value. The `ask` operation involves creating +complete the returned @scala:[`Future`]@java:[`CompletionStage`] with a value. The `ask` operation involves creating an internal actor for handling this reply, which needs to have a timeout after which it is destroyed in order not to leak resources; see more below. -@@@ note { .group-java } - -A variant of the `ask` pattern that returns a `CompletionStage` instead of a Scala `Future` -is available in the `akka.pattern.PatternsCS` object. - -@@@ - @@@ warning -To complete the future with an exception you need to send an `akka.actor.Status.Failure` message to the sender. +To complete the @scala:[`Future`]@java:[`CompletionStage`] with an exception you need to send an `akka.actor.Status.Failure` message to the sender. This is *not done automatically* when an actor throws an exception while processing a message. -Please note that Scala's `Try` sub types `scala.util.Failure` and `scala.util.Success` are not treated -specially, and would complete the ask Future with the given value - only the `akka.actor.Status` messages -are treated specially by the ask pattern. +@scala:[Please note that Scala's `Try` sub types `scala.util.Failure` and `scala.util.Success` are not treated +specially, and would complete the ask @scala:[`Future`]@java:[`CompletionStage`] with the given value - only the `akka.actor.Status` messages +are treated specially by the ask pattern.] @@@ @@ -772,9 +765,9 @@ Scala Java : @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #reply-exception } -If the actor does not complete the future, it will expire after the timeout period, +If the actor does not complete the @scala:[`Future`]@java:[`CompletionStage`], it will expire after the timeout period, @scala[completing it with an `AskTimeoutException`. The timeout is taken from one of the following locations in order of precedence:] -@java[specified as parameter to the `ask` method; this will complete the `Future` with an `AskTimeoutException`.] +@java[specified as parameter to the `ask` method; this will complete the `CompletionStage` with an `AskTimeoutException`.] @@@ div { .group-scala } @@ -786,18 +779,17 @@ If the actor does not complete the future, it will expire after the timeout peri @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #using-implicit-timeout } +See @ref:[Futures](futures.md) for more information on how to await or query a future. + @@@ -See @ref:[Futures](futures.md) for more information on how to await or query a -future. - -The `onComplete`, `onSuccess`, or `onFailure` methods of the `Future` can be -used to register a callback to get a notification when the Future completes, giving +The @scala:[`onComplete` method of the `Future`]@java:[`thenRun` method of the `CompletionStage`] can be +used to register a callback to get a notification when the @scala:[`Future`]@java:[`CompletionStage`] completes, giving you a way to avoid blocking. @@@ warning -When using future callbacks, @scala[such as `onComplete`, `onSuccess`, and `onFailure`,] +When using future callbacks, @scala[such as `onComplete`, or `map`]@scala[such as `thenRun`, or `thenApply`] inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor diff --git a/akka-docs/src/main/paradox/futures.md b/akka-docs/src/main/paradox/futures.md index 32eecddec7..0788292644 100644 --- a/akka-docs/src/main/paradox/futures.md +++ b/akka-docs/src/main/paradox/futures.md @@ -96,7 +96,7 @@ or a `ClassCastException` if not. Handling `Exception`s will be discussed furthe ## Use the pipe pattern Another useful message-transfer pattern is "pipe", which is to send the result of @scala[`Future`]@java[`CompletableFuture`] to another actor, upon completion of the @scala[`Future`]@java[`CompletableFuture`]. -The pipe pattern can be used by importing @java[`akka.pattern.PatternsCS.pipe`.]@scala[`akka.pattern.pipe`, and define or import an implicit instance of `ExecutionContext` in the scope.] +The pipe pattern can be used by importing @java[`akka.pattern.Patterns.pipe`.]@scala[`akka.pattern.pipe`, and define or import an implicit instance of `ExecutionContext` in the scope.] Scala : @@snip [FutureDocSpec.scala](/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala) { #pipe-to-usage } @@ -466,7 +466,7 @@ Java ## Retry -@scala[`akka.pattern.retry`]@java[`akka.pattern.PatternsCS.retry`] will retry a @scala[`Future` class]@java[`CompletionStage` class] some number of times with a delay between each attempt. +@scala[`akka.pattern.retry`]@java[`akka.pattern.Patterns.retry`] will retry a @scala[`Future` class]@java[`CompletionStage` class] some number of times with a delay between each attempt. Scala : @@snip [FutureDocSpec.scala](/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala) { #retry } diff --git a/akka-docs/src/main/paradox/stream/stream-testkit.md b/akka-docs/src/main/paradox/stream/stream-testkit.md index 6aa9e7c77d..a8115bfd73 100644 --- a/akka-docs/src/main/paradox/stream/stream-testkit.md +++ b/akka-docs/src/main/paradox/stream/stream-testkit.md @@ -67,7 +67,7 @@ used for writing stream tests that use familiar `TestProbe` from the `akka-testkit` API. One of the more straightforward tests would be to materialize stream to a -@scala[`Future`]@java[`CompletionStage`] and then use @scala[`pipe`]@java[`PatternsCS.pipe`] pattern to pipe the result of that future +@scala[`Future`]@java[`CompletionStage`] and then use @scala[`pipe`]@java[`Patterns.pipe`] pattern to pipe the result of that future to the probe. Scala diff --git a/akka-docs/src/main/paradox/typed/guide/tutorial_5.md b/akka-docs/src/main/paradox/typed/guide/tutorial_5.md index 54c09bd034..c8c937aeb8 100644 --- a/akka-docs/src/main/paradox/typed/guide/tutorial_5.md +++ b/akka-docs/src/main/paradox/typed/guide/tutorial_5.md @@ -105,7 +105,7 @@ Java For `RespondTemperature` and `DeviceTerminated` we keep track of the replies by updating `repliesSoFar` and remove the actor from `stillWaiting`, and then delegate to a method `respondWhenAllCollected`, which we will discuss soon. -In the case of timeout, we need to take all the actors that have not yet replied yet (the members of the set `stillWaiting`) and put a `DeviceTimedOut` as the status in the final reply. +In the case of timeout, we need to take all the actors that have not yet replied (the members of the set `stillWaiting`) and put a `DeviceTimedOut` as the status in the final reply. We now have to figure out what to do in `respondWhenAllCollected`. First, we need to record the new result in the map `repliesSoFar` and remove the actor from `stillWaiting`. The next step is to check if there are any remaining actors we are waiting for. If there is none, we send the result of the query to the original requester and stop the query actor. Otherwise, we need to update the `repliesSoFar` and `stillWaiting` structures and wait for more messages.