Merge pull request #26813 from akka/wip-26709-ask-ExpectingReply-patriknw

Type inference for EntityRef.ask, #26709
This commit is contained in:
Patrik Nordwall 2019-06-14 11:18:26 +02:00 committed by GitHub
commit beaa80b773
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 435 additions and 139 deletions

View file

@ -18,9 +18,6 @@ import org.junit.AfterClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.util.Success;
import scala.util.Try;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.stream.IntStream;
@ -82,9 +79,9 @@ public class AsyncTestingExampleTest
static class Message {
int i;
ActorRef<Try<Integer>> replyTo;
ActorRef<Integer> replyTo;
Message(int i, ActorRef<Try<Integer>> replyTo) {
Message(int i, ActorRef<Integer> replyTo) {
this.i = i;
this.replyTo = replyTo;
}
@ -104,10 +101,10 @@ public class AsyncTestingExampleTest
IntStream.range(0, messages).forEach(this::publish);
}
private CompletionStage<Try<Integer>> publish(int i) {
private CompletionStage<Integer> publish(int i) {
return AskPattern.ask(
publisher,
(ActorRef<Try<Integer>> ref) -> new Message(i, ref),
(ActorRef<Integer> ref) -> new Message(i, ref),
Duration.ofSeconds(3),
scheduler);
}
@ -165,7 +162,7 @@ public class AsyncTestingExampleTest
Behavior<Message> mockedBehavior =
Behaviors.receiveMessage(
message -> {
message.replyTo.tell(new Success<>(message.i));
message.replyTo.tell(message.i);
return Behaviors.same();
});
TestProbe<Message> probe = testKit.createTestProbe();

View file

@ -8,6 +8,7 @@ import akka.actor.typed.*;
import akka.actor.typed.TypedActorContext;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import static akka.actor.typed.javadsl.Behaviors.*;
@ -67,6 +68,21 @@ public class ActorCompile {
ActorSystem<MyMsg> system = ActorSystem.create(actor1, "Sys");
{
ActorRef<MyMsg> recipient = null;
CompletionStage<String> reply =
AskPattern.ask(
recipient, replyTo -> new MyMsgA(replyTo), Duration.ofSeconds(3), system.scheduler());
AskPattern.ask(
recipient,
(ActorRef<String> replyTo) -> new MyMsgA(replyTo),
Duration.ofSeconds(3),
system.scheduler())
.thenApply(rsp -> rsp.toUpperCase());
}
{
Behaviors.<MyMsg>receive(
(context, message) -> {

View file

@ -30,10 +30,15 @@ import scala.compat.java8.FutureConverters._
*
*/
object AskPattern {
def ask[T, U](
actor: RecipientRef[T],
message: JFunction[ActorRef[U], T],
/**
* @tparam Req The request protocol, what the other actor accepts
* @tparam Res The response protocol, what the other actor sends back
*/
def ask[Req, Res](
actor: RecipientRef[Req],
message: JFunction[ActorRef[Res], Req],
timeout: Duration,
scheduler: Scheduler): CompletionStage[U] =
scheduler: Scheduler): CompletionStage[Res] =
(actor.ask(message.apply)(timeout.asScala, scheduler)).toJava
}

View file

@ -27,8 +27,10 @@ object AskPattern {
/**
* See [[ask]]
*
* @tparam Req The request protocol, what the other actor accepts
*/
implicit final class Askable[T](val ref: RecipientRef[T]) extends AnyVal {
implicit final class Askable[Req](val ref: RecipientRef[Req]) extends AnyVal {
/**
* The ask-pattern implements the initiator side of a requestreply protocol.
@ -54,13 +56,15 @@ object AskPattern {
* implicit val scheduler = system.scheduler
* implicit val timeout = Timeout(3.seconds)
* val target: ActorRef[Request] = ...
* val f: Future[Reply] = target ? (replyTo => (Request("hello", replyTo)))
* val f: Future[Reply] = target ? (replyTo => Request("hello", replyTo))
* }}}
*
* Note: it is preferrable to use the non-symbolic ask method as it easier allows for wildcards for
* the `ActorRef`.
* the `replyTo: ActorRef`.
*
* @tparam Res The response protocol, what the other actor sends back
*/
def ?[U](replyTo: ActorRef[U] => T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
def ?[Res](replyTo: ActorRef[Res] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res] = {
ask(replyTo)(timeout, scheduler)
}
@ -85,15 +89,15 @@ object AskPattern {
* implicit val scheduler = system.scheduler
* implicit val timeout = Timeout(3.seconds)
* val target: ActorRef[Request] = ...
* val f: Future[Reply] = target.ask(replyTo => (Request("hello", replyTo)))
* val f: Future[Reply] = target.ask(replyTo => Request("hello", replyTo))
* // alternatively
* val f2: Future[Reply] = target.ask(Request("hello", _))
* // note that the explicit type on f2 is important for the compiler
* // to understand the type of the wildcard
* }}}
*
* @tparam Res The response protocol, what the other actor sends back
*/
@silent
def ask[U](replyTo: ActorRef[U] => T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
def ask[Res](replyTo: ActorRef[Res] => Req)(implicit timeout: Timeout, scheduler: Scheduler): Future[Res] = {
// We do not currently use the implicit sched, but want to require it
// because it might be needed when we move to a 'native' typed runtime, see #24219
ref match {

View file

@ -468,8 +468,10 @@ object EntityTypeKey {
*
* Note that if you are inside of an actor you should prefer [[akka.actor.typed.javadsl.ActorContext.ask]]
* as that provides better safety.
*
* @tparam Res The response protocol, what the other actor sends back
*/
def ask[U](message: JFunction[ActorRef[U], M], timeout: Duration): CompletionStage[U]
def ask[Res](message: JFunction[ActorRef[Res], M], timeout: Duration): CompletionStage[Res]
/**
* INTERNAL API

View file

@ -406,8 +406,10 @@ object EntityTypeKey {
* }}}
*
* Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern.
*
* @tparam Res The response protocol, what the other actor sends back
*/
def ask[U](f: ActorRef[U] => M)(implicit timeout: Timeout): Future[U]
def ask[Res](f: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res]
/**
* Allows to "ask" the [[EntityRef]] for a reply.
@ -423,12 +425,17 @@ object EntityTypeKey {
*
* implicit val timeout = Timeout(3.seconds)
* val target: EntityRef[Request] = ...
* val f: Future[Reply] = target ? (Request("hello", _))
* val f: Future[Reply] = target ? (replyTo => Request("hello", replyTo))
* }}}
*
* Please note that an implicit [[akka.util.Timeout]] must be available to use this pattern.
*
* Note: it is preferrable to use the non-symbolic ask method as it easier allows for wildcards for
* the `replyTo: ActorRef`.
*
* @tparam Res The response protocol, what the other actor sends back
*/
def ?[U](message: ActorRef[U] => M)(implicit timeout: Timeout): Future[U] =
def ?[Res](message: ActorRef[Res] => M)(implicit timeout: Timeout): Future[Res] =
this.ask(message)(timeout)
}

View file

@ -0,0 +1,145 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.cluster.sharding.typed;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Join;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity.*;
import static org.junit.Assert.assertEquals;
public class AccountExampleTest extends JUnitSuite {
public static final Config config =
ConfigFactory.parseString(
"akka.actor.provider = cluster \n"
+ "akka.remote.netty.tcp.port = 0 \n"
+ "akka.remote.artery.canonical.port = 0 \n"
+ "akka.remote.artery.canonical.hostname = 127.0.0.1 \n"
+ "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n");
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
private ClusterSharding _sharding = null;
private ClusterSharding sharding() {
if (_sharding == null) {
// initialize first time only
Cluster cluster = Cluster.get(testKit.system());
cluster.manager().tell(new Join(cluster.selfMember().address()));
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.init(
Entity.ofEventSourcedEntityWithEnforcedReplies(
AccountEntity.ENTITY_TYPE_KEY, ctx -> AccountEntity.create(ctx.getEntityId())));
_sharding = sharding;
}
return _sharding;
}
@Test
public void handleDeposit() {
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "1");
TestProbe<OperationResult> probe = testKit.createTestProbe(OperationResult.class);
ref.tell(new CreateAccount(probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
ref.tell(new Deposit(BigDecimal.valueOf(10), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
}
@Test
public void handleWithdraw() {
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "2");
TestProbe<OperationResult> probe = testKit.createTestProbe(OperationResult.class);
ref.tell(new CreateAccount(probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
ref.tell(new Withdraw(BigDecimal.valueOf(10), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
}
@Test
public void rejectWithdrawOverdraft() {
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "3");
TestProbe<OperationResult> probe = testKit.createTestProbe(OperationResult.class);
ref.tell(new CreateAccount(probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
ref.tell(new Deposit(BigDecimal.valueOf(100), probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
ref.tell(new Withdraw(BigDecimal.valueOf(110), probe.getRef()));
probe.expectMessageClass(Rejected.class);
}
@Test
public void handleGetBalance() {
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "4");
TestProbe<OperationResult> opProbe = testKit.createTestProbe(OperationResult.class);
ref.tell(new CreateAccount(opProbe.getRef()));
opProbe.expectMessage(Confirmed.INSTANCE);
ref.tell(new Deposit(BigDecimal.valueOf(100), opProbe.getRef()));
opProbe.expectMessage(Confirmed.INSTANCE);
TestProbe<CurrentBalance> getProbe = testKit.createTestProbe(CurrentBalance.class);
ref.tell(new GetBalance(getProbe.getRef()));
assertEquals(
BigDecimal.valueOf(100), getProbe.expectMessageClass(CurrentBalance.class).balance);
}
@Test
public void beUsableWithAsk() throws Exception {
Duration timeout = Duration.ofSeconds(3);
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "5");
CompletionStage<OperationResult> createResult = ref.ask(CreateAccount::new, timeout);
assertEquals(Confirmed.INSTANCE, createResult.toCompletableFuture().get(3, TimeUnit.SECONDS));
// above works because then the response type is inferred by the lhs type
// below requires (ActorRef<OperationResult> replyTo)
assertEquals(
Confirmed.INSTANCE,
ref.ask(
(ActorRef<OperationResult> replyTo) ->
new Deposit(BigDecimal.valueOf(100), replyTo),
timeout)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
assertEquals(
Confirmed.INSTANCE,
ref.ask(
(ActorRef<OperationResult> replyTo) ->
new Withdraw(BigDecimal.valueOf(10), replyTo),
timeout)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS));
BigDecimal balance =
ref.ask(GetBalance::new, timeout)
.thenApply(currentBalance -> currentBalance.balance)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
assertEquals(BigDecimal.valueOf(90), balance);
}
}

View file

@ -2,19 +2,19 @@
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventHandlerBuilder;
import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies;
import akka.persistence.typed.javadsl.ReplyEffect;
import java.math.BigDecimal;
@ -29,9 +29,12 @@ public interface AccountExampleWithCommandHandlersInState {
// #account-entity
public class AccountEntity
extends EventSourcedBehaviorWithEnforcedReplies<
extends EventSourcedEntityWithEnforcedReplies<
AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> {
public static final EntityTypeKey<AccountCommand> ENTITY_TYPE_KEY =
EntityTypeKey.create(AccountCommand.class, "Account");
// Command
interface AccountCommand<Reply> extends ExpectingReply<Reply> {}
@ -226,12 +229,12 @@ public interface AccountExampleWithCommandHandlersInState {
public static class ClosedAccount implements Account {}
public static Behavior<AccountCommand> behavior(String accountNumber) {
return Behaviors.setup(context -> new AccountEntity(context, accountNumber));
public static AccountEntity create(String accountNumber) {
return new AccountEntity(accountNumber);
}
public AccountEntity(ActorContext<AccountCommand> context, String accountNumber) {
super(new PersistenceId(accountNumber));
public AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
}
@Override

View file

@ -2,38 +2,38 @@
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
import akka.persistence.typed.javadsl.ReplyEffect;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventHandlerBuilder;
import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies;
import akka.persistence.typed.javadsl.ReplyEffect;
import java.math.BigDecimal;
/**
* Bank account example illustrating: - different state classes representing the lifecycle of the
* account - event handlers that delegate to methods in the state classes - command handlers that
* delegate to methods in the EventSourcedBehavior class - replies of various types, using
* ExpectingReply and EventSourcedBehaviorWithEnforcedReplies
* delegate to methods in the class - replies of various types, using ExpectingReply and
* EventSourcedEntityWithEnforcedReplies
*/
public interface AccountExampleWithEventHandlersInState {
// #account-entity
// #withEnforcedReplies
public class AccountEntity
extends EventSourcedBehaviorWithEnforcedReplies<
extends EventSourcedEntityWithEnforcedReplies<
AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> {
// #withEnforcedReplies
public static final EntityTypeKey<AccountCommand> ENTITY_TYPE_KEY =
EntityTypeKey.create(AccountCommand.class, "Account");
// Command
// #reply-command
interface AccountCommand<Reply> extends ExpectingReply<Reply> {}
@ -195,12 +195,12 @@ public interface AccountExampleWithEventHandlersInState {
public static class ClosedAccount implements Account {}
public static Behavior<AccountCommand> behavior(String accountNumber) {
return Behaviors.setup(context -> new AccountEntity(context, accountNumber));
public static AccountEntity create(String accountNumber) {
return new AccountEntity(accountNumber);
}
public AccountEntity(ActorContext<AccountCommand> context, String accountNumber) {
super(new PersistenceId(accountNumber));
public AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
}
@Override

View file

@ -2,19 +2,19 @@
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventHandlerBuilder;
import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies;
import akka.persistence.typed.javadsl.ReplyEffect;
import java.math.BigDecimal;
@ -23,15 +23,18 @@ import java.math.BigDecimal;
* Bank account example illustrating: - different state classes representing the lifecycle of the
* account - mutable state - event handlers that delegate to methods in the state classes - command
* handlers that delegate to methods in the EventSourcedBehavior class - replies of various types,
* using ExpectingReply and EventSourcedBehaviorWithEnforcedReplies
* using ExpectingReply and EventSourcedEntityWithEnforcedReplies
*/
public interface AccountExampleWithMutableState {
// #account-entity
public class AccountEntity
extends EventSourcedBehaviorWithEnforcedReplies<
extends EventSourcedEntityWithEnforcedReplies<
AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> {
public static final EntityTypeKey<AccountCommand> ENTITY_TYPE_KEY =
EntityTypeKey.create(AccountCommand.class, "Account");
// Command
interface AccountCommand<Reply> extends ExpectingReply<Reply> {}
@ -189,12 +192,12 @@ public interface AccountExampleWithMutableState {
public static class ClosedAccount implements Account {}
public static Behavior<AccountCommand> behavior(String accountNumber) {
return Behaviors.setup(context -> new AccountEntity(context, accountNumber));
public static AccountEntity create(String accountNumber) {
return new AccountEntity(accountNumber);
}
public AccountEntity(ActorContext<AccountCommand> context, String accountNumber) {
super(new PersistenceId(accountNumber));
public AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
}
@Override

View file

@ -2,12 +2,14 @@
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
@ -23,15 +25,18 @@ import java.math.BigDecimal;
* Bank account example illustrating: - different state classes representing the lifecycle of the
* account - null as emptyState - event handlers that delegate to methods in the state classes -
* command handlers that delegate to methods in the EventSourcedBehavior class - replies of various
* types, using ExpectingReply and EventSourcedBehaviorWithEnforcedReplies
* types, using ExpectingReply and EventSourcedEntityWithEnforcedReplies
*/
public interface AccountExampleWithNullState {
// #account-entity
public class AccountEntity
extends EventSourcedBehaviorWithEnforcedReplies<
extends EventSourcedEntityWithEnforcedReplies<
AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> {
public static final EntityTypeKey<AccountCommand> ENTITY_TYPE_KEY =
EntityTypeKey.create(AccountCommand.class, "Account");
// Command
interface AccountCommand<Reply> extends ExpectingReply<Reply> {}
@ -187,12 +192,12 @@ public interface AccountExampleWithNullState {
public static class ClosedAccount implements Account {}
public static Behavior<AccountCommand> behavior(String accountNumber) {
return Behaviors.setup(context -> new AccountEntity(context, accountNumber));
public static AccountEntity create(String accountNumber) {
return new AccountEntity(accountNumber);
}
public AccountEntity(ActorContext<AccountCommand> context, String accountNumber) {
super(new PersistenceId(accountNumber));
public AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
}
@Override

View file

@ -0,0 +1,108 @@
/*
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.sharding.typed
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object AccountExampleSpec {
val config = ConfigFactory.parseString("""
akka.actor.provider = cluster
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""")
}
class AccountExampleSpec extends ScalaTestWithActorTestKit(AccountExampleSpec.config) with WordSpecLike {
import AccountExampleWithEventHandlersInState.AccountEntity
import AccountExampleWithEventHandlersInState.AccountEntity._
private val sharding = ClusterSharding(system)
override def beforeAll(): Unit = {
super.beforeAll()
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
sharding.init(Entity(AccountEntity.TypeKey, ctx => AccountEntity(ctx.entityId)))
}
"Account example" must {
"handle Deposit" in {
val probe = createTestProbe[OperationResult]()
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "1")
ref ! CreateAccount(probe.ref)
probe.expectMessage(Confirmed)
ref ! Deposit(100, probe.ref)
probe.expectMessage(Confirmed)
ref ! Deposit(10, probe.ref)
probe.expectMessage(Confirmed)
}
"handle Withdraw" in {
// OperationResult is the expected reply type for these commands, but it should also be
// possible to use the super type AccountCommandReply
val probe = createTestProbe[AccountCommandReply]()
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "2")
ref ! CreateAccount(probe.ref)
probe.expectMessage(Confirmed)
ref ! Deposit(100, probe.ref)
probe.expectMessage(Confirmed)
ref ! Withdraw(10, probe.ref)
probe.expectMessage(Confirmed)
}
"reject Withdraw overdraft" in {
val probe = createTestProbe[OperationResult]()
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "3")
ref ! CreateAccount(probe.ref)
probe.expectMessage(Confirmed)
ref ! Deposit(100, probe.ref)
probe.expectMessage(Confirmed)
ref ! Withdraw(110, probe.ref)
probe.expectMessageType[Rejected]
}
"handle GetBalance" in {
val opProbe = createTestProbe[OperationResult]()
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "4")
ref ! CreateAccount(opProbe.ref)
opProbe.expectMessage(Confirmed)
ref ! Deposit(100, opProbe.ref)
opProbe.expectMessage(Confirmed)
val getProbe = createTestProbe[CurrentBalance]()
ref ! GetBalance(getProbe.ref)
getProbe.expectMessage(CurrentBalance(100))
}
"be usable with ask" in {
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "5")
val createResult: Future[OperationResult] = ref.ask(CreateAccount(_))
createResult.futureValue should ===(Confirmed)
implicit val ec: ExecutionContext = testKit.system.executionContext
// Errors are shown in IntelliJ Scala plugin 2019.1.6, but compiles with Scala 2.12.8.
// Ok in IntelliJ if using ref.ask[OperationResult].
ref.ask(Deposit(100, _)).futureValue should ===(Confirmed)
ref.ask(Withdraw(10, _)).futureValue should ===(Confirmed)
ref.ask(GetBalance(_)).map(_.balance).futureValue should ===(90)
}
}
}

View file

@ -1,15 +1,15 @@
/*
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
package docs.akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
/**
* Bank account example illustrating:
@ -20,18 +20,18 @@ import akka.persistence.typed.scaladsl.EventSourcedBehavior
*/
object AccountExampleWithCommandHandlersInState {
//##account-entity
//#account-entity
object AccountEntity {
// Command
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
final case class CreateAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance]
final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance]
final case class CloseAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
// Reply
@ -79,16 +79,14 @@ object AccountExampleWithCommandHandlersInState {
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
cmd match {
case c @ Deposit(amount) =>
Effect.persist(Deposited(amount)).thenReply(c)(_ => Confirmed)
case c: Deposit =>
Effect.persist(Deposited(c.amount)).thenReply(c)(_ => Confirmed)
case c @ Withdraw(amount) =>
if (canWithdraw(amount)) {
Effect.persist(Withdrawn(amount)).thenReply(c)(_ => Confirmed)
} else {
Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
}
case c: Withdraw =>
if (canWithdraw(c.amount))
Effect.persist(Withdrawn(c.amount)).thenReply(c)(_ => Confirmed)
else
Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw ${c.amount}"))
case c: GetBalance =>
Effect.reply(c)(CurrentBalance(balance))
@ -134,15 +132,19 @@ object AccountExampleWithCommandHandlersInState {
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
}
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account](
PersistenceId(s"Account|$accountNumber"),
val TypeKey: EntityTypeKey[AccountCommand[_]] =
EntityTypeKey[AccountCommand[_]]("Account")
def apply(accountNumber: String): Behavior[AccountCommand[_]] = {
EventSourcedEntity.withEnforcedReplies[AccountCommand[_], AccountEvent, Account](
TypeKey,
accountNumber,
EmptyAccount,
(state, cmd) => state.applyCommand(cmd),
(state, event) => state.applyEvent(event))
}
}
//##account-entity
//#account-entity
}

View file

@ -1,15 +1,15 @@
/*
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
package docs.akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplyEffect
/**
@ -26,18 +26,18 @@ object AccountExampleWithEventHandlersInState {
object AccountEntity {
// Command
//#reply-command
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
sealed trait AccountCommand[Reply <: AccountCommandReply] extends ExpectingReply[Reply]
//#reply-command
final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
final case class CreateAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
//#reply-command
final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
//#reply-command
final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance]
final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance]
final case class CloseAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
// Reply
@ -89,17 +89,16 @@ object AccountExampleWithEventHandlersInState {
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
}
val TypeKey: EntityTypeKey[AccountCommand[_]] =
EntityTypeKey[AccountCommand[_]]("Account")
// Note that after defining command, event and state classes you would probably start here when writing this.
// When filling in the parameters of EventSourcedBehavior.apply you can use IntelliJ alt+Enter > createValue
// to generate the stub with types for the command and event handlers.
//#withEnforcedReplies
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
EventSourcedBehavior.withEnforcedReplies(
PersistenceId(s"Account|$accountNumber"),
EmptyAccount,
commandHandler,
eventHandler)
def apply(accountNumber: String): Behavior[AccountCommand[_]] = {
EventSourcedEntity.withEnforcedReplies(TypeKey, accountNumber, EmptyAccount, commandHandler, eventHandler)
}
//#withEnforcedReplies
@ -148,12 +147,10 @@ object AccountExampleWithEventHandlersInState {
//#reply
private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[AccountEvent, Account] = {
if (acc.canWithdraw(cmd.amount)) {
if (acc.canWithdraw(cmd.amount))
Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd)(_ => Confirmed)
} else {
else
Effect.reply(cmd)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
}
}
//#reply

View file

@ -1,15 +1,15 @@
/*
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
package docs.akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
/**
* Bank account example illustrating:
@ -24,14 +24,14 @@ object AccountExampleWithOptionState {
object AccountEntity {
// Command
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
final case class CreateAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance]
final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance]
final case class CloseAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
// Reply
@ -63,16 +63,14 @@ object AccountExampleWithOptionState {
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
cmd match {
case c @ Deposit(amount) =>
Effect.persist(Deposited(amount)).thenReply(c)(_ => Confirmed)
case c: Deposit =>
Effect.persist(Deposited(c.amount)).thenReply(c)(_ => Confirmed)
case c @ Withdraw(amount) =>
if (canWithdraw(amount)) {
Effect.persist(Withdrawn(amount)).thenReply(c)(_ => Confirmed)
} else {
Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
}
case c: Withdraw =>
if (canWithdraw(c.amount))
Effect.persist(Withdrawn(c.amount)).thenReply(c)(_ => Confirmed)
else
Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw ${c.amount}"))
case c: GetBalance =>
Effect.reply(c)(CurrentBalance(balance))
@ -118,9 +116,13 @@ object AccountExampleWithOptionState {
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
}
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
EventSourcedBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]](
PersistenceId(s"Account|$accountNumber"),
val TypeKey: EntityTypeKey[AccountCommand[_]] =
EntityTypeKey[AccountCommand[_]]("Account")
def apply(accountNumber: String): Behavior[AccountCommand[_]] = {
EventSourcedEntity.withEnforcedReplies[AccountCommand[_], AccountEvent, Option[Account]](
TypeKey,
accountNumber,
None,
(state, cmd) =>
state match {

View file

@ -14,10 +14,10 @@ Here we are using a bank account as the example domain. It has 3 state classes t
of the account; `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.
Scala
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #account-entity }
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #account-entity }
Java
: @@snip [AccountExampleWithEventHandlersInState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java) { #account-entity }
: @@snip [AccountExampleWithEventHandlersInState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java) { #account-entity }
@scala[Notice how the `eventHandler` delegates to the `applyEvent` in the `Account` (state), which is implemented
in the concrete `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.]
@ -29,10 +29,10 @@ in the concrete `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.]
We can take the previous bank account example one step further by handling the commands in the state too.
Scala
: @@snip [AccountExampleWithCommandHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala) { #account-entity }
: @@snip [AccountExampleWithCommandHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala) { #account-entity }
Java
: @@snip [AccountExampleWithCommandHandlersInState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.java) { #account-entity }
: @@snip [AccountExampleWithCommandHandlersInState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.java) { #account-entity }
@scala[Notice how the command handler is delegating to `applyCommand` in the `Account` (state), which is implemented
in the concrete `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.]
@ -52,10 +52,10 @@ illustrates using `null` as the `emptyState`.]
is then used in command and event handlers at the outer layer before delegating to the state or other methods.]
Scala
: @@snip [AccountExampleWithOptionState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala) { #account-entity }
: @@snip [AccountExampleWithOptionState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala) { #account-entity }
Java
: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithNullState.java) { #account-entity }
: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java) { #account-entity }
@@@ div { .group-java }
## Mutable state
@ -69,6 +69,6 @@ e.g. as a reply to a command. Messages must be immutable to avoid concurrency pr
The above examples are using immutable state classes and below is corresponding example with mutable state.
Java
: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithMutableState.java) { #account-entity }
: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java) { #account-entity }
@@@

View file

@ -285,19 +285,19 @@ created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noRe
@scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`].
Scala
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #withEnforcedReplies }
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #withEnforcedReplies }
Java
: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java) { #withEnforcedReplies }
: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java) { #withEnforcedReplies }
The commands must implement `ExpectingReply` to include the @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`]
in a standardized way.
Scala
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #reply-command }
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #reply-command }
Java
: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java) { #reply-command }
: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java) { #reply-command }
The `ReplyEffect` is created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noReply`]@java[`Effects().noReply`],
@scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`].
@ -306,10 +306,10 @@ The `ReplyEffect` is created with @scala[`Effect.reply`]@java[`Effects().reply`]
`EventSourcedBehaviorWithEnforcedReplies`], as opposed to newCommandHandlerBuilder when using `EventSourcedBehavior`.]
Scala
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #reply }
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #reply }
Java
: @@snip [AccountExampleWithNullState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/AccountExampleWithEventHandlersInState.java) { #reply }
: @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java) { #reply }
These effects will send the reply message even when @scala[`EventSourcedBehavior.withEnforcedReplies`]@java[`EventSourcedBehaviorWithEnforcedReplies`]
is not used, but then there will be no compilation errors if the reply decision is left out.