Initial docs for typed persistence (#24197)
This commit is contained in:
parent
b395be2559
commit
bfbdf89776
9 changed files with 399 additions and 48 deletions
|
|
@ -29,6 +29,6 @@ that the module or API wasn't useful.
|
|||
These are the current complete modules marked as **may change**:
|
||||
|
||||
* @ref:[Multi Node Testing](../multi-node-testing.md)
|
||||
* @ref:[Akka Typed](../typed.md)
|
||||
* @ref:[Akka Typed](../actors-typed.md)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
@@@ index
|
||||
|
||||
* [actors](typed.md)
|
||||
* [actors](actors-typed.md)
|
||||
* [coexisting](coexisting.md)
|
||||
* [cluster](cluster-typed.md)
|
||||
* [cluster-sharding](cluster-sharding-typed.md)
|
||||
|
|
|
|||
|
|
@ -1,32 +1,188 @@
|
|||
# Persistence
|
||||
|
||||
TODO
|
||||
@@@ warning
|
||||
|
||||
## Dependency
|
||||
This module is currently marked as @ref:[may change](common/may-change.md) in the sense
|
||||
of being the subject of active research. This means that API or semantics can
|
||||
change without warning or deprecation period and it is not recommended to use
|
||||
this module in production just yet—you have been warned.
|
||||
|
||||
@@@
|
||||
|
||||
sbt
|
||||
: @@@vars
|
||||
```
|
||||
"com.typesafe.akka" %% "akka-persistence-typed" % "$akka.version$"
|
||||
```
|
||||
@@@
|
||||
@@@ warning
|
||||
|
||||
This module only has a Scala DSL. See [#24193](https://github.com/akka/akka/issues/24193)
|
||||
to track progress and to contribute to the Java DSL.
|
||||
|
||||
@@@
|
||||
|
||||
To use typed persistence add the following dependency:
|
||||
|
||||
@@dependency [sbt,Maven,Gradle] {
|
||||
group=com.typesafe.akka
|
||||
artifact=akka-persistence-typed_2.11
|
||||
version=$akka.version$
|
||||
}
|
||||
|
||||
|
||||
|
||||
Akka Persistence is a library for building event sourced actors. For background about how it works
|
||||
see the @ref:[untyped Akka Persistence section](persistence.md). This documentation shows how the typed API for persistence
|
||||
works and assumes you know what is meant by `Command`, `Event` and `State`.
|
||||
|
||||
Let's start with a simple example. The minimum required for a `PersistentActor` is:
|
||||
|
||||
Scala
|
||||
: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala) { #structure }
|
||||
|
||||
The first important thing to notice is the `Behavior` of a `PersistentActor` is typed to the type of the `Command`
|
||||
because this type of message a persistent actor should receive. In Akka Typed this is now enforced by the type system.
|
||||
The event and state are only used internally.
|
||||
|
||||
The parameters to `PersistentActor.immutable` are::
|
||||
|
||||
* `persistenceId` is the unique identifier for the persistent actor.
|
||||
* `initialState` defines the `State` when the entity is first created e.g. a Counter would start wiht 0 as state.
|
||||
* `commandHandler` defines how to handle command and optional functions for other signals, e.g. `Termination` messages if `watch` is used.
|
||||
* `eventHandler` updates the current state when an event has been persisted.
|
||||
|
||||
Next we'll discuss each of these in detail.
|
||||
|
||||
### Command handler
|
||||
|
||||
The command handler is a function with 3 parameters for the `ActorContext`, current `State`, and `Command`.
|
||||
|
||||
A command handler returns an `Effect` directive that defines what event or events, if any, to persist.
|
||||
|
||||
* `Effect.persist` will persist one single event or several events atomically, i.e. all events
|
||||
are stored or none of them are stored if there is an error
|
||||
* `Effect.none` no events are to be persisted, for example a read-only command
|
||||
* `Effect.unhandled` the command is unhandled (not supported) in current state
|
||||
|
||||
External side effects can be performed after successful persist with the `andThen` function e.g `Effect.persist(..).andThen`.
|
||||
In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying
|
||||
the event is passed as parameter to the `andThen` function.
|
||||
|
||||
### Event handler
|
||||
|
||||
When an event has been persisted successfully the current state is updated by applying the
|
||||
event to the current state with the `eventHandler` function.
|
||||
|
||||
The event handler returns the new state, which must be immutable so you return a new instance of the state.
|
||||
The same event handler is also used when the entity is started up to recover its state from the stored events.
|
||||
|
||||
It is not recommended to perform side effects
|
||||
in the event handler, as those are also executed during recovery of an persistent actor
|
||||
|
||||
## Basic example
|
||||
|
||||
Command and event:
|
||||
|
||||
Scala
|
||||
: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #command }
|
||||
|
||||
State is a List containing all the events:
|
||||
|
||||
Scala
|
||||
: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #state }
|
||||
|
||||
The command handler just persists the `Cmd` payload in an `Evt`:
|
||||
|
||||
Scala
|
||||
: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #command-handler }
|
||||
|
||||
The event handler appends the event to the state. This is called after successfully
|
||||
persisting the event in the database:
|
||||
|
||||
Scala
|
||||
: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #event-handler }
|
||||
|
||||
These are used to create a `PersistentBehavior`:
|
||||
|
||||
Scala
|
||||
: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #behavior }
|
||||
|
||||
The behavior can then be run as with any normal typed actor as described in [typed actors documentation](actors-typed.md).
|
||||
|
||||
## Larger example
|
||||
|
||||
After processing a message plain typed actors are able to return the `Behavior` that is used
|
||||
for next message.
|
||||
|
||||
As you can see in the above examples this is not supported by typed persistent actors. Instead, the state is
|
||||
returned by `eventHandler`. The reason a new behavior can't be returned is that behavior is part of the actor's
|
||||
state and must also carefully be reconstructed during recovery. If it would have been supported it would mean
|
||||
that the behavior must be restored when replaying events and also encoded in the state anyway when snapshots are used.
|
||||
That would be very prone to mistakes and thus not allowed in Typed Persistence.
|
||||
|
||||
For simple actors you can use the same set of command handlers independent of what state the entity is in,
|
||||
as shown in above example. For more complex actors it's useful to be able to change the behavior in the sense
|
||||
that different functions for processing commands may be defined depending on what state the actor is in. This is useful when implementing finite state machine (FSM) like entities.
|
||||
|
||||
The next example shows how to define different behavior based on the current `State`. It is an actor that
|
||||
represents the state of a blog post. Before a post is started the only command it can process is to `AddPost`. Once it is started
|
||||
then it we can look it up with `GetPost`, modify it with `ChangeBody` or publish it with `Publish`.
|
||||
|
||||
The state is captured by:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #state }
|
||||
|
||||
The commands (only a subset are valid depending on state):
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #commands }
|
||||
|
||||
The command handler to process each command is decided by a `CommandHandler.byState` command handler,
|
||||
which is a function from `State => CommandHandler`:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #by-state-command-handler }
|
||||
|
||||
This can refer to many other `CommandHandler`s e.g one for a post that hasn't been started:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #initial-command-handler }
|
||||
|
||||
And a different `CommandHandler` for after the post has been added:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #post-added-command-handler }
|
||||
|
||||
The event handler is always the same independent of state. The main reason for not making the event handler
|
||||
part of the `CommandHandler` is that all events must be handled and that is typically independent of what the
|
||||
current state is. The event handler can of course still decide what to do based on the state if that is needed.
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #event-handler }
|
||||
|
||||
And finally the behavior is created from the `byState` command handler:
|
||||
|
||||
Scala
|
||||
: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #behavior }
|
||||
|
||||
## Serialization
|
||||
|
||||
The same @ref:[serialization](serialization.md) mechanism as for untyped
|
||||
actors is also used in Akka Typed, also for persistent actors. When picking serialization solution for the events
|
||||
you should also consider that it must be possible read old events when the application has evolved.
|
||||
Strategies for that can be found in the @ref:[schema evolution](persistence-schema-evolution.md).
|
||||
|
||||
## Recovery
|
||||
|
||||
Since it is strongly discouraged to perform side effects in applyEvent ,
|
||||
side effects should be performed once recovery has completed in the `onRecoveryCompleted` callback
|
||||
|
||||
Scala
|
||||
: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala) { #recovery }
|
||||
|
||||
The `onRecoveryCompleted` takes on an `ActorContext` and the current `State`.
|
||||
|
||||
## Current limitations
|
||||
|
||||
* The `PersistentBehavior` can't be wrapped in other behaviors, such as `Actor.deferred`. See [#23694](https://github.com/akka/akka/issues/23694)
|
||||
* Can only tag events with event adapters. See [#23817](https://github.com/akka/akka/issues/23817)
|
||||
* Missing Java DSL. See [#24193](https://github.com/akka/akka/issues/24193)
|
||||
* Snapshot support. See [#24196](https://github.com/akka/akka/issues/24196)
|
||||
|
||||
Gradle
|
||||
: @@@vars
|
||||
```
|
||||
dependencies {
|
||||
compile group: 'com.typesafe.akka', name: 'akka-persistence-typed_2.11', version: '$akka.version$'
|
||||
}
|
||||
```
|
||||
@@@
|
||||
|
||||
Maven
|
||||
: @@@vars
|
||||
```
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-persistence-typed_$scala.binary_version$</artifactId>
|
||||
<version>$akka.version$</version>
|
||||
</dependency>
|
||||
```
|
||||
@@@
|
||||
|
|
|
|||
|
|
@ -1,5 +1,16 @@
|
|||
# Testing
|
||||
|
||||
@@@ warning
|
||||
|
||||
This module is currently marked as @ref:[may change](common/may-change.md) in the sense
|
||||
of being the subject of active research. This means that API or semantics can
|
||||
change without warning or deprecation period and it is not recommended to use
|
||||
this module in production just yet—you have been warned.
|
||||
|
||||
|
||||
@@@
|
||||
|
||||
To use the testkit add the following dependency:
|
||||
## Dependency
|
||||
|
||||
sbt
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
@@@ note
|
||||
|
||||
This module will be deprecated as it will be superseded by the @ref:[Akka Typed](typed.md)
|
||||
This module will be deprecated as it will be superseded by the @ref:[Akka Typed](actors-typed.md)
|
||||
project which is currently being developed in open preview mode.
|
||||
|
||||
@@@
|
||||
|
|
|
|||
|
|
@ -5,9 +5,7 @@ package akka.persistence.typed.scaladsl
|
|||
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Terminated
|
||||
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated }
|
||||
import akka.actor.typed.scaladsl.Actor
|
||||
import akka.actor.typed.scaladsl.ActorContext
|
||||
import akka.actor.typed.scaladsl.TimerScheduler
|
||||
|
|
@ -17,26 +15,40 @@ object PersistentActorCompileOnlyTest {
|
|||
import akka.persistence.typed.scaladsl.PersistentActor._
|
||||
|
||||
object Simple {
|
||||
sealed trait MyCommand
|
||||
case class Cmd(data: String) extends MyCommand
|
||||
//#command
|
||||
sealed trait SimpleCommand
|
||||
case class Cmd(data: String) extends SimpleCommand
|
||||
|
||||
sealed trait MyEvent
|
||||
case class Evt(data: String) extends MyEvent
|
||||
sealed trait SimpleEvent
|
||||
case class Evt(data: String) extends SimpleEvent
|
||||
//#command
|
||||
|
||||
//#state
|
||||
case class ExampleState(events: List[String] = Nil)
|
||||
//#state
|
||||
|
||||
PersistentActor.immutable[MyCommand, MyEvent, ExampleState](
|
||||
persistenceId = "sample-id-1",
|
||||
|
||||
initialState = ExampleState(Nil),
|
||||
|
||||
commandHandler = CommandHandler.command {
|
||||
//#command-handler
|
||||
val commandHandler: CommandHandler[SimpleCommand, SimpleEvent, ExampleState] =
|
||||
CommandHandler.command {
|
||||
case Cmd(data) ⇒ Effect.persist(Evt(data))
|
||||
},
|
||||
}
|
||||
//#command-handler
|
||||
|
||||
//#event-handler
|
||||
val eventHandler: (ExampleState, SimpleEvent) ⇒ (ExampleState) = {
|
||||
case (state, Evt(data)) ⇒ state.copy(data :: state.events)
|
||||
}
|
||||
//#event-handler
|
||||
|
||||
//#behavior
|
||||
val simpleBehavior: PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState] =
|
||||
PersistentActor.immutable[SimpleCommand, SimpleEvent, ExampleState](
|
||||
persistenceId = "sample-id-1",
|
||||
initialState = ExampleState(Nil),
|
||||
commandHandler = commandHandler,
|
||||
eventHandler = eventHandler)
|
||||
//#behavior
|
||||
|
||||
eventHandler = {
|
||||
case (state, Evt(data)) ⇒ state.copy(data :: state.events)
|
||||
})
|
||||
}
|
||||
|
||||
object WithAck {
|
||||
|
|
@ -58,7 +70,9 @@ object PersistentActorCompileOnlyTest {
|
|||
commandHandler = CommandHandler.command {
|
||||
case Cmd(data, sender) ⇒
|
||||
Effect.persist(Evt(data))
|
||||
.andThen { sender ! Ack }
|
||||
.andThen {
|
||||
sender ! Ack
|
||||
}
|
||||
},
|
||||
|
||||
eventHandler = {
|
||||
|
|
@ -375,7 +389,9 @@ object PersistentActorCompileOnlyTest {
|
|||
Effect.none
|
||||
case CheerUp(sender) ⇒
|
||||
changeMoodIfNeeded(state, Happy)
|
||||
.andThen { sender ! Ack }
|
||||
.andThen {
|
||||
sender ! Ack
|
||||
}
|
||||
case Remember(memory) ⇒
|
||||
// A more elaborate example to show we still have full control over the effects
|
||||
// if needed (e.g. when some logic is factored out but you want to add more effects)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.persistence.typed.scaladsl.PersistentActor
|
||||
|
||||
object BasicPersistentActorSpec {
|
||||
|
||||
//#structure
|
||||
sealed trait Command
|
||||
sealed trait Event
|
||||
case class State()
|
||||
|
||||
val behavior: Behavior[Command] =
|
||||
PersistentActor.immutable[Command, Event, State](
|
||||
persistenceId = "abc",
|
||||
initialState = State(),
|
||||
commandHandler = PersistentActor.CommandHandler { (ctx, state, cmd) ⇒ ??? },
|
||||
eventHandler = (state, evt) ⇒ ???)
|
||||
//#structure
|
||||
|
||||
//#recovery
|
||||
val recoveryBehavior: Behavior[Command] =
|
||||
PersistentActor.immutable[Command, Event, State](
|
||||
persistenceId = "abc",
|
||||
initialState = State(),
|
||||
commandHandler = PersistentActor.CommandHandler { (ctx, state, cmd) ⇒ ??? },
|
||||
eventHandler = (state, evt) ⇒ ???)
|
||||
.onRecoveryCompleted { (ctx, state) ⇒
|
||||
???
|
||||
}
|
||||
//#recovery
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,132 @@
|
|||
/**
|
||||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package docs.akka.persistence.typed
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.typed.{ ActorRef, Behavior }
|
||||
import akka.persistence.typed.scaladsl.PersistentActor
|
||||
import akka.persistence.typed.scaladsl.PersistentActor.{ CommandHandler, Effect }
|
||||
|
||||
object InDepthPersistentActorSpec {
|
||||
|
||||
//#event
|
||||
sealed trait BlogEvent extends Serializable
|
||||
final case class PostAdded(
|
||||
postId: String,
|
||||
content: PostContent) extends BlogEvent
|
||||
|
||||
final case class BodyChanged(
|
||||
postId: String,
|
||||
newBody: String) extends BlogEvent
|
||||
final case class Published(postId: String) extends BlogEvent
|
||||
//#event
|
||||
|
||||
//#state
|
||||
object BlogState {
|
||||
val empty = BlogState(None, published = false)
|
||||
}
|
||||
|
||||
final case class BlogState(content: Option[PostContent], published: Boolean) {
|
||||
def withContent(newContent: PostContent): BlogState =
|
||||
copy(content = Some(newContent))
|
||||
def isEmpty: Boolean = content.isEmpty
|
||||
def postId: String = content match {
|
||||
case Some(c) ⇒ c.postId
|
||||
case None ⇒ throw new IllegalStateException("postId unknown before post is created")
|
||||
}
|
||||
}
|
||||
//#state
|
||||
|
||||
//#commands
|
||||
sealed trait BlogCommand extends Serializable
|
||||
final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
|
||||
final case class AddPostDone(postId: String)
|
||||
final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand
|
||||
final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand
|
||||
final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand
|
||||
final case object PassivatePost extends BlogCommand
|
||||
final case class PostContent(postId: String, title: String, body: String)
|
||||
//#commands
|
||||
|
||||
//#initial-command-handler
|
||||
private def initial: CommandHandler[BlogCommand, BlogEvent, BlogState] =
|
||||
CommandHandler { (ctx, state, cmd) ⇒
|
||||
cmd match {
|
||||
case AddPost(content, replyTo) ⇒
|
||||
val evt = PostAdded(content.postId, content)
|
||||
Effect.persist(evt).andThen { state2 ⇒
|
||||
// After persist is done additional side effects can be performed
|
||||
replyTo ! AddPostDone(content.postId)
|
||||
}
|
||||
case PassivatePost ⇒
|
||||
Effect.stop
|
||||
case _ ⇒
|
||||
Effect.unhandled
|
||||
}
|
||||
}
|
||||
//#initial-command-handler
|
||||
|
||||
//#post-added-command-handler
|
||||
private def postAdded: CommandHandler[BlogCommand, BlogEvent, BlogState] = {
|
||||
CommandHandler { (ctx, state, cmd) ⇒
|
||||
cmd match {
|
||||
case ChangeBody(newBody, replyTo) ⇒
|
||||
val evt = BodyChanged(state.postId, newBody)
|
||||
Effect.persist(evt).andThen { _ ⇒
|
||||
replyTo ! Done
|
||||
}
|
||||
case Publish(replyTo) ⇒
|
||||
Effect.persist(Published(state.postId)).andThen { _ ⇒
|
||||
println(s"Blog post ${state.postId} was published")
|
||||
replyTo ! Done
|
||||
}
|
||||
case GetPost(replyTo) ⇒
|
||||
replyTo ! state.content.get
|
||||
Effect.none
|
||||
case _: AddPost ⇒
|
||||
Effect.unhandled
|
||||
case PassivatePost ⇒
|
||||
Effect.stop
|
||||
}
|
||||
}
|
||||
}
|
||||
//#post-added-command-handler
|
||||
|
||||
//#by-state-command-handler
|
||||
private def commandHandler: CommandHandler[BlogCommand, BlogEvent, BlogState] = CommandHandler.byState {
|
||||
case state if state.isEmpty ⇒ initial
|
||||
case state if !state.isEmpty ⇒ postAdded
|
||||
}
|
||||
//#by-state-command-handler
|
||||
|
||||
//#event-handler
|
||||
private def eventHandler(state: BlogState, event: BlogEvent): BlogState =
|
||||
event match {
|
||||
case PostAdded(postId, content) ⇒
|
||||
state.withContent(content)
|
||||
|
||||
case BodyChanged(_, newBody) ⇒
|
||||
state.content match {
|
||||
case Some(c) ⇒ state.copy(content = Some(c.copy(body = newBody)))
|
||||
case None ⇒ state
|
||||
}
|
||||
|
||||
case Published(_) ⇒
|
||||
state.copy(published = true)
|
||||
}
|
||||
//#event-handler
|
||||
|
||||
//#behavior
|
||||
def behavior: Behavior[BlogCommand] =
|
||||
PersistentActor.immutable[BlogCommand, BlogEvent, BlogState](
|
||||
persistenceId = "abc",
|
||||
initialState = BlogState.empty,
|
||||
commandHandler,
|
||||
eventHandler)
|
||||
//#behavior
|
||||
}
|
||||
|
||||
class InDepthPersistentActorSpec {
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue