From 9cecba34558ca3e5b21ad2619d9879ab63b79426 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 6 Jul 2018 16:35:07 +0200 Subject: [PATCH] Composable javadsl CommandHandlerBuilder, #25226 (#25227) * Composable javadsl CommandHandlerBuilder, #25226 * CommandHandlerBuilder with stateClass and statePredicate parameters * CommandHandlerBuilder.orElse * Remove ActorContext from handler function signatures, can be passed in constructor --- akka-actor/src/main/java/akka/japi/pf/FI.java | 6 +- .../src/main/scala/akka/japi/JavaAPI.scala | 2 +- .../src/main/paradox/typed/persistence.md | 21 +- .../typed/javadsl/CommandHandler.scala | 195 ++++++------------ .../typed/javadsl/EventHandler.scala | 39 +++- .../typed/javadsl/PersistentBehavior.scala | 52 ++--- .../PersistentActorCompileOnlyTest.java | 18 +- .../javadsl/PersistentActorJavaDslTest.java | 181 ++++++++-------- .../typed/BasicPersistentBehaviorsTest.java | 4 +- .../typed/InDepthPersistentBehaviorTest.java | 170 +++++++++------ .../persistence/typed/MovieWatchList.java | 124 +++++++++++ .../persistence/typed/OptionalBlogState.java | 180 ++++++++++++++++ .../persistence/typed/AccountExample1.scala | 98 +++++++++ .../persistence/typed/AccountExample2.scala | 101 +++++++++ .../persistence/typed/MovieWatchList.scala | 54 +++++ 15 files changed, 920 insertions(+), 325 deletions(-) create mode 100644 akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java create mode 100644 akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java create mode 100644 akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala create mode 100644 akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala create mode 100644 akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala diff --git a/akka-actor/src/main/java/akka/japi/pf/FI.java b/akka-actor/src/main/java/akka/japi/pf/FI.java index 97bf086c69..fad9fe7eea 100644 --- a/akka-actor/src/main/java/akka/japi/pf/FI.java +++ b/akka-actor/src/main/java/akka/japi/pf/FI.java @@ -55,7 +55,8 @@ public final class FI { /** * Functional interface for a predicate. * - * This class is kept for compatibility, but for future API's please prefer {@link akka.japi.function.Predicate}. + * This class is kept for compatibility, but for future API's please prefer {@link java.util.function.Predicate} + * or {@link akka.japi.function.Predicate}. * * @param the type that the predicate will operate on. */ @@ -171,7 +172,8 @@ public final class FI { /** * Package scoped functional interface for a predicate. Used internally to match against arbitrary types. * - * This class is kept for compatibility, but for future API's please prefer {@link akka.japi.function.Predicate}. + * This class is kept for compatibility, but for future API's please prefer {@link java.util.function.Predicate} + * or {@link akka.japi.function.Predicate}. */ static interface Predicate { /** diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index ed0c26a7b7..3c874ce2c1 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -57,7 +57,7 @@ trait Effect { /** * Java API: Defines a criteria and determines whether the parameter meets this criteria. * - * This class is kept for compatibility, but for future API's please prefer [[akka.japi.function.Predicate]]. + * This class is kept for compatibility, but for future API's please prefer [[java.util.function.Predicate]]. */ trait Predicate[T] { def test(param: T): Boolean diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 33bd71d31c..5059581dfc 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -50,7 +50,8 @@ Next we'll discuss each of these in detail. ### Command handler -The command handler is a function with 3 parameters for the `ActorContext`, current `State`, and `Command`. +The command handler is a function with @java[2 parameters for]@scala[3 parameters for the `ActorContext`,] +current `State` and `Command`. A command handler returns an `Effect` directive that defines what event or events, if any, to persist. @java[Effects are created using a factory that is returned via the `Effect()` method] @@ -123,6 +124,8 @@ Java The `PersistentBehavior` can then be run as with any plain typed actor as described in [typed actors documentation](actors-typed.md). +@java[The `ActorContext` can be obtained with `Behaviors.setup` and be passed as a constructor parameter.] + ## Larger example After processing a message, plain typed actors are able to return the `Behavior` that is used @@ -158,16 +161,18 @@ Scala Java : @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #commands } -The command handler to process each command is decided by a `CommandHandler.byState` command handler, -which is a function from `State => CommandHandler`: +@java[The commandler handler to process each command is decided by the state class (or state predicate) that is +given to the `commandHandlerBuilder` and the match cases in the builders. Several builders can be composed with `orElse`:] +@scala[The command handler to process each command is decided by a `CommandHandler.byState` command handler, +which is a function from `State => CommandHandler`:] Scala : @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #by-state-command-handler } Java -: @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #by-state-command-handler } +: @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #command-handler } -This can refer to many other `CommandHandler`s e.g one for a post that hasn't been started: +The @java[`CommandHandlerBuilder`]@scala[`CommandHandler`] for a post that hasn't been initialized with content: Scala : @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #initial-command-handler } @@ -175,7 +180,7 @@ Scala Java : @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #initial-command-handler } -And a different `CommandHandler` for after the post has been added: +And a different @java[`CommandHandlerBuilder`]@scala[`CommandHandler`] for after the post content has been added: Scala : @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #post-added-command-handler } @@ -193,7 +198,7 @@ Scala Java : @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #event-handler } -And finally the behavior is created from the `byState` command handler: +And finally the behavior is created @scala[from the `PersistentBehaviors.receive`]: Scala : @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #behavior } @@ -219,7 +224,7 @@ Scala Java : @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #recovery } -The `onRecoveryCompleted` takes on an `ActorContext` and the current `State`, +The `onRecoveryCompleted` takes @scala[an `ActorContext` and] the current `State`, and doesn't return anything. ## Tagging diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala index 461b1862cd..5d900532b7 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala @@ -4,9 +4,10 @@ package akka.persistence.typed.javadsl -import akka.actor.typed.javadsl.ActorContext +import java.util.function.BiFunction +import java.util.function.Predicate + import akka.annotation.InternalApi -import akka.japi.pf.FI import akka.persistence.typed.internal._ import akka.util.OptionVal @@ -17,43 +18,78 @@ import akka.util.OptionVal */ @FunctionalInterface trait CommandHandler[Command, Event, State] { - def apply(ctx: ActorContext[Command], state: State, command: Command): Effect[Event, State] -} -/** - * FunctionalInterface for reacting on signals - * - * Used with [[CommandHandlerBuilder]] to setup the behavior of a [[PersistentBehavior]] - */ -@FunctionalInterface -trait CommandToEffect[Command, MsgCommand <: Command, Event, State] { - /** - * @return An effect for the given command - */ - def apply(ctx: ActorContext[Command], state: State, command: MsgCommand): Effect[Event, State] + def apply(state: State, command: Command): Effect[Event, State] } -final class CommandHandlerBuilder[Command, Event, State] @InternalApi private[persistence] () { +object CommandHandlerBuilder { - private final case class CommandHandlerCase(predicate: Command ⇒ Boolean, handler: CommandToEffect[Command, Command, Event, State]) - - private var cases: List[CommandHandlerCase] = Nil - - private def addCase(predicate: Command ⇒ Boolean, handler: CommandToEffect[Command, Command, Event, State]): Unit = { - cases = CommandHandlerCase(predicate, handler) :: cases + private val _trueStatePredicate: Predicate[Any] = new Predicate[Any] { + override def test(t: Any): Boolean = true } + + private def trueStatePredicate[S]: Predicate[S] = _trueStatePredicate.asInstanceOf[Predicate[S]] + + /** + * @param stateClass The handlers defined by this builder are used when the state is an instance of the `stateClass` + * @return A new, mutable, command handler builder + */ + def builder[Command, Event, S <: State, State](stateClass: Class[S]): CommandHandlerBuilder[Command, Event, S, State] = + new CommandHandlerBuilder(stateClass, statePredicate = trueStatePredicate) + + /** + * @param statePredicate The handlers defined by this builder are used when the `statePredicate` is `true`, + * useful for example when state type is an Optional + * @return A new, mutable, command handler builder + */ + def builder[Command, Event, State](statePredicate: Predicate[State]): CommandHandlerBuilder[Command, Event, State, State] = + new CommandHandlerBuilder(classOf[Any].asInstanceOf[Class[State]], statePredicate) + + /** + * INTERNAL API + */ + @InternalApi private final case class CommandHandlerCase[Command, Event, State]( + commandPredicate: Command ⇒ Boolean, + statePredicate: State ⇒ Boolean, + handler: BiFunction[State, Command, Effect[Event, State]]) +} + +final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalApi private[persistence] ( + val stateClass: Class[S], val statePredicate: Predicate[S]) { + import CommandHandlerBuilder.CommandHandlerCase + + private var cases: List[CommandHandlerCase[Command, Event, State]] = Nil + + private def addCase(predicate: Command ⇒ Boolean, handler: BiFunction[S, Command, Effect[Event, State]]): Unit = { + cases = CommandHandlerCase[Command, Event, State]( + commandPredicate = predicate, + statePredicate = state ⇒ stateClass.isAssignableFrom(state.getClass) && statePredicate.test(state.asInstanceOf[S]), + handler.asInstanceOf[BiFunction[State, Command, Effect[Event, State]]]) :: cases + } + /** * Match any command which the given `predicate` returns true for */ - def matchCommand(predicate: FI.TypedPredicate[Command], commandToEffect: CommandToEffect[Command, Command, Event, State]): CommandHandlerBuilder[Command, Event, State] = { - addCase(cmd ⇒ predicate.defined(cmd), commandToEffect) + def matchCommand(predicate: Predicate[Command], handler: BiFunction[S, Command, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = { + addCase(cmd ⇒ predicate.test(cmd), handler) this } - def matchCommand[C <: Command](commandClass: Class[C], commandToEffect: CommandToEffect[Command, C, Event, State]): CommandHandlerBuilder[Command, Event, State] = { - addCase(cmd ⇒ commandClass.isAssignableFrom(cmd.getClass), commandToEffect.asInstanceOf[CommandToEffect[Command, Command, Event, State]]) + def matchCommand[C <: Command](commandClass: Class[C], handler: BiFunction[S, C, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = { + addCase(cmd ⇒ commandClass.isAssignableFrom(cmd.getClass), handler.asInstanceOf[BiFunction[S, Command, Effect[Event, State]]]) this } + /** + * Compose this builder with another builder. The handlers in this builder will be tried first followed + * by the handlers in `other`. + */ + def orElse[S2 <: State](other: CommandHandlerBuilder[Command, Event, S2, State]): CommandHandlerBuilder[Command, Event, S2, State] = { + val newBuilder = new CommandHandlerBuilder[Command, Event, S2, State](other.stateClass, other.statePredicate) + // problem with overloaded constructor with `cases` as parameter + newBuilder.cases = other.cases ::: cases + newBuilder + } + /** * Builds a Command Handler and resets this builder */ @@ -61,14 +97,15 @@ final class CommandHandlerBuilder[Command, Event, State] @InternalApi private[pe val builtCases = cases.reverse.toArray cases = Nil new CommandHandler[Command, Event, State] { - override def apply(ctx: ActorContext[Command], state: State, command: Command) = { + override def apply(state: State, command: Command): Effect[Event, State] = { var idx = 0 var effect: OptionVal[Effect[Event, State]] = OptionVal.None while (idx < builtCases.length && effect.isEmpty) { val curr = builtCases(idx) - if (curr.predicate(command)) { - effect = OptionVal.Some(curr.handler.apply(ctx, state, command)) + if (curr.statePredicate(state) && curr.commandPredicate(command)) { + val x: Effect[Event, State] = curr.handler.apply(state, command) + effect = OptionVal.Some(x) } idx += 1 } @@ -82,104 +119,4 @@ final class CommandHandlerBuilder[Command, Event, State] @InternalApi private[pe } } -/** - * Mutable builder for nested Java [[CommandHandler]]s where different states should have different command handlers. - * [[CommandHandler]] per state are added with the `match` methods and finally a [[CommandHandler]] is created with [[ByStateCommandHandlerBuilder#build]]. - * - * Match statements are appended and evaluated in order, the first one to match is used. If no match is found when - * evaluating the built [[CommandHandler]] for a given state a [[scala.MatchError]] is thrown. - */ - -final class ByStateCommandHandlerBuilder[Command, Event, State] @InternalApi private[javadsl] () { - - private final case class ByStateCase(predicate: State ⇒ Boolean, handler: CommandHandler[Command, Event, State]) - - private var cases: List[ByStateCase] = Nil - - private def addCase(predicate: State ⇒ Boolean, handler: CommandHandler[Command, Event, State]): Unit = { - cases = ByStateCase(predicate, handler) :: cases - } - - /** - * Match any state that the `predicate` returns true for. - */ - def matchState(predicate: FI.TypedPredicate[State], commandHandler: CommandHandler[Command, Event, State]): ByStateCommandHandlerBuilder[Command, Event, State] = { - addCase( - predicate.defined, - commandHandler.asInstanceOf[CommandHandler[Command, Event, State]]) - this - } - - /** - * Match any state which is an instance of `S` or a subtype of `S` - * - * Throws `java.lang.IllegalArgumentException` if `stateClass` is not a subtype of the root `State` this builder was created with - */ - def matchState[S <: State](stateClass: Class[S], commandHandler: CommandHandler[Command, Event, S]): ByStateCommandHandlerBuilder[Command, Event, State] = { - addCase( - state ⇒ stateClass.isAssignableFrom(state.getClass), - commandHandler.asInstanceOf[CommandHandler[Command, Event, State]]) - this - } - - /** - * Match any state which is an instance of `S` or a subtype of `S` and for which the `predicate` returns true - * - * Throws `java.lang.IllegalArgumentException` if `stateClass` is not a subtype of the root `State` this builder was created with - */ - def matchState[S <: State](stateClass: Class[S], predicate: FI.TypedPredicate[S], commandHandler: CommandHandler[Command, Event, S]): ByStateCommandHandlerBuilder[Command, Event, State] = { - addCase( - state ⇒ stateClass.isAssignableFrom(state.getClass) && predicate.defined(state.asInstanceOf[S]), - commandHandler.asInstanceOf[CommandHandler[Command, Event, State]]) - this - } - - /** - * Match states that are equal to the given `state` instance - */ - def matchExact[S <: State](state: S, commandHandler: CommandHandler[Command, Event, S]): ByStateCommandHandlerBuilder[Command, Event, State] = { - addCase(s ⇒ s.equals(state), commandHandler.asInstanceOf[CommandHandler[Command, Event, State]]) - this - } - - /** - * Match any state - * - * Builds and returns the handler since this will not let through any states to subsequent match statements - */ - def matchAny(commandHandler: CommandHandler[Command, Event, State]): CommandHandler[Command, Event, State] = { - addCase(_ ⇒ true, commandHandler) - build() - } - - /** - * Build a command handler from the appended cases. The returned handler will throw a [[scala.MatchError]] if applied to - * a command that has no defined match. - * - * The builder is reset to empty after build has been called. - */ - def build(): CommandHandler[Command, Event, State] = { - val builtCases = cases.reverse.toArray - cases = Nil - new CommandHandler[Command, Event, State] { - override def apply(ctx: ActorContext[Command], state: State, command: Command) = { - var idx = 0 - var effect: OptionVal[Effect[Event, State]] = OptionVal.None - - while (idx < builtCases.length && effect.isEmpty) { - val curr = builtCases(idx) - if (curr.predicate(state)) { - effect = OptionVal.Some(curr.handler.apply(ctx, state, command)) - } - idx += 1 - } - - effect match { - case OptionVal.None ⇒ throw new MatchError(s"No match found for command of type [${command.getClass.getName}]") - case OptionVal.Some(e) ⇒ e.asInstanceOf[EffectImpl[Event, State]] - } - } - } - } -} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala index 3f65dc7e14..b058c24e46 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala @@ -6,6 +6,7 @@ package akka.persistence.typed.javadsl import java.util.function.BiFunction +import akka.annotation.InternalApi import akka.util.OptionVal /** @@ -21,16 +22,23 @@ trait EventHandler[Event, State] { object EventHandlerBuilder { def builder[Event, State >: Null](): EventHandlerBuilder[Event, State] = new EventHandlerBuilder[Event, State]() + + /** + * INTERNAL API + */ + @InternalApi private final case class EventHandlerCase[Event, State]( + eventPredicate: Event ⇒ Boolean, + statePredicate: State ⇒ Boolean, + handler: BiFunction[State, Event, State]) } final class EventHandlerBuilder[Event, State >: Null]() { + import EventHandlerBuilder.EventHandlerCase - private final case class EventHandlerCase(predicate: Event ⇒ Boolean, handler: BiFunction[State, Event, State]) - - private var cases: List[EventHandlerCase] = Nil + private var cases: List[EventHandlerCase[Event, State]] = Nil private def addCase(predicate: Event ⇒ Boolean, handler: BiFunction[State, Event, State]): Unit = { - cases = EventHandlerCase(predicate, handler) :: cases + cases = EventHandlerCase[Event, State](predicate, _ ⇒ true, handler) :: cases } /** @@ -41,6 +49,16 @@ final class EventHandlerBuilder[Event, State >: Null]() { this } + def matchEvent[E <: Event, S <: State](eventClass: Class[E], stateClass: Class[S], + biFunction: BiFunction[S, E, State]): EventHandlerBuilder[Event, State] = { + + cases = EventHandlerCase[Event, State]( + eventPredicate = e ⇒ eventClass.isAssignableFrom(e.getClass), + statePredicate = s ⇒ stateClass.isAssignableFrom(s.getClass), + biFunction.asInstanceOf[BiFunction[State, Event, State]]) :: cases + this + } + /** * Match any event * @@ -51,6 +69,17 @@ final class EventHandlerBuilder[Event, State >: Null]() { build() } + /** + * Compose this builder with another builder. The handlers in this builder will be tried first followed + * by the handlers in `other`. + */ + def orElse(other: EventHandlerBuilder[Event, State]): EventHandlerBuilder[Event, State] = { + val newBuilder = new EventHandlerBuilder[Event, State] + // problem with overloaded constructor with `cases` as parameter + newBuilder.cases = other.cases ::: cases + newBuilder + } + /** * Builds and returns a handler from the appended states. The returned [[EventHandler]] will throw a [[scala.MatchError]] * if applied to an event that has no defined case. @@ -66,7 +95,7 @@ final class EventHandlerBuilder[Event, State >: Null]() { var idx = 0 while (idx < builtCases.length && result.isEmpty) { val curr = builtCases(idx) - if (curr.predicate(event)) { + if (curr.statePredicate(state) && curr.eventPredicate(event)) { result = OptionVal.Some[State](curr.handler.apply(state, event)) } idx += 1 diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala index e3b295279c..a3fd9aea44 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -4,6 +4,7 @@ package akka.persistence.typed.javadsl +import java.util.function.Predicate import java.util.{ Collections, Optional } import akka.actor.typed @@ -14,9 +15,10 @@ import akka.annotation.{ ApiMayChange, InternalApi } import akka.persistence.SnapshotMetadata import akka.persistence.typed.{ EventAdapter, _ } import akka.persistence.typed.internal._ - import scala.util.{ Failure, Success } +import akka.japi.pf.FI + /** Java API */ @ApiMayChange abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends DeferredBehavior[Command] { @@ -40,7 +42,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence * Implement by handling incoming commands and return an `Effect()` to persist or signal other effects * of the command handling such as stopping the behavior or others. * - * This method is only invoked when the actor is running (i.e. not replaying). + * The command handlers are only invoked when the actor is running (i.e. not replaying). * While the actor is persisting events, the incoming messages are stashed and only * delivered to the handler once persisting them has completed. */ @@ -49,7 +51,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence /** * Implement by applying the event to the current state in order to return a new state. * - * This method invoked during recovery as well as running operation of this behavior, + * The event handlers are invoked during recovery as well as running operation of this behavior, * in order to keep updating the state state. * * For that reason it is strongly discouraged to perform side-effects in this handler; @@ -58,43 +60,38 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence protected def eventHandler(): EventHandler[Event, State] /** - * @return A new, mutable, by state command handler builder + * @param stateClass The handlers defined by this builder are used when the state is an instance of the `stateClass` + * @return A new, mutable, command handler builder */ - protected final def commandHandlerBuilder(): CommandHandlerBuilder[Command, Event, State] = - new CommandHandlerBuilder[Command, Event, State]() + protected final def commandHandlerBuilder[S <: State](stateClass: Class[S]): CommandHandlerBuilder[Command, Event, S, State] = + CommandHandlerBuilder.builder[Command, Event, S, State](stateClass) /** - * @return A new, mutable, by state command handler builder + * @param statePredicate The handlers defined by this builder are used when the `statePredicate` is `true`, + * * useful for example when state type is an Optional + * @return A new, mutable, command handler builder */ - protected final def byStateCommandHandlerBuilder(): ByStateCommandHandlerBuilder[Command, Event, State] = - new ByStateCommandHandlerBuilder[Command, Event, State]() + protected final def commandHandlerBuilder(statePredicate: Predicate[State]): CommandHandlerBuilder[Command, Event, State, State] = + CommandHandlerBuilder.builder[Command, Event, State](statePredicate) /** * @return A new, mutable, event handler builder */ protected final def eventHandlerBuilder(): EventHandlerBuilder[Event, State] = - new EventHandlerBuilder[Event, State]() + EventHandlerBuilder.builder[Event, State]() /** * The `callback` function is called to notify the actor that the recovery process * is finished. */ - def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {} + def onRecoveryCompleted(state: State): Unit = () /** * Override to get notified when a snapshot is finished. - * The default implementation logs failures at error and success writes at - * debug. * * @param result None if successful otherwise contains the exception thrown when snapshotting */ - def onSnapshot(ctx: ActorContext[Command], meta: SnapshotMetadata, result: Optional[Throwable]): Unit = { - if (result.isPresent) { - ctx.getLog.error(result.get(), "Save snapshot failed, snapshot metadata: [{}]", meta) - } else { - ctx.getLog.debug("Save snapshot successful, snapshot metadata: [{}]", meta) - } - } + def onSnapshot(meta: SnapshotMetadata, result: Optional[Throwable]): Unit = () /** * Override and define that snapshot should be saved every N events. @@ -128,7 +125,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence /** * INTERNAL API: DeferredBehavior init */ - override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { + @InternalApi override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { val snapshotWhen: (State, Event, Long) ⇒ Boolean = { (state, event, seqNr) ⇒ val n = snapshotEvery() @@ -148,13 +145,20 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence scaladsl.PersistentBehaviors.receive[Command, Event, State]( persistenceId, emptyState, - (c, state, cmd) ⇒ commandHandler()(c.asJava, state, cmd).asInstanceOf[EffectImpl[Event, State]], + (c, state, cmd) ⇒ commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]], eventHandler()(_, _)) - .onRecoveryCompleted((ctx, state) ⇒ onRecoveryCompleted(ctx.asJava, state)) + .onRecoveryCompleted((ctx, state) ⇒ onRecoveryCompleted(state)) .snapshotWhen(snapshotWhen) .withTagger(tagger) .onSnapshot((ctx, meta, result) ⇒ { - onSnapshot(ctx.asJava, meta, result match { + result match { + case Success(_) ⇒ + ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) + case Failure(e) ⇒ + ctx.log.error(e, "Save snapshot failed, snapshot metadata: [{}]", meta) + } + + onSnapshot(meta, result match { case Success(_) ⇒ Optional.empty() case Failure(t) ⇒ Optional.of(t) }) diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java index 424fa1876e..ec5f0ee66d 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java @@ -5,6 +5,7 @@ package akka.persistence.typed.javadsl; import akka.actor.Scheduler; +import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.ActorRef; import akka.persistence.typed.EventAdapter; import akka.actor.testkit.typed.javadsl.TestInbox; @@ -96,7 +97,7 @@ public class PersistentActorCompileOnlyTest { //#command-handler @Override public CommandHandler commandHandler() { - return (ctx, state, cmd) -> Effect().persist(new SimpleEvent(cmd.data)); + return (state, cmd) -> Effect().persist(new SimpleEvent(cmd.data)); } //#command-handler @@ -156,8 +157,8 @@ public class PersistentActorCompileOnlyTest { @Override public CommandHandler commandHandler() { - return commandHandlerBuilder() - .matchCommand(Cmd.class, (ctx, state, cmd) -> Effect().persist(new Evt(cmd.data)) + return commandHandlerBuilder(ExampleState.class) + .matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data)) .andThen(() -> cmd.sender.tell(new Ack()))) .build(); } @@ -254,8 +255,11 @@ public class PersistentActorCompileOnlyTest { } class MyPersistentBehavior extends PersistentBehavior { - public MyPersistentBehavior(String persistenceId) { + private final ActorContext ctx; + + public MyPersistentBehavior(String persistenceId, ActorContext ctx) { super(persistenceId); + this.ctx = ctx; } @Override @@ -265,11 +269,11 @@ public class PersistentActorCompileOnlyTest { @Override public CommandHandler commandHandler() { - return commandHandlerBuilder() + return commandHandlerBuilder(EventsInFlight.class) .matchCommand(DoSideEffect.class, - (ctx, state, cmd) -> Effect().persist(new IntentRecord(state.nextCorrelationId, cmd.data)) + (state, cmd) -> Effect().persist(new IntentRecord(state.nextCorrelationId, cmd.data)) .andThen(() -> performSideEffect(ctx.getSelf().narrow(), state.nextCorrelationId, cmd.data, ctx.getSystem().scheduler()))) - .matchCommand(AcknowledgeSideEffect.class, (ctx, state, command) -> Effect().persist(new SideEffectAcknowledged(command.correlationId))) + .matchCommand(AcknowledgeSideEffect.class, (state, command) -> Effect().persist(new SideEffectAcknowledged(command.correlationId))) .build(); } diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 5ea1574a44..4209ca135c 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -174,29 +174,29 @@ public class PersistentActorJavaDslTest extends JUnitSuite { private static String loggingOne = "one"; - private PersistentBehavior counter(String persistenceId, ActorRef> probe) { + private Behavior counter(String persistenceId, ActorRef> probe) { ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, (e) -> Collections.emptySet(), snapshotProbe, new NoOpEventAdapter<>()); } - private PersistentBehavior counter(String persistenceId, - ActorRef> probe, - Function> tagger) { + private Behavior counter(String persistenceId, + ActorRef> probe, + Function> tagger) { ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, tagger, snapshotProbe, new NoOpEventAdapter<>()); } - private PersistentBehavior counter(String persistenceId, - ActorRef> probe, - EventAdapter transformer) { + private Behavior counter(String persistenceId, + ActorRef> probe, + EventAdapter transformer) { ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), snapshotProbe, transformer); } - private PersistentBehavior counter(String persistenceId) { + private Behavior counter(String persistenceId) { return counter(persistenceId, TestProbe.>create(testKit.system()).ref(), TestProbe.create(testKit.system()).ref(), @@ -207,7 +207,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { ); } - private PersistentBehavior counter( + private Behavior counter( String persistenceId, Function3 snapshot, ActorRef> snapshotProbe @@ -221,7 +221,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { new NoOpEventAdapter<>()); } - private PersistentBehavior counter( + private Behavior counter( String persistentId, ActorRef> eventProbe, ActorRef loggingProbe) { @@ -231,7 +231,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { ); } - private PersistentBehavior counter( + private Behavior counter( String persistentId, ActorRef> eventProbe, Function3 snapshot) { @@ -240,7 +240,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { ); } - private PersistentBehavior counter( + private Behavior counter( String persistentId, ActorRef> eventProbe, ActorRef loggingProbe, @@ -248,90 +248,93 @@ public class PersistentActorJavaDslTest extends JUnitSuite { Function> tagsFunction, ActorRef> snapshotProbe, EventAdapter transformer) { - return new PersistentBehavior(persistentId) { - @Override - public CommandHandler commandHandler() { - return commandHandlerBuilder() - .matchCommand(Increment.class, (ctx, state, command) -> Effect().persist(new Incremented(1))) - .matchCommand(GetValue.class, (ctx, state, command) -> { - command.replyTo.tell(state); - return Effect().none(); - }) - .matchCommand(IncrementLater.class, (ctx, state, command) -> { - ActorRef delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> { - timers.startSingleTimer(Tick.instance, Tick.instance, Duration.ofMillis(10)); - return Behaviors.receive((context, o) -> Behaviors.stopped()); - })); - ctx.watchWith(delay, new DelayFinished()); - return Effect().none(); - }) - .matchCommand(DelayFinished.class, (ctx, state, finished) -> Effect().persist(new Incremented(10))) - .matchCommand(Increment100OnTimeout.class, (ctx, state, msg) -> { - ctx.setReceiveTimeout(Duration.ofMillis(10), new Timeout()); - return Effect().none(); - }) - .matchCommand(Timeout.class, - (ctx, state, msg) -> Effect().persist(timeoutEvent)) - .matchCommand(EmptyEventsListAndThenLog.class, (ctx, state, msg) -> Effect().persist(Collections.emptyList()) - .andThen(s -> loggingProbe.tell(loggingOne))) - .matchCommand(StopThenLog.class, - (ctx, state, msg) -> Effect().stop() - .andThen(s -> loggingProbe.tell(loggingOne))) - .matchCommand(IncrementTwiceAndLog.class, - (ctx, state, msg) -> Effect().persist( - Arrays.asList(new Incremented(1), new Incremented(1))) - .andThen(s -> loggingProbe.tell(loggingOne))) - .build(); - } + return Behaviors.setup(ctx -> { + return new PersistentBehavior(persistentId) { + @Override + public CommandHandler commandHandler() { + return commandHandlerBuilder(State.class) + .matchCommand(Increment.class, (state, command) -> Effect().persist(new Incremented(1))) + .matchCommand(GetValue.class, (state, command) -> { + command.replyTo.tell(state); + return Effect().none(); + }) + .matchCommand(IncrementLater.class, (state, command) -> { + ActorRef delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> { + timers.startSingleTimer(Tick.instance, Tick.instance, Duration.ofMillis(10)); + return Behaviors.receive((context, o) -> Behaviors.stopped()); + })); + ctx.watchWith(delay, new DelayFinished()); + return Effect().none(); + }) + .matchCommand(DelayFinished.class, (state, finished) -> Effect().persist(new Incremented(10))) + .matchCommand(Increment100OnTimeout.class, (state, msg) -> { + ctx.setReceiveTimeout(Duration.ofMillis(10), new Timeout()); + return Effect().none(); + }) + .matchCommand(Timeout.class, + (state, msg) -> Effect().persist(timeoutEvent)) + .matchCommand(EmptyEventsListAndThenLog.class, (state, msg) -> Effect().persist(Collections.emptyList()) + .andThen(s -> loggingProbe.tell(loggingOne))) + .matchCommand(StopThenLog.class, + (state, msg) -> Effect().stop() + .andThen(s -> loggingProbe.tell(loggingOne))) + .matchCommand(IncrementTwiceAndLog.class, + (state, msg) -> Effect().persist( + Arrays.asList(new Incremented(1), new Incremented(1))) + .andThen(s -> loggingProbe.tell(loggingOne))) + .build(); - @Override - public EventHandler eventHandler() { - return eventHandlerBuilder() - .matchEvent(Incremented.class, (state, event) -> { - List newHistory = new ArrayList<>(state.history); - newHistory.add(state.value); - eventProbe.tell(Pair.create(state, event)); - return new State(state.value + event.delta, newHistory); - }) - .build(); - - } - - @Override - public State emptyState() { - return emptyState; - } - - @Override - public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) { - try { - return snapshot.apply(state, event, sequenceNr); - } catch (Exception e) { - throw new RuntimeException(e); } - } - @Override - public Set tagsFor(Incremented incremented) { - try { - return tagsFunction.apply(incremented); - } catch (Exception e) { - throw new RuntimeException(e); + @Override + public EventHandler eventHandler() { + return eventHandlerBuilder() + .matchEvent(Incremented.class, (state, event) -> { + List newHistory = new ArrayList<>(state.history); + newHistory.add(state.value); + eventProbe.tell(Pair.create(state, event)); + return new State(state.value + event.delta, newHistory); + }) + .build(); + } - } - @Override - public void onSnapshot(ActorContext ctx, SnapshotMetadata meta, Optional result) { - snapshotProbe.tell(result); - } + @Override + public State emptyState() { + return emptyState; + } + + @Override + public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) { + try { + return snapshot.apply(state, event, sequenceNr); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public Set tagsFor(Incremented incremented) { + try { + return tagsFunction.apply(incremented); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void onSnapshot(SnapshotMetadata meta, Optional result) { + snapshotProbe.tell(result); + } - @Override - public EventAdapter eventAdapter() { - return transformer; - } - }; + @Override + public EventAdapter eventAdapter() { + return transformer; + } + }; + }); } @Test @@ -405,7 +408,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void snapshot() { TestProbe> snapshotProbe = testKit.createTestProbe(); - PersistentBehavior snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref()); + Behavior snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref()); ActorRef c = testKit.spawn(snapshoter); c.tell(Increment.instance); c.tell(Increment.instance); diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java index cc41216956..fe154f92cf 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java @@ -35,7 +35,7 @@ public class BasicPersistentBehaviorsTest { @Override public CommandHandler commandHandler() { - return (ctx, state, command) -> { + return (state, command) -> { throw new RuntimeException("TODO: process the command & return an Effect"); }; } @@ -49,7 +49,7 @@ public class BasicPersistentBehaviorsTest { //#recovery @Override - public void onRecoveryCompleted(ActorContext ctx, State state) { + public void onRecoveryCompleted(State state) { throw new RuntimeException("TODO: add some end-of-recovery side-effect here"); } //#recovery diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java index fa9393e63f..48d54cb052 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java @@ -6,7 +6,11 @@ package jdocs.akka.persistence.typed; import akka.Done; import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.CommandHandlerBuilder; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.PersistentBehavior; @@ -47,27 +51,41 @@ public class InDepthPersistentBehaviorTest { //#event //#state - public static class BlogState { - final Optional postContent; + interface BlogState {} + + public static class BlankState implements BlogState {} + + public static class DraftState implements BlogState { + final PostContent postContent; final boolean published; - BlogState(Optional postContent, boolean published) { + DraftState(PostContent postContent, boolean published) { this.postContent = postContent; this.published = published; } - public BlogState withContent(PostContent newContent) { - return new BlogState(Optional.of(newContent), this.published); - } - - public boolean isEmpty() { - return postContent.isPresent(); + public DraftState withContent(PostContent newContent) { + return new DraftState(newContent, this.published); } public String postId() { - return postContent.orElseGet(() -> { - throw new IllegalStateException("postId unknown before post is created"); - }).postId; + return postContent.postId; + } + } + + public static class PublishedState implements BlogState { + final PostContent postContent; + + PublishedState(PostContent postContent) { + this.postContent = postContent; + } + + public PublishedState withContent(PostContent newContent) { + return new PublishedState(newContent); + } + + public String postId() { + return postContent.postId; } } //#state @@ -132,69 +150,105 @@ public class InDepthPersistentBehaviorTest { //#behavior public static class BlogBehavior extends PersistentBehavior { + //#behavior - //#initial-command-handler - private CommandHandler initialCommandHandler = commandHandlerBuilder() - .matchCommand(AddPost.class, (ctx, state, cmd) -> { - PostAdded event = new PostAdded(cmd.content.postId, cmd.content); - return Effect().persist(event) - .andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId))); - }) - .matchCommand(PassivatePost.class, (ctx, state, cmd) -> Effect().stop()) - .build(); - //#initial-command-handler + private final ActorContext ctx; - //#post-added-command-handler - private CommandHandler postCommandHandler = commandHandlerBuilder() - .matchCommand(ChangeBody.class, (ctx, state, cmd) -> { - BodyChanged event = new BodyChanged(state.postId(), cmd.newBody); - return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance())); - }) - .matchCommand(Publish.class, (ctx, state, cmd) -> Effect() - .persist(new Published(state.postId())).andThen(() -> { - System.out.println("Blog post published: " + state.postId()); - cmd.replyTo.tell(Done.getInstance()); - })) - .matchCommand(GetPost.class, (ctx, state, cmd) -> { - cmd.replyTo.tell(state.postContent.get()); - return Effect().none(); - }) - .matchCommand(AddPost.class, (ctx, state, cmd) -> Effect().unhandled()) - .matchCommand(PassivatePost.class, (ctx, state, cmd) -> Effect().stop()) - .build(); - //#post-added-command-handler - - - public BlogBehavior(String persistenceId) { + public BlogBehavior(String persistenceId, ActorContext ctx) { super(persistenceId); + this.ctx = ctx; } - @Override - public BlogState emptyState() { - return new BlogState(Optional.empty(), false); + //#initial-command-handler + private CommandHandlerBuilder initialCommandHandler() { + return commandHandlerBuilder(BlankState.class) + .matchCommand(AddPost.class, (state, cmd) -> { + PostAdded event = new PostAdded(cmd.content.postId, cmd.content); + return Effect().persist(event) + .andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId))); + }); + } + //#initial-command-handler + + //#post-added-command-handler + private CommandHandlerBuilder draftCommandHandler() { + return commandHandlerBuilder(DraftState.class) + .matchCommand(ChangeBody.class, (state, cmd) -> { + BodyChanged event = new BodyChanged(state.postId(), cmd.newBody); + return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance())); + }) + .matchCommand(Publish.class, (state, cmd) -> Effect() + .persist(new Published(state.postId())).andThen(() -> { + System.out.println("Blog post published: " + state.postId()); + cmd.replyTo.tell(Done.getInstance()); + })) + .matchCommand(GetPost.class, (state, cmd) -> { + cmd.replyTo.tell(state.postContent); + return Effect().none(); + }); } - //#by-state-command-handler + private CommandHandlerBuilder publishedCommandHandler() { + return commandHandlerBuilder(PublishedState.class) + .matchCommand(ChangeBody.class, (state, cmd) -> { + BodyChanged event = new BodyChanged(state.postId(), cmd.newBody); + return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance())); + }) + .matchCommand(GetPost.class, (state, cmd) -> { + cmd.replyTo.tell(state.postContent); + return Effect().none(); + }); + } + + private CommandHandlerBuilder commonCommandHandler() { + return commandHandlerBuilder(BlogState.class) + .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()) + .matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop()); + } + //#post-added-command-handler + + + //#command-handler @Override public CommandHandler commandHandler() { - return byStateCommandHandlerBuilder() - .matchState(BlogState.class, (state) -> !state.postContent.isPresent(), initialCommandHandler) - .matchState(BlogState.class, (state) -> state.postContent.isPresent(), postCommandHandler) - .build(); + return + initialCommandHandler() + .orElse(draftCommandHandler()) + .orElse(publishedCommandHandler()) + .orElse(commonCommandHandler()) + .build(); } - //#by-state-command-handler + //#command-handler //#event-handler @Override public EventHandler eventHandler() { return eventHandlerBuilder() - .matchEvent(PostAdded.class, (state, event) -> state.withContent(event.content)) - .matchEvent(BodyChanged.class, (state, newBody) -> - new BlogState(state.postContent.map(pc -> new PostContent(pc.postId, pc.title, newBody.newBody)), state.published)) - .matchEvent(Published.class, (state, event) -> new BlogState(state.postContent, true)) - .build(); + .matchEvent(PostAdded.class, (state, event) -> + new DraftState(event.content, false)) + .matchEvent(BodyChanged.class, DraftState.class, (state, chg) -> + state.withContent(new PostContent(state.postId(), state.postContent.title, chg.newBody))) + .matchEvent(BodyChanged.class, PublishedState.class, (state, chg) -> + state.withContent(new PostContent(state.postId(), state.postContent.title, chg.newBody))) + .matchEvent(Published.class, DraftState.class, (state, event) -> + new PublishedState(state.postContent)) + .build(); } //#event-handler + + //#behavior + public static Behavior behavior(String entityId) { + return Behaviors.setup(ctx -> + new BlogBehavior("Blog-" + entityId, ctx) + ); + } + + @Override + public BlogState emptyState() { + return new BlankState(); + } + + // commandHandler, eventHandler as in above snippets } //#behavior } diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java new file mode 100644 index 0000000000..455e30d059 --- /dev/null +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/MovieWatchList.java @@ -0,0 +1,124 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.EventHandler; +import akka.persistence.typed.javadsl.PersistentBehavior; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class MovieWatchList extends PersistentBehavior { + + interface Command { + } + + public static class AddMovie implements Command { + public final String movieId; + + public AddMovie(String movieId) { + this.movieId = movieId; + } + } + + public static class RemoveMovie implements Command { + public final String movieId; + + public RemoveMovie(String movieId) { + this.movieId = movieId; + } + } + + interface Event { + } + + public static class MovieAdded implements Event { + public final String movieId; + + public MovieAdded(String movieId) { + this.movieId = movieId; + } + } + + public static class MovieRemoved implements Event { + public final String movieId; + + public MovieRemoved(String movieId) { + this.movieId = movieId; + } + } + + public static class GetMovieList implements Command { + public final ActorRef replyTo; + + public GetMovieList(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + public static class MovieList { + public final Set movieIds; + + public MovieList(Set movieIds) { + this.movieIds = Collections.unmodifiableSet(movieIds); + } + + public MovieList add(String movieId) { + Set newSet = new HashSet<>(movieIds); + newSet.add(movieId); + return new MovieList(newSet); + } + + public MovieList remove(String movieId) { + Set newSet = new HashSet<>(movieIds); + newSet.remove(movieId); + return new MovieList(newSet); + } + } + + public static Behavior behavior(String userId) { + return new MovieWatchList("movies-" + userId); + } + + public MovieWatchList(String persistenceId) { + super(persistenceId); + } + + @Override + public MovieList emptyState() { + return new MovieList(Collections.emptySet()); + } + + @Override + public CommandHandler commandHandler() { + return commandHandlerBuilder(MovieList.class) + .matchCommand(AddMovie.class, (state, cmd) -> { + return Effect().persist(new MovieAdded(cmd.movieId)); + }) + .matchCommand(RemoveMovie.class, (state, cmd) -> { + return Effect().persist(new MovieRemoved(cmd.movieId)); + }) + .matchCommand(GetMovieList.class, (state, cmd) -> { + cmd.replyTo.tell(state); + return Effect().none(); + }) + .build(); + } + + @Override + public EventHandler eventHandler() { + return eventHandlerBuilder() + .matchEvent(MovieAdded.class, (state, event) -> state.add(event.movieId)) + .matchEvent(MovieRemoved.class, (state, event) -> state.remove(event.movieId)) + .build(); + } + + +} diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java new file mode 100644 index 0000000000..9792155bcb --- /dev/null +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + +import akka.Done; +import akka.actor.typed.ActorRef; +import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.CommandHandlerBuilder; +import akka.persistence.typed.javadsl.EventHandler; +import akka.persistence.typed.javadsl.PersistentBehavior; + +import java.util.Optional; + +public class OptionalBlogState { + + interface BlogEvent { + } + public static class PostAdded implements BlogEvent { + private final String postId; + private final PostContent content; + + public PostAdded(String postId, PostContent content) { + this.postId = postId; + this.content = content; + } + } + + public static class BodyChanged implements BlogEvent { + private final String postId; + private final String newBody; + + public BodyChanged(String postId, String newBody) { + this.postId = postId; + this.newBody = newBody; + } + } + + public static class Published implements BlogEvent { + private final String postId; + + public Published(String postId) { + this.postId = postId; + } + } + + public static class BlogState { + final PostContent postContent; + final boolean published; + + BlogState(PostContent postContent, boolean published) { + this.postContent = postContent; + this.published = published; + } + + public BlogState withContent(PostContent newContent) { + return new BlogState(newContent, this.published); + } + + public String postId() { + return postContent.postId; + } + } + + public interface BlogCommand { + } + public static class AddPost implements BlogCommand { + final PostContent content; + final ActorRef replyTo; + + public AddPost(PostContent content, ActorRef replyTo) { + this.content = content; + this.replyTo = replyTo; + } + } + public static class AddPostDone implements BlogCommand { + final String postId; + + public AddPostDone(String postId) { + this.postId = postId; + } + } + public static class GetPost implements BlogCommand { + final ActorRef replyTo; + + public GetPost(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + public static class ChangeBody implements BlogCommand { + final String newBody; + final ActorRef replyTo; + + public ChangeBody(String newBody, ActorRef replyTo) { + this.newBody = newBody; + this.replyTo = replyTo; + } + } + public static class Publish implements BlogCommand { + final ActorRef replyTo; + + public Publish(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + public static class PassivatePost implements BlogCommand { + + } + public static class PostContent implements BlogCommand { + final String postId; + final String title; + final String body; + + public PostContent(String postId, String title, String body) { + this.postId = postId; + this.title = title; + this.body = body; + } + } + + public static class BlogBehavior extends PersistentBehavior> { + + private CommandHandlerBuilder, Optional> initialCommandHandler() { + return commandHandlerBuilder(state -> !state.isPresent()) + .matchCommand(AddPost.class, (state, cmd) -> { + PostAdded event = new PostAdded(cmd.content.postId, cmd.content); + return Effect().persist(event) + .andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId))); + }) + .matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop()); + } + + private CommandHandlerBuilder, Optional> postCommandHandler() { + return commandHandlerBuilder(state -> state.isPresent()) + .matchCommand(ChangeBody.class, (state, cmd) -> { + BodyChanged event = new BodyChanged(state.get().postId(), cmd.newBody); + return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance())); + }) + .matchCommand(Publish.class, (state, cmd) -> Effect() + .persist(new Published(state.get().postId())).andThen(() -> { + System.out.println("Blog post published: " + state.get().postId()); + cmd.replyTo.tell(Done.getInstance()); + })) + .matchCommand(GetPost.class, (state, cmd) -> { + cmd.replyTo.tell(state.get().postContent); + return Effect().none(); + }) + .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()) + .matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop()); + } + + public BlogBehavior(String persistenceId) { + super(persistenceId); + } + + @Override + public Optional emptyState() { + return Optional.empty(); + } + + @Override + public CommandHandler> commandHandler() { + return initialCommandHandler().orElse(postCommandHandler()).build(); + } + + @Override + public EventHandler> eventHandler() { + return eventHandlerBuilder() + .matchEvent(PostAdded.class, (state, event) -> + Optional.of(new BlogState(event.content, false))) + .matchEvent(BodyChanged.class, (state, chg) -> + state.map(blogState -> blogState.withContent( + new PostContent(blogState.postId(), blogState.postContent.title, chg.newBody)))) + .matchEvent(Published.class, (state, event) -> + state.map(blogState -> new BlogState(blogState.postContent, true))) + .build(); + } + } +} diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala new file mode 100644 index 0000000000..1d9fed3a82 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample1.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package docs.akka.persistence.typed + +import akka.actor.typed.Behavior +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.PersistentBehaviors +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler + +object AccountExample1 { + + sealed trait AccountCommand + case object CreateAccount extends AccountCommand + case class Deposit(amount: Double) extends AccountCommand + case class Withdraw(amount: Double) extends AccountCommand + case object CloseAccount extends AccountCommand + + sealed trait AccountEvent + case object AccountCreated extends AccountEvent + case class Deposited(amount: Double) extends AccountEvent + case class Withdrawn(amount: Double) extends AccountEvent + case object AccountClosed extends AccountEvent + + sealed trait Account + case class OpenedAccount(balance: Double) extends Account + case object ClosedAccount extends Account + + private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = + CommandHandler.command { + case CreateAccount ⇒ Effect.persist(AccountCreated) + case _ ⇒ Effect.unhandled + } + + private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = { + case (ctx, Some(acc: OpenedAccount), cmd) ⇒ cmd match { + case Deposit(amount) ⇒ Effect.persist(Deposited(amount)) + + case Withdraw(amount) ⇒ + if ((acc.balance - amount) < 0.0) + Effect.unhandled // TODO replies are missing in this example + else { + Effect + .persist(Withdrawn(amount)) + .andThen { + case Some(OpenedAccount(balance)) ⇒ + // do some side-effect using balance + println(balance) + case _ ⇒ throw new IllegalStateException + } + } + case CloseAccount if acc.balance == 0.0 ⇒ + Effect.persist(AccountClosed) + + case CloseAccount ⇒ + Effect.unhandled + + case _ ⇒ + Effect.unhandled + } + case _ ⇒ throw new IllegalStateException + } + + private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = + CommandHandler.command(_ ⇒ Effect.unhandled) + + private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = + CommandHandler.byState { + case None ⇒ initialHandler + case Some(OpenedAccount(_)) ⇒ openedAccountHandler + case Some(ClosedAccount) ⇒ closedHandler + } + + private val eventHandler: (Option[Account], AccountEvent) ⇒ Option[Account] = { + case (None, AccountCreated) ⇒ Some(OpenedAccount(0.0)) + + case (Some(acc @ OpenedAccount(_)), Deposited(amount)) ⇒ + Some(acc.copy(balance = acc.balance + amount)) + + case (Some(acc @ OpenedAccount(_)), Withdrawn(amount)) ⇒ + Some(acc.copy(balance = acc.balance - amount)) + + case (Some(OpenedAccount(_)), AccountClosed) ⇒ + Some(ClosedAccount) + + case (state, event) ⇒ throw new RuntimeException(s"unexpected event [$event] in state [$state]") + } + + def behavior(accountNumber: String): Behavior[AccountCommand] = + PersistentBehaviors.receive[AccountCommand, AccountEvent, Option[Account]]( + persistenceId = accountNumber, + emptyState = None, + commandHandler = commandHandler, + eventHandler = eventHandler + ) +} + diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala new file mode 100644 index 0000000000..91b8a64b3b --- /dev/null +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/AccountExample2.scala @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package docs.akka.persistence.typed + +import akka.actor.typed.Behavior +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.PersistentBehaviors +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler + +object AccountExample2 { + + sealed trait AccountCommand + case object CreateAccount extends AccountCommand + case class Deposit(amount: Double) extends AccountCommand + case class Withdraw(amount: Double) extends AccountCommand + case object CloseAccount extends AccountCommand + + sealed trait AccountEvent + case object AccountCreated extends AccountEvent + case class Deposited(amount: Double) extends AccountEvent + case class Withdrawn(amount: Double) extends AccountEvent + case object AccountClosed extends AccountEvent + + sealed trait Account { + def applyEvent(event: AccountEvent): Account + } + case object EmptyAccount extends Account { + override def applyEvent(event: AccountEvent): Account = event match { + case AccountCreated ⇒ OpenedAccount(0.0) + case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]") + } + } + case class OpenedAccount(balance: Double) extends Account { + override def applyEvent(event: AccountEvent): Account = event match { + case Deposited(amount) ⇒ copy(balance = balance + amount) + case Withdrawn(amount) ⇒ copy(balance = balance - amount) + case AccountClosed ⇒ ClosedAccount + case _ ⇒ throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]") + } + } + case object ClosedAccount extends Account { + override def applyEvent(event: AccountEvent): Account = + throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]") + } + + private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Account] = + CommandHandler.command { + case CreateAccount ⇒ Effect.persist(AccountCreated) + case _ ⇒ Effect.unhandled + } + + private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Account] = { + case (ctx, acc: OpenedAccount, cmd) ⇒ cmd match { + case Deposit(amount) ⇒ Effect.persist(Deposited(amount)) + + case Withdraw(amount) ⇒ + if ((acc.balance - amount) < 0.0) + Effect.unhandled // TODO replies are missing in this example + else { + Effect + .persist(Withdrawn(amount)) + .andThen { + case OpenedAccount(balance) ⇒ + // do some side-effect using balance + println(balance) + case _ ⇒ throw new IllegalStateException + } + } + case CloseAccount if acc.balance == 0.0 ⇒ + Effect.persist(AccountClosed) + + case CloseAccount ⇒ + Effect.unhandled + } + case _ ⇒ throw new IllegalStateException + } + + private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Account] = + CommandHandler.command(_ ⇒ Effect.unhandled) + + private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Account] = + CommandHandler.byState { + case EmptyAccount ⇒ initialHandler + case OpenedAccount(_) ⇒ openedAccountHandler + case ClosedAccount ⇒ closedHandler + } + + private val eventHandler: (Account, AccountEvent) ⇒ Account = + (state, event) ⇒ state.applyEvent(event) + + def behavior(accountNumber: String): Behavior[AccountCommand] = + PersistentBehaviors.receive[AccountCommand, AccountEvent, Account]( + persistenceId = accountNumber, + emptyState = EmptyAccount, + commandHandler = commandHandler, + eventHandler = eventHandler + ) +} + diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala new file mode 100644 index 0000000000..868403b4c4 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/MovieWatchList.scala @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package docs.akka.persistence.typed + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.PersistentBehaviors +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler + +object MovieWatchList { + sealed trait Command + final case class AddMovie(movieId: String) extends Command + final case class RemoveMovie(movieId: String) extends Command + final case class GetMovieList(replyTo: ActorRef[MovieList]) extends Command + + sealed trait Event + final case class MovieAdded(movieId: String) extends Event + final case class MovieRemoved(movieId: String) extends Event + + final case class MovieList(movieIds: Set[String]) { + def applyEvent(event: Event): MovieList = { + event match { + case MovieAdded(movieId) ⇒ copy(movieIds = movieIds + movieId) + case MovieRemoved(movieId) ⇒ copy(movieIds = movieIds + movieId) + } + } + } + + private val commandHandler: CommandHandler[Command, Event, MovieList] = { + (ctx, state, cmd) ⇒ + cmd match { + case AddMovie(movieId) ⇒ + Effect.persist(MovieAdded(movieId)) + case RemoveMovie(movieId) ⇒ + Effect.persist(MovieRemoved(movieId)) + case GetMovieList(replyTo) ⇒ + replyTo ! state + Effect.none + } + } + + def behavior(userId: String): Behavior[Command] = { + PersistentBehaviors.receive[Command, Event, MovieList]( + persistenceId = "movies-" + userId, + emptyState = MovieList(Set.empty), + commandHandler, + eventHandler = (state, event) ⇒ state.applyEvent(event) + ) + } + +}