From bfbdf89776dfec4a613ae0eb6853db51e813a0a3 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 22 Dec 2017 13:43:36 +0000 Subject: [PATCH] Initial docs for typed persistence (#24197) --- .../paradox/{typed.md => actors-typed.md} | 0 .../src/main/paradox/common/may-change.md | 2 +- akka-docs/src/main/paradox/index-typed.md | 2 +- .../src/main/paradox/persistence-typed.md | 208 +++++++++++++++--- akka-docs/src/main/paradox/testing-typed.md | 11 + akka-docs/src/main/paradox/typed-actors.md | 2 +- .../PersistentActorCompileOnlyTest.scala | 54 +++-- .../typed/BasicPersistentActorSpec.scala | 36 +++ .../typed/InDepthPersistentActorSpec.scala | 132 +++++++++++ 9 files changed, 399 insertions(+), 48 deletions(-) rename akka-docs/src/main/paradox/{typed.md => actors-typed.md} (100%) create mode 100644 akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala create mode 100644 akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala diff --git a/akka-docs/src/main/paradox/typed.md b/akka-docs/src/main/paradox/actors-typed.md similarity index 100% rename from akka-docs/src/main/paradox/typed.md rename to akka-docs/src/main/paradox/actors-typed.md diff --git a/akka-docs/src/main/paradox/common/may-change.md b/akka-docs/src/main/paradox/common/may-change.md index 95d73631c0..a9cf9e15b0 100644 --- a/akka-docs/src/main/paradox/common/may-change.md +++ b/akka-docs/src/main/paradox/common/may-change.md @@ -29,6 +29,6 @@ that the module or API wasn't useful. These are the current complete modules marked as **may change**: * @ref:[Multi Node Testing](../multi-node-testing.md) -* @ref:[Akka Typed](../typed.md) +* @ref:[Akka Typed](../actors-typed.md) diff --git a/akka-docs/src/main/paradox/index-typed.md b/akka-docs/src/main/paradox/index-typed.md index 5ff10e834b..77fe500ee4 100644 --- a/akka-docs/src/main/paradox/index-typed.md +++ b/akka-docs/src/main/paradox/index-typed.md @@ -4,7 +4,7 @@ @@@ index -* [actors](typed.md) +* [actors](actors-typed.md) * [coexisting](coexisting.md) * [cluster](cluster-typed.md) * [cluster-sharding](cluster-sharding-typed.md) diff --git a/akka-docs/src/main/paradox/persistence-typed.md b/akka-docs/src/main/paradox/persistence-typed.md index 5a746a2a9c..dba8eb997f 100644 --- a/akka-docs/src/main/paradox/persistence-typed.md +++ b/akka-docs/src/main/paradox/persistence-typed.md @@ -1,32 +1,188 @@ # Persistence -TODO +@@@ warning -## Dependency +This module is currently marked as @ref:[may change](common/may-change.md) in the sense + of being the subject of active research. This means that API or semantics can + change without warning or deprecation period and it is not recommended to use + this module in production just yet—you have been warned. + +@@@ -sbt -: @@@vars - ``` - "com.typesafe.akka" %% "akka-persistence-typed" % "$akka.version$" - ``` - @@@ +@@@ warning + +This module only has a Scala DSL. See [#24193](https://github.com/akka/akka/issues/24193) +to track progress and to contribute to the Java DSL. + +@@@ + +To use typed persistence add the following dependency: + +@@dependency [sbt,Maven,Gradle] { + group=com.typesafe.akka + artifact=akka-persistence-typed_2.11 + version=$akka.version$ +} + + + +Akka Persistence is a library for building event sourced actors. For background about how it works +see the @ref:[untyped Akka Persistence section](persistence.md). This documentation shows how the typed API for persistence +works and assumes you know what is meant by `Command`, `Event` and `State`. + +Let's start with a simple example. The minimum required for a `PersistentActor` is: + +Scala +: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala) { #structure } + +The first important thing to notice is the `Behavior` of a `PersistentActor` is typed to the type of the `Command` +because this type of message a persistent actor should receive. In Akka Typed this is now enforced by the type system. +The event and state are only used internally. + +The parameters to `PersistentActor.immutable` are:: + +* `persistenceId` is the unique identifier for the persistent actor. +* `initialState` defines the `State` when the entity is first created e.g. a Counter would start wiht 0 as state. +* `commandHandler` defines how to handle command and optional functions for other signals, e.g. `Termination` messages if `watch` is used. +* `eventHandler` updates the current state when an event has been persisted. + +Next we'll discuss each of these in detail. + +### Command handler + +The command handler is a function with 3 parameters for the `ActorContext`, current `State`, and `Command`. + +A command handler returns an `Effect` directive that defines what event or events, if any, to persist. + +* `Effect.persist` will persist one single event or several events atomically, i.e. all events + are stored or none of them are stored if there is an error +* `Effect.none` no events are to be persisted, for example a read-only command +* `Effect.unhandled` the command is unhandled (not supported) in current state + +External side effects can be performed after successful persist with the `andThen` function e.g `Effect.persist(..).andThen`. +In the example below a reply is sent to the `replyTo` ActorRef. Note that the new state after applying +the event is passed as parameter to the `andThen` function. + +### Event handler + +When an event has been persisted successfully the current state is updated by applying the +event to the current state with the `eventHandler` function. + +The event handler returns the new state, which must be immutable so you return a new instance of the state. +The same event handler is also used when the entity is started up to recover its state from the stored events. + +It is not recommended to perform side effects +in the event handler, as those are also executed during recovery of an persistent actor + +## Basic example + +Command and event: + +Scala +: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #command } + +State is a List containing all the events: + +Scala +: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #state } + +The command handler just persists the `Cmd` payload in an `Evt`: + +Scala +: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #command-handler } + +The event handler appends the event to the state. This is called after successfully +persisting the event in the database: + +Scala +: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #event-handler } + +These are used to create a `PersistentBehavior`: + +Scala +: @@snip [PersistentActorCompileOnyTest.scala]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala) { #behavior } + +The behavior can then be run as with any normal typed actor as described in [typed actors documentation](actors-typed.md). + +## Larger example + +After processing a message plain typed actors are able to return the `Behavior` that is used +for next message. + +As you can see in the above examples this is not supported by typed persistent actors. Instead, the state is +returned by `eventHandler`. The reason a new behavior can't be returned is that behavior is part of the actor's +state and must also carefully be reconstructed during recovery. If it would have been supported it would mean +that the behavior must be restored when replaying events and also encoded in the state anyway when snapshots are used. +That would be very prone to mistakes and thus not allowed in Typed Persistence. + +For simple actors you can use the same set of command handlers independent of what state the entity is in, +as shown in above example. For more complex actors it's useful to be able to change the behavior in the sense +that different functions for processing commands may be defined depending on what state the actor is in. This is useful when implementing finite state machine (FSM) like entities. + +The next example shows how to define different behavior based on the current `State`. It is an actor that +represents the state of a blog post. Before a post is started the only command it can process is to `AddPost`. Once it is started +then it we can look it up with `GetPost`, modify it with `ChangeBody` or publish it with `Publish`. + +The state is captured by: + +Scala +: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #state } + +The commands (only a subset are valid depending on state): + +Scala +: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #commands } + +The command handler to process each command is decided by a `CommandHandler.byState` command handler, +which is a function from `State => CommandHandler`: + +Scala +: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #by-state-command-handler } + +This can refer to many other `CommandHandler`s e.g one for a post that hasn't been started: + +Scala +: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #initial-command-handler } + +And a different `CommandHandler` for after the post has been added: + +Scala +: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #post-added-command-handler } + +The event handler is always the same independent of state. The main reason for not making the event handler +part of the `CommandHandler` is that all events must be handled and that is typically independent of what the +current state is. The event handler can of course still decide what to do based on the state if that is needed. + +Scala +: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #event-handler } + +And finally the behavior is created from the `byState` command handler: + +Scala +: @@snip [InDepthPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala) { #behavior } + +## Serialization + +The same @ref:[serialization](serialization.md) mechanism as for untyped +actors is also used in Akka Typed, also for persistent actors. When picking serialization solution for the events +you should also consider that it must be possible read old events when the application has evolved. +Strategies for that can be found in the @ref:[schema evolution](persistence-schema-evolution.md). + +## Recovery + +Since it is strongly discouraged to perform side effects in applyEvent , +side effects should be performed once recovery has completed in the `onRecoveryCompleted` callback + +Scala +: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala) { #recovery } + +The `onRecoveryCompleted` takes on an `ActorContext` and the current `State`. + +## Current limitations + +* The `PersistentBehavior` can't be wrapped in other behaviors, such as `Actor.deferred`. See [#23694](https://github.com/akka/akka/issues/23694) +* Can only tag events with event adapters. See [#23817](https://github.com/akka/akka/issues/23817) +* Missing Java DSL. See [#24193](https://github.com/akka/akka/issues/24193) +* Snapshot support. See [#24196](https://github.com/akka/akka/issues/24196) -Gradle -: @@@vars - ``` - dependencies { - compile group: 'com.typesafe.akka', name: 'akka-persistence-typed_2.11', version: '$akka.version$' - } - ``` - @@@ -Maven -: @@@vars - ``` - - com.typesafe.akka - akka-persistence-typed_$scala.binary_version$ - $akka.version$ - - ``` - @@@ diff --git a/akka-docs/src/main/paradox/testing-typed.md b/akka-docs/src/main/paradox/testing-typed.md index fafd3e1855..e9c43d5b9a 100644 --- a/akka-docs/src/main/paradox/testing-typed.md +++ b/akka-docs/src/main/paradox/testing-typed.md @@ -1,5 +1,16 @@ # Testing +@@@ warning + +This module is currently marked as @ref:[may change](common/may-change.md) in the sense + of being the subject of active research. This means that API or semantics can + change without warning or deprecation period and it is not recommended to use + this module in production just yet—you have been warned. + + +@@@ + +To use the testkit add the following dependency: ## Dependency sbt diff --git a/akka-docs/src/main/paradox/typed-actors.md b/akka-docs/src/main/paradox/typed-actors.md index f04520e610..9cae31bbf5 100644 --- a/akka-docs/src/main/paradox/typed-actors.md +++ b/akka-docs/src/main/paradox/typed-actors.md @@ -2,7 +2,7 @@ @@@ note -This module will be deprecated as it will be superseded by the @ref:[Akka Typed](typed.md) +This module will be deprecated as it will be superseded by the @ref:[Akka Typed](actors-typed.md) project which is currently being developed in open preview mode. @@@ diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index 326a703313..c240d52bbe 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -5,9 +5,7 @@ package akka.persistence.typed.scaladsl import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -import akka.actor.typed.ActorRef -import akka.actor.typed.Behavior -import akka.actor.typed.Terminated +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Terminated } import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.TimerScheduler @@ -17,26 +15,40 @@ object PersistentActorCompileOnlyTest { import akka.persistence.typed.scaladsl.PersistentActor._ object Simple { - sealed trait MyCommand - case class Cmd(data: String) extends MyCommand + //#command + sealed trait SimpleCommand + case class Cmd(data: String) extends SimpleCommand - sealed trait MyEvent - case class Evt(data: String) extends MyEvent + sealed trait SimpleEvent + case class Evt(data: String) extends SimpleEvent + //#command + //#state case class ExampleState(events: List[String] = Nil) + //#state - PersistentActor.immutable[MyCommand, MyEvent, ExampleState]( - persistenceId = "sample-id-1", - - initialState = ExampleState(Nil), - - commandHandler = CommandHandler.command { + //#command-handler + val commandHandler: CommandHandler[SimpleCommand, SimpleEvent, ExampleState] = + CommandHandler.command { case Cmd(data) ⇒ Effect.persist(Evt(data)) - }, + } + //#command-handler + + //#event-handler + val eventHandler: (ExampleState, SimpleEvent) ⇒ (ExampleState) = { + case (state, Evt(data)) ⇒ state.copy(data :: state.events) + } + //#event-handler + + //#behavior + val simpleBehavior: PersistentBehavior[SimpleCommand, SimpleEvent, ExampleState] = + PersistentActor.immutable[SimpleCommand, SimpleEvent, ExampleState]( + persistenceId = "sample-id-1", + initialState = ExampleState(Nil), + commandHandler = commandHandler, + eventHandler = eventHandler) + //#behavior - eventHandler = { - case (state, Evt(data)) ⇒ state.copy(data :: state.events) - }) } object WithAck { @@ -58,7 +70,9 @@ object PersistentActorCompileOnlyTest { commandHandler = CommandHandler.command { case Cmd(data, sender) ⇒ Effect.persist(Evt(data)) - .andThen { sender ! Ack } + .andThen { + sender ! Ack + } }, eventHandler = { @@ -375,7 +389,9 @@ object PersistentActorCompileOnlyTest { Effect.none case CheerUp(sender) ⇒ changeMoodIfNeeded(state, Happy) - .andThen { sender ! Ack } + .andThen { + sender ! Ack + } case Remember(memory) ⇒ // A more elaborate example to show we still have full control over the effects // if needed (e.g. when some logic is factored out but you want to add more effects) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala new file mode 100644 index 0000000000..deeb90dc29 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package docs.akka.persistence.typed + +import akka.actor.typed.Behavior +import akka.persistence.typed.scaladsl.PersistentActor + +object BasicPersistentActorSpec { + + //#structure + sealed trait Command + sealed trait Event + case class State() + + val behavior: Behavior[Command] = + PersistentActor.immutable[Command, Event, State]( + persistenceId = "abc", + initialState = State(), + commandHandler = PersistentActor.CommandHandler { (ctx, state, cmd) ⇒ ??? }, + eventHandler = (state, evt) ⇒ ???) + //#structure + + //#recovery + val recoveryBehavior: Behavior[Command] = + PersistentActor.immutable[Command, Event, State]( + persistenceId = "abc", + initialState = State(), + commandHandler = PersistentActor.CommandHandler { (ctx, state, cmd) ⇒ ??? }, + eventHandler = (state, evt) ⇒ ???) + .onRecoveryCompleted { (ctx, state) ⇒ + ??? + } + //#recovery + +} diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala new file mode 100644 index 0000000000..5822f5adae --- /dev/null +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentActorSpec.scala @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package docs.akka.persistence.typed + +import akka.Done +import akka.actor.typed.{ ActorRef, Behavior } +import akka.persistence.typed.scaladsl.PersistentActor +import akka.persistence.typed.scaladsl.PersistentActor.{ CommandHandler, Effect } + +object InDepthPersistentActorSpec { + + //#event + sealed trait BlogEvent extends Serializable + final case class PostAdded( + postId: String, + content: PostContent) extends BlogEvent + + final case class BodyChanged( + postId: String, + newBody: String) extends BlogEvent + final case class Published(postId: String) extends BlogEvent + //#event + + //#state + object BlogState { + val empty = BlogState(None, published = false) + } + + final case class BlogState(content: Option[PostContent], published: Boolean) { + def withContent(newContent: PostContent): BlogState = + copy(content = Some(newContent)) + def isEmpty: Boolean = content.isEmpty + def postId: String = content match { + case Some(c) ⇒ c.postId + case None ⇒ throw new IllegalStateException("postId unknown before post is created") + } + } + //#state + + //#commands + sealed trait BlogCommand extends Serializable + final case class AddPost(content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand + final case class AddPostDone(postId: String) + final case class GetPost(replyTo: ActorRef[PostContent]) extends BlogCommand + final case class ChangeBody(newBody: String, replyTo: ActorRef[Done]) extends BlogCommand + final case class Publish(replyTo: ActorRef[Done]) extends BlogCommand + final case object PassivatePost extends BlogCommand + final case class PostContent(postId: String, title: String, body: String) + //#commands + + //#initial-command-handler + private def initial: CommandHandler[BlogCommand, BlogEvent, BlogState] = + CommandHandler { (ctx, state, cmd) ⇒ + cmd match { + case AddPost(content, replyTo) ⇒ + val evt = PostAdded(content.postId, content) + Effect.persist(evt).andThen { state2 ⇒ + // After persist is done additional side effects can be performed + replyTo ! AddPostDone(content.postId) + } + case PassivatePost ⇒ + Effect.stop + case _ ⇒ + Effect.unhandled + } + } + //#initial-command-handler + + //#post-added-command-handler + private def postAdded: CommandHandler[BlogCommand, BlogEvent, BlogState] = { + CommandHandler { (ctx, state, cmd) ⇒ + cmd match { + case ChangeBody(newBody, replyTo) ⇒ + val evt = BodyChanged(state.postId, newBody) + Effect.persist(evt).andThen { _ ⇒ + replyTo ! Done + } + case Publish(replyTo) ⇒ + Effect.persist(Published(state.postId)).andThen { _ ⇒ + println(s"Blog post ${state.postId} was published") + replyTo ! Done + } + case GetPost(replyTo) ⇒ + replyTo ! state.content.get + Effect.none + case _: AddPost ⇒ + Effect.unhandled + case PassivatePost ⇒ + Effect.stop + } + } + } + //#post-added-command-handler + + //#by-state-command-handler + private def commandHandler: CommandHandler[BlogCommand, BlogEvent, BlogState] = CommandHandler.byState { + case state if state.isEmpty ⇒ initial + case state if !state.isEmpty ⇒ postAdded + } + //#by-state-command-handler + + //#event-handler + private def eventHandler(state: BlogState, event: BlogEvent): BlogState = + event match { + case PostAdded(postId, content) ⇒ + state.withContent(content) + + case BodyChanged(_, newBody) ⇒ + state.content match { + case Some(c) ⇒ state.copy(content = Some(c.copy(body = newBody))) + case None ⇒ state + } + + case Published(_) ⇒ + state.copy(published = true) + } + //#event-handler + + //#behavior + def behavior: Behavior[BlogCommand] = + PersistentActor.immutable[BlogCommand, BlogEvent, BlogState]( + persistenceId = "abc", + initialState = BlogState.empty, + commandHandler, + eventHandler) + //#behavior +} + +class InDepthPersistentActorSpec { + +}