AccountExample in Scala in a few flavors, #25485
* AccountExample in Scala in a few flavors * include Account examples in reference docs * cleanup BlogPost example * include reply doc snippets
This commit is contained in:
parent
58ec80d4f8
commit
1691961a10
14 changed files with 805 additions and 390 deletions
|
|
@ -9,11 +9,9 @@ import java.time.Duration;
|
|||
import akka.actor.typed.ActorRef;
|
||||
import akka.actor.typed.ActorSystem;
|
||||
import akka.actor.typed.Behavior;
|
||||
import akka.actor.typed.Props;
|
||||
import akka.actor.typed.javadsl.Behaviors;
|
||||
|
||||
//#import
|
||||
import akka.cluster.sharding.typed.ClusterShardingSettings;
|
||||
import akka.cluster.sharding.typed.ShardingEnvelope;
|
||||
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
|
||||
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
|
||||
|
|
@ -22,9 +20,9 @@ import akka.cluster.sharding.typed.javadsl.ShardedEntity;
|
|||
|
||||
//#import
|
||||
|
||||
import jdocs.akka.persistence.typed.InDepthPersistentBehaviorTest.BlogCommand;
|
||||
import jdocs.akka.persistence.typed.InDepthPersistentBehaviorTest.BlogBehavior;
|
||||
import jdocs.akka.persistence.typed.InDepthPersistentBehaviorTest.PassivatePost;
|
||||
import jdocs.akka.persistence.typed.BlogPostExample.BlogCommand;
|
||||
import jdocs.akka.persistence.typed.BlogPostExample.BlogBehavior;
|
||||
import jdocs.akka.persistence.typed.BlogPostExample.PassivatePost;
|
||||
|
||||
public class ShardingCompileOnlyTest {
|
||||
|
||||
|
|
|
|||
|
|
@ -9,8 +9,8 @@ import scala.concurrent.duration._
|
|||
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.cluster.sharding.typed.scaladsl.ShardedEntity
|
||||
import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec
|
||||
import docs.akka.persistence.typed.InDepthPersistentBehaviorSpec.{ BlogCommand, PassivatePost }
|
||||
import docs.akka.persistence.typed.BlogPostExample
|
||||
import docs.akka.persistence.typed.BlogPostExample.{ BlogCommand, PassivatePost }
|
||||
|
||||
object ShardingCompileOnlySpec {
|
||||
|
||||
|
|
@ -65,7 +65,7 @@ object ShardingCompileOnlySpec {
|
|||
shardRegion ! ShardingEnvelope("counter-1", Increment)
|
||||
//#send
|
||||
|
||||
import InDepthPersistentBehaviorSpec.behavior
|
||||
import BlogPostExample.behavior
|
||||
//#persistence
|
||||
val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost")
|
||||
|
||||
|
|
|
|||
|
|
@ -68,10 +68,10 @@ Taking the larger example from the @ref:[persistence documentation](persistence.
|
|||
a sharded entity is the same as for a non persistent behavior. The behavior:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #behavior }
|
||||
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #behavior }
|
||||
|
||||
Java
|
||||
: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #behavior }
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #behavior }
|
||||
|
||||
To create the entity:
|
||||
|
||||
|
|
|
|||
51
akka-docs/src/main/paradox/typed/persistence-style.md
Normal file
51
akka-docs/src/main/paradox/typed/persistence-style.md
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
# Persistence - coding style
|
||||
|
||||
## Event handlers in the state
|
||||
|
||||
The section about @ref:[Changing Behavior](persistence.md#changing-behavior) described how commands and events
|
||||
can be handled differently depending on the state. One can take that one step further and define the event
|
||||
handler inside the state classes. In @ref:[next section the command handlers](#command-handlers-in-the-state) are
|
||||
also defined in the state.
|
||||
|
||||
The state can be seen as your domain object and it should contain the core business logic. Then it's a matter
|
||||
of taste if event handlers and command handlers should be defined in the state or be kept outside it.
|
||||
|
||||
Here we are using a bank account as the example domain. It has 3 state classes that are representing the lifecycle
|
||||
of the account; `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.
|
||||
|
||||
Scala
|
||||
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #account-entity }
|
||||
|
||||
TODO include corresponding example in Java
|
||||
|
||||
Notice how the `eventHandler` delegates to the `applyEvent` in the `Account` (state), which is implemented
|
||||
in the concrete `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.
|
||||
|
||||
## Command handlers in the state
|
||||
|
||||
We can take the previous bank account example one step further by handling the commands in the state too.
|
||||
|
||||
Scala
|
||||
: @@snip [AccountExampleWithCommandHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithCommandHandlersInState.scala) { #account-entity }
|
||||
|
||||
TODO include corresponding example in Java
|
||||
|
||||
Notice how the command handler is delegating to `applyCommand` in the `Account` (state), which is implemented
|
||||
in the concrete `EmptyAccount`, `OpenedAccount`, and `ClosedAccount`.
|
||||
|
||||
## Optional initial state
|
||||
|
||||
Sometimes it's not desirable to use a separate state class for the empty initial state, but rather treat that as
|
||||
there is no state yet.
|
||||
@java[`null` can then be used as the `emptyState`, but be aware of that the `state` parameter
|
||||
will then be `null` for the first commands and events until the first event has be persisted to create the
|
||||
non-null state. It's possible to use `Optional` instead of `null` but that results in rather much boilerplate
|
||||
to unwrap the `Optional` state parameter and therefore `null` is probably preferred. The following example
|
||||
illustrates using `null` as the `emptyState`.]
|
||||
@scala[`Option[State]` can be used as the state type and `None` as the `emptyState`. Pattern matching
|
||||
is then used in command and event handlers at the outer layer before delegating to the state or other methods.]
|
||||
|
||||
Scala
|
||||
: @@snip [AccountExampleWithOptionState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithOptionState.scala) { #account-entity }
|
||||
|
||||
TODO include corresponding example in Java
|
||||
|
|
@ -1,5 +1,11 @@
|
|||
# Persistence
|
||||
|
||||
@@@ index
|
||||
|
||||
* [Persistence - coding style](persistence-style.md)
|
||||
|
||||
@@@
|
||||
|
||||
## Dependency
|
||||
|
||||
To use Akka Persistence Typed, add the module to your project:
|
||||
|
|
@ -140,7 +146,7 @@ Java
|
|||
|
||||
|
||||
|
||||
## Larger example
|
||||
## Changing Behavior
|
||||
|
||||
After processing a message, plain typed actors are able to return the `Behavior` that is used
|
||||
for next message.
|
||||
|
|
@ -153,73 +159,82 @@ That would be very prone to mistakes and thus not allowed in Typed Persistence.
|
|||
|
||||
For basic actors you can use the same set of command handlers independent of what state the entity is in,
|
||||
as shown in above example. For more complex actors it's useful to be able to change the behavior in the sense
|
||||
that different functions for processing commands may be defined depending on what state the actor is in. This is useful when implementing finite state machine (FSM) like entities.
|
||||
that different functions for processing commands may be defined depending on what state the actor is in.
|
||||
This is useful when implementing finite state machine (FSM) like entities.
|
||||
|
||||
The next example shows how to define different behavior based on the current `State`. It is an actor that
|
||||
represents the state of a blog post. Before a post is started the only command it can process is to `AddPost`. Once it is started
|
||||
then it we can look it up with `GetPost`, modify it with `ChangeBody` or publish it with `Publish`.
|
||||
represents the state of a blog post. Before a post is started the only command it can process is to `AddPost`.
|
||||
Once it is started then it we can look it up with `GetPost`, modify it with `ChangeBody` or publish it with `Publish`.
|
||||
|
||||
The state is captured by:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #state }
|
||||
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #state }
|
||||
|
||||
Java
|
||||
: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #state }
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #state }
|
||||
|
||||
The commands, of which only a subset are valid depending on the state:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #commands }
|
||||
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #commands }
|
||||
|
||||
Java
|
||||
: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #commands }
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #commands }
|
||||
|
||||
@java[The commandler handler to process each command is decided by the state class (or state predicate) that is
|
||||
given to the `commandHandlerBuilder` and the match cases in the builders. Several builders can be composed with `orElse`:]
|
||||
@scala[The command handler to process each command is composed by two levels of command handlers,
|
||||
one which matches on the state and then delegates to the another handler, specific to the state:]
|
||||
@scala[The command handler to process each command is decided by first looking at the state and then the command.
|
||||
It typically becomes two levels of pattern matching, first on the state and then on the command. Delegating to methods
|
||||
is a good practise because the one-line cases give a nice overview of the message dispatch.]
|
||||
|
||||
@@@ div { .group-scala }
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #by-state-command-handler }
|
||||
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #command-handler }
|
||||
|
||||
@@@
|
||||
|
||||
@@@ div { .group-java }
|
||||
|
||||
TODO rewrite this example to be more like the Scala example
|
||||
|
||||
Java
|
||||
: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #command-handler }
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #command-handler }
|
||||
|
||||
The @java[`CommandHandlerBuilder`]@scala[`CommandHandler`] for a post that hasn't been initialized with content:
|
||||
The `CommandHandlerBuilder` for a post that hasn't been initialized with content:
|
||||
|
||||
Java
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #initial-command-handler }
|
||||
|
||||
And a different `CommandHandlerBuilder` for after the post content has been added:
|
||||
|
||||
Java
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #post-added-command-handler }
|
||||
|
||||
@@@
|
||||
|
||||
The event handler:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #initial-command-handler }
|
||||
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #event-handler }
|
||||
|
||||
Java
|
||||
: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #initial-command-handler }
|
||||
|
||||
And a different @java[`CommandHandlerBuilder`]@scala[`CommandHandler`] for after the post content has been added:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #post-added-command-handler }
|
||||
|
||||
Java
|
||||
: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #post-added-command-handler }
|
||||
|
||||
The event handler is always the same independent of state. The main reason for not making the event handler
|
||||
part of the `CommandHandler` is that contrary to Commands, all events must be handled and that is typically independent of what the
|
||||
current state is. The event handler can still decide what to do based on the state, if that is needed.
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #event-handler }
|
||||
|
||||
Java
|
||||
: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #event-handler }
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #event-handler }
|
||||
|
||||
And finally the behavior is created @scala[from the `PersistentBehavior.apply`]:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentBehaviorSpec.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #behavior }
|
||||
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #behavior }
|
||||
|
||||
Java
|
||||
: @@snip [InDepthPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #behavior }
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #behavior }
|
||||
|
||||
This can be taken one or two steps further by defining the event and command handlers in the state class as
|
||||
illustrated in @ref:[event handlers in the state](persistence-style.md#event-handlers-in-the-state) and
|
||||
@ref:[command handlers in the state](persistence-style.md#command-handlers-in-the-state).
|
||||
|
||||
There is also an example illustrating an @ref:[optional initial state](persistence-style.md#optional-initial-state).
|
||||
|
||||
## Effects and Side Effects
|
||||
|
||||
|
|
@ -258,7 +273,19 @@ Therefore you typically include a @scala[`ActorRef[ReplyMessageType]`]@java[`Act
|
|||
commands. After validation errors or after persisting events, using a `thenRun` side effect, the reply message can
|
||||
be sent to the `ActorRef`.
|
||||
|
||||
TODO example of thenRun reply
|
||||
Scala
|
||||
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #reply-command }
|
||||
|
||||
Java
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #reply-command }
|
||||
|
||||
|
||||
Scala
|
||||
: @@snip [BlogPostExample.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BlogPostExample.scala) { #reply }
|
||||
|
||||
Java
|
||||
: @@snip [BlogPostExample.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BlogPostExample.java) { #reply }
|
||||
|
||||
|
||||
Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that
|
||||
it can be used to enforce that replies are not forgotten when implementing the `PersistentBehavior`.
|
||||
|
|
@ -273,11 +300,25 @@ is not used, but then there will be no compilation errors if the reply decision
|
|||
Note that the `noReply` is a way of making conscious decision that a reply shouldn't be sent for a specific
|
||||
command or the reply will be sent later, perhaps after some asynchronous interaction with other actors or services.
|
||||
|
||||
TODO example of thenReply
|
||||
Scala
|
||||
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #reply-command }
|
||||
|
||||
TODO include corresponding example in Java
|
||||
|
||||
When using the reply effect the commands must implement `ExpectingReply` to include the @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`]
|
||||
in a standardized way.
|
||||
|
||||
Scala
|
||||
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #reply }
|
||||
|
||||
TODO include corresponding example in Java
|
||||
|
||||
Scala
|
||||
: @@snip [AccountExampleWithEventHandlersInState.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExampleWithEventHandlersInState.scala) { #withEnforcedReplies }
|
||||
|
||||
TODO include corresponding example in Java
|
||||
|
||||
|
||||
## Serialization
|
||||
|
||||
The same @ref:[serialization](../serialization.md) mechanism as for untyped
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import akka.persistence.typed.PersistenceId
|
|||
object PersistentBehavior {
|
||||
|
||||
/**
|
||||
* Type alias for the command handler function for reacting on events having been persisted.
|
||||
* Type alias for the command handler function that defines how to act on commands.
|
||||
*
|
||||
* The type alias is not used in API signatures because it's easier to see (in IDE) what is needed
|
||||
* when full function type is used. When defining the handler as a separate function value it can
|
||||
|
|
@ -29,7 +29,7 @@ object PersistentBehavior {
|
|||
type CommandHandler[Command, Event, State] = (State, Command) ⇒ Effect[Event, State]
|
||||
|
||||
/**
|
||||
* Type alias for the event handler function defines how to act on commands.
|
||||
* Type alias for the event handler function for updating the state based on events having been persisted.
|
||||
*
|
||||
* The type alias is not used in API signatures because it's easier to see (in IDE) what is needed
|
||||
* when full function type is used. When defining the handler as a separate function value it can
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import akka.persistence.typed.javadsl.PersistentBehavior;
|
|||
|
||||
import java.util.Optional;
|
||||
|
||||
public class InDepthPersistentBehaviorTest {
|
||||
public class BlogPostExample {
|
||||
|
||||
//#event
|
||||
interface BlogEvent {
|
||||
|
|
@ -54,19 +54,19 @@ public class InDepthPersistentBehaviorTest {
|
|||
//#state
|
||||
interface BlogState {}
|
||||
|
||||
public static class BlankState implements BlogState {}
|
||||
public static enum BlankState implements BlogState {
|
||||
INSTANCE
|
||||
}
|
||||
|
||||
public static class DraftState implements BlogState {
|
||||
final PostContent postContent;
|
||||
final boolean published;
|
||||
|
||||
DraftState(PostContent postContent, boolean published) {
|
||||
DraftState(PostContent postContent) {
|
||||
this.postContent = postContent;
|
||||
this.published = published;
|
||||
}
|
||||
|
||||
public DraftState withContent(PostContent newContent) {
|
||||
return new DraftState(newContent, this.published);
|
||||
return new DraftState(newContent);
|
||||
}
|
||||
|
||||
public String postId() {
|
||||
|
|
@ -94,6 +94,7 @@ public class InDepthPersistentBehaviorTest {
|
|||
//#commands
|
||||
public interface BlogCommand {
|
||||
}
|
||||
//#reply-command
|
||||
public static class AddPost implements BlogCommand {
|
||||
final PostContent content;
|
||||
final ActorRef<AddPostDone> replyTo;
|
||||
|
|
@ -110,6 +111,7 @@ public class InDepthPersistentBehaviorTest {
|
|||
this.postId = postId;
|
||||
}
|
||||
}
|
||||
//#reply-command
|
||||
public static class GetPost implements BlogCommand {
|
||||
final ActorRef<PostContent> replyTo;
|
||||
|
||||
|
|
@ -163,9 +165,11 @@ public class InDepthPersistentBehaviorTest {
|
|||
private CommandHandlerBuilder<BlogCommand, BlogEvent, BlankState, BlogState> initialCommandHandler() {
|
||||
return commandHandlerBuilder(BlankState.class)
|
||||
.matchCommand(AddPost.class, (state, cmd) -> {
|
||||
//#reply
|
||||
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
|
||||
return Effect().persist(event)
|
||||
.andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
|
||||
//#reply
|
||||
});
|
||||
}
|
||||
//#initial-command-handler
|
||||
|
|
@ -225,7 +229,7 @@ public class InDepthPersistentBehaviorTest {
|
|||
public EventHandler<BlogState, BlogEvent> eventHandler() {
|
||||
return eventHandlerBuilder()
|
||||
.matchEvent(PostAdded.class, (state, event) ->
|
||||
new DraftState(event.content, false))
|
||||
new DraftState(event.content))
|
||||
.matchEvent(BodyChanged.class, DraftState.class, (state, chg) ->
|
||||
state.withContent(new PostContent(state.postId(), state.postContent.title, chg.newBody)))
|
||||
.matchEvent(BodyChanged.class, PublishedState.class, (state, chg) ->
|
||||
|
|
@ -245,7 +249,7 @@ public class InDepthPersistentBehaviorTest {
|
|||
|
||||
@Override
|
||||
public BlogState emptyState() {
|
||||
return new BlankState();
|
||||
return BlankState.INSTANCE;
|
||||
}
|
||||
|
||||
// commandHandler, eventHandler as in above snippets
|
||||
|
|
@ -1,100 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.Effect
|
||||
import akka.persistence.typed.scaladsl.PersistentBehavior
|
||||
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
|
||||
|
||||
object AccountExample1 {
|
||||
|
||||
sealed trait AccountCommand
|
||||
case object CreateAccount extends AccountCommand
|
||||
case class Deposit(amount: Double) extends AccountCommand
|
||||
case class Withdraw(amount: Double) extends AccountCommand
|
||||
case object CloseAccount extends AccountCommand
|
||||
|
||||
sealed trait AccountEvent
|
||||
case object AccountCreated extends AccountEvent
|
||||
case class Deposited(amount: Double) extends AccountEvent
|
||||
case class Withdrawn(amount: Double) extends AccountEvent
|
||||
case object AccountClosed extends AccountEvent
|
||||
|
||||
sealed trait Account
|
||||
case class OpenedAccount(balance: Double) extends Account
|
||||
case object ClosedAccount extends Account
|
||||
|
||||
private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] =
|
||||
CommandHandler.command {
|
||||
case CreateAccount ⇒ Effect.persist(AccountCreated)
|
||||
case _ ⇒ Effect.unhandled
|
||||
}
|
||||
|
||||
private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = {
|
||||
case (Some(acc: OpenedAccount), cmd) ⇒ cmd match {
|
||||
case Deposit(amount) ⇒ Effect.persist(Deposited(amount))
|
||||
|
||||
case Withdraw(amount) ⇒
|
||||
if ((acc.balance - amount) < 0.0)
|
||||
Effect.unhandled // TODO replies are missing in this example
|
||||
else {
|
||||
Effect
|
||||
.persist(Withdrawn(amount))
|
||||
.thenRun {
|
||||
case Some(OpenedAccount(balance)) ⇒
|
||||
// do some side-effect using balance
|
||||
println(balance)
|
||||
case _ ⇒ throw new IllegalStateException
|
||||
}
|
||||
}
|
||||
case CloseAccount if acc.balance == 0.0 ⇒
|
||||
Effect.persist(AccountClosed)
|
||||
|
||||
case CloseAccount ⇒
|
||||
Effect.unhandled
|
||||
|
||||
case _ ⇒
|
||||
Effect.unhandled
|
||||
}
|
||||
case _ ⇒ throw new IllegalStateException
|
||||
}
|
||||
|
||||
private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] =
|
||||
CommandHandler.command(_ ⇒ Effect.unhandled)
|
||||
|
||||
private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = { (state, command) ⇒
|
||||
state match {
|
||||
case None ⇒ initialHandler(state, command)
|
||||
case Some(OpenedAccount(_)) ⇒ openedAccountHandler(state, command)
|
||||
case Some(ClosedAccount) ⇒ closedHandler(state, command)
|
||||
}
|
||||
}
|
||||
|
||||
private val eventHandler: (Option[Account], AccountEvent) ⇒ Option[Account] = {
|
||||
case (None, AccountCreated) ⇒ Some(OpenedAccount(0.0))
|
||||
|
||||
case (Some(acc @ OpenedAccount(_)), Deposited(amount)) ⇒
|
||||
Some(acc.copy(balance = acc.balance + amount))
|
||||
|
||||
case (Some(acc @ OpenedAccount(_)), Withdrawn(amount)) ⇒
|
||||
Some(acc.copy(balance = acc.balance - amount))
|
||||
|
||||
case (Some(OpenedAccount(_)), AccountClosed) ⇒
|
||||
Some(ClosedAccount)
|
||||
|
||||
case (state, event) ⇒ throw new RuntimeException(s"unexpected event [$event] in state [$state]")
|
||||
}
|
||||
|
||||
def behavior(accountNumber: String): Behavior[AccountCommand] =
|
||||
PersistentBehavior[AccountCommand, AccountEvent, Option[Account]](
|
||||
persistenceId = PersistenceId(s"Account-$accountNumber"),
|
||||
emptyState = None,
|
||||
commandHandler = commandHandler,
|
||||
eventHandler = eventHandler
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -1,103 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.Effect
|
||||
import akka.persistence.typed.scaladsl.PersistentBehavior
|
||||
import akka.persistence.typed.scaladsl.PersistentBehavior.CommandHandler
|
||||
|
||||
object AccountExample2 {
|
||||
|
||||
sealed trait AccountCommand
|
||||
case object CreateAccount extends AccountCommand
|
||||
case class Deposit(amount: Double) extends AccountCommand
|
||||
case class Withdraw(amount: Double) extends AccountCommand
|
||||
case object CloseAccount extends AccountCommand
|
||||
|
||||
sealed trait AccountEvent
|
||||
case object AccountCreated extends AccountEvent
|
||||
case class Deposited(amount: Double) extends AccountEvent
|
||||
case class Withdrawn(amount: Double) extends AccountEvent
|
||||
case object AccountClosed extends AccountEvent
|
||||
|
||||
sealed trait Account {
|
||||
def applyEvent(event: AccountEvent): Account
|
||||
}
|
||||
case object EmptyAccount extends Account {
|
||||
override def applyEvent(event: AccountEvent): Account = event match {
|
||||
case AccountCreated ⇒ OpenedAccount(0.0)
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
|
||||
}
|
||||
}
|
||||
case class OpenedAccount(balance: Double) extends Account {
|
||||
override def applyEvent(event: AccountEvent): Account = event match {
|
||||
case Deposited(amount) ⇒ copy(balance = balance + amount)
|
||||
case Withdrawn(amount) ⇒ copy(balance = balance - amount)
|
||||
case AccountClosed ⇒ ClosedAccount
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
|
||||
}
|
||||
}
|
||||
case object ClosedAccount extends Account {
|
||||
override def applyEvent(event: AccountEvent): Account =
|
||||
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
|
||||
}
|
||||
|
||||
private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Account] =
|
||||
CommandHandler.command {
|
||||
case CreateAccount ⇒ Effect.persist(AccountCreated)
|
||||
case _ ⇒ Effect.unhandled
|
||||
}
|
||||
|
||||
private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Account] = {
|
||||
case (acc: OpenedAccount, cmd) ⇒ cmd match {
|
||||
case Deposit(amount) ⇒ Effect.persist(Deposited(amount))
|
||||
|
||||
case Withdraw(amount) ⇒
|
||||
if ((acc.balance - amount) < 0.0)
|
||||
Effect.unhandled // TODO replies are missing in this example
|
||||
else {
|
||||
Effect
|
||||
.persist(Withdrawn(amount))
|
||||
.thenRun {
|
||||
case OpenedAccount(balance) ⇒
|
||||
// do some side-effect using balance
|
||||
println(balance)
|
||||
case _ ⇒ throw new IllegalStateException
|
||||
}
|
||||
}
|
||||
case CloseAccount if acc.balance == 0.0 ⇒
|
||||
Effect.persist(AccountClosed)
|
||||
|
||||
case CloseAccount ⇒
|
||||
Effect.unhandled
|
||||
}
|
||||
case _ ⇒ throw new IllegalStateException
|
||||
}
|
||||
|
||||
private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Account] =
|
||||
CommandHandler.command(_ ⇒ Effect.unhandled)
|
||||
|
||||
private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Account] = { (state, command) ⇒
|
||||
state match {
|
||||
case EmptyAccount ⇒ initialHandler(state, command)
|
||||
case OpenedAccount(_) ⇒ openedAccountHandler(state, command)
|
||||
case ClosedAccount ⇒ closedHandler(state, command)
|
||||
}
|
||||
}
|
||||
|
||||
private val eventHandler: (Account, AccountEvent) ⇒ Account =
|
||||
(state, event) ⇒ state.applyEvent(event)
|
||||
|
||||
def behavior(accountNumber: String): Behavior[AccountCommand] =
|
||||
PersistentBehavior[AccountCommand, AccountEvent, Account](
|
||||
persistenceId = PersistenceId(s"Account-$accountNumber"),
|
||||
emptyState = EmptyAccount,
|
||||
commandHandler = commandHandler,
|
||||
eventHandler = eventHandler
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,155 @@
|
|||
/**
|
||||
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.persistence.typed.ExpectingReply
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.Effect
|
||||
import akka.persistence.typed.scaladsl.PersistentBehavior
|
||||
|
||||
/**
|
||||
* Bank account example illustrating:
|
||||
* - different state classes representing the lifecycle of the account
|
||||
* - event handlers in the state classes
|
||||
* - command handlers in the state classes
|
||||
* - replies of various types, using ExpectingReply and withEnforcedReplies
|
||||
*/
|
||||
object AccountExampleWithCommandHandlersInState {
|
||||
|
||||
//##account-entity
|
||||
object AccountEntity {
|
||||
// Command
|
||||
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
|
||||
final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance])
|
||||
extends AccountCommand[CurrentBalance]
|
||||
final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
|
||||
// Reply
|
||||
sealed trait AccountCommandReply
|
||||
sealed trait OperationResult extends AccountCommandReply
|
||||
case object Confirmed extends OperationResult
|
||||
final case class Rejected(reason: String) extends OperationResult
|
||||
final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply
|
||||
|
||||
// Event
|
||||
sealed trait AccountEvent
|
||||
case object AccountCreated extends AccountEvent
|
||||
case class Deposited(amount: BigDecimal) extends AccountEvent
|
||||
case class Withdrawn(amount: BigDecimal) extends AccountEvent
|
||||
case object AccountClosed extends AccountEvent
|
||||
|
||||
val Zero = BigDecimal(0)
|
||||
|
||||
// type alias to reduce boilerplate
|
||||
type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Account]
|
||||
|
||||
// State
|
||||
sealed trait Account {
|
||||
def applyCommand(cmd: AccountCommand[_]): ReplyEffect
|
||||
def applyEvent(event: AccountEvent): Account
|
||||
}
|
||||
case object EmptyAccount extends Account {
|
||||
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
|
||||
cmd match {
|
||||
case c: CreateAccount ⇒
|
||||
Effect.persist(AccountCreated)
|
||||
.thenReply(c)(_ ⇒ Confirmed)
|
||||
case _ ⇒
|
||||
// CreateAccount before handling any other commands
|
||||
Effect.unhandled.thenNoReply()
|
||||
}
|
||||
|
||||
override def applyEvent(event: AccountEvent): Account =
|
||||
event match {
|
||||
case AccountCreated ⇒ OpenedAccount(Zero)
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
|
||||
}
|
||||
}
|
||||
case class OpenedAccount(balance: BigDecimal) extends Account {
|
||||
require(balance >= Zero, "Account balance can't be negative")
|
||||
|
||||
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
|
||||
cmd match {
|
||||
case c @ Deposit(amount) ⇒
|
||||
Effect.persist(Deposited(amount))
|
||||
.thenReply(c)(_ ⇒ Confirmed)
|
||||
|
||||
case c @ Withdraw(amount) ⇒
|
||||
if (canWithdraw(amount)) {
|
||||
Effect.persist(Withdrawn(amount))
|
||||
.thenReply(c)(_ ⇒ Confirmed)
|
||||
|
||||
} else {
|
||||
Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
|
||||
}
|
||||
|
||||
case c: GetBalance ⇒
|
||||
Effect.reply(c)(CurrentBalance(balance))
|
||||
|
||||
case c: CloseAccount ⇒
|
||||
if (balance == Zero)
|
||||
Effect.persist(AccountClosed)
|
||||
.thenReply(c)(_ ⇒ Confirmed)
|
||||
else
|
||||
Effect.reply(c)(Rejected("Can't close account with non-zero balance"))
|
||||
|
||||
case c: CreateAccount ⇒
|
||||
Effect.reply(c)(Rejected("Account is already created"))
|
||||
|
||||
}
|
||||
|
||||
override def applyEvent(event: AccountEvent): Account =
|
||||
event match {
|
||||
case Deposited(amount) ⇒ copy(balance = balance + amount)
|
||||
case Withdrawn(amount) ⇒ copy(balance = balance - amount)
|
||||
case AccountClosed ⇒ ClosedAccount
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
|
||||
}
|
||||
|
||||
def canWithdraw(amount: BigDecimal): Boolean = {
|
||||
balance - amount >= Zero
|
||||
}
|
||||
|
||||
}
|
||||
case object ClosedAccount extends Account {
|
||||
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
|
||||
cmd match {
|
||||
case c @ (_: Deposit | _: Withdraw) ⇒
|
||||
Effect.reply(c)(Rejected("Account is closed"))
|
||||
case c: GetBalance ⇒
|
||||
Effect.reply(c)(CurrentBalance(Zero))
|
||||
case c: CloseAccount ⇒
|
||||
Effect.reply(c)(Rejected("Account is already closed"))
|
||||
case c: CreateAccount ⇒
|
||||
Effect.reply(c)(Rejected("Account is already created"))
|
||||
}
|
||||
|
||||
override def applyEvent(event: AccountEvent): Account =
|
||||
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
|
||||
}
|
||||
|
||||
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
|
||||
PersistentBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Account](
|
||||
PersistenceId(s"Account|$accountNumber"),
|
||||
EmptyAccount,
|
||||
(state, cmd) ⇒ state.applyCommand(cmd),
|
||||
(state, event) ⇒ state.applyEvent(event)
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
//##account-entity
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,180 @@
|
|||
/**
|
||||
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.persistence.typed.ExpectingReply
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.Effect
|
||||
import akka.persistence.typed.scaladsl.PersistentBehavior
|
||||
import akka.persistence.typed.scaladsl.ReplyEffect
|
||||
|
||||
/**
|
||||
* Bank account example illustrating:
|
||||
* - different state classes representing the lifecycle of the account
|
||||
* - event handlers in the state classes
|
||||
* - command handlers outside the state classes, pattern matching of commands in one place that
|
||||
* is delegating to methods
|
||||
* - replies of various types, using ExpectingReply and withEnforcedReplies
|
||||
*/
|
||||
object AccountExampleWithEventHandlersInState {
|
||||
|
||||
//#account-entity
|
||||
object AccountEntity {
|
||||
// Command
|
||||
//#reply-command
|
||||
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
|
||||
//#reply-command
|
||||
final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
//#reply-command
|
||||
final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
//#reply-command
|
||||
final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance])
|
||||
extends AccountCommand[CurrentBalance]
|
||||
final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
|
||||
// Reply
|
||||
//#reply-command
|
||||
sealed trait AccountCommandReply
|
||||
sealed trait OperationResult extends AccountCommandReply
|
||||
case object Confirmed extends OperationResult
|
||||
final case class Rejected(reason: String) extends OperationResult
|
||||
//#reply-command
|
||||
final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply
|
||||
|
||||
// Event
|
||||
sealed trait AccountEvent
|
||||
case object AccountCreated extends AccountEvent
|
||||
case class Deposited(amount: BigDecimal) extends AccountEvent
|
||||
case class Withdrawn(amount: BigDecimal) extends AccountEvent
|
||||
case object AccountClosed extends AccountEvent
|
||||
|
||||
val Zero = BigDecimal(0)
|
||||
|
||||
// State
|
||||
sealed trait Account {
|
||||
def applyEvent(event: AccountEvent): Account
|
||||
}
|
||||
case object EmptyAccount extends Account {
|
||||
override def applyEvent(event: AccountEvent): Account = event match {
|
||||
case AccountCreated ⇒ OpenedAccount(Zero)
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
|
||||
}
|
||||
}
|
||||
case class OpenedAccount(balance: BigDecimal) extends Account {
|
||||
require(balance >= Zero, "Account balance can't be negative")
|
||||
|
||||
override def applyEvent(event: AccountEvent): Account =
|
||||
event match {
|
||||
case Deposited(amount) ⇒ copy(balance = balance + amount)
|
||||
case Withdrawn(amount) ⇒ copy(balance = balance - amount)
|
||||
case AccountClosed ⇒ ClosedAccount
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
|
||||
}
|
||||
|
||||
def canWithdraw(amount: BigDecimal): Boolean = {
|
||||
balance - amount >= Zero
|
||||
}
|
||||
|
||||
}
|
||||
case object ClosedAccount extends Account {
|
||||
override def applyEvent(event: AccountEvent): Account =
|
||||
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
|
||||
}
|
||||
|
||||
// Note that after defining command, event and state classes you would probably start here when writing this.
|
||||
// When filling in the parameters of PersistentBehaviors.apply you can use IntelliJ alt+Enter > createValue
|
||||
// to generate the stub with types for the command and event handlers.
|
||||
|
||||
//#withEnforcedReplies
|
||||
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
|
||||
PersistentBehavior.withEnforcedReplies(
|
||||
PersistenceId(s"Account|$accountNumber"),
|
||||
EmptyAccount,
|
||||
commandHandler,
|
||||
eventHandler
|
||||
)
|
||||
}
|
||||
//#withEnforcedReplies
|
||||
|
||||
private val commandHandler: (Account, AccountCommand[_]) ⇒ ReplyEffect[AccountEvent, Account] = {
|
||||
(state, cmd) ⇒
|
||||
state match {
|
||||
case EmptyAccount ⇒ cmd match {
|
||||
case c: CreateAccount ⇒ createAccount(c)
|
||||
case _ ⇒ Effect.unhandled.thenNoReply() // CreateAccount before handling any other commands
|
||||
}
|
||||
|
||||
case acc @ OpenedAccount(_) ⇒ cmd match {
|
||||
case c: Deposit ⇒ deposit(c)
|
||||
case c: Withdraw ⇒ withdraw(acc, c)
|
||||
case c: GetBalance ⇒ getBalance(acc, c)
|
||||
case c: CloseAccount ⇒ closeAccount(acc, c)
|
||||
case c: CreateAccount ⇒ Effect.reply(c)(Rejected("Account is already created"))
|
||||
}
|
||||
|
||||
case ClosedAccount ⇒
|
||||
cmd match {
|
||||
case c @ (_: Deposit | _: Withdraw) ⇒
|
||||
Effect.reply(c)(Rejected("Account is closed"))
|
||||
case c: GetBalance ⇒
|
||||
Effect.reply(c)(CurrentBalance(Zero))
|
||||
case c: CloseAccount ⇒
|
||||
Effect.reply(c)(Rejected("Account is already closed"))
|
||||
case c: CreateAccount ⇒
|
||||
Effect.reply(c)(Rejected("Account is already created"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val eventHandler: (Account, AccountEvent) ⇒ Account = {
|
||||
(state, event) ⇒ state.applyEvent(event)
|
||||
}
|
||||
|
||||
private def createAccount(cmd: CreateAccount): ReplyEffect[AccountEvent, Account] = {
|
||||
Effect.persist(AccountCreated)
|
||||
.thenReply(cmd)(_ ⇒ Confirmed)
|
||||
}
|
||||
|
||||
private def deposit(cmd: Deposit): ReplyEffect[AccountEvent, Account] = {
|
||||
Effect.persist(Deposited(cmd.amount))
|
||||
.thenReply(cmd)(_ ⇒ Confirmed)
|
||||
}
|
||||
|
||||
//#reply
|
||||
private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[AccountEvent, Account] = {
|
||||
if (acc.canWithdraw(cmd.amount)) {
|
||||
Effect.persist(Withdrawn(cmd.amount))
|
||||
.thenReply(cmd)(_ ⇒ Confirmed)
|
||||
|
||||
} else {
|
||||
Effect.reply(cmd)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
|
||||
}
|
||||
}
|
||||
//#reply
|
||||
|
||||
private def getBalance(acc: OpenedAccount, cmd: GetBalance): ReplyEffect[AccountEvent, Account] = {
|
||||
Effect.reply(cmd)(CurrentBalance(acc.balance))
|
||||
}
|
||||
|
||||
private def closeAccount(acc: OpenedAccount, cmd: CloseAccount): ReplyEffect[AccountEvent, Account] = {
|
||||
if (acc.balance == Zero)
|
||||
Effect.persist(AccountClosed)
|
||||
.thenReply(cmd)(_ ⇒ Confirmed)
|
||||
else
|
||||
Effect.reply(cmd)(Rejected("Can't close account with non-zero balance"))
|
||||
}
|
||||
|
||||
}
|
||||
//#account-entity
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,162 @@
|
|||
/**
|
||||
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.persistence.typed.ExpectingReply
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.Effect
|
||||
import akka.persistence.typed.scaladsl.PersistentBehavior
|
||||
|
||||
/**
|
||||
* Bank account example illustrating:
|
||||
* - Option[State] that is starting with None as the initial state
|
||||
* - event handlers in the state classes
|
||||
* - command handlers in the state classes
|
||||
* - replies of various types, using ExpectingReply and withEnforcedReplies
|
||||
*/
|
||||
object AccountExampleWithOptionState {
|
||||
|
||||
//#account-entity
|
||||
object AccountEntity {
|
||||
// Command
|
||||
sealed trait AccountCommand[Reply] extends ExpectingReply[Reply]
|
||||
final case class CreateAccount()(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
final case class Deposit(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
final case class Withdraw(amount: BigDecimal)(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
final case class GetBalance()(override val replyTo: ActorRef[CurrentBalance])
|
||||
extends AccountCommand[CurrentBalance]
|
||||
final case class CloseAccount()(override val replyTo: ActorRef[OperationResult])
|
||||
extends AccountCommand[OperationResult]
|
||||
|
||||
// Reply
|
||||
sealed trait AccountCommandReply
|
||||
sealed trait OperationResult extends AccountCommandReply
|
||||
case object Confirmed extends OperationResult
|
||||
final case class Rejected(reason: String) extends OperationResult
|
||||
final case class CurrentBalance(balance: BigDecimal) extends AccountCommandReply
|
||||
|
||||
// Event
|
||||
sealed trait AccountEvent
|
||||
case object AccountCreated extends AccountEvent
|
||||
case class Deposited(amount: BigDecimal) extends AccountEvent
|
||||
case class Withdrawn(amount: BigDecimal) extends AccountEvent
|
||||
case object AccountClosed extends AccountEvent
|
||||
|
||||
val Zero = BigDecimal(0)
|
||||
|
||||
// type alias to reduce boilerplate
|
||||
type ReplyEffect = akka.persistence.typed.scaladsl.ReplyEffect[AccountEvent, Option[Account]]
|
||||
|
||||
// State
|
||||
sealed trait Account {
|
||||
def applyCommand(cmd: AccountCommand[_]): ReplyEffect
|
||||
def applyEvent(event: AccountEvent): Account
|
||||
}
|
||||
case class OpenedAccount(balance: BigDecimal) extends Account {
|
||||
require(balance >= Zero, "Account balance can't be negative")
|
||||
|
||||
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
|
||||
cmd match {
|
||||
case c @ Deposit(amount) ⇒
|
||||
Effect.persist(Deposited(amount))
|
||||
.thenReply(c)(_ ⇒ Confirmed)
|
||||
|
||||
case c @ Withdraw(amount) ⇒
|
||||
if (canWithdraw(amount)) {
|
||||
Effect.persist(Withdrawn(amount))
|
||||
.thenReply(c)(_ ⇒ Confirmed)
|
||||
|
||||
} else {
|
||||
Effect.reply(c)(Rejected(s"Insufficient balance $balance to be able to withdraw $amount"))
|
||||
}
|
||||
|
||||
case c: GetBalance ⇒
|
||||
Effect.reply(c)(CurrentBalance(balance))
|
||||
|
||||
case c: CloseAccount ⇒
|
||||
if (balance == Zero)
|
||||
Effect.persist(AccountClosed)
|
||||
.thenReply(c)(_ ⇒ Confirmed)
|
||||
else
|
||||
Effect.reply(c)(Rejected("Can't close account with non-zero balance"))
|
||||
|
||||
case c: CreateAccount ⇒
|
||||
Effect.reply(c)(Rejected("Account is already created"))
|
||||
|
||||
}
|
||||
|
||||
override def applyEvent(event: AccountEvent): Account =
|
||||
event match {
|
||||
case Deposited(amount) ⇒ copy(balance = balance + amount)
|
||||
case Withdrawn(amount) ⇒ copy(balance = balance - amount)
|
||||
case AccountClosed ⇒ ClosedAccount
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
|
||||
}
|
||||
|
||||
def canWithdraw(amount: BigDecimal): Boolean = {
|
||||
balance - amount >= Zero
|
||||
}
|
||||
|
||||
}
|
||||
case object ClosedAccount extends Account {
|
||||
override def applyCommand(cmd: AccountCommand[_]): ReplyEffect =
|
||||
cmd match {
|
||||
case c @ (_: Deposit | _: Withdraw) ⇒
|
||||
Effect.reply(c)(Rejected("Account is closed"))
|
||||
case c: GetBalance ⇒
|
||||
Effect.reply(c)(CurrentBalance(Zero))
|
||||
case c: CloseAccount ⇒
|
||||
Effect.reply(c)(Rejected("Account is already closed"))
|
||||
case c: CreateAccount ⇒
|
||||
Effect.reply(c)(Rejected("Account is already created"))
|
||||
}
|
||||
|
||||
override def applyEvent(event: AccountEvent): Account =
|
||||
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
|
||||
}
|
||||
|
||||
def behavior(accountNumber: String): Behavior[AccountCommand[AccountCommandReply]] = {
|
||||
PersistentBehavior.withEnforcedReplies[AccountCommand[AccountCommandReply], AccountEvent, Option[Account]](
|
||||
PersistenceId(s"Account|$accountNumber"),
|
||||
None,
|
||||
(state, cmd) ⇒ state match {
|
||||
case None ⇒ onFirstCommand(cmd)
|
||||
case Some(account) ⇒ account.applyCommand(cmd)
|
||||
},
|
||||
(state, event) ⇒ state match {
|
||||
case None ⇒ Some(onFirstEvent(event))
|
||||
case Some(account) ⇒ Some(account.applyEvent(event))
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
def onFirstCommand(cmd: AccountCommand[_]): ReplyEffect = {
|
||||
cmd match {
|
||||
case c: CreateAccount ⇒
|
||||
Effect.persist(AccountCreated)
|
||||
.thenReply(c)(_ ⇒ Confirmed)
|
||||
case _ ⇒
|
||||
// CreateAccount before handling any other commands
|
||||
Effect.unhandled.thenNoReply()
|
||||
}
|
||||
}
|
||||
|
||||
def onFirstEvent(event: AccountEvent): Account = {
|
||||
event match {
|
||||
case AccountCreated ⇒ OpenedAccount(Zero)
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
//#account-entity
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,157 @@
|
|||
/**
|
||||
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.Effect
|
||||
import akka.persistence.typed.scaladsl.PersistentBehavior
|
||||
|
||||
object BlogPostExample {
|
||||
|
||||
//#event
|
||||
sealed trait BlogEvent
|
||||
final case class PostAdded(
|
||||
postId: String,
|
||||
content: PostContent) extends BlogEvent
|
||||
|
||||
final case class BodyChanged(
|
||||
postId: String,
|
||||
newBody: String) extends BlogEvent
|
||||
final case class Published(postId: String) extends BlogEvent
|
||||
//#event
|
||||
|
||||
//#state
|
||||
sealed trait BlogState
|
||||
|
||||
case object BlankState extends BlogState
|
||||
|
||||
final case class DraftState(content: PostContent) extends BlogState {
|
||||
def withBody(newBody: String): DraftState =
|
||||
copy(content = content.copy(body = newBody))
|
||||
|
||||
def postId: String = content.postId
|
||||
}
|
||||
|
||||
final case class PublishedState(content: PostContent) extends BlogState {
|
||||
def postId: String = content.postId
|
||||
}
|
||||
//#state
|
||||
|
||||
//#commands
|
||||
sealed trait BlogCommand
|
||||
//#reply-command
|
||||
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
|
||||
final case class AddPostDone(postId: String)
|
||||
//#reply-command
|
||||
final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand
|
||||
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand
|
||||
final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand
|
||||
final case object PassivatePost extends BlogCommand
|
||||
final case class PostContent(postId: String, title: String, body: String)
|
||||
//#commands
|
||||
|
||||
//#behavior
|
||||
def behavior(entityId: String): Behavior[BlogCommand] =
|
||||
PersistentBehavior[BlogCommand, BlogEvent, BlogState](
|
||||
persistenceId = PersistenceId(s"Blog-$entityId"),
|
||||
emptyState = BlankState,
|
||||
commandHandler,
|
||||
eventHandler)
|
||||
//#behavior
|
||||
|
||||
//#command-handler
|
||||
private val commandHandler: (BlogState, BlogCommand) ⇒ Effect[BlogEvent, BlogState] = { (state, command) ⇒
|
||||
state match {
|
||||
|
||||
case BlankState ⇒ command match {
|
||||
case cmd: AddPost ⇒ addPost(cmd)
|
||||
case PassivatePost ⇒ Effect.stop
|
||||
case _ ⇒ Effect.unhandled
|
||||
}
|
||||
|
||||
case draftState: DraftState ⇒ command match {
|
||||
case cmd: ChangeBody ⇒ changeBody(draftState, cmd)
|
||||
case Publish(replyTo) ⇒ publish(draftState, replyTo)
|
||||
case GetPost(replyTo) ⇒ getPost(draftState, replyTo)
|
||||
case _: AddPost ⇒ Effect.unhandled
|
||||
case PassivatePost ⇒ Effect.stop
|
||||
}
|
||||
|
||||
case publishedState: PublishedState ⇒ command match {
|
||||
case GetPost(replyTo) ⇒ getPost(publishedState, replyTo)
|
||||
case PassivatePost ⇒ Effect.stop
|
||||
case _ ⇒ Effect.unhandled
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def addPost(cmd: AddPost): Effect[BlogEvent, BlogState] = {
|
||||
//#reply
|
||||
val evt = PostAdded(cmd.content.postId, cmd.content)
|
||||
Effect.persist(evt).thenRun { _ ⇒
|
||||
// After persist is done additional side effects can be performed
|
||||
cmd.replyTo ! AddPostDone(cmd.content.postId)
|
||||
}
|
||||
//#reply
|
||||
}
|
||||
|
||||
private def changeBody(state: DraftState, cmd: ChangeBody): Effect[BlogEvent, BlogState] = {
|
||||
val evt = BodyChanged(state.postId, cmd.newBody)
|
||||
Effect.persist(evt).thenRun { _ ⇒
|
||||
cmd.replyTo ! Done
|
||||
}
|
||||
}
|
||||
|
||||
private def publish(state: DraftState, replyTo: ActorRef[Done]): Effect[BlogEvent, BlogState] = {
|
||||
Effect.persist(Published(state.postId)).thenRun { _ ⇒
|
||||
println(s"Blog post ${state.postId} was published")
|
||||
replyTo ! Done
|
||||
}
|
||||
}
|
||||
|
||||
private def getPost(state: DraftState, replyTo: ActorRef[PostContent]): Effect[BlogEvent, BlogState] = {
|
||||
replyTo ! state.content
|
||||
Effect.none
|
||||
}
|
||||
|
||||
private def getPost(state: PublishedState, replyTo: ActorRef[PostContent]): Effect[BlogEvent, BlogState] = {
|
||||
replyTo ! state.content
|
||||
Effect.none
|
||||
}
|
||||
//#command-handler
|
||||
|
||||
//#event-handler
|
||||
private val eventHandler: (BlogState, BlogEvent) ⇒ BlogState = { (state, event) ⇒
|
||||
state match {
|
||||
|
||||
case BlankState ⇒ event match {
|
||||
case PostAdded(_, content) ⇒
|
||||
DraftState(content)
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [$state]")
|
||||
}
|
||||
|
||||
case draftState: DraftState ⇒ event match {
|
||||
|
||||
case BodyChanged(_, newBody) ⇒
|
||||
draftState.withBody(newBody)
|
||||
|
||||
case Published(_) ⇒
|
||||
PublishedState(draftState.content)
|
||||
|
||||
case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [$state]")
|
||||
}
|
||||
|
||||
case _: PublishedState ⇒
|
||||
// no more changes after published
|
||||
throw new IllegalStateException(s"unexpected event [$event] in state [$state]")
|
||||
}
|
||||
}
|
||||
//#event-handler
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -1,130 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.typed.{ ActorRef, Behavior }
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.scaladsl.PersistentBehavior
|
||||
import akka.persistence.typed.scaladsl.Effect
|
||||
|
||||
object InDepthPersistentBehaviorSpec {
|
||||
|
||||
//#event
|
||||
sealed trait BlogEvent extends Serializable
|
||||
final case class PostAdded(
|
||||
postId: String,
|
||||
content: PostContent) extends BlogEvent
|
||||
|
||||
final case class BodyChanged(
|
||||
postId: String,
|
||||
newBody: String) extends BlogEvent
|
||||
final case class Published(postId: String) extends BlogEvent
|
||||
//#event
|
||||
|
||||
//#state
|
||||
object BlogState {
|
||||
val empty = BlogState(None, published = false)
|
||||
}
|
||||
|
||||
final case class BlogState(content: Option[PostContent], published: Boolean) {
|
||||
def withContent(newContent: PostContent): BlogState =
|
||||
copy(content = Some(newContent))
|
||||
def isEmpty: Boolean = content.isEmpty
|
||||
def postId: String = content match {
|
||||
case Some(c) ⇒ c.postId
|
||||
case None ⇒ throw new IllegalStateException("postId unknown before post is created")
|
||||
}
|
||||
}
|
||||
//#state
|
||||
|
||||
//#commands
|
||||
sealed trait BlogCommand extends Serializable
|
||||
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
|
||||
final case class AddPostDone(postId: String)
|
||||
final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand
|
||||
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand
|
||||
final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand
|
||||
final case object PassivatePost extends BlogCommand
|
||||
final case class PostContent(postId: String, title: String, body: String)
|
||||
//#commands
|
||||
|
||||
//#initial-command-handler
|
||||
private val initial: (BlogState, BlogCommand) ⇒ Effect[BlogEvent, BlogState] =
|
||||
(state, cmd) ⇒
|
||||
cmd match {
|
||||
case AddPost(content, replyTo) ⇒
|
||||
val evt = PostAdded(content.postId, content)
|
||||
Effect.persist(evt).thenRun { state2 ⇒
|
||||
// After persist is done additional side effects can be performed
|
||||
replyTo ! AddPostDone(content.postId)
|
||||
}
|
||||
case PassivatePost ⇒
|
||||
Effect.stop
|
||||
case _ ⇒
|
||||
Effect.unhandled
|
||||
}
|
||||
//#initial-command-handler
|
||||
|
||||
//#post-added-command-handler
|
||||
private val postAdded: (BlogState, BlogCommand) ⇒ Effect[BlogEvent, BlogState] = {
|
||||
(state, cmd) ⇒
|
||||
cmd match {
|
||||
case ChangeBody(newBody, replyTo) ⇒
|
||||
val evt = BodyChanged(state.postId, newBody)
|
||||
Effect.persist(evt).thenRun { _ ⇒
|
||||
replyTo ! Done
|
||||
}
|
||||
case Publish(replyTo) ⇒
|
||||
Effect.persist(Published(state.postId)).thenRun { _ ⇒
|
||||
println(s"Blog post ${state.postId} was published")
|
||||
replyTo ! Done
|
||||
}
|
||||
case GetPost(replyTo) ⇒
|
||||
replyTo ! state.content.get
|
||||
Effect.none
|
||||
case _: AddPost ⇒
|
||||
Effect.unhandled
|
||||
case PassivatePost ⇒
|
||||
Effect.stop
|
||||
}
|
||||
}
|
||||
//#post-added-command-handler
|
||||
|
||||
//#by-state-command-handler
|
||||
private val commandHandler: (BlogState, BlogCommand) ⇒ Effect[BlogEvent, BlogState] = { (state, command) ⇒
|
||||
if (state.isEmpty) initial(state, command)
|
||||
else postAdded(state, command)
|
||||
}
|
||||
//#by-state-command-handler
|
||||
|
||||
//#event-handler
|
||||
private val eventHandler: (BlogState, BlogEvent) ⇒ BlogState = { (state, event) ⇒
|
||||
event match {
|
||||
case PostAdded(postId, content) ⇒
|
||||
state.withContent(content)
|
||||
|
||||
case BodyChanged(_, newBody) ⇒
|
||||
state.content match {
|
||||
case Some(c) ⇒ state.copy(content = Some(c.copy(body = newBody)))
|
||||
case None ⇒ state
|
||||
}
|
||||
|
||||
case Published(_) ⇒
|
||||
state.copy(published = true)
|
||||
}
|
||||
}
|
||||
//#event-handler
|
||||
|
||||
//#behavior
|
||||
def behavior(entityId: String): Behavior[BlogCommand] =
|
||||
PersistentBehavior[BlogCommand, BlogEvent, BlogState](
|
||||
persistenceId = PersistenceId(s"Blog-$entityId"),
|
||||
emptyState = BlogState.empty,
|
||||
commandHandler,
|
||||
eventHandler)
|
||||
//#behavior
|
||||
}
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue