Ok/error protocol and failable ask #29186 (#29190)

New type StatusReply simplifies the very common use case of replying to a request with either a successful reply or an error reply which can be repetitive to define for every actor, with the additional overhead of having to make sure each such sealed top type + 2 concrete reply classes has working serialization.
This commit is contained in:
Johan Andrén 2020-07-09 16:57:53 +02:00 committed by GitHub
parent ec08c9dde4
commit 996f424835
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
43 changed files with 2093 additions and 324 deletions

View file

@ -0,0 +1,123 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.pattern;
import akka.actor.Actor;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import akka.testkit.TestException;
import akka.testkit.TestProbe;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static akka.pattern.Patterns.askWithStatus;
import static org.junit.Assert.*;
public class StatusReplyTest extends JUnitSuite {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("JavaAPI", AkkaSpec.testConf());
@Test
public void testSuccessApi() {
StatusReply<String> reply = StatusReply.success("woho");
assertTrue(reply.isSuccess());
assertFalse(reply.isError());
assertEquals("woho", reply.getValue());
try {
reply.getError();
Assert.fail("Calling get error on success did not throw");
} catch (IllegalArgumentException ex) {
// this is what we expect
}
}
@Test
public void testErrorMessageApi() {
StatusReply<String> reply = StatusReply.error("boho");
assertTrue(reply.isError());
assertFalse(reply.isSuccess());
assertEquals("boho", reply.getError().getMessage());
try {
reply.getValue();
Assert.fail("Calling get value on error did not throw");
} catch (StatusReply.ErrorMessage ex) {
// this is what we expect
} catch (Throwable th) {
Assert.fail("Unexpected exception type: " + th);
}
}
@Test
public void testErrorExceptionApi() {
StatusReply<String> reply = StatusReply.error(new TestException("boho"));
assertTrue(reply.isError());
assertFalse(reply.isSuccess());
assertEquals("boho", reply.getError().getMessage());
try {
reply.getValue();
Assert.fail("Calling get value on error did not throw");
} catch (TestException ex) {
// this is what we expect
} catch (Throwable th) {
Assert.fail("Unexpected exception type: " + th);
}
}
@Test
public void testAskWithStatusSuccess() throws Exception {
TestProbe probe = new TestProbe(actorSystemResource.getSystem());
CompletionStage<Object> response = askWithStatus(probe.ref(), "request", Duration.ofSeconds(3));
probe.expectMsg("request");
probe.lastSender().tell(StatusReply.success("woho"), Actor.noSender());
Object result = response.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals("woho", result);
}
@Test
public void testAskWithStatusErrorMessage() throws Exception {
TestProbe probe = new TestProbe(actorSystemResource.getSystem());
CompletionStage<Object> response = askWithStatus(probe.ref(), "request", Duration.ofSeconds(3));
probe.expectMsg("request");
probe.lastSender().tell(StatusReply.error("boho"), Actor.noSender());
try {
Object result = response.toCompletableFuture().get(3, TimeUnit.SECONDS);
} catch (ExecutionException ex) {
// what we expected
assertEquals(StatusReply.ErrorMessage.class, ex.getCause().getClass());
assertEquals("boho", ex.getCause().getMessage());
}
}
@Test
public void testAskWithStatusErrorException() throws Exception {
TestProbe probe = new TestProbe(actorSystemResource.getSystem());
CompletionStage<Object> response = askWithStatus(probe.ref(), "request", Duration.ofSeconds(3));
probe.expectMsg("request");
probe.lastSender().tell(StatusReply.error(new TestException("boho")), Actor.noSender());
try {
Object result = response.toCompletableFuture().get(3, TimeUnit.SECONDS);
} catch (ExecutionException ex) {
// what we expected
assertEquals(TestException.class, ex.getCause().getClass());
assertEquals("boho", ex.getCause().getMessage());
}
}
}

View file

@ -0,0 +1,90 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.pattern
import akka.Done
import akka.testkit.AkkaSpec
import akka.testkit.TestException
import akka.testkit.TestProbe
import akka.util.Timeout
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Future
import scala.concurrent.duration._
class StatusReplySpec extends AkkaSpec with ScalaFutures {
"StatusReply" should {
"pattern match success" in {
// like in a classic actor receive Any => ...
(StatusReply.Success("woho!"): Any) match {
case StatusReply.Success(_: Int) => fail()
case StatusReply.Success(text: String) if text == "woho!" =>
case _ => fail()
}
}
"pattern match success (Ack)" in {
// like in a classic actor receive Any => ...
(StatusReply.Ack: Any) match {
case StatusReply.Ack =>
case _ => fail()
}
}
"pattern match error with text" in {
StatusReply.Error("boho!") match {
case StatusReply.Error(_) =>
case _ => fail()
}
}
"pattern match error with exception" in {
StatusReply.Error(TestException("boho!")) match {
case StatusReply.Error(_) =>
case _ => fail()
}
}
"flatten a Future[StatusReply]" in {
import system.dispatcher
StatusReply.flattenStatusFuture(Future(StatusReply.Success("woho"))).futureValue should ===("woho")
StatusReply.flattenStatusFuture(Future(StatusReply.Ack)).futureValue should ===(Done)
StatusReply.flattenStatusFuture(Future(StatusReply.Error("boo"))).failed.futureValue should ===(
StatusReply.ErrorMessage("boo"))
StatusReply.flattenStatusFuture(Future(StatusReply.Error(TestException("boo")))).failed.futureValue should ===(
TestException("boo"))
}
}
"askWithStatus" should {
implicit val timeout: Timeout = 3.seconds
"unwrap success" in {
val probe = TestProbe()
val result = probe.ref.askWithStatus("request")
probe.expectMsg("request")
probe.lastSender ! StatusReply.Success("woho")
result.futureValue should ===("woho")
}
"unwrap Error with message" in {
val probe = TestProbe()
val result = probe.ref.askWithStatus("request")
probe.expectMsg("request")
probe.lastSender ! StatusReply.Error("boho")
result.failed.futureValue should ===(StatusReply.ErrorMessage("boho"))
}
"unwrap Error with exception" in {
val probe = TestProbe()
val result = probe.ref.askWithStatus("request")
probe.expectMsg("request")
probe.lastSender ! StatusReply.Error(TestException("boho"))
result.failed.futureValue should ===(TestException("boho"))
}
}
}

View file

@ -4,9 +4,11 @@
package akka.actor.typed.javadsl;
import akka.actor.testkit.typed.TestException;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.pattern.StatusReply;
import akka.testkit.AkkaSpec;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
@ -16,7 +18,6 @@ import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
public class ActorContextAskTest extends JUnitSuite {
@ -26,10 +27,10 @@ public class ActorContextAskTest extends JUnitSuite {
@Rule public final LogCapturing logCapturing = new LogCapturing();
static class Ping {
final ActorRef<Pong> respondTo;
final ActorRef<Pong> replyTo;
public Ping(ActorRef<Pong> respondTo) {
this.respondTo = respondTo;
public Ping(ActorRef<Pong> replyTo) {
this.replyTo = replyTo;
}
}
@ -40,7 +41,7 @@ public class ActorContextAskTest extends JUnitSuite {
final Behavior<Ping> pingPongBehavior =
Behaviors.receive(
(ActorContext<Ping> context, Ping message) -> {
message.respondTo.tell(new Pong());
message.replyTo.tell(new Pong());
return Behaviors.same();
});
@ -71,4 +72,92 @@ public class ActorContextAskTest extends JUnitSuite {
probe.expectMessageClass(Pong.class);
}
static class PingWithStatus {
final ActorRef<StatusReply<Pong>> replyTo;
public PingWithStatus(ActorRef<StatusReply<Pong>> replyTo) {
this.replyTo = replyTo;
}
}
@Test
public void askWithStatusUnwrapsSuccess() {
final TestProbe<Object> probe = testKit.createTestProbe();
testKit.spawn(
Behaviors.<Pong>setup(
context -> {
context.askWithStatus(
Pong.class,
probe.getRef(),
Duration.ofSeconds(3),
PingWithStatus::new,
(pong, failure) -> {
if (pong != null) return pong;
else throw new RuntimeException(failure);
});
return Behaviors.receive(Pong.class)
.onAnyMessage(
pong -> {
probe.ref().tell("got pong");
return Behaviors.same();
})
.build();
}));
ActorRef<StatusReply<Pong>> replyTo = probe.expectMessageClass(PingWithStatus.class).replyTo;
replyTo.tell(StatusReply.success(new Pong()));
probe.expectMessage("got pong");
}
private static Behavior<Throwable> exceptionCapturingBehavior(ActorRef<Object> probe) {
return Behaviors.setup(
context -> {
context.askWithStatus(
Pong.class,
probe.narrow(),
Duration.ofSeconds(3),
PingWithStatus::new,
(pong, failure) -> {
if (pong != null) throw new IllegalArgumentException("did not expect pong");
else return failure;
});
return Behaviors.receive(Throwable.class)
.onAnyMessage(
throwable -> {
probe.tell(
"got error: "
+ throwable.getClass().getName()
+ ", "
+ throwable.getMessage());
return Behaviors.same();
})
.build();
});
}
@Test
public void askWithStatusUnwrapsErrorMessages() {
final TestProbe<Object> probe = testKit.createTestProbe();
testKit.spawn(exceptionCapturingBehavior(probe.getRef()));
ActorRef<StatusReply<Pong>> replyTo = probe.expectMessageClass(PingWithStatus.class).replyTo;
replyTo.tell(StatusReply.error("boho"));
probe.expectMessage("got error: akka.pattern.StatusReply$ErrorMessage, boho");
}
@Test
public void askWithStatusUnwrapsErrorCustomExceptions() {
final TestProbe<Object> probe = testKit.createTestProbe();
testKit.spawn(exceptionCapturingBehavior(probe.getRef()));
ActorRef<StatusReply<Pong>> replyTo = probe.expectMessageClass(PingWithStatus.class).replyTo;
// with custom exception
replyTo.tell(StatusReply.error(new TestException("boho")));
probe.expectMessage("got error: akka.actor.testkit.typed.TestException, boho");
}
}

View file

@ -0,0 +1,239 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.typed;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
// #actor-ask-with-status
import akka.pattern.StatusReply;
// #actor-ask-with-status
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
public class InteractionPatternsAskWithStatusTest extends JUnitSuite {
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
@Rule public final LogCapturing logCapturing = new LogCapturing();
// separate from InteractionPatternsTest to avoid name clashes while keeping the ask samples
// almost identical
interface Samples {
// #actor-ask-with-status
public class Hal extends AbstractBehavior<Hal.Command> {
public static Behavior<Hal.Command> create() {
return Behaviors.setup(Hal::new);
}
private Hal(ActorContext<Hal.Command> context) {
super(context);
}
public interface Command {}
public static final class OpenThePodBayDoorsPlease implements Hal.Command {
public final ActorRef<StatusReply<String>> respondTo;
public OpenThePodBayDoorsPlease(ActorRef<StatusReply<String>> respondTo) {
this.respondTo = respondTo;
}
}
@Override
public Receive<Hal.Command> createReceive() {
return newReceiveBuilder()
.onMessage(Hal.OpenThePodBayDoorsPlease.class, this::onOpenThePodBayDoorsPlease)
.build();
}
private Behavior<Hal.Command> onOpenThePodBayDoorsPlease(
Hal.OpenThePodBayDoorsPlease message) {
message.respondTo.tell(StatusReply.error("I'm sorry, Dave. I'm afraid I can't do that."));
return this;
}
}
public class Dave extends AbstractBehavior<Dave.Command> {
public interface Command {}
// this is a part of the protocol that is internal to the actor itself
private static final class AdaptedResponse implements Dave.Command {
public final String message;
public AdaptedResponse(String message) {
this.message = message;
}
}
public static Behavior<Dave.Command> create(ActorRef<Hal.Command> hal) {
return Behaviors.setup(context -> new Dave(context, hal));
}
private Dave(ActorContext<Dave.Command> context, ActorRef<Hal.Command> hal) {
super(context);
// asking someone requires a timeout, if the timeout hits without response
// the ask is failed with a TimeoutException
final Duration timeout = Duration.ofSeconds(3);
context.askWithStatus(
String.class,
hal,
timeout,
// construct the outgoing message
(ActorRef<StatusReply<String>> ref) -> new Hal.OpenThePodBayDoorsPlease(ref),
// adapt the response (or failure to respond)
(response, throwable) -> {
if (response != null) {
// a ReponseWithStatus.success(m) is unwrapped and passed as response
return new Dave.AdaptedResponse(response);
} else {
// a ResponseWithStatus.error will end up as a StatusReply.ErrorMessage()
// exception here
return new Dave.AdaptedResponse("Request failed: " + throwable.getMessage());
}
});
}
@Override
public Receive<Dave.Command> createReceive() {
return newReceiveBuilder()
// the adapted message ends up being processed like any other
// message sent to the actor
.onMessage(Dave.AdaptedResponse.class, this::onAdaptedResponse)
.build();
}
private Behavior<Dave.Command> onAdaptedResponse(Dave.AdaptedResponse response) {
getContext().getLog().info("Got response from HAL: {}", response.message);
return this;
}
}
// #actor-ask-with-status
}
interface StandaloneAskSample {
// #standalone-ask-with-status
public class CookieFabric extends AbstractBehavior<CookieFabric.Command> {
interface Command {}
public static class GiveMeCookies implements CookieFabric.Command {
public final int count;
public final ActorRef<StatusReply<CookieFabric.Cookies>> replyTo;
public GiveMeCookies(int count, ActorRef<StatusReply<CookieFabric.Cookies>> replyTo) {
this.count = count;
this.replyTo = replyTo;
}
}
public static class Cookies {
public final int count;
public Cookies(int count) {
this.count = count;
}
}
public static Behavior<CookieFabric.Command> create() {
return Behaviors.setup(CookieFabric::new);
}
private CookieFabric(ActorContext<CookieFabric.Command> context) {
super(context);
}
@Override
public Receive<CookieFabric.Command> createReceive() {
return newReceiveBuilder()
.onMessage(CookieFabric.GiveMeCookies.class, this::onGiveMeCookies)
.build();
}
private Behavior<CookieFabric.Command> onGiveMeCookies(CookieFabric.GiveMeCookies request) {
if (request.count >= 5) request.replyTo.tell(StatusReply.error("Too many cookies."));
else request.replyTo.tell(StatusReply.success(new CookieFabric.Cookies(request.count)));
return this;
}
}
// #standalone-ask-with-status
static class NotShown {
// #standalone-ask-with-status
public void askAndPrint(
ActorSystem<Void> system, ActorRef<CookieFabric.Command> cookieFabric) {
CompletionStage<CookieFabric.Cookies> result =
AskPattern.askWithStatus(
cookieFabric,
replyTo -> new CookieFabric.GiveMeCookies(3, replyTo),
// asking someone requires a timeout and a scheduler, if the timeout hits without
// response the ask is failed with a TimeoutException
Duration.ofSeconds(3),
system.scheduler());
result.whenComplete(
(reply, failure) -> {
if (reply != null) System.out.println("Yay, " + reply.count + " cookies!");
else if (failure instanceof StatusReply.ErrorMessage)
System.out.println("No cookies for me. " + failure.getMessage());
else System.out.println("Boo! didn't get cookies in time. " + failure);
});
}
// #standalone-ask-with-status
public void askAndMapInvalid(
ActorSystem<Void> system, ActorRef<CookieFabric.Command> cookieFabric) {
// #standalone-ask-with-status-fail-future
CompletionStage<CookieFabric.Cookies> cookies =
AskPattern.askWithStatus(
cookieFabric,
replyTo -> new CookieFabric.GiveMeCookies(3, replyTo),
Duration.ofSeconds(3),
system.scheduler());
cookies.whenComplete(
(cookiesReply, failure) -> {
if (cookies != null) System.out.println("Yay, " + cookiesReply.count + " cookies!");
else System.out.println("Boo! didn't get cookies in time. " + failure);
});
// #standalone-ask-with-status-fail-future
}
}
}
@Test
public void askWithStatusExample() {
// no assert but should at least throw if completely broken
ActorRef<StandaloneAskSample.CookieFabric.Command> cookieFabric =
testKit.spawn(StandaloneAskSample.CookieFabric.create());
StandaloneAskSample.NotShown notShown = new StandaloneAskSample.NotShown();
notShown.askAndPrint(testKit.system(), cookieFabric);
}
@Test
public void askInActorWithStatusExample() {
// no assert but should at least throw if completely broken
ActorRef<Samples.Hal.Command> hal = testKit.spawn(Samples.Hal.create());
ActorRef<Samples.Dave.Command> dave = testKit.spawn(Samples.Dave.create(hal));
}
}

View file

@ -341,7 +341,7 @@ public class InteractionPatternsTest extends JUnitSuite {
// #actor-ask
public class Hal extends AbstractBehavior<Hal.Command> {
public Behavior<Hal.Command> create() {
public static Behavior<Hal.Command> create() {
return Behaviors.setup(Hal::new);
}
@ -925,4 +925,20 @@ public class InteractionPatternsTest extends JUnitSuite {
"123",
probe.expectMessageClass(PipeToSelfSample.CustomerRepository.UpdateSuccess.class).id);
}
@Test
public void askWithStatusExample() {
// no assert but should at least throw if completely broken
ActorRef<StandaloneAskSample.CookieFabric.Command> cookieFabric =
testKit.spawn(StandaloneAskSample.CookieFabric.create());
StandaloneAskSample.NotShown notShown = new StandaloneAskSample.NotShown();
notShown.askAndPrint(testKit.system(), cookieFabric);
}
@Test
public void askInActorWithStatusExample() {
// no assert but should at least throw if completely broken
ActorRef<Samples.Hal.Command> hal = testKit.spawn(Samples.Hal.create());
ActorRef<Samples.Dave.Command> dave = testKit.spawn(Samples.Dave.create(hal));
}
}

View file

@ -9,9 +9,7 @@ import scala.concurrent.Future
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.Success
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@ -20,6 +18,8 @@ import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors._
import akka.pattern.StatusReply
import akka.testkit.TestException
import akka.util.Timeout
object AskSpec {
@ -182,4 +182,32 @@ class AskSpec extends ScalaTestWithActorTestKit("""
probe.expectTerminated(ref, probe.remainingOrDefault)
}
}
case class Request(replyTo: ActorRef[StatusReply[String]])
"askWithStatus pattern" must {
"unwrap nested response a successful response" in {
val probe = createTestProbe[Request]
val result: Future[String] = probe.ref.askWithStatus(Request(_))
probe.expectMessageType[Request].replyTo ! StatusReply.success("goodie")
result.futureValue should ===("goodie")
}
"fail future for a fail response with text" in {
val probe = createTestProbe[Request]
val result: Future[String] = probe.ref.askWithStatus(Request(_))
probe.expectMessageType[Request].replyTo ! StatusReply.error("boom")
val exception = result.failed.futureValue
exception should be(a[StatusReply.ErrorMessage])
exception.getMessage should ===("boom")
}
"fail future for a fail response with custom exception" in {
val probe = createTestProbe[Request]
val result: Future[String] = probe.ref.askWithStatus(Request(_))
probe.expectMessageType[Request].replyTo ! StatusReply.error(TestException("boom"))
val exception = result.failed.futureValue
exception should be(a[TestException])
exception.getMessage should ===("boom")
}
}
}

View file

@ -4,19 +4,20 @@
package akka.actor.typed.scaladsl
import akka.actor.testkit.typed.TestException
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.{ Failure, Success }
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.{ ActorRef, PostStop, Props }
import akka.pattern.StatusReply
object ActorContextAskSpec {
val config = ConfigFactory.parseString("""
@ -188,6 +189,76 @@ class ActorContextAskSpec
probe.receiveMessages(N).map(_.n) should ===(1 to N)
}
"unwrap successful StatusReply messages using askWithStatus" in {
case class Ping(ref: ActorRef[StatusReply[Pong.type]])
case object Pong
val probe = createTestProbe[Any]()
spawn(Behaviors.setup[Pong.type] { ctx =>
ctx.askWithStatus(probe.ref, Ping) {
case Success(Pong) => Pong
case Failure(ex) => throw ex
}
Behaviors.receiveMessage {
case Pong =>
probe.ref ! "got pong"
Behaviors.same
}
})
val replyTo = probe.expectMessageType[Ping].ref
replyTo ! StatusReply.Success(Pong)
probe.expectMessage("got pong")
}
"unwrap error message StatusReply messages using askWithStatus" in {
case class Ping(ref: ActorRef[StatusReply[Pong.type]])
case object Pong
val probe = createTestProbe[Any]()
spawn(Behaviors.setup[Throwable] { ctx =>
ctx.askWithStatus(probe.ref, Ping) {
case Failure(ex) => ex
case wat => throw new IllegalArgumentException(s"Unexpected response $wat")
}
Behaviors.receiveMessage {
case ex: Throwable =>
probe.ref ! s"got error: ${ex.getClass.getName}, ${ex.getMessage}"
Behaviors.same
}
})
val replyTo = probe.expectMessageType[Ping].ref
replyTo ! StatusReply.Error("boho")
probe.expectMessage("got error: akka.pattern.StatusReply$ErrorMessage, boho")
}
"unwrap error with custom exception StatusReply messages using askWithStatus" in {
case class Ping(ref: ActorRef[StatusReply[Pong.type]])
case object Pong
val probe = createTestProbe[Any]()
case class Message(any: Any)
spawn(Behaviors.setup[Throwable] { ctx =>
ctx.askWithStatus(probe.ref, Ping) {
case Failure(ex) => ex
case wat => throw new IllegalArgumentException(s"Unexpected response $wat")
}
Behaviors.receiveMessage {
case ex: Throwable =>
probe.ref ! s"got error: ${ex.getClass.getName}, ${ex.getMessage}"
Behaviors.same
}
})
val replyTo = probe.expectMessageType[Ping].ref
replyTo ! StatusReply.Error(TestException("boho"))
probe.expectMessage("got error: akka.actor.testkit.typed.TestException, boho")
}
}
}

View file

@ -20,6 +20,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.TimerScheduler
import akka.pattern.StatusReply
import org.scalatest.wordspec.AnyWordSpecLike
class InteractionPatternsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
@ -292,6 +293,61 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with AnyWordSpec
monitor.expectMessageType[Hal.OpenThePodBayDoorsPlease]
}
"contain a sample for outside ask with status" in {
import akka.util.Timeout
// #actor-ask-with-status
object Hal {
sealed trait Command
case class OpenThePodBayDoorsPlease(replyTo: ActorRef[StatusReply[String]]) extends Command
def apply(): Behaviors.Receive[Hal.Command] =
Behaviors.receiveMessage[Command] {
case OpenThePodBayDoorsPlease(replyTo) =>
// reply with a validation error description
replyTo ! StatusReply.Error("I'm sorry, Dave. I'm afraid I can't do that.")
Behaviors.same
}
}
object Dave {
sealed trait Command
// this is a part of the protocol that is internal to the actor itself
private case class AdaptedResponse(message: String) extends Command
def apply(hal: ActorRef[Hal.Command]): Behavior[Dave.Command] =
Behaviors.setup[Command] { context =>
// asking someone requires a timeout, if the timeout hits without response
// the ask is failed with a TimeoutException
implicit val timeout: Timeout = 3.seconds
// A StatusReply.Success(m) ends up as a Success(m) here, while a
// StatusReply.Error(text) becomes a Failure(ErrorMessage(text))
context.askWithStatus(hal, Hal.OpenThePodBayDoorsPlease) {
case Success(message) => AdaptedResponse(message)
case Failure(StatusReply.ErrorMessage(text)) => AdaptedResponse(s"Request denied: $text")
case Failure(_) => AdaptedResponse("Request failed")
}
Behaviors.receiveMessage {
// the adapted message ends up being processed like any other
// message sent to the actor
case AdaptedResponse(message) =>
context.log.info("Got response from hal: {}", message)
Behaviors.same
}
}
}
// #actor-ask-with-status
// somewhat modified behavior to let us know we saw the two requests
val monitor = createTestProbe[Hal.Command]()
val hal = spawn(Behaviors.monitor(monitor.ref, Hal()))
spawn(Dave(hal))
monitor.expectMessageType[Hal.OpenThePodBayDoorsPlease]
}
"contain a sample for per session child" in {
// #per-session-child
// dummy data types just for this sample
@ -461,6 +517,69 @@ class InteractionPatternsSpec extends ScalaTestWithActorTestKit with AnyWordSpec
cookies.futureValue shouldEqual CookieFabric.Cookies(3)
}
"contain a sample for ask with status from outside the actor system" in {
// #standalone-ask-with-status
object CookieFabric {
sealed trait Command {}
case class GiveMeCookies(count: Int, replyTo: ActorRef[StatusReply[Cookies]]) extends Command
case class Cookies(count: Int)
def apply(): Behaviors.Receive[CookieFabric.GiveMeCookies] =
Behaviors.receiveMessage { message =>
if (message.count >= 5)
message.replyTo ! StatusReply.Error("Too many cookies.")
else
message.replyTo ! StatusReply.Success(Cookies(message.count))
Behaviors.same
}
}
// #standalone-ask-with-status
// keep this out of the sample as it uses the testkit spawn
val cookieFabric = spawn(CookieFabric())
val theSystem = testKit.system
// #standalone-ask-with-status
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
// asking someone requires a timeout if the timeout hits without response
// the ask is failed with a TimeoutException
implicit val timeout: Timeout = 3.seconds
// implicit ActorSystem in scope
implicit val system: ActorSystem[_] = theSystem
val result: Future[CookieFabric.Cookies] = cookieFabric.askWithStatus(ref => CookieFabric.GiveMeCookies(3, ref))
// the response callback will be executed on this execution context
implicit val ec = system.executionContext
result.onComplete {
case Success(CookieFabric.Cookies(count)) => println(s"Yay, $count cookies!")
case Failure(StatusReply.ErrorMessage(reason)) => println(s"No cookies for me. $reason")
case Failure(ex) => println(s"Boo! didn't get cookies: ${ex.getMessage}")
}
// #standalone-ask-with-status
result.futureValue shouldEqual CookieFabric.Cookies(3)
// #standalone-ask-with-status-fail-future
val cookies: Future[CookieFabric.Cookies] =
cookieFabric.askWithStatus[CookieFabric.Cookies](ref => CookieFabric.GiveMeCookies(3, ref)).flatMap {
case c: CookieFabric.Cookies => Future.successful(c)
}
cookies.onComplete {
case Success(CookieFabric.Cookies(count)) => println(s"Yay, $count cookies!")
case Failure(ex) => println(s"Boo! didn't get cookies: ${ex.getMessage}")
}
// #standalone-ask-with-status-fail-future
cookies.futureValue shouldEqual CookieFabric.Cookies(3)
}
"contain a sample for pipeToSelf" in {
//#pipeToSelf

View file

@ -0,0 +1,3 @@
# actor ask with status #29190 - safe because ActorContext is not for user extension
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.ActorContext.askWithStatus")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.ActorContext.askWithStatus")

View file

@ -13,20 +13,22 @@ import java.util.concurrent.CompletionStage
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.reflect.ClassTag
import scala.util.Try
import com.github.ghik.silencer.silent
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import akka.actor.Address
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.pattern.StatusReply
import akka.util.{ BoxedType, Timeout }
import akka.util.JavaDurationConverters._
import akka.util.OptionVal
import akka.util.Timeout
import scala.util.Failure
import scala.util.Success
/**
* INTERNAL API
*/
@ -206,6 +208,14 @@ import akka.util.Timeout
pipeToSelf((target.ask(createRequest))(responseTimeout, system.scheduler))(mapResponse)
}
override def askWithStatus[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[StatusReply[Res]] => Req)(
mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit =
ask(target, createRequest) {
case Success(StatusReply.Success(t: Res)) => mapResponse(Success(t))
case Success(StatusReply.Error(why)) => mapResponse(Failure(why))
case fail: Failure[_] => mapResponse(fail.asInstanceOf[Failure[Res]])
}
// Java API impl
@silent("never used") // resClass is just a pretend param
override def ask[Req, Res](
@ -218,6 +228,26 @@ import akka.util.Timeout
pipeToSelf(AskPattern.ask(target, (ref) => createRequest(ref), responseTimeout, system.scheduler), applyToResponse)
}
override def askWithStatus[Req, Res](
resClass: Class[Res],
target: RecipientRef[Req],
responseTimeout: Duration,
createRequest: akka.japi.function.Function[ActorRef[StatusReply[Res]], Req],
applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit = {
implicit val classTag: ClassTag[Res] = ClassTag(resClass)
ask[Req, StatusReply[Res]](
classOf[StatusReply[Res]],
target,
responseTimeout,
createRequest,
(ok: StatusReply[Res], failure: Throwable) =>
ok match {
case StatusReply.Success(value: Res) => applyToResponse(value, null)
case StatusReply.Error(why) => applyToResponse(null.asInstanceOf[Res], why)
case null => applyToResponse(null.asInstanceOf[Res], failure)
})
}
// Scala API impl
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit = {
future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContexts.parasitic)

View file

@ -9,12 +9,11 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.concurrent.ExecutionContextExecutor
import org.slf4j.Logger
import akka.actor.ClassicActorContextProvider
import akka.actor.typed._
import akka.annotation.DoNotInherit
import akka.pattern.StatusReply
/**
* An Actor is given by the combination of a [[Behavior]] and a context in
@ -299,6 +298,19 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi
createRequest: akka.japi.function.Function[ActorRef[Res], Req],
applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit
/**
* The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]].
* If the response is a [[akka.pattern.StatusReply#success]] the returned future is completed successfully with the wrapped response.
* If the status response is a [[akka.pattern.StatusReply#error]] the returned future will be failed with the
* exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]).
*/
def askWithStatus[Req, Res](
resClass: Class[Res],
target: RecipientRef[Req],
responseTimeout: Duration,
createRequest: akka.japi.function.Function[ActorRef[StatusReply[Res]], Req],
applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit
/**
* Sends the result of the given `CompletionStage` to this Actor (`self`), after adapted it with
* the given function.

View file

@ -9,10 +9,10 @@ import java.time.Duration
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import akka.actor.typed.Scheduler
import akka.actor.typed.scaladsl.AskPattern._
import akka.japi.function.{ Function => JFunction }
import akka.pattern.StatusReply
import akka.util.JavaDurationConverters._
/**
@ -41,4 +41,18 @@ object AskPattern {
timeout: Duration,
scheduler: Scheduler): CompletionStage[Res] =
(actor.ask(messageFactory.apply)(timeout.asScala, scheduler)).toJava
/**
* The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]].
* If the response is a [[akka.pattern.StatusReply#success]] the returned future is completed successfully with the wrapped response.
* If the status response is a [[akka.pattern.StatusReply#error]] the returned future will be failed with the
* exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]).
*/
def askWithStatus[Req, Res](
actor: RecipientRef[Req],
messageFactory: JFunction[ActorRef[StatusReply[Res]], Req],
timeout: Duration,
scheduler: Scheduler): CompletionStage[Res] =
(actor.askWithStatus(messageFactory.apply)(timeout.asScala, scheduler).toJava)
}

View file

@ -8,13 +8,12 @@ import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import scala.util.Try
import org.slf4j.Logger
import akka.actor.ClassicActorContextProvider
import akka.actor.typed._
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.pattern.StatusReply
import akka.util.Timeout
/**
@ -298,6 +297,15 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi
def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)(
mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit
/**
* The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]].
* If the response is a [[akka.pattern.StatusReply.Success]] the returned future is completed successfully with the wrapped response.
* If the status response is a [[akka.pattern.StatusReply.Error]] the returned future will be failed with the
* exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]).
*/
def askWithStatus[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[StatusReply[Res]] => Req)(
mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit
/**
* Sends the result of the given `Future` to this Actor (`self`), after adapted it with
* the given function.

View file

@ -16,6 +16,7 @@ import akka.actor.typed.internal.{ adapter => adapt }
import akka.actor.typed.internal.InternalRecipientRef
import akka.annotation.InternalStableApi
import akka.pattern.PromiseActorRef
import akka.pattern.StatusReply
import akka.util.{ unused, Timeout }
/**
@ -114,6 +115,17 @@ object AskPattern {
"native system is implemented: " + a.getClass)
}
}
/**
* The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]].
* If the response is a [[akka.pattern.StatusReply.Success]] the returned future is completed successfully with the wrapped response.
* If the status response is a [[akka.pattern.StatusReply.Error]] the returned future will be failed with the
* exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]).
*/
def askWithStatus[Res](
replyTo: ActorRef[StatusReply[Res]] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res] =
StatusReply.flattenStatusFuture(ask(replyTo))
}
private val onTimeout: String => Throwable = msg => new TimeoutException(msg)

View file

@ -293,6 +293,8 @@ final case class UnhandledMessage(
with AllDeadLetters
/**
* Superseeded by [[akka.pattern.StatusReply]], prefer that when possible.
*
* Classes for passing status back to the sender.
* Used for internal ACKing protocol. But exposed as utility class for user-specific ACKing protocols as well.
*/

View file

@ -86,6 +86,22 @@ trait AskSupport {
def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] =
actorRef.internalAsk(message, timeout, sender)
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply.Success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply.Error]] arrives the future is instead
* failed.
*/
def askWithStatus(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] =
actorRef.internalAskWithStatus(message)(timeout, Actor.noSender)
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply.Success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply.Error]] arrives the future is instead
* failed.
*/
def askWithStatus(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] =
actorRef.internalAskWithStatus(message)(timeout, sender)
/**
* Import this implicit conversion to gain `?` and `ask` methods on
* [[akka.actor.ActorSelection]], which will defer to the
@ -320,6 +336,17 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {
def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
internalAsk(message, timeout, sender)
def askWithStatus(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
internalAskWithStatus(message)
/**
* INTERNAL API
*/
@InternalApi
private[pattern] def internalAskWithStatus(
message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
StatusReply.flattenStatusFuture[Any](internalAsk(message, timeout, sender).mapTo[StatusReply[Any]])
/**
* INTERNAL API: for binary compatibility
*/

View file

@ -24,6 +24,7 @@ object Patterns {
import akka.pattern.{
after => scalaAfter,
ask => scalaAsk,
askWithStatus => scalaAskWithStatus,
gracefulStop => scalaGracefulStop,
pipe => scalaPipe,
retry => scalaRetry
@ -94,6 +95,14 @@ object Patterns {
def ask(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
scalaAsk(actor, message)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]]
/**
* Use for messages whose response is known to be a [[akka.pattern.StatusReply]]. When a [[akka.pattern.StatusReply#success]] response
* arrives the future is completed with the wrapped value, if a [[akka.pattern.StatusReply#error]] arrives the future is instead
* failed.
*/
def askWithStatus(actor: ActorRef, message: Any, timeout: java.time.Duration): CompletionStage[AnyRef] =
scalaAskWithStatus(actor, message)(timeout.asScala).toJava.asInstanceOf[CompletionStage[AnyRef]]
/**
* A variation of ask which allows to implement "replyTo" pattern by including
* sender reference in message.

View file

@ -0,0 +1,167 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.pattern
import akka.Done
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NoStackTrace
import scala.util.{ Failure => ScalaFailure }
import scala.util.{ Success => ScalaSuccess }
/**
* Generic top-level message type for replies that signal failure or success. Convenient to use together with the
* `askWithStatus` ask variants.
*
* Create using the factory methods [[StatusReply#success]] and [[StatusReply#error]].
*
* Akka contains predefined serializers for the wrapper type and the textual error messages.
*
* @tparam T the type of value a successful reply would have
*/
final class StatusReply[+T] private (private val status: Try[T]) {
/**
* Java API: in the case of a successful reply returns the value, if the reply was not successful the exception
* the failure was created with is thrown
*/
def getValue: T = status.get
/**
* Java API: returns the exception if the reply is a failure, or throws an exception if not.
*/
def getError: Throwable = status match {
case ScalaFailure(ex) => ex
case _ => throw new IllegalArgumentException("Expected reply to be a failure, but was a success")
}
def isError: Boolean = status.isFailure
def isSuccess: Boolean = status.isSuccess
override def equals(other: Any): Boolean = other match {
case that: StatusReply[_] => status == that.status
case _ => false
}
override def hashCode(): Int = status.hashCode
override def toString: String = status match {
case ScalaSuccess(t) => s"Success($t)"
case ScalaFailure(ex) => s"Error(${ex.getMessage})"
}
}
object StatusReply {
/**
* Scala API: A general purpose message for using as an Ack
*/
val Ack: StatusReply[Done] = success(Done)
/**
* Java API: A general purpose message for using as an Ack
*/
def ack(): StatusReply[Done] = Ack
/**
* Java API: Create a successful reply containing `value`
*/
def success[T](value: T): StatusReply[T] = new StatusReply(ScalaSuccess(value))
/**
* Java API: Create an status response with a error message describing why the request was failed or denied.
*/
def error[T](errorMessage: String): StatusReply[T] = Error(errorMessage)
/**
* Java API: Create an error response with a user defined [[Throwable]].
*
* Prefer the string based error response over this one when possible to avoid tightly coupled logic across
* actors and passing internal failure details on to callers that can not do much to handle them.
*
* For cases where types are needed to identify errors and behave differently enumerating them with a specific
* set of response messages may be a better alternative to encoding them as generic exceptions.
*
* Also note that Akka does not contain pre-build serializers for arbitrary exceptions.
*/
def error[T](exception: Throwable): StatusReply[T] = Error(exception)
/**
* Carrier exception used for textual error descriptions.
*
* Not meant for usage outside of [[StatusReply]].
*/
final case class ErrorMessage(private val errorMessage: String)
extends RuntimeException(errorMessage)
with NoStackTrace {
override def toString: String = errorMessage
}
/**
* Scala API for creation and pattern matching a successful response.
*
* For example:
* ```
* case StatusReply.Success(value: String) => ...
* ```
*/
object Success {
/**
* Scala API: Create a successful reply containing `value`
*/
def apply[T](value: T): StatusReply[T] = new StatusReply(ScalaSuccess(value))
def unapply(status: StatusReply[Any]): Option[Any] =
if (status.isSuccess) Some(status.getValue)
else None
}
/**
* Scala API for creating and pattern matching an error response
*
* For example:
* ```
* case StatusReply.Error(exception) => ...
* ```
*/
object Error {
/**
* Scala API: Create an status response with a error message describing why the request was failed or denied.
*/
def apply[T](errorMessage: String): StatusReply[T] = error(new ErrorMessage(errorMessage))
/**
* Scala API: Create an error response with a user defined [[Throwable]].
*
* Prefer the string based error response over this one when possible to avoid tightly coupled logic across
* actors and passing internal failure details on to callers that can not do much to handle them.
*
* For cases where types are needed to identify errors and behave differently enumerating them with a specific
* set of response messages may be a better alternative to encoding them as generic exceptions.
*
* Also note that Akka does not contain pre-build serializers for arbitrary exceptions.
*/
def apply[T](exception: Throwable): StatusReply[T] = new StatusReply(ScalaFailure(exception))
def unapply(status: StatusReply[_]): Option[Throwable] =
if (status.isError) Some(status.getError)
else None
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] def flattenStatusFuture[T](f: Future[StatusReply[T]]): Future[T] =
f.transform {
case ScalaSuccess(StatusReply.Success(v)) => ScalaSuccess(v.asInstanceOf[T])
case ScalaSuccess(StatusReply.Error(ex)) => ScalaFailure[T](ex)
case fail @ ScalaFailure(_) => fail.asInstanceOf[Try[T]]
}(ExecutionContexts.parasitic)
}

View file

@ -0,0 +1,3 @@
# ask with status #29133, safe since EntityRef not user extendable
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.javadsl.EntityRef.askWithStatus")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.typed.scaladsl.EntityRef.askWithStatus")

View file

@ -39,6 +39,7 @@ import akka.event.LoggingAdapter
import akka.japi.function.{ Function => JFunction }
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import akka.pattern.StatusReply
import akka.util.{ unused, ByteString, Timeout }
import akka.util.JavaDurationConverters._
@ -314,9 +315,15 @@ import akka.util.JavaDurationConverters._
replyTo.ask(shardRegion, entityId, m, timeout)
}
def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U] =
override def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U] =
ask[U](replyTo => message.apply(replyTo))(timeout.asScala).toJava
override def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M)(implicit timeout: Timeout): Future[Res] =
StatusReply.flattenStatusFuture(ask[StatusReply[Res]](f))
override def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M, timeout: Duration): CompletionStage[Res] =
askWithStatus(f.apply)(timeout.asScala).toJava
/** Similar to [[akka.actor.typed.scaladsl.AskPattern.PromiseRef]] but for an `EntityRef` target. */
@InternalApi
private final class EntityPromiseRef[U](classic: InternalActorRef, timeout: Timeout, refPathPrefix: String) {

View file

@ -9,7 +9,6 @@ import java.util.concurrent.CompletionStage
import scala.concurrent.Future
import scala.compat.java8.FutureConverters._
import akka.actor.ActorRefProvider
import akka.actor.typed.ActorRef
import akka.actor.typed.Scheduler
@ -18,6 +17,7 @@ import akka.annotation.InternalApi
import akka.cluster.sharding.typed.javadsl
import akka.cluster.sharding.typed.scaladsl
import akka.japi.function.{ Function => JFunction }
import akka.pattern.StatusReply
import akka.util.JavaDurationConverters._
import akka.util.Timeout
@ -42,6 +42,12 @@ import akka.util.Timeout
def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U] =
ask[U](replyTo => message.apply(replyTo))(timeout.asScala).toJava
override def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M, timeout: Duration): CompletionStage[Res] =
askWithStatus(f)(timeout.asScala).toJava
override def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M)(implicit timeout: Timeout): Future[Res] =
StatusReply.flattenStatusFuture(ask(f))
// impl InternalRecipientRef
override def provider: ActorRefProvider = {
probe.asInstanceOf[InternalRecipientRef[_]].provider

View file

@ -10,7 +10,6 @@ import java.util.Optional
import java.util.concurrent.CompletionStage
import com.github.ghik.silencer.silent
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
@ -22,6 +21,7 @@ import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.japi.function.{ Function => JFunction }
import akka.pattern.StatusReply
@FunctionalInterface
trait EntityFactory[M] {
@ -439,6 +439,14 @@ object EntityTypeKey {
*/
def ask[Res](message: JFunction[ActorRef[Res], M], timeout: Duration): CompletionStage[Res]
/**
* The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]].
* If the response is a [[akka.pattern.StatusReply#success]] the returned future is completed successfully with the wrapped response.
* If the status response is a [[akka.pattern.StatusReply#error]] the returned future will be failed with the
* exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]).
*/
def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M, timeout: Duration): CompletionStage[Res]
/**
* INTERNAL API
*/

View file

@ -7,7 +7,6 @@ package scaladsl
import scala.concurrent.Future
import scala.reflect.ClassTag
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
@ -24,6 +23,7 @@ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.{ StartEntity => ClassicStartEntity }
import akka.cluster.sharding.typed.internal.ClusterShardingImpl
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.pattern.StatusReply
import akka.util.Timeout
object ClusterSharding extends ExtensionId[ClusterSharding] {
@ -448,6 +448,14 @@ object EntityTypeKey {
*/
def ask[Res](f: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res]
/**
* The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]].
* If the response is a [[akka.pattern.StatusReply.Success]] the returned future is completed successfully with the wrapped response.
* If the status response is a [[akka.pattern.StatusReply.Error]] the returned future will be failed with the
* exception in the error (normally a [[akka.pattern.StatusReply.ErrorMessage]]).
*/
def askWithStatus[Res](f: ActorRef[StatusReply[Res]] => M)(implicit timeout: Timeout): Future[Res]
/**
* Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.actor.typed.scaladsl.AskPattern]] for a complete write-up of this pattern

View file

@ -4,6 +4,8 @@
package jdocs.akka.cluster.sharding.typed;
import akka.Done;
import akka.pattern.StatusReply;
import org.scalatestplus.junit.JUnitSuite;
import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
@ -55,12 +57,9 @@ public class AccountExampleDocTest
@Test
public void createWithEmptyBalance() {
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.OperationResult>
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
assertEquals(AccountEntity.Confirmed.INSTANCE, result.reply());
assertEquals(StatusReply.ack(), result.reply());
assertEquals(AccountEntity.AccountCreated.INSTANCE, result.event());
assertEquals(BigDecimal.ZERO, result.stateOfType(AccountEntity.OpenedAccount.class).balance);
}
@ -70,28 +69,22 @@ public class AccountExampleDocTest
eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.OperationResult>
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result1 =
eventSourcedTestKit.runCommand(
replyTo -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
assertEquals(AccountEntity.Confirmed.INSTANCE, result1.reply());
assertEquals(StatusReply.ack(), result1.reply());
assertEquals(
BigDecimal.valueOf(100), result1.eventOfType(AccountEntity.Deposited.class).amount);
assertEquals(
BigDecimal.valueOf(100), result1.stateOfType(AccountEntity.OpenedAccount.class).balance);
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.OperationResult>
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result2 =
eventSourcedTestKit.runCommand(
replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo));
assertEquals(AccountEntity.Confirmed.INSTANCE, result2.reply());
assertEquals(StatusReply.ack(), result2.reply());
assertEquals(BigDecimal.valueOf(10), result2.eventOfType(AccountEntity.Withdrawn.class).amount);
assertEquals(
BigDecimal.valueOf(90), result2.stateOfType(AccountEntity.OpenedAccount.class).balance);
@ -101,18 +94,15 @@ public class AccountExampleDocTest
public void rejectWithdrawOverdraft() {
eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
eventSourcedTestKit.runCommand(
(ActorRef<AccountEntity.OperationResult> replyTo) ->
(ActorRef<StatusReply<Done>> replyTo) ->
new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.OperationResult>
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account, StatusReply<Done>>
result =
eventSourcedTestKit.runCommand(
replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo));
result.replyOfType(AccountEntity.Rejected.class);
assertTrue(result.reply().isError());
assertTrue(result.hasNoEvents());
}
@ -120,7 +110,7 @@ public class AccountExampleDocTest
public void handleGetBalance() {
eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
eventSourcedTestKit.runCommand(
(ActorRef<AccountEntity.OperationResult> replyTo) ->
(ActorRef<StatusReply<Done>> replyTo) ->
new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
CommandResultWithReply<

View file

@ -4,15 +4,16 @@
package jdocs.akka.cluster.sharding.typed;
import akka.Done;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import akka.pattern.StatusReply;
import akka.persistence.typed.PersistenceId;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@ -26,9 +27,11 @@ import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static akka.Done.done;
import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class AccountExampleTest extends JUnitSuite {
@ -70,47 +73,47 @@ public class AccountExampleTest extends JUnitSuite {
@Test
public void handleDeposit() {
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "1");
TestProbe<OperationResult> probe = testKit.createTestProbe(OperationResult.class);
TestProbe<StatusReply<Done>> probe = testKit.createTestProbe();
ref.tell(new CreateAccount(probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
probe.expectMessage(StatusReply.ack());
ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
probe.expectMessage(StatusReply.ack());
ref.tell(new Deposit(BigDecimal.valueOf(10), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
probe.expectMessage(StatusReply.ack());
}
@Test
public void handleWithdraw() {
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "2");
TestProbe<OperationResult> probe = testKit.createTestProbe(OperationResult.class);
TestProbe<StatusReply<Done>> probe = testKit.createTestProbe();
ref.tell(new CreateAccount(probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
probe.expectMessage(StatusReply.ack());
ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
probe.expectMessage(StatusReply.ack());
ref.tell(new Withdraw(BigDecimal.valueOf(10), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
probe.expectMessage(StatusReply.ack());
}
@Test
public void rejectWithdrawOverdraft() {
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "3");
TestProbe<OperationResult> probe = testKit.createTestProbe(OperationResult.class);
TestProbe<StatusReply<Done>> probe = testKit.createTestProbe();
ref.tell(new CreateAccount(probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
probe.expectMessage(StatusReply.ack());
ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
probe.expectMessage(StatusReply.ack());
ref.tell(new Withdraw(BigDecimal.valueOf(110), probe.getRef()));
probe.expectMessageClass(Rejected.class);
assertTrue(probe.receiveMessage().isError());
}
@Test
public void handleGetBalance() {
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "4");
TestProbe<OperationResult> opProbe = testKit.createTestProbe(OperationResult.class);
TestProbe<StatusReply<Done>> opProbe = testKit.createTestProbe();
ref.tell(new CreateAccount(opProbe.getRef()));
opProbe.expectMessage(Confirmed.INSTANCE);
opProbe.expectMessage(StatusReply.ack());
ref.tell(new Deposit(BigDecimal.valueOf(100), opProbe.getRef()));
opProbe.expectMessage(Confirmed.INSTANCE);
opProbe.expectMessage(StatusReply.ack());
TestProbe<CurrentBalance> getProbe = testKit.createTestProbe(CurrentBalance.class);
ref.tell(new GetBalance(getProbe.getRef()));
@ -122,27 +125,21 @@ public class AccountExampleTest extends JUnitSuite {
public void beUsableWithAsk() throws Exception {
Duration timeout = Duration.ofSeconds(3);
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "5");
CompletionStage<OperationResult> createResult = ref.ask(CreateAccount::new, timeout);
assertEquals(Confirmed.INSTANCE, createResult.toCompletableFuture().get(3, TimeUnit.SECONDS));
CompletionStage<Done> createResult = ref.askWithStatus(CreateAccount::new, timeout);
assertEquals(done(), createResult.toCompletableFuture().get(3, TimeUnit.SECONDS));
// above works because then the response type is inferred by the lhs type
// below requires (ActorRef<OperationResult> replyTo)
// below requires explicit typing
assertEquals(
Confirmed.INSTANCE,
ref.ask(
(ActorRef<OperationResult> replyTo) ->
new Deposit(BigDecimal.valueOf(100), replyTo),
timeout)
done(),
ref.<Done>askWithStatus(replyTo -> new Deposit(BigDecimal.valueOf(100), replyTo), timeout)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
assertEquals(
Confirmed.INSTANCE,
ref.ask(
(ActorRef<OperationResult> replyTo) ->
new Withdraw(BigDecimal.valueOf(10), replyTo),
timeout)
done(),
ref.<Done>askWithStatus(replyTo -> new Withdraw(BigDecimal.valueOf(10), replyTo), timeout)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
@ -156,7 +153,7 @@ public class AccountExampleTest extends JUnitSuite {
@Test
public void verifySerialization() {
TestProbe<OperationResult> opProbe = testKit.createTestProbe();
TestProbe<StatusReply<Done>> opProbe = testKit.createTestProbe();
testKit.serializationTestKit().verifySerialization(new CreateAccount(opProbe.getRef()), false);
Deposit deposit2 =
testKit
@ -169,9 +166,6 @@ public class AccountExampleTest extends JUnitSuite {
.verifySerialization(new Withdraw(BigDecimal.valueOf(90), opProbe.getRef()), false);
testKit.serializationTestKit().verifySerialization(new CloseAccount(opProbe.getRef()), false);
testKit.serializationTestKit().verifySerialization(Confirmed.INSTANCE, false);
testKit.serializationTestKit().verifySerialization(new Rejected("overdraft"), false);
TestProbe<CurrentBalance> getProbe = testKit.createTestProbe();
testKit.serializationTestKit().verifySerialization(new GetBalance(getProbe.getRef()), false);

View file

@ -4,8 +4,10 @@
package jdocs.akka.cluster.sharding.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.pattern.StatusReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
@ -42,19 +44,19 @@ public interface AccountExampleWithEventHandlersInState {
// #reply-command
public static class CreateAccount implements Command {
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CreateAccount(ActorRef<OperationResult> replyTo) {
public CreateAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
public static class Deposit implements Command {
public final BigDecimal amount;
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
public Deposit(BigDecimal amount, ActorRef<OperationResult> replyTo) {
public Deposit(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
this.amount = amount;
}
@ -62,9 +64,9 @@ public interface AccountExampleWithEventHandlersInState {
public static class Withdraw implements Command {
public final BigDecimal amount;
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
public Withdraw(BigDecimal amount, ActorRef<OperationResult> replyTo) {
public Withdraw(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.amount = amount;
this.replyTo = replyTo;
}
@ -80,35 +82,16 @@ public interface AccountExampleWithEventHandlersInState {
}
public static class CloseAccount implements Command {
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CloseAccount(ActorRef<OperationResult> replyTo) {
public CloseAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
// Reply
// #reply-command
interface CommandReply extends CborSerializable {}
interface OperationResult extends CommandReply {}
enum Confirmed implements OperationResult {
INSTANCE
}
public static class Rejected implements OperationResult {
public final String reason;
@JsonCreator
public Rejected(String reason) {
this.reason = reason;
}
}
// #reply-command
public static class CurrentBalance implements CommandReply {
public static class CurrentBalance implements CborSerializable {
public final BigDecimal balance;
@JsonCreator
@ -222,24 +205,26 @@ public interface AccountExampleWithEventHandlersInState {
private ReplyEffect<Event, Account> createAccount(EmptyAccount account, CreateAccount command) {
return Effect()
.persist(AccountCreated.INSTANCE)
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
private ReplyEffect<Event, Account> deposit(OpenedAccount account, Deposit command) {
return Effect()
.persist(new Deposited(command.amount))
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
// #reply
private ReplyEffect<Event, Account> withdraw(OpenedAccount account, Withdraw command) {
if (!account.canWithdraw(command.amount)) {
return Effect()
.reply(command.replyTo, new Rejected("not enough funds to withdraw " + command.amount));
.reply(
command.replyTo,
StatusReply.error("not enough funds to withdraw " + command.amount));
} else {
return Effect()
.persist(new Withdrawn(command.amount))
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
}
// #reply
@ -252,10 +237,10 @@ public interface AccountExampleWithEventHandlersInState {
if (account.balance.equals(BigDecimal.ZERO)) {
return Effect()
.persist(new AccountClosed())
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
} else {
return Effect()
.reply(command.replyTo, new Rejected("balance must be zero for closing account"));
.reply(command.replyTo, StatusReply.error("balance must be zero for closing account"));
}
}

View file

@ -4,8 +4,10 @@
package jdocs.akka.cluster.sharding.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.pattern.StatusReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
@ -38,19 +40,19 @@ public interface AccountExampleWithMutableState {
interface Command extends CborSerializable {}
public static class CreateAccount implements Command {
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CreateAccount(ActorRef<OperationResult> replyTo) {
public CreateAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
public static class Deposit implements Command {
public final BigDecimal amount;
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
public Deposit(BigDecimal amount, ActorRef<OperationResult> replyTo) {
public Deposit(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
this.amount = amount;
}
@ -58,9 +60,9 @@ public interface AccountExampleWithMutableState {
public static class Withdraw implements Command {
public final BigDecimal amount;
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
public Withdraw(BigDecimal amount, ActorRef<OperationResult> replyTo) {
public Withdraw(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.amount = amount;
this.replyTo = replyTo;
}
@ -76,33 +78,16 @@ public interface AccountExampleWithMutableState {
}
public static class CloseAccount implements Command {
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CloseAccount(ActorRef<OperationResult> replyTo) {
public CloseAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
// Reply
interface CommandReply extends CborSerializable {}
interface OperationResult extends CommandReply {}
enum Confirmed implements OperationResult {
INSTANCE
}
public static class Rejected implements OperationResult {
public final String reason;
@JsonCreator
public Rejected(String reason) {
this.reason = reason;
}
}
public static class CurrentBalance implements CommandReply {
public static class CurrentBalance implements CborSerializable {
public final BigDecimal balance;
@JsonCreator
@ -215,23 +200,25 @@ public interface AccountExampleWithMutableState {
private ReplyEffect<Event, Account> createAccount(EmptyAccount account, CreateAccount command) {
return Effect()
.persist(AccountCreated.INSTANCE)
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
private ReplyEffect<Event, Account> deposit(OpenedAccount account, Deposit command) {
return Effect()
.persist(new Deposited(command.amount))
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
private ReplyEffect<Event, Account> withdraw(OpenedAccount account, Withdraw command) {
if (!account.canWithdraw(command.amount)) {
return Effect()
.reply(command.replyTo, new Rejected("not enough funds to withdraw " + command.amount));
.reply(
command.replyTo,
StatusReply.error("not enough funds to withdraw " + command.amount));
} else {
return Effect()
.persist(new Withdrawn(command.amount))
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
}
@ -243,10 +230,10 @@ public interface AccountExampleWithMutableState {
if (account.getBalance().equals(BigDecimal.ZERO)) {
return Effect()
.persist(new AccountClosed())
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
} else {
return Effect()
.reply(command.replyTo, new Rejected("balance must be zero for closing account"));
.reply(command.replyTo, StatusReply.error("balance must be zero for closing account"));
}
}

View file

@ -4,8 +4,10 @@
package jdocs.akka.cluster.sharding.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.pattern.StatusReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
@ -38,19 +40,19 @@ public interface AccountExampleWithNullState {
interface Command extends CborSerializable {}
public static class CreateAccount implements Command {
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CreateAccount(ActorRef<OperationResult> replyTo) {
public CreateAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
public static class Deposit implements Command {
public final BigDecimal amount;
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
public Deposit(BigDecimal amount, ActorRef<OperationResult> replyTo) {
public Deposit(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
this.amount = amount;
}
@ -58,9 +60,9 @@ public interface AccountExampleWithNullState {
public static class Withdraw implements Command {
public final BigDecimal amount;
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
public Withdraw(BigDecimal amount, ActorRef<OperationResult> replyTo) {
public Withdraw(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.amount = amount;
this.replyTo = replyTo;
}
@ -76,33 +78,16 @@ public interface AccountExampleWithNullState {
}
public static class CloseAccount implements Command {
public final ActorRef<OperationResult> replyTo;
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CloseAccount(ActorRef<OperationResult> replyTo) {
public CloseAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
// Reply
interface CommandReply extends CborSerializable {}
interface OperationResult extends CommandReply {}
enum Confirmed implements OperationResult {
INSTANCE
}
public static class Rejected implements OperationResult {
public final String reason;
@JsonCreator
public Rejected(String reason) {
this.reason = reason;
}
}
public static class CurrentBalance implements CommandReply {
public static class CurrentBalance implements CborSerializable {
public final BigDecimal balance;
@JsonCreator
@ -214,23 +199,25 @@ public interface AccountExampleWithNullState {
private ReplyEffect<Event, Account> createAccount(CreateAccount command) {
return Effect()
.persist(AccountCreated.INSTANCE)
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
private ReplyEffect<Event, Account> deposit(OpenedAccount account, Deposit command) {
return Effect()
.persist(new Deposited(command.amount))
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
private ReplyEffect<Event, Account> withdraw(OpenedAccount account, Withdraw command) {
if (!account.canWithdraw(command.amount)) {
return Effect()
.reply(command.replyTo, new Rejected("not enough funds to withdraw " + command.amount));
.reply(
command.replyTo,
StatusReply.error("not enough funds to withdraw " + command.amount));
} else {
return Effect()
.persist(new Withdrawn(command.amount))
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
}
@ -242,10 +229,10 @@ public interface AccountExampleWithNullState {
if (account.balance.equals(BigDecimal.ZERO)) {
return Effect()
.persist(new AccountClosed())
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
.thenReply(command.replyTo, account2 -> StatusReply.ack());
} else {
return Effect()
.reply(command.replyTo, new Rejected("balance must be zero for closing account"));
.reply(command.replyTo, StatusReply.error("balance must be zero for closing account"));
}
}

View file

@ -5,10 +5,12 @@
package docs.akka.cluster.sharding.typed
//#test
import akka.Done
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
import akka.persistence.typed.PersistenceId
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.pattern.StatusReply
import org.scalatest.BeforeAndAfterEach
import org.scalatest.wordspec.AnyWordSpecLike
@ -38,38 +40,38 @@ class AccountExampleDocSpec
"Account" must {
"be created with zero balance" in {
val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
result.reply shouldBe AccountEntity.Confirmed
val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
result.reply shouldBe StatusReply.Ack
result.event shouldBe AccountEntity.AccountCreated
result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0
}
"handle Withdraw" in {
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
val result1 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
result1.reply shouldBe AccountEntity.Confirmed
val result1 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _))
result1.reply shouldBe StatusReply.Ack
result1.event shouldBe AccountEntity.Deposited(100)
result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100
val result2 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(10, _))
result2.reply shouldBe AccountEntity.Confirmed
val result2 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(10, _))
result2.reply shouldBe StatusReply.Ack
result2.event shouldBe AccountEntity.Withdrawn(10)
result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90
}
"reject Withdraw overdraft" in {
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _))
val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(110, _))
result.replyOfType[AccountEntity.Rejected]
val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(110, _))
result.reply.isError shouldBe true
result.hasNoEvents shouldBe true
}
"handle GetBalance" in {
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _))
val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_))
result.reply.balance shouldBe 100

View file

@ -4,6 +4,8 @@
package docs.akka.cluster.sharding.typed
import akka.Done
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@ -12,6 +14,7 @@ import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
@ -51,42 +54,40 @@ class AccountExampleSpec
"Account example" must {
"handle Deposit" in {
val probe = createTestProbe[OperationResult]()
val probe = createTestProbe[StatusReply[Done]]()
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "1")
ref ! CreateAccount(probe.ref)
probe.expectMessage(Confirmed)
probe.expectMessage(StatusReply.Ack)
ref ! Deposit(100, probe.ref)
probe.expectMessage(Confirmed)
probe.expectMessage(StatusReply.Ack)
ref ! Deposit(10, probe.ref)
probe.expectMessage(Confirmed)
probe.expectMessage(StatusReply.Ack)
}
"handle Withdraw" in {
// OperationResult is the expected reply type for these commands, but it should also be
// possible to use the super type AccountCommandReply
val probe = createTestProbe[CommandReply]()
val doneProbe = createTestProbe[StatusReply[Done]]()
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "2")
ref ! CreateAccount(probe.ref)
probe.expectMessage(Confirmed)
ref ! Deposit(100, probe.ref)
probe.expectMessage(Confirmed)
ref ! Withdraw(10, probe.ref)
probe.expectMessage(Confirmed)
ref ! CreateAccount(doneProbe.ref)
doneProbe.expectMessage(StatusReply.Ack)
ref ! Deposit(100, doneProbe.ref)
doneProbe.expectMessage(StatusReply.Ack)
ref ! Withdraw(10, doneProbe.ref)
doneProbe.expectMessage(StatusReply.Ack)
// The same probe can be used with other commands too:
ref ! GetBalance(probe.ref)
probe.expectMessage(CurrentBalance(90))
val balanceProbe = createTestProbe[CurrentBalance]()
ref ! GetBalance(balanceProbe.ref)
balanceProbe.expectMessage(CurrentBalance(90))
}
"reject Withdraw overdraft" in {
val probe = createTestProbe[OperationResult]()
val probe = createTestProbe[StatusReply[Done]]()
val ref = ClusterSharding(system).entityRefFor[Command](AccountEntity.TypeKey, "3")
ref ! CreateAccount(probe.ref)
probe.expectMessage(Confirmed)
probe.expectMessage(StatusReply.Ack)
ref ! Deposit(100, probe.ref)
probe.expectMessage(Confirmed)
probe.expectMessage(StatusReply.Ack)
ref ! Withdraw(110, probe.ref)
probe.expectMessageType[Rejected]
probe.expectMessageType[StatusReply[Done]].isError should ===(true)
// Account.Command is the command type, but it should also be possible to narrow it
// ... thus restricting the entity ref from being sent other commands, e.g.:
@ -97,12 +98,12 @@ class AccountExampleSpec
}
"handle GetBalance" in {
val opProbe = createTestProbe[OperationResult]()
val opProbe = createTestProbe[StatusReply[Done]]()
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "4")
ref ! CreateAccount(opProbe.ref)
opProbe.expectMessage(Confirmed)
opProbe.expectMessage(StatusReply.Ack)
ref ! Deposit(100, opProbe.ref)
opProbe.expectMessage(Confirmed)
opProbe.expectMessage(StatusReply.Ack)
val getProbe = createTestProbe[CurrentBalance]()
ref ! GetBalance(getProbe.ref)
@ -111,27 +112,24 @@ class AccountExampleSpec
"be usable with ask" in {
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "5")
val createResult: Future[OperationResult] = ref.ask(CreateAccount(_))
createResult.futureValue should ===(Confirmed)
val createResult: Future[Done] = ref.askWithStatus(CreateAccount(_))
createResult.futureValue should ===(Done)
implicit val ec: ExecutionContext = testKit.system.executionContext
// Errors are shown in IntelliJ Scala plugin 2019.1.6, but compiles with Scala 2.12.8.
// Ok in IntelliJ if using ref.ask[OperationResult].
ref.ask(Deposit(100, _)).futureValue should ===(Confirmed)
ref.ask(Withdraw(10, _)).futureValue should ===(Confirmed)
ref.askWithStatus(Deposit(100, _)).futureValue should ===(Done)
ref.askWithStatus(Withdraw(10, _)).futureValue should ===(Done)
ref.ask(GetBalance(_)).map(_.balance).futureValue should ===(90)
}
"verifySerialization" in {
val opProbe = createTestProbe[OperationResult]()
val opProbe = createTestProbe[StatusReply[Done]]()
serializationTestKit.verifySerialization(CreateAccount(opProbe.ref))
serializationTestKit.verifySerialization(Deposit(100, opProbe.ref))
serializationTestKit.verifySerialization(Withdraw(90, opProbe.ref))
serializationTestKit.verifySerialization(CloseAccount(opProbe.ref))
serializationTestKit.verifySerialization(Confirmed)
serializationTestKit.verifySerialization(Rejected("overdraft"))
val getProbe = createTestProbe[CurrentBalance]()
serializationTestKit.verifySerialization(GetBalance(getProbe.ref))

View file

@ -4,9 +4,11 @@
package docs.akka.cluster.sharding.typed
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
@ -25,18 +27,14 @@ object AccountExampleWithCommandHandlersInState {
object AccountEntity {
// Command
sealed trait Command extends CborSerializable
final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command
final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
// Reply
sealed trait CommandReply extends CborSerializable
sealed trait OperationResult extends CommandReply
case object Confirmed extends OperationResult
final case class Rejected(reason: String) extends OperationResult
final case class CurrentBalance(balance: BigDecimal) extends CommandReply
final case class CurrentBalance(balance: BigDecimal)
// Event
sealed trait Event extends CborSerializable
@ -59,7 +57,7 @@ object AccountExampleWithCommandHandlersInState {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case CreateAccount(replyTo) =>
Effect.persist(AccountCreated).thenReply(replyTo)(_ => Confirmed)
Effect.persist(AccountCreated).thenReply(replyTo)(_ => StatusReply.Ack)
case _ =>
// CreateAccount before handling any other commands
Effect.unhandled.thenNoReply()
@ -77,25 +75,25 @@ object AccountExampleWithCommandHandlersInState {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case Deposit(amount, replyTo) =>
Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => Confirmed)
Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => StatusReply.Ack)
case Withdraw(amount, replyTo) =>
if (canWithdraw(amount))
Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => Confirmed)
Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
Effect.reply(replyTo)(StatusReply.Error(s"Insufficient balance $balance to be able to withdraw $amount"))
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(balance))
case CloseAccount(replyTo) =>
if (balance == Zero)
Effect.persist(AccountClosed).thenReply(replyTo)(_ => Confirmed)
Effect.persist(AccountClosed).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(Rejected("Can't close account with non-zero balance"))
Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance"))
case CreateAccount(replyTo) =>
Effect.reply(replyTo)(Rejected("Account is already created"))
Effect.reply(replyTo)(StatusReply.Error("Account is already created"))
}
@ -127,8 +125,8 @@ object AccountExampleWithCommandHandlersInState {
replyClosed(replyTo)
}
private def replyClosed(replyTo: ActorRef[AccountEntity.OperationResult]): ReplyEffect =
Effect.reply(replyTo)(Rejected(s"Account is closed"))
private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect =
Effect.reply(replyTo)(StatusReply.Error(s"Account is closed"))
override def applyEvent(event: Event): Account =
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")

View file

@ -4,9 +4,11 @@
package docs.akka.cluster.sharding.typed
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
@ -29,22 +31,16 @@ object AccountExampleWithEventHandlersInState {
//#reply-command
sealed trait Command extends CborSerializable
//#reply-command
final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
//#reply-command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
//#reply-command
final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command
final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
// Reply
//#reply-command
sealed trait CommandReply extends CborSerializable
sealed trait OperationResult extends CommandReply
case object Confirmed extends OperationResult
final case class Rejected(reason: String) extends OperationResult
//#reply-command
final case class CurrentBalance(balance: BigDecimal) extends CommandReply
final case class CurrentBalance(balance: BigDecimal) extends CborSerializable
// Event
sealed trait Event extends CborSerializable
@ -111,11 +107,12 @@ object AccountExampleWithEventHandlersInState {
case acc @ OpenedAccount(_) =>
cmd match {
case c: Deposit => deposit(c)
case c: Withdraw => withdraw(acc, c)
case c: GetBalance => getBalance(acc, c)
case c: CloseAccount => closeAccount(acc, c)
case c: CreateAccount => Effect.reply(c.replyTo)(Rejected(s"Account $accountNumber is already created"))
case c: Deposit => deposit(c)
case c: Withdraw => withdraw(acc, c)
case c: GetBalance => getBalance(acc, c)
case c: CloseAccount => closeAccount(acc, c)
case c: CreateAccount =>
Effect.reply(c.replyTo)(StatusReply.Error(s"Account $accountNumber is already created"))
}
case ClosedAccount =>
@ -136,8 +133,8 @@ object AccountExampleWithEventHandlersInState {
private def replyClosed(
accountNumber: String,
replyTo: ActorRef[AccountEntity.OperationResult]): ReplyEffect[Event, Account] = {
Effect.reply(replyTo)(Rejected(s"Account $accountNumber is closed"))
replyTo: ActorRef[StatusReply[Done]]): ReplyEffect[Event, Account] = {
Effect.reply(replyTo)(StatusReply.Error(s"Account $accountNumber is closed"))
}
private val eventHandler: (Account, Event) => Account = { (state, event) =>
@ -145,19 +142,20 @@ object AccountExampleWithEventHandlersInState {
}
private def createAccount(cmd: CreateAccount): ReplyEffect[Event, Account] = {
Effect.persist(AccountCreated).thenReply(cmd.replyTo)(_ => Confirmed)
Effect.persist(AccountCreated).thenReply(cmd.replyTo)(_ => StatusReply.Ack)
}
private def deposit(cmd: Deposit): ReplyEffect[Event, Account] = {
Effect.persist(Deposited(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed)
Effect.persist(Deposited(cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack)
}
//#reply
private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[Event, Account] = {
if (acc.canWithdraw(cmd.amount))
Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed)
Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack)
else
Effect.reply(cmd.replyTo)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
Effect.reply(cmd.replyTo)(
StatusReply.Error(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
}
//#reply
@ -167,9 +165,9 @@ object AccountExampleWithEventHandlersInState {
private def closeAccount(acc: OpenedAccount, cmd: CloseAccount): ReplyEffect[Event, Account] = {
if (acc.balance == Zero)
Effect.persist(AccountClosed).thenReply(cmd.replyTo)(_ => Confirmed)
Effect.persist(AccountClosed).thenReply(cmd.replyTo)(_ => StatusReply.Ack)
else
Effect.reply(cmd.replyTo)(Rejected("Can't close account with non-zero balance"))
Effect.reply(cmd.replyTo)(StatusReply.Error("Can't close account with non-zero balance"))
}
}

View file

@ -4,9 +4,11 @@
package docs.akka.cluster.sharding.typed
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
@ -25,18 +27,14 @@ object AccountExampleWithOptionState {
object AccountEntity {
// Command
sealed trait Command extends CborSerializable
final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command
final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command
final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
// Reply
sealed trait CommandReply extends CborSerializable
sealed trait OperationResult extends CommandReply
case object Confirmed extends OperationResult
final case class Rejected(reason: String) extends OperationResult
final case class CurrentBalance(balance: BigDecimal) extends CommandReply
final case class CurrentBalance(balance: BigDecimal) extends CborSerializable
// Event
sealed trait Event extends CborSerializable
@ -61,25 +59,25 @@ object AccountExampleWithOptionState {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case Deposit(amount, replyTo) =>
Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => Confirmed)
Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => StatusReply.Ack)
case Withdraw(amount, replyTo) =>
if (canWithdraw(amount))
Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => Confirmed)
Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
Effect.reply(replyTo)(StatusReply.Error(s"Insufficient balance $balance to be able to withdraw $amount"))
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(balance))
case CloseAccount(replyTo) =>
if (balance == Zero)
Effect.persist(AccountClosed).thenReply(replyTo)(_ => Confirmed)
Effect.persist(AccountClosed).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(Rejected("Can't close account with non-zero balance"))
Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance"))
case CreateAccount(replyTo) =>
Effect.reply(replyTo)(Rejected("Account is already created"))
Effect.reply(replyTo)(StatusReply.Error("Account is already created"))
}
@ -111,8 +109,8 @@ object AccountExampleWithOptionState {
replyClosed(replyTo)
}
private def replyClosed(replyTo: ActorRef[AccountEntity.OperationResult]): ReplyEffect =
Effect.reply(replyTo)(Rejected(s"Account is closed"))
private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect =
Effect.reply(replyTo)(StatusReply.Error(s"Account is closed"))
override def applyEvent(event: Event): Account =
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
@ -141,7 +139,7 @@ object AccountExampleWithOptionState {
def onFirstCommand(cmd: Command): ReplyEffect = {
cmd match {
case CreateAccount(replyTo) =>
Effect.persist(AccountCreated).thenReply(replyTo)(_ => Confirmed)
Effect.persist(AccountCreated).thenReply(replyTo)(_ => StatusReply.Ack)
case _ =>
// CreateAccount before handling any other commands
Effect.unhandled.thenNoReply()

View file

@ -84,4 +84,5 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend
val createdTimestamp = if (re.hasCreatedTimestamp) re.getCreatedTimestamp else 0L
Entry(resolver.resolveActorRef(re.getActorRef), re.getSystemUid)(createdTimestamp)
}
}

View file

@ -160,7 +160,9 @@ The adapter function is running in the receiving actor and can safely access its
In an interaction where there is a 1:1 mapping between a request and a response we can use `ask` on the `ActorContext` to interact with another actor.
The interaction has two steps, first we need to construct the outgoing message, to do that we need an @scala[`ActorRef[Response]`]@java[`ActorRef<Response>`] to put as recipient in the outgoing message. The second step is to transform the successful `Response` or failure into a message that is part of the protocol of the sending actor.
The interaction has two steps, first we need to construct the outgoing message, to do that we need an @scala[`ActorRef[Response]`]@java[`ActorRef<Response>`] to put as recipient in the outgoing message.
The second step is to transform the successful `Response` or failure into a message that is part of the protocol of the sending actor.
See also the [Generic response wrapper](#generic-response-wrapper) for replies that are either a success or an error.
**Example:**
@ -212,7 +214,7 @@ Java
Note that validation errors are also explicit in the message protocol. The `GiveMeCookies` request can reply
with `Cookies` or `InvalidRequest`. The requestor has to decide how to handle an `InvalidRequest` reply. Sometimes
it should be treated as a failed @scala[`Future`]@java[`Future`] and for that the reply can be mapped on the
requestor side.
requestor side. See also the [Generic response wrapper](#generic-response-wrapper) for replies that are either a success or an error.
Scala
: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #standalone-ask-fail-future }
@ -230,6 +232,50 @@ Java
* There can only be a single response to one `ask` (see @ref:[per session child Actor](#per-session-child-actor))
* When `ask` times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
## Generic response wrapper
In many cases the response can either be a successful result or an error (a validation error that the command was invalid for example).
Having to define two response classes and a shared supertype for every request type can be repetitive, especially in a cluster context
where you also have to make sure the messages can be serialized to be sent over the network.
To help with this a generic status-response type is included in Akka: @apidoc[StatusReply], everywhere where `ask` can be used
there is also a second method `askWithStatus` which, given that the response is a `StatusReply` will unwrap successful responses
and help with handling validation errors. Akka includes pre-built serializers for the type, so in the normal use case a clustered
application only needs to provide a serializer for the successful result.
For the case where the successful reply does not contain an actual value but is more of an acknowledgment there is a pre defined
@scala[`StatusReply.Ack`]@java[`StatusReply.ack()`] of type @scala[`StatusReply[Done]`]@java[`StatusReply<Done>`].
Errors are preferably sent as a text describing what is wrong, but using exceptions to attach a type is also possible.
**Example actor to actor ask:**
Scala
: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #actor-ask-with-status }
Java
: @@snip [InteractionPatternsTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java) { #actor-ask-with-status }
A validation error is turned into a `Failure` for the message adapter. In this case we are explicitly handling the valdation error separately from
other ask failures.
**Example ask from the outside:**
Scala
: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #standalone-ask-with-status }
Java
: @@snip [InteractionPatternsTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java) { #standalone-ask-with-status }
Note that validation errors are also explicit in the message protocol, but encoded as the wrapper type, constructed using @scala[`StatusReply.Error(text)`]@java[`StatusReply.error(text)`]:
Scala
: @@snip [InteractionPatternsSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/InteractionPatternsSpec.scala) { #standalone-ask-with-status-fail-future }
Java
: @@snip [InteractionPatternsTest.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/InteractionPatternsAskWithStatusTest.java) { #standalone-ask-with-status-fail-future }
## Ignoring replies
In some situations an actor has a response for a particular request message but you are not interested in the response. In this case you can pass @scala[`system.ignoreRef`]@java[`system.ignoreRef()`] turning the request-response into a fire-and-forget.

View file

@ -386,8 +386,13 @@ The @ref:[Request-Response interaction pattern](interaction-patterns.md#request-
persistent actors, because you typically want to know if the command was rejected due to validation errors and
when accepted you want a confirmation when the events have been successfully stored.
Therefore you typically include a @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`] in the
commands. After validation errors or after persisting events, using a `thenRun` side effect, the reply message can
Therefore you typically include a @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`]. If the
command can either have a successful response or a validation error returned, the generic response type @scala[`StatusReply[ReplyType]]`]
@java[`StatusReply<ReplyType>`] can be used. If the successful reply does not contain a value but is more of an acknowledgement
a pre defined @scala[`StatusReply.Ack`]@java[`StatusReply.ack()`] of type @scala[`StatusReply[Done]`]@java[`StatusReply<Done>`]
can be used.
After validation errors or after persisting events, using a `thenRun` side effect, the reply message can
be sent to the `ActorRef`.
Scala

View file

@ -271,6 +271,9 @@ Scala
Java
: @@snip [StyleGuideDocExamples.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/StyleGuideDocExamples.java) { #message-protocol }
Note that the response message hierarchy in this case could be completely avoided by using the @apiDoc[StatusReply] API
instead (see @ref[Generic Response Wrapper](interaction-patterns.md#generic-response-wrapper)).
## Public versus private messages
Often an actor has some messages that are only for its internal implementation and not part of the public

View file

@ -8,6 +8,7 @@ import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
@ -46,7 +47,7 @@ object BlogPostEntity {
//#commands
sealed trait Command
//#reply-command
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends Command
final case class AddPost(content: PostContent, replyTo: ActorRef[StatusReply[AddPostDone]]) extends Command
final case class AddPostDone(postId: String)
//#reply-command
final case class GetPost(replyTo: ActorRef[PostContent]) extends Command
@ -79,13 +80,16 @@ object BlogPostEntity {
case cmd: ChangeBody => changeBody(draftState, cmd)
case Publish(replyTo) => publish(draftState, replyTo)
case GetPost(replyTo) => getPost(draftState, replyTo)
case _: AddPost => Effect.unhandled
case AddPost(_, replyTo) =>
Effect.unhandled.thenRun(_ => replyTo ! StatusReply.Error("Cannot add post while in draft state"))
}
case publishedState: PublishedState =>
command match {
case GetPost(replyTo) => getPost(publishedState, replyTo)
case _ => Effect.unhandled
case AddPost(_, replyTo) =>
Effect.unhandled.thenRun(_ => replyTo ! StatusReply.Error("Cannot add post, already published"))
case _ => Effect.unhandled
}
}
}
@ -95,7 +99,7 @@ object BlogPostEntity {
val evt = PostAdded(cmd.content.postId, cmd.content)
Effect.persist(evt).thenRun { _ =>
// After persist is done additional side effects can be performed
cmd.replyTo ! AddPostDone(cmd.content.postId)
cmd.replyTo ! StatusReply.Success(AddPostDone(cmd.content.postId))
}
//#reply
}

View file

@ -10583,6 +10583,624 @@ public final class ContainerFormats {
}
public interface StatusReplyErrorMessageOrBuilder extends
// @@protoc_insertion_point(interface_extends:StatusReplyErrorMessage)
akka.protobufv3.internal.MessageOrBuilder {
/**
* <code>required string errorMessage = 1;</code>
* @return Whether the errorMessage field is set.
*/
boolean hasErrorMessage();
/**
* <code>required string errorMessage = 1;</code>
* @return The errorMessage.
*/
java.lang.String getErrorMessage();
/**
* <code>required string errorMessage = 1;</code>
* @return The bytes for errorMessage.
*/
akka.protobufv3.internal.ByteString
getErrorMessageBytes();
}
/**
* <pre>
* ReplyWith pattern message(s)
* </pre>
*
* Protobuf type {@code StatusReplyErrorMessage}
*/
public static final class StatusReplyErrorMessage extends
akka.protobufv3.internal.GeneratedMessageV3 implements
// @@protoc_insertion_point(message_implements:StatusReplyErrorMessage)
StatusReplyErrorMessageOrBuilder {
private static final long serialVersionUID = 0L;
// Use StatusReplyErrorMessage.newBuilder() to construct.
private StatusReplyErrorMessage(akka.protobufv3.internal.GeneratedMessageV3.Builder<?> builder) {
super(builder);
}
private StatusReplyErrorMessage() {
errorMessage_ = "";
}
@java.lang.Override
@SuppressWarnings({"unused"})
protected java.lang.Object newInstance(
akka.protobufv3.internal.GeneratedMessageV3.UnusedPrivateParameter unused) {
return new StatusReplyErrorMessage();
}
@java.lang.Override
public final akka.protobufv3.internal.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private StatusReplyErrorMessage(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
this();
if (extensionRegistry == null) {
throw new java.lang.NullPointerException();
}
int mutable_bitField0_ = 0;
akka.protobufv3.internal.UnknownFieldSet.Builder unknownFields =
akka.protobufv3.internal.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
case 10: {
akka.protobufv3.internal.ByteString bs = input.readBytes();
bitField0_ |= 0x00000001;
errorMessage_ = bs;
break;
}
default: {
if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) {
done = true;
}
break;
}
}
}
} catch (akka.protobufv3.internal.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new akka.protobufv3.internal.InvalidProtocolBufferException(
e).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final akka.protobufv3.internal.Descriptors.Descriptor
getDescriptor() {
return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_descriptor;
}
@java.lang.Override
protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.remote.ContainerFormats.StatusReplyErrorMessage.class, akka.remote.ContainerFormats.StatusReplyErrorMessage.Builder.class);
}
private int bitField0_;
public static final int ERRORMESSAGE_FIELD_NUMBER = 1;
private volatile java.lang.Object errorMessage_;
/**
* <code>required string errorMessage = 1;</code>
* @return Whether the errorMessage field is set.
*/
public boolean hasErrorMessage() {
return ((bitField0_ & 0x00000001) != 0);
}
/**
* <code>required string errorMessage = 1;</code>
* @return The errorMessage.
*/
public java.lang.String getErrorMessage() {
java.lang.Object ref = errorMessage_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
akka.protobufv3.internal.ByteString bs =
(akka.protobufv3.internal.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
errorMessage_ = s;
}
return s;
}
}
/**
* <code>required string errorMessage = 1;</code>
* @return The bytes for errorMessage.
*/
public akka.protobufv3.internal.ByteString
getErrorMessageBytes() {
java.lang.Object ref = errorMessage_;
if (ref instanceof java.lang.String) {
akka.protobufv3.internal.ByteString b =
akka.protobufv3.internal.ByteString.copyFromUtf8(
(java.lang.String) ref);
errorMessage_ = b;
return b;
} else {
return (akka.protobufv3.internal.ByteString) ref;
}
}
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized == 1) return true;
if (isInitialized == 0) return false;
if (!hasErrorMessage()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
@java.lang.Override
public void writeTo(akka.protobufv3.internal.CodedOutputStream output)
throws java.io.IOException {
if (((bitField0_ & 0x00000001) != 0)) {
akka.protobufv3.internal.GeneratedMessageV3.writeString(output, 1, errorMessage_);
}
unknownFields.writeTo(output);
}
@java.lang.Override
public int getSerializedSize() {
int size = memoizedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) != 0)) {
size += akka.protobufv3.internal.GeneratedMessageV3.computeStringSize(1, errorMessage_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof akka.remote.ContainerFormats.StatusReplyErrorMessage)) {
return super.equals(obj);
}
akka.remote.ContainerFormats.StatusReplyErrorMessage other = (akka.remote.ContainerFormats.StatusReplyErrorMessage) obj;
if (hasErrorMessage() != other.hasErrorMessage()) return false;
if (hasErrorMessage()) {
if (!getErrorMessage()
.equals(other.getErrorMessage())) return false;
}
if (!unknownFields.equals(other.unknownFields)) return false;
return true;
}
@java.lang.Override
public int hashCode() {
if (memoizedHashCode != 0) {
return memoizedHashCode;
}
int hash = 41;
hash = (19 * hash) + getDescriptor().hashCode();
if (hasErrorMessage()) {
hash = (37 * hash) + ERRORMESSAGE_FIELD_NUMBER;
hash = (53 * hash) + getErrorMessage().hashCode();
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(
java.nio.ByteBuffer data)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(
java.nio.ByteBuffer data,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(
akka.protobufv3.internal.ByteString data)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(
akka.protobufv3.internal.ByteString data,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(byte[] data)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(
byte[] data,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(java.io.InputStream input)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(
java.io.InputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseDelimitedFrom(
java.io.InputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input, extensionRegistry);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(
akka.protobufv3.internal.CodedInputStream input)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage parseFrom(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
@java.lang.Override
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder() {
return DEFAULT_INSTANCE.toBuilder();
}
public static Builder newBuilder(akka.remote.ContainerFormats.StatusReplyErrorMessage prototype) {
return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
}
@java.lang.Override
public Builder toBuilder() {
return this == DEFAULT_INSTANCE
? new Builder() : new Builder().mergeFrom(this);
}
@java.lang.Override
protected Builder newBuilderForType(
akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* <pre>
* ReplyWith pattern message(s)
* </pre>
*
* Protobuf type {@code StatusReplyErrorMessage}
*/
public static final class Builder extends
akka.protobufv3.internal.GeneratedMessageV3.Builder<Builder> implements
// @@protoc_insertion_point(builder_implements:StatusReplyErrorMessage)
akka.remote.ContainerFormats.StatusReplyErrorMessageOrBuilder {
public static final akka.protobufv3.internal.Descriptors.Descriptor
getDescriptor() {
return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_descriptor;
}
@java.lang.Override
protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.remote.ContainerFormats.StatusReplyErrorMessage.class, akka.remote.ContainerFormats.StatusReplyErrorMessage.Builder.class);
}
// Construct using akka.remote.ContainerFormats.StatusReplyErrorMessage.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (akka.protobufv3.internal.GeneratedMessageV3
.alwaysUseFieldBuilders) {
}
}
@java.lang.Override
public Builder clear() {
super.clear();
errorMessage_ = "";
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
@java.lang.Override
public akka.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
return akka.remote.ContainerFormats.internal_static_StatusReplyErrorMessage_descriptor;
}
@java.lang.Override
public akka.remote.ContainerFormats.StatusReplyErrorMessage getDefaultInstanceForType() {
return akka.remote.ContainerFormats.StatusReplyErrorMessage.getDefaultInstance();
}
@java.lang.Override
public akka.remote.ContainerFormats.StatusReplyErrorMessage build() {
akka.remote.ContainerFormats.StatusReplyErrorMessage result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
@java.lang.Override
public akka.remote.ContainerFormats.StatusReplyErrorMessage buildPartial() {
akka.remote.ContainerFormats.StatusReplyErrorMessage result = new akka.remote.ContainerFormats.StatusReplyErrorMessage(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) != 0)) {
to_bitField0_ |= 0x00000001;
}
result.errorMessage_ = errorMessage_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
@java.lang.Override
public Builder clone() {
return super.clone();
}
@java.lang.Override
public Builder setField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field,
java.lang.Object value) {
return super.setField(field, value);
}
@java.lang.Override
public Builder clearField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field) {
return super.clearField(field);
}
@java.lang.Override
public Builder clearOneof(
akka.protobufv3.internal.Descriptors.OneofDescriptor oneof) {
return super.clearOneof(oneof);
}
@java.lang.Override
public Builder setRepeatedField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field,
int index, java.lang.Object value) {
return super.setRepeatedField(field, index, value);
}
@java.lang.Override
public Builder addRepeatedField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field,
java.lang.Object value) {
return super.addRepeatedField(field, value);
}
@java.lang.Override
public Builder mergeFrom(akka.protobufv3.internal.Message other) {
if (other instanceof akka.remote.ContainerFormats.StatusReplyErrorMessage) {
return mergeFrom((akka.remote.ContainerFormats.StatusReplyErrorMessage)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.remote.ContainerFormats.StatusReplyErrorMessage other) {
if (other == akka.remote.ContainerFormats.StatusReplyErrorMessage.getDefaultInstance()) return this;
if (other.hasErrorMessage()) {
bitField0_ |= 0x00000001;
errorMessage_ = other.errorMessage_;
onChanged();
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
}
@java.lang.Override
public final boolean isInitialized() {
if (!hasErrorMessage()) {
return false;
}
return true;
}
@java.lang.Override
public Builder mergeFrom(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
akka.remote.ContainerFormats.StatusReplyErrorMessage parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (akka.protobufv3.internal.InvalidProtocolBufferException e) {
parsedMessage = (akka.remote.ContainerFormats.StatusReplyErrorMessage) e.getUnfinishedMessage();
throw e.unwrapIOException();
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
private java.lang.Object errorMessage_ = "";
/**
* <code>required string errorMessage = 1;</code>
* @return Whether the errorMessage field is set.
*/
public boolean hasErrorMessage() {
return ((bitField0_ & 0x00000001) != 0);
}
/**
* <code>required string errorMessage = 1;</code>
* @return The errorMessage.
*/
public java.lang.String getErrorMessage() {
java.lang.Object ref = errorMessage_;
if (!(ref instanceof java.lang.String)) {
akka.protobufv3.internal.ByteString bs =
(akka.protobufv3.internal.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
errorMessage_ = s;
}
return s;
} else {
return (java.lang.String) ref;
}
}
/**
* <code>required string errorMessage = 1;</code>
* @return The bytes for errorMessage.
*/
public akka.protobufv3.internal.ByteString
getErrorMessageBytes() {
java.lang.Object ref = errorMessage_;
if (ref instanceof String) {
akka.protobufv3.internal.ByteString b =
akka.protobufv3.internal.ByteString.copyFromUtf8(
(java.lang.String) ref);
errorMessage_ = b;
return b;
} else {
return (akka.protobufv3.internal.ByteString) ref;
}
}
/**
* <code>required string errorMessage = 1;</code>
* @param value The errorMessage to set.
* @return This builder for chaining.
*/
public Builder setErrorMessage(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
errorMessage_ = value;
onChanged();
return this;
}
/**
* <code>required string errorMessage = 1;</code>
* @return This builder for chaining.
*/
public Builder clearErrorMessage() {
bitField0_ = (bitField0_ & ~0x00000001);
errorMessage_ = getDefaultInstance().getErrorMessage();
onChanged();
return this;
}
/**
* <code>required string errorMessage = 1;</code>
* @param value The bytes for errorMessage to set.
* @return This builder for chaining.
*/
public Builder setErrorMessageBytes(
akka.protobufv3.internal.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
errorMessage_ = value;
onChanged();
return this;
}
@java.lang.Override
public final Builder setUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
}
@java.lang.Override
public final Builder mergeUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
return super.mergeUnknownFields(unknownFields);
}
// @@protoc_insertion_point(builder_scope:StatusReplyErrorMessage)
}
// @@protoc_insertion_point(class_scope:StatusReplyErrorMessage)
private static final akka.remote.ContainerFormats.StatusReplyErrorMessage DEFAULT_INSTANCE;
static {
DEFAULT_INSTANCE = new akka.remote.ContainerFormats.StatusReplyErrorMessage();
}
public static akka.remote.ContainerFormats.StatusReplyErrorMessage getDefaultInstance() {
return DEFAULT_INSTANCE;
}
@java.lang.Deprecated public static final akka.protobufv3.internal.Parser<StatusReplyErrorMessage>
PARSER = new akka.protobufv3.internal.AbstractParser<StatusReplyErrorMessage>() {
@java.lang.Override
public StatusReplyErrorMessage parsePartialFrom(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return new StatusReplyErrorMessage(input, extensionRegistry);
}
};
public static akka.protobufv3.internal.Parser<StatusReplyErrorMessage> parser() {
return PARSER;
}
@java.lang.Override
public akka.protobufv3.internal.Parser<StatusReplyErrorMessage> getParserForType() {
return PARSER;
}
@java.lang.Override
public akka.remote.ContainerFormats.StatusReplyErrorMessage getDefaultInstanceForType() {
return DEFAULT_INSTANCE;
}
}
private static final akka.protobufv3.internal.Descriptors.Descriptor
internal_static_SelectionEnvelope_descriptor;
private static final
@ -10643,6 +11261,11 @@ public final class ContainerFormats {
private static final
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internal_static_StackTraceElement_fieldAccessorTable;
private static final akka.protobufv3.internal.Descriptors.Descriptor
internal_static_StatusReplyErrorMessage_descriptor;
private static final
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internal_static_StatusReplyErrorMessage_fieldAccessorTable;
public static akka.protobufv3.internal.Descriptors.FileDescriptor
getDescriptor() {
@ -10675,9 +11298,10 @@ public final class ContainerFormats {
"\n\007message\030\002 \002(\t\022\027\n\005cause\030\003 \002(\0132\010.Payload" +
"\"`\n\021StackTraceElement\022\021\n\tclassName\030\001 \002(\t" +
"\022\022\n\nmethodName\030\002 \002(\t\022\020\n\010fileName\030\003 \002(\t\022\022" +
"\n\nlineNumber\030\004 \002(\005*<\n\013PatternType\022\n\n\006PAR" +
"ENT\020\000\022\016\n\nCHILD_NAME\020\001\022\021\n\rCHILD_PATTERN\020\002" +
"B\017\n\013akka.remoteH\001"
"\n\nlineNumber\030\004 \002(\005\"/\n\027StatusReplyErrorMe" +
"ssage\022\024\n\014errorMessage\030\001 \002(\t*<\n\013PatternTy" +
"pe\022\n\n\006PARENT\020\000\022\016\n\nCHILD_NAME\020\001\022\021\n\rCHILD_" +
"PATTERN\020\002B\017\n\013akka.remoteH\001"
};
descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@ -10755,6 +11379,12 @@ public final class ContainerFormats {
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_StackTraceElement_descriptor,
new java.lang.String[] { "ClassName", "MethodName", "FileName", "LineNumber", });
internal_static_StatusReplyErrorMessage_descriptor =
getDescriptor().getMessageTypes().get(12);
internal_static_StatusReplyErrorMessage_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_StatusReplyErrorMessage_descriptor,
new java.lang.String[] { "ErrorMessage", });
}
// @@protoc_insertion_point(outer_class_scope)

View file

@ -82,3 +82,9 @@ message StackTraceElement {
required string fileName = 3;
required int32 lineNumber = 4;
}
// ReplyWith pattern message(s)
message StatusReplyErrorMessage {
required string errorMessage = 1;
}

View file

@ -98,6 +98,8 @@ akka {
"akka.routing.TailChoppingPool" = akka-misc
"akka.remote.routing.RemoteRouterConfig" = akka-misc
"akka.pattern.StatusReply" = akka-misc
"akka.dispatch.sysmsg.SystemMessage" = akka-system-msg
# Java Serializer is by default used for exceptions and will by default

View file

@ -10,12 +10,11 @@ import java.util.Optional
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{ FiniteDuration, TimeUnit }
import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions }
import akka.{ Done, NotUsed }
import akka.actor._
import akka.dispatch.Dispatchers
import akka.pattern.StatusReply
import akka.remote._
import akka.remote.WireFormats.AddressData
import akka.remote.routing.RemoteRouterConfig
@ -42,6 +41,9 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
case r: ActorRef => serializeActorRef(r)
case s: Status.Success => serializeStatusSuccess(s)
case f: Status.Failure => serializeStatusFailure(f)
case StatusReply.Ack => Array.emptyByteArray
case r @ StatusReply.Success(_) => serializeStatusReplySuccess(r)
case r @ StatusReply.Error(_) => serializeStatusReplyError(r)
case ex: ActorInitializationException => serializeActorInitializationException(ex)
case ex: ThrowableNotSerializableException => serializeThrowableNotSerializableException(ex)
case t: Throwable => throwableSupport.serializeThrowable(t)
@ -120,6 +122,22 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private def serializeStatusFailure(failure: Status.Failure): Array[Byte] =
payloadSupport.payloadBuilder(failure.cause).build().toByteArray
def serializeStatusReplySuccess(r: StatusReply[Any]): Array[Byte] =
// no specific message, serialized id and manifest together with payload is enough (no wrapping overhead)
payloadSupport.payloadBuilder(r.getValue).build().toByteArray
def serializeStatusReplyError(r: StatusReply[_]): Array[Byte] = {
r.getError match {
case em: StatusReply.ErrorMessage =>
// somewhat optimized for the recommended usage, avoiding the additional payload metadata
ContainerFormats.StatusReplyErrorMessage.newBuilder().setErrorMessage(em.getMessage).build().toByteArray
case ex: Throwable =>
// depends on user providing exception serializer
// no specific message, serialized id and manifest together with payload is enough (less wrapping overhead)
payloadSupport.payloadBuilder(ex).build().toByteArray
}
}
private def serializeActorInitializationException(ex: ActorInitializationException): Array[Byte] = {
val builder = ContainerFormats.ActorInitializationException.newBuilder()
if (ex.getActor ne null)
@ -312,12 +330,20 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private val ScatterGatherPoolManifest = "ROSGP"
private val TailChoppingPoolManifest = "ROTCP"
private val RemoteRouterConfigManifest = "RORRC"
private val StatusReplySuccessManifest = "S"
private val StatusReplyErrorMessageManifest = "SM"
private val StatusReplyErrorExceptionManifest = "SE"
private val StatusReplyAckManifest = "SA"
private val fromBinaryMap = Map[String, Array[Byte] => AnyRef](
IdentifyManifest -> deserializeIdentify,
ActorIdentityManifest -> deserializeActorIdentity,
StatusSuccessManifest -> deserializeStatusSuccess,
StatusFailureManifest -> deserializeStatusFailure,
StatusReplyAckManifest -> ((_) => StatusReply.Ack),
StatusReplySuccessManifest -> deserializeStatusReplySuccess,
StatusReplyErrorMessageManifest -> deserializeStatusReplyErrorMessage,
StatusReplyErrorExceptionManifest -> deserializeStatusReplyErrorException,
ThrowableManifest -> throwableSupport.deserializeThrowable,
ActorRefManifest -> deserializeActorRefBytes,
OptionManifest -> deserializeOption,
@ -347,36 +373,41 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
override def manifest(o: AnyRef): String =
o match {
case _: Identify => IdentifyManifest
case _: ActorIdentity => ActorIdentityManifest
case _: Option[Any] => OptionManifest
case _: Optional[_] => OptionalManifest
case _: ActorRef => ActorRefManifest
case _: Status.Success => StatusSuccessManifest
case _: Status.Failure => StatusFailureManifest
case _: ActorInitializationException => ActorInitializationExceptionManifest
case _: ThrowableNotSerializableException => ThrowableNotSerializableExceptionManifest
case _: Throwable => ThrowableManifest
case PoisonPill => PoisonPillManifest
case Kill => KillManifest
case RemoteWatcher.Heartbeat => RemoteWatcherHBManifest
case Done => DoneManifest
case NotUsed => NotUsedManifest
case _: Address => AddressManifest
case _: UniqueAddress => UniqueAddressManifest
case _: RemoteWatcher.HeartbeatRsp => RemoteWatcherHBRespManifest
case LocalScope => LocalScopeManifest
case _: RemoteScope => RemoteScopeManifest
case _: Config => ConfigManifest
case _: FromConfig => FromConfigManifest
case _: DefaultResizer => DefaultResizerManifest
case _: BalancingPool => BalancingPoolManifest
case _: BroadcastPool => BroadcastPoolManifest
case _: RandomPool => RandomPoolManifest
case _: RoundRobinPool => RoundRobinPoolManifest
case _: ScatterGatherFirstCompletedPool => ScatterGatherPoolManifest
case _: TailChoppingPool => TailChoppingPoolManifest
case _: RemoteRouterConfig => RemoteRouterConfigManifest
case _: Identify => IdentifyManifest
case _: ActorIdentity => ActorIdentityManifest
case _: Option[Any] => OptionManifest
case _: Optional[_] => OptionalManifest
case _: ActorRef => ActorRefManifest
case _: Status.Success => StatusSuccessManifest
case _: Status.Failure => StatusFailureManifest
case StatusReply.Ack => StatusReplyAckManifest
case StatusReply.Success(_) => StatusReplySuccessManifest
case StatusReply.Error(_: StatusReply.ErrorMessage) => StatusReplyErrorMessageManifest
case StatusReply.Error(_) => StatusReplyErrorExceptionManifest
case _: ActorInitializationException => ActorInitializationExceptionManifest
case _: ThrowableNotSerializableException => ThrowableNotSerializableExceptionManifest
case _: Throwable => ThrowableManifest
case PoisonPill => PoisonPillManifest
case Kill => KillManifest
case RemoteWatcher.Heartbeat => RemoteWatcherHBManifest
case Done => DoneManifest
case NotUsed => NotUsedManifest
case _: Address => AddressManifest
case _: UniqueAddress => UniqueAddressManifest
case _: RemoteWatcher.HeartbeatRsp => RemoteWatcherHBRespManifest
case LocalScope => LocalScopeManifest
case _: RemoteScope => RemoteScopeManifest
case _: Config => ConfigManifest
case _: FromConfig => FromConfigManifest
case _: DefaultResizer => DefaultResizerManifest
case _: BalancingPool => BalancingPoolManifest
case _: BroadcastPool => BroadcastPoolManifest
case _: RandomPool => RandomPoolManifest
case _: RoundRobinPool => RoundRobinPoolManifest
case _: ScatterGatherFirstCompletedPool => ScatterGatherPoolManifest
case _: TailChoppingPool => TailChoppingPoolManifest
case _: RemoteRouterConfig => RemoteRouterConfigManifest
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
@ -436,6 +467,16 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private def deserializeStatusFailure(bytes: Array[Byte]): Status.Failure =
Status.Failure(payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes)).asInstanceOf[Throwable])
private def deserializeStatusReplySuccess(bytes: Array[Byte]): StatusReply[_] =
StatusReply.success(payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes)))
private def deserializeStatusReplyErrorMessage(bytes: Array[Byte]): StatusReply[_] =
StatusReply.error(ContainerFormats.StatusReplyErrorMessage.parseFrom(bytes).getErrorMessage)
private def deserializeStatusReplyErrorException(bytes: Array[Byte]): StatusReply[_] =
StatusReply.error(
payloadSupport.deserializePayload(ContainerFormats.Payload.parseFrom(bytes)).asInstanceOf[Throwable])
private def deserializeAddressData(bytes: Array[Byte]): Address =
addressFromDataProto(WireFormats.AddressData.parseFrom(bytes))

View file

@ -10,12 +10,11 @@ import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import com.typesafe.config.ConfigFactory
import akka.{ Done, NotUsed }
import akka.actor._
import akka.pattern.AskTimeoutException
import akka.pattern.StatusReply
import akka.remote.{ RemoteScope, RemoteWatcher }
import akka.remote.routing.RemoteRouterConfig
import akka.routing._
@ -130,7 +129,11 @@ class MiscMessageSerializerSpec extends AkkaSpec(MiscMessageSerializerSpec.testC
"TailChoppingPool" -> TailChoppingPool(25, within = 3.seconds, interval = 1.second),
"RemoteRouterConfig" -> RemoteRouterConfig(
local = RandomPool(25),
nodes = List(Address("akka", "system", "localhost", 2525)))).foreach {
nodes = List(Address("akka", "system", "localhost", 2525))),
"StatusReply.success" -> StatusReply.success("woho!"),
"StatusReply.Ack" -> StatusReply.Ack,
"StatusReply.error(errorMessage)" -> StatusReply.error("boho!"),
"StatusReply.error(exception)" -> StatusReply.error(new TestException("boho!"))).foreach {
case (scenario, item) =>
s"resolve serializer for $scenario" in {
val serializer = SerializationExtension(system)