diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala index 5df7fa6b26..a4d9ebe248 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/scaladsl/EventSourcedEntity.scala @@ -3,7 +3,6 @@ */ package akka.cluster.sharding.typed.scaladsl -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplyEffect } object EventSourcedEntity { @@ -40,7 +39,7 @@ object EventSourcedEntity { * automatically from the [[EntityTypeKey]] and `entityId` constructor parameters by using * [[EntityTypeKey.persistenceIdFrom]]. */ - def withEnforcedReplies[Command <: ExpectingReply[_], Event, State]( + def withEnforcedReplies[Command, Event, State]( entityTypeKey: EntityTypeKey[Command], entityId: String, emptyState: State, diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java index 8c1f461278..1164786148 100644 --- a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ClusterShardingPersistenceTest.java @@ -11,7 +11,6 @@ import akka.actor.testkit.typed.javadsl.TestProbe; import akka.actor.typed.ActorRef; import akka.cluster.typed.Cluster; import akka.cluster.typed.Join; -import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.Effect; import akka.persistence.typed.javadsl.EventHandler; @@ -50,19 +49,14 @@ public class ClusterShardingPersistenceTest extends JUnitSuite { } } - static class AddWithConfirmation implements Command, ExpectingReply { - final String s; - private final ActorRef replyTo; + static class AddWithConfirmation implements Command { + public final String s; + public final ActorRef replyTo; AddWithConfirmation(String s, ActorRef replyTo) { this.s = s; this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } static class Get implements Command { @@ -102,7 +96,7 @@ public class ClusterShardingPersistenceTest extends JUnitSuite { } private Effect addWithConfirmation(String state, AddWithConfirmation cmd) { - return Effect().persist(cmd.s).thenReply(cmd, newState -> Done.getInstance()); + return Effect().persist(cmd.s).thenReply(cmd.replyTo, newState -> Done.getInstance()); } private Effect getState(String state, Get cmd) { diff --git a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest.java index c36c031dc6..15fe52e6f6 100644 --- a/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/akka/cluster/sharding/typed/javadsl/ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest.java @@ -9,7 +9,6 @@ import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.typed.ActorRef; import akka.cluster.typed.Cluster; import akka.cluster.typed.Join; -import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.javadsl.*; import org.junit.ClassRule; import org.junit.Rule; @@ -20,20 +19,15 @@ public class ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest { @Rule public final LogCapturing logCapturing = new LogCapturing(); - interface Command extends ExpectingReply {} + interface Command {} static class Append implements Command { public final String s; - private final ActorRef replyToRef; + public final ActorRef replyTo; Append(String s, ActorRef replyTo) { this.s = s; - this.replyToRef = replyTo; - } - - @Override - public ActorRef replyTo() { - return replyToRef; + this.replyTo = replyTo; } } @@ -61,7 +55,7 @@ public class ShardingEventSourcedEntityWithEnforcedRepliesCompileOnlyTest { } private ReplyEffect add(String state, Append cmd) { - return Effect().persist(cmd.s).thenReply(cmd, s -> "Ok"); + return Effect().persist(cmd.s).thenReply(cmd.replyTo, s -> "Ok"); } @Override diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java index 8ebc18ae1b..ee140f3780 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleTest.java @@ -157,7 +157,7 @@ public class AccountExampleTest extends JUnitSuite { .serializationTestKit() .verifySerialization(new Deposit(BigDecimal.valueOf(100), opProbe.getRef()), false); assertEquals(BigDecimal.valueOf(100), deposit2.amount); - assertEquals(opProbe.getRef(), deposit2.replyTo()); + assertEquals(opProbe.getRef(), deposit2.replyTo); testKit .serializationTestKit() .verifySerialization(new Withdraw(BigDecimal.valueOf(90), opProbe.getRef()), false); diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java index b9018aebd8..0b6e1a3883 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java @@ -7,7 +7,6 @@ package jdocs.akka.cluster.sharding.typed; import akka.actor.typed.ActorRef; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies; -import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.javadsl.CommandHandlerWithReply; import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder; import akka.persistence.typed.javadsl.EventHandler; @@ -21,7 +20,7 @@ import java.math.BigDecimal; /** * Bank account example illustrating: - different state classes representing the lifecycle of the * account - event handlers that delegate to methods in the state classes - command handlers that - * delegate to methods in the class - replies of various types, using ExpectingReply and + * delegate to methods in the class - replies of various types, using * EventSourcedEntityWithEnforcedReplies */ public interface AccountExampleWithEventHandlersInState { @@ -38,79 +37,54 @@ public interface AccountExampleWithEventHandlersInState { // Command // #reply-command - interface Command extends ExpectingReply, CborSerializable {} + interface Command extends CborSerializable {} // #reply-command public static class CreateAccount implements Command { - private final ActorRef replyTo; + public final ActorRef replyTo; @JsonCreator public CreateAccount(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class Deposit implements Command { public final BigDecimal amount; - private final ActorRef replyTo; + public final ActorRef replyTo; public Deposit(BigDecimal amount, ActorRef replyTo) { this.replyTo = replyTo; this.amount = amount; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class Withdraw implements Command { public final BigDecimal amount; - private final ActorRef replyTo; + public final ActorRef replyTo; public Withdraw(BigDecimal amount, ActorRef replyTo) { this.amount = amount; this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class GetBalance implements Command { - private final ActorRef replyTo; + public final ActorRef replyTo; @JsonCreator public GetBalance(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class CloseAccount implements Command { - private final ActorRef replyTo; + public final ActorRef replyTo; @JsonCreator public CloseAccount(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } // Reply @@ -242,39 +216,40 @@ public interface AccountExampleWithEventHandlersInState { private ReplyEffect createAccount(EmptyAccount account, CreateAccount command) { return Effect() .persist(new AccountCreated()) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } private ReplyEffect deposit(OpenedAccount account, Deposit command) { return Effect() .persist(new Deposited(command.amount)) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } // #reply private ReplyEffect withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() - .reply(command, new Rejected("not enough funds to withdraw " + command.amount)); + .reply(command.replyTo, new Rejected("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } } // #reply private ReplyEffect getBalance(OpenedAccount account, GetBalance command) { - return Effect().reply(command, new CurrentBalance(account.balance)); + return Effect().reply(command.replyTo, new CurrentBalance(account.balance)); } private ReplyEffect closeAccount(OpenedAccount account, CloseAccount command) { if (account.balance.equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } else { - return Effect().reply(command, new Rejected("balance must be zero for closing account")); + return Effect() + .reply(command.replyTo, new Rejected("balance must be zero for closing account")); } } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java index 7212b7773f..7b059ed351 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithMutableState.java @@ -7,7 +7,6 @@ package jdocs.akka.cluster.sharding.typed; import akka.actor.typed.ActorRef; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies; -import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.javadsl.CommandHandlerWithReply; import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder; import akka.persistence.typed.javadsl.EventHandler; @@ -22,7 +21,7 @@ import java.math.BigDecimal; * Bank account example illustrating: - different state classes representing the lifecycle of the * account - mutable state - event handlers that delegate to methods in the state classes - command * handlers that delegate to methods in the EventSourcedBehavior class - replies of various types, - * using ExpectingReply and EventSourcedEntityWithEnforcedReplies + * using EventSourcedEntityWithEnforcedReplies */ public interface AccountExampleWithMutableState { @@ -35,78 +34,53 @@ public interface AccountExampleWithMutableState { EntityTypeKey.create(Command.class, "Account"); // Command - interface Command extends ExpectingReply, CborSerializable {} + interface Command extends CborSerializable {} public static class CreateAccount implements Command { - private final ActorRef replyTo; + public final ActorRef replyTo; @JsonCreator public CreateAccount(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class Deposit implements Command { public final BigDecimal amount; - private final ActorRef replyTo; + public final ActorRef replyTo; public Deposit(BigDecimal amount, ActorRef replyTo) { this.replyTo = replyTo; this.amount = amount; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class Withdraw implements Command { public final BigDecimal amount; - private final ActorRef replyTo; + public final ActorRef replyTo; public Withdraw(BigDecimal amount, ActorRef replyTo) { this.amount = amount; this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class GetBalance implements Command { - private final ActorRef replyTo; + public final ActorRef replyTo; @JsonCreator public GetBalance(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class CloseAccount implements Command { - private final ActorRef replyTo; + public final ActorRef replyTo; @JsonCreator public CloseAccount(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } // Reply @@ -235,37 +209,38 @@ public interface AccountExampleWithMutableState { private ReplyEffect createAccount(EmptyAccount account, CreateAccount command) { return Effect() .persist(new AccountCreated()) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } private ReplyEffect deposit(OpenedAccount account, Deposit command) { return Effect() .persist(new Deposited(command.amount)) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } private ReplyEffect withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() - .reply(command, new Rejected("not enough funds to withdraw " + command.amount)); + .reply(command.replyTo, new Rejected("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } } private ReplyEffect getBalance(OpenedAccount account, GetBalance command) { - return Effect().reply(command, new CurrentBalance(account.balance)); + return Effect().reply(command.replyTo, new CurrentBalance(account.balance)); } private ReplyEffect closeAccount(OpenedAccount account, CloseAccount command) { if (account.getBalance().equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } else { - return Effect().reply(command, new Rejected("balance must be zero for closing account")); + return Effect() + .reply(command.replyTo, new Rejected("balance must be zero for closing account")); } } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java index aed0ded528..2dfc9f991a 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullState.java @@ -7,7 +7,6 @@ package jdocs.akka.cluster.sharding.typed; import akka.actor.typed.ActorRef; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies; -import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.javadsl.CommandHandlerWithReply; import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder; import akka.persistence.typed.javadsl.EventHandler; @@ -22,7 +21,7 @@ import java.math.BigDecimal; * Bank account example illustrating: - different state classes representing the lifecycle of the * account - null as emptyState - event handlers that delegate to methods in the state classes - * command handlers that delegate to methods in the EventSourcedBehavior class - replies of various - * types, using ExpectingReply and EventSourcedEntityWithEnforcedReplies + * types, using EventSourcedEntityWithEnforcedReplies */ public interface AccountExampleWithNullState { @@ -35,78 +34,53 @@ public interface AccountExampleWithNullState { EntityTypeKey.create(Command.class, "Account"); // Command - interface Command extends ExpectingReply, CborSerializable {} + interface Command extends CborSerializable {} public static class CreateAccount implements Command { - private final ActorRef replyTo; + public final ActorRef replyTo; @JsonCreator public CreateAccount(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class Deposit implements Command { public final BigDecimal amount; - private final ActorRef replyTo; + public final ActorRef replyTo; public Deposit(BigDecimal amount, ActorRef replyTo) { this.replyTo = replyTo; this.amount = amount; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class Withdraw implements Command { public final BigDecimal amount; - private final ActorRef replyTo; + public final ActorRef replyTo; public Withdraw(BigDecimal amount, ActorRef replyTo) { this.amount = amount; this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class GetBalance implements Command { - private final ActorRef replyTo; + public final ActorRef replyTo; @JsonCreator public GetBalance(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public static class CloseAccount implements Command { - private final ActorRef replyTo; + public final ActorRef replyTo; @JsonCreator public CloseAccount(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } // Reply @@ -234,37 +208,38 @@ public interface AccountExampleWithNullState { private ReplyEffect createAccount(CreateAccount command) { return Effect() .persist(new AccountCreated()) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } private ReplyEffect deposit(OpenedAccount account, Deposit command) { return Effect() .persist(new Deposited(command.amount)) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } private ReplyEffect withdraw(OpenedAccount account, Withdraw command) { if (!account.canWithdraw(command.amount)) { return Effect() - .reply(command, new Rejected("not enough funds to withdraw " + command.amount)); + .reply(command.replyTo, new Rejected("not enough funds to withdraw " + command.amount)); } else { return Effect() .persist(new Withdrawn(command.amount)) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } } private ReplyEffect getBalance(OpenedAccount account, GetBalance command) { - return Effect().reply(command, new CurrentBalance(account.balance)); + return Effect().reply(command.replyTo, new CurrentBalance(account.balance)); } private ReplyEffect closeAccount(OpenedAccount account, CloseAccount command) { if (account.balance.equals(BigDecimal.ZERO)) { return Effect() .persist(new AccountClosed()) - .thenReply(command, account2 -> Confirmed.INSTANCE); + .thenReply(command.replyTo, account2 -> Confirmed.INSTANCE); } else { - return Effect().reply(command, new Rejected("balance must be zero for closing account")); + return Effect() + .reply(command.replyTo, new Rejected("balance must be zero for closing account")); } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index cb3172d731..da9bd85ff6 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -29,7 +29,6 @@ import akka.cluster.sharding.typed.scaladsl.ClusterSharding.ShardCommand import akka.cluster.sharding.{ ClusterSharding => ClassicClusterSharding } import akka.cluster.typed.Cluster import akka.cluster.typed.Join -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.scaladsl.Effect import com.typesafe.config.ConfigFactory @@ -52,12 +51,8 @@ object ClusterShardingPersistenceSpec { sealed trait Command final case class Add(s: String) extends Command - final case class AddWithConfirmation(s: String)(override val replyTo: ActorRef[Done]) - extends Command - with ExpectingReply[Done] - final case class PassivateAndPersist(s: String)(override val replyTo: ActorRef[Done]) - extends Command - with ExpectingReply[Done] + final case class AddWithConfirmation(s: String)(val replyTo: ActorRef[Done]) extends Command + final case class PassivateAndPersist(s: String)(val replyTo: ActorRef[Done]) extends Command final case class Get(replyTo: ActorRef[String]) extends Command final case class Echo(msg: String, replyTo: ActorRef[String]) extends Command final case class Block(latch: CountDownLatch) extends Command @@ -99,7 +94,7 @@ object ClusterShardingPersistenceSpec { if (stashing) Effect.stash() else - Effect.persist(s).thenReply(cmd)(_ => Done) + Effect.persist(s).thenReply(cmd.replyTo)(_ => Done) case Get(replyTo) => replyTo ! s"$entityId:$state" @@ -107,7 +102,7 @@ object ClusterShardingPersistenceSpec { case cmd @ PassivateAndPersist(s) => shard ! Passivate(ctx.self) - Effect.persist(s).thenReply(cmd)(_ => Done) + Effect.persist(s).thenReply(cmd.replyTo)(_ => Done) case Echo(msg, replyTo) => Effect.none.thenRun(_ => replyTo ! msg) diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala index 89f6f17b7d..ee71a326a4 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInState.scala @@ -8,7 +8,6 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.scaladsl.Effect import akka.serialization.jackson.CborSerializable @@ -17,21 +16,21 @@ import akka.serialization.jackson.CborSerializable * - different state classes representing the lifecycle of the account * - event handlers in the state classes * - command handlers in the state classes - * - replies of various types, using ExpectingReply and withEnforcedReplies + * - replies of various types, using withEnforcedReplies */ object AccountExampleWithCommandHandlersInState { //#account-entity object AccountEntity { // Command - sealed trait Command[Reply <: CommandReply] extends ExpectingReply[Reply] with CborSerializable - final case class CreateAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult] - final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) - extends Command[OperationResult] - final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) - extends Command[OperationResult] - final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends Command[CurrentBalance] - final case class CloseAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + sealed trait Command[Reply <: CommandReply] extends CborSerializable { + def replyTo: ActorRef[Reply] + } + final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command[CurrentBalance] + final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command[OperationResult] // Reply sealed trait CommandReply extends CborSerializable @@ -60,8 +59,8 @@ object AccountExampleWithCommandHandlersInState { case object EmptyAccount extends Account { override def applyCommand(cmd: Command[_]): ReplyEffect = cmd match { - case c: CreateAccount => - Effect.persist(AccountCreated).thenReply(c)(_ => Confirmed) + case CreateAccount(replyTo) => + Effect.persist(AccountCreated).thenReply(replyTo)(_ => Confirmed) case _ => // CreateAccount before handling any other commands Effect.unhandled.thenNoReply() @@ -78,26 +77,26 @@ object AccountExampleWithCommandHandlersInState { override def applyCommand(cmd: Command[_]): ReplyEffect = cmd match { - case c: Deposit => - Effect.persist(Deposited(c.amount)).thenReply(c)(_ => Confirmed) + case Deposit(amount, replyTo) => + Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => Confirmed) - case c: Withdraw => - if (canWithdraw(c.amount)) - Effect.persist(Withdrawn(c.amount)).thenReply(c)(_ => Confirmed) + case Withdraw(amount, replyTo) => + if (canWithdraw(amount)) + Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => Confirmed) else - Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw ${c.amount}")) + Effect.reply(replyTo)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount")) - case c: GetBalance => - Effect.reply(c)(CurrentBalance(balance)) + case GetBalance(replyTo) => + Effect.reply(replyTo)(CurrentBalance(balance)) - case c: CloseAccount => + case CloseAccount(replyTo) => if (balance == Zero) - Effect.persist(AccountClosed).thenReply(c)(_ => Confirmed) + Effect.persist(AccountClosed).thenReply(replyTo)(_ => Confirmed) else - Effect.reply(c)(Rejected("Can't close account with non-zero balance")) + Effect.reply(replyTo)(Rejected("Can't close account with non-zero balance")) - case c: CreateAccount => - Effect.reply(c)(Rejected("Account is already created")) + case CreateAccount(replyTo) => + Effect.reply(replyTo)(Rejected("Account is already created")) } @@ -118,13 +117,13 @@ object AccountExampleWithCommandHandlersInState { override def applyCommand(cmd: Command[_]): ReplyEffect = cmd match { case c @ (_: Deposit | _: Withdraw) => - Effect.reply(c)(Rejected("Account is closed")) - case c: GetBalance => - Effect.reply(c)(CurrentBalance(Zero)) - case c: CloseAccount => - Effect.reply(c)(Rejected("Account is already closed")) - case c: CreateAccount => - Effect.reply(c)(Rejected("Account is already created")) + Effect.reply(c.replyTo)(Rejected("Account is closed")) + case GetBalance(replyTo) => + Effect.reply(replyTo)(CurrentBalance(Zero)) + case CloseAccount(replyTo) => + Effect.reply(replyTo)(Rejected("Account is already closed")) + case CreateAccount(replyTo) => + Effect.reply(replyTo)(Rejected("Account is already created")) } override def applyEvent(event: Event): Account = diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala index 651170ffb5..93b1c3cb53 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala @@ -8,7 +8,6 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.ReplyEffect import akka.serialization.jackson.CborSerializable @@ -19,7 +18,7 @@ import akka.serialization.jackson.CborSerializable * - event handlers in the state classes * - command handlers outside the state classes, pattern matching of commands in one place that * is delegating to methods - * - replies of various types, using ExpectingReply and withEnforcedReplies + * - replies of various types, using withEnforcedReplies */ object AccountExampleWithEventHandlersInState { @@ -27,17 +26,17 @@ object AccountExampleWithEventHandlersInState { object AccountEntity { // Command //#reply-command - sealed trait Command[Reply <: CommandReply] extends ExpectingReply[Reply] with CborSerializable + sealed trait Command[Reply <: CommandReply] extends CborSerializable { + def replyTo: ActorRef[Reply] + } //#reply-command - final case class CreateAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult] - final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) - extends Command[OperationResult] + final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command[OperationResult] //#reply-command - final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) - extends Command[OperationResult] + final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command[OperationResult] //#reply-command - final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends Command[CurrentBalance] - final case class CloseAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command[CurrentBalance] + final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command[OperationResult] // Reply //#reply-command @@ -115,19 +114,19 @@ object AccountExampleWithEventHandlersInState { case c: Withdraw => withdraw(acc, c) case c: GetBalance => getBalance(acc, c) case c: CloseAccount => closeAccount(acc, c) - case c: CreateAccount => Effect.reply(c)(Rejected("Account is already created")) + case c: CreateAccount => Effect.reply(c.replyTo)(Rejected("Account is already created")) } case ClosedAccount => cmd match { case c @ (_: Deposit | _: Withdraw) => - Effect.reply(c)(Rejected("Account is closed")) - case c: GetBalance => - Effect.reply(c)(CurrentBalance(Zero)) - case c: CloseAccount => - Effect.reply(c)(Rejected("Account is already closed")) - case c: CreateAccount => - Effect.reply(c)(Rejected("Account is already created")) + Effect.reply(c.replyTo)(Rejected("Account is closed")) + case GetBalance(replyTo) => + Effect.reply(replyTo)(CurrentBalance(Zero)) + case CloseAccount(replyTo) => + Effect.reply(replyTo)(Rejected("Account is already closed")) + case CreateAccount(replyTo) => + Effect.reply(replyTo)(Rejected("Account is already created")) } } } @@ -137,31 +136,31 @@ object AccountExampleWithEventHandlersInState { } private def createAccount(cmd: CreateAccount): ReplyEffect[Event, Account] = { - Effect.persist(AccountCreated).thenReply(cmd)(_ => Confirmed) + Effect.persist(AccountCreated).thenReply(cmd.replyTo)(_ => Confirmed) } private def deposit(cmd: Deposit): ReplyEffect[Event, Account] = { - Effect.persist(Deposited(cmd.amount)).thenReply(cmd)(_ => Confirmed) + Effect.persist(Deposited(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed) } //#reply private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[Event, Account] = { if (acc.canWithdraw(cmd.amount)) - Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd)(_ => Confirmed) + Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed) else - Effect.reply(cmd)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}")) + Effect.reply(cmd.replyTo)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}")) } //#reply private def getBalance(acc: OpenedAccount, cmd: GetBalance): ReplyEffect[Event, Account] = { - Effect.reply(cmd)(CurrentBalance(acc.balance)) + Effect.reply(cmd.replyTo)(CurrentBalance(acc.balance)) } private def closeAccount(acc: OpenedAccount, cmd: CloseAccount): ReplyEffect[Event, Account] = { if (acc.balance == Zero) - Effect.persist(AccountClosed).thenReply(cmd)(_ => Confirmed) + Effect.persist(AccountClosed).thenReply(cmd.replyTo)(_ => Confirmed) else - Effect.reply(cmd)(Rejected("Can't close account with non-zero balance")) + Effect.reply(cmd.replyTo)(Rejected("Can't close account with non-zero balance")) } } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala index 3d4eb8ab15..828c7fee45 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionState.scala @@ -8,7 +8,6 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.EventSourcedEntity -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.scaladsl.Effect import akka.serialization.jackson.CborSerializable import akka.serialization.jackson.CborSerializable @@ -18,21 +17,21 @@ import akka.serialization.jackson.CborSerializable * - Option[State] that is starting with None as the initial state * - event handlers in the state classes * - command handlers in the state classes - * - replies of various types, using ExpectingReply and withEnforcedReplies + * - replies of various types, using withEnforcedReplies */ object AccountExampleWithOptionState { //#account-entity object AccountEntity { // Command - sealed trait Command[Reply <: CommandReply] extends ExpectingReply[Reply] with CborSerializable - final case class CreateAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult] - final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) - extends Command[OperationResult] - final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult]) - extends Command[OperationResult] - final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends Command[CurrentBalance] - final case class CloseAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + sealed trait Command[Reply <: CommandReply] extends CborSerializable { + def replyTo: ActorRef[Reply] + } + final case class CreateAccount(replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + final case class Deposit(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[OperationResult]) extends Command[OperationResult] + final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command[CurrentBalance] + final case class CloseAccount(replyTo: ActorRef[OperationResult]) extends Command[OperationResult] // Reply sealed trait CommandReply extends CborSerializable @@ -63,26 +62,26 @@ object AccountExampleWithOptionState { override def applyCommand(cmd: Command[_]): ReplyEffect = cmd match { - case c: Deposit => - Effect.persist(Deposited(c.amount)).thenReply(c)(_ => Confirmed) + case Deposit(amount, replyTo) => + Effect.persist(Deposited(amount)).thenReply(replyTo)(_ => Confirmed) - case c: Withdraw => - if (canWithdraw(c.amount)) - Effect.persist(Withdrawn(c.amount)).thenReply(c)(_ => Confirmed) + case Withdraw(amount, replyTo) => + if (canWithdraw(amount)) + Effect.persist(Withdrawn(amount)).thenReply(replyTo)(_ => Confirmed) else - Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw ${c.amount}")) + Effect.reply(replyTo)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount")) - case c: GetBalance => - Effect.reply(c)(CurrentBalance(balance)) + case GetBalance(replyTo) => + Effect.reply(replyTo)(CurrentBalance(balance)) - case c: CloseAccount => + case CloseAccount(replyTo) => if (balance == Zero) - Effect.persist(AccountClosed).thenReply(c)(_ => Confirmed) + Effect.persist(AccountClosed).thenReply(replyTo)(_ => Confirmed) else - Effect.reply(c)(Rejected("Can't close account with non-zero balance")) + Effect.reply(replyTo)(Rejected("Can't close account with non-zero balance")) - case c: CreateAccount => - Effect.reply(c)(Rejected("Account is already created")) + case CreateAccount(replyTo) => + Effect.reply(replyTo)(Rejected("Account is already created")) } @@ -103,13 +102,13 @@ object AccountExampleWithOptionState { override def applyCommand(cmd: Command[_]): ReplyEffect = cmd match { case c @ (_: Deposit | _: Withdraw) => - Effect.reply(c)(Rejected("Account is closed")) - case c: GetBalance => - Effect.reply(c)(CurrentBalance(Zero)) - case c: CloseAccount => - Effect.reply(c)(Rejected("Account is already closed")) - case c: CreateAccount => - Effect.reply(c)(Rejected("Account is already created")) + Effect.reply(c.replyTo)(Rejected("Account is closed")) + case GetBalance(replyTo) => + Effect.reply(replyTo)(CurrentBalance(Zero)) + case CloseAccount(replyTo) => + Effect.reply(replyTo)(Rejected("Account is already closed")) + case CreateAccount(replyTo) => + Effect.reply(replyTo)(Rejected("Account is already created")) } override def applyEvent(event: Event): Account = @@ -138,8 +137,8 @@ object AccountExampleWithOptionState { def onFirstCommand(cmd: Command[_]): ReplyEffect = { cmd match { - case c: CreateAccount => - Effect.persist(AccountCreated).thenReply(c)(_ => Confirmed) + case CreateAccount(replyTo) => + Effect.persist(AccountCreated).thenReply(replyTo)(_ => Confirmed) case _ => // CreateAccount before handling any other commands Effect.unhandled.thenNoReply() diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index e810f28f09..1bbc2a1e36 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -539,6 +539,7 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible * `GetDataDeleted` and `UpdateDataDeleted` introduced as described in @ref[DataDeleted](#datadeleted). * `SubscribeResponse` introduced in `Subscribe` because the responses can be both `Changed` and `Deleted`. * `ReplicationDeleteFailure` renamed to `DeleteFailure`. +* `EventSourcedBehavior.withEnforcedReplies` signature changed. Command is not required to extend `ExpectingReply` anymore. `ExpectingReply` has been removed therefore. #### Akka Typed Stream API changes diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index f891476c01..5b7ce1f8ef 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -302,8 +302,7 @@ Scala Java : @@snip [AccountExampleWithNullState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.java) { #withEnforcedReplies } -The commands must implement `ExpectingReply` to include the @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef`] -in a standardized way. +The commands must have a field of @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef`] that can then be used to send a reply. Scala : @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #reply-command } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/ExpectingReply.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/ExpectingReply.scala deleted file mode 100644 index 8a79761716..0000000000 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/ExpectingReply.scala +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package akka.persistence.typed - -import akka.actor.typed.ActorRef - -/** - * Commands may implement this trait to facilitate sending reply messages via `Effect.thenReply`. - * - * @tparam ReplyMessage The type of the reply message - */ -trait ExpectingReply[ReplyMessage] { - def replyTo: ActorRef[ReplyMessage] -} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala index 7fb0ff3d6c..bfa76f76e7 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EffectImpl.scala @@ -7,9 +7,9 @@ package akka.persistence.typed.internal import scala.collection.immutable import akka.annotation.InternalApi -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.javadsl import akka.persistence.typed.scaladsl +import akka.actor.typed.ActorRef /** INTERNAL API */ @InternalApi @@ -24,9 +24,9 @@ private[akka] abstract class EffectImpl[+Event, State] override def thenRun(chainedEffect: State => Unit): EffectImpl[Event, State] = CompositeEffect(this, new Callback[State](chainedEffect)) - override def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage])( + override def thenReply[ReplyMessage](replyTo: ActorRef[ReplyMessage])( replyWithMessage: State => ReplyMessage): EffectImpl[Event, State] = - CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](cmd.replyTo, replyWithMessage)) + CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](replyTo, replyWithMessage)) override def thenUnstashAll(): EffectImpl[Event, State] = CompositeEffect(this, UnstashAll.asInstanceOf[SideEffect[State]]) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala index eb09b820ee..45e0795b8c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/Effect.scala @@ -8,9 +8,9 @@ import akka.util.ccompat.JavaConverters._ import akka.annotation.DoNotInherit import akka.annotation.InternalApi import akka.japi.function -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.internal.SideEffect import akka.persistence.typed.internal._ +import akka.actor.typed.ActorRef /** * INTERNAL API: see `class EffectFactories` @@ -79,10 +79,10 @@ import akka.persistence.typed.internal._ none().thenUnstashAll() /** - * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the - * reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`. + * Send a reply message to the command. The type of the + * reply message must conform to the type specified by the passed replyTo `ActorRef`. * - * This has the same semantics as `cmd.replyTo.tell`. + * This has the same semantics as `replyTo.tell`. * * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten * when the `EventSourcedBehavior` is created with [[EventSourcedBehaviorWithEnforcedReplies]]. When @@ -90,10 +90,8 @@ import akka.persistence.typed.internal._ * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help * finding mistakes. */ - def reply[ReplyMessage]( - cmd: ExpectingReply[ReplyMessage], - replyWithMessage: ReplyMessage): ReplyEffect[Event, State] = - none().thenReply[ReplyMessage](cmd, new function.Function[State, ReplyMessage] { + def reply[ReplyMessage](replyTo: ActorRef[ReplyMessage], replyWithMessage: ReplyMessage): ReplyEffect[Event, State] = + none().thenReply[ReplyMessage](replyTo, new function.Function[State, ReplyMessage] { override def apply(param: State): ReplyMessage = replyWithMessage }) @@ -160,10 +158,10 @@ import akka.persistence.typed.internal._ def thenUnstashAll(): Effect[Event, State] /** - * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the - * reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`. + * Send a reply message to the command. The type of the + * reply message must conform to the type specified by the passed replyTo `ActorRef`. * - * This has the same semantics as `cmd.replyTo().tell`. + * This has the same semantics as `replyTo.tell`. * * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten * when the `EventSourcedBehavior` is created with [[EventSourcedBehaviorWithEnforcedReplies]]. When @@ -172,9 +170,9 @@ import akka.persistence.typed.internal._ * finding mistakes. */ def thenReply[ReplyMessage]( - cmd: ExpectingReply[ReplyMessage], + replyTo: ActorRef[ReplyMessage], replyWithMessage: function.Function[State, ReplyMessage]): ReplyEffect[Event, State] = - CompositeEffect(this, SideEffect[State](newState => cmd.replyTo ! replyWithMessage(newState))) + CompositeEffect(this, SideEffect[State](newState => replyTo ! replyWithMessage(newState))) /** * When [[EventSourcedBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala index 8b2c201e78..4877e791f2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/Effect.scala @@ -6,9 +6,9 @@ package akka.persistence.typed.scaladsl import scala.collection.{ immutable => im } import akka.annotation.DoNotInherit -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.internal.SideEffect import akka.persistence.typed.internal._ +import akka.actor.typed.ActorRef /** * Factory methods for creating [[Effect]] directives - how an event sourced actor reacts on a command. @@ -85,8 +85,8 @@ object Effect { CompositeEffect(none.asInstanceOf[EffectBuilder[Event, State]], SideEffect.unstashAll[State]()) /** - * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the - * reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`. + * Send a reply message to the command. The type of the + * reply message must conform to the type specified by the passed replyTo `ActorRef`. * * This has the same semantics as `cmd.replyTo.tell`. * @@ -96,9 +96,9 @@ object Effect { * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help * finding mistakes. */ - def reply[ReplyMessage, Event, State](cmd: ExpectingReply[ReplyMessage])( + def reply[ReplyMessage, Event, State](replyTo: ActorRef[ReplyMessage])( replyWithMessage: ReplyMessage): ReplyEffect[Event, State] = - none[Event, State].thenReply[ReplyMessage](cmd)(_ => replyWithMessage) + none[Event, State].thenReply[ReplyMessage](replyTo)(_ => replyWithMessage) /** * When [[EventSourcedBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect @@ -152,10 +152,10 @@ trait EffectBuilder[+Event, State] extends Effect[Event, State] { def thenUnstashAll(): Effect[Event, State] /** - * Send a reply message to the command, which implements [[ExpectingReply]]. The type of the - * reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`. + * Send a reply message to the command. The type of the + * reply message must conform to the type specified by the passed replyTo `ActorRef`. * - * This has the same semantics as `cmd.replyTo.tell`. + * This has the same semantics as `replyTo.tell`. * * It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten * when the `EventSourcedBehavior` is created with [[EventSourcedBehavior.withEnforcedReplies]]. When @@ -163,7 +163,7 @@ trait EffectBuilder[+Event, State] extends Effect[Event, State] { * The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help * finding mistakes. */ - def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage])( + def thenReply[ReplyMessage](replyTo: ActorRef[ReplyMessage])( replyWithMessage: State => ReplyMessage): ReplyEffect[Event, State] /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 7b35cb7539..9f458e6b22 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -15,7 +15,6 @@ import akka.actor.typed.scaladsl.ActorContext import akka.annotation.DoNotInherit import akka.persistence.typed.EventAdapter import akka.persistence.typed.SnapshotAdapter -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.SnapshotSelectionCriteria import akka.persistence.typed.internal._ @@ -59,7 +58,7 @@ object EventSourcedBehavior { * Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be * created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]]. */ - def withEnforcedReplies[Command <: ExpectingReply[_], Event, State]( + def withEnforcedReplies[Command, Event, State]( persistenceId: PersistenceId, emptyState: State, commandHandler: (State, Command) => ReplyEffect[Event, State], diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 10d0bc885b..008e541da4 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -85,18 +85,13 @@ public class PersistentActorJavaDslTest extends JUnitSuite { INSTANCE } - public static class IncrementWithConfirmation implements Command, ExpectingReply { - private final ActorRef replyTo; + public static class IncrementWithConfirmation implements Command { + public final ActorRef replyTo; @JsonCreator public IncrementWithConfirmation(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } public enum StopThenLog implements Command { @@ -107,18 +102,13 @@ public class PersistentActorJavaDslTest extends JUnitSuite { INSTANCE } - public static class GetValue implements Command, ExpectingReply { - private final ActorRef replyTo; + public static class GetValue implements Command { + public final ActorRef replyTo; @JsonCreator public GetValue(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } // need the JsonTypeInfo because of the Wrapper @@ -233,11 +223,11 @@ public class PersistentActorJavaDslTest extends JUnitSuite { private ReplyEffect incrementWithConfirmation( State state, IncrementWithConfirmation command) { - return Effect().persist(new Incremented(1)).thenReply(command, newState -> done()); + return Effect().persist(new Incremented(1)).thenReply(command.replyTo, newState -> done()); } private ReplyEffect getValue(State state, GetValue command) { - return Effect().reply(command, state); + return Effect().reply(command.replyTo, state); } private Effect incrementLater(State state, IncrementLater command) { diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionCommand.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionCommand.java index 0bf6e411f1..e659fc1e2f 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionCommand.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionCommand.java @@ -6,7 +6,6 @@ package jdocs.akka.persistence.typed.auction; import akka.Done; import akka.actor.typed.ActorRef; -import akka.persistence.typed.ExpectingReply; import java.util.UUID; @@ -14,68 +13,40 @@ import java.util.UUID; public interface AuctionCommand { /** Start the auction. */ - final class StartAuction implements AuctionCommand, ExpectingReply { + final class StartAuction implements AuctionCommand { /** The auction to start. */ - private final Auction auction; + public final Auction auction; - private final ActorRef replyTo; + public final ActorRef replyTo; public StartAuction(Auction auction, ActorRef replyTo) { this.auction = auction; this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } - - public Auction getAuction() { - return auction; - } } /** Cancel the auction. */ - final class CancelAuction implements AuctionCommand, ExpectingReply { - private final ActorRef replyTo; + final class CancelAuction implements AuctionCommand { + public final ActorRef replyTo; public CancelAuction(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } /** Place a bid on the auction. */ - final class PlaceBid implements AuctionCommand, ExpectingReply { + final class PlaceBid implements AuctionCommand { - private final int bidPrice; - private final UUID bidder; - - private final ActorRef replyTo; + public final int bidPrice; + public final UUID bidder; + public final ActorRef replyTo; public PlaceBid(int bidPrice, UUID bidder, ActorRef replyTo) { this.bidPrice = bidPrice; this.bidder = bidder; this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } - - public int getBidPrice() { - return bidPrice; - } - - public UUID getBidder() { - return bidder; - } } interface PlaceBidReply {} @@ -167,31 +138,21 @@ public interface AuctionCommand { } /** Finish bidding. */ - final class FinishBidding implements AuctionCommand, ExpectingReply { + final class FinishBidding implements AuctionCommand { - private final ActorRef replyTo; + public final ActorRef replyTo; FinishBidding(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } /** Get the auction. */ - final class GetAuction implements AuctionCommand, ExpectingReply { - private final ActorRef replyTo; + final class GetAuction implements AuctionCommand { + public final ActorRef replyTo; public GetAuction(ActorRef replyTo) { this.replyTo = replyTo; } - - @Override - public ActorRef replyTo() { - return replyTo; - } } } diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionEntity.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionEntity.java index 163f5d7e03..e8a3b00a77 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionEntity.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/auction/AuctionEntity.java @@ -5,7 +5,6 @@ package jdocs.akka.persistence.typed.auction; import akka.Done; -import akka.persistence.typed.ExpectingReply; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.*; @@ -16,6 +15,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.Optional; import java.util.UUID; +import akka.actor.typed.ActorRef; /** * Based on @@ -27,7 +27,6 @@ public class AuctionEntity private final UUID entityUUID; public AuctionEntity(String entityId) { - // when used with Cluster Sharding this should use EntityTypeKey, or PersistentEntity super(new PersistenceId("Auction|" + entityId)); this.entityUUID = UUID.fromString(entityId); } @@ -42,7 +41,7 @@ public class AuctionEntity .onCommand( PlaceBid.class, (state, cmd) -> - Effect().reply(cmd, createResult(state, PlaceBidStatus.NOT_STARTED))); + Effect().reply(cmd.replyTo, createResult(state, PlaceBidStatus.NOT_STARTED))); // Command handler for the under auction state. private CommandHandlerWithReplyBuilderByState< @@ -50,7 +49,7 @@ public class AuctionEntity underAuctionHandler = newCommandHandlerWithReplyBuilder() .forState(state -> state.getStatus() == AuctionStatus.UNDER_AUCTION) - .onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd)) + .onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd.replyTo)) .onCommand(PlaceBid.class, this::placeBid) .onCommand(FinishBidding.class, this::finishBidding); @@ -60,12 +59,12 @@ public class AuctionEntity completedHandler = newCommandHandlerWithReplyBuilder() .forState(state -> state.getStatus() == AuctionStatus.COMPLETE) - .onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd)) - .onCommand(FinishBidding.class, (state, cmd) -> alreadyDone(cmd)) + .onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd.replyTo)) + .onCommand(FinishBidding.class, (state, cmd) -> alreadyDone(cmd.replyTo)) .onCommand( PlaceBid.class, (state, cmd) -> - Effect().reply(cmd, createResult(state, PlaceBidStatus.FINISHED))); + Effect().reply(cmd.replyTo, createResult(state, PlaceBidStatus.FINISHED))); // Command handler for the cancelled state. private CommandHandlerWithReplyBuilderByState< @@ -73,20 +72,20 @@ public class AuctionEntity cancelledHandler = newCommandHandlerWithReplyBuilder() .forState(state -> state.getStatus() == AuctionStatus.CANCELLED) - .onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd)) - .onCommand(FinishBidding.class, (state, cmd) -> alreadyDone(cmd)) - .onCommand(CancelAuction.class, (state, cmd) -> alreadyDone(cmd)) + .onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd.replyTo)) + .onCommand(FinishBidding.class, (state, cmd) -> alreadyDone(cmd.replyTo)) + .onCommand(CancelAuction.class, (state, cmd) -> alreadyDone(cmd.replyTo)) .onCommand( PlaceBid.class, (state, cmd) -> - Effect().reply(cmd, createResult(state, PlaceBidStatus.CANCELLED))); + Effect().reply(cmd.replyTo, createResult(state, PlaceBidStatus.CANCELLED))); private CommandHandlerWithReplyBuilderByState< AuctionCommand, AuctionEvent, AuctionState, AuctionState> getAuctionHandler = newCommandHandlerWithReplyBuilder() .forStateType(AuctionState.class) - .onCommand(GetAuction.class, (state, cmd) -> Effect().reply(cmd, state)); + .onCommand(GetAuction.class, (state, cmd) -> Effect().reply(cmd.replyTo, state)); private CommandHandlerBuilderByState cancelHandler = @@ -95,28 +94,28 @@ public class AuctionEntity .onCommand(CancelAuction.class, this::cancelAuction); // Note, an item can go from completed to cancelled, since it is the item service that controls // whether an auction is cancelled or not. If it cancels before it receives a bidding finished - // event from us, it will ignore the bidding finished event, so we need to update our state - // to reflect that. + // event from us, it will ignore the bidding finished event, so we need to update our state to + // reflect that. private ReplyEffect startAuction( AuctionState state, StartAuction cmd) { return Effect() - .persist(new AuctionStarted(entityUUID, cmd.getAuction())) - .thenReply(cmd, notUsed -> Done.getInstance()); + .persist(new AuctionStarted(entityUUID, cmd.auction)) + .thenReply(cmd.replyTo, notUsed -> Done.getInstance()); } private ReplyEffect finishBidding( AuctionState state, FinishBidding cmd) { return Effect() .persist(new BiddingFinished(entityUUID)) - .thenReply(cmd, notUsed -> Done.getInstance()); + .thenReply(cmd.replyTo, notUsed -> Done.getInstance()); } private ReplyEffect cancelAuction( AuctionState state, CancelAuction cmd) { return Effect() .persist(new AuctionCancelled(entityUUID)) - .thenReply(cmd, notUsed -> Done.getInstance()); + .thenReply(cmd.replyTo, notUsed -> Done.getInstance()); } /** The main logic for handling of bids. */ @@ -127,12 +126,14 @@ public class AuctionEntity // Even though we're not in the finished state yet, we should check if (auction.getEndTime().isBefore(now)) { - return Effect().reply(bid, createResult(state, PlaceBidStatus.FINISHED)); + return Effect().reply(bid.replyTo, createResult(state, PlaceBidStatus.FINISHED)); } - if (auction.getCreator().equals(bid.getBidder())) { + if (auction.getCreator().equals(bid.bidder)) { return Effect() - .reply(bid, new PlaceBidRejected("An auctions creator cannot bid in their own auction.")); + .reply( + bid.replyTo, + new PlaceBidRejected("An auctions creator cannot bid in their own auction.")); } Optional currentBid = state.lastBid(); @@ -147,13 +148,13 @@ public class AuctionEntity } boolean bidderIsCurrentBidder = - currentBid.filter(b -> b.getBidder().equals(bid.getBidder())).isPresent(); + currentBid.filter(b -> b.getBidder().equals(bid.bidder)).isPresent(); - if (bidderIsCurrentBidder && bid.getBidPrice() >= currentBidPrice) { + if (bidderIsCurrentBidder && bid.bidPrice >= currentBidPrice) { // Allow the current bidder to update their bid if (auction.getReservePrice() > currentBidPrice) { - int newBidPrice = Math.min(auction.getReservePrice(), bid.getBidPrice()); + int newBidPrice = Math.min(auction.getReservePrice(), bid.bidPrice); PlaceBidStatus placeBidStatus; if (newBidPrice == auction.getReservePrice()) { @@ -162,25 +163,22 @@ public class AuctionEntity placeBidStatus = PlaceBidStatus.ACCEPTED_BELOW_RESERVE; } return Effect() - .persist( - new BidPlaced( - entityUUID, new Bid(bid.getBidder(), now, newBidPrice, bid.getBidPrice()))) + .persist(new BidPlaced(entityUUID, new Bid(bid.bidder, now, newBidPrice, bid.bidPrice))) .thenReply( - bid, newState -> new PlaceBidResult(placeBidStatus, newBidPrice, bid.getBidder())); + bid.replyTo, + newState -> new PlaceBidResult(placeBidStatus, newBidPrice, bid.bidder)); } return Effect() .persist( - new BidPlaced( - entityUUID, new Bid(bid.getBidder(), now, currentBidPrice, bid.getBidPrice()))) + new BidPlaced(entityUUID, new Bid(bid.bidder, now, currentBidPrice, bid.bidPrice))) .thenReply( - bid, - newState -> - new PlaceBidResult(PlaceBidStatus.ACCEPTED, currentBidPrice, bid.getBidder())); + bid.replyTo, + newState -> new PlaceBidResult(PlaceBidStatus.ACCEPTED, currentBidPrice, bid.bidder)); } - if (bid.getBidPrice() < currentBidPrice + auction.getIncrement()) { - return Effect().reply(bid, createResult(state, PlaceBidStatus.TOO_LOW)); - } else if (bid.getBidPrice() <= currentBidMaximum) { + if (bid.bidPrice < currentBidPrice + auction.getIncrement()) { + return Effect().reply(bid.replyTo, createResult(state, PlaceBidStatus.TOO_LOW)); + } else if (bid.bidPrice <= currentBidMaximum) { return handleAutomaticOutbid( bid, auction, now, currentBid, currentBidPrice, currentBidMaximum); } else { @@ -202,21 +200,20 @@ public class AuctionEntity Optional currentBid, int currentBidPrice, int currentBidMaximum) { - // Adjust the bid so that the increment for the current maximum makes the current maximum a - // valid bid - int adjustedBidPrice = Math.min(bid.getBidPrice(), currentBidMaximum - auction.getIncrement()); + // Adjust the bid so that the increment for the current maximum makes the + // current maximum a valid bid + int adjustedBidPrice = Math.min(bid.bidPrice, currentBidMaximum - auction.getIncrement()); int newBidPrice = adjustedBidPrice + auction.getIncrement(); return Effect() .persist( Arrays.asList( - new BidPlaced( - entityUUID, new Bid(bid.getBidder(), now, adjustedBidPrice, bid.getBidPrice())), + new BidPlaced(entityUUID, new Bid(bid.bidder, now, adjustedBidPrice, bid.bidPrice)), new BidPlaced( entityUUID, new Bid(currentBid.get().getBidder(), now, newBidPrice, currentBidMaximum)))) .thenReply( - bid, + bid.replyTo, newState -> new PlaceBidResult( PlaceBidStatus.ACCEPTED_OUTBID, newBidPrice, currentBid.get().getBidder())); @@ -225,19 +222,17 @@ public class AuctionEntity /** Handle the situation where a bid will be accepted as the new winning bidder. */ private ReplyEffect handleNewWinningBidder( PlaceBid bid, Auction auction, Instant now, int currentBidMaximum) { - int nextIncrement = Math.min(currentBidMaximum + auction.getIncrement(), bid.getBidPrice()); + int nextIncrement = Math.min(currentBidMaximum + auction.getIncrement(), bid.bidPrice); int newBidPrice; if (nextIncrement < auction.getReservePrice()) { - newBidPrice = Math.min(auction.getReservePrice(), bid.getBidPrice()); + newBidPrice = Math.min(auction.getReservePrice(), bid.bidPrice); } else { newBidPrice = nextIncrement; } return Effect() - .persist( - new BidPlaced( - entityUUID, new Bid(bid.getBidder(), now, newBidPrice, bid.getBidPrice()))) + .persist(new BidPlaced(entityUUID, new Bid(bid.bidder, now, newBidPrice, bid.bidPrice))) .thenReply( - bid, + bid.replyTo, newState -> { PlaceBidStatus status; if (newBidPrice < auction.getReservePrice()) { @@ -245,7 +240,7 @@ public class AuctionEntity } else { status = PlaceBidStatus.ACCEPTED; } - return new PlaceBidResult(status, newBidPrice, bid.getBidder()); + return new PlaceBidResult(status, newBidPrice, bid.bidder); }); } @@ -292,7 +287,7 @@ public class AuctionEntity } } - private ReplyEffect alreadyDone(ExpectingReply cmd) { - return Effect().reply(cmd, Done.getInstance()); + private ReplyEffect alreadyDone(ActorRef replyTo) { + return Effect().reply(replyTo, Done.getInstance()); } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorReplySpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorReplySpec.scala index 3551097fa5..e8644e9fe4 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorReplySpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorReplySpec.scala @@ -13,7 +13,6 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.serialization.jackson.CborSerializable import com.typesafe.config.Config @@ -30,10 +29,10 @@ object EventSourcedBehaviorReplySpec { akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}" """) - sealed trait Command[ReplyMessage] extends ExpectingReply[ReplyMessage] with CborSerializable - final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done]) extends Command[Done] - final case class IncrementReplyLater(override val replyTo: ActorRef[Done]) extends Command[Done] - final case class ReplyNow(override val replyTo: ActorRef[Done]) extends Command[Done] + sealed trait Command[ReplyMessage] extends CborSerializable + final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done] + final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done] + final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done] final case class GetValue(replyTo: ActorRef[State]) extends Command[State] sealed trait Event extends CborSerializable @@ -53,17 +52,17 @@ object EventSourcedBehaviorReplySpec { commandHandler = (state, command) => command match { - case cmd: IncrementWithConfirmation => - Effect.persist(Incremented(1)).thenReply(cmd)(_ => Done) + case IncrementWithConfirmation(replyTo) => + Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done) - case cmd: IncrementReplyLater => - Effect.persist(Incremented(1)).thenRun((_: State) => ctx.self ! ReplyNow(cmd.replyTo)).thenNoReply() + case IncrementReplyLater(replyTo) => + Effect.persist(Incremented(1)).thenRun((_: State) => ctx.self ! ReplyNow(replyTo)).thenNoReply() - case cmd: ReplyNow => - Effect.reply(cmd)(Done) + case ReplyNow(replyTo) => + Effect.reply(replyTo)(Done) - case query: GetValue => - Effect.reply(query)(state) + case GetValue(replyTo) => + Effect.reply(replyTo)(state) }, eventHandler = (state, evt) => diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index d9b90a56fd..c55d715f68 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -32,7 +32,6 @@ import akka.persistence.query.PersistenceQuery import akka.persistence.query.Sequence import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.snapshot.SnapshotStore -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.SnapshotCompleted @@ -99,9 +98,7 @@ object EventSourcedBehaviorSpec { final case object IncrementLater extends Command final case object IncrementAfterReceiveTimeout extends Command final case object IncrementTwiceAndThenLog extends Command - final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done]) - extends Command - with ExpectingReply[Done] + final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command final case object DoNothingAndThenLog extends Command final case object EmptyEventsListAndThenLog extends Command final case class GetValue(replyTo: ActorRef[State]) extends Command @@ -198,8 +195,8 @@ object EventSourcedBehaviorSpec { case IncrementWithPersistAll(n) => Effect.persist((0 until n).map(_ => Incremented(1))) - case cmd: IncrementWithConfirmation => - Effect.persist(Incremented(1)).thenReply(cmd)(_ => Done) + case IncrementWithConfirmation(replyTo) => + Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done) case GetValue(replyTo) => replyTo ! state diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala index edf43a60ae..c0d9c61f75 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorStashSpec.scala @@ -24,7 +24,6 @@ import akka.actor.typed.internal.PoisonPill import akka.actor.typed.javadsl.StashOverflowException import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.persistence.typed.ExpectingReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.RecoveryCompleted import com.typesafe.config.Config @@ -45,22 +44,21 @@ object EventSourcedBehaviorStashSpec { } """).withFallback(ConfigFactory.defaultReference()).resolve() - sealed trait Command[ReplyMessage] extends ExpectingReply[ReplyMessage] + sealed trait Command[ReplyMessage] // Unstash and change to active mode - final case class Activate(id: String, override val replyTo: ActorRef[Ack]) extends Command[Ack] + final case class Activate(id: String, val replyTo: ActorRef[Ack]) extends Command[Ack] // Change to active mode, stash incoming Increment - final case class Deactivate(id: String, override val replyTo: ActorRef[Ack]) extends Command[Ack] + final case class Deactivate(id: String, val replyTo: ActorRef[Ack]) extends Command[Ack] // Persist Incremented if in active mode, otherwise stashed - final case class Increment(id: String, override val replyTo: ActorRef[Ack]) extends Command[Ack] + final case class Increment(id: String, val replyTo: ActorRef[Ack]) extends Command[Ack] // Persist ValueUpdated, independent of active/inactive - final case class UpdateValue(id: String, value: Int, override val replyTo: ActorRef[Ack]) extends Command[Ack] + final case class UpdateValue(id: String, value: Int, val replyTo: ActorRef[Ack]) extends Command[Ack] // Retrieve current state, independent of active/inactive final case class GetValue(replyTo: ActorRef[State]) extends Command[State] final case class Unhandled(replyTo: ActorRef[NotUsed]) extends Command[NotUsed] - final case class Throw(id: String, t: Throwable, override val replyTo: ActorRef[Ack]) extends Command[Ack] - final case class IncrementThenThrow(id: String, t: Throwable, override val replyTo: ActorRef[Ack]) - extends Command[Ack] - final case class Slow(id: String, latch: CountDownLatch, override val replyTo: ActorRef[Ack]) extends Command[Ack] + final case class Throw(id: String, t: Throwable, val replyTo: ActorRef[Ack]) extends Command[Ack] + final case class IncrementThenThrow(id: String, t: Throwable) extends Command[Ack] + final case class Slow(id: String, latch: CountDownLatch, val replyTo: ActorRef[Ack]) extends Command[Ack] final case class Ack(id: String) @@ -115,27 +113,27 @@ object EventSourcedBehaviorStashSpec { private def active(state: State, command: Command[_]): ReplyEffect[Event, State] = { command match { - case cmd: Increment => - Effect.persist(Incremented(1)).thenReply(cmd)(_ => Ack(cmd.id)) - case cmd @ UpdateValue(_, value, _) => - Effect.persist(ValueUpdated(value)).thenReply(cmd)(_ => Ack(cmd.id)) - case query: GetValue => - Effect.reply(query)(state) - case cmd: Deactivate => - Effect.persist(Deactivated).thenReply(cmd)(_ => Ack(cmd.id)) - case cmd: Activate => + case Increment(id, replyTo) => + Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Ack(id)) + case UpdateValue(id, value, replyTo) => + Effect.persist(ValueUpdated(value)).thenReply(replyTo)(_ => Ack(id)) + case GetValue(replyTo) => + Effect.reply(replyTo)(state) + case Deactivate(id, replyTo) => + Effect.persist(Deactivated).thenReply(replyTo)(_ => Ack(id)) + case Activate(id, replyTo) => // already active - Effect.reply(cmd)(Ack(cmd.id)) + Effect.reply(replyTo)(Ack(id)) case _: Unhandled => Effect.unhandled.thenNoReply() case Throw(id, t, replyTo) => replyTo ! Ack(id) throw t - case cmd: IncrementThenThrow => - Effect.persist(Incremented(1)).thenRun((_: State) => throw cmd.t).thenNoReply() - case cmd: Slow => - cmd.latch.await(30, TimeUnit.SECONDS) - Effect.reply(cmd)(Ack(cmd.id)) + case IncrementThenThrow(_, throwable) => + Effect.persist(Incremented(1)).thenRun((_: State) => throw throwable).thenNoReply() + case Slow(id, latch, replyTo) => + latch.await(30, TimeUnit.SECONDS) + Effect.reply(replyTo)(Ack(id)) } } @@ -143,15 +141,15 @@ object EventSourcedBehaviorStashSpec { command match { case _: Increment => Effect.stash() - case cmd @ UpdateValue(_, value, _) => - Effect.persist(ValueUpdated(value)).thenReply(cmd)(_ => Ack(cmd.id)) - case query: GetValue => - Effect.reply(query)(state) - case cmd: Deactivate => + case UpdateValue(id, value, replyTo) => + Effect.persist(ValueUpdated(value)).thenReply(replyTo)(_ => Ack(id)) + case GetValue(replyTo) => + Effect.reply(replyTo)(state) + case Deactivate(id, replyTo) => // already inactive - Effect.reply(cmd)(Ack(cmd.id)) - case cmd: Activate => - Effect.persist(Activated).thenReply(cmd)((_: State) => Ack(cmd.id)).thenUnstashAll() + Effect.reply(replyTo)(Ack(id)) + case Activate(id, replyTo) => + Effect.persist(Activated).thenReply(replyTo)((_: State) => Ack(id)).thenUnstashAll() case _: Unhandled => Effect.unhandled.thenNoReply() case Throw(id, t, replyTo) => @@ -429,7 +427,7 @@ class EventSourcedBehaviorStashSpec (1 to 10).foreach { n => if (n == 3) - c ! IncrementThenThrow(s"inc-$n", new TestException("test"), ackProbe.ref) + c ! IncrementThenThrow(s"inc-$n", new TestException("test")) else c ! Increment(s"inc-$n", ackProbe.ref) }