First draft of reference docs on durable state (#30387)

* First draft of reference docs on durable state
This commit is contained in:
Debasish Ghosh 2021-07-16 12:41:19 +05:30 committed by GitHub
parent 9abef504eb
commit b98c3b2da6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1778 additions and 5 deletions

View file

@ -0,0 +1,218 @@
/*
* Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.cluster.sharding.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.pattern.StatusReply;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.state.javadsl.CommandHandlerWithReply;
import akka.persistence.typed.state.javadsl.CommandHandlerWithReplyBuilder;
import akka.persistence.typed.state.javadsl.DurableStateBehaviorWithEnforcedReplies;
import akka.persistence.typed.state.javadsl.ReplyEffect;
import akka.serialization.jackson.CborSerializable;
import com.fasterxml.jackson.annotation.JsonCreator;
import java.math.BigDecimal;
/**
* This bank account example illustrates the following: - different state classes representing the
* lifecycle of the account - null as emptyState - command handlers that delegate to methods in the
* DurableStateBehavior class, and - replies of various types that use
* DurableStateBehaviorWithEnforcedReplies
*/
public interface AccountExampleWithNullDurableState {
// #account-entity
// #withEnforcedReplies
public class AccountEntity
extends DurableStateBehaviorWithEnforcedReplies<
AccountEntity.Command, AccountEntity.Account> {
// #withEnforcedReplies
public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "Account");
// Command
// #reply-command
interface Command extends CborSerializable {}
// #reply-command
public static class CreateAccount implements Command {
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CreateAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
public static class Deposit implements Command {
public final BigDecimal amount;
public final ActorRef<StatusReply<Done>> replyTo;
public Deposit(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
this.amount = amount;
}
}
public static class Withdraw implements Command {
public final BigDecimal amount;
public final ActorRef<StatusReply<Done>> replyTo;
public Withdraw(BigDecimal amount, ActorRef<StatusReply<Done>> replyTo) {
this.amount = amount;
this.replyTo = replyTo;
}
}
public static class GetBalance implements Command {
public final ActorRef<CurrentBalance> replyTo;
@JsonCreator
public GetBalance(ActorRef<CurrentBalance> replyTo) {
this.replyTo = replyTo;
}
}
public static class CloseAccount implements Command {
public final ActorRef<StatusReply<Done>> replyTo;
@JsonCreator
public CloseAccount(ActorRef<StatusReply<Done>> replyTo) {
this.replyTo = replyTo;
}
}
// Reply
public static class CurrentBalance implements CborSerializable {
public final BigDecimal balance;
@JsonCreator
public CurrentBalance(BigDecimal balance) {
this.balance = balance;
}
}
// State
interface Account extends CborSerializable {}
public static class OpenedAccount implements Account {
public final BigDecimal balance;
public OpenedAccount() {
this.balance = BigDecimal.ZERO;
}
@JsonCreator
public OpenedAccount(BigDecimal balance) {
this.balance = balance;
}
OpenedAccount makeDeposit(BigDecimal amount) {
return new OpenedAccount(balance.add(amount));
}
boolean canWithdraw(BigDecimal amount) {
return (balance.subtract(amount).compareTo(BigDecimal.ZERO) >= 0);
}
OpenedAccount makeWithdraw(BigDecimal amount) {
if (!canWithdraw(amount))
throw new IllegalStateException("Account balance can't be negative");
return new OpenedAccount(balance.subtract(amount));
}
ClosedAccount closedAccount() {
return new ClosedAccount();
}
}
public static class ClosedAccount implements Account {}
public static AccountEntity create(String accountNumber, PersistenceId persistenceId) {
return new AccountEntity(accountNumber, persistenceId);
}
private final String accountNumber;
private AccountEntity(String accountNumber, PersistenceId persistenceId) {
super(persistenceId);
this.accountNumber = accountNumber;
}
@Override
public Account emptyState() {
return null;
}
@Override
public CommandHandlerWithReply<Command, Account> commandHandler() {
CommandHandlerWithReplyBuilder<Command, Account> builder =
newCommandHandlerWithReplyBuilder();
builder.forNullState().onCommand(CreateAccount.class, this::createAccount);
builder
.forStateType(OpenedAccount.class)
.onCommand(Deposit.class, this::deposit)
.onCommand(Withdraw.class, this::withdraw)
.onCommand(GetBalance.class, this::getBalance)
.onCommand(CloseAccount.class, this::closeAccount);
builder
.forStateType(ClosedAccount.class)
.onAnyCommand(() -> Effect().unhandled().thenNoReply());
return builder.build();
}
private ReplyEffect<Account> createAccount(CreateAccount command) {
return Effect()
.persist(new OpenedAccount())
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
private ReplyEffect<Account> deposit(OpenedAccount account, Deposit command) {
return Effect()
.persist(account.makeDeposit(command.amount))
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
// #reply
private ReplyEffect<Account> withdraw(OpenedAccount account, Withdraw command) {
if (!account.canWithdraw(command.amount)) {
return Effect()
.reply(
command.replyTo,
StatusReply.error("not enough funds to withdraw " + command.amount));
} else {
return Effect()
.persist(account.makeWithdraw(command.amount))
.thenReply(command.replyTo, account2 -> StatusReply.ack());
}
}
// #reply
private ReplyEffect<Account> getBalance(OpenedAccount account, GetBalance command) {
return Effect().reply(command.replyTo, new CurrentBalance(account.balance));
}
private ReplyEffect<Account> closeAccount(OpenedAccount account, CloseAccount command) {
if (account.balance.equals(BigDecimal.ZERO)) {
return Effect()
.persist(account.closedAccount())
.thenReply(command.replyTo, account2 -> StatusReply.ack());
} else {
return Effect()
.reply(command.replyTo, StatusReply.error("balance must be zero for closing account"));
}
}
}
// #account-entity
}

View file

@ -0,0 +1,135 @@
/*
* Copyright (C) 2019-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.sharding.typed
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.state.scaladsl.Effect
import akka.persistence.typed.state.scaladsl.DurableStateBehavior
import akka.serialization.jackson.CborSerializable
/**
* This bank account example illustrates:
* - different state classes representing the lifecycle of the account
* - command handlers in the state classes
* - replies of various types, using withEnforcedReplies
*/
object AccountExampleWithCommandHandlersInDurableState {
//#account-entity
object AccountEntity {
// Command
//#reply-command
sealed trait Command extends CborSerializable
//#reply-command
final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
//#reply-command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
//#reply-command
final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
// Reply
final case class CurrentBalance(balance: BigDecimal)
val Zero = BigDecimal(0)
// type alias to reduce boilerplate
type ReplyEffect = akka.persistence.typed.state.scaladsl.ReplyEffect[Account]
// State
sealed trait Account extends CborSerializable {
def applyCommand(cmd: Command): ReplyEffect
}
case object EmptyAccount extends Account {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case CreateAccount(replyTo) =>
Effect.persist(OpenedAccount(Zero)).thenReply(replyTo)(_ => StatusReply.Ack)
case _ =>
// CreateAccount before handling any other commands
Effect.unhandled.thenNoReply()
}
}
case class OpenedAccount(balance: BigDecimal) extends Account {
require(balance >= Zero, "Account balance can't be negative")
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case cmd @ Deposit(_, _) => deposit(cmd)
case cmd @ Withdraw(_, _) => withdraw(cmd)
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(balance))
case CloseAccount(replyTo) =>
if (balance == Zero)
Effect.persist(ClosedAccount).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance"))
case CreateAccount(replyTo) =>
Effect.reply(replyTo)(StatusReply.Error("Account is already created"))
}
private def canWithdraw(amount: BigDecimal): Boolean = {
balance - amount >= Zero
}
//#reply
private def deposit(cmd: Deposit) = {
Effect.persist(copy(balance = balance + cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack)
}
private def withdraw(cmd: Withdraw) = {
if (canWithdraw(cmd.amount))
Effect.persist(copy(balance = balance - cmd.amount)).thenReply(cmd.replyTo)(_ => StatusReply.Ack)
else
Effect.reply(cmd.replyTo)(
StatusReply.Error(s"Insufficient balance ${balance} to be able to withdraw ${cmd.amount}"))
}
//#reply
}
case object ClosedAccount extends Account {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case c: Deposit =>
replyClosed(c.replyTo)
case c: Withdraw =>
replyClosed(c.replyTo)
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(Zero))
case CloseAccount(replyTo) =>
replyClosed(replyTo)
case CreateAccount(replyTo) =>
replyClosed(replyTo)
}
private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect =
Effect.reply(replyTo)(StatusReply.Error(s"Account is closed"))
}
// when used with sharding, this TypeKey can be used in `sharding.init` and `sharding.entityRefFor`:
val TypeKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("Account")
//#withEnforcedReplies
def apply(persistenceId: PersistenceId): Behavior[Command] = {
DurableStateBehavior
.withEnforcedReplies[Command, Account](persistenceId, EmptyAccount, (state, cmd) => state.applyCommand(cmd))
}
//#withEnforcedReplies
}
//#account-entity
}

View file

@ -0,0 +1,126 @@
/*
* Copyright (C) 2019-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.cluster.sharding.typed
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.state.scaladsl.Effect
import akka.persistence.typed.state.scaladsl.DurableStateBehavior
import akka.serialization.jackson.CborSerializable
/**
* This bank account example illustrates:
* - Option[State] that is starting with None as the initial state
* - command handlers in the state classes
* - replies of various types, using withEnforcedReplies
*/
object AccountExampleWithOptionDurableState {
//#account-entity
object AccountEntity {
// Command
sealed trait Command extends CborSerializable
final case class CreateAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Deposit(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class Withdraw(amount: BigDecimal, replyTo: ActorRef[StatusReply[Done]]) extends Command
final case class GetBalance(replyTo: ActorRef[CurrentBalance]) extends Command
final case class CloseAccount(replyTo: ActorRef[StatusReply[Done]]) extends Command
// Reply
final case class CurrentBalance(balance: BigDecimal) extends CborSerializable
val Zero = BigDecimal(0)
// type alias to reduce boilerplate
type ReplyEffect = akka.persistence.typed.state.scaladsl.ReplyEffect[Option[Account]]
// State
sealed trait Account extends CborSerializable {
def applyCommand(cmd: Command): ReplyEffect
}
case class OpenedAccount(balance: BigDecimal) extends Account {
require(balance >= Zero, "Account balance can't be negative")
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case Deposit(amount, replyTo) =>
Effect.persist(Some(copy(balance = balance + amount))).thenReply(replyTo)(_ => StatusReply.Ack)
case Withdraw(amount, replyTo) =>
if (canWithdraw(amount))
Effect.persist(Some(copy(balance = balance - amount))).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(StatusReply.Error(s"Insufficient balance $balance to be able to withdraw $amount"))
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(balance))
case CloseAccount(replyTo) =>
if (balance == Zero)
Effect.persist(Some(ClosedAccount)).thenReply(replyTo)(_ => StatusReply.Ack)
else
Effect.reply(replyTo)(StatusReply.Error("Can't close account with non-zero balance"))
case CreateAccount(replyTo) =>
Effect.reply(replyTo)(StatusReply.Error("Account is already created"))
}
def canWithdraw(amount: BigDecimal): Boolean = {
balance - amount >= Zero
}
}
case object ClosedAccount extends Account {
override def applyCommand(cmd: Command): ReplyEffect =
cmd match {
case c: Deposit =>
replyClosed(c.replyTo)
case c: Withdraw =>
replyClosed(c.replyTo)
case GetBalance(replyTo) =>
Effect.reply(replyTo)(CurrentBalance(Zero))
case CloseAccount(replyTo) =>
replyClosed(replyTo)
case CreateAccount(replyTo) =>
replyClosed(replyTo)
}
private def replyClosed(replyTo: ActorRef[StatusReply[Done]]): ReplyEffect =
Effect.reply(replyTo)(StatusReply.Error(s"Account is closed"))
}
// when used with sharding, this TypeKey can be used in `sharding.init` and `sharding.entityRefFor`:
val TypeKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("Account")
def apply(persistenceId: PersistenceId): Behavior[Command] = {
DurableStateBehavior.withEnforcedReplies[Command, Option[Account]](
persistenceId,
None,
(state, cmd) =>
state match {
case None => onFirstCommand(cmd)
case Some(account) => account.applyCommand(cmd)
})
}
def onFirstCommand(cmd: Command): ReplyEffect = {
cmd match {
case CreateAccount(replyTo) =>
Effect.persist(Some(OpenedAccount(Zero))).thenReply(replyTo)(_ => StatusReply.Ack)
case _ =>
// CreateAccount before handling any other commands
Effect.unhandled.thenNoReply()
}
}
}
//#account-entity
}

View file

@ -10,6 +10,7 @@
* [index-actors](typed/index.md)
* [index-cluster](typed/index-cluster.md)
* [index-persistence](typed/index-persistence.md)
* [index-persistence-durable-state](typed/index-persistence-durable-state.md)
* [stream/index](stream/index.md)
* [discovery](discovery/index.md)
* [index-utilities](index-utilities.md)

View file

@ -0,0 +1,14 @@
---
project.description: Durable state with Akka Persistence enables actors to persist the latest version of the state. This persistence is used for recovery on failure, or when migrating within a cluster.
---
# Persistence (Durable State)
@@toc { depth=2 }
@@@ index
* [persistence-durable-state](persistence-durable-state.md)
* [persistence-style](persistence-style-durable-state.md)
@@@

View file

@ -1,8 +1,8 @@
---
project.description: Event Sourcing with Akka Persistence enables actors to persist your events for recovery on failure or when migrated within a cluster.
project.description: Use of Akka Persistence with Event Sourcing enables actors to persist your events for recovery on failure or when migrated within a cluster.
---
# Persistence
# Persistence (Event Sourcing)
@@toc { depth=2 }

View file

@ -0,0 +1,394 @@
---
project.description: Durable State with Akka Persistence enables actors to persist its state for recovery on failure or when migrated within a cluster.
---
# Durable State
## Module info
To use Akka Persistence, add the module to your project:
@@dependency[sbt,Maven,Gradle] {
bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion
symbol1=AkkaVersion
value1="$akka.version$"
group=com.typesafe.akka
artifact=akka-persistence-typed_$scala.binary.version$
version=AkkaVersion
group2=com.typesafe.akka
artifact2=akka-persistence-testkit_$scala.binary.version$
version2=AkkaVersion
scope2=test
}
You also have to select durable state store plugin, see @ref:[Persistence Plugins](../persistence-plugins.md).
@@project-info{ projectId="akka-persistence-typed" }
## Introduction
This model of Akka Persistence enables a stateful actor / entity to store the full state after processing each command instead of using event sourcing. This reduces the conceptual complexity and can be a handy tool for simple use cases. Very much like a CRUD based operation, the API is conceptually simple - a function from current state and incoming command to the next state which replaces the current state in the database.
```
(State, Command) => State
```
The current state is always stored in the database. Since only the latest state is stored, we don't have access to any of the history of changes, unlike event sourced storage. Akka Persistence would read that state and store it in memory. After processing of the command is finished, the new state will be stored in the database. The processing of the next command will not start until the state has been successfully stored in the database.
The database specific implementations can be added to existing Akka Persistence plugin implementations, starting with the JDBC plugin. The plugin would serialize the state and store as a blob with the persistenceId as the primary key.
## Example and core API
Let's start with a simple example that models a counter using an Akka persistent actor. The minimum required for a @apidoc[DurableStateBehavior] is:
Scala
: @@snip [DurableStatePersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala) { #structure }
Java
: @@snip [DurableStatePersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/DurableStatePersistentBehaviorTest.java) { #structure }
The first important thing to notice is the `Behavior` of a persistent actor is typed to the type of the `Command`
because this is the type of message a persistent actor should receive. In Akka this is now enforced by the type system.
The components that make up a `DurableStateBehavior` are:
* `persistenceId` is the stable unique identifier for the persistent actor.
* `emptyState` defines the `State` when the entity is first created e.g. a Counter would start with 0 as state.
* `commandHandler` defines how to handle commands and map to appropriate effects e.g. persisting state and replying to actors.
Next we'll discuss each of these in detail.
### PersistenceId
The @apidoc[akka.persistence.typed.PersistenceId] is the stable unique identifier for the persistent actor in the backend
durabe state store.
@ref:[Cluster Sharding](cluster-sharding.md) is typically used together with `DurableStateBehavior` to ensure
that there is only one active entity for each `PersistenceId` (`entityId`).
The `entityId` in Cluster Sharding is the business domain identifier which uniquely identifies the instance of
that specific `EntityType`. This means that across the cluster we have a unique combination of (`EntityType`, `EntityId`).
Hence the `entityId` might not be unique enough to be used as the `PersistenceId` by itself. For example
two different types of entities may have the same `entityId`. To create a unique `PersistenceId` the `entityId`
should be prefixed with a stable name of the entity type, which typically is the same as the `EntityTypeKey.name` that
is used in Cluster Sharding. There are @scala[`PersistenceId.apply`]@java[`PersistenceId.of`] factory methods
to help with constructing such `PersistenceId` from an `entityTypeHint` and `entityId`.
The default separator when concatenating the `entityTypeHint` and `entityId` is `|`, but a custom separator
is supported.
The @ref:[Persistence example in the Cluster Sharding documentation](cluster-sharding.md#persistence-example)
illustrates how to construct the `PersistenceId` from the `entityTypeKey` and `entityId` provided by the
`EntityContext`.
A custom identifier can be created with `PersistenceId.ofUniqueId`.
### Command handler
The command handler is a function with 2 parameters, the current `State` and the incoming `Command`.
A command handler returns an `Effect` directive that defines what state, if any, to persist.
Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory].
The two most commonly used effects are:
* `persist` will persist state atomically, i.e. all state will be stored or none of them are stored if there is an error
* `none` no state to be persisted, for example a read-only command
More effects are explained in @ref:[Effects and Side Effects](#effects-and-side-effects).
In addition to returning the primary `Effect` for the command, `DurableStateBehavior`s can also
chain side effects that are to be performed after successful persist which is achieved with the `thenRun`
function e.g @scala[`Effect.persist(..).thenRun`]@java[`Effect().persist(..).thenRun`].
### Completing the example
Let's fill in the details of the example.
Commands:
Scala
: @@snip [DurableStatePersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala) { #command }
Java
: @@snip [DurableStatePersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/DurableStatePersistentBehaviorTest.java) { #command }
State is a storage for the latest value of the counter.
Scala
: @@snip [DurableStatePersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala) { #state }
Java
: @@snip [DurableStatePersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/DurableStatePersistentBehaviorTest.java) { #state }
The command handler handles the commands `Increment`, `IncrementBy` and `GetValue`.
* `Increment` increments the counter by `1` and persists the updated value as an effect in the State
* `IncrementBy` increments the counter by the value passed to it and persists the updated value as an effect in the State
* `GetValue` retrieves the value of the counter from the State and replies with it to the actor passed in
Scala
: @@snip [DurableStatePersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala) { #command-handler }
Java
: @@snip [DurableStatePersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/DurableStatePersistentBehaviorTest.java) { #command-handler }
@scala[These are used to create a `DurableStateBehavior`:]
@java[These are defined in an `DurableStateBehavior`:]
Scala
: @@snip [DurableStatePersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala) { #behavior }
Java
: @@snip [DurableStatePersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/DurableStatePersistentBehaviorTest.java) { #behavior }
## Effects and Side Effects
A command handler returns an `Effect` directive that defines what state, if any, to persist.
Effects are created using @java[a factory that is returned via the `Effect()` method] @scala[the `Effect` factory]
and can be one of:
* `persist` will persist the latest state. If it's a new persistence id, the record will be inserted. In case of an existing
persistence id, the record will be updated only if the revision number of the incoming record is 1 more than the already
existing record. Otherwise `persist` will fail.
* `none` no state to be persisted, for example a read-only command
* `unhandled` the command is unhandled (not supported) in current state
* `stop` stop this actor
* `stash` the current command is stashed
* `unstashAll` process the commands that were stashed with @scala[`Effect.stash`]@java[`Effect().stash`]
* `reply` send a reply message to the given `ActorRef`
Note that only one of those can be chosen per incoming command. It is not possible to both persist and say none/unhandled.
In addition to returning the primary `Effect` for the command `DurableStateBehavior`s can also
chain side effects that are to be performed after successful persist which is achieved with the `thenRun`
function that runs the callback passed to it e.g @scala[`Effect.persist(..).thenRun`]@java[`Effect().persist(..).thenRun`].
All `thenRun` registered callbacks are executed sequentially after successful execution of the persist statement
(or immediately, in case of `none` and `unhandled`).
In addition to `thenRun` the following actions can also be performed after successful persist:
* `thenStop` the actor will be stopped
* `thenUnstashAll` process the commands that were stashed with @scala[`Effect.stash`]@java[`Effect().stash`]
* `thenReply` send a reply message to the given `ActorRef`
In the example below, we use a different constructor of `DurableStateBehavior.withEnforcedReplies`, which creates
a `Behavior` for a persistent actor that ensures that every command sends a reply back. Hence it will be
a compilation error if the returned effect from a `CommandHandler` isn't a `ReplyEffect`.
Instead of `Increment` we will have a new command `IncrementWithConfirmation` that, along with persistence will also
send an acknowledgement as a reply to the `ActorRef` passed in the command.
Example of effects and side-effects:
Scala
: @@snip [DurableStatePersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala) { #effects }
Java
: @@snip [DurableStatePersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/DurableStatePersistentBehaviorTest.java) { #effects }
The most common way to have a side-effect is to use the `thenRun` method on `Effect`. In case you have multiple side-effects
that needs to be run for several commands, you can factor them out into functions and reuse for all the commands. For example:
Scala
: @@snip [PersistentActorCompileOnlyTest.scala](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #commonChainedEffects }
Java
: @@snip [PersistentActorCompileOnlyTest.java](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #commonChainedEffects }
### Side effects ordering and guarantees
Any side effects are executed on an at-most-once basis and will not be executed if the persist fails.
Side effects are not run when the actor is restarted or started again after being stopped.
The side effects are executed sequentially, it is not possible to execute side effects in parallel, unless they
call out to something that is running concurrently (for example sending a message to another actor).
It's possible to execute a side effect before persisting the state, but that can result in that the
side effect is performed but that the state is not stored if the persist fails.
## Cluster Sharding and DurableStateBehavior
@ref:[Cluster Sharding](cluster-sharding.md) is an excellent fit to spread persistent actors over a
cluster, addressing them by id. It makes it possible to have more persistent actors exist in the cluster than what
would fit in the memory of one node. Cluster sharding improves the resilience of the cluster. If a node crashes,
the persistent actors are quickly started on a new node and can resume operations.
The `DurableStateBehavior` can then be run as any plain actor as described in @ref:[actors documentation](actors.md),
but since Akka Persistence is based on the single-writer principle, the persistent actors are typically used together
with Cluster Sharding. For a particular `persistenceId` only one persistent actor instance should be active at one time.
Cluster Sharding ensures that there is only one active entity (or actor instance) for each id.
## Accessing the ActorContext
If the @apidoc[DurableStateBehavior] needs to use the @apidoc[typed.*.ActorContext], for example to spawn child actors, it can be obtained by wrapping construction with `Behaviors.setup`:
Scala
: @@snip [DurableStatePersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala) { #actor-context }
Java
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/DurableStatePersistentBehaviorTest.java) { #actor-context }
## Changing Behavior
After processing a message, actors are able to return the `Behavior` that is used
for the next message.
As you can see in the above examples this is not supported by persistent actors. Instead, the state is
persisted as an `Effect` by the `commandHandler`.
The reason a new behavior can't be returned is that behavior is part of the actor's
state and must also carefully be reconstructed during recovery from the persisted state. This would imply
that the state needs to be encoded such that the behavior can also be restored from it.
That would be very prone to mistakes which is why it is not allowed in Akka Persistence.
For basic actors you can use the same set of command handlers independent of what state the entity is in.
For more complex actors it's useful to be able to change the behavior in the sense
that different functions for processing commands may be defined depending on what state the actor is in.
This is useful when implementing finite state machine (FSM) like entities.
The next example demonstrates how to define different behavior based on the current `State`. It shows an actor that
represents the state of a blog post. Before a post is started the only command it can process is to `AddPost`.
Once it is started then one can look it up with `GetPost`, modify it with `ChangeBody` or publish it with `Publish`.
The state is captured by:
Scala
: @@snip [BlogPostEntityDurableState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntityDurableState.scala) { #state }
Java
: @@snip [BlogPostEntityDurableState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntityDurableState.java) { #state }
The commands, of which only a subset are valid depending on the state:
Scala
: @@snip [BlogPostEntityDurableState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntityDurableState.scala) { #commands }
Java
: @@snip [BlogPostEntityDurableState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntityDurableState.java) { #commands }
@java[The command handler to process each command is decided by the state class (or state predicate) that is
given to the `forStateType` of the `CommandHandlerBuilder` and the match cases in the builders.]
@scala[The command handler to process each command is decided by first looking at the state and then the command.
It typically becomes two levels of pattern matching, first on the state and then on the command.]
Delegating to methods like `addPost`, `changeBody`, `publish` etc. is a good practice because the one-line cases give a nice overview of the message dispatch.
Scala
: @@snip [BlogPostEntityDurableState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntityDurableState.scala) { #command-handler }
Java
: @@snip [BlogPostEntityDurableState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntityDurableState.java) { #command-handler }
And finally the behavior is created @scala[from the `DurableStateBehavior.apply`]:
Scala
: @@snip [BlogPostEntityDurableState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntityDurableState.scala) { #behavior }
Java
: @@snip [BlogPostEntityDurableState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntityDurableState.java) { #behavior }
This can be refactored one or two steps further by defining the command handlers in the state class as
illustrated in @ref:[command handlers in the state](persistence-style-durable-state.md#command-handlers-in-the-state).
There is also an example illustrating an @ref:[optional initial state](persistence-style-durable-state.md#optional-initial-state).
## Replies
The @ref:[Request-Response interaction pattern](interaction-patterns.md#request-response) is very common for
persistent actors, because you typically want to know if the command was rejected due to validation errors and
when accepted you want a confirmation when the events have been successfully stored.
Therefore you typically include a @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`]. If the
command can either have a successful response or a validation error returned, the generic response type @scala[`StatusReply[ReplyType]]`]
@java[`StatusReply<ReplyType>`] can be used. If the successful reply does not contain a value but is more of an acknowledgement
a pre defined @scala[`StatusReply.Ack`]@java[`StatusReply.ack()`] of type @scala[`StatusReply[Done]`]@java[`StatusReply<Done>`]
can be used.
After validation errors or after persisting events, using a `thenRun` side effect, the reply message can
be sent to the `ActorRef`.
Scala
: @@snip [BlogPostEntityDurableState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntityDurableState.scala) { #reply-command }
Java
: @@snip [BlogPostEntityDurableState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntityDurableState.java) { #reply-command }
Scala
: @@snip [BlogPostEntityDurableState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostEntityDurableState.scala) { #reply }
Java
: @@snip [BlogPostEntityDurableState.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostEntityDurableState.java) { #reply }
Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that
it can be used to enforce that you do not forget to specify replies when implementing the `DurableStateBehavior`.
If it's defined with @scala[`DurableStateBehavior.withEnforcedReplies`]@java[`DurableStateBehaviorWithEnforcedReplies`]
there will be compilation errors if the returned effect isn't a `ReplyEffect`, which can be
created with @scala[`Effect.reply`]@java[`Effect().reply`], @scala[`Effect.noReply`]@java[`Effect().noReply`],
@scala[`Effect.thenReply`]@java[`Effect().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effect().thenNoReply`].
Scala
: @@snip [AccountExampleWithCommandHandlersInDurableState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInDurableState.scala) { #withEnforcedReplies }
Java
: @@snip [AccountExampleWithNullDurableState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullDurableState.java) { #withEnforcedReplies }
The commands must have a field of @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`] that can then be used to send a reply.
Scala
: @@snip [AccountExampleWithCommandHandlersInDurableState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInDurableState.scala) { #reply-command }
Java
: @@snip [AccountExampleWithNullDurableState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullDurableState.java) { #reply-command }
The `ReplyEffect` is created with @scala[`Effect.reply`]@java[`Effect().reply`], @scala[`Effect.noReply`]@java[`Effect().noReply`],
@scala[`Effect.thenReply`]@java[`Effect().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effect().thenNoReply`].
@java[Note that command handlers are defined with `newCommandHandlerWithReplyBuilder` when using
`EventSourcedBehaviorWithEnforcedReplies`, as opposed to newCommandHandlerBuilder when using `EventSourcedBehavior`.]
Scala
: @@snip [AccountExampleWithCommandHandlersInDurableState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInDurableState.scala) { #reply }
Java
: @@snip [AccountExampleWithNullDurableState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullDurableState.java) { #reply }
These effects will send the reply message even when @scala[`DurableStateBehavior.withEnforcedReplies`]@java[`DurableStateBehaviorWithEnforcedReplies`]
is not used, but then there will be no compilation errors if the reply decision is left out.
Note that the `noReply` is a way of making a conscious decision that a reply shouldn't be sent for a specific
command or that a reply will be sent later, perhaps after some asynchronous interaction with other actors or services.
## Serialization
The same @ref:[serialization](../serialization.md) mechanism as for actor messages is also used for persistent actors.
You need to enable @ref:[serialization](../serialization.md) for your commands (messages) and state.
@ref:[Serialization with Jackson](../serialization-jackson.md) is a good choice in many cases and our
recommendation if you don't have other preference.
## Tagging
Persistence allows you to use tags in persistence query. Tagging allows you to identify a subset of states in the durable store
and separately consume them as a stream through the `DurableStateStoreQuery` interface.
Scala
: @@snip [DurableStatePersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala) { #tagging }
Java
: @@snip [DurableStatePersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/DurableStatePersistentBehaviorTest.java) { #tagging }
## Wrapping DurableStateBehavior
When creating a `DurableStateBehavior`, it is possible to wrap `DurableStateBehavior` in
other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance
to access the logger from within the `ActorContext` to log for debugging the `commandHandler`.
Scala
: @@snip [DurableStatePersistentActorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/DurableStatePersistentBehaviorCompileOnly.scala) { #wrapPersistentBehavior }
Java
: @@snip [DurableStatePersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/DurableStatePersistentBehaviorTest.java) { #wrapPersistentBehavior }

View file

@ -0,0 +1,32 @@
# Style Guide
@@@ div { .group-scala }
## Command handlers in the state
We can take the previous bank account example one step further by handling the commands within the state as well.
Scala
: @@snip [AccountExampleWithCommandHandlersInDurableState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithCommandHandlersInDurableState.scala) { #account-entity }
Take note of how the command handler is delegating to `applyCommand` in the `Account` (state), which is implemented
in the concrete `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.
@@@
## Optional initial state
Sometimes it's not desirable to use a separate state class for the empty initial state, but rather act as if
there is no state yet.
@java[You can use `null` as the `emptyState`, but be aware of that the `state` parameter
will be `null` until the first non-null state has been persisted
It's possible to use `Optional` instead of `null`, but that requires extra boilerplate
to unwrap the `Optional` state parameter. Therefore use of `null` is simpler. The following example
illustrates using `null` as the `emptyState`.]
@scala[`Option[State]` can be used as the state type and `None` as the `emptyState`. Then pattern matching
is used in command handlers at the outer layer before delegating to the state or other methods.]
Scala
: @@snip [AccountExampleWithOptionDurableState.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleWithOptionDurableState.scala) { #account-entity }
Java
: @@snip [AccountExampleWithNullDurableState.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleWithNullDurableState.java) { #account-entity }

View file

@ -31,6 +31,8 @@ object DurableStateBehaviorReplySpec {
final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done]
final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
final case object Increment extends Command[Nothing]
case class IncrementBy(by: Int) extends Command[Nothing]
final case class State(value: Int) extends CborSerializable
@ -59,8 +61,11 @@ object DurableStateBehaviorReplySpec {
case GetValue(replyTo) =>
Effect.reply(replyTo)(state)
case _ => ???
})
}
}
class DurableStateBehaviorReplySpec

View file

@ -106,7 +106,7 @@ object Effect {
* Not for user extension.
*/
@DoNotInherit
trait Effect[State]
trait Effect[+State]
/**
* A command handler returns an `Effect` directive that defines what state to persist.
@ -118,7 +118,7 @@ trait Effect[State]
* Not for user extension.
*/
@DoNotInherit
trait EffectBuilder[State] extends Effect[State] {
trait EffectBuilder[+State] extends Effect[State] {
/* The state that will be persisted in this effect */
def state: Option[State]
@ -170,7 +170,7 @@ trait EffectBuilder[State] extends Effect[State] {
*
* Not intended for user extension.
*/
@DoNotInherit trait ReplyEffect[State] extends Effect[State] {
@DoNotInherit trait ReplyEffect[+State] extends Effect[State] {
/**
* Unstash the commands that were stashed with [[Effect.stash]].

View file

@ -0,0 +1,217 @@
/*
* Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.state.javadsl.*;
// #behavior
public class BlogPostEntityDurableState
extends DurableStateBehavior<
BlogPostEntityDurableState.Command, BlogPostEntityDurableState.State> {
// commands and state as in above snippets
// #behavior
// #state
interface State {}
enum BlankState implements State {
INSTANCE
}
static class DraftState implements State {
final PostContent content;
DraftState(PostContent content) {
this.content = content;
}
DraftState withContent(PostContent newContent) {
return new DraftState(newContent);
}
DraftState withBody(String newBody) {
return withContent(new PostContent(postId(), content.title, newBody));
}
String postId() {
return content.postId;
}
}
static class PublishedState implements State {
final PostContent content;
PublishedState(PostContent content) {
this.content = content;
}
PublishedState withContent(PostContent newContent) {
return new PublishedState(newContent);
}
PublishedState withBody(String newBody) {
return withContent(new PostContent(postId(), content.title, newBody));
}
String postId() {
return content.postId;
}
}
// #state
// #commands
public interface Command {}
// #reply-command
public static class AddPost implements Command {
final PostContent content;
final ActorRef<AddPostDone> replyTo;
public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) {
this.content = content;
this.replyTo = replyTo;
}
}
public static class AddPostDone implements Command {
final String postId;
public AddPostDone(String postId) {
this.postId = postId;
}
}
// #reply-command
public static class GetPost implements Command {
final ActorRef<PostContent> replyTo;
public GetPost(ActorRef<PostContent> replyTo) {
this.replyTo = replyTo;
}
}
public static class ChangeBody implements Command {
final String newBody;
final ActorRef<Done> replyTo;
public ChangeBody(String newBody, ActorRef<Done> replyTo) {
this.newBody = newBody;
this.replyTo = replyTo;
}
}
public static class Publish implements Command {
final ActorRef<Done> replyTo;
public Publish(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
}
public static class PostContent implements Command {
final String postId;
final String title;
final String body;
public PostContent(String postId, String title, String body) {
this.postId = postId;
this.title = title;
this.body = body;
}
}
// #commands
// #behavior
public static Behavior<Command> create(String entityId, PersistenceId persistenceId) {
return Behaviors.setup(
context -> {
context.getLog().info("Starting BlogPostEntityDurableState {}", entityId);
return new BlogPostEntityDurableState(persistenceId);
});
}
private BlogPostEntityDurableState(PersistenceId persistenceId) {
super(persistenceId);
}
@Override
public State emptyState() {
return BlankState.INSTANCE;
}
// #behavior
// #command-handler
@Override
public CommandHandler<Command, State> commandHandler() {
CommandHandlerBuilder<Command, State> builder = newCommandHandlerBuilder();
builder.forStateType(BlankState.class).onCommand(AddPost.class, this::onAddPost);
builder
.forStateType(DraftState.class)
.onCommand(ChangeBody.class, this::onChangeBody)
.onCommand(Publish.class, this::onPublish)
.onCommand(GetPost.class, this::onGetPost);
builder
.forStateType(PublishedState.class)
.onCommand(ChangeBody.class, this::onChangeBody)
.onCommand(GetPost.class, this::onGetPost);
builder.forAnyState().onCommand(AddPost.class, (state, cmd) -> Effect().unhandled());
return builder.build();
}
private Effect<State> onAddPost(AddPost cmd) {
// #reply
return Effect()
.persist(new DraftState(cmd.content))
.thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
// #reply
}
private Effect<State> onChangeBody(DraftState state, ChangeBody cmd) {
return Effect()
.persist(state.withBody(cmd.newBody))
.thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
}
private Effect<State> onChangeBody(PublishedState state, ChangeBody cmd) {
return Effect()
.persist(state.withBody(cmd.newBody))
.thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
}
private Effect<State> onPublish(DraftState state, Publish cmd) {
return Effect()
.persist(new PublishedState(state.content))
.thenRun(
() -> {
System.out.println("Blog post published: " + state.postId());
cmd.replyTo.tell(Done.getInstance());
});
}
private Effect<State> onGetPost(DraftState state, GetPost cmd) {
cmd.replyTo.tell(state.content);
return Effect().none();
}
private Effect<State> onGetPost(PublishedState state, GetPost cmd) {
cmd.replyTo.tell(state.content);
return Effect().none();
}
// #command-handler
// #behavior
// commandHandler, eventHandler as in above snippets
}
// #behavior

View file

@ -0,0 +1,355 @@
/*
* Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.persistence.typed.state.javadsl.CommandHandler;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
// #behavior
import akka.persistence.typed.state.javadsl.DurableStateBehavior;
import akka.persistence.typed.PersistenceId;
// #behavior
// #effects
import akka.Done;
// #effects
import java.time.Duration;
public class DurableStatePersistentBehaviorTest {
interface Structure {
// #structure
public class MyPersistentCounter
extends DurableStateBehavior<MyPersistentCounter.Command<?>, MyPersistentCounter.State> {
interface Command<ReplyMessage> {}
public static class State {
private final int value;
public State(int value) {
this.value = value;
}
public int get() {
return value;
}
}
public static Behavior<Command<?>> create(PersistenceId persistenceId) {
return new MyPersistentCounter(persistenceId);
}
private MyPersistentCounter(PersistenceId persistenceId) {
super(persistenceId);
}
@Override
public State emptyState() {
return new State(0);
}
@Override
public CommandHandler<Command<?>, State> commandHandler() {
return (state, command) -> {
throw new RuntimeException("TODO: process the command & return an Effect");
};
}
}
// #structure
}
interface FirstExample {
// #behavior
public class MyPersistentCounter
extends DurableStateBehavior<MyPersistentCounter.Command<?>, MyPersistentCounter.State> {
// #behavior
// #command
interface Command<ReplyMessage> {}
public enum Increment implements Command<Void> {
INSTANCE
}
public static class IncrementBy implements Command<Void> {
public final int value;
public IncrementBy(int value) {
this.value = value;
}
}
public static class GetValue implements Command<State> {
private final ActorRef<Integer> replyTo;
public GetValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}
// #command
// #state
public static class State {
private final int value;
public State(int value) {
this.value = value;
}
public int get() {
return value;
}
}
// #state
// #behavior
// commands, events and state defined here
public static Behavior<Command<?>> create(PersistenceId persistenceId) {
return new MyPersistentCounter(persistenceId);
}
private MyPersistentCounter(PersistenceId persistenceId) {
super(persistenceId);
}
@Override
public State emptyState() {
return new State(0);
}
// #command-handler
@Override
public CommandHandler<Command<?>, State> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(
Increment.class, (state, command) -> Effect().persist(new State(state.get() + 1)))
.onCommand(
IncrementBy.class,
(state, command) -> Effect().persist(new State(state.get() + command.value)))
.onCommand(
GetValue.class, (state, command) -> Effect().reply(command.replyTo, state.get()))
.build();
}
// #command-handler
}
// #behavior
}
interface SecondExample {
public class MyPersistentCounterWithReplies
extends DurableStateBehavior<
MyPersistentCounterWithReplies.Command<?>, MyPersistentCounterWithReplies.State> {
// #effects
interface Command<ReplyMessage> {}
public static class IncrementWithConfirmation implements Command<Void> {
public final ActorRef<Done> replyTo;
public IncrementWithConfirmation(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
}
public static class GetValue implements Command<State> {
private final ActorRef<Integer> replyTo;
public GetValue(ActorRef<Integer> replyTo) {
this.replyTo = replyTo;
}
}
public static class State {
private final int value;
public State(int value) {
this.value = value;
}
public int get() {
return value;
}
}
public static Behavior<Command<?>> create(PersistenceId persistenceId) {
return new MyPersistentCounterWithReplies(persistenceId);
}
private MyPersistentCounterWithReplies(PersistenceId persistenceId) {
super(persistenceId);
}
@Override
public State emptyState() {
return new State(0);
}
@Override
public CommandHandler<Command<?>, State> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(
IncrementWithConfirmation.class,
(state, command) ->
Effect()
.persist(new State(state.get() + 1))
.thenReply(command.replyTo, (st) -> Done.getInstance()))
.onCommand(
GetValue.class, (state, command) -> Effect().reply(command.replyTo, state.get()))
.build();
}
// #effects
}
}
interface WithActorContext {
// #actor-context
public class MyPersistentBehavior
extends DurableStateBehavior<MyPersistentBehavior.Command, MyPersistentBehavior.State> {
// #actor-context
interface Command {}
public static class State {}
// #actor-context
public static Behavior<Command> create(PersistenceId persistenceId) {
return Behaviors.setup(ctx -> new MyPersistentBehavior(persistenceId, ctx));
}
// this makes the context available to the command handler etc.
private final ActorContext<Command> context;
// optionally if you only need `ActorContext.getSelf()`
private final ActorRef<Command> self;
public MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> ctx) {
super(persistenceId);
this.context = ctx;
this.self = ctx.getSelf();
}
// #actor-context
@Override
public State emptyState() {
return null;
}
@Override
public CommandHandler<Command, State> commandHandler() {
return null;
}
// #actor-context
}
// #actor-context
}
interface More {
// #supervision
// #tagging
public class MyPersistentBehavior
extends DurableStateBehavior<MyPersistentBehavior.Command, MyPersistentBehavior.State> {
// #tagging
// #supervision
interface Command {}
public static class State {}
// #supervision
public static Behavior<Command> create(PersistenceId persistenceId) {
return new MyPersistentBehavior(persistenceId);
}
private MyPersistentBehavior(PersistenceId persistenceId) {
super(
persistenceId,
SupervisorStrategy.restartWithBackoff(
Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2));
}
// #supervision
@Override
public State emptyState() {
return new State();
}
@Override
public CommandHandler<Command, State> commandHandler() {
return (state, command) -> {
throw new RuntimeException("TODO: process the command & return an Effect");
};
}
// #tagging
@Override
public String tag() {
return "tag1";
}
// #tagging
// #supervision
}
// #supervision
}
interface More2 {
// #wrapPersistentBehavior
public class MyPersistentBehavior
extends DurableStateBehavior<MyPersistentBehavior.Command, MyPersistentBehavior.State> {
// #wrapPersistentBehavior
interface Command {}
public static class State {}
// #wrapPersistentBehavior
public static Behavior<Command> create(PersistenceId persistenceId) {
return Behaviors.setup(context -> new MyPersistentBehavior(persistenceId, context));
}
private final ActorContext<Command> context;
private MyPersistentBehavior(PersistenceId persistenceId, ActorContext<Command> context) {
super(
persistenceId,
SupervisorStrategy.restartWithBackoff(
Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2));
this.context = context;
}
// #wrapPersistentBehavior
@Override
public State emptyState() {
return new State();
}
// #wrapPersistentBehavior
@Override
public CommandHandler<Command, State> commandHandler() {
return (state, command) -> {
context.getLog().info("In command handler");
return Effect().none();
};
}
// #wrapPersistentBehavior
}
}
}

View file

@ -0,0 +1,125 @@
/*
* Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.state.scaladsl.Effect
import akka.persistence.typed.state.scaladsl.DurableStateBehavior
//#behavior
object BlogPostEntityDurableState {
// commands, state defined here
//#behavior
//#state
sealed trait State
case object BlankState extends State
final case class DraftState(content: PostContent) extends State {
def withBody(newBody: String): DraftState =
copy(content = content.copy(body = newBody))
def postId: String = content.postId
}
final case class PublishedState(content: PostContent) extends State {
def postId: String = content.postId
}
//#state
//#commands
sealed trait Command
//#reply-command
final case class AddPost(content: PostContent, replyTo: ActorRef[StatusReply[AddPostDone]]) extends Command
final case class AddPostDone(postId: String)
//#reply-command
final case class GetPost(replyTo: ActorRef[PostContent]) extends Command
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends Command
final case class Publish(replyTo: ActorRef[Done]) extends Command
final case class PostContent(postId: String, title: String, body: String)
//#commands
//#behavior
def apply(entityId: String, persistenceId: PersistenceId): Behavior[Command] = {
Behaviors.setup { context =>
context.log.info("Starting BlogPostEntityDurableState {}", entityId)
DurableStateBehavior[Command, State](persistenceId, emptyState = BlankState, commandHandler)
}
}
//#behavior
//#command-handler
private val commandHandler: (State, Command) => Effect[State] = { (state, command) =>
state match {
case BlankState =>
command match {
case cmd: AddPost => addPost(cmd)
case _ => Effect.unhandled
}
case draftState: DraftState =>
command match {
case cmd: ChangeBody => changeBody(draftState, cmd)
case Publish(replyTo) => publish(draftState, replyTo)
case GetPost(replyTo) => getPost(draftState, replyTo)
case AddPost(_, replyTo) =>
Effect.unhandled[State].thenRun(_ => replyTo ! StatusReply.Error("Cannot add post while in draft state"))
}
case publishedState: PublishedState =>
command match {
case GetPost(replyTo) => getPost(publishedState, replyTo)
case AddPost(_, replyTo) =>
Effect.unhandled[State].thenRun(_ => replyTo ! StatusReply.Error("Cannot add post, already published"))
case _ => Effect.unhandled
}
}
}
private def addPost(cmd: AddPost): Effect[State] = {
//#reply
Effect.persist(DraftState(cmd.content)).thenRun { _ =>
// After persist is done additional side effects can be performed
cmd.replyTo ! StatusReply.Success(AddPostDone(cmd.content.postId))
}
//#reply
}
private def changeBody(state: DraftState, cmd: ChangeBody): Effect[State] = {
Effect.persist(state.withBody(cmd.newBody)).thenRun { _ =>
cmd.replyTo ! Done
}
}
private def publish(state: DraftState, replyTo: ActorRef[Done]): Effect[State] = {
Effect.persist(PublishedState(state.content)).thenRun { _ =>
println(s"Blog post ${state.postId} was published")
replyTo ! Done
}
}
private def getPost(state: DraftState, replyTo: ActorRef[PostContent]): Effect[State] = {
replyTo ! state.content
Effect.none
}
private def getPost(state: PublishedState, replyTo: ActorRef[PostContent]): Effect[State] = {
replyTo ! state.content
Effect.none
}
//#command-handler
//#behavior
// commandHandler defined here
}
//#behavior

View file

@ -0,0 +1,151 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
import akka.actor.typed.ActorRef
import akka.Done
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.state.scaladsl.Effect
//#structure
//#behavior
import akka.persistence.typed.state.scaladsl.DurableStateBehavior
import akka.persistence.typed.PersistenceId
//#behavior
//#structure
import scala.annotation.nowarn
import akka.serialization.jackson.CborSerializable
// unused variables in pattern match are useful in the docs
@nowarn
object DurableStatePersistentBehaviorCompileOnly {
object FirstExample {
//#command
sealed trait Command[ReplyMessage] extends CborSerializable
final case object Increment extends Command[Nothing]
final case class IncrementBy(value: Int) extends Command[Nothing]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
//#command
//#state
final case class State(value: Int) extends CborSerializable
//#state
//#command-handler
import akka.persistence.typed.state.scaladsl.Effect
val commandHandler: (State, Command[_]) => Effect[State] = (state, command) =>
command match {
case Increment => Effect.persist(state.copy(value = state.value + 1))
case IncrementBy(by) => Effect.persist(state.copy(value = state.value + by))
case GetValue(replyTo) => Effect.reply(replyTo)(state)
}
//#command-handler
//#behavior
def counter(id: String): DurableStateBehavior[Command[_], State] = {
DurableStateBehavior.apply[Command[_], State](
persistenceId = PersistenceId.ofUniqueId(id),
emptyState = State(0),
commandHandler = commandHandler)
}
//#behavior
}
//#structure
object MyPersistentCounter {
sealed trait Command[ReplyMessage] extends CborSerializable
final case class State(value: Int) extends CborSerializable
def counter(persistenceId: PersistenceId): DurableStateBehavior[Command[_], State] = {
DurableStateBehavior.apply[Command[_], State](
persistenceId,
emptyState = State(0),
commandHandler =
(state, command) => throw new NotImplementedError("TODO: process the command & return an Effect"))
}
}
//#structure
import MyPersistentCounter._
object MyPersistentCounterWithReplies {
//#effects
sealed trait Command[ReplyMessage] extends CborSerializable
final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
final case class State(value: Int) extends CborSerializable
def counter(persistenceId: PersistenceId): DurableStateBehavior[Command[_], State] = {
DurableStateBehavior.withEnforcedReplies[Command[_], State](
persistenceId,
emptyState = State(0),
commandHandler = (state, command) =>
command match {
case IncrementWithConfirmation(replyTo) =>
Effect.persist(state.copy(value = state.value + 1)).thenReply(replyTo)(_ => Done)
case GetValue(replyTo) =>
Effect.reply(replyTo)(state)
})
}
//#effects
}
object BehaviorWithContext {
// #actor-context
import akka.persistence.typed.state.scaladsl.Effect
import akka.persistence.typed.state.scaladsl.DurableStateBehavior.CommandHandler
def apply(): Behavior[String] =
Behaviors.setup { context =>
DurableStateBehavior[String, State](
persistenceId = PersistenceId.ofUniqueId("myPersistenceId"),
emptyState = State(0),
commandHandler = CommandHandler.command { cmd =>
context.log.info("Got command {}", cmd)
Effect.none
})
}
// #actor-context
}
object TaggingBehavior {
def apply(): Behavior[Command[_]] =
//#tagging
DurableStateBehavior[Command[_], State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(0),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"))
.withTag("tag1")
.withTag("tag2")
//#tagging
}
object WrapBehavior {
import akka.persistence.typed.state.scaladsl.Effect
import akka.persistence.typed.state.scaladsl.DurableStateBehavior.CommandHandler
def apply(): Behavior[Command[_]] =
//#wrapPersistentBehavior
Behaviors.setup[Command[_]] { context =>
DurableStateBehavior[Command[_], State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(0),
commandHandler = CommandHandler.command { cmd =>
context.log.info("Got command {}", cmd)
Effect.none
})
}
//#wrapPersistentBehavior
}
}