From 996f42483523093323c9d30cb41a6005adc38558 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 9 Jul 2020 16:57:53 +0200 Subject: [PATCH] Ok/error protocol and failable ask #29186 (#29190) New type StatusReply simplifies the very common use case of replying to a request with either a successful reply or an error reply which can be repetitive to define for every actor, with the additional overhead of having to make sure each such sealed top type + 2 concrete reply classes has working serialization. --- .../java/akka/pattern/StatusReplyTest.java | 123 ++++ .../scala/akka/pattern/StatusReplySpec.scala | 90 +++ .../typed/javadsl/ActorContextAskTest.java | 99 ++- .../InteractionPatternsAskWithStatusTest.java | 239 +++++++ .../akka/typed/InteractionPatternsTest.java | 18 +- .../test/scala/akka/actor/typed/AskSpec.scala | 32 +- .../typed/scaladsl/ActorContextAskSpec.scala | 75 ++- .../akka/typed/InteractionPatternsSpec.scala | 119 ++++ .../ask-with-status.excludes | 3 + .../typed/internal/ActorContextImpl.scala | 34 +- .../actor/typed/javadsl/ActorContext.scala | 16 +- .../akka/actor/typed/javadsl/AskPattern.scala | 16 +- .../actor/typed/scaladsl/ActorContext.scala | 12 +- .../actor/typed/scaladsl/AskPattern.scala | 12 + .../src/main/scala/akka/actor/Actor.scala | 2 + .../main/scala/akka/pattern/AskSupport.scala | 27 + .../main/scala/akka/pattern/Patterns.scala | 9 + .../main/scala/akka/pattern/StatusReply.scala | 167 +++++ .../ask-with-status.excludes | 3 + .../typed/internal/ClusterShardingImpl.scala | 9 +- .../internal/testkit/TestEntityRefImpl.scala | 8 +- .../typed/javadsl/ClusterSharding.scala | 10 +- .../typed/scaladsl/ClusterSharding.scala | 10 +- .../sharding/typed/AccountExampleDocTest.java | 34 +- .../sharding/typed/AccountExampleTest.java | 60 +- ...ccountExampleWithEventHandlersInState.java | 53 +- .../typed/AccountExampleWithMutableState.java | 51 +- .../typed/AccountExampleWithNullState.java | 51 +- .../typed/AccountExampleDocSpec.scala | 28 +- .../sharding/typed/AccountExampleSpec.scala | 60 +- ...untExampleWithCommandHandlersInState.scala | 34 +- ...countExampleWithEventHandlersInState.scala | 46 +- .../typed/AccountExampleWithOptionState.scala | 34 +- .../internal/AkkaClusterTypedSerializer.scala | 1 + .../paradox/typed/interaction-patterns.md | 50 +- .../src/main/paradox/typed/persistence.md | 9 +- .../src/main/paradox/typed/style-guide.md | 3 + .../persistence/typed/BlogPostEntity.scala | 12 +- .../java/akka/remote/ContainerFormats.java | 636 +++++++++++++++++- .../src/main/protobuf/ContainerFormats.proto | 6 + akka-remote/src/main/resources/reference.conf | 2 + .../serialization/MiscMessageSerializer.scala | 105 ++- .../MiscMessageSerializerSpec.scala | 9 +- 43 files changed, 2093 insertions(+), 324 deletions(-) create mode 100644 akka-actor-tests/src/test/java/akka/pattern/StatusReplyTest.java create mode 100644 akka-actor-tests/src/test/scala/akka/pattern/StatusReplySpec.scala create mode 100644 akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java create mode 100644 akka-actor-typed/src/main/mima-filters/2.6.6.backwards.excludes/ask-with-status.excludes create mode 100644 akka-actor/src/main/scala/akka/pattern/StatusReply.scala create mode 100644 akka-cluster-sharding-typed/src/main/mima-filters/2.6.6.backwards.excludes/ask-with-status.excludes diff --git a/akka-actor-tests/src/test/java/akka/pattern/StatusReplyTest.java b/akka-actor-tests/src/test/java/akka/pattern/StatusReplyTest.java new file mode 100644 index 0000000000..1e47a45f2f --- /dev/null +++ b/akka-actor-tests/src/test/java/akka/pattern/StatusReplyTest.java @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.pattern; + +import akka.actor.Actor; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import akka.testkit.TestException; +import akka.testkit.TestProbe; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatestplus.junit.JUnitSuite; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static akka.pattern.Patterns.askWithStatus; + +import static org.junit.Assert.*; + +public class StatusReplyTest extends JUnitSuite { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("JavaAPI", AkkaSpec.testConf()); + + @Test + public void testSuccessApi() { + StatusReply reply = StatusReply.success("woho"); + assertTrue(reply.isSuccess()); + assertFalse(reply.isError()); + assertEquals("woho", reply.getValue()); + try { + reply.getError(); + Assert.fail("Calling get error on success did not throw"); + } catch (IllegalArgumentException ex) { + // this is what we expect + } + } + + @Test + public void testErrorMessageApi() { + StatusReply reply = StatusReply.error("boho"); + assertTrue(reply.isError()); + assertFalse(reply.isSuccess()); + assertEquals("boho", reply.getError().getMessage()); + try { + reply.getValue(); + Assert.fail("Calling get value on error did not throw"); + } catch (StatusReply.ErrorMessage ex) { + // this is what we expect + } catch (Throwable th) { + Assert.fail("Unexpected exception type: " + th); + } + } + + @Test + public void testErrorExceptionApi() { + StatusReply reply = StatusReply.error(new TestException("boho")); + assertTrue(reply.isError()); + assertFalse(reply.isSuccess()); + assertEquals("boho", reply.getError().getMessage()); + try { + reply.getValue(); + Assert.fail("Calling get value on error did not throw"); + } catch (TestException ex) { + // this is what we expect + } catch (Throwable th) { + Assert.fail("Unexpected exception type: " + th); + } + } + + @Test + public void testAskWithStatusSuccess() throws Exception { + TestProbe probe = new TestProbe(actorSystemResource.getSystem()); + + CompletionStage response = askWithStatus(probe.ref(), "request", Duration.ofSeconds(3)); + probe.expectMsg("request"); + probe.lastSender().tell(StatusReply.success("woho"), Actor.noSender()); + + Object result = response.toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals("woho", result); + } + + @Test + public void testAskWithStatusErrorMessage() throws Exception { + TestProbe probe = new TestProbe(actorSystemResource.getSystem()); + + CompletionStage response = askWithStatus(probe.ref(), "request", Duration.ofSeconds(3)); + probe.expectMsg("request"); + probe.lastSender().tell(StatusReply.error("boho"), Actor.noSender()); + + try { + Object result = response.toCompletableFuture().get(3, TimeUnit.SECONDS); + } catch (ExecutionException ex) { + // what we expected + assertEquals(StatusReply.ErrorMessage.class, ex.getCause().getClass()); + assertEquals("boho", ex.getCause().getMessage()); + } + } + + @Test + public void testAskWithStatusErrorException() throws Exception { + TestProbe probe = new TestProbe(actorSystemResource.getSystem()); + + CompletionStage response = askWithStatus(probe.ref(), "request", Duration.ofSeconds(3)); + probe.expectMsg("request"); + probe.lastSender().tell(StatusReply.error(new TestException("boho")), Actor.noSender()); + + try { + Object result = response.toCompletableFuture().get(3, TimeUnit.SECONDS); + } catch (ExecutionException ex) { + // what we expected + assertEquals(TestException.class, ex.getCause().getClass()); + assertEquals("boho", ex.getCause().getMessage()); + } + } +} diff --git a/akka-actor-tests/src/test/scala/akka/pattern/StatusReplySpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/StatusReplySpec.scala new file mode 100644 index 0000000000..586ba15166 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/StatusReplySpec.scala @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.pattern + +import akka.Done +import akka.testkit.AkkaSpec +import akka.testkit.TestException +import akka.testkit.TestProbe +import akka.util.Timeout +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.Future +import scala.concurrent.duration._ + +class StatusReplySpec extends AkkaSpec with ScalaFutures { + + "StatusReply" should { + "pattern match success" in { + // like in a classic actor receive Any => ... + (StatusReply.Success("woho!"): Any) match { + case StatusReply.Success(_: Int) => fail() + case StatusReply.Success(text: String) if text == "woho!" => + case _ => fail() + } + } + "pattern match success (Ack)" in { + // like in a classic actor receive Any => ... + (StatusReply.Ack: Any) match { + case StatusReply.Ack => + case _ => fail() + } + } + "pattern match error with text" in { + StatusReply.Error("boho!") match { + case StatusReply.Error(_) => + case _ => fail() + } + } + + "pattern match error with exception" in { + StatusReply.Error(TestException("boho!")) match { + case StatusReply.Error(_) => + case _ => fail() + } + } + + "flatten a Future[StatusReply]" in { + import system.dispatcher + StatusReply.flattenStatusFuture(Future(StatusReply.Success("woho"))).futureValue should ===("woho") + StatusReply.flattenStatusFuture(Future(StatusReply.Ack)).futureValue should ===(Done) + StatusReply.flattenStatusFuture(Future(StatusReply.Error("boo"))).failed.futureValue should ===( + StatusReply.ErrorMessage("boo")) + StatusReply.flattenStatusFuture(Future(StatusReply.Error(TestException("boo")))).failed.futureValue should ===( + TestException("boo")) + + } + } + + "askWithStatus" should { + implicit val timeout: Timeout = 3.seconds + + "unwrap success" in { + val probe = TestProbe() + val result = probe.ref.askWithStatus("request") + probe.expectMsg("request") + probe.lastSender ! StatusReply.Success("woho") + result.futureValue should ===("woho") + } + + "unwrap Error with message" in { + val probe = TestProbe() + val result = probe.ref.askWithStatus("request") + probe.expectMsg("request") + probe.lastSender ! StatusReply.Error("boho") + result.failed.futureValue should ===(StatusReply.ErrorMessage("boho")) + } + + "unwrap Error with exception" in { + val probe = TestProbe() + val result = probe.ref.askWithStatus("request") + probe.expectMsg("request") + probe.lastSender ! StatusReply.Error(TestException("boho")) + result.failed.futureValue should ===(TestException("boho")) + } + + } + +} diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java index cc36eb2501..042831fd21 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorContextAskTest.java @@ -4,9 +4,11 @@ package akka.actor.typed.javadsl; +import akka.actor.testkit.typed.TestException; import akka.actor.testkit.typed.javadsl.LogCapturing; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; +import akka.pattern.StatusReply; import akka.testkit.AkkaSpec; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestProbe; @@ -16,7 +18,6 @@ import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; import java.time.Duration; -import java.util.concurrent.TimeUnit; public class ActorContextAskTest extends JUnitSuite { @@ -26,10 +27,10 @@ public class ActorContextAskTest extends JUnitSuite { @Rule public final LogCapturing logCapturing = new LogCapturing(); static class Ping { - final ActorRef respondTo; + final ActorRef replyTo; - public Ping(ActorRef respondTo) { - this.respondTo = respondTo; + public Ping(ActorRef replyTo) { + this.replyTo = replyTo; } } @@ -40,7 +41,7 @@ public class ActorContextAskTest extends JUnitSuite { final Behavior pingPongBehavior = Behaviors.receive( (ActorContext context, Ping message) -> { - message.respondTo.tell(new Pong()); + message.replyTo.tell(new Pong()); return Behaviors.same(); }); @@ -71,4 +72,92 @@ public class ActorContextAskTest extends JUnitSuite { probe.expectMessageClass(Pong.class); } + + static class PingWithStatus { + final ActorRef> replyTo; + + public PingWithStatus(ActorRef> replyTo) { + this.replyTo = replyTo; + } + } + + @Test + public void askWithStatusUnwrapsSuccess() { + final TestProbe probe = testKit.createTestProbe(); + + testKit.spawn( + Behaviors.setup( + context -> { + context.askWithStatus( + Pong.class, + probe.getRef(), + Duration.ofSeconds(3), + PingWithStatus::new, + (pong, failure) -> { + if (pong != null) return pong; + else throw new RuntimeException(failure); + }); + + return Behaviors.receive(Pong.class) + .onAnyMessage( + pong -> { + probe.ref().tell("got pong"); + return Behaviors.same(); + }) + .build(); + })); + + ActorRef> replyTo = probe.expectMessageClass(PingWithStatus.class).replyTo; + + replyTo.tell(StatusReply.success(new Pong())); + probe.expectMessage("got pong"); + } + + private static Behavior exceptionCapturingBehavior(ActorRef probe) { + return Behaviors.setup( + context -> { + context.askWithStatus( + Pong.class, + probe.narrow(), + Duration.ofSeconds(3), + PingWithStatus::new, + (pong, failure) -> { + if (pong != null) throw new IllegalArgumentException("did not expect pong"); + else return failure; + }); + + return Behaviors.receive(Throwable.class) + .onAnyMessage( + throwable -> { + probe.tell( + "got error: " + + throwable.getClass().getName() + + ", " + + throwable.getMessage()); + return Behaviors.same(); + }) + .build(); + }); + } + + @Test + public void askWithStatusUnwrapsErrorMessages() { + final TestProbe probe = testKit.createTestProbe(); + testKit.spawn(exceptionCapturingBehavior(probe.getRef())); + ActorRef> replyTo = probe.expectMessageClass(PingWithStatus.class).replyTo; + + replyTo.tell(StatusReply.error("boho")); + probe.expectMessage("got error: akka.pattern.StatusReply$ErrorMessage, boho"); + } + + @Test + public void askWithStatusUnwrapsErrorCustomExceptions() { + final TestProbe probe = testKit.createTestProbe(); + testKit.spawn(exceptionCapturingBehavior(probe.getRef())); + ActorRef> replyTo = probe.expectMessageClass(PingWithStatus.class).replyTo; + + // with custom exception + replyTo.tell(StatusReply.error(new TestException("boho"))); + probe.expectMessage("got error: akka.actor.testkit.typed.TestException, boho"); + } } diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java new file mode 100644 index 0000000000..c6e60df065 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java @@ -0,0 +1,239 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.akka.typed; + +import akka.actor.testkit.typed.javadsl.LogCapturing; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.*; + +// #actor-ask-with-status +import akka.pattern.StatusReply; + +// #actor-ask-with-status +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.scalatestplus.junit.JUnitSuite; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; + +public class InteractionPatternsAskWithStatusTest extends JUnitSuite { + + @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(); + + @Rule public final LogCapturing logCapturing = new LogCapturing(); + + // separate from InteractionPatternsTest to avoid name clashes while keeping the ask samples + // almost identical + interface Samples { + // #actor-ask-with-status + public class Hal extends AbstractBehavior { + + public static Behavior create() { + return Behaviors.setup(Hal::new); + } + + private Hal(ActorContext context) { + super(context); + } + + public interface Command {} + + public static final class OpenThePodBayDoorsPlease implements Hal.Command { + public final ActorRef> respondTo; + + public OpenThePodBayDoorsPlease(ActorRef> respondTo) { + this.respondTo = respondTo; + } + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(Hal.OpenThePodBayDoorsPlease.class, this::onOpenThePodBayDoorsPlease) + .build(); + } + + private Behavior onOpenThePodBayDoorsPlease( + Hal.OpenThePodBayDoorsPlease message) { + message.respondTo.tell(StatusReply.error("I'm sorry, Dave. I'm afraid I can't do that.")); + return this; + } + } + + public class Dave extends AbstractBehavior { + + public interface Command {} + + // this is a part of the protocol that is internal to the actor itself + private static final class AdaptedResponse implements Dave.Command { + public final String message; + + public AdaptedResponse(String message) { + this.message = message; + } + } + + public static Behavior create(ActorRef hal) { + return Behaviors.setup(context -> new Dave(context, hal)); + } + + private Dave(ActorContext context, ActorRef hal) { + super(context); + + // asking someone requires a timeout, if the timeout hits without response + // the ask is failed with a TimeoutException + final Duration timeout = Duration.ofSeconds(3); + + context.askWithStatus( + String.class, + hal, + timeout, + // construct the outgoing message + (ActorRef> ref) -> new Hal.OpenThePodBayDoorsPlease(ref), + // adapt the response (or failure to respond) + (response, throwable) -> { + if (response != null) { + // a ReponseWithStatus.success(m) is unwrapped and passed as response + return new Dave.AdaptedResponse(response); + } else { + // a ResponseWithStatus.error will end up as a StatusReply.ErrorMessage() + // exception here + return new Dave.AdaptedResponse("Request failed: " + throwable.getMessage()); + } + }); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + // the adapted message ends up being processed like any other + // message sent to the actor + .onMessage(Dave.AdaptedResponse.class, this::onAdaptedResponse) + .build(); + } + + private Behavior onAdaptedResponse(Dave.AdaptedResponse response) { + getContext().getLog().info("Got response from HAL: {}", response.message); + return this; + } + } + // #actor-ask-with-status + + } + + interface StandaloneAskSample { + // #standalone-ask-with-status + public class CookieFabric extends AbstractBehavior { + + interface Command {} + + public static class GiveMeCookies implements CookieFabric.Command { + public final int count; + public final ActorRef> replyTo; + + public GiveMeCookies(int count, ActorRef> replyTo) { + this.count = count; + this.replyTo = replyTo; + } + } + + public static class Cookies { + public final int count; + + public Cookies(int count) { + this.count = count; + } + } + + public static Behavior create() { + return Behaviors.setup(CookieFabric::new); + } + + private CookieFabric(ActorContext context) { + super(context); + } + + @Override + public Receive createReceive() { + return newReceiveBuilder() + .onMessage(CookieFabric.GiveMeCookies.class, this::onGiveMeCookies) + .build(); + } + + private Behavior onGiveMeCookies(CookieFabric.GiveMeCookies request) { + if (request.count >= 5) request.replyTo.tell(StatusReply.error("Too many cookies.")); + else request.replyTo.tell(StatusReply.success(new CookieFabric.Cookies(request.count))); + + return this; + } + } + // #standalone-ask-with-status + + static class NotShown { + + // #standalone-ask-with-status + + public void askAndPrint( + ActorSystem system, ActorRef cookieFabric) { + CompletionStage result = + AskPattern.askWithStatus( + cookieFabric, + replyTo -> new CookieFabric.GiveMeCookies(3, replyTo), + // asking someone requires a timeout and a scheduler, if the timeout hits without + // response the ask is failed with a TimeoutException + Duration.ofSeconds(3), + system.scheduler()); + + result.whenComplete( + (reply, failure) -> { + if (reply != null) System.out.println("Yay, " + reply.count + " cookies!"); + else if (failure instanceof StatusReply.ErrorMessage) + System.out.println("No cookies for me. " + failure.getMessage()); + else System.out.println("Boo! didn't get cookies in time. " + failure); + }); + } + // #standalone-ask-with-status + + public void askAndMapInvalid( + ActorSystem system, ActorRef cookieFabric) { + // #standalone-ask-with-status-fail-future + CompletionStage cookies = + AskPattern.askWithStatus( + cookieFabric, + replyTo -> new CookieFabric.GiveMeCookies(3, replyTo), + Duration.ofSeconds(3), + system.scheduler()); + + cookies.whenComplete( + (cookiesReply, failure) -> { + if (cookies != null) System.out.println("Yay, " + cookiesReply.count + " cookies!"); + else System.out.println("Boo! didn't get cookies in time. " + failure); + }); + // #standalone-ask-with-status-fail-future + } + } + } + + @Test + public void askWithStatusExample() { + // no assert but should at least throw if completely broken + ActorRef cookieFabric = + testKit.spawn(StandaloneAskSample.CookieFabric.create()); + StandaloneAskSample.NotShown notShown = new StandaloneAskSample.NotShown(); + notShown.askAndPrint(testKit.system(), cookieFabric); + } + + @Test + public void askInActorWithStatusExample() { + // no assert but should at least throw if completely broken + ActorRef hal = testKit.spawn(Samples.Hal.create()); + ActorRef dave = testKit.spawn(Samples.Dave.create(hal)); + } +} diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java index 1430acc3f0..ea418fbbb1 100644 --- a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsTest.java @@ -341,7 +341,7 @@ public class InteractionPatternsTest extends JUnitSuite { // #actor-ask public class Hal extends AbstractBehavior { - public Behavior create() { + public static Behavior create() { return Behaviors.setup(Hal::new); } @@ -925,4 +925,20 @@ public class InteractionPatternsTest extends JUnitSuite { "123", probe.expectMessageClass(PipeToSelfSample.CustomerRepository.UpdateSuccess.class).id); } + + @Test + public void askWithStatusExample() { + // no assert but should at least throw if completely broken + ActorRef cookieFabric = + testKit.spawn(StandaloneAskSample.CookieFabric.create()); + StandaloneAskSample.NotShown notShown = new StandaloneAskSample.NotShown(); + notShown.askAndPrint(testKit.system(), cookieFabric); + } + + @Test + public void askInActorWithStatusExample() { + // no assert but should at least throw if completely broken + ActorRef hal = testKit.spawn(Samples.Hal.create()); + ActorRef dave = testKit.spawn(Samples.Dave.create(hal)); + } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala index 649a4a7e13..3adc148374 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala @@ -9,9 +9,7 @@ import scala.concurrent.Future import scala.concurrent.TimeoutException import scala.concurrent.duration._ import scala.util.Success - import org.scalatest.wordspec.AnyWordSpecLike - import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.LoggingTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -20,6 +18,8 @@ import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors._ +import akka.pattern.StatusReply +import akka.testkit.TestException import akka.util.Timeout object AskSpec { @@ -182,4 +182,32 @@ class AskSpec extends ScalaTestWithActorTestKit(""" probe.expectTerminated(ref, probe.remainingOrDefault) } } + + case class Request(replyTo: ActorRef[StatusReply[String]]) + + "askWithStatus pattern" must { + "unwrap nested response a successful response" in { + val probe = createTestProbe[Request] + val result: Future[String] = probe.ref.askWithStatus(Request(_)) + probe.expectMessageType[Request].replyTo ! StatusReply.success("goodie") + result.futureValue should ===("goodie") + } + "fail future for a fail response with text" in { + val probe = createTestProbe[Request] + val result: Future[String] = probe.ref.askWithStatus(Request(_)) + probe.expectMessageType[Request].replyTo ! StatusReply.error("boom") + val exception = result.failed.futureValue + exception should be(a[StatusReply.ErrorMessage]) + exception.getMessage should ===("boom") + } + "fail future for a fail response with custom exception" in { + val probe = createTestProbe[Request] + val result: Future[String] = probe.ref.askWithStatus(Request(_)) + probe.expectMessageType[Request].replyTo ! StatusReply.error(TestException("boom")) + val exception = result.failed.futureValue + exception should be(a[TestException]) + exception.getMessage should ===("boom") + } + + } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala index 30ee43d410..8d2e530c05 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextAskSpec.scala @@ -4,19 +4,20 @@ package akka.actor.typed.scaladsl +import akka.actor.testkit.typed.TestException + import scala.concurrent.TimeoutException import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.util.{ Failure, Success } - import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike - import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.LoggingTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.{ ActorRef, PostStop, Props } +import akka.pattern.StatusReply object ActorContextAskSpec { val config = ConfigFactory.parseString(""" @@ -188,6 +189,76 @@ class ActorContextAskSpec probe.receiveMessages(N).map(_.n) should ===(1 to N) } + "unwrap successful StatusReply messages using askWithStatus" in { + case class Ping(ref: ActorRef[StatusReply[Pong.type]]) + case object Pong + + val probe = createTestProbe[Any]() + spawn(Behaviors.setup[Pong.type] { ctx => + ctx.askWithStatus(probe.ref, Ping) { + case Success(Pong) => Pong + case Failure(ex) => throw ex + } + + Behaviors.receiveMessage { + case Pong => + probe.ref ! "got pong" + Behaviors.same + } + }) + + val replyTo = probe.expectMessageType[Ping].ref + replyTo ! StatusReply.Success(Pong) + probe.expectMessage("got pong") + } + + "unwrap error message StatusReply messages using askWithStatus" in { + case class Ping(ref: ActorRef[StatusReply[Pong.type]]) + case object Pong + + val probe = createTestProbe[Any]() + spawn(Behaviors.setup[Throwable] { ctx => + ctx.askWithStatus(probe.ref, Ping) { + case Failure(ex) => ex + case wat => throw new IllegalArgumentException(s"Unexpected response $wat") + } + + Behaviors.receiveMessage { + case ex: Throwable => + probe.ref ! s"got error: ${ex.getClass.getName}, ${ex.getMessage}" + Behaviors.same + } + }) + + val replyTo = probe.expectMessageType[Ping].ref + replyTo ! StatusReply.Error("boho") + probe.expectMessage("got error: akka.pattern.StatusReply$ErrorMessage, boho") + } + + "unwrap error with custom exception StatusReply messages using askWithStatus" in { + case class Ping(ref: ActorRef[StatusReply[Pong.type]]) + case object Pong + + val probe = createTestProbe[Any]() + case class Message(any: Any) + spawn(Behaviors.setup[Throwable] { ctx => + ctx.askWithStatus(probe.ref, Ping) { + case Failure(ex) => ex + case wat => throw new IllegalArgumentException(s"Unexpected response $wat") + } + + Behaviors.receiveMessage { + case ex: Throwable => + probe.ref ! s"got error: ${ex.getClass.getName}, ${ex.getMessage}" + Behaviors.same + } + }) + + val replyTo = probe.expectMessageType[Ping].ref + replyTo ! StatusReply.Error(TestException("boho")) + probe.expectMessage("got error: akka.actor.testkit.typed.TestException, boho") + } + } } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala index 2472d7f85b..f9b82ba55a 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala @@ -20,6 +20,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.TimerScheduler +import akka.pattern.StatusReply import org.scalatest.wordspec.AnyWordSpecLike class InteractionPatternsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { @@ -292,6 +293,61 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with AnyWordSpec monitor.expectMessageType[Hal.OpenThePodBayDoorsPlease] } + "contain a sample for outside ask with status" in { + import akka.util.Timeout + + // #actor-ask-with-status + object Hal { + sealed trait Command + case class OpenThePodBayDoorsPlease(replyTo: ActorRef[StatusReply[String]]) extends Command + + def apply(): Behaviors.Receive[Hal.Command] = + Behaviors.receiveMessage[Command] { + case OpenThePodBayDoorsPlease(replyTo) => + // reply with a validation error description + replyTo ! StatusReply.Error("I'm sorry, Dave. I'm afraid I can't do that.") + Behaviors.same + } + } + + object Dave { + + sealed trait Command + // this is a part of the protocol that is internal to the actor itself + private case class AdaptedResponse(message: String) extends Command + + def apply(hal: ActorRef[Hal.Command]): Behavior[Dave.Command] = + Behaviors.setup[Command] { context => + // asking someone requires a timeout, if the timeout hits without response + // the ask is failed with a TimeoutException + implicit val timeout: Timeout = 3.seconds + + // A StatusReply.Success(m) ends up as a Success(m) here, while a + // StatusReply.Error(text) becomes a Failure(ErrorMessage(text)) + context.askWithStatus(hal, Hal.OpenThePodBayDoorsPlease) { + case Success(message) => AdaptedResponse(message) + case Failure(StatusReply.ErrorMessage(text)) => AdaptedResponse(s"Request denied: $text") + case Failure(_) => AdaptedResponse("Request failed") + } + + Behaviors.receiveMessage { + // the adapted message ends up being processed like any other + // message sent to the actor + case AdaptedResponse(message) => + context.log.info("Got response from hal: {}", message) + Behaviors.same + } + } + } + // #actor-ask-with-status + + // somewhat modified behavior to let us know we saw the two requests + val monitor = createTestProbe[Hal.Command]() + val hal = spawn(Behaviors.monitor(monitor.ref, Hal())) + spawn(Dave(hal)) + monitor.expectMessageType[Hal.OpenThePodBayDoorsPlease] + } + "contain a sample for per session child" in { // #per-session-child // dummy data types just for this sample @@ -461,6 +517,69 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with AnyWordSpec cookies.futureValue shouldEqual CookieFabric.Cookies(3) } + "contain a sample for ask with status from outside the actor system" in { + // #standalone-ask-with-status + object CookieFabric { + sealed trait Command {} + case class GiveMeCookies(count: Int, replyTo: ActorRef[StatusReply[Cookies]]) extends Command + case class Cookies(count: Int) + + def apply(): Behaviors.Receive[CookieFabric.GiveMeCookies] = + Behaviors.receiveMessage { message => + if (message.count >= 5) + message.replyTo ! StatusReply.Error("Too many cookies.") + else + message.replyTo ! StatusReply.Success(Cookies(message.count)) + Behaviors.same + } + } + // #standalone-ask-with-status + + // keep this out of the sample as it uses the testkit spawn + val cookieFabric = spawn(CookieFabric()) + + val theSystem = testKit.system + + // #standalone-ask-with-status + + import akka.actor.typed.scaladsl.AskPattern._ + import akka.util.Timeout + + // asking someone requires a timeout if the timeout hits without response + // the ask is failed with a TimeoutException + implicit val timeout: Timeout = 3.seconds + // implicit ActorSystem in scope + implicit val system: ActorSystem[_] = theSystem + + val result: Future[CookieFabric.Cookies] = cookieFabric.askWithStatus(ref => CookieFabric.GiveMeCookies(3, ref)) + + // the response callback will be executed on this execution context + implicit val ec = system.executionContext + + result.onComplete { + case Success(CookieFabric.Cookies(count)) => println(s"Yay, $count cookies!") + case Failure(StatusReply.ErrorMessage(reason)) => println(s"No cookies for me. $reason") + case Failure(ex) => println(s"Boo! didn't get cookies: ${ex.getMessage}") + } + // #standalone-ask-with-status + + result.futureValue shouldEqual CookieFabric.Cookies(3) + + // #standalone-ask-with-status-fail-future + val cookies: Future[CookieFabric.Cookies] = + cookieFabric.askWithStatus[CookieFabric.Cookies](ref => CookieFabric.GiveMeCookies(3, ref)).flatMap { + case c: CookieFabric.Cookies => Future.successful(c) + } + + cookies.onComplete { + case Success(CookieFabric.Cookies(count)) => println(s"Yay, $count cookies!") + case Failure(ex) => println(s"Boo! didn't get cookies: ${ex.getMessage}") + } + // #standalone-ask-with-status-fail-future + + cookies.futureValue shouldEqual CookieFabric.Cookies(3) + } + "contain a sample for pipeToSelf" in { //#pipeToSelf diff --git a/akka-actor-typed/src/main/mima-filters/2.6.6.backwards.excludes/ask-with-status.excludes b/akka-actor-typed/src/main/mima-filters/2.6.6.backwards.excludes/ask-with-status.excludes new file mode 100644 index 0000000000..e7ec205cb0 --- /dev/null +++ b/akka-actor-typed/src/main/mima-filters/2.6.6.backwards.excludes/ask-with-status.excludes @@ -0,0 +1,3 @@ +# actor ask with status #29190 - safe because ActorContext is not for user extension +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.ActorContext.askWithStatus") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.ActorContext.askWithStatus") \ No newline at end of file diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index a1ba9b6e41..dd66ecde23 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -13,20 +13,22 @@ import java.util.concurrent.CompletionStage import scala.concurrent.{ ExecutionContextExecutor, Future } import scala.reflect.ClassTag import scala.util.Try - import com.github.ghik.silencer.silent import org.slf4j.Logger import org.slf4j.LoggerFactory - import akka.actor.Address import akka.actor.typed.internal.adapter.ActorSystemAdapter import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts +import akka.pattern.StatusReply import akka.util.{ BoxedType, Timeout } import akka.util.JavaDurationConverters._ import akka.util.OptionVal import akka.util.Timeout +import scala.util.Failure +import scala.util.Success + /** * INTERNAL API */ @@ -206,6 +208,14 @@ import akka.util.Timeout pipeToSelf((target.ask(createRequest))(responseTimeout, system.scheduler))(mapResponse) } + override def askWithStatus[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[StatusReply[Res]] => Req)( + mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = + ask(target, createRequest) { + case Success(StatusReply.Success(t: Res)) => mapResponse(Success(t)) + case Success(StatusReply.Error(why)) => mapResponse(Failure(why)) + case fail: Failure[_] => mapResponse(fail.asInstanceOf[Failure[Res]]) + } + // Java API impl @silent("never used") // resClass is just a pretend param override def ask[Req, Res]( @@ -218,6 +228,26 @@ import akka.util.Timeout pipeToSelf(AskPattern.ask(target, (ref) => createRequest(ref), responseTimeout, system.scheduler), applyToResponse) } + override def askWithStatus[Req, Res]( + resClass: Class[Res], + target: RecipientRef[Req], + responseTimeout: Duration, + createRequest: akka.japi.function.Function[ActorRef[StatusReply[Res]], Req], + applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit = { + implicit val classTag: ClassTag[Res] = ClassTag(resClass) + ask[Req, StatusReply[Res]]( + classOf[StatusReply[Res]], + target, + responseTimeout, + createRequest, + (ok: StatusReply[Res], failure: Throwable) => + ok match { + case StatusReply.Success(value: Res) => applyToResponse(value, null) + case StatusReply.Error(why) => applyToResponse(null.asInstanceOf[Res], why) + case null => applyToResponse(null.asInstanceOf[Res], failure) + }) + } + // Scala API impl def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit = { future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContexts.parasitic) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index 87eab9f4dd..3015756a3a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -9,12 +9,11 @@ import java.util.Optional import java.util.concurrent.CompletionStage import scala.concurrent.ExecutionContextExecutor - import org.slf4j.Logger - import akka.actor.ClassicActorContextProvider import akka.actor.typed._ import akka.annotation.DoNotInherit +import akka.pattern.StatusReply /** * An Actor is given by the combination of a [[Behavior]] and a context in @@ -299,6 +298,19 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi createRequest: akka.japi.function.Function[ActorRef[Res], Req], applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit + /** + * The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]]. + * If the response is a [[akka.pattern.StatusReply#success]] the returned future is completed successfully with the wrapped response. + * If the status response is a [[akka.pattern.StatusReply#error]] the returned future will be failed with the + * exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]). + */ + def askWithStatus[Req, Res]( + resClass: Class[Res], + target: RecipientRef[Req], + responseTimeout: Duration, + createRequest: akka.japi.function.Function[ActorRef[StatusReply[Res]], Req], + applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit + /** * Sends the result of the given `CompletionStage` to this Actor (“`self`”), after adapted it with * the given function. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala index 22547cd992..b0a9445291 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala @@ -9,10 +9,10 @@ import java.time.Duration import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ - import akka.actor.typed.Scheduler import akka.actor.typed.scaladsl.AskPattern._ import akka.japi.function.{ Function => JFunction } +import akka.pattern.StatusReply import akka.util.JavaDurationConverters._ /** @@ -41,4 +41,18 @@ object AskPattern { timeout: Duration, scheduler: Scheduler): CompletionStage[Res] = (actor.ask(messageFactory.apply)(timeout.asScala, scheduler)).toJava + + /** + * The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]]. + * If the response is a [[akka.pattern.StatusReply#success]] the returned future is completed successfully with the wrapped response. + * If the status response is a [[akka.pattern.StatusReply#error]] the returned future will be failed with the + * exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]). + */ + def askWithStatus[Req, Res]( + actor: RecipientRef[Req], + messageFactory: JFunction[ActorRef[StatusReply[Res]], Req], + timeout: Duration, + scheduler: Scheduler): CompletionStage[Res] = + (actor.askWithStatus(messageFactory.apply)(timeout.asScala, scheduler).toJava) + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index 86dfb5f1d2..22c9742dc2 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -8,13 +8,12 @@ import scala.concurrent.{ ExecutionContextExecutor, Future } import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import scala.util.Try - import org.slf4j.Logger - import akka.actor.ClassicActorContextProvider import akka.actor.typed._ import akka.annotation.DoNotInherit import akka.annotation.InternalApi +import akka.pattern.StatusReply import akka.util.Timeout /** @@ -298,6 +297,15 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)( mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit + /** + * The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]]. + * If the response is a [[akka.pattern.StatusReply.Success]] the returned future is completed successfully with the wrapped response. + * If the status response is a [[akka.pattern.StatusReply.Error]] the returned future will be failed with the + * exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]). + */ + def askWithStatus[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[StatusReply[Res]] => Req)( + mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit + /** * Sends the result of the given `Future` to this Actor (“`self`”), after adapted it with * the given function. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala index 082989dafb..a560d51592 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala @@ -16,6 +16,7 @@ import akka.actor.typed.internal.{ adapter => adapt } import akka.actor.typed.internal.InternalRecipientRef import akka.annotation.InternalStableApi import akka.pattern.PromiseActorRef +import akka.pattern.StatusReply import akka.util.{ unused, Timeout } /** @@ -114,6 +115,17 @@ object AskPattern { "native system is implemented: " + a.getClass) } } + + /** + * The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]]. + * If the response is a [[akka.pattern.StatusReply.Success]] the returned future is completed successfully with the wrapped response. + * If the status response is a [[akka.pattern.StatusReply.Error]] the returned future will be failed with the + * exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]). + */ + def askWithStatus[Res]( + replyTo: ActorRef[StatusReply[Res]] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res] = + StatusReply.flattenStatusFuture(ask(replyTo)) + } private val onTimeout: String => Throwable = msg => new TimeoutException(msg) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 1f6c0cd137..6a55d6bc30 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -293,6 +293,8 @@ final case class UnhandledMessage( with AllDeadLetters /** + * Superseeded by [[akka.pattern.StatusReply]], prefer that when possible. + * * Classes for passing status back to the sender. * Used for internal ACKing protocol. But exposed as utility class for user-specific ACKing protocols as well. */ diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 183f5aea5c..b5e82faf08 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -86,6 +86,22 @@ trait AskSupport { def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] = actorRef.internalAsk(message, timeout, sender) + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply.Success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply.Error]] arrives the future is instead + * failed. + */ + def askWithStatus(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = + actorRef.internalAskWithStatus(message)(timeout, Actor.noSender) + + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply.Success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply.Error]] arrives the future is instead + * failed. + */ + def askWithStatus(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] = + actorRef.internalAskWithStatus(message)(timeout, sender) + /** * Import this implicit conversion to gain `?` and `ask` methods on * [[akka.actor.ActorSelection]], which will defer to the @@ -320,6 +336,17 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal { def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = internalAsk(message, timeout, sender) + def askWithStatus(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = + internalAskWithStatus(message) + + /** + * INTERNAL API + */ + @InternalApi + private[pattern] def internalAskWithStatus( + message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = + StatusReply.flattenStatusFuture[Any](internalAsk(message, timeout, sender).mapTo[StatusReply[Any]]) + /** * INTERNAL API: for binary compatibility */ diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 2bac6b8250..82a376b162 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -24,6 +24,7 @@ object Patterns { import akka.pattern.{ after => scalaAfter, ask => scalaAsk, + askWithStatus => scalaAskWithStatus, gracefulStop => scalaGracefulStop, pipe => scalaPipe, retry => scalaRetry @@ -94,6 +95,14 @@ object Patterns { def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = scalaAsk(actor, message)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** + * Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response + * arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead + * failed. + */ + def askWithStatus(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] = + scalaAskWithStatus(actor, message)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]] + /** * A variation of ask which allows to implement "replyTo" pattern by including * sender reference in message. diff --git a/akka-actor/src/main/scala/akka/pattern/StatusReply.scala b/akka-actor/src/main/scala/akka/pattern/StatusReply.scala new file mode 100644 index 0000000000..c7d61a034f --- /dev/null +++ b/akka-actor/src/main/scala/akka/pattern/StatusReply.scala @@ -0,0 +1,167 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.pattern + +import akka.Done +import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts + +import scala.concurrent.Future +import scala.util.Try +import scala.util.control.NoStackTrace +import scala.util.{ Failure => ScalaFailure } +import scala.util.{ Success => ScalaSuccess } + +/** + * Generic top-level message type for replies that signal failure or success. Convenient to use together with the + * `askWithStatus` ask variants. + * + * Create using the factory methods [[StatusReply#success]] and [[StatusReply#error]]. + * + * Akka contains predefined serializers for the wrapper type and the textual error messages. + * + * @tparam T the type of value a successful reply would have + */ +final class StatusReply[+T] private (private val status: Try[T]) { + + /** + * Java API: in the case of a successful reply returns the value, if the reply was not successful the exception + * the failure was created with is thrown + */ + def getValue: T = status.get + + /** + * Java API: returns the exception if the reply is a failure, or throws an exception if not. + */ + def getError: Throwable = status match { + case ScalaFailure(ex) => ex + case _ => throw new IllegalArgumentException("Expected reply to be a failure, but was a success") + } + + def isError: Boolean = status.isFailure + def isSuccess: Boolean = status.isSuccess + + override def equals(other: Any): Boolean = other match { + case that: StatusReply[_] => status == that.status + case _ => false + } + + override def hashCode(): Int = status.hashCode + + override def toString: String = status match { + case ScalaSuccess(t) => s"Success($t)" + case ScalaFailure(ex) => s"Error(${ex.getMessage})" + } + +} + +object StatusReply { + + /** + * Scala API: A general purpose message for using as an Ack + */ + val Ack: StatusReply[Done] = success(Done) + + /** + * Java API: A general purpose message for using as an Ack + */ + def ack(): StatusReply[Done] = Ack + + /** + * Java API: Create a successful reply containing `value` + */ + def success[T](value: T): StatusReply[T] = new StatusReply(ScalaSuccess(value)) + + /** + * Java API: Create an status response with a error message describing why the request was failed or denied. + */ + def error[T](errorMessage: String): StatusReply[T] = Error(errorMessage) + + /** + * Java API: Create an error response with a user defined [[Throwable]]. + * + * Prefer the string based error response over this one when possible to avoid tightly coupled logic across + * actors and passing internal failure details on to callers that can not do much to handle them. + * + * For cases where types are needed to identify errors and behave differently enumerating them with a specific + * set of response messages may be a better alternative to encoding them as generic exceptions. + * + * Also note that Akka does not contain pre-build serializers for arbitrary exceptions. + */ + def error[T](exception: Throwable): StatusReply[T] = Error(exception) + + /** + * Carrier exception used for textual error descriptions. + * + * Not meant for usage outside of [[StatusReply]]. + */ + final case class ErrorMessage(private val errorMessage: String) + extends RuntimeException(errorMessage) + with NoStackTrace { + override def toString: String = errorMessage + } + + /** + * Scala API for creation and pattern matching a successful response. + * + * For example: + * ``` + * case StatusReply.Success(value: String) => ... + * ``` + */ + object Success { + + /** + * Scala API: Create a successful reply containing `value` + */ + def apply[T](value: T): StatusReply[T] = new StatusReply(ScalaSuccess(value)) + def unapply(status: StatusReply[Any]): Option[Any] = + if (status.isSuccess) Some(status.getValue) + else None + } + + /** + * Scala API for creating and pattern matching an error response + * + * For example: + * ``` + * case StatusReply.Error(exception) => ... + * ``` + */ + object Error { + + /** + * Scala API: Create an status response with a error message describing why the request was failed or denied. + */ + def apply[T](errorMessage: String): StatusReply[T] = error(new ErrorMessage(errorMessage)) + + /** + * Scala API: Create an error response with a user defined [[Throwable]]. + * + * Prefer the string based error response over this one when possible to avoid tightly coupled logic across + * actors and passing internal failure details on to callers that can not do much to handle them. + * + * For cases where types are needed to identify errors and behave differently enumerating them with a specific + * set of response messages may be a better alternative to encoding them as generic exceptions. + * + * Also note that Akka does not contain pre-build serializers for arbitrary exceptions. + */ + def apply[T](exception: Throwable): StatusReply[T] = new StatusReply(ScalaFailure(exception)) + def unapply(status: StatusReply[_]): Option[Throwable] = + if (status.isError) Some(status.getError) + else None + } + + /** + * INTERNAL API + */ + @InternalApi + private[akka] def flattenStatusFuture[T](f: Future[StatusReply[T]]): Future[T] = + f.transform { + case ScalaSuccess(StatusReply.Success(v)) => ScalaSuccess(v.asInstanceOf[T]) + case ScalaSuccess(StatusReply.Error(ex)) => ScalaFailure[T](ex) + case fail @ ScalaFailure(_) => fail.asInstanceOf[Try[T]] + }(ExecutionContexts.parasitic) +} diff --git a/akka-cluster-sharding-typed/src/main/mima-filters/2.6.6.backwards.excludes/ask-with-status.excludes b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.6.backwards.excludes/ask-with-status.excludes new file mode 100644 index 0000000000..4070b877d5 --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.6.backwards.excludes/ask-with-status.excludes @@ -0,0 +1,3 @@ +# ask with status #29133, safe since EntityRef not user extendable +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.javadsl.EntityRef.askWithStatus") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityRef.askWithStatus") \ No newline at end of file diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala index df61d1b9e5..ad30aeb6dd 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ClusterShardingImpl.scala @@ -39,6 +39,7 @@ import akka.event.LoggingAdapter import akka.japi.function.{ Function => JFunction } import akka.pattern.AskTimeoutException import akka.pattern.PromiseActorRef +import akka.pattern.StatusReply import akka.util.{ unused, ByteString, Timeout } import akka.util.JavaDurationConverters._ @@ -314,9 +315,15 @@ import akka.util.JavaDurationConverters._ replyTo.ask(shardRegion, entityId, m, timeout) } - def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U] = + override def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U] = ask[U](replyTo => message.apply(replyTo))(timeout.asScala).toJava + override def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M)(implicit timeout: Timeout): Future[Res] = + StatusReply.flattenStatusFuture(ask[StatusReply[Res]](f)) + + override def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M, timeout: Duration): CompletionStage[Res] = + askWithStatus(f.apply)(timeout.asScala).toJava + /** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an `EntityRef` target. */ @InternalApi private final class EntityPromiseRef[U](classic: InternalActorRef, timeout: Timeout, refPathPrefix: String) { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala index 90e56f138a..2c11f5bb8f 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/testkit/TestEntityRefImpl.scala @@ -9,7 +9,6 @@ import java.util.concurrent.CompletionStage import scala.concurrent.Future import scala.compat.java8.FutureConverters._ - import akka.actor.ActorRefProvider import akka.actor.typed.ActorRef import akka.actor.typed.Scheduler @@ -18,6 +17,7 @@ import akka.annotation.InternalApi import akka.cluster.sharding.typed.javadsl import akka.cluster.sharding.typed.scaladsl import akka.japi.function.{ Function => JFunction } +import akka.pattern.StatusReply import akka.util.JavaDurationConverters._ import akka.util.Timeout @@ -42,6 +42,12 @@ import akka.util.Timeout def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U] = ask[U](replyTo => message.apply(replyTo))(timeout.asScala).toJava + override def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M, timeout: Duration): CompletionStage[Res] = + askWithStatus(f)(timeout.asScala).toJava + + override def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M)(implicit timeout: Timeout): Future[Res] = + StatusReply.flattenStatusFuture(ask(f)) + // impl InternalRecipientRef override def provider: ActorRefProvider = { probe.asInstanceOf[InternalRecipientRef[_]].provider diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 6ca84200e5..16e7c4af96 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -10,7 +10,6 @@ import java.util.Optional import java.util.concurrent.CompletionStage import com.github.ghik.silencer.silent - import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior @@ -22,6 +21,7 @@ import akka.annotation.InternalApi import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl import akka.japi.function.{ Function => JFunction } +import akka.pattern.StatusReply @FunctionalInterface trait EntityFactory[M] { @@ -439,6 +439,14 @@ object EntityTypeKey { */ def ask[Res](message: JFunction[ActorRef[Res], M], timeout: Duration): CompletionStage[Res] + /** + * The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]]. + * If the response is a [[akka.pattern.StatusReply#success]] the returned future is completed successfully with the wrapped response. + * If the status response is a [[akka.pattern.StatusReply#error]] the returned future will be failed with the + * exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]). + */ + def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M, timeout: Duration): CompletionStage[Res] + /** * INTERNAL API */ diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index 02bf7778a4..4aa97ff6a0 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -7,7 +7,6 @@ package scaladsl import scala.concurrent.Future import scala.reflect.ClassTag - import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior @@ -24,6 +23,7 @@ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy import akka.cluster.sharding.ShardRegion.{ StartEntity => ClassicStartEntity } import akka.cluster.sharding.typed.internal.ClusterShardingImpl import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl +import akka.pattern.StatusReply import akka.util.Timeout object ClusterSharding extends ExtensionId[ClusterSharding] { @@ -448,6 +448,14 @@ object EntityTypeKey { */ def ask[Res](f: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res] + /** + * The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]]. + * If the response is a [[akka.pattern.StatusReply.Success]] the returned future is completed successfully with the wrapped response. + * If the status response is a [[akka.pattern.StatusReply.Error]] the returned future will be failed with the + * exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]). + */ + def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M)(implicit timeout: Timeout): Future[Res] + /** * Allows to "ask" the [[EntityRef]] for a reply. * See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java index b0c8d057d8..3aacdd5c9e 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java @@ -4,6 +4,8 @@ package jdocs.akka.cluster.sharding.typed; +import akka.Done; +import akka.pattern.StatusReply; import org.scalatestplus.junit.JUnitSuite; import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity; @@ -55,12 +57,9 @@ public class AccountExampleDocTest @Test public void createWithEmptyBalance() { CommandResultWithReply< - AccountEntity.Command, - AccountEntity.Event, - AccountEntity.Account, - AccountEntity.OperationResult> + AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply> result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); - assertEquals(AccountEntity.Confirmed.INSTANCE, result.reply()); + assertEquals(StatusReply.ack(), result.reply()); assertEquals(AccountEntity.AccountCreated.INSTANCE, result.event()); assertEquals(BigDecimal.ZERO, result.stateOfType(AccountEntity.OpenedAccount.class).balance); } @@ -70,28 +69,22 @@ public class AccountExampleDocTest eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); CommandResultWithReply< - AccountEntity.Command, - AccountEntity.Event, - AccountEntity.Account, - AccountEntity.OperationResult> + AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply> result1 = eventSourcedTestKit.runCommand( replyTo -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); - assertEquals(AccountEntity.Confirmed.INSTANCE, result1.reply()); + assertEquals(StatusReply.ack(), result1.reply()); assertEquals( BigDecimal.valueOf(100), result1.eventOfType(AccountEntity.Deposited.class).amount); assertEquals( BigDecimal.valueOf(100), result1.stateOfType(AccountEntity.OpenedAccount.class).balance); CommandResultWithReply< - AccountEntity.Command, - AccountEntity.Event, - AccountEntity.Account, - AccountEntity.OperationResult> + AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply> result2 = eventSourcedTestKit.runCommand( replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo)); - assertEquals(AccountEntity.Confirmed.INSTANCE, result2.reply()); + assertEquals(StatusReply.ack(), result2.reply()); assertEquals(BigDecimal.valueOf(10), result2.eventOfType(AccountEntity.Withdrawn.class).amount); assertEquals( BigDecimal.valueOf(90), result2.stateOfType(AccountEntity.OpenedAccount.class).balance); @@ -101,18 +94,15 @@ public class AccountExampleDocTest public void rejectWithdrawOverdraft() { eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); eventSourcedTestKit.runCommand( - (ActorRef replyTo) -> + (ActorRef> replyTo) -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); CommandResultWithReply< - AccountEntity.Command, - AccountEntity.Event, - AccountEntity.Account, - AccountEntity.OperationResult> + AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply> result = eventSourcedTestKit.runCommand( replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo)); - result.replyOfType(AccountEntity.Rejected.class); + assertTrue(result.reply().isError()); assertTrue(result.hasNoEvents()); } @@ -120,7 +110,7 @@ public class AccountExampleDocTest public void handleGetBalance() { eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); eventSourcedTestKit.runCommand( - (ActorRef replyTo) -> + (ActorRef> replyTo) -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); CommandResultWithReply< diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java index 287aba78c6..76efefd5a7 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java @@ -4,15 +4,16 @@ package jdocs.akka.cluster.sharding.typed; +import akka.Done; import akka.actor.testkit.typed.javadsl.LogCapturing; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestProbe; -import akka.actor.typed.ActorRef; import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.cluster.sharding.typed.javadsl.Entity; import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.cluster.typed.Cluster; import akka.cluster.typed.Join; +import akka.pattern.StatusReply; import akka.persistence.typed.PersistenceId; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -26,9 +27,11 @@ import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import static akka.Done.done; import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity; import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity.*; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class AccountExampleTest extends JUnitSuite { @@ -70,47 +73,47 @@ public class AccountExampleTest extends JUnitSuite { @Test public void handleDeposit() { EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "1"); - TestProbe probe = testKit.createTestProbe(OperationResult.class); + TestProbe> probe = testKit.createTestProbe(); ref.tell(new CreateAccount(probe.getRef())); - probe.expectMessage(Confirmed.INSTANCE); + probe.expectMessage(StatusReply.ack()); ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef())); - probe.expectMessage(Confirmed.INSTANCE); + probe.expectMessage(StatusReply.ack()); ref.tell(new Deposit(BigDecimal.valueOf(10), probe.getRef())); - probe.expectMessage(Confirmed.INSTANCE); + probe.expectMessage(StatusReply.ack()); } @Test public void handleWithdraw() { EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "2"); - TestProbe probe = testKit.createTestProbe(OperationResult.class); + TestProbe> probe = testKit.createTestProbe(); ref.tell(new CreateAccount(probe.getRef())); - probe.expectMessage(Confirmed.INSTANCE); + probe.expectMessage(StatusReply.ack()); ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef())); - probe.expectMessage(Confirmed.INSTANCE); + probe.expectMessage(StatusReply.ack()); ref.tell(new Withdraw(BigDecimal.valueOf(10), probe.getRef())); - probe.expectMessage(Confirmed.INSTANCE); + probe.expectMessage(StatusReply.ack()); } @Test public void rejectWithdrawOverdraft() { EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "3"); - TestProbe probe = testKit.createTestProbe(OperationResult.class); + TestProbe> probe = testKit.createTestProbe(); ref.tell(new CreateAccount(probe.getRef())); - probe.expectMessage(Confirmed.INSTANCE); + probe.expectMessage(StatusReply.ack()); ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef())); - probe.expectMessage(Confirmed.INSTANCE); + probe.expectMessage(StatusReply.ack()); ref.tell(new Withdraw(BigDecimal.valueOf(110), probe.getRef())); - probe.expectMessageClass(Rejected.class); + assertTrue(probe.receiveMessage().isError()); } @Test public void handleGetBalance() { EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "4"); - TestProbe opProbe = testKit.createTestProbe(OperationResult.class); + TestProbe> opProbe = testKit.createTestProbe(); ref.tell(new CreateAccount(opProbe.getRef())); - opProbe.expectMessage(Confirmed.INSTANCE); + opProbe.expectMessage(StatusReply.ack()); ref.tell(new Deposit(BigDecimal.valueOf(100), opProbe.getRef())); - opProbe.expectMessage(Confirmed.INSTANCE); + opProbe.expectMessage(StatusReply.ack()); TestProbe getProbe = testKit.createTestProbe(CurrentBalance.class); ref.tell(new GetBalance(getProbe.getRef())); @@ -122,27 +125,21 @@ public class AccountExampleTest extends JUnitSuite { public void beUsableWithAsk() throws Exception { Duration timeout = Duration.ofSeconds(3); EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "5"); - CompletionStage createResult = ref.ask(CreateAccount::new, timeout); - assertEquals(Confirmed.INSTANCE, createResult.toCompletableFuture().get(3, TimeUnit.SECONDS)); + CompletionStage createResult = ref.askWithStatus(CreateAccount::new, timeout); + assertEquals(done(), createResult.toCompletableFuture().get(3, TimeUnit.SECONDS)); // above works because then the response type is inferred by the lhs type - // below requires (ActorRef replyTo) + // below requires explicit typing assertEquals( - Confirmed.INSTANCE, - ref.ask( - (ActorRef replyTo) -> - new Deposit(BigDecimal.valueOf(100), replyTo), - timeout) + done(), + ref.askWithStatus(replyTo -> new Deposit(BigDecimal.valueOf(100), replyTo), timeout) .toCompletableFuture() .get(3, TimeUnit.SECONDS)); assertEquals( - Confirmed.INSTANCE, - ref.ask( - (ActorRef replyTo) -> - new Withdraw(BigDecimal.valueOf(10), replyTo), - timeout) + done(), + ref.askWithStatus(replyTo -> new Withdraw(BigDecimal.valueOf(10), replyTo), timeout) .toCompletableFuture() .get(3, TimeUnit.SECONDS)); @@ -156,7 +153,7 @@ public class AccountExampleTest extends JUnitSuite { @Test public void verifySerialization() { - TestProbe opProbe = testKit.createTestProbe(); + TestProbe> opProbe = testKit.createTestProbe(); testKit.serializationTestKit().verifySerialization(new CreateAccount(opProbe.getRef()), false); Deposit deposit2 = testKit @@ -169,9 +166,6 @@ public class AccountExampleTest extends JUnitSuite { .verifySerialization(new Withdraw(BigDecimal.valueOf(90), opProbe.getRef()), false); testKit.serializationTestKit().verifySerialization(new CloseAccount(opProbe.getRef()), false); - testKit.serializationTestKit().verifySerialization(Confirmed.INSTANCE, false); - testKit.serializationTestKit().verifySerialization(new Rejected("overdraft"), false); - TestProbe getProbe = testKit.createTestProbe(); testKit.serializationTestKit().verifySerialization(new GetBalance(getProbe.getRef()), false); diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java index ef9f2c586d..e7ee8dc31b 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java @@ -4,8 +4,10 @@ package jdocs.akka.cluster.sharding.typed; +import akka.Done; import akka.actor.typed.ActorRef; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.pattern.StatusReply; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandlerWithReply; import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder; @@ -42,19 +44,19 @@ public interface AccountExampleWithEventHandlersInState { // #reply-command public static class CreateAccount implements Command { - public final ActorRef replyTo; + public final ActorRef> replyTo; @JsonCreator - public CreateAccount(ActorRef replyTo) { + public CreateAccount(ActorRef> replyTo) { this.replyTo = replyTo; } } public static class Deposit implements Command { public final BigDecimal amount; - public final ActorRef replyTo; + public final ActorRef> replyTo; - public Deposit(BigDecimal amount, ActorRef replyTo) { + public Deposit(BigDecimal amount, ActorRef> replyTo) { this.replyTo = replyTo; this.amount = amount; } @@ -62,9 +64,9 @@ public interface AccountExampleWithEventHandlersInState { public static class Withdraw implements Command { public final BigDecimal amount; - public final ActorRef replyTo; + public final ActorRef> replyTo; - public Withdraw(BigDecimal amount, ActorRef replyTo) { + public Withdraw(BigDecimal amount, ActorRef> replyTo) { this.amount = amount; this.replyTo = replyTo; } @@ -80,35 +82,16 @@ public interface AccountExampleWithEventHandlersInState { } public static class CloseAccount implements Command { - public final ActorRef replyTo; + public final ActorRef> replyTo; @JsonCreator - public CloseAccount(ActorRef replyTo) { + public CloseAccount(ActorRef> replyTo) { this.replyTo = replyTo; } } // Reply - // #reply-command - interface CommandReply extends CborSerializable {} - - interface OperationResult extends CommandReply {} - - enum Confirmed implements OperationResult { - INSTANCE - } - - public static class Rejected implements OperationResult { - public final String reason; - - @JsonCreator - public Rejected(String reason) { - this.reason = reason; - } - } - // #reply-command - - public static class CurrentBalance implements CommandReply { + public static class CurrentBalance implements CborSerializable { public final BigDecimal balance; @JsonCreator @@ -222,24 +205,26 @@ public interface AccountExampleWithEventHandlersInState { private ReplyEffect createAccount(EmptyAccount account, CreateAccount command) { return Effect() .persist(AccountCreated.INSTANCE) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } private ReplyEffect deposit(OpenedAccount account, Deposit command) { return Effect() .persist(new Deposited(command.amount)) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } // #reply private ReplyEffect withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() - .reply(command.replyTo, new Rejected("not enough funds to withdraw " + command.amount)); + .reply( + command.replyTo, + StatusReply.error("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } } // #reply @@ -252,10 +237,10 @@ public interface AccountExampleWithEventHandlersInState { if (account.balance.equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } else { return Effect() - .reply(command.replyTo, new Rejected("balance must be zero for closing account")); + .reply(command.replyTo, StatusReply.error("balance must be zero for closing account")); } } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java index 2c9b9ac4fc..ce3abdc9b4 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java @@ -4,8 +4,10 @@ package jdocs.akka.cluster.sharding.typed; +import akka.Done; import akka.actor.typed.ActorRef; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.pattern.StatusReply; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandlerWithReply; import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder; @@ -38,19 +40,19 @@ public interface AccountExampleWithMutableState { interface Command extends CborSerializable {} public static class CreateAccount implements Command { - public final ActorRef replyTo; + public final ActorRef> replyTo; @JsonCreator - public CreateAccount(ActorRef replyTo) { + public CreateAccount(ActorRef> replyTo) { this.replyTo = replyTo; } } public static class Deposit implements Command { public final BigDecimal amount; - public final ActorRef replyTo; + public final ActorRef> replyTo; - public Deposit(BigDecimal amount, ActorRef replyTo) { + public Deposit(BigDecimal amount, ActorRef> replyTo) { this.replyTo = replyTo; this.amount = amount; } @@ -58,9 +60,9 @@ public interface AccountExampleWithMutableState { public static class Withdraw implements Command { public final BigDecimal amount; - public final ActorRef replyTo; + public final ActorRef> replyTo; - public Withdraw(BigDecimal amount, ActorRef replyTo) { + public Withdraw(BigDecimal amount, ActorRef> replyTo) { this.amount = amount; this.replyTo = replyTo; } @@ -76,33 +78,16 @@ public interface AccountExampleWithMutableState { } public static class CloseAccount implements Command { - public final ActorRef replyTo; + public final ActorRef> replyTo; @JsonCreator - public CloseAccount(ActorRef replyTo) { + public CloseAccount(ActorRef> replyTo) { this.replyTo = replyTo; } } // Reply - interface CommandReply extends CborSerializable {} - - interface OperationResult extends CommandReply {} - - enum Confirmed implements OperationResult { - INSTANCE - } - - public static class Rejected implements OperationResult { - public final String reason; - - @JsonCreator - public Rejected(String reason) { - this.reason = reason; - } - } - - public static class CurrentBalance implements CommandReply { + public static class CurrentBalance implements CborSerializable { public final BigDecimal balance; @JsonCreator @@ -215,23 +200,25 @@ public interface AccountExampleWithMutableState { private ReplyEffect createAccount(EmptyAccount account, CreateAccount command) { return Effect() .persist(AccountCreated.INSTANCE) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } private ReplyEffect deposit(OpenedAccount account, Deposit command) { return Effect() .persist(new Deposited(command.amount)) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } private ReplyEffect withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() - .reply(command.replyTo, new Rejected("not enough funds to withdraw " + command.amount)); + .reply( + command.replyTo, + StatusReply.error("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } } @@ -243,10 +230,10 @@ public interface AccountExampleWithMutableState { if (account.getBalance().equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } else { return Effect() - .reply(command.replyTo, new Rejected("balance must be zero for closing account")); + .reply(command.replyTo, StatusReply.error("balance must be zero for closing account")); } } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java index 0afce2091e..2131e0629a 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java @@ -4,8 +4,10 @@ package jdocs.akka.cluster.sharding.typed; +import akka.Done; import akka.actor.typed.ActorRef; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.pattern.StatusReply; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandlerWithReply; import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder; @@ -38,19 +40,19 @@ public interface AccountExampleWithNullState { interface Command extends CborSerializable {} public static class CreateAccount implements Command { - public final ActorRef replyTo; + public final ActorRef> replyTo; @JsonCreator - public CreateAccount(ActorRef replyTo) { + public CreateAccount(ActorRef> replyTo) { this.replyTo = replyTo; } } public static class Deposit implements Command { public final BigDecimal amount; - public final ActorRef replyTo; + public final ActorRef> replyTo; - public Deposit(BigDecimal amount, ActorRef replyTo) { + public Deposit(BigDecimal amount, ActorRef> replyTo) { this.replyTo = replyTo; this.amount = amount; } @@ -58,9 +60,9 @@ public interface AccountExampleWithNullState { public static class Withdraw implements Command { public final BigDecimal amount; - public final ActorRef replyTo; + public final ActorRef> replyTo; - public Withdraw(BigDecimal amount, ActorRef replyTo) { + public Withdraw(BigDecimal amount, ActorRef> replyTo) { this.amount = amount; this.replyTo = replyTo; } @@ -76,33 +78,16 @@ public interface AccountExampleWithNullState { } public static class CloseAccount implements Command { - public final ActorRef replyTo; + public final ActorRef> replyTo; @JsonCreator - public CloseAccount(ActorRef replyTo) { + public CloseAccount(ActorRef> replyTo) { this.replyTo = replyTo; } } // Reply - interface CommandReply extends CborSerializable {} - - interface OperationResult extends CommandReply {} - - enum Confirmed implements OperationResult { - INSTANCE - } - - public static class Rejected implements OperationResult { - public final String reason; - - @JsonCreator - public Rejected(String reason) { - this.reason = reason; - } - } - - public static class CurrentBalance implements CommandReply { + public static class CurrentBalance implements CborSerializable { public final BigDecimal balance; @JsonCreator @@ -214,23 +199,25 @@ public interface AccountExampleWithNullState { private ReplyEffect createAccount(CreateAccount command) { return Effect() .persist(AccountCreated.INSTANCE) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } private ReplyEffect deposit(OpenedAccount account, Deposit command) { return Effect() .persist(new Deposited(command.amount)) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } private ReplyEffect withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() - .reply(command.replyTo, new Rejected("not enough funds to withdraw " + command.amount)); + .reply( + command.replyTo, + StatusReply.error("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } } @@ -242,10 +229,10 @@ public interface AccountExampleWithNullState { if (account.balance.equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) - .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> StatusReply.ack()); } else { return Effect() - .reply(command.replyTo, new Rejected("balance must be zero for closing account")); + .reply(command.replyTo, StatusReply.error("balance must be zero for closing account")); } } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala index fff3609ea3..b051dc2c52 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala @@ -5,10 +5,12 @@ package docs.akka.cluster.sharding.typed //#test +import akka.Done import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit import akka.persistence.typed.PersistenceId import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.pattern.StatusReply import org.scalatest.BeforeAndAfterEach import org.scalatest.wordspec.AnyWordSpecLike @@ -38,38 +40,38 @@ class AccountExampleDocSpec "Account" must { "be created with zero balance" in { - val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) - result.reply shouldBe AccountEntity.Confirmed + val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) + result.reply shouldBe StatusReply.Ack result.event shouldBe AccountEntity.AccountCreated result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0 } "handle Withdraw" in { - eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) + eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) - val result1 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _)) - result1.reply shouldBe AccountEntity.Confirmed + val result1 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) + result1.reply shouldBe StatusReply.Ack result1.event shouldBe AccountEntity.Deposited(100) result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100 - val result2 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(10, _)) - result2.reply shouldBe AccountEntity.Confirmed + val result2 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(10, _)) + result2.reply shouldBe StatusReply.Ack result2.event shouldBe AccountEntity.Withdrawn(10) result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90 } "reject Withdraw overdraft" in { - eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) - eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _)) + eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) + eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) - val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(110, _)) - result.replyOfType[AccountEntity.Rejected] + val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(110, _)) + result.reply.isError shouldBe true result.hasNoEvents shouldBe true } "handle GetBalance" in { - eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) - eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _)) + eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) + eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)) result.reply.balance shouldBe 100 diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleSpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleSpec.scala index 6574180b95..5a89278cc8 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleSpec.scala @@ -4,6 +4,8 @@ package docs.akka.cluster.sharding.typed +import akka.Done + import scala.concurrent.ExecutionContext import scala.concurrent.Future import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -12,6 +14,7 @@ import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.typed.Cluster import akka.cluster.typed.Join +import akka.pattern.StatusReply import akka.persistence.typed.PersistenceId import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike @@ -51,42 +54,40 @@ class AccountExampleSpec "Account example" must { "handle Deposit" in { - val probe = createTestProbe[OperationResult]() + val probe = createTestProbe[StatusReply[Done]]() val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "1") ref ! CreateAccount(probe.ref) - probe.expectMessage(Confirmed) + probe.expectMessage(StatusReply.Ack) ref ! Deposit(100, probe.ref) - probe.expectMessage(Confirmed) + probe.expectMessage(StatusReply.Ack) ref ! Deposit(10, probe.ref) - probe.expectMessage(Confirmed) + probe.expectMessage(StatusReply.Ack) } "handle Withdraw" in { - // OperationResult is the expected reply type for these commands, but it should also be - // possible to use the super type AccountCommandReply - val probe = createTestProbe[CommandReply]() + val doneProbe = createTestProbe[StatusReply[Done]]() val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "2") - ref ! CreateAccount(probe.ref) - probe.expectMessage(Confirmed) - ref ! Deposit(100, probe.ref) - probe.expectMessage(Confirmed) - ref ! Withdraw(10, probe.ref) - probe.expectMessage(Confirmed) + ref ! CreateAccount(doneProbe.ref) + doneProbe.expectMessage(StatusReply.Ack) + ref ! Deposit(100, doneProbe.ref) + doneProbe.expectMessage(StatusReply.Ack) + ref ! Withdraw(10, doneProbe.ref) + doneProbe.expectMessage(StatusReply.Ack) - // The same probe can be used with other commands too: - ref ! GetBalance(probe.ref) - probe.expectMessage(CurrentBalance(90)) + val balanceProbe = createTestProbe[CurrentBalance]() + ref ! GetBalance(balanceProbe.ref) + balanceProbe.expectMessage(CurrentBalance(90)) } "reject Withdraw overdraft" in { - val probe = createTestProbe[OperationResult]() + val probe = createTestProbe[StatusReply[Done]]() val ref = ClusterSharding(system).entityRefFor[Command](AccountEntity.TypeKey, "3") ref ! CreateAccount(probe.ref) - probe.expectMessage(Confirmed) + probe.expectMessage(StatusReply.Ack) ref ! Deposit(100, probe.ref) - probe.expectMessage(Confirmed) + probe.expectMessage(StatusReply.Ack) ref ! Withdraw(110, probe.ref) - probe.expectMessageType[Rejected] + probe.expectMessageType[StatusReply[Done]].isError should ===(true) // Account.Command is the command type, but it should also be possible to narrow it // ... thus restricting the entity ref from being sent other commands, e.g.: @@ -97,12 +98,12 @@ class AccountExampleSpec } "handle GetBalance" in { - val opProbe = createTestProbe[OperationResult]() + val opProbe = createTestProbe[StatusReply[Done]]() val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "4") ref ! CreateAccount(opProbe.ref) - opProbe.expectMessage(Confirmed) + opProbe.expectMessage(StatusReply.Ack) ref ! Deposit(100, opProbe.ref) - opProbe.expectMessage(Confirmed) + opProbe.expectMessage(StatusReply.Ack) val getProbe = createTestProbe[CurrentBalance]() ref ! GetBalance(getProbe.ref) @@ -111,27 +112,24 @@ class AccountExampleSpec "be usable with ask" in { val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "5") - val createResult: Future[OperationResult] = ref.ask(CreateAccount(_)) - createResult.futureValue should ===(Confirmed) + val createResult: Future[Done] = ref.askWithStatus(CreateAccount(_)) + createResult.futureValue should ===(Done) implicit val ec: ExecutionContext = testKit.system.executionContext // Errors are shown in IntelliJ Scala plugin 2019.1.6, but compiles with Scala 2.12.8. // Ok in IntelliJ if using ref.ask[OperationResult]. - ref.ask(Deposit(100, _)).futureValue should ===(Confirmed) - ref.ask(Withdraw(10, _)).futureValue should ===(Confirmed) + ref.askWithStatus(Deposit(100, _)).futureValue should ===(Done) + ref.askWithStatus(Withdraw(10, _)).futureValue should ===(Done) ref.ask(GetBalance(_)).map(_.balance).futureValue should ===(90) } "verifySerialization" in { - val opProbe = createTestProbe[OperationResult]() + val opProbe = createTestProbe[StatusReply[Done]]() serializationTestKit.verifySerialization(CreateAccount(opProbe.ref)) serializationTestKit.verifySerialization(Deposit(100, opProbe.ref)) serializationTestKit.verifySerialization(Withdraw(90, opProbe.ref)) serializationTestKit.verifySerialization(CloseAccount(opProbe.ref)) - serializationTestKit.verifySerialization(Confirmed) - serializationTestKit.verifySerialization(Rejected("overdraft")) - val getProbe = createTestProbe[CurrentBalance]() serializationTestKit.verifySerialization(GetBalance(getProbe.ref)) diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala index c5f3772b16..8fa0e41552 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala @@ -4,9 +4,11 @@ package docs.akka.cluster.sharding.typed +import akka.Done import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.pattern.StatusReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior @@ -25,18 +27,14 @@ object AccountExampleWithCommandHandlersInState { object AccountEntity { // Command sealed trait Command extends CborSerializable - final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command - final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command - final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command + final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command + final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command + final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command - final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command + final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command // Reply - sealed trait CommandReply extends CborSerializable - sealed trait OperationResult extends CommandReply - case object Confirmed extends OperationResult - final case class Rejected(reason: String) extends OperationResult - final case class CurrentBalance(balance: BigDecimal) extends CommandReply + final case class CurrentBalance(balance: BigDecimal) // Event sealed trait Event extends CborSerializable @@ -59,7 +57,7 @@ object AccountExampleWithCommandHandlersInState { override def applyCommand(cmd: Command): ReplyEffect = cmd match { case CreateAccount(replyTo) => - Effect.persist(AccountCreated).thenReply(replyTo)(_ => Confirmed) + Effect.persist(AccountCreated).thenReply(replyTo)(_ => StatusReply.Ack) case _ => // CreateAccount before handling any other commands Effect.unhandled.thenNoReply() @@ -77,25 +75,25 @@ object AccountExampleWithCommandHandlersInState { override def applyCommand(cmd: Command): ReplyEffect = cmd match { case Deposit(amount, replyTo) => - Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => Confirmed) + Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => StatusReply.Ack) case Withdraw(amount, replyTo) => if (canWithdraw(amount)) - Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => Confirmed) + Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => StatusReply.Ack) else - Effect.reply(replyTo)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount")) + Effect.reply(replyTo)(StatusReply.Error(s"Insufficient balance $balance to be able to withdraw $amount")) case GetBalance(replyTo) => Effect.reply(replyTo)(CurrentBalance(balance)) case CloseAccount(replyTo) => if (balance == Zero) - Effect.persist(AccountClosed).thenReply(replyTo)(_ => Confirmed) + Effect.persist(AccountClosed).thenReply(replyTo)(_ => StatusReply.Ack) else - Effect.reply(replyTo)(Rejected("Can't close account with non-zero balance")) + Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance")) case CreateAccount(replyTo) => - Effect.reply(replyTo)(Rejected("Account is already created")) + Effect.reply(replyTo)(StatusReply.Error("Account is already created")) } @@ -127,8 +125,8 @@ object AccountExampleWithCommandHandlersInState { replyClosed(replyTo) } - private def replyClosed(replyTo: ActorRef[AccountEntity.OperationResult]): ReplyEffect = - Effect.reply(replyTo)(Rejected(s"Account is closed")) + private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect = + Effect.reply(replyTo)(StatusReply.Error(s"Account is closed")) override def applyEvent(event: Event): Account = throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala index dc4c888136..076d6bae59 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala @@ -4,9 +4,11 @@ package docs.akka.cluster.sharding.typed +import akka.Done import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.pattern.StatusReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior @@ -29,22 +31,16 @@ object AccountExampleWithEventHandlersInState { //#reply-command sealed trait Command extends CborSerializable //#reply-command - final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command - final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command + final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command + final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command //#reply-command - final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command + final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command //#reply-command final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command - final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command + final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command // Reply - //#reply-command - sealed trait CommandReply extends CborSerializable - sealed trait OperationResult extends CommandReply - case object Confirmed extends OperationResult - final case class Rejected(reason: String) extends OperationResult - //#reply-command - final case class CurrentBalance(balance: BigDecimal) extends CommandReply + final case class CurrentBalance(balance: BigDecimal) extends CborSerializable // Event sealed trait Event extends CborSerializable @@ -111,11 +107,12 @@ object AccountExampleWithEventHandlersInState { case acc @ OpenedAccount(_) => cmd match { - case c: Deposit => deposit(c) - case c: Withdraw => withdraw(acc, c) - case c: GetBalance => getBalance(acc, c) - case c: CloseAccount => closeAccount(acc, c) - case c: CreateAccount => Effect.reply(c.replyTo)(Rejected(s"Account $accountNumber is already created")) + case c: Deposit => deposit(c) + case c: Withdraw => withdraw(acc, c) + case c: GetBalance => getBalance(acc, c) + case c: CloseAccount => closeAccount(acc, c) + case c: CreateAccount => + Effect.reply(c.replyTo)(StatusReply.Error(s"Account $accountNumber is already created")) } case ClosedAccount => @@ -136,8 +133,8 @@ object AccountExampleWithEventHandlersInState { private def replyClosed( accountNumber: String, - replyTo: ActorRef[AccountEntity.OperationResult]): ReplyEffect[Event, Account] = { - Effect.reply(replyTo)(Rejected(s"Account $accountNumber is closed")) + replyTo: ActorRef[StatusReply[Done]]): ReplyEffect[Event, Account] = { + Effect.reply(replyTo)(StatusReply.Error(s"Account $accountNumber is closed")) } private val eventHandler: (Account, Event) => Account = { (state, event) => @@ -145,19 +142,20 @@ object AccountExampleWithEventHandlersInState { } private def createAccount(cmd: CreateAccount): ReplyEffect[Event, Account] = { - Effect.persist(AccountCreated).thenReply(cmd.replyTo)(_ => Confirmed) + Effect.persist(AccountCreated).thenReply(cmd.replyTo)(_ => StatusReply.Ack) } private def deposit(cmd: Deposit): ReplyEffect[Event, Account] = { - Effect.persist(Deposited(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed) + Effect.persist(Deposited(cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack) } //#reply private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[Event, Account] = { if (acc.canWithdraw(cmd.amount)) - Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed) + Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack) else - Effect.reply(cmd.replyTo)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}")) + Effect.reply(cmd.replyTo)( + StatusReply.Error(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}")) } //#reply @@ -167,9 +165,9 @@ object AccountExampleWithEventHandlersInState { private def closeAccount(acc: OpenedAccount, cmd: CloseAccount): ReplyEffect[Event, Account] = { if (acc.balance == Zero) - Effect.persist(AccountClosed).thenReply(cmd.replyTo)(_ => Confirmed) + Effect.persist(AccountClosed).thenReply(cmd.replyTo)(_ => StatusReply.Ack) else - Effect.reply(cmd.replyTo)(Rejected("Can't close account with non-zero balance")) + Effect.reply(cmd.replyTo)(StatusReply.Error("Can't close account with non-zero balance")) } } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala index 16197c6023..933ebd236d 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala @@ -4,9 +4,11 @@ package docs.akka.cluster.sharding.typed +import akka.Done import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.pattern.StatusReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior @@ -25,18 +27,14 @@ object AccountExampleWithOptionState { object AccountEntity { // Command sealed trait Command extends CborSerializable - final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command - final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command - final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command + final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command + final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command + final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command - final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command + final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command // Reply - sealed trait CommandReply extends CborSerializable - sealed trait OperationResult extends CommandReply - case object Confirmed extends OperationResult - final case class Rejected(reason: String) extends OperationResult - final case class CurrentBalance(balance: BigDecimal) extends CommandReply + final case class CurrentBalance(balance: BigDecimal) extends CborSerializable // Event sealed trait Event extends CborSerializable @@ -61,25 +59,25 @@ object AccountExampleWithOptionState { override def applyCommand(cmd: Command): ReplyEffect = cmd match { case Deposit(amount, replyTo) => - Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => Confirmed) + Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => StatusReply.Ack) case Withdraw(amount, replyTo) => if (canWithdraw(amount)) - Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => Confirmed) + Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => StatusReply.Ack) else - Effect.reply(replyTo)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount")) + Effect.reply(replyTo)(StatusReply.Error(s"Insufficient balance $balance to be able to withdraw $amount")) case GetBalance(replyTo) => Effect.reply(replyTo)(CurrentBalance(balance)) case CloseAccount(replyTo) => if (balance == Zero) - Effect.persist(AccountClosed).thenReply(replyTo)(_ => Confirmed) + Effect.persist(AccountClosed).thenReply(replyTo)(_ => StatusReply.Ack) else - Effect.reply(replyTo)(Rejected("Can't close account with non-zero balance")) + Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance")) case CreateAccount(replyTo) => - Effect.reply(replyTo)(Rejected("Account is already created")) + Effect.reply(replyTo)(StatusReply.Error("Account is already created")) } @@ -111,8 +109,8 @@ object AccountExampleWithOptionState { replyClosed(replyTo) } - private def replyClosed(replyTo: ActorRef[AccountEntity.OperationResult]): ReplyEffect = - Effect.reply(replyTo)(Rejected(s"Account is closed")) + private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect = + Effect.reply(replyTo)(StatusReply.Error(s"Account is closed")) override def applyEvent(event: Event): Account = throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") @@ -141,7 +139,7 @@ object AccountExampleWithOptionState { def onFirstCommand(cmd: Command): ReplyEffect = { cmd match { case CreateAccount(replyTo) => - Effect.persist(AccountCreated).thenReply(replyTo)(_ => Confirmed) + Effect.persist(AccountCreated).thenReply(replyTo)(_ => StatusReply.Ack) case _ => // CreateAccount before handling any other commands Effect.unhandled.thenNoReply() diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala index 2c882a6eb5..61e4272caf 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala @@ -84,4 +84,5 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend val createdTimestamp = if (re.hasCreatedTimestamp) re.getCreatedTimestamp else 0L Entry(resolver.resolveActorRef(re.getActorRef), re.getSystemUid)(createdTimestamp) } + } diff --git a/akka-docs/src/main/paradox/typed/interaction-patterns.md b/akka-docs/src/main/paradox/typed/interaction-patterns.md index e6abe11e6a..74376cb1ef 100644 --- a/akka-docs/src/main/paradox/typed/interaction-patterns.md +++ b/akka-docs/src/main/paradox/typed/interaction-patterns.md @@ -160,7 +160,9 @@ The adapter function is running in the receiving actor and can safely access its In an interaction where there is a 1:1 mapping between a request and a response we can use `ask` on the `ActorContext` to interact with another actor. -The interaction has two steps, first we need to construct the outgoing message, to do that we need an @scala[`ActorRef[Response]`]@java[`ActorRef`] to put as recipient in the outgoing message. The second step is to transform the successful `Response` or failure into a message that is part of the protocol of the sending actor. +The interaction has two steps, first we need to construct the outgoing message, to do that we need an @scala[`ActorRef[Response]`]@java[`ActorRef`] to put as recipient in the outgoing message. +The second step is to transform the successful `Response` or failure into a message that is part of the protocol of the sending actor. +See also the [Generic response wrapper](#generic-response-wrapper) for replies that are either a success or an error. **Example:** @@ -212,7 +214,7 @@ Java Note that validation errors are also explicit in the message protocol. The `GiveMeCookies` request can reply with `Cookies` or `InvalidRequest`. The requestor has to decide how to handle an `InvalidRequest` reply. Sometimes it should be treated as a failed @scala[`Future`]@java[`Future`] and for that the reply can be mapped on the -requestor side. +requestor side. See also the [Generic response wrapper](#generic-response-wrapper) for replies that are either a success or an error. Scala : @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #standalone-ask-fail-future } @@ -230,6 +232,50 @@ Java * There can only be a single response to one `ask` (see @ref:[per session child Actor](#per-session-child-actor)) * When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact +## Generic response wrapper + +In many cases the response can either be a successful result or an error (a validation error that the command was invalid for example). +Having to define two response classes and a shared supertype for every request type can be repetitive, especially in a cluster context +where you also have to make sure the messages can be serialized to be sent over the network. + +To help with this a generic status-response type is included in Akka: @apidoc[StatusReply], everywhere where `ask` can be used +there is also a second method `askWithStatus` which, given that the response is a `StatusReply` will unwrap successful responses +and help with handling validation errors. Akka includes pre-built serializers for the type, so in the normal use case a clustered +application only needs to provide a serializer for the successful result. + +For the case where the successful reply does not contain an actual value but is more of an acknowledgment there is a pre defined +@scala[`StatusReply.Ack`]@java[`StatusReply.ack()`] of type @scala[`StatusReply[Done]`]@java[`StatusReply`]. + +Errors are preferably sent as a text describing what is wrong, but using exceptions to attach a type is also possible. + +**Example actor to actor ask:** + +Scala +: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #actor-ask-with-status } + +Java +: @@snip [InteractionPatternsTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java) { #actor-ask-with-status } + +A validation error is turned into a `Failure` for the message adapter. In this case we are explicitly handling the valdation error separately from +other ask failures. + +**Example ask from the outside:** + +Scala +: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #standalone-ask-with-status } + +Java +: @@snip [InteractionPatternsTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java) { #standalone-ask-with-status } + +Note that validation errors are also explicit in the message protocol, but encoded as the wrapper type, constructed using @scala[`StatusReply.Error(text)`]@java[`StatusReply.error(text)`]: + +Scala +: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #standalone-ask-with-status-fail-future } + +Java +: @@snip [InteractionPatternsTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java) { #standalone-ask-with-status-fail-future } + + ## Ignoring replies In some situations an actor has a response for a particular request message but you are not interested in the response. In this case you can pass @scala[`system.ignoreRef`]@java[`system.ignoreRef()`] turning the request-response into a fire-and-forget. diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index aa8d196e58..eae94d72e7 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -386,8 +386,13 @@ The @ref:[Request-Response interaction pattern](interaction-patterns.md#request- persistent actors, because you typically want to know if the command was rejected due to validation errors and when accepted you want a confirmation when the events have been successfully stored. -Therefore you typically include a @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef`] in the -commands. After validation errors or after persisting events, using a `thenRun` side effect, the reply message can +Therefore you typically include a @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef`]. If the +command can either have a successful response or a validation error returned, the generic response type @scala[`StatusReply[ReplyType]]`] +@java[`StatusReply`] can be used. If the successful reply does not contain a value but is more of an acknowledgement +a pre defined @scala[`StatusReply.Ack`]@java[`StatusReply.ack()`] of type @scala[`StatusReply[Done]`]@java[`StatusReply`] +can be used. + +After validation errors or after persisting events, using a `thenRun` side effect, the reply message can be sent to the `ActorRef`. Scala diff --git a/akka-docs/src/main/paradox/typed/style-guide.md b/akka-docs/src/main/paradox/typed/style-guide.md index ac42c38477..dcc8f5182c 100644 --- a/akka-docs/src/main/paradox/typed/style-guide.md +++ b/akka-docs/src/main/paradox/typed/style-guide.md @@ -271,6 +271,9 @@ Scala Java : @@snip [StyleGuideDocExamples.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StyleGuideDocExamples.java) { #message-protocol } +Note that the response message hierarchy in this case could be completely avoided by using the @apiDoc[StatusReply] API +instead (see @ref[Generic Response Wrapper](interaction-patterns.md#generic-response-wrapper)). + ## Public versus private messages Often an actor has some messages that are only for its internal implementation and not part of the public diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala index 64178088ad..148b2cee52 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala @@ -8,6 +8,7 @@ import akka.Done import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors +import akka.pattern.StatusReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior @@ -46,7 +47,7 @@ object BlogPostEntity { //#commands sealed trait Command //#reply-command - final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends Command + final case class AddPost(content: PostContent, replyTo: ActorRef[StatusReply[AddPostDone]]) extends Command final case class AddPostDone(postId: String) //#reply-command final case class GetPost(replyTo: ActorRef[PostContent]) extends Command @@ -79,13 +80,16 @@ object BlogPostEntity { case cmd: ChangeBody => changeBody(draftState, cmd) case Publish(replyTo) => publish(draftState, replyTo) case GetPost(replyTo) => getPost(draftState, replyTo) - case _: AddPost => Effect.unhandled + case AddPost(_, replyTo) => + Effect.unhandled.thenRun(_ => replyTo ! StatusReply.Error("Cannot add post while in draft state")) } case publishedState: PublishedState => command match { case GetPost(replyTo) => getPost(publishedState, replyTo) - case _ => Effect.unhandled + case AddPost(_, replyTo) => + Effect.unhandled.thenRun(_ => replyTo ! StatusReply.Error("Cannot add post, already published")) + case _ => Effect.unhandled } } } @@ -95,7 +99,7 @@ object BlogPostEntity { val evt = PostAdded(cmd.content.postId, cmd.content) Effect.persist(evt).thenRun { _ => // After persist is done additional side effects can be performed - cmd.replyTo ! AddPostDone(cmd.content.postId) + cmd.replyTo ! StatusReply.Success(AddPostDone(cmd.content.postId)) } //#reply } diff --git a/akka-remote/src/main/java/akka/remote/ContainerFormats.java b/akka-remote/src/main/java/akka/remote/ContainerFormats.java index c15a92afbe..f336fb1ff9 100644 --- a/akka-remote/src/main/java/akka/remote/ContainerFormats.java +++ b/akka-remote/src/main/java/akka/remote/ContainerFormats.java @@ -10583,6 +10583,624 @@ public final class ContainerFormats { } + public interface StatusReplyErrorMessageOrBuilder extends + // @@protoc_insertion_point(interface_extends:StatusReplyErrorMessage) + akka.protobufv3.internal.MessageOrBuilder { + + /** + * required string errorMessage = 1; + * @return Whether the errorMessage field is set. + */ + boolean hasErrorMessage(); + /** + * required string errorMessage = 1; + * @return The errorMessage. + */ + java.lang.String getErrorMessage(); + /** + * required string errorMessage = 1; + * @return The bytes for errorMessage. + */ + akka.protobufv3.internal.ByteString + getErrorMessageBytes(); + } + /** + *
+   * ReplyWith pattern message(s)
+   * 
+ * + * Protobuf type {@code StatusReplyErrorMessage} + */ + public static final class StatusReplyErrorMessage extends + akka.protobufv3.internal.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:StatusReplyErrorMessage) + StatusReplyErrorMessageOrBuilder { + private static final long serialVersionUID = 0L; + // Use StatusReplyErrorMessage.newBuilder() to construct. + private StatusReplyErrorMessage(akka.protobufv3.internal.GeneratedMessageV3.Builder builder) { + super(builder); + } + private StatusReplyErrorMessage() { + errorMessage_ = ""; + } + + @java.lang.Override + @SuppressWarnings({"unused"}) + protected java.lang.Object newInstance( + akka.protobufv3.internal.GeneratedMessageV3.UnusedPrivateParameter unused) { + return new StatusReplyErrorMessage(); + } + + @java.lang.Override + public final akka.protobufv3.internal.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private StatusReplyErrorMessage( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + akka.protobufv3.internal.UnknownFieldSet.Builder unknownFields = + akka.protobufv3.internal.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 10: { + akka.protobufv3.internal.ByteString bs = input.readBytes(); + bitField0_ |= 0x00000001; + errorMessage_ = bs; + break; + } + default: { + if (!parseUnknownField( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobufv3.internal.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_descriptor; + } + + @java.lang.Override + protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.remote.ContainerFormats.StatusReplyErrorMessage.class, akka.remote.ContainerFormats.StatusReplyErrorMessage.Builder.class); + } + + private int bitField0_; + public static final int ERRORMESSAGE_FIELD_NUMBER = 1; + private volatile java.lang.Object errorMessage_; + /** + * required string errorMessage = 1; + * @return Whether the errorMessage field is set. + */ + public boolean hasErrorMessage() { + return ((bitField0_ & 0x00000001) != 0); + } + /** + * required string errorMessage = 1; + * @return The errorMessage. + */ + public java.lang.String getErrorMessage() { + java.lang.Object ref = errorMessage_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobufv3.internal.ByteString bs = + (akka.protobufv3.internal.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + errorMessage_ = s; + } + return s; + } + } + /** + * required string errorMessage = 1; + * @return The bytes for errorMessage. + */ + public akka.protobufv3.internal.ByteString + getErrorMessageBytes() { + java.lang.Object ref = errorMessage_; + if (ref instanceof java.lang.String) { + akka.protobufv3.internal.ByteString b = + akka.protobufv3.internal.ByteString.copyFromUtf8( + (java.lang.String) ref); + errorMessage_ = b; + return b; + } else { + return (akka.protobufv3.internal.ByteString) ref; + } + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + if (!hasErrorMessage()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(akka.protobufv3.internal.CodedOutputStream output) + throws java.io.IOException { + if (((bitField0_ & 0x00000001) != 0)) { + akka.protobufv3.internal.GeneratedMessageV3.writeString(output, 1, errorMessage_); + } + unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) != 0)) { + size += akka.protobufv3.internal.GeneratedMessageV3.computeStringSize(1, errorMessage_); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof akka.remote.ContainerFormats.StatusReplyErrorMessage)) { + return super.equals(obj); + } + akka.remote.ContainerFormats.StatusReplyErrorMessage other = (akka.remote.ContainerFormats.StatusReplyErrorMessage) obj; + + if (hasErrorMessage() != other.hasErrorMessage()) return false; + if (hasErrorMessage()) { + if (!getErrorMessage() + .equals(other.getErrorMessage())) return false; + } + if (!unknownFields.equals(other.unknownFields)) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + if (hasErrorMessage()) { + hash = (37 * hash) + ERRORMESSAGE_FIELD_NUMBER; + hash = (53 * hash) + getErrorMessage().hashCode(); + } + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom( + java.nio.ByteBuffer data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom( + java.nio.ByteBuffer data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom( + akka.protobufv3.internal.ByteString data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom( + akka.protobufv3.internal.ByteString data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(byte[] data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom( + byte[] data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseDelimitedFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom( + akka.protobufv3.internal.CodedInputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(akka.remote.ContainerFormats.StatusReplyErrorMessage prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + *
+     * ReplyWith pattern message(s)
+     * 
+ * + * Protobuf type {@code StatusReplyErrorMessage} + */ + public static final class Builder extends + akka.protobufv3.internal.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:StatusReplyErrorMessage) + akka.remote.ContainerFormats.StatusReplyErrorMessageOrBuilder { + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_descriptor; + } + + @java.lang.Override + protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.remote.ContainerFormats.StatusReplyErrorMessage.class, akka.remote.ContainerFormats.StatusReplyErrorMessage.Builder.class); + } + + // Construct using akka.remote.ContainerFormats.StatusReplyErrorMessage.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobufv3.internal.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + } + } + @java.lang.Override + public Builder clear() { + super.clear(); + errorMessage_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + @java.lang.Override + public akka.protobufv3.internal.Descriptors.Descriptor + getDescriptorForType() { + return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_descriptor; + } + + @java.lang.Override + public akka.remote.ContainerFormats.StatusReplyErrorMessage getDefaultInstanceForType() { + return akka.remote.ContainerFormats.StatusReplyErrorMessage.getDefaultInstance(); + } + + @java.lang.Override + public akka.remote.ContainerFormats.StatusReplyErrorMessage build() { + akka.remote.ContainerFormats.StatusReplyErrorMessage result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public akka.remote.ContainerFormats.StatusReplyErrorMessage buildPartial() { + akka.remote.ContainerFormats.StatusReplyErrorMessage result = new akka.remote.ContainerFormats.StatusReplyErrorMessage(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) != 0)) { + to_bitField0_ |= 0x00000001; + } + result.errorMessage_ = errorMessage_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + @java.lang.Override + public Builder clone() { + return super.clone(); + } + @java.lang.Override + public Builder setField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field) { + return super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + akka.protobufv3.internal.Descriptors.OneofDescriptor oneof) { + return super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(akka.protobufv3.internal.Message other) { + if (other instanceof akka.remote.ContainerFormats.StatusReplyErrorMessage) { + return mergeFrom((akka.remote.ContainerFormats.StatusReplyErrorMessage)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.remote.ContainerFormats.StatusReplyErrorMessage other) { + if (other == akka.remote.ContainerFormats.StatusReplyErrorMessage.getDefaultInstance()) return this; + if (other.hasErrorMessage()) { + bitField0_ |= 0x00000001; + errorMessage_ = other.errorMessage_; + onChanged(); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + if (!hasErrorMessage()) { + return false; + } + return true; + } + + @java.lang.Override + public Builder mergeFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.remote.ContainerFormats.StatusReplyErrorMessage parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + parsedMessage = (akka.remote.ContainerFormats.StatusReplyErrorMessage) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + private java.lang.Object errorMessage_ = ""; + /** + * required string errorMessage = 1; + * @return Whether the errorMessage field is set. + */ + public boolean hasErrorMessage() { + return ((bitField0_ & 0x00000001) != 0); + } + /** + * required string errorMessage = 1; + * @return The errorMessage. + */ + public java.lang.String getErrorMessage() { + java.lang.Object ref = errorMessage_; + if (!(ref instanceof java.lang.String)) { + akka.protobufv3.internal.ByteString bs = + (akka.protobufv3.internal.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + errorMessage_ = s; + } + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string errorMessage = 1; + * @return The bytes for errorMessage. + */ + public akka.protobufv3.internal.ByteString + getErrorMessageBytes() { + java.lang.Object ref = errorMessage_; + if (ref instanceof String) { + akka.protobufv3.internal.ByteString b = + akka.protobufv3.internal.ByteString.copyFromUtf8( + (java.lang.String) ref); + errorMessage_ = b; + return b; + } else { + return (akka.protobufv3.internal.ByteString) ref; + } + } + /** + * required string errorMessage = 1; + * @param value The errorMessage to set. + * @return This builder for chaining. + */ + public Builder setErrorMessage( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + errorMessage_ = value; + onChanged(); + return this; + } + /** + * required string errorMessage = 1; + * @return This builder for chaining. + */ + public Builder clearErrorMessage() { + bitField0_ = (bitField0_ & ~0x00000001); + errorMessage_ = getDefaultInstance().getErrorMessage(); + onChanged(); + return this; + } + /** + * required string errorMessage = 1; + * @param value The bytes for errorMessage to set. + * @return This builder for chaining. + */ + public Builder setErrorMessageBytes( + akka.protobufv3.internal.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + errorMessage_ = value; + onChanged(); + return this; + } + @java.lang.Override + public final Builder setUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:StatusReplyErrorMessage) + } + + // @@protoc_insertion_point(class_scope:StatusReplyErrorMessage) + private static final akka.remote.ContainerFormats.StatusReplyErrorMessage DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new akka.remote.ContainerFormats.StatusReplyErrorMessage(); + } + + public static akka.remote.ContainerFormats.StatusReplyErrorMessage getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + @java.lang.Deprecated public static final akka.protobufv3.internal.Parser + PARSER = new akka.protobufv3.internal.AbstractParser() { + @java.lang.Override + public StatusReplyErrorMessage parsePartialFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return new StatusReplyErrorMessage(input, extensionRegistry); + } + }; + + public static akka.protobufv3.internal.Parser parser() { + return PARSER; + } + + @java.lang.Override + public akka.protobufv3.internal.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public akka.remote.ContainerFormats.StatusReplyErrorMessage getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + private static final akka.protobufv3.internal.Descriptors.Descriptor internal_static_SelectionEnvelope_descriptor; private static final @@ -10643,6 +11261,11 @@ public final class ContainerFormats { private static final akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable internal_static_StackTraceElement_fieldAccessorTable; + private static final akka.protobufv3.internal.Descriptors.Descriptor + internal_static_StatusReplyErrorMessage_descriptor; + private static final + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internal_static_StatusReplyErrorMessage_fieldAccessorTable; public static akka.protobufv3.internal.Descriptors.FileDescriptor getDescriptor() { @@ -10675,9 +11298,10 @@ public final class ContainerFormats { "\n\007message\030\002 \002(\t\022\027\n\005cause\030\003 \002(\0132\010.Payload" + "\"`\n\021StackTraceElement\022\021\n\tclassName\030\001 \002(\t" + "\022\022\n\nmethodName\030\002 \002(\t\022\020\n\010fileName\030\003 \002(\t\022\022" + - "\n\nlineNumber\030\004 \002(\005*<\n\013PatternType\022\n\n\006PAR" + - "ENT\020\000\022\016\n\nCHILD_NAME\020\001\022\021\n\rCHILD_PATTERN\020\002" + - "B\017\n\013akka.remoteH\001" + "\n\nlineNumber\030\004 \002(\005\"/\n\027StatusReplyErrorMe" + + "ssage\022\024\n\014errorMessage\030\001 \002(\t*<\n\013PatternTy" + + "pe\022\n\n\006PARENT\020\000\022\016\n\nCHILD_NAME\020\001\022\021\n\rCHILD_" + + "PATTERN\020\002B\017\n\013akka.remoteH\001" }; descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -10755,6 +11379,12 @@ public final class ContainerFormats { akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_StackTraceElement_descriptor, new java.lang.String[] { "ClassName", "MethodName", "FileName", "LineNumber", }); + internal_static_StatusReplyErrorMessage_descriptor = + getDescriptor().getMessageTypes().get(12); + internal_static_StatusReplyErrorMessage_fieldAccessorTable = new + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( + internal_static_StatusReplyErrorMessage_descriptor, + new java.lang.String[] { "ErrorMessage", }); } // @@protoc_insertion_point(outer_class_scope) diff --git a/akka-remote/src/main/protobuf/ContainerFormats.proto b/akka-remote/src/main/protobuf/ContainerFormats.proto index 1e8e7de404..da9cd0c8a6 100644 --- a/akka-remote/src/main/protobuf/ContainerFormats.proto +++ b/akka-remote/src/main/protobuf/ContainerFormats.proto @@ -82,3 +82,9 @@ message StackTraceElement { required string fileName = 3; required int32 lineNumber = 4; } + + +// ReplyWith pattern message(s) +message StatusReplyErrorMessage { + required string errorMessage = 1; +} \ No newline at end of file diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 76de8dc1a5..2e95f51e81 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -98,6 +98,8 @@ akka { "akka.routing.TailChoppingPool" = akka-misc "akka.remote.routing.RemoteRouterConfig" = akka-misc + "akka.pattern.StatusReply" = akka-misc + "akka.dispatch.sysmsg.SystemMessage" = akka-system-msg # Java Serializer is by default used for exceptions and will by default diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala index 89ac982fd0..ec5825e881 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MiscMessageSerializer.scala @@ -10,12 +10,11 @@ import java.util.Optional import java.util.concurrent.TimeUnit import scala.concurrent.duration.{ FiniteDuration, TimeUnit } - import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions } - import akka.{ Done, NotUsed } import akka.actor._ import akka.dispatch.Dispatchers +import akka.pattern.StatusReply import akka.remote._ import akka.remote.WireFormats.AddressData import akka.remote.routing.RemoteRouterConfig @@ -42,6 +41,9 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW case r: ActorRef => serializeActorRef(r) case s: Status.Success => serializeStatusSuccess(s) case f: Status.Failure => serializeStatusFailure(f) + case StatusReply.Ack => Array.emptyByteArray + case r @ StatusReply.Success(_) => serializeStatusReplySuccess(r) + case r @ StatusReply.Error(_) => serializeStatusReplyError(r) case ex: ActorInitializationException => serializeActorInitializationException(ex) case ex: ThrowableNotSerializableException => serializeThrowableNotSerializableException(ex) case t: Throwable => throwableSupport.serializeThrowable(t) @@ -120,6 +122,22 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW private def serializeStatusFailure(failure: Status.Failure): Array[Byte] = payloadSupport.payloadBuilder(failure.cause).build().toByteArray + def serializeStatusReplySuccess(r: StatusReply[Any]): Array[Byte] = + // no specific message, serialized id and manifest together with payload is enough (no wrapping overhead) + payloadSupport.payloadBuilder(r.getValue).build().toByteArray + + def serializeStatusReplyError(r: StatusReply[_]): Array[Byte] = { + r.getError match { + case em: StatusReply.ErrorMessage => + // somewhat optimized for the recommended usage, avoiding the additional payload metadata + ContainerFormats.StatusReplyErrorMessage.newBuilder().setErrorMessage(em.getMessage).build().toByteArray + case ex: Throwable => + // depends on user providing exception serializer + // no specific message, serialized id and manifest together with payload is enough (less wrapping overhead) + payloadSupport.payloadBuilder(ex).build().toByteArray + } + } + private def serializeActorInitializationException(ex: ActorInitializationException): Array[Byte] = { val builder = ContainerFormats.ActorInitializationException.newBuilder() if (ex.getActor ne null) @@ -312,12 +330,20 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW private val ScatterGatherPoolManifest = "ROSGP" private val TailChoppingPoolManifest = "ROTCP" private val RemoteRouterConfigManifest = "RORRC" + private val StatusReplySuccessManifest = "S" + private val StatusReplyErrorMessageManifest = "SM" + private val StatusReplyErrorExceptionManifest = "SE" + private val StatusReplyAckManifest = "SA" private val fromBinaryMap = Map[String, Array[Byte] => AnyRef]( IdentifyManifest -> deserializeIdentify, ActorIdentityManifest -> deserializeActorIdentity, StatusSuccessManifest -> deserializeStatusSuccess, StatusFailureManifest -> deserializeStatusFailure, + StatusReplyAckManifest -> ((_) => StatusReply.Ack), + StatusReplySuccessManifest -> deserializeStatusReplySuccess, + StatusReplyErrorMessageManifest -> deserializeStatusReplyErrorMessage, + StatusReplyErrorExceptionManifest -> deserializeStatusReplyErrorException, ThrowableManifest -> throwableSupport.deserializeThrowable, ActorRefManifest -> deserializeActorRefBytes, OptionManifest -> deserializeOption, @@ -347,36 +373,41 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW override def manifest(o: AnyRef): String = o match { - case _: Identify => IdentifyManifest - case _: ActorIdentity => ActorIdentityManifest - case _: Option[Any] => OptionManifest - case _: Optional[_] => OptionalManifest - case _: ActorRef => ActorRefManifest - case _: Status.Success => StatusSuccessManifest - case _: Status.Failure => StatusFailureManifest - case _: ActorInitializationException => ActorInitializationExceptionManifest - case _: ThrowableNotSerializableException => ThrowableNotSerializableExceptionManifest - case _: Throwable => ThrowableManifest - case PoisonPill => PoisonPillManifest - case Kill => KillManifest - case RemoteWatcher.Heartbeat => RemoteWatcherHBManifest - case Done => DoneManifest - case NotUsed => NotUsedManifest - case _: Address => AddressManifest - case _: UniqueAddress => UniqueAddressManifest - case _: RemoteWatcher.HeartbeatRsp => RemoteWatcherHBRespManifest - case LocalScope => LocalScopeManifest - case _: RemoteScope => RemoteScopeManifest - case _: Config => ConfigManifest - case _: FromConfig => FromConfigManifest - case _: DefaultResizer => DefaultResizerManifest - case _: BalancingPool => BalancingPoolManifest - case _: BroadcastPool => BroadcastPoolManifest - case _: RandomPool => RandomPoolManifest - case _: RoundRobinPool => RoundRobinPoolManifest - case _: ScatterGatherFirstCompletedPool => ScatterGatherPoolManifest - case _: TailChoppingPool => TailChoppingPoolManifest - case _: RemoteRouterConfig => RemoteRouterConfigManifest + case _: Identify => IdentifyManifest + case _: ActorIdentity => ActorIdentityManifest + case _: Option[Any] => OptionManifest + case _: Optional[_] => OptionalManifest + case _: ActorRef => ActorRefManifest + case _: Status.Success => StatusSuccessManifest + case _: Status.Failure => StatusFailureManifest + case StatusReply.Ack => StatusReplyAckManifest + case StatusReply.Success(_) => StatusReplySuccessManifest + case StatusReply.Error(_: StatusReply.ErrorMessage) => StatusReplyErrorMessageManifest + case StatusReply.Error(_) => StatusReplyErrorExceptionManifest + case _: ActorInitializationException => ActorInitializationExceptionManifest + case _: ThrowableNotSerializableException => ThrowableNotSerializableExceptionManifest + case _: Throwable => ThrowableManifest + case PoisonPill => PoisonPillManifest + case Kill => KillManifest + case RemoteWatcher.Heartbeat => RemoteWatcherHBManifest + case Done => DoneManifest + case NotUsed => NotUsedManifest + case _: Address => AddressManifest + case _: UniqueAddress => UniqueAddressManifest + case _: RemoteWatcher.HeartbeatRsp => RemoteWatcherHBRespManifest + case LocalScope => LocalScopeManifest + case _: RemoteScope => RemoteScopeManifest + case _: Config => ConfigManifest + case _: FromConfig => FromConfigManifest + case _: DefaultResizer => DefaultResizerManifest + case _: BalancingPool => BalancingPoolManifest + case _: BroadcastPool => BroadcastPoolManifest + case _: RandomPool => RandomPoolManifest + case _: RoundRobinPool => RoundRobinPoolManifest + case _: ScatterGatherFirstCompletedPool => ScatterGatherPoolManifest + case _: TailChoppingPool => TailChoppingPoolManifest + case _: RemoteRouterConfig => RemoteRouterConfigManifest + case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") } @@ -436,6 +467,16 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW private def deserializeStatusFailure(bytes: Array[Byte]): Status.Failure = Status.Failure(payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes)).asInstanceOf[Throwable]) + private def deserializeStatusReplySuccess(bytes: Array[Byte]): StatusReply[_] = + StatusReply.success(payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes))) + + private def deserializeStatusReplyErrorMessage(bytes: Array[Byte]): StatusReply[_] = + StatusReply.error(ContainerFormats.StatusReplyErrorMessage.parseFrom(bytes).getErrorMessage) + + private def deserializeStatusReplyErrorException(bytes: Array[Byte]): StatusReply[_] = + StatusReply.error( + payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes)).asInstanceOf[Throwable]) + private def deserializeAddressData(bytes: Array[Byte]): Address = addressFromDataProto(WireFormats.AddressData.parseFrom(bytes)) diff --git a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala index cab843cce8..2a4ba22be2 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/MiscMessageSerializerSpec.scala @@ -10,12 +10,11 @@ import java.util.concurrent.TimeoutException import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import com.typesafe.config.ConfigFactory - import akka.{ Done, NotUsed } import akka.actor._ import akka.pattern.AskTimeoutException +import akka.pattern.StatusReply import akka.remote.{ RemoteScope, RemoteWatcher } import akka.remote.routing.RemoteRouterConfig import akka.routing._ @@ -130,7 +129,11 @@ class MiscMessageSerializerSpec extends AkkaSpec(MiscMessageSerializerSpec.testC "TailChoppingPool" -> TailChoppingPool(25, within = 3.seconds, interval = 1.second), "RemoteRouterConfig" -> RemoteRouterConfig( local = RandomPool(25), - nodes = List(Address("akka", "system", "localhost", 2525)))).foreach { + nodes = List(Address("akka", "system", "localhost", 2525))), + "StatusReply.success" -> StatusReply.success("woho!"), + "StatusReply.Ack" -> StatusReply.Ack, + "StatusReply.error(errorMessage)" -> StatusReply.error("boho!"), + "StatusReply.error(exception)" -> StatusReply.error(new TestException("boho!"))).foreach { case (scenario, item) => s"resolve serializer for $scenario" in { val serializer = SerializationExtension(system)