From 9e2ea2579baf96c8f3d064a356dab9d50b77054c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 24 Apr 2019 17:27:20 +0200 Subject: [PATCH] Type inference for EntityRef.ask, #26709 * move AccountExample to Sharding, to be able to test with EntityRef etc * add tests for AccountExample * for javadsl there is a problem * explicity replyTo type solves javadsl issue * use EventSourcedEntity.withEnforcedReplies * try replyTo with super type AccountCommandReply * Reply <: AccountCommandReply * replace lambda with method (constructor) reference --- .../javadsl/AsyncTestingExampleTest.java | 13 +- .../actor/typed/javadsl/ActorCompile.java | 16 ++ .../akka/actor/typed/javadsl/AskPattern.scala | 13 +- .../actor/typed/scaladsl/AskPattern.scala | 20 ++- .../typed/javadsl/ClusterSharding.scala | 4 +- .../typed/scaladsl/ClusterSharding.scala | 13 +- .../sharding/typed/AccountExampleTest.java | 145 ++++++++++++++++++ ...ountExampleWithCommandHandlersInState.java | 19 ++- ...ccountExampleWithEventHandlersInState.java | 28 ++-- .../typed/AccountExampleWithMutableState.java | 21 +-- .../typed/AccountExampleWithNullState.java | 19 ++- .../sharding/typed/AccountExampleSpec.scala | 108 +++++++++++++ ...untExampleWithCommandHandlersInState.scala | 48 +++--- ...countExampleWithEventHandlersInState.scala | 37 ++--- .../typed/AccountExampleWithOptionState.scala | 44 +++--- .../main/paradox/typed/persistence-style.md | 14 +- .../src/main/paradox/typed/persistence.md | 12 +- 17 files changed, 435 insertions(+), 139 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java rename {akka-persistence-typed/src/test/java/jdocs/akka/persistence => akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding}/typed/AccountExampleWithCommandHandlersInState.java (93%) rename {akka-persistence-typed/src/test/java/jdocs/akka/persistence => akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding}/typed/AccountExampleWithEventHandlersInState.java (91%) rename {akka-persistence-typed/src/test/java/jdocs/akka/persistence => akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding}/typed/AccountExampleWithMutableState.java (92%) rename {akka-persistence-typed/src/test/java/jdocs/akka/persistence => akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding}/typed/AccountExampleWithNullState.java (92%) create mode 100644 akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleSpec.scala rename {akka-persistence-typed/src/test/scala/docs/akka/persistence => akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding}/typed/AccountExampleWithCommandHandlersInState.scala (77%) rename {akka-persistence-typed/src/test/scala/docs/akka/persistence => akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding}/typed/AccountExampleWithEventHandlersInState.scala (84%) rename {akka-persistence-typed/src/test/scala/docs/akka/persistence => akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding}/typed/AccountExampleWithOptionState.scala (79%) diff --git a/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java b/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java index 1cfbf72870..e5994ef93d 100644 --- a/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java +++ b/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java @@ -18,9 +18,6 @@ import org.junit.AfterClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; -import scala.util.Success; -import scala.util.Try; - import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.stream.IntStream; @@ -82,9 +79,9 @@ public class AsyncTestingExampleTest static class Message { int i; - ActorRef> replyTo; + ActorRef replyTo; - Message(int i, ActorRef> replyTo) { + Message(int i, ActorRef replyTo) { this.i = i; this.replyTo = replyTo; } @@ -104,10 +101,10 @@ public class AsyncTestingExampleTest IntStream.range(0, messages).forEach(this::publish); } - private CompletionStage> publish(int i) { + private CompletionStage publish(int i) { return AskPattern.ask( publisher, - (ActorRef> ref) -> new Message(i, ref), + (ActorRef ref) -> new Message(i, ref), Duration.ofSeconds(3), scheduler); } @@ -165,7 +162,7 @@ public class AsyncTestingExampleTest Behavior mockedBehavior = Behaviors.receiveMessage( message -> { - message.replyTo.tell(new Success<>(message.i)); + message.replyTo.tell(message.i); return Behaviors.same(); }); TestProbe probe = testKit.createTestProbe(); diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index 1653579f11..6fe338f0ae 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -8,6 +8,7 @@ import akka.actor.typed.*; import akka.actor.typed.TypedActorContext; import java.time.Duration; +import java.util.concurrent.CompletionStage; import static akka.actor.typed.javadsl.Behaviors.*; @@ -66,6 +67,21 @@ public class ActorCompile { ActorSystem system = ActorSystem.create(actor1, "Sys"); + { + ActorRef recipient = null; + + CompletionStage reply = + AskPattern.ask( + recipient, replyTo -> new MyMsgA(replyTo), Duration.ofSeconds(3), system.scheduler()); + + AskPattern.ask( + recipient, + (ActorRef replyTo) -> new MyMsgA(replyTo), + Duration.ofSeconds(3), + system.scheduler()) + .thenApply(rsp -> rsp.toUpperCase()); + } + { Behaviors.receive( (context, message) -> { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala index 71b79d3c5b..6bf2e76d93 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/AskPattern.scala @@ -30,10 +30,15 @@ import scala.compat.java8.FutureConverters._ * */ object AskPattern { - def ask[T, U]( - actor: RecipientRef[T], - message: JFunction[ActorRef[U], T], + + /** + * @tparam Req The request protocol, what the other actor accepts + * @tparam Res The response protocol, what the other actor sends back + */ + def ask[Req, Res]( + actor: RecipientRef[Req], + message: JFunction[ActorRef[Res], Req], timeout: Duration, - scheduler: Scheduler): CompletionStage[U] = + scheduler: Scheduler): CompletionStage[Res] = (actor.ask(message.apply)(timeout.asScala, scheduler)).toJava } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala index 643dc20a0f..e73585cc8c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala @@ -27,8 +27,10 @@ object AskPattern { /** * See [[ask]] + * + * @tparam Req The request protocol, what the other actor accepts */ - implicit final class Askable[T](val ref: RecipientRef[T]) extends AnyVal { + implicit final class Askable[Req](val ref: RecipientRef[Req]) extends AnyVal { /** * The ask-pattern implements the initiator side of a request–reply protocol. @@ -54,13 +56,15 @@ object AskPattern { * implicit val scheduler = system.scheduler * implicit val timeout = Timeout(3.seconds) * val target: ActorRef[Request] = ... - * val f: Future[Reply] = target ? (replyTo => (Request("hello", replyTo))) + * val f: Future[Reply] = target ? (replyTo => Request("hello", replyTo)) * }}} * * Note: it is preferrable to use the non-symbolic ask method as it easier allows for wildcards for - * the `ActorRef`. + * the `replyTo: ActorRef`. + * + * @tparam Res The response protocol, what the other actor sends back */ - def ?[U](replyTo: ActorRef[U] => T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { + def ?[Res](replyTo: ActorRef[Res] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res] = { ask(replyTo)(timeout, scheduler) } @@ -85,15 +89,15 @@ object AskPattern { * implicit val scheduler = system.scheduler * implicit val timeout = Timeout(3.seconds) * val target: ActorRef[Request] = ... - * val f: Future[Reply] = target.ask(replyTo => (Request("hello", replyTo))) + * val f: Future[Reply] = target.ask(replyTo => Request("hello", replyTo)) * // alternatively * val f2: Future[Reply] = target.ask(Request("hello", _)) - * // note that the explicit type on f2 is important for the compiler - * // to understand the type of the wildcard * }}} + * + * @tparam Res The response protocol, what the other actor sends back */ @silent - def ask[U](replyTo: ActorRef[U] => T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { + def ask[Res](replyTo: ActorRef[Res] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res] = { // We do not currently use the implicit sched, but want to require it // because it might be needed when we move to a 'native' typed runtime, see #24219 ref match { diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala index 18459d4a5c..ac4b21892c 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/javadsl/ClusterSharding.scala @@ -468,8 +468,10 @@ object EntityTypeKey { * * Note that if you are inside of an actor you should prefer [[akka.actor.typed.javadsl.ActorContext.ask]] * as that provides better safety. + * + * @tparam Res The response protocol, what the other actor sends back */ - def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U] + def ask[Res](message: JFunction[ActorRef[Res], M], timeout: Duration): CompletionStage[Res] /** * INTERNAL API diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala index e2f964af21..952d5801e8 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/ClusterSharding.scala @@ -406,8 +406,10 @@ object EntityTypeKey { * }}} * * Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern. + * + * @tparam Res The response protocol, what the other actor sends back */ - def ask[U](f: ActorRef[U] => M)(implicit timeout: Timeout): Future[U] + def ask[Res](f: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res] /** * Allows to "ask" the [[EntityRef]] for a reply. @@ -423,12 +425,17 @@ object EntityTypeKey { * * implicit val timeout = Timeout(3.seconds) * val target: EntityRef[Request] = ... - * val f: Future[Reply] = target ? (Request("hello", _)) + * val f: Future[Reply] = target ? (replyTo => Request("hello", replyTo)) * }}} * * Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern. + * + * Note: it is preferrable to use the non-symbolic ask method as it easier allows for wildcards for + * the `replyTo: ActorRef`. + * + * @tparam Res The response protocol, what the other actor sends back */ - def ?[U](message: ActorRef[U] => M)(implicit timeout: Timeout): Future[U] = + def ?[Res](message: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res] = this.ask(message)(timeout) } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java new file mode 100644 index 0000000000..884c11f92c --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java @@ -0,0 +1,145 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package jdocs.akka.cluster.sharding.typed; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.Entity; +import akka.cluster.sharding.typed.javadsl.EntityRef; +import akka.cluster.typed.Cluster; +import akka.cluster.typed.Join; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import java.math.BigDecimal; +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity; +import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity.*; +import static org.junit.Assert.assertEquals; + +public class AccountExampleTest extends JUnitSuite { + + public static final Config config = + ConfigFactory.parseString( + "akka.actor.provider = cluster \n" + + "akka.remote.netty.tcp.port = 0 \n" + + "akka.remote.artery.canonical.port = 0 \n" + + "akka.remote.artery.canonical.hostname = 127.0.0.1 \n" + + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"); + + @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + + private ClusterSharding _sharding = null; + + private ClusterSharding sharding() { + if (_sharding == null) { + // initialize first time only + Cluster cluster = Cluster.get(testKit.system()); + cluster.manager().tell(new Join(cluster.selfMember().address())); + + ClusterSharding sharding = ClusterSharding.get(testKit.system()); + sharding.init( + Entity.ofEventSourcedEntityWithEnforcedReplies( + AccountEntity.ENTITY_TYPE_KEY, ctx -> AccountEntity.create(ctx.getEntityId()))); + _sharding = sharding; + } + return _sharding; + } + + @Test + public void handleDeposit() { + EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "1"); + TestProbe probe = testKit.createTestProbe(OperationResult.class); + ref.tell(new CreateAccount(probe.getRef())); + probe.expectMessage(Confirmed.INSTANCE); + ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef())); + probe.expectMessage(Confirmed.INSTANCE); + ref.tell(new Deposit(BigDecimal.valueOf(10), probe.getRef())); + probe.expectMessage(Confirmed.INSTANCE); + } + + @Test + public void handleWithdraw() { + EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "2"); + TestProbe probe = testKit.createTestProbe(OperationResult.class); + ref.tell(new CreateAccount(probe.getRef())); + probe.expectMessage(Confirmed.INSTANCE); + ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef())); + probe.expectMessage(Confirmed.INSTANCE); + ref.tell(new Withdraw(BigDecimal.valueOf(10), probe.getRef())); + probe.expectMessage(Confirmed.INSTANCE); + } + + @Test + public void rejectWithdrawOverdraft() { + EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "3"); + TestProbe probe = testKit.createTestProbe(OperationResult.class); + ref.tell(new CreateAccount(probe.getRef())); + probe.expectMessage(Confirmed.INSTANCE); + ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef())); + probe.expectMessage(Confirmed.INSTANCE); + ref.tell(new Withdraw(BigDecimal.valueOf(110), probe.getRef())); + probe.expectMessageClass(Rejected.class); + } + + @Test + public void handleGetBalance() { + EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "4"); + TestProbe opProbe = testKit.createTestProbe(OperationResult.class); + ref.tell(new CreateAccount(opProbe.getRef())); + opProbe.expectMessage(Confirmed.INSTANCE); + ref.tell(new Deposit(BigDecimal.valueOf(100), opProbe.getRef())); + opProbe.expectMessage(Confirmed.INSTANCE); + + TestProbe getProbe = testKit.createTestProbe(CurrentBalance.class); + ref.tell(new GetBalance(getProbe.getRef())); + assertEquals( + BigDecimal.valueOf(100), getProbe.expectMessageClass(CurrentBalance.class).balance); + } + + @Test + public void beUsableWithAsk() throws Exception { + Duration timeout = Duration.ofSeconds(3); + EntityRef ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "5"); + CompletionStage createResult = ref.ask(CreateAccount::new, timeout); + assertEquals(Confirmed.INSTANCE, createResult.toCompletableFuture().get(3, TimeUnit.SECONDS)); + + // above works because then the response type is inferred by the lhs type + // below requires (ActorRef replyTo) + + assertEquals( + Confirmed.INSTANCE, + ref.ask( + (ActorRef replyTo) -> + new Deposit(BigDecimal.valueOf(100), replyTo), + timeout) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS)); + + assertEquals( + Confirmed.INSTANCE, + ref.ask( + (ActorRef replyTo) -> + new Withdraw(BigDecimal.valueOf(10), replyTo), + timeout) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS)); + + BigDecimal balance = + ref.ask(GetBalance::new, timeout) + .thenApply(currentBalance -> currentBalance.balance) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + assertEquals(BigDecimal.valueOf(90), balance); + } +} diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.java similarity index 93% rename from akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.java rename to akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.java index 572a6bb084..06b8ca0686 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.java @@ -2,19 +2,19 @@ * Copyright (C) 2018-2019 Lightbend Inc. */ -package jdocs.akka.persistence.typed; +package jdocs.akka.cluster.sharding.typed; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies; import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandlerWithReply; import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventHandlerBuilder; -import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies; import akka.persistence.typed.javadsl.ReplyEffect; import java.math.BigDecimal; @@ -29,9 +29,12 @@ public interface AccountExampleWithCommandHandlersInState { // #account-entity public class AccountEntity - extends EventSourcedBehaviorWithEnforcedReplies< + extends EventSourcedEntityWithEnforcedReplies< AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> { + public static final EntityTypeKey ENTITY_TYPE_KEY = + EntityTypeKey.create(AccountCommand.class, "Account"); + // Command interface AccountCommand extends ExpectingReply {} @@ -226,12 +229,12 @@ public interface AccountExampleWithCommandHandlersInState { public static class ClosedAccount implements Account {} - public static Behavior behavior(String accountNumber) { - return Behaviors.setup(context -> new AccountEntity(context, accountNumber)); + public static AccountEntity create(String accountNumber) { + return new AccountEntity(accountNumber); } - public AccountEntity(ActorContext context, String accountNumber) { - super(new PersistenceId(accountNumber)); + public AccountEntity(String accountNumber) { + super(ENTITY_TYPE_KEY, accountNumber); } @Override diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java similarity index 91% rename from akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java rename to akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java index c2d531251f..75a219db91 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java @@ -2,38 +2,38 @@ * Copyright (C) 2018-2019 Lightbend Inc. */ -package jdocs.akka.persistence.typed; +package jdocs.akka.cluster.sharding.typed; import akka.actor.typed.ActorRef; -import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.ActorContext; -import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies; import akka.persistence.typed.ExpectingReply; -import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandlerWithReply; import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder; -import akka.persistence.typed.javadsl.ReplyEffect; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventHandlerBuilder; -import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies; +import akka.persistence.typed.javadsl.ReplyEffect; import java.math.BigDecimal; /** * Bank account example illustrating: - different state classes representing the lifecycle of the * account - event handlers that delegate to methods in the state classes - command handlers that - * delegate to methods in the EventSourcedBehavior class - replies of various types, using - * ExpectingReply and EventSourcedBehaviorWithEnforcedReplies + * delegate to methods in the class - replies of various types, using ExpectingReply and + * EventSourcedEntityWithEnforcedReplies */ public interface AccountExampleWithEventHandlersInState { // #account-entity // #withEnforcedReplies public class AccountEntity - extends EventSourcedBehaviorWithEnforcedReplies< + extends EventSourcedEntityWithEnforcedReplies< AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> { // #withEnforcedReplies + public static final EntityTypeKey ENTITY_TYPE_KEY = + EntityTypeKey.create(AccountCommand.class, "Account"); + // Command // #reply-command interface AccountCommand extends ExpectingReply {} @@ -195,12 +195,12 @@ public interface AccountExampleWithEventHandlersInState { public static class ClosedAccount implements Account {} - public static Behavior behavior(String accountNumber) { - return Behaviors.setup(context -> new AccountEntity(context, accountNumber)); + public static AccountEntity create(String accountNumber) { + return new AccountEntity(accountNumber); } - public AccountEntity(ActorContext context, String accountNumber) { - super(new PersistenceId(accountNumber)); + public AccountEntity(String accountNumber) { + super(ENTITY_TYPE_KEY, accountNumber); } @Override diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithMutableState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java similarity index 92% rename from akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithMutableState.java rename to akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java index 523fe2b518..c0cd2f5d22 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithMutableState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java @@ -2,19 +2,19 @@ * Copyright (C) 2018-2019 Lightbend Inc. */ -package jdocs.akka.persistence.typed; +package jdocs.akka.cluster.sharding.typed; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; -import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies; import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandlerWithReply; import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventHandlerBuilder; -import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies; import akka.persistence.typed.javadsl.ReplyEffect; import java.math.BigDecimal; @@ -23,15 +23,18 @@ import java.math.BigDecimal; * Bank account example illustrating: - different state classes representing the lifecycle of the * account - mutable state - event handlers that delegate to methods in the state classes - command * handlers that delegate to methods in the EventSourcedBehavior class - replies of various types, - * using ExpectingReply and EventSourcedBehaviorWithEnforcedReplies + * using ExpectingReply and EventSourcedEntityWithEnforcedReplies */ public interface AccountExampleWithMutableState { // #account-entity public class AccountEntity - extends EventSourcedBehaviorWithEnforcedReplies< + extends EventSourcedEntityWithEnforcedReplies< AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> { + public static final EntityTypeKey ENTITY_TYPE_KEY = + EntityTypeKey.create(AccountCommand.class, "Account"); + // Command interface AccountCommand extends ExpectingReply {} @@ -189,12 +192,12 @@ public interface AccountExampleWithMutableState { public static class ClosedAccount implements Account {} - public static Behavior behavior(String accountNumber) { - return Behaviors.setup(context -> new AccountEntity(context, accountNumber)); + public static AccountEntity create(String accountNumber) { + return new AccountEntity(accountNumber); } - public AccountEntity(ActorContext context, String accountNumber) { - super(new PersistenceId(accountNumber)); + public AccountEntity(String accountNumber) { + super(ENTITY_TYPE_KEY, accountNumber); } @Override diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithNullState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java similarity index 92% rename from akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithNullState.java rename to akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java index d443036860..45253f4d8f 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithNullState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java @@ -2,12 +2,14 @@ * Copyright (C) 2018-2019 Lightbend Inc. */ -package jdocs.akka.persistence.typed; +package jdocs.akka.cluster.sharding.typed; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies; import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.CommandHandlerWithReply; @@ -23,15 +25,18 @@ import java.math.BigDecimal; * Bank account example illustrating: - different state classes representing the lifecycle of the * account - null as emptyState - event handlers that delegate to methods in the state classes - * command handlers that delegate to methods in the EventSourcedBehavior class - replies of various - * types, using ExpectingReply and EventSourcedBehaviorWithEnforcedReplies + * types, using ExpectingReply and EventSourcedEntityWithEnforcedReplies */ public interface AccountExampleWithNullState { // #account-entity public class AccountEntity - extends EventSourcedBehaviorWithEnforcedReplies< + extends EventSourcedEntityWithEnforcedReplies< AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> { + public static final EntityTypeKey ENTITY_TYPE_KEY = + EntityTypeKey.create(AccountCommand.class, "Account"); + // Command interface AccountCommand extends ExpectingReply {} @@ -187,12 +192,12 @@ public interface AccountExampleWithNullState { public static class ClosedAccount implements Account {} - public static Behavior behavior(String accountNumber) { - return Behaviors.setup(context -> new AccountEntity(context, accountNumber)); + public static AccountEntity create(String accountNumber) { + return new AccountEntity(accountNumber); } - public AccountEntity(ActorContext context, String accountNumber) { - super(new PersistenceId(accountNumber)); + public AccountEntity(String accountNumber) { + super(ENTITY_TYPE_KEY, accountNumber); } @Override diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleSpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleSpec.scala new file mode 100644 index 0000000000..730147c74e --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleSpec.scala @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2017-2019 Lightbend Inc. + */ + +package docs.akka.cluster.sharding.typed + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object AccountExampleSpec { + val config = ConfigFactory.parseString(""" + akka.actor.provider = cluster + + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1 + + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """) + +} + +class AccountExampleSpec extends ScalaTestWithActorTestKit(AccountExampleSpec.config) with WordSpecLike { + import AccountExampleWithEventHandlersInState.AccountEntity + import AccountExampleWithEventHandlersInState.AccountEntity._ + + private val sharding = ClusterSharding(system) + + override def beforeAll(): Unit = { + super.beforeAll() + Cluster(system).manager ! Join(Cluster(system).selfMember.address) + + sharding.init(Entity(AccountEntity.TypeKey, ctx => AccountEntity(ctx.entityId))) + } + + "Account example" must { + + "handle Deposit" in { + val probe = createTestProbe[OperationResult]() + val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "1") + ref ! CreateAccount(probe.ref) + probe.expectMessage(Confirmed) + ref ! Deposit(100, probe.ref) + probe.expectMessage(Confirmed) + ref ! Deposit(10, probe.ref) + probe.expectMessage(Confirmed) + } + + "handle Withdraw" in { + // OperationResult is the expected reply type for these commands, but it should also be + // possible to use the super type AccountCommandReply + val probe = createTestProbe[AccountCommandReply]() + val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "2") + ref ! CreateAccount(probe.ref) + probe.expectMessage(Confirmed) + ref ! Deposit(100, probe.ref) + probe.expectMessage(Confirmed) + ref ! Withdraw(10, probe.ref) + probe.expectMessage(Confirmed) + } + + "reject Withdraw overdraft" in { + val probe = createTestProbe[OperationResult]() + val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "3") + ref ! CreateAccount(probe.ref) + probe.expectMessage(Confirmed) + ref ! Deposit(100, probe.ref) + probe.expectMessage(Confirmed) + ref ! Withdraw(110, probe.ref) + probe.expectMessageType[Rejected] + } + + "handle GetBalance" in { + val opProbe = createTestProbe[OperationResult]() + val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "4") + ref ! CreateAccount(opProbe.ref) + opProbe.expectMessage(Confirmed) + ref ! Deposit(100, opProbe.ref) + opProbe.expectMessage(Confirmed) + + val getProbe = createTestProbe[CurrentBalance]() + ref ! GetBalance(getProbe.ref) + getProbe.expectMessage(CurrentBalance(100)) + } + + "be usable with ask" in { + val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "5") + val createResult: Future[OperationResult] = ref.ask(CreateAccount(_)) + createResult.futureValue should ===(Confirmed) + implicit val ec: ExecutionContext = testKit.system.executionContext + + // Errors are shown in IntelliJ Scala plugin 2019.1.6, but compiles with Scala 2.12.8. + // Ok in IntelliJ if using ref.ask[OperationResult]. + ref.ask(Deposit(100, _)).futureValue should ===(Confirmed) + ref.ask(Withdraw(10, _)).futureValue should ===(Confirmed) + ref.ask(GetBalance(_)).map(_.balance).futureValue should ===(90) + } + + } +} diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala similarity index 77% rename from akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala rename to akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala index 19d1a7c56a..a22742a15b 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala @@ -1,15 +1,15 @@ /* - * Copyright (C) 2017-2019 Lightbend Inc. + * Copyright (C) 2019 Lightbend Inc. */ -package docs.akka.persistence.typed +package docs.akka.cluster.sharding.typed import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity import akka.persistence.typed.ExpectingReply -import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.EventSourcedBehavior /** * Bank account example illustrating: @@ -20,18 +20,18 @@ import akka.persistence.typed.scaladsl.EventSourcedBehavior */ object AccountExampleWithCommandHandlersInState { - //##account-entity + //#account-entity object AccountEntity { // Command sealed trait AccountCommand[Reply] extends ExpectingReply[Reply] - final case class CreateAccount()(override val replyTo: ActorRef[OperationResult]) + final case class CreateAccount(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] - final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) + final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] - final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) + final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] - final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance] - final case class CloseAccount()(override val replyTo: ActorRef[OperationResult]) + final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance] + final case class CloseAccount(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] // Reply @@ -79,16 +79,14 @@ object AccountExampleWithCommandHandlersInState { override def applyCommand(cmd: AccountCommand[_]): ReplyEffect = cmd match { - case c @ Deposit(amount) => - Effect.persist(Deposited(amount)).thenReply(c)(_ => Confirmed) + case c: Deposit => + Effect.persist(Deposited(c.amount)).thenReply(c)(_ => Confirmed) - case c @ Withdraw(amount) => - if (canWithdraw(amount)) { - Effect.persist(Withdrawn(amount)).thenReply(c)(_ => Confirmed) - - } else { - Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount")) - } + case c: Withdraw => + if (canWithdraw(c.amount)) + Effect.persist(Withdrawn(c.amount)).thenReply(c)(_ => Confirmed) + else + Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw ${c.amount}")) case c: GetBalance => Effect.reply(c)(CurrentBalance(balance)) @@ -134,15 +132,19 @@ object AccountExampleWithCommandHandlersInState { throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") } - def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { - EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account]( - PersistenceId(s"Account|$accountNumber"), + val TypeKey: EntityTypeKey[AccountCommand[_]] = + EntityTypeKey[AccountCommand[_]]("Account") + + def apply(accountNumber: String): Behavior[AccountCommand[_]] = { + EventSourcedEntity.withEnforcedReplies[AccountCommand[_], AccountEvent, Account]( + TypeKey, + accountNumber, EmptyAccount, (state, cmd) => state.applyCommand(cmd), (state, event) => state.applyEvent(event)) } } - //##account-entity + //#account-entity } diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala similarity index 84% rename from akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala rename to akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala index 2564e5756c..f902bd67b9 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala @@ -1,15 +1,15 @@ /* - * Copyright (C) 2017-2019 Lightbend Inc. + * Copyright (C) 2019 Lightbend Inc. */ -package docs.akka.persistence.typed +package docs.akka.cluster.sharding.typed import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity import akka.persistence.typed.ExpectingReply -import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.ReplyEffect /** @@ -26,18 +26,18 @@ object AccountExampleWithEventHandlersInState { object AccountEntity { // Command //#reply-command - sealed trait AccountCommand[Reply] extends ExpectingReply[Reply] + sealed trait AccountCommand[Reply <: AccountCommandReply] extends ExpectingReply[Reply] //#reply-command - final case class CreateAccount()(override val replyTo: ActorRef[OperationResult]) + final case class CreateAccount(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] - final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) + final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] //#reply-command - final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) + final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] //#reply-command - final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance] - final case class CloseAccount()(override val replyTo: ActorRef[OperationResult]) + final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance] + final case class CloseAccount(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] // Reply @@ -89,17 +89,16 @@ object AccountExampleWithEventHandlersInState { throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") } + val TypeKey: EntityTypeKey[AccountCommand[_]] = + EntityTypeKey[AccountCommand[_]]("Account") + // Note that after defining command, event and state classes you would probably start here when writing this. // When filling in the parameters of EventSourcedBehavior.apply you can use IntelliJ alt+Enter > createValue // to generate the stub with types for the command and event handlers. //#withEnforcedReplies - def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { - EventSourcedBehavior.withEnforcedReplies( - PersistenceId(s"Account|$accountNumber"), - EmptyAccount, - commandHandler, - eventHandler) + def apply(accountNumber: String): Behavior[AccountCommand[_]] = { + EventSourcedEntity.withEnforcedReplies(TypeKey, accountNumber, EmptyAccount, commandHandler, eventHandler) } //#withEnforcedReplies @@ -148,12 +147,10 @@ object AccountExampleWithEventHandlersInState { //#reply private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[AccountEvent, Account] = { - if (acc.canWithdraw(cmd.amount)) { + if (acc.canWithdraw(cmd.amount)) Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd)(_ => Confirmed) - - } else { + else Effect.reply(cmd)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}")) - } } //#reply diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala similarity index 79% rename from akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala rename to akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala index c5d5f1e12b..e3cea3b68b 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala @@ -1,15 +1,15 @@ /* - * Copyright (C) 2017-2019 Lightbend Inc. + * Copyright (C) 2019 Lightbend Inc. */ -package docs.akka.persistence.typed +package docs.akka.cluster.sharding.typed import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity import akka.persistence.typed.ExpectingReply -import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.EventSourcedBehavior /** * Bank account example illustrating: @@ -24,14 +24,14 @@ object AccountExampleWithOptionState { object AccountEntity { // Command sealed trait AccountCommand[Reply] extends ExpectingReply[Reply] - final case class CreateAccount()(override val replyTo: ActorRef[OperationResult]) + final case class CreateAccount(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] - final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) + final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] - final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult]) + final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] - final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance] - final case class CloseAccount()(override val replyTo: ActorRef[OperationResult]) + final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance] + final case class CloseAccount(override val replyTo: ActorRef[OperationResult]) extends AccountCommand[OperationResult] // Reply @@ -63,16 +63,14 @@ object AccountExampleWithOptionState { override def applyCommand(cmd: AccountCommand[_]): ReplyEffect = cmd match { - case c @ Deposit(amount) => - Effect.persist(Deposited(amount)).thenReply(c)(_ => Confirmed) + case c: Deposit => + Effect.persist(Deposited(c.amount)).thenReply(c)(_ => Confirmed) - case c @ Withdraw(amount) => - if (canWithdraw(amount)) { - Effect.persist(Withdrawn(amount)).thenReply(c)(_ => Confirmed) - - } else { - Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount")) - } + case c: Withdraw => + if (canWithdraw(c.amount)) + Effect.persist(Withdrawn(c.amount)).thenReply(c)(_ => Confirmed) + else + Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw ${c.amount}")) case c: GetBalance => Effect.reply(c)(CurrentBalance(balance)) @@ -118,9 +116,13 @@ object AccountExampleWithOptionState { throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") } - def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = { - EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]]( - PersistenceId(s"Account|$accountNumber"), + val TypeKey: EntityTypeKey[AccountCommand[_]] = + EntityTypeKey[AccountCommand[_]]("Account") + + def apply(accountNumber: String): Behavior[AccountCommand[_]] = { + EventSourcedEntity.withEnforcedReplies[AccountCommand[_], AccountEvent, Option[Account]]( + TypeKey, + accountNumber, None, (state, cmd) => state match { diff --git a/akka-docs/src/main/paradox/typed/persistence-style.md b/akka-docs/src/main/paradox/typed/persistence-style.md index b5aa8c5459..9440346a8b 100644 --- a/akka-docs/src/main/paradox/typed/persistence-style.md +++ b/akka-docs/src/main/paradox/typed/persistence-style.md @@ -14,10 +14,10 @@ Here we are using a bank account as the example domain. It has 3 state classes t of the account; `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`. Scala -: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #account-entity } +: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #account-entity } Java -: @@snip [AccountExampleWithEventHandlersInState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java) { #account-entity } +: @@snip [AccountExampleWithEventHandlersInState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java) { #account-entity } @scala[Notice how the `eventHandler` delegates to the `applyEvent` in the `Account` (state), which is implemented in the concrete `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.] @@ -29,10 +29,10 @@ in the concrete `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.] We can take the previous bank account example one step further by handling the commands in the state too. Scala -: @@snip [AccountExampleWithCommandHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala) { #account-entity } +: @@snip [AccountExampleWithCommandHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala) { #account-entity } Java -: @@snip [AccountExampleWithCommandHandlersInState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.java) { #account-entity } +: @@snip [AccountExampleWithCommandHandlersInState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.java) { #account-entity } @scala[Notice how the command handler is delegating to `applyCommand` in the `Account` (state), which is implemented in the concrete `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.] @@ -52,10 +52,10 @@ illustrates using `null` as the `emptyState`.] is then used in command and event handlers at the outer layer before delegating to the state or other methods.] Scala -: @@snip [AccountExampleWithOptionState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala) { #account-entity } +: @@snip [AccountExampleWithOptionState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala) { #account-entity } Java -: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithNullState.java) { #account-entity } +: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java) { #account-entity } @@@ div { .group-java } ## Mutable state @@ -69,6 +69,6 @@ e.g. as a reply to a command. Messages must be immutable to avoid concurrency pr The above examples are using immutable state classes and below is corresponding example with mutable state. Java -: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithMutableState.java) { #account-entity } +: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java) { #account-entity } @@@ diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 1da871e4ba..a78a1e598b 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -285,19 +285,19 @@ created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noRe @scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`]. Scala -: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #withEnforcedReplies } +: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #withEnforcedReplies } Java -: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java) { #withEnforcedReplies } +: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java) { #withEnforcedReplies } The commands must implement `ExpectingReply` to include the @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef`] in a standardized way. Scala -: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #reply-command } +: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #reply-command } Java -: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java) { #reply-command } +: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java) { #reply-command } The `ReplyEffect` is created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noReply`]@java[`Effects().noReply`], @scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`]. @@ -306,10 +306,10 @@ The `ReplyEffect` is created with @scala[`Effect.reply`]@java[`Effects().reply`] `EventSourcedBehaviorWithEnforcedReplies`], as opposed to newCommandHandlerBuilder when using `EventSourcedBehavior`.] Scala -: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #reply } +: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #reply } Java -: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java) { #reply } +: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java) { #reply } These effects will send the reply message even when @scala[`EventSourcedBehavior.withEnforcedReplies`]@java[`EventSourcedBehaviorWithEnforcedReplies`] is not used, but then there will be no compilation errors if the reply decision is left out.