Merge pull request #26069 from akka/wip-PatternCS-followup-patriknw

doc and test follow up of PatternCS, #26000
This commit is contained in:
Patrik Nordwall 2018-12-06 21:26:59 +01:00 committed by GitHub
commit 712b72f649
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 40 additions and 54 deletions

View file

@ -91,7 +91,7 @@ public class PatternsTest extends JUnitSuite {
@Test @Test
public void testCSAsk() throws Exception { public void testCSAsk() throws Exception {
ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class)); ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class));
CompletionStage<String> result = PatternsCS.ask(target, "hello", 3000).thenApply(o -> (String)o); CompletionStage<String> result = Patterns.ask(target, "hello", Duration.ofSeconds(3)).thenApply(o -> (String)o);
String actual = result.toCompletableFuture().get(3, SECONDS); String actual = result.toCompletableFuture().get(3, SECONDS);
assertEquals(JavaAPITestActor.ANSWER, actual); assertEquals(JavaAPITestActor.ANSWER, actual);
@ -102,7 +102,7 @@ public class PatternsTest extends JUnitSuite {
ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class), "test3"); ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class), "test3");
ActorSelection selection = system.actorSelection("/user/test3"); ActorSelection selection = system.actorSelection("/user/test3");
ActorIdentity id = PatternsCS.ask(selection, new Identify("hello"), 3000) ActorIdentity id = Patterns.ask(selection, new Identify("hello"), Duration.ofSeconds(3))
.toCompletableFuture() .toCompletableFuture()
.thenApply(o -> (ActorIdentity)o) .thenApply(o -> (ActorIdentity)o)
.get(3, SECONDS); .get(3, SECONDS);
@ -115,7 +115,7 @@ public class PatternsTest extends JUnitSuite {
final String expected = "hello"; final String expected = "hello";
final ActorRef echo = system.actorOf(Props.create(ExplicitAskTestActor.class)); final ActorRef echo = system.actorOf(Props.create(ExplicitAskTestActor.class));
final CompletionStage<String> response = PatternsCS final CompletionStage<String> response = Patterns
.askWithReplyTo( .askWithReplyTo(
echo, echo,
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
@ -132,11 +132,11 @@ public class PatternsTest extends JUnitSuite {
final String expected = "hello"; final String expected = "hello";
final ActorRef echo = system.actorOf(Props.create(ExplicitAskTestActor.class)); final ActorRef echo = system.actorOf(Props.create(ExplicitAskTestActor.class));
final CompletionStage<String> response = PatternsCS final CompletionStage<String> response = Patterns
.askWithReplyTo( .askWithReplyTo(
echo, echo,
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
3000 Duration.ofSeconds(3)
) )
.thenApply(o -> (String)o); .thenApply(o -> (String)o);
@ -150,11 +150,11 @@ public class PatternsTest extends JUnitSuite {
final ActorRef echo = system.actorOf(Props.create(ExplicitAskTestActor.class)); final ActorRef echo = system.actorOf(Props.create(ExplicitAskTestActor.class));
final ActorSelection selection = system.actorSelection(echo.path()); final ActorSelection selection = system.actorSelection(echo.path());
final CompletionStage<String> response = PatternsCS final CompletionStage<String> response = Patterns
.askWithReplyTo( .askWithReplyTo(
selection, selection,
replyTo -> new ExplicitAskTestActor.Message(expected, replyTo), replyTo -> new ExplicitAskTestActor.Message(expected, replyTo),
3000 Duration.ofSeconds(3)
) )
.thenApply(o -> (String)o); .thenApply(o -> (String)o);
@ -268,7 +268,7 @@ public class PatternsTest extends JUnitSuite {
Callable<CompletionStage<String>> attempt = () -> CompletableFuture.completedFuture(expected); Callable<CompletionStage<String>> attempt = () -> CompletableFuture.completedFuture(expected);
CompletionStage<String> retriedStage = CompletionStage<String> retriedStage =
PatternsCS.retry( Patterns.retry(
attempt, attempt,
3, 3,
Duration.ofMillis(200), Duration.ofMillis(200),
@ -369,7 +369,7 @@ public class PatternsTest extends JUnitSuite {
return f; return f;
}; };
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = Patterns
.after( .after(
Duration.ofMillis(200), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
@ -387,7 +387,7 @@ public class PatternsTest extends JUnitSuite {
return f; return f;
}; };
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = Patterns
.after( .after(
Duration.ofMillis(200), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
@ -406,7 +406,7 @@ public class PatternsTest extends JUnitSuite {
return f; return f;
}; };
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = Patterns
.after( .after(
Duration.ofMillis(200), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
@ -423,7 +423,7 @@ public class PatternsTest extends JUnitSuite {
final CompletionStage<String> f = CompletableFuture.completedFuture(expected); final CompletionStage<String> f = CompletableFuture.completedFuture(expected);
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = Patterns
.after( .after(
Duration.ofMillis(200), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
@ -440,7 +440,7 @@ public class PatternsTest extends JUnitSuite {
final CompletionStage<String> f = CompletableFuture.completedFuture("world!"); final CompletionStage<String> f = CompletableFuture.completedFuture("world!");
CompletionStage<String> delayedStage = PatternsCS CompletionStage<String> delayedStage = Patterns
.after( .after(
Duration.ofMillis(200), Duration.ofMillis(200),
system.scheduler(), system.scheduler(),
@ -466,7 +466,7 @@ public class PatternsTest extends JUnitSuite {
@Test @Test
public void testCSGracefulStop() throws Exception { public void testCSGracefulStop() throws Exception {
ActorRef target = system.actorOf(Props.create(StopActor.class)); ActorRef target = system.actorOf(Props.create(StopActor.class));
CompletionStage<Boolean> result = PatternsCS.gracefulStop(target, Duration.ofMillis(200)); CompletionStage<Boolean> result = Patterns.gracefulStop(target, Duration.ofMillis(200));
Boolean actual = result.toCompletableFuture().get(3, SECONDS); Boolean actual = result.toCompletableFuture().get(3, SECONDS);
assertEquals(true, actual); assertEquals(true, actual);

View file

@ -13,13 +13,7 @@ import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
/** /**
* "Pre Java 8" Java API for Akka patterns such as `ask`, `pipe` and others. * Java API: for Akka patterns such as `ask`, `pipe` and others which work with [[java.util.concurrent.CompletionStage]].
*
* These methods are possible to call from Java however work with the Scala [[scala.concurrent.Future]],
* due to the lack of non-blocking reactive Future implementation before Java 8.
*
* For Java applications developed with Java 8 and later, you might want to use [[akka.pattern.PatternsCS]] instead,
* which provide alternatives for these patterns which work with [[java.util.concurrent.CompletionStage]].
*/ */
object Patterns { object Patterns {
import akka.actor.ActorRef import akka.actor.ActorRef
@ -92,7 +86,7 @@ object Patterns {
* <b>Recommended usage:</b> * <b>Recommended usage:</b>
* *
* {{{ * {{{
* final CompletionStage<Object> f = PatternsCS.ask(worker, request, duration); * final CompletionStage<Object> f = Patterns.ask(worker, request, duration);
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}} * }}}
*/ */
@ -118,7 +112,7 @@ object Patterns {
* sender reference in message. * sender reference in message.
* *
* {{{ * {{{
* final CompletionStage<Object> f = PatternsCS.askWithReplyTo( * final CompletionStage<Object> f = Patterns.askWithReplyTo(
* worker, * worker,
* askSender -> new Request(askSender), * askSender -> new Request(askSender),
* timeout); * timeout);
@ -236,7 +230,7 @@ object Patterns {
* <b>Recommended usage:</b> * <b>Recommended usage:</b>
* *
* {{{ * {{{
* final CompletionStage<Object> f = PatternsCS.ask(selection, request, duration); * final CompletionStage<Object> f = Patterns.ask(selection, request, duration);
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result))); * f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}} * }}}
*/ */
@ -334,11 +328,11 @@ object Patterns {
* <b>Recommended usage example:</b> * <b>Recommended usage example:</b>
* *
* {{{ * {{{
* final CompletionStage<Object> f = PatternsCS.ask(worker, request, timeout); * final CompletionStage<Object> f = Patterns.ask(worker, request, timeout);
* // apply some transformation (i.e. enrich with request info) * // apply some transformation (i.e. enrich with request info)
* final CompletionStage<Object> transformed = f.thenApply(result -> { ... }); * final CompletionStage<Object> transformed = f.thenApply(result -> { ... });
* // send it on to the next operator * // send it on to the next operator
* PatternsCS.pipe(transformed, context).to(nextActor); * Patterns.pipe(transformed, context).to(nextActor);
* }}} * }}}
*/ */
def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context) def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context)

View file

@ -662,7 +662,7 @@ Messages are sent to an Actor through one of the following methods.
* @scala[`!`] @java[`tell` ] means “fire-and-forget”, e.g. send a message asynchronously and return * @scala[`!`] @java[`tell` ] means “fire-and-forget”, e.g. send a message asynchronously and return
immediately. @scala[Also known as `tell`.] immediately. @scala[Also known as `tell`.]
* @scala[`?`] @java[`ask`] sends a message asynchronously and returns a `Future` * @scala[`?`] @java[`ask`] sends a message asynchronously and returns a @scala:[`Future`]@java:[`CompletionStage`]
representing a possible reply. @scala[Also known as `ask`.] representing a possible reply. @scala[Also known as `ask`.]
Message ordering is guaranteed on a per-sender basis. Message ordering is guaranteed on a per-sender basis.
@ -737,32 +737,25 @@ Java
This example demonstrates `ask` together with the `pipeTo` pattern on This example demonstrates `ask` together with the `pipeTo` pattern on
futures, because this is likely to be a common combination. Please note that futures, because this is likely to be a common combination. Please note that
all of the above is completely non-blocking and asynchronous: `ask` produces all of the above is completely non-blocking and asynchronous: `ask` produces
a `Future`, @scala[three] @java[two] of which are composed into a new future using the a @scala:[`Future`]@java:[`CompletionStage`], @scala[three] @java[two] of which are composed into a new future using the
@scala[for-comprehension and then `pipeTo` installs an `onComplete`-handler on the future to affect] @scala[for-comprehension and then `pipeTo` installs an `onComplete`-handler on the `Future` to affect]
@java[`Futures.sequence` and `map` methods and then `pipe` installs an `onComplete`-handler on the future to effect] @java[`CompletableFuture.allOf` and `thenApply` methods and then `pipe` installs a handler on the `CompletionStage` to effect]
the submission of the aggregated `Result` to another actor. the submission of the aggregated `Result` to another actor.
Using `ask` will send a message to the receiving Actor as with `tell`, and Using `ask` will send a message to the receiving Actor as with `tell`, and
the receiving actor must reply with @scala[`sender() ! reply`] @java[`getSender().tell(reply, getSelf())` ] in order to the receiving actor must reply with @scala[`sender() ! reply`] @java[`getSender().tell(reply, getSelf())` ] in order to
complete the returned `Future` with a value. The `ask` operation involves creating complete the returned @scala:[`Future`]@java:[`CompletionStage`] with a value. The `ask` operation involves creating
an internal actor for handling this reply, which needs to have a timeout after an internal actor for handling this reply, which needs to have a timeout after
which it is destroyed in order not to leak resources; see more below. which it is destroyed in order not to leak resources; see more below.
@@@ note { .group-java }
A variant of the `ask` pattern that returns a `CompletionStage` instead of a Scala `Future`
is available in the `akka.pattern.PatternsCS` object.
@@@
@@@ warning @@@ warning
To complete the future with an exception you need to send an `akka.actor.Status.Failure` message to the sender. To complete the @scala:[`Future`]@java:[`CompletionStage`] with an exception you need to send an `akka.actor.Status.Failure` message to the sender.
This is *not done automatically* when an actor throws an exception while processing a message. This is *not done automatically* when an actor throws an exception while processing a message.
Please note that Scala's `Try` sub types `scala.util.Failure` and `scala.util.Success` are not treated @scala:[Please note that Scala's `Try` sub types `scala.util.Failure` and `scala.util.Success` are not treated
specially, and would complete the ask Future with the given value - only the `akka.actor.Status` messages specially, and would complete the ask @scala:[`Future`]@java:[`CompletionStage`] with the given value - only the `akka.actor.Status` messages
are treated specially by the ask pattern. are treated specially by the ask pattern.]
@@@ @@@
@ -772,9 +765,9 @@ Scala
Java Java
: @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #reply-exception } : @@snip [ActorDocTest.java](/akka-docs/src/test/java/jdocs/actor/ActorDocTest.java) { #reply-exception }
If the actor does not complete the future, it will expire after the timeout period, If the actor does not complete the @scala:[`Future`]@java:[`CompletionStage`], it will expire after the timeout period,
@scala[completing it with an `AskTimeoutException`. The timeout is taken from one of the following locations in order of precedence:] @scala[completing it with an `AskTimeoutException`. The timeout is taken from one of the following locations in order of precedence:]
@java[specified as parameter to the `ask` method; this will complete the `Future` with an `AskTimeoutException`.] @java[specified as parameter to the `ask` method; this will complete the `CompletionStage` with an `AskTimeoutException`.]
@@@ div { .group-scala } @@@ div { .group-scala }
@ -786,18 +779,17 @@ If the actor does not complete the future, it will expire after the timeout peri
@@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #using-implicit-timeout } @@snip [ActorDocSpec.scala](/akka-docs/src/test/scala/docs/actor/ActorDocSpec.scala) { #using-implicit-timeout }
See @ref:[Futures](futures.md) for more information on how to await or query a future.
@@@ @@@
See @ref:[Futures](futures.md) for more information on how to await or query a The @scala:[`onComplete` method of the `Future`]@java:[`thenRun` method of the `CompletionStage`] can be
future. used to register a callback to get a notification when the @scala:[`Future`]@java:[`CompletionStage`] completes, giving
The `onComplete`, `onSuccess`, or `onFailure` methods of the `Future` can be
used to register a callback to get a notification when the Future completes, giving
you a way to avoid blocking. you a way to avoid blocking.
@@@ warning @@@ warning
When using future callbacks, @scala[such as `onComplete`, `onSuccess`, and `onFailure`,] When using future callbacks, @scala[such as `onComplete`, or `map`]@scala[such as `thenRun`, or `thenApply`]
inside actors you need to carefully avoid closing over inside actors you need to carefully avoid closing over
the containing actors reference, i.e. do not call methods or access mutable state the containing actors reference, i.e. do not call methods or access mutable state
on the enclosing actor from within the callback. This would break the actor on the enclosing actor from within the callback. This would break the actor

View file

@ -96,7 +96,7 @@ or a `ClassCastException` if not. Handling `Exception`s will be discussed furthe
## Use the pipe pattern ## Use the pipe pattern
Another useful message-transfer pattern is "pipe", which is to send the result of @scala[`Future`]@java[`CompletableFuture`] to another actor, upon completion of the @scala[`Future`]@java[`CompletableFuture`]. Another useful message-transfer pattern is "pipe", which is to send the result of @scala[`Future`]@java[`CompletableFuture`] to another actor, upon completion of the @scala[`Future`]@java[`CompletableFuture`].
The pipe pattern can be used by importing @java[`akka.pattern.PatternsCS.pipe`.]@scala[`akka.pattern.pipe`, and define or import an implicit instance of `ExecutionContext` in the scope.] The pipe pattern can be used by importing @java[`akka.pattern.Patterns.pipe`.]@scala[`akka.pattern.pipe`, and define or import an implicit instance of `ExecutionContext` in the scope.]
Scala Scala
: @@snip [FutureDocSpec.scala](/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala) { #pipe-to-usage } : @@snip [FutureDocSpec.scala](/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala) { #pipe-to-usage }
@ -466,7 +466,7 @@ Java
## Retry ## Retry
@scala[`akka.pattern.retry`]@java[`akka.pattern.PatternsCS.retry`] will retry a @scala[`Future` class]@java[`CompletionStage` class] some number of times with a delay between each attempt. @scala[`akka.pattern.retry`]@java[`akka.pattern.Patterns.retry`] will retry a @scala[`Future` class]@java[`CompletionStage` class] some number of times with a delay between each attempt.
Scala Scala
: @@snip [FutureDocSpec.scala](/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala) { #retry } : @@snip [FutureDocSpec.scala](/akka-docs/src/test/scala/docs/future/FutureDocSpec.scala) { #retry }

View file

@ -67,7 +67,7 @@ used for writing stream tests that use familiar `TestProbe` from the
`akka-testkit` API. `akka-testkit` API.
One of the more straightforward tests would be to materialize stream to a One of the more straightforward tests would be to materialize stream to a
@scala[`Future`]@java[`CompletionStage`] and then use @scala[`pipe`]@java[`PatternsCS.pipe`] pattern to pipe the result of that future @scala[`Future`]@java[`CompletionStage`] and then use @scala[`pipe`]@java[`Patterns.pipe`] pattern to pipe the result of that future
to the probe. to the probe.
Scala Scala

View file

@ -105,7 +105,7 @@ Java
For `RespondTemperature` and `DeviceTerminated` we keep track of the replies by updating `repliesSoFar` and remove the actor from `stillWaiting`, and then delegate to a method `respondWhenAllCollected`, which we will discuss soon. For `RespondTemperature` and `DeviceTerminated` we keep track of the replies by updating `repliesSoFar` and remove the actor from `stillWaiting`, and then delegate to a method `respondWhenAllCollected`, which we will discuss soon.
In the case of timeout, we need to take all the actors that have not yet replied yet (the members of the set `stillWaiting`) and put a `DeviceTimedOut` as the status in the final reply. In the case of timeout, we need to take all the actors that have not yet replied (the members of the set `stillWaiting`) and put a `DeviceTimedOut` as the status in the final reply.
We now have to figure out what to do in `respondWhenAllCollected`. First, we need to record the new result in the map `repliesSoFar` and remove the actor from `stillWaiting`. The next step is to check if there are any remaining actors we are waiting for. If there is none, we send the result of the query to the original requester and stop the query actor. Otherwise, we need to update the `repliesSoFar` and `stillWaiting` structures and wait for more messages. We now have to figure out what to do in `respondWhenAllCollected`. First, we need to record the new result in the map `repliesSoFar` and remove the actor from `stillWaiting`. The next step is to check if there are any remaining actors we are waiting for. If there is none, we send the result of the query to the original requester and stop the query actor. Otherwise, we need to update the `repliesSoFar` and `stillWaiting` structures and wait for more messages.