doc: stylish persistence.md and persistence-snapshot.md, #24717

This commit is contained in:
Patrik Nordwall 2019-09-16 08:29:03 +02:00
parent 8cb2721c33
commit 733f5fe2b1
19 changed files with 773 additions and 669 deletions

View file

@ -63,7 +63,7 @@ public class AccountExampleTest extends JUnitSuite {
@Test
public void handleDeposit() {
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "1");
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "1");
TestProbe<OperationResult> probe = testKit.createTestProbe(OperationResult.class);
ref.tell(new CreateAccount(probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
@ -75,7 +75,7 @@ public class AccountExampleTest extends JUnitSuite {
@Test
public void handleWithdraw() {
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "2");
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "2");
TestProbe<OperationResult> probe = testKit.createTestProbe(OperationResult.class);
ref.tell(new CreateAccount(probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
@ -87,7 +87,7 @@ public class AccountExampleTest extends JUnitSuite {
@Test
public void rejectWithdrawOverdraft() {
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "3");
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "3");
TestProbe<OperationResult> probe = testKit.createTestProbe(OperationResult.class);
ref.tell(new CreateAccount(probe.getRef()));
probe.expectMessage(Confirmed.INSTANCE);
@ -99,7 +99,7 @@ public class AccountExampleTest extends JUnitSuite {
@Test
public void handleGetBalance() {
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "4");
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "4");
TestProbe<OperationResult> opProbe = testKit.createTestProbe(OperationResult.class);
ref.tell(new CreateAccount(opProbe.getRef()));
opProbe.expectMessage(Confirmed.INSTANCE);
@ -115,7 +115,7 @@ public class AccountExampleTest extends JUnitSuite {
@Test
public void beUsableWithAsk() throws Exception {
Duration timeout = Duration.ofSeconds(3);
EntityRef<AccountCommand> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "5");
EntityRef<Command> ref = sharding().entityRefFor(AccountEntity.ENTITY_TYPE_KEY, "5");
CompletionStage<OperationResult> createResult = ref.ask(CreateAccount::new, timeout);
assertEquals(Confirmed.INSTANCE, createResult.toCompletableFuture().get(3, TimeUnit.SECONDS));

View file

@ -30,18 +30,18 @@ public interface AccountExampleWithEventHandlersInState {
// #withEnforcedReplies
public class AccountEntity
extends EventSourcedEntityWithEnforcedReplies<
AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> {
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> {
// #withEnforcedReplies
public static final EntityTypeKey<AccountCommand> ENTITY_TYPE_KEY =
EntityTypeKey.create(AccountCommand.class, "Account");
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "Account");
// Command
// #reply-command
interface AccountCommand<Reply> extends ExpectingReply<Reply>, CborSerializable {}
interface Command<Reply> extends ExpectingReply<Reply>, CborSerializable {}
// #reply-command
public static class CreateAccount implements AccountCommand<OperationResult> {
public static class CreateAccount implements Command<OperationResult> {
private final ActorRef<OperationResult> replyTo;
@JsonCreator
@ -55,7 +55,7 @@ public interface AccountExampleWithEventHandlersInState {
}
}
public static class Deposit implements AccountCommand<OperationResult> {
public static class Deposit implements Command<OperationResult> {
public final BigDecimal amount;
private final ActorRef<OperationResult> replyTo;
@ -70,7 +70,7 @@ public interface AccountExampleWithEventHandlersInState {
}
}
public static class Withdraw implements AccountCommand<OperationResult> {
public static class Withdraw implements Command<OperationResult> {
public final BigDecimal amount;
private final ActorRef<OperationResult> replyTo;
@ -85,7 +85,7 @@ public interface AccountExampleWithEventHandlersInState {
}
}
public static class GetBalance implements AccountCommand<CurrentBalance> {
public static class GetBalance implements Command<CurrentBalance> {
private final ActorRef<CurrentBalance> replyTo;
@JsonCreator
@ -99,7 +99,7 @@ public interface AccountExampleWithEventHandlersInState {
}
}
public static class CloseAccount implements AccountCommand<OperationResult> {
public static class CloseAccount implements Command<OperationResult> {
private final ActorRef<OperationResult> replyTo;
@JsonCreator
@ -115,9 +115,9 @@ public interface AccountExampleWithEventHandlersInState {
// Reply
// #reply-command
interface AccountCommandReply extends CborSerializable {}
interface CommandReply extends CborSerializable {}
interface OperationResult extends AccountCommandReply {}
interface OperationResult extends CommandReply {}
enum Confirmed implements OperationResult {
INSTANCE
@ -133,7 +133,7 @@ public interface AccountExampleWithEventHandlersInState {
}
// #reply-command
public static class CurrentBalance implements AccountCommandReply {
public static class CurrentBalance implements CommandReply {
public final BigDecimal balance;
@JsonCreator
@ -143,11 +143,11 @@ public interface AccountExampleWithEventHandlersInState {
}
// Event
interface AccountEvent extends CborSerializable {}
interface Event extends CborSerializable {}
public static class AccountCreated implements AccountEvent {}
public static class AccountCreated implements Event {}
public static class Deposited implements AccountEvent {
public static class Deposited implements Event {
public final BigDecimal amount;
@JsonCreator
@ -156,7 +156,7 @@ public interface AccountExampleWithEventHandlersInState {
}
}
public static class Withdrawn implements AccountEvent {
public static class Withdrawn implements Event {
public final BigDecimal amount;
@JsonCreator
@ -165,7 +165,7 @@ public interface AccountExampleWithEventHandlersInState {
}
}
public static class AccountClosed implements AccountEvent {}
public static class AccountClosed implements Event {}
// State
interface Account extends CborSerializable {}
@ -209,7 +209,7 @@ public interface AccountExampleWithEventHandlersInState {
return new AccountEntity(accountNumber);
}
public AccountEntity(String accountNumber) {
private AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
}
@ -219,8 +219,8 @@ public interface AccountExampleWithEventHandlersInState {
}
@Override
public CommandHandlerWithReply<AccountCommand, AccountEvent, Account> commandHandler() {
CommandHandlerWithReplyBuilder<AccountCommand, AccountEvent, Account> builder =
public CommandHandlerWithReply<Command, Event, Account> commandHandler() {
CommandHandlerWithReplyBuilder<Command, Event, Account> builder =
newCommandHandlerWithReplyBuilder();
builder.forStateType(EmptyAccount.class).onCommand(CreateAccount.class, this::createAccount);
@ -239,21 +239,20 @@ public interface AccountExampleWithEventHandlersInState {
return builder.build();
}
private ReplyEffect<AccountEvent, Account> createAccount(
EmptyAccount account, CreateAccount command) {
private ReplyEffect<Event, Account> createAccount(EmptyAccount account, CreateAccount command) {
return Effect()
.persist(new AccountCreated())
.thenReply(command, account2 -> Confirmed.INSTANCE);
}
private ReplyEffect<AccountEvent, Account> deposit(OpenedAccount account, Deposit command) {
private ReplyEffect<Event, Account> deposit(OpenedAccount account, Deposit command) {
return Effect()
.persist(new Deposited(command.amount))
.thenReply(command, account2 -> Confirmed.INSTANCE);
}
// #reply
private ReplyEffect<AccountEvent, Account> withdraw(OpenedAccount account, Withdraw command) {
private ReplyEffect<Event, Account> withdraw(OpenedAccount account, Withdraw command) {
if (!account.canWithdraw(command.amount)) {
return Effect()
.reply(command, new Rejected("not enough funds to withdraw " + command.amount));
@ -265,13 +264,11 @@ public interface AccountExampleWithEventHandlersInState {
}
// #reply
private ReplyEffect<AccountEvent, Account> getBalance(
OpenedAccount account, GetBalance command) {
private ReplyEffect<Event, Account> getBalance(OpenedAccount account, GetBalance command) {
return Effect().reply(command, new CurrentBalance(account.balance));
}
private ReplyEffect<AccountEvent, Account> closeAccount(
OpenedAccount account, CloseAccount command) {
private ReplyEffect<Event, Account> closeAccount(OpenedAccount account, CloseAccount command) {
if (account.balance.equals(BigDecimal.ZERO)) {
return Effect()
.persist(new AccountClosed())
@ -282,8 +279,8 @@ public interface AccountExampleWithEventHandlersInState {
}
@Override
public EventHandler<Account, AccountEvent> eventHandler() {
EventHandlerBuilder<Account, AccountEvent> builder = newEventHandlerBuilder();
public EventHandler<Account, Event> eventHandler() {
EventHandlerBuilder<Account, Event> builder = newEventHandlerBuilder();
builder
.forStateType(EmptyAccount.class)

View file

@ -5,12 +5,9 @@
package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
import akka.persistence.typed.javadsl.EventHandler;
@ -32,15 +29,15 @@ public interface AccountExampleWithMutableState {
// #account-entity
public class AccountEntity
extends EventSourcedEntityWithEnforcedReplies<
AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> {
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> {
public static final EntityTypeKey<AccountCommand> ENTITY_TYPE_KEY =
EntityTypeKey.create(AccountCommand.class, "Account");
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "Account");
// Command
interface AccountCommand<Reply> extends ExpectingReply<Reply>, CborSerializable {}
interface Command<Reply> extends ExpectingReply<Reply>, CborSerializable {}
public static class CreateAccount implements AccountCommand<OperationResult> {
public static class CreateAccount implements Command<OperationResult> {
private final ActorRef<OperationResult> replyTo;
@JsonCreator
@ -54,7 +51,7 @@ public interface AccountExampleWithMutableState {
}
}
public static class Deposit implements AccountCommand<OperationResult> {
public static class Deposit implements Command<OperationResult> {
public final BigDecimal amount;
private final ActorRef<OperationResult> replyTo;
@ -69,7 +66,7 @@ public interface AccountExampleWithMutableState {
}
}
public static class Withdraw implements AccountCommand<OperationResult> {
public static class Withdraw implements Command<OperationResult> {
public final BigDecimal amount;
private final ActorRef<OperationResult> replyTo;
@ -84,7 +81,7 @@ public interface AccountExampleWithMutableState {
}
}
public static class GetBalance implements AccountCommand<CurrentBalance> {
public static class GetBalance implements Command<CurrentBalance> {
private final ActorRef<CurrentBalance> replyTo;
@JsonCreator
@ -98,7 +95,7 @@ public interface AccountExampleWithMutableState {
}
}
public static class CloseAccount implements AccountCommand<OperationResult> {
public static class CloseAccount implements Command<OperationResult> {
private final ActorRef<OperationResult> replyTo;
@JsonCreator
@ -113,9 +110,9 @@ public interface AccountExampleWithMutableState {
}
// Reply
interface AccountCommandReply extends CborSerializable {}
interface CommandReply extends CborSerializable {}
interface OperationResult extends AccountCommandReply {}
interface OperationResult extends CommandReply {}
enum Confirmed implements OperationResult {
INSTANCE
@ -130,7 +127,7 @@ public interface AccountExampleWithMutableState {
}
}
public static class CurrentBalance implements AccountCommandReply {
public static class CurrentBalance implements CommandReply {
public final BigDecimal balance;
@JsonCreator
@ -140,11 +137,11 @@ public interface AccountExampleWithMutableState {
}
// Event
interface AccountEvent extends CborSerializable {}
interface Event extends CborSerializable {}
public static class AccountCreated implements AccountEvent {}
public static class AccountCreated implements Event {}
public static class Deposited implements AccountEvent {
public static class Deposited implements Event {
public final BigDecimal amount;
@JsonCreator
@ -153,7 +150,7 @@ public interface AccountExampleWithMutableState {
}
}
public static class Withdrawn implements AccountEvent {
public static class Withdrawn implements Event {
public final BigDecimal amount;
@JsonCreator
@ -162,7 +159,7 @@ public interface AccountExampleWithMutableState {
}
}
public static class AccountClosed implements AccountEvent {}
public static class AccountClosed implements Event {}
// State
interface Account extends CborSerializable {}
@ -205,7 +202,7 @@ public interface AccountExampleWithMutableState {
return new AccountEntity(accountNumber);
}
public AccountEntity(String accountNumber) {
private AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
}
@ -215,8 +212,8 @@ public interface AccountExampleWithMutableState {
}
@Override
public CommandHandlerWithReply<AccountCommand, AccountEvent, Account> commandHandler() {
CommandHandlerWithReplyBuilder<AccountCommand, AccountEvent, Account> builder =
public CommandHandlerWithReply<Command, Event, Account> commandHandler() {
CommandHandlerWithReplyBuilder<Command, Event, Account> builder =
newCommandHandlerWithReplyBuilder();
builder.forStateType(EmptyAccount.class).onCommand(CreateAccount.class, this::createAccount);
@ -235,20 +232,19 @@ public interface AccountExampleWithMutableState {
return builder.build();
}
private ReplyEffect<AccountEvent, Account> createAccount(
EmptyAccount account, CreateAccount command) {
private ReplyEffect<Event, Account> createAccount(EmptyAccount account, CreateAccount command) {
return Effect()
.persist(new AccountCreated())
.thenReply(command, account2 -> Confirmed.INSTANCE);
}
private ReplyEffect<AccountEvent, Account> deposit(OpenedAccount account, Deposit command) {
private ReplyEffect<Event, Account> deposit(OpenedAccount account, Deposit command) {
return Effect()
.persist(new Deposited(command.amount))
.thenReply(command, account2 -> Confirmed.INSTANCE);
}
private ReplyEffect<AccountEvent, Account> withdraw(OpenedAccount account, Withdraw command) {
private ReplyEffect<Event, Account> withdraw(OpenedAccount account, Withdraw command) {
if (!account.canWithdraw(command.amount)) {
return Effect()
.reply(command, new Rejected("not enough funds to withdraw " + command.amount));
@ -259,13 +255,11 @@ public interface AccountExampleWithMutableState {
}
}
private ReplyEffect<AccountEvent, Account> getBalance(
OpenedAccount account, GetBalance command) {
private ReplyEffect<Event, Account> getBalance(OpenedAccount account, GetBalance command) {
return Effect().reply(command, new CurrentBalance(account.balance));
}
private ReplyEffect<AccountEvent, Account> closeAccount(
OpenedAccount account, CloseAccount command) {
private ReplyEffect<Event, Account> closeAccount(OpenedAccount account, CloseAccount command) {
if (account.getBalance().equals(BigDecimal.ZERO)) {
return Effect()
.persist(new AccountClosed())
@ -276,8 +270,8 @@ public interface AccountExampleWithMutableState {
}
@Override
public EventHandler<Account, AccountEvent> eventHandler() {
EventHandlerBuilder<Account, AccountEvent> builder = newEventHandlerBuilder();
public EventHandler<Account, Event> eventHandler() {
EventHandlerBuilder<Account, Event> builder = newEventHandlerBuilder();
builder
.forStateType(EmptyAccount.class)

View file

@ -5,18 +5,13 @@
package jdocs.akka.cluster.sharding.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.cluster.sharding.typed.javadsl.EventSourcedEntityWithEnforcedReplies;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.javadsl.CommandHandlerWithReplyBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.EventHandlerBuilder;
import akka.persistence.typed.javadsl.EventSourcedBehaviorWithEnforcedReplies;
import akka.persistence.typed.javadsl.ReplyEffect;
import akka.serialization.jackson.CborSerializable;
import com.fasterxml.jackson.annotation.JsonCreator;
@ -34,15 +29,15 @@ public interface AccountExampleWithNullState {
// #account-entity
public class AccountEntity
extends EventSourcedEntityWithEnforcedReplies<
AccountEntity.AccountCommand, AccountEntity.AccountEvent, AccountEntity.Account> {
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> {
public static final EntityTypeKey<AccountCommand> ENTITY_TYPE_KEY =
EntityTypeKey.create(AccountCommand.class, "Account");
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "Account");
// Command
interface AccountCommand<Reply> extends ExpectingReply<Reply>, CborSerializable {}
interface Command<Reply> extends ExpectingReply<Reply>, CborSerializable {}
public static class CreateAccount implements AccountCommand<OperationResult> {
public static class CreateAccount implements Command<OperationResult> {
private final ActorRef<OperationResult> replyTo;
@JsonCreator
@ -56,7 +51,7 @@ public interface AccountExampleWithNullState {
}
}
public static class Deposit implements AccountCommand<OperationResult> {
public static class Deposit implements Command<OperationResult> {
public final BigDecimal amount;
private final ActorRef<OperationResult> replyTo;
@ -71,7 +66,7 @@ public interface AccountExampleWithNullState {
}
}
public static class Withdraw implements AccountCommand<OperationResult> {
public static class Withdraw implements Command<OperationResult> {
public final BigDecimal amount;
private final ActorRef<OperationResult> replyTo;
@ -86,7 +81,7 @@ public interface AccountExampleWithNullState {
}
}
public static class GetBalance implements AccountCommand<CurrentBalance> {
public static class GetBalance implements Command<CurrentBalance> {
private final ActorRef<CurrentBalance> replyTo;
@JsonCreator
@ -100,7 +95,7 @@ public interface AccountExampleWithNullState {
}
}
public static class CloseAccount implements AccountCommand<OperationResult> {
public static class CloseAccount implements Command<OperationResult> {
private final ActorRef<OperationResult> replyTo;
@JsonCreator
@ -115,9 +110,9 @@ public interface AccountExampleWithNullState {
}
// Reply
interface AccountCommandReply extends CborSerializable {}
interface CommandReply extends CborSerializable {}
interface OperationResult extends AccountCommandReply {}
interface OperationResult extends CommandReply {}
enum Confirmed implements OperationResult {
INSTANCE
@ -132,7 +127,7 @@ public interface AccountExampleWithNullState {
}
}
public static class CurrentBalance implements AccountCommandReply {
public static class CurrentBalance implements CommandReply {
public final BigDecimal balance;
@JsonCreator
@ -142,11 +137,11 @@ public interface AccountExampleWithNullState {
}
// Event
interface AccountEvent extends CborSerializable {}
interface Event extends CborSerializable {}
public static class AccountCreated implements AccountEvent {}
public static class AccountCreated implements Event {}
public static class Deposited implements AccountEvent {
public static class Deposited implements Event {
public final BigDecimal amount;
@JsonCreator
@ -155,7 +150,7 @@ public interface AccountExampleWithNullState {
}
}
public static class Withdrawn implements AccountEvent {
public static class Withdrawn implements Event {
public final BigDecimal amount;
@JsonCreator
@ -164,7 +159,7 @@ public interface AccountExampleWithNullState {
}
}
public static class AccountClosed implements AccountEvent {}
public static class AccountClosed implements Event {}
// State
interface Account extends CborSerializable {}
@ -206,7 +201,7 @@ public interface AccountExampleWithNullState {
return new AccountEntity(accountNumber);
}
public AccountEntity(String accountNumber) {
private AccountEntity(String accountNumber) {
super(ENTITY_TYPE_KEY, accountNumber);
}
@ -216,8 +211,8 @@ public interface AccountExampleWithNullState {
}
@Override
public CommandHandlerWithReply<AccountCommand, AccountEvent, Account> commandHandler() {
CommandHandlerWithReplyBuilder<AccountCommand, AccountEvent, Account> builder =
public CommandHandlerWithReply<Command, Event, Account> commandHandler() {
CommandHandlerWithReplyBuilder<Command, Event, Account> builder =
newCommandHandlerWithReplyBuilder();
builder.forNullState().onCommand(CreateAccount.class, this::createAccount);
@ -236,19 +231,19 @@ public interface AccountExampleWithNullState {
return builder.build();
}
private ReplyEffect<AccountEvent, Account> createAccount(CreateAccount command) {
private ReplyEffect<Event, Account> createAccount(CreateAccount command) {
return Effect()
.persist(new AccountCreated())
.thenReply(command, account2 -> Confirmed.INSTANCE);
}
private ReplyEffect<AccountEvent, Account> deposit(OpenedAccount account, Deposit command) {
private ReplyEffect<Event, Account> deposit(OpenedAccount account, Deposit command) {
return Effect()
.persist(new Deposited(command.amount))
.thenReply(command, account2 -> Confirmed.INSTANCE);
}
private ReplyEffect<AccountEvent, Account> withdraw(OpenedAccount account, Withdraw command) {
private ReplyEffect<Event, Account> withdraw(OpenedAccount account, Withdraw command) {
if (!account.canWithdraw(command.amount)) {
return Effect()
.reply(command, new Rejected("not enough funds to withdraw " + command.amount));
@ -259,13 +254,11 @@ public interface AccountExampleWithNullState {
}
}
private ReplyEffect<AccountEvent, Account> getBalance(
OpenedAccount account, GetBalance command) {
private ReplyEffect<Event, Account> getBalance(OpenedAccount account, GetBalance command) {
return Effect().reply(command, new CurrentBalance(account.balance));
}
private ReplyEffect<AccountEvent, Account> closeAccount(
OpenedAccount account, CloseAccount command) {
private ReplyEffect<Event, Account> closeAccount(OpenedAccount account, CloseAccount command) {
if (account.balance.equals(BigDecimal.ZERO)) {
return Effect()
.persist(new AccountClosed())
@ -276,8 +269,8 @@ public interface AccountExampleWithNullState {
}
@Override
public EventHandler<Account, AccountEvent> eventHandler() {
EventHandlerBuilder<Account, AccountEvent> builder = newEventHandlerBuilder();
public EventHandler<Account, Event> eventHandler() {
EventHandlerBuilder<Account, Event> builder = newEventHandlerBuilder();
builder.forNullState().onEvent(AccountCreated.class, () -> new OpenedAccount());

View file

@ -23,8 +23,7 @@ import akka.cluster.sharding.typed.javadsl.Entity;
// #import
import jdocs.akka.persistence.typed.BlogPostExample.BlogCommand;
import jdocs.akka.persistence.typed.BlogPostExample.BlogBehavior;
import jdocs.akka.persistence.typed.BlogPostEntity;
interface ShardingCompileOnlyTest {
@ -202,9 +201,10 @@ interface ShardingCompileOnlyTest {
ClusterSharding sharding = ClusterSharding.get(system);
// #persistence
EntityTypeKey<BlogCommand> blogTypeKey = EntityTypeKey.create(BlogCommand.class, "BlogPost");
EntityTypeKey<BlogPostEntity.Command> blogTypeKey =
EntityTypeKey.create(BlogPostEntity.Command.class, "BlogPost");
sharding.init(Entity.of(blogTypeKey, ctx -> BlogBehavior.behavior(ctx.getEntityId())));
sharding.init(Entity.of(blogTypeKey, ctx -> BlogPostEntity.create(ctx.getEntityId())));
// #persistence
}
}

View file

@ -62,7 +62,7 @@ class AccountExampleSpec
"handle Withdraw" in {
// OperationResult is the expected reply type for these commands, but it should also be
// possible to use the super type AccountCommandReply
val probe = createTestProbe[AccountCommandReply]()
val probe = createTestProbe[CommandReply]()
val ref = ClusterSharding(system).entityRefFor(AccountEntity.TypeKey, "2")
ref ! CreateAccount(probe.ref)
probe.expectMessage(Confirmed)
@ -80,7 +80,7 @@ class AccountExampleSpec
// AccountCommand[_] is the command type, but it should also be possible to narrow it to
// AccountCommand[OperationResult]
val probe = createTestProbe[OperationResult]()
val ref = ClusterSharding(system).entityRefFor[AccountCommand[OperationResult]](AccountEntity.TypeKey, "3")
val ref = ClusterSharding(system).entityRefFor[Command[OperationResult]](AccountEntity.TypeKey, "3")
ref ! CreateAccount(probe.ref)
probe.expectMessage(Confirmed)
ref ! Deposit(100, probe.ref)

View file

@ -24,43 +24,41 @@ object AccountExampleWithCommandHandlersInState {
//#account-entity
object AccountEntity {
// Command
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply] with CborSerializable
final case class CreateAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
sealed trait Command[Reply <: CommandReply] extends ExpectingReply[Reply] with CborSerializable
final case class CreateAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult]
final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
extends Command[OperationResult]
final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance]
final case class CloseAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
extends Command[OperationResult]
final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends Command[CurrentBalance]
final case class CloseAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult]
// Reply
sealed trait AccountCommandReply extends CborSerializable
sealed trait OperationResult extends AccountCommandReply
sealed trait CommandReply extends CborSerializable
sealed trait OperationResult extends CommandReply
case object Confirmed extends OperationResult
final case class Rejected(reason: String) extends OperationResult
final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply
final case class CurrentBalance(balance: BigDecimal) extends CommandReply
// Event
sealed trait AccountEvent extends CborSerializable
case object AccountCreated extends AccountEvent
case class Deposited(amount: BigDecimal) extends AccountEvent
case class Withdrawn(amount: BigDecimal) extends AccountEvent
case object AccountClosed extends AccountEvent
sealed trait Event extends CborSerializable
case object AccountCreated extends Event
case class Deposited(amount: BigDecimal) extends Event
case class Withdrawn(amount: BigDecimal) extends Event
case object AccountClosed extends Event
val Zero = BigDecimal(0)
// type alias to reduce boilerplate
type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Account]
type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[Event, Account]
// State
sealed trait Account extends CborSerializable {
def applyCommand(cmd: AccountCommand[_]): ReplyEffect
def applyEvent(event: AccountEvent): Account
def applyCommand(cmd: Command[_]): ReplyEffect
def applyEvent(event: Event): Account
}
case object EmptyAccount extends Account {
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
override def applyCommand(cmd: Command[_]): ReplyEffect =
cmd match {
case c: CreateAccount =>
Effect.persist(AccountCreated).thenReply(c)(_ => Confirmed)
@ -69,7 +67,7 @@ object AccountExampleWithCommandHandlersInState {
Effect.unhandled.thenNoReply()
}
override def applyEvent(event: AccountEvent): Account =
override def applyEvent(event: Event): Account =
event match {
case AccountCreated => OpenedAccount(Zero)
case _ => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
@ -78,7 +76,7 @@ object AccountExampleWithCommandHandlersInState {
case class OpenedAccount(balance: BigDecimal) extends Account {
require(balance >= Zero, "Account balance can't be negative")
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
override def applyCommand(cmd: Command[_]): ReplyEffect =
cmd match {
case c: Deposit =>
Effect.persist(Deposited(c.amount)).thenReply(c)(_ => Confirmed)
@ -103,7 +101,7 @@ object AccountExampleWithCommandHandlersInState {
}
override def applyEvent(event: AccountEvent): Account =
override def applyEvent(event: Event): Account =
event match {
case Deposited(amount) => copy(balance = balance + amount)
case Withdrawn(amount) => copy(balance = balance - amount)
@ -117,7 +115,7 @@ object AccountExampleWithCommandHandlersInState {
}
case object ClosedAccount extends Account {
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
override def applyCommand(cmd: Command[_]): ReplyEffect =
cmd match {
case c @ (_: Deposit | _: Withdraw) =>
Effect.reply(c)(Rejected("Account is closed"))
@ -129,15 +127,15 @@ object AccountExampleWithCommandHandlersInState {
Effect.reply(c)(Rejected("Account is already created"))
}
override def applyEvent(event: AccountEvent): Account =
override def applyEvent(event: Event): Account =
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
}
val TypeKey: EntityTypeKey[AccountCommand[_]] =
EntityTypeKey[AccountCommand[_]]("Account")
val TypeKey: EntityTypeKey[Command[_]] =
EntityTypeKey[Command[_]]("Account")
def apply(accountNumber: String): Behavior[AccountCommand[_]] = {
EventSourcedEntity.withEnforcedReplies[AccountCommand[_], AccountEvent, Account](
def apply(accountNumber: String): Behavior[Command[_]] = {
EventSourcedEntity.withEnforcedReplies[Command[_], Event, Account](
TypeKey,
accountNumber,
EmptyAccount,

View file

@ -27,44 +27,42 @@ object AccountExampleWithEventHandlersInState {
object AccountEntity {
// Command
//#reply-command
sealed trait AccountCommand[Reply <: AccountCommandReply] extends ExpectingReply[Reply] with CborSerializable
sealed trait Command[Reply <: CommandReply] extends ExpectingReply[Reply] with CborSerializable
//#reply-command
final case class CreateAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class CreateAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult]
final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
extends Command[OperationResult]
//#reply-command
final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
extends Command[OperationResult]
//#reply-command
final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance]
final case class CloseAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends Command[CurrentBalance]
final case class CloseAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult]
// Reply
//#reply-command
sealed trait AccountCommandReply extends CborSerializable
sealed trait OperationResult extends AccountCommandReply
sealed trait CommandReply extends CborSerializable
sealed trait OperationResult extends CommandReply
case object Confirmed extends OperationResult
final case class Rejected(reason: String) extends OperationResult
//#reply-command
final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply
final case class CurrentBalance(balance: BigDecimal) extends CommandReply
// Event
sealed trait AccountEvent extends CborSerializable
case object AccountCreated extends AccountEvent
case class Deposited(amount: BigDecimal) extends AccountEvent
case class Withdrawn(amount: BigDecimal) extends AccountEvent
case object AccountClosed extends AccountEvent
sealed trait Event extends CborSerializable
case object AccountCreated extends Event
case class Deposited(amount: BigDecimal) extends Event
case class Withdrawn(amount: BigDecimal) extends Event
case object AccountClosed extends Event
val Zero = BigDecimal(0)
// State
sealed trait Account extends CborSerializable {
def applyEvent(event: AccountEvent): Account
def applyEvent(event: Event): Account
}
case object EmptyAccount extends Account {
override def applyEvent(event: AccountEvent): Account = event match {
override def applyEvent(event: Event): Account = event match {
case AccountCreated => OpenedAccount(Zero)
case _ => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
}
@ -72,7 +70,7 @@ object AccountExampleWithEventHandlersInState {
case class OpenedAccount(balance: BigDecimal) extends Account {
require(balance >= Zero, "Account balance can't be negative")
override def applyEvent(event: AccountEvent): Account =
override def applyEvent(event: Event): Account =
event match {
case Deposited(amount) => copy(balance = balance + amount)
case Withdrawn(amount) => copy(balance = balance - amount)
@ -86,24 +84,24 @@ object AccountExampleWithEventHandlersInState {
}
case object ClosedAccount extends Account {
override def applyEvent(event: AccountEvent): Account =
override def applyEvent(event: Event): Account =
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
}
val TypeKey: EntityTypeKey[AccountCommand[_]] =
EntityTypeKey[AccountCommand[_]]("Account")
val TypeKey: EntityTypeKey[Command[_]] =
EntityTypeKey[Command[_]]("Account")
// Note that after defining command, event and state classes you would probably start here when writing this.
// When filling in the parameters of EventSourcedBehavior.apply you can use IntelliJ alt+Enter > createValue
// to generate the stub with types for the command and event handlers.
//#withEnforcedReplies
def apply(accountNumber: String): Behavior[AccountCommand[_]] = {
def apply(accountNumber: String): Behavior[Command[_]] = {
EventSourcedEntity.withEnforcedReplies(TypeKey, accountNumber, EmptyAccount, commandHandler, eventHandler)
}
//#withEnforcedReplies
private val commandHandler: (Account, AccountCommand[_]) => ReplyEffect[AccountEvent, Account] = { (state, cmd) =>
private val commandHandler: (Account, Command[_]) => ReplyEffect[Event, Account] = { (state, cmd) =>
state match {
case EmptyAccount =>
cmd match {
@ -134,20 +132,20 @@ object AccountExampleWithEventHandlersInState {
}
}
private val eventHandler: (Account, AccountEvent) => Account = { (state, event) =>
private val eventHandler: (Account, Event) => Account = { (state, event) =>
state.applyEvent(event)
}
private def createAccount(cmd: CreateAccount): ReplyEffect[AccountEvent, Account] = {
private def createAccount(cmd: CreateAccount): ReplyEffect[Event, Account] = {
Effect.persist(AccountCreated).thenReply(cmd)(_ => Confirmed)
}
private def deposit(cmd: Deposit): ReplyEffect[AccountEvent, Account] = {
private def deposit(cmd: Deposit): ReplyEffect[Event, Account] = {
Effect.persist(Deposited(cmd.amount)).thenReply(cmd)(_ => Confirmed)
}
//#reply
private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[AccountEvent, Account] = {
private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[Event, Account] = {
if (acc.canWithdraw(cmd.amount))
Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd)(_ => Confirmed)
else
@ -155,11 +153,11 @@ object AccountExampleWithEventHandlersInState {
}
//#reply
private def getBalance(acc: OpenedAccount, cmd: GetBalance): ReplyEffect[AccountEvent, Account] = {
private def getBalance(acc: OpenedAccount, cmd: GetBalance): ReplyEffect[Event, Account] = {
Effect.reply(cmd)(CurrentBalance(acc.balance))
}
private def closeAccount(acc: OpenedAccount, cmd: CloseAccount): ReplyEffect[AccountEvent, Account] = {
private def closeAccount(acc: OpenedAccount, cmd: CloseAccount): ReplyEffect[Event, Account] = {
if (acc.balance == Zero)
Effect.persist(AccountClosed).thenReply(cmd)(_ => Confirmed)
else

View file

@ -25,45 +25,43 @@ object AccountExampleWithOptionState {
//#account-entity
object AccountEntity {
// Command
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply] with CborSerializable
final case class CreateAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
sealed trait Command[Reply <: CommandReply] extends ExpectingReply[Reply] with CborSerializable
final case class CreateAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult]
final case class Deposit(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
extends Command[OperationResult]
final case class Withdraw(amount: BigDecimal, override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends AccountCommand[CurrentBalance]
final case class CloseAccount(override val replyTo: ActorRef[OperationResult])
extends AccountCommand[OperationResult]
extends Command[OperationResult]
final case class GetBalance(override val replyTo: ActorRef[CurrentBalance]) extends Command[CurrentBalance]
final case class CloseAccount(override val replyTo: ActorRef[OperationResult]) extends Command[OperationResult]
// Reply
sealed trait AccountCommandReply extends CborSerializable
sealed trait OperationResult extends AccountCommandReply
sealed trait CommandReply extends CborSerializable
sealed trait OperationResult extends CommandReply
case object Confirmed extends OperationResult
final case class Rejected(reason: String) extends OperationResult
final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply
final case class CurrentBalance(balance: BigDecimal) extends CommandReply
// Event
sealed trait AccountEvent extends CborSerializable
case object AccountCreated extends AccountEvent
case class Deposited(amount: BigDecimal) extends AccountEvent
case class Withdrawn(amount: BigDecimal) extends AccountEvent
case object AccountClosed extends AccountEvent
sealed trait Event extends CborSerializable
case object AccountCreated extends Event
case class Deposited(amount: BigDecimal) extends Event
case class Withdrawn(amount: BigDecimal) extends Event
case object AccountClosed extends Event
val Zero = BigDecimal(0)
// type alias to reduce boilerplate
type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Option[Account]]
type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[Event, Option[Account]]
// State
sealed trait Account extends CborSerializable {
def applyCommand(cmd: AccountCommand[_]): ReplyEffect
def applyEvent(event: AccountEvent): Account
def applyCommand(cmd: Command[_]): ReplyEffect
def applyEvent(event: Event): Account
}
case class OpenedAccount(balance: BigDecimal) extends Account {
require(balance >= Zero, "Account balance can't be negative")
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
override def applyCommand(cmd: Command[_]): ReplyEffect =
cmd match {
case c: Deposit =>
Effect.persist(Deposited(c.amount)).thenReply(c)(_ => Confirmed)
@ -88,7 +86,7 @@ object AccountExampleWithOptionState {
}
override def applyEvent(event: AccountEvent): Account =
override def applyEvent(event: Event): Account =
event match {
case Deposited(amount) => copy(balance = balance + amount)
case Withdrawn(amount) => copy(balance = balance - amount)
@ -102,7 +100,7 @@ object AccountExampleWithOptionState {
}
case object ClosedAccount extends Account {
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
override def applyCommand(cmd: Command[_]): ReplyEffect =
cmd match {
case c @ (_: Deposit | _: Withdraw) =>
Effect.reply(c)(Rejected("Account is closed"))
@ -114,15 +112,15 @@ object AccountExampleWithOptionState {
Effect.reply(c)(Rejected("Account is already created"))
}
override def applyEvent(event: AccountEvent): Account =
override def applyEvent(event: Event): Account =
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
}
val TypeKey: EntityTypeKey[AccountCommand[_]] =
EntityTypeKey[AccountCommand[_]]("Account")
val TypeKey: EntityTypeKey[Command[_]] =
EntityTypeKey[Command[_]]("Account")
def apply(accountNumber: String): Behavior[AccountCommand[_]] = {
EventSourcedEntity.withEnforcedReplies[AccountCommand[_], AccountEvent, Option[Account]](
def apply(accountNumber: String): Behavior[Command[_]] = {
EventSourcedEntity.withEnforcedReplies[Command[_], Event, Option[Account]](
TypeKey,
accountNumber,
None,
@ -138,7 +136,7 @@ object AccountExampleWithOptionState {
})
}
def onFirstCommand(cmd: AccountCommand[_]): ReplyEffect = {
def onFirstCommand(cmd: Command[_]): ReplyEffect = {
cmd match {
case c: CreateAccount =>
Effect.persist(AccountCreated).thenReply(c)(_ => Confirmed)
@ -148,7 +146,7 @@ object AccountExampleWithOptionState {
}
}
def onFirstEvent(event: AccountEvent): Account = {
def onFirstEvent(event: Event): Account = {
event match {
case AccountCreated => OpenedAccount(Zero)
case _ => throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")

View file

@ -9,8 +9,8 @@ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.Entity
import com.github.ghik.silencer.silent
import docs.akka.persistence.typed.BlogPostExample
import docs.akka.persistence.typed.BlogPostExample.BlogCommand
import docs.akka.persistence.typed.BlogPostEntity
import docs.akka.persistence.typed.BlogPostEntity.Command
@silent
object ShardingCompileOnlySpec {
@ -67,12 +67,10 @@ object ShardingCompileOnlySpec {
shardRegion ! ShardingEnvelope("counter-1", Counter.Increment)
//#send
import BlogPostExample.behavior
//#persistence
val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost")
val BlogTypeKey = EntityTypeKey[Command]("BlogPost")
ClusterSharding(system).init(Entity(typeKey = BlogTypeKey, createBehavior = ctx => behavior(ctx.entityId)))
ClusterSharding(system).init(Entity(typeKey = BlogTypeKey, createBehavior = ctx => BlogPostEntity(ctx.entityId)))
//#persistence
}

View file

@ -38,7 +38,8 @@ recovery like this:
Scala
: @@snip [BasicPersistentActorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #snapshotSelection }
TODO #26273 include corresponding example in Java
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #snapshotSelection }
To disable snapshot-based recovery, applications can use @scala[`SnapshotSelectionCriteria.None`]@java[`SnapshotSelectionCriteria.none()`].
A recovery where no saved snapshot matches the specified `SnapshotSelectionCriteria` will replay all journaled
@ -46,7 +47,7 @@ events. This can be useful if snapshot serialization format has changed in an in
not be used when events have been deleted.
In order to use snapshots, a default snapshot-store (`akka.persistence.snapshot-store.plugin`) must be configured,
or you can pick a snapshot store for for a specific `EventSourcedBehavior by
or you can pick a snapshot store for for a specific `EventSourcedBehavior` by
@scala[defining it with `withSnapshotPluginId` of the `EventSourcedBehavior`]@java[overriding `snapshotPluginId` in
the `EventSourcedBehavior`].

View file

@ -182,18 +182,18 @@ Once it is started then it we can look it up with `GetPost`, modify it with `Cha
The state is captured by:
Scala
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #state }
: @@snip [BlogPostEntity.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala) { #state }
Java
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #state }
: @@snip [BlogPostEntity.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntity.java) { #state }
The commands, of which only a subset are valid depending on the state:
Scala
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #commands }
: @@snip [BlogPostEntity.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala) { #commands }
Java
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #commands }
: @@snip [BlogPostEntity.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntity.java) { #commands }
@java[The commandler handler to process each command is decided by the state class (or state predicate) that is
given to the `forStateType` of the `CommandHandlerBuilder` and the match cases in the builders.]
@ -202,27 +202,27 @@ It typically becomes two levels of pattern matching, first on the state and then
Delegating to methods is a good practice because the one-line cases give a nice overview of the message dispatch.
Scala
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #command-handler }
: @@snip [BlogPostEntity.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala) { #command-handler }
Java
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #command-handler }
: @@snip [BlogPostEntity.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntity.java) { #command-handler }
The event handler:
Scala
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #event-handler }
: @@snip [BlogPostEntity.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala) { #event-handler }
Java
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #event-handler }
: @@snip [BlogPostEntity.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntity.java) { #event-handler }
And finally the behavior is created @scala[from the `EventSourcedBehavior.apply`]:
Scala
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #behavior }
: @@snip [BlogPostEntity.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala) { #behavior }
Java
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #behavior }
: @@snip [BlogPostEntity.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntity.java) { #behavior }
This can be taken one or two steps further by defining the event and command handlers in the state class as
illustrated in @ref:[event handlers in the state](persistence-style.md#event-handlers-in-the-state) and
@ -241,7 +241,7 @@ Each command has a single `Effect` which can be:
Note that there is only one of these. It is not possible to both persist and say none/unhandled.
These are created using @java[a factory that is returned via the `Effect()` method]
@scala[the `Effect` factory] and once created additional `SideEffects` can be added.
@scala[the `Effect` factory] and once created additional side effects can be added.
Most of them time this will be done with the `thenRun` method on the `Effect` above. You can factor out
common side effects into functions and reuse for several commands. For example:
@ -254,8 +254,8 @@ Java
### Side effects ordering and guarantees
Any `SideEffect`s are executed on an at-once basis and will not be executed if the persist fails.
The `SideEffect`s are executed sequentially, it is not possible to execute `SideEffect`s in parallel.
Any side effects are executed on an at-once basis and will not be executed if the persist fails.
The side effects are executed sequentially, it is not possible to execute side effects in parallel.
## Replies
@ -268,17 +268,17 @@ commands. After validation errors or after persisting events, using a `thenRun`
be sent to the `ActorRef`.
Scala
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #reply-command }
: @@snip [BlogPostEntity.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala) { #reply-command }
Java
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #reply-command }
: @@snip [BlogPostEntity.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntity.java) { #reply-command }
Scala
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #reply }
: @@snip [BlogPostEntity.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntity.scala) { #reply }
Java
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #reply }
: @@snip [BlogPostEntity.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntity.java) { #reply }
Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that
@ -306,7 +306,8 @@ Java
The `ReplyEffect` is created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noReply`]@java[`Effects().noReply`],
@scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`].
@java[Note that command handlers are defined with `newCommandHandlerWithReplyBuilder` when using `EventSourcedBehaviorWithEnforcedReplies`, as opposed to newCommandHandlerBuilder when using `EventSourcedBehavior`.]
@java[Note that command handlers are defined with `newCommandHandlerWithReplyBuilder` when using
`EventSourcedBehaviorWithEnforcedReplies`, as opposed to newCommandHandlerBuilder when using `EventSourcedBehavior`.]
Scala
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithEventHandlersInState.scala) { #reply }
@ -411,7 +412,7 @@ This is signalled to a `EventSourcedBehavior` via a `EventRejectedException` and
## Stash
When persisting events with `persist` or `persistAll` it is guaranteed that the persistent actor will not receive
When persisting events with `persist` or `persistAll` it is guaranteed that the `EventSourcedBehavior` will not receive
further commands until after the events have been confirmed to be persisted and additional side effects have been run.
Incoming messages are stashed automatically until the `persist` is completed.

View file

@ -181,11 +181,11 @@ public class PersistentActorCompileOnlyTest {
public static class Cmd implements MyCommand {
private final String data;
private final ActorRef<Ack> sender;
private final ActorRef<Ack> replyTo;
public Cmd(String data, ActorRef<Ack> sender) {
public Cmd(String data, ActorRef<Ack> replyTo) {
this.data = data;
this.sender = sender;
this.replyTo = replyTo;
}
}
@ -218,10 +218,9 @@ public class PersistentActorCompileOnlyTest {
return new ExampleState();
}
// #commonChainedEffects
@Override
public CommandHandler<MyCommand, MyEvent, ExampleState> commandHandler() {
// #commonChainedEffects
return newCommandHandlerBuilder()
.forStateType(ExampleState.class)
.onCommand(
@ -229,11 +228,11 @@ public class PersistentActorCompileOnlyTest {
(state, cmd) ->
Effect()
.persist(new Evt(cmd.data))
.thenRun(() -> cmd.sender.tell(new Ack()))
.thenRun(() -> cmd.replyTo.tell(new Ack()))
.thenRun(commonChainedEffect))
.build();
// #commonChainedEffects
}
// #commonChainedEffects
@Override
public EventHandler<ExampleState, MyEvent> eventHandler() {

View file

@ -13,6 +13,7 @@ import akka.persistence.typed.DeleteEventsFailed;
import akka.persistence.typed.DeleteSnapshotsFailed;
import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.SnapshotFailed;
import akka.persistence.typed.SnapshotSelectionCriteria;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
// #behavior
@ -36,16 +37,17 @@ public class BasicPersistentBehaviorTest {
extends EventSourcedBehavior<
MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> {
static EventSourcedBehavior<Command, Event, State> eventSourcedBehavior =
new MyPersistentBehavior(new PersistenceId("pid"));
interface Command {}
interface Event {}
public static class State {}
public MyPersistentBehavior(PersistenceId persistenceId) {
public static Behavior<Command> create() {
return new MyPersistentBehavior(new PersistenceId("pid"));
}
private MyPersistentBehavior(PersistenceId persistenceId) {
super(persistenceId);
}
@ -132,7 +134,13 @@ public class BasicPersistentBehaviorTest {
// #state
// #behavior
public MyPersistentBehavior(PersistenceId persistenceId) {
// commands, events and state defined here
public static Behavior<Command> create(PersistenceId persistenceId) {
return new MyPersistentBehavior(persistenceId);
}
private MyPersistentBehavior(PersistenceId persistenceId) {
super(persistenceId);
}
@ -168,20 +176,31 @@ public class BasicPersistentBehaviorTest {
}
interface More {
interface Command {}
interface Event {}
public static class State {}
// #supervision
public class MyPersistentBehavior extends EventSourcedBehavior<Command, Event, State> {
public MyPersistentBehavior(PersistenceId persistenceId) {
public class MyPersistentBehavior
extends EventSourcedBehavior<
MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> {
// #supervision
interface Command {}
interface Event {}
public static class State {}
// #supervision
public static Behavior<Command> create(PersistenceId persistenceId) {
return new MyPersistentBehavior(persistenceId);
}
private MyPersistentBehavior(PersistenceId persistenceId) {
super(
persistenceId,
SupervisorStrategy.restartWithBackoff(
Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2));
}
// #supervision
@Override
@ -206,7 +225,7 @@ public class BasicPersistentBehaviorTest {
// #recovery
@Override
public SignalHandler signalHandler() {
public SignalHandler<State> signalHandler() {
return newSignalHandlerBuilder()
.onSignal(
RecoveryCompleted.instance(),
@ -223,32 +242,92 @@ public class BasicPersistentBehaviorTest {
throw new RuntimeException("TODO: inspect the event and return any tags it should have");
}
// #tagging
// #supervision
}
// #supervision
}
EventSourcedBehavior<Command, Event, State> eventSourcedBehavior =
new MyPersistentBehavior(new PersistenceId("pid"));
interface More2 {
// #wrapPersistentBehavior
Behavior<Command> debugAlwaysSnapshot =
Behaviors.setup(
(context) -> {
return new MyPersistentBehavior(new PersistenceId("pid")) {
@Override
public boolean shouldSnapshot(State state, Event event, long sequenceNr) {
context
.getLog()
.info(
"Snapshot actor {} => state: {}", context.getSelf().path().name(), state);
return true;
}
};
});
public class MyPersistentBehavior
extends EventSourcedBehavior<
MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> {
// #wrapPersistentBehavior
interface Command {}
interface Event {}
public static class State {}
// #wrapPersistentBehavior
public static Behavior<Command> create(PersistenceId persistenceId) {
return Behaviors.setup(context -> new MyPersistentBehavior(persistenceId, context));
}
private final ActorContext<Command> context;
private MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> context) {
super(
persistenceId,
SupervisorStrategy.restartWithBackoff(
Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2));
this.context = context;
}
// #wrapPersistentBehavior
@Override
public State emptyState() {
return new State();
}
@Override
public CommandHandler<Command, Event, State> commandHandler() {
return (state, command) -> {
throw new RuntimeException("TODO: process the command & return an Effect");
};
}
@Override
public EventHandler<State, Event> eventHandler() {
return (state, event) -> {
throw new RuntimeException("TODO: process the event return the next state");
};
}
// #wrapPersistentBehavior
@Override
public boolean shouldSnapshot(State state, Event event, long sequenceNr) {
context
.getLog()
.info("Snapshot actor {} => state: {}", context.getSelf().path().name(), state);
return true;
}
}
// #wrapPersistentBehavior
}
public static class BookingCompleted implements Event {}
interface Snapshotting {
public static class Snapshotting extends EventSourcedBehavior<Command, Event, State> {
public Snapshotting(PersistenceId persistenceId) {
public class MyPersistentBehavior
extends EventSourcedBehavior<
MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> {
interface Command {}
interface Event {}
public static class BookingCompleted implements Event {}
public static class State {}
public static Behavior<Command> create(PersistenceId persistenceId) {
return new MyPersistentBehavior(persistenceId);
}
private MyPersistentBehavior(PersistenceId persistenceId) {
super(persistenceId);
}
@ -287,7 +366,7 @@ public class BasicPersistentBehaviorTest {
// #retentionCriteriaWithSignals
@Override
public SignalHandler signalHandler() {
public SignalHandler<State> signalHandler() {
return newSignalHandlerBuilder()
.onSignal(
SnapshotFailed.class,
@ -310,44 +389,62 @@ public class BasicPersistentBehaviorTest {
}
// #retentionCriteriaWithSignals
}
}
public static class Snapshotting2 extends Snapshotting {
public Snapshotting2(PersistenceId persistenceId) {
super(persistenceId);
}
// #snapshotAndEventDeletes
@Override // override retentionCriteria in EventSourcedBehavior
public RetentionCriteria retentionCriteria() {
return RetentionCriteria.snapshotEvery(100, 2).withDeleteEventsOnSnapshot();
}
// #snapshotAndEventDeletes
public static class Snapshotting2 extends Snapshotting.MyPersistentBehavior {
public Snapshotting2(PersistenceId persistenceId) {
super(persistenceId);
}
// #snapshotAndEventDeletes
@Override // override retentionCriteria in EventSourcedBehavior
public RetentionCriteria retentionCriteria() {
return RetentionCriteria.snapshotEvery(100, 2).withDeleteEventsOnSnapshot();
}
// #snapshotAndEventDeletes
}
public static class SnapshotSelection extends Snapshotting.MyPersistentBehavior {
public SnapshotSelection(PersistenceId persistenceId) {
super(persistenceId);
}
// #snapshotSelection
@Override
public SnapshotSelectionCriteria snapshotSelectionCriteria() {
return SnapshotSelectionCriteria.none();
}
// #snapshotSelection
}
interface WithActorContext {
interface Command {}
interface Event {}
public static class State {}
// #actor-context
public class MyPersistentBehavior extends EventSourcedBehavior<Command, Event, State> {
public class MyPersistentBehavior
extends EventSourcedBehavior<
MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State> {
// #actor-context
public static Behavior<Command> behavior(PersistenceId persistenceId) {
interface Command {}
interface Event {}
public static class State {}
// #actor-context
public static Behavior<Command> create(PersistenceId persistenceId) {
return Behaviors.setup(ctx -> new MyPersistentBehavior(persistenceId, ctx));
}
// this makes the context available to the command handler etc.
private final ActorContext<Command> ctx;
private final ActorContext<Command> context;
// optionally if you only need `ActorContext.getSelf()`
private final ActorRef<Command> self;
public MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> ctx) {
super(persistenceId);
this.ctx = ctx;
this.context = ctx;
this.self = ctx.getSelf();
}

View file

@ -0,0 +1,267 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.*;
// #behavior
public class BlogPostEntity
extends EventSourcedBehavior<
BlogPostEntity.Command, BlogPostEntity.Event, BlogPostEntity.State> {
// commands, events and state as in above snippets
// #behavior
// #event
public interface Event {}
public static class PostAdded implements Event {
private final String postId;
private final PostContent content;
public PostAdded(String postId, PostContent content) {
this.postId = postId;
this.content = content;
}
}
public static class BodyChanged implements Event {
private final String postId;
private final String newBody;
public BodyChanged(String postId, String newBody) {
this.postId = postId;
this.newBody = newBody;
}
}
public static class Published implements Event {
private final String postId;
public Published(String postId) {
this.postId = postId;
}
}
// #event
// #state
interface State {}
enum BlankState implements State {
INSTANCE
}
static class DraftState implements State {
final PostContent content;
DraftState(PostContent content) {
this.content = content;
}
DraftState withContent(PostContent newContent) {
return new DraftState(newContent);
}
DraftState withBody(String newBody) {
return withContent(new PostContent(postId(), content.title, newBody));
}
String postId() {
return content.postId;
}
}
static class PublishedState implements State {
final PostContent content;
PublishedState(PostContent content) {
this.content = content;
}
PublishedState withContent(PostContent newContent) {
return new PublishedState(newContent);
}
PublishedState withBody(String newBody) {
return withContent(new PostContent(postId(), content.title, newBody));
}
String postId() {
return content.postId;
}
}
// #state
// #commands
public interface Command {}
// #reply-command
public static class AddPost implements Command {
final PostContent content;
final ActorRef<AddPostDone> replyTo;
public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) {
this.content = content;
this.replyTo = replyTo;
}
}
public static class AddPostDone implements Command {
final String postId;
public AddPostDone(String postId) {
this.postId = postId;
}
}
// #reply-command
public static class GetPost implements Command {
final ActorRef<PostContent> replyTo;
public GetPost(ActorRef<PostContent> replyTo) {
this.replyTo = replyTo;
}
}
public static class ChangeBody implements Command {
final String newBody;
final ActorRef<Done> replyTo;
public ChangeBody(String newBody, ActorRef<Done> replyTo) {
this.newBody = newBody;
this.replyTo = replyTo;
}
}
public static class Publish implements Command {
final ActorRef<Done> replyTo;
public Publish(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
}
public static class PostContent implements Command {
final String postId;
final String title;
final String body;
public PostContent(String postId, String title, String body) {
this.postId = postId;
this.title = title;
this.body = body;
}
}
// #commands
// #behavior
public static Behavior<Command> create(String entityId) {
return Behaviors.setup(ctx -> new BlogPostEntity(new PersistenceId("Blog-" + entityId)));
}
private BlogPostEntity(PersistenceId persistenceId) {
super(persistenceId);
}
@Override
public State emptyState() {
return BlankState.INSTANCE;
}
// #behavior
// #command-handler
@Override
public CommandHandler<Command, Event, State> commandHandler() {
CommandHandlerBuilder<Command, Event, State> builder = newCommandHandlerBuilder();
builder.forStateType(BlankState.class).onCommand(AddPost.class, this::onAddPost);
builder
.forStateType(DraftState.class)
.onCommand(ChangeBody.class, this::onChangeBody)
.onCommand(Publish.class, this::onPublish)
.onCommand(GetPost.class, this::onGetPost);
builder
.forStateType(PublishedState.class)
.onCommand(ChangeBody.class, this::onChangeBody)
.onCommand(GetPost.class, this::onGetPost);
builder.forAnyState().onCommand(AddPost.class, (state, cmd) -> Effect().unhandled());
return builder.build();
}
private Effect<Event, State> onAddPost(AddPost cmd) {
// #reply
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
return Effect()
.persist(event)
.thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
// #reply
}
private Effect<Event, State> onChangeBody(DraftState state, ChangeBody cmd) {
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
}
private Effect<Event, State> onChangeBody(PublishedState state, ChangeBody cmd) {
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
}
private Effect<Event, State> onPublish(DraftState state, Publish cmd) {
return Effect()
.persist(new Published(state.postId()))
.thenRun(
() -> {
System.out.println("Blog post published: " + state.postId());
cmd.replyTo.tell(Done.getInstance());
});
}
private Effect<Event, State> onGetPost(DraftState state, GetPost cmd) {
cmd.replyTo.tell(state.content);
return Effect().none();
}
private Effect<Event, State> onGetPost(PublishedState state, GetPost cmd) {
cmd.replyTo.tell(state.content);
return Effect().none();
}
// #command-handler
// #event-handler
@Override
public EventHandler<State, Event> eventHandler() {
EventHandlerBuilder<State, Event> builder = newEventHandlerBuilder();
builder
.forStateType(BlankState.class)
.onEvent(PostAdded.class, event -> new DraftState(event.content));
builder
.forStateType(DraftState.class)
.onEvent(BodyChanged.class, (state, chg) -> state.withBody(chg.newBody))
.onEvent(Published.class, (state, event) -> new PublishedState(state.content));
builder
.forStateType(PublishedState.class)
.onEvent(BodyChanged.class, (state, chg) -> state.withBody(chg.newBody));
return builder.build();
}
// #event-handler
// #behavior
// commandHandler, eventHandler as in above snippets
}
// #behavior

View file

@ -1,264 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.*;
public interface BlogPostExample {
// #event
interface BlogEvent {}
public class PostAdded implements BlogEvent {
private final String postId;
private final PostContent content;
public PostAdded(String postId, PostContent content) {
this.postId = postId;
this.content = content;
}
}
public class BodyChanged implements BlogEvent {
private final String postId;
private final String newBody;
public BodyChanged(String postId, String newBody) {
this.postId = postId;
this.newBody = newBody;
}
}
public class Published implements BlogEvent {
private final String postId;
public Published(String postId) {
this.postId = postId;
}
}
// #event
// #state
interface BlogState {}
public enum BlankState implements BlogState {
INSTANCE
}
public class DraftState implements BlogState {
final PostContent content;
DraftState(PostContent content) {
this.content = content;
}
public DraftState withContent(PostContent newContent) {
return new DraftState(newContent);
}
public DraftState withBody(String newBody) {
return withContent(new PostContent(postId(), content.title, newBody));
}
public String postId() {
return content.postId;
}
}
public class PublishedState implements BlogState {
final PostContent content;
PublishedState(PostContent content) {
this.content = content;
}
public PublishedState withContent(PostContent newContent) {
return new PublishedState(newContent);
}
public PublishedState withBody(String newBody) {
return withContent(new PostContent(postId(), content.title, newBody));
}
public String postId() {
return content.postId;
}
}
// #state
// #commands
public interface BlogCommand {}
// #reply-command
public class AddPost implements BlogCommand {
final PostContent content;
final ActorRef<AddPostDone> replyTo;
public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) {
this.content = content;
this.replyTo = replyTo;
}
}
public class AddPostDone implements BlogCommand {
final String postId;
public AddPostDone(String postId) {
this.postId = postId;
}
}
// #reply-command
public class GetPost implements BlogCommand {
final ActorRef<PostContent> replyTo;
public GetPost(ActorRef<PostContent> replyTo) {
this.replyTo = replyTo;
}
}
public class ChangeBody implements BlogCommand {
final String newBody;
final ActorRef<Done> replyTo;
public ChangeBody(String newBody, ActorRef<Done> replyTo) {
this.newBody = newBody;
this.replyTo = replyTo;
}
}
public class Publish implements BlogCommand {
final ActorRef<Done> replyTo;
public Publish(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
}
public class PostContent implements BlogCommand {
final String postId;
final String title;
final String body;
public PostContent(String postId, String title, String body) {
this.postId = postId;
this.title = title;
this.body = body;
}
}
// #commands
// #behavior
public class BlogBehavior extends EventSourcedBehavior<BlogCommand, BlogEvent, BlogState> {
public BlogBehavior(PersistenceId persistenceId) {
super(persistenceId);
}
// #behavior
// #command-handler
@Override
public CommandHandler<BlogCommand, BlogEvent, BlogState> commandHandler() {
CommandHandlerBuilder<BlogCommand, BlogEvent, BlogState> builder = newCommandHandlerBuilder();
builder.forStateType(BlankState.class).onCommand(AddPost.class, this::addPost);
builder
.forStateType(DraftState.class)
.onCommand(ChangeBody.class, this::changeBody)
.onCommand(Publish.class, this::publish)
.onCommand(GetPost.class, this::getPost);
builder
.forStateType(PublishedState.class)
.onCommand(ChangeBody.class, this::changeBody)
.onCommand(GetPost.class, this::getPost);
builder.forAnyState().onCommand(AddPost.class, (state, cmd) -> Effect().unhandled());
return builder.build();
}
private Effect<BlogEvent, BlogState> addPost(AddPost cmd) {
// #reply
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
return Effect()
.persist(event)
.thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
// #reply
}
private Effect<BlogEvent, BlogState> changeBody(DraftState state, ChangeBody cmd) {
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
}
private Effect<BlogEvent, BlogState> changeBody(PublishedState state, ChangeBody cmd) {
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
}
private Effect<BlogEvent, BlogState> publish(DraftState state, Publish cmd) {
return Effect()
.persist(new Published(state.postId()))
.thenRun(
() -> {
System.out.println("Blog post published: " + state.postId());
cmd.replyTo.tell(Done.getInstance());
});
}
private Effect<BlogEvent, BlogState> getPost(DraftState state, GetPost cmd) {
cmd.replyTo.tell(state.content);
return Effect().none();
}
private Effect<BlogEvent, BlogState> getPost(PublishedState state, GetPost cmd) {
cmd.replyTo.tell(state.content);
return Effect().none();
}
// #command-handler
// #event-handler
@Override
public EventHandler<BlogState, BlogEvent> eventHandler() {
EventHandlerBuilder<BlogState, BlogEvent> builder = newEventHandlerBuilder();
builder
.forStateType(BlankState.class)
.onEvent(PostAdded.class, event -> new DraftState(event.content));
builder
.forStateType(DraftState.class)
.onEvent(BodyChanged.class, (state, chg) -> state.withBody(chg.newBody))
.onEvent(Published.class, (state, event) -> new PublishedState(state.content));
builder
.forStateType(PublishedState.class)
.onEvent(BodyChanged.class, (state, chg) -> state.withBody(chg.newBody));
return builder.build();
}
// #event-handler
// #behavior
public static Behavior<BlogCommand> behavior(String entityId) {
return Behaviors.setup(ctx -> new BlogBehavior(new PersistenceId("Blog-" + entityId)));
}
@Override
public BlogState emptyState() {
return BlankState.INSTANCE;
}
// commandHandler, eventHandler as in above snippets
}
// #behavior
}

View file

@ -15,10 +15,10 @@ import akka.persistence.typed.javadsl.EventSourcedBehavior;
import java.time.Duration;
import java.util.Optional;
public class StashingExample {
public interface StashingExample {
// #stashing
public static class TaskManager
public class TaskManager
extends EventSourcedBehavior<TaskManager.Command, TaskManager.Event, TaskManager.State> {
public interface Command {}
@ -85,7 +85,7 @@ public class StashingExample {
}
}
public static Behavior<Command> createBehavior(PersistenceId persistenceId) {
public static Behavior<Command> create(PersistenceId persistenceId) {
return new TaskManager(persistenceId);
}

View file

@ -14,11 +14,13 @@ import akka.persistence.typed.DeleteEventsFailed
import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.EventSeq
//#structure
//#behavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.PersistenceId
//#behavior
//#structure
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SnapshotFailed
import com.github.ghik.silencer.silent
@ -65,7 +67,7 @@ object BasicPersistentBehaviorCompileOnly {
//#event-handler
//#behavior
def behavior(id: String): EventSourcedBehavior[Command, Event, State] =
def apply(id: String): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId(id),
emptyState = State(Nil),
@ -76,104 +78,120 @@ object BasicPersistentBehaviorCompileOnly {
}
//#structure
sealed trait Command
sealed trait Event
final case class State()
object MyPersistentBehavior {
sealed trait Command
sealed trait Event
final case class State()
val behavior: Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
def apply(): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
}
//#structure
//#recovery
val recoveryBehavior: Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
.receiveSignal {
case (state, RecoveryCompleted) =>
throw new RuntimeException("TODO: add some end-of-recovery side-effect here")
}
//#recovery
import MyPersistentBehavior._
//#tagging
val taggingBehavior: Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
.withTagger(_ => Set("tag1", "tag2"))
//#tagging
//#wrapPersistentBehavior
val samplePersistentBehavior = EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
.receiveSignal {
case (state, RecoveryCompleted) =>
throw new RuntimeException("TODO: add some end-of-recovery side-effect here")
}
val debugAlwaysSnapshot: Behavior[Command] = Behaviors.setup { context =>
samplePersistentBehavior.snapshotWhen((state, _, _) => {
context.log.info2("Snapshot actor {} => state: {}", context.self.path.name, state)
true
})
object RecoveryBehavior {
//#recovery
def apply(): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.receiveSignal {
case (state, RecoveryCompleted) =>
throw new NotImplementedError("TODO: add some end-of-recovery side-effect here")
}
//#recovery
}
//#wrapPersistentBehavior
//#supervision
val supervisedBehavior = samplePersistentBehavior.onPersistFailure(
SupervisorStrategy.restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1))
//#supervision
object TaggingBehavior {
//#tagging
def apply(): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withTagger(_ => Set("tag1", "tag2"))
//#tagging
}
// #actor-context
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
object WrapBehavior {
def apply(): Behavior[Command] =
//#wrapPersistentBehavior
Behaviors.setup[Command] { context =>
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.snapshotWhen((state, _, _) => {
context.log.info2("Snapshot actor {} => state: {}", context.self.path.name, state)
true
})
}
//#wrapPersistentBehavior
}
val behaviorWithContext: Behavior[String] =
Behaviors.setup { context =>
EventSourcedBehavior[String, String, State](
persistenceId = PersistenceId("myPersistenceId"),
emptyState = new State,
commandHandler = CommandHandler.command { cmd =>
context.log.info("Got command {}", cmd)
Effect.persist(cmd).thenRun { state =>
context.log.info("event persisted, new state {}", state)
}
},
eventHandler = {
case (state, _) => state
})
}
// #actor-context
object Supervision {
def apply(): Behavior[Command] =
//#supervision
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.onPersistFailure(
SupervisorStrategy.restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1))
//#supervision
}
object BehaviorWithContext {
// #actor-context
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
def apply(): Behavior[String] =
Behaviors.setup { context =>
EventSourcedBehavior[String, String, State](
persistenceId = PersistenceId("myPersistenceId"),
emptyState = State(),
commandHandler = CommandHandler.command { cmd =>
context.log.info("Got command {}", cmd)
Effect.persist(cmd).thenRun { state =>
context.log.info("event persisted, new state {}", state)
}
},
eventHandler = {
case (state, _) => state
})
}
// #actor-context
}
final case class BookingCompleted(orderNr: String) extends Event
//#snapshottingEveryN
val snapshottingEveryN = EventSourcedBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1000, keepNSnapshots = 2))
//#snapshottingEveryN
//#snapshottingPredicate
val snapshottingPredicate = EventSourcedBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.snapshotWhen {
case (state, BookingCompleted(_), sequenceNumber) => true
case (state, event, sequenceNumber) => false
@ -183,20 +201,22 @@ object BasicPersistentBehaviorCompileOnly {
//#snapshotSelection
import akka.persistence.typed.SnapshotSelectionCriteria
val snapshotSelection = EventSourcedBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withSnapshotSelectionCriteria(SnapshotSelectionCriteria.none)
//#snapshotSelection
//#retentionCriteria
val snapshotRetention = EventSourcedBehavior[Command, Event, State](
import akka.persistence.typed.scaladsl.Effect
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => state) // do something based on a particular state
.snapshotWhen {
case (state, BookingCompleted(_), sequenceNumber) => true
@ -207,11 +227,11 @@ object BasicPersistentBehaviorCompileOnly {
//#snapshotAndEventDeletes
val snapshotAndEventsRetention = EventSourcedBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state
eventHandler = (state, evt) => state) // do something based on a particular event and state
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2).withDeleteEventsOnSnapshot)
.receiveSignal { // optionally respond to signals
case (state, _: SnapshotFailed) => // react to failure
@ -222,11 +242,11 @@ object BasicPersistentBehaviorCompileOnly {
//#retentionCriteriaWithSignals
val fullDeletesSampleWithSignals = EventSourcedBehavior[Command, Event, State](
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => Effect.noReply, // do something based on a particular command and state
eventHandler = (state, evt) => state) // do something based on a particular event and state
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
.receiveSignal { // optionally respond to signals
case (state, _: SnapshotFailed) => // react to failure
@ -244,13 +264,12 @@ object BasicPersistentBehaviorCompileOnly {
//#event-wrapper
//#install-event-adapter
val eventAdapterBehavior: Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
.eventAdapter(new WrapperEventAdapter[Event])
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new NotImplementedError("TODO: process the event return the next state"))
.eventAdapter(new WrapperEventAdapter[Event])
//#install-event-adapter
}

View file

@ -11,48 +11,52 @@ import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
object BlogPostExample {
//#behavior
object BlogPostEntity {
// commands, events, state defined here
//#behavior
//#event
sealed trait BlogEvent
final case class PostAdded(postId: String, content: PostContent) extends BlogEvent
sealed trait Event
final case class PostAdded(postId: String, content: PostContent) extends Event
final case class BodyChanged(postId: String, newBody: String) extends BlogEvent
final case class Published(postId: String) extends BlogEvent
final case class BodyChanged(postId: String, newBody: String) extends Event
final case class Published(postId: String) extends Event
//#event
//#state
sealed trait BlogState
sealed trait State
case object BlankState extends BlogState
case object BlankState extends State
final case class DraftState(content: PostContent) extends BlogState {
final case class DraftState(content: PostContent) extends State {
def withBody(newBody: String): DraftState =
copy(content = content.copy(body = newBody))
def postId: String = content.postId
}
final case class PublishedState(content: PostContent) extends BlogState {
final case class PublishedState(content: PostContent) extends State {
def postId: String = content.postId
}
//#state
//#commands
sealed trait BlogCommand
sealed trait Command
//#reply-command
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends Command
final case class AddPostDone(postId: String)
//#reply-command
final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand
final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand
final case class GetPost(replyTo: ActorRef[PostContent]) extends Command
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends Command
final case class Publish(replyTo: ActorRef[Done]) extends Command
final case class PostContent(postId: String, title: String, body: String)
//#commands
//#behavior
def behavior(entityId: String): Behavior[BlogCommand] =
EventSourcedBehavior[BlogCommand, BlogEvent, BlogState](
def apply(entityId: String): Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId(s"Blog-$entityId"),
emptyState = BlankState,
commandHandler,
@ -60,7 +64,7 @@ object BlogPostExample {
//#behavior
//#command-handler
private val commandHandler: (BlogState, BlogCommand) => Effect[BlogEvent, BlogState] = { (state, command) =>
private val commandHandler: (State, Command) => Effect[Event, State] = { (state, command) =>
state match {
case BlankState =>
@ -85,7 +89,7 @@ object BlogPostExample {
}
}
private def addPost(cmd: AddPost): Effect[BlogEvent, BlogState] = {
private def addPost(cmd: AddPost): Effect[Event, State] = {
//#reply
val evt = PostAdded(cmd.content.postId, cmd.content)
Effect.persist(evt).thenRun { _ =>
@ -95,33 +99,33 @@ object BlogPostExample {
//#reply
}
private def changeBody(state: DraftState, cmd: ChangeBody): Effect[BlogEvent, BlogState] = {
private def changeBody(state: DraftState, cmd: ChangeBody): Effect[Event, State] = {
val evt = BodyChanged(state.postId, cmd.newBody)
Effect.persist(evt).thenRun { _ =>
cmd.replyTo ! Done
}
}
private def publish(state: DraftState, replyTo: ActorRef[Done]): Effect[BlogEvent, BlogState] = {
private def publish(state: DraftState, replyTo: ActorRef[Done]): Effect[Event, State] = {
Effect.persist(Published(state.postId)).thenRun { _ =>
println(s"Blog post ${state.postId} was published")
replyTo ! Done
}
}
private def getPost(state: DraftState, replyTo: ActorRef[PostContent]): Effect[BlogEvent, BlogState] = {
private def getPost(state: DraftState, replyTo: ActorRef[PostContent]): Effect[Event, State] = {
replyTo ! state.content
Effect.none
}
private def getPost(state: PublishedState, replyTo: ActorRef[PostContent]): Effect[BlogEvent, BlogState] = {
private def getPost(state: PublishedState, replyTo: ActorRef[PostContent]): Effect[Event, State] = {
replyTo ! state.content
Effect.none
}
//#command-handler
//#event-handler
private val eventHandler: (BlogState, BlogEvent) => BlogState = { (state, event) =>
private val eventHandler: (State, Event) => State = { (state, event) =>
state match {
case BlankState =>
@ -150,4 +154,8 @@ object BlogPostExample {
}
//#event-handler
//#behavior
// commandHandler and eventHandler defined here
}
//#behavior