Composable javadsl CommandHandlerBuilder, #25226 (#25227)

* Composable javadsl CommandHandlerBuilder, #25226
* CommandHandlerBuilder with stateClass and statePredicate parameters
* CommandHandlerBuilder.orElse
* Remove ActorContext from handler function signatures, can be
  passed in constructor
This commit is contained in:
Patrik Nordwall 2018-07-06 16:35:07 +02:00 committed by Christopher Batey
parent 46b433b47d
commit 9cecba3455
15 changed files with 920 additions and 325 deletions

View file

@ -55,7 +55,8 @@ public final class FI {
/**
* Functional interface for a predicate.
*
* This class is kept for compatibility, but for future API's please prefer {@link akka.japi.function.Predicate}.
* This class is kept for compatibility, but for future API's please prefer {@link java.util.function.Predicate}
* or {@link akka.japi.function.Predicate}.
*
* @param <T> the type that the predicate will operate on.
*/
@ -171,7 +172,8 @@ public final class FI {
/**
* Package scoped functional interface for a predicate. Used internally to match against arbitrary types.
*
* This class is kept for compatibility, but for future API's please prefer {@link akka.japi.function.Predicate}.
* This class is kept for compatibility, but for future API's please prefer {@link java.util.function.Predicate}
* or {@link akka.japi.function.Predicate}.
*/
static interface Predicate {
/**

View file

@ -57,7 +57,7 @@ trait Effect {
/**
* Java API: Defines a criteria and determines whether the parameter meets this criteria.
*
* This class is kept for compatibility, but for future API's please prefer [[akka.japi.function.Predicate]].
* This class is kept for compatibility, but for future API's please prefer [[java.util.function.Predicate]].
*/
trait Predicate[T] {
def test(param: T): Boolean

View file

@ -50,7 +50,8 @@ Next we'll discuss each of these in detail.
### Command handler
The command handler is a function with 3 parameters for the `ActorContext`, current `State`, and `Command`.
The command handler is a function with @java[2 parameters for]@scala[3 parameters for the `ActorContext`,]
current `State` and `Command`.
A command handler returns an `Effect` directive that defines what event or events, if any, to persist.
@java[Effects are created using a factory that is returned via the `Effect()` method]
@ -123,6 +124,8 @@ Java
The `PersistentBehavior` can then be run as with any plain typed actor as described in [typed actors documentation](actors-typed.md).
@java[The `ActorContext` can be obtained with `Behaviors.setup` and be passed as a constructor parameter.]
## Larger example
After processing a message, plain typed actors are able to return the `Behavior` that is used
@ -158,16 +161,18 @@ Scala
Java
: @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #commands }
The command handler to process each command is decided by a `CommandHandler.byState` command handler,
which is a function from `State => CommandHandler`:
@java[The commandler handler to process each command is decided by the state class (or state predicate) that is
given to the `commandHandlerBuilder` and the match cases in the builders. Several builders can be composed with `orElse`:]
@scala[The command handler to process each command is decided by a `CommandHandler.byState` command handler,
which is a function from `State => CommandHandler`:]
Scala
: @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #by-state-command-handler }
Java
: @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #by-state-command-handler }
: @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #command-handler }
This can refer to many other `CommandHandler`s e.g one for a post that hasn't been started:
The @java[`CommandHandlerBuilder`]@scala[`CommandHandler`] for a post that hasn't been initialized with content:
Scala
: @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #initial-command-handler }
@ -175,7 +180,7 @@ Scala
Java
: @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #initial-command-handler }
And a different `CommandHandler` for after the post has been added:
And a different @java[`CommandHandlerBuilder`]@scala[`CommandHandler`] for after the post content has been added:
Scala
: @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #post-added-command-handler }
@ -193,7 +198,7 @@ Scala
Java
: @@snip [InDepthPersistentBehaviorTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/InDepthPersistentBehaviorTest.java) { #event-handler }
And finally the behavior is created from the `byState` command handler:
And finally the behavior is created @scala[from the `PersistentBehaviors.receive`]:
Scala
: @@snip [InDepthPersistentBehaviorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/InDepthPersistentBehaviorSpec.scala) { #behavior }
@ -219,7 +224,7 @@ Scala
Java
: @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #recovery }
The `onRecoveryCompleted` takes on an `ActorContext` and the current `State`,
The `onRecoveryCompleted` takes @scala[an `ActorContext` and] the current `State`,
and doesn't return anything.
## Tagging

View file

@ -4,9 +4,10 @@
package akka.persistence.typed.javadsl
import akka.actor.typed.javadsl.ActorContext
import java.util.function.BiFunction
import java.util.function.Predicate
import akka.annotation.InternalApi
import akka.japi.pf.FI
import akka.persistence.typed.internal._
import akka.util.OptionVal
@ -17,43 +18,78 @@ import akka.util.OptionVal
*/
@FunctionalInterface
trait CommandHandler[Command, Event, State] {
def apply(ctx: ActorContext[Command], state: State, command: Command): Effect[Event, State]
}
/**
* FunctionalInterface for reacting on signals
*
* Used with [[CommandHandlerBuilder]] to setup the behavior of a [[PersistentBehavior]]
*/
@FunctionalInterface
trait CommandToEffect[Command, MsgCommand <: Command, Event, State] {
/**
* @return An effect for the given command
*/
def apply(ctx: ActorContext[Command], state: State, command: MsgCommand): Effect[Event, State]
def apply(state: State, command: Command): Effect[Event, State]
}
final class CommandHandlerBuilder[Command, Event, State] @InternalApi private[persistence] () {
object CommandHandlerBuilder {
private final case class CommandHandlerCase(predicate: Command Boolean, handler: CommandToEffect[Command, Command, Event, State])
private var cases: List[CommandHandlerCase] = Nil
private def addCase(predicate: Command Boolean, handler: CommandToEffect[Command, Command, Event, State]): Unit = {
cases = CommandHandlerCase(predicate, handler) :: cases
private val _trueStatePredicate: Predicate[Any] = new Predicate[Any] {
override def test(t: Any): Boolean = true
}
private def trueStatePredicate[S]: Predicate[S] = _trueStatePredicate.asInstanceOf[Predicate[S]]
/**
* @param stateClass The handlers defined by this builder are used when the state is an instance of the `stateClass`
* @return A new, mutable, command handler builder
*/
def builder[Command, Event, S <: State, State](stateClass: Class[S]): CommandHandlerBuilder[Command, Event, S, State] =
new CommandHandlerBuilder(stateClass, statePredicate = trueStatePredicate)
/**
* @param statePredicate The handlers defined by this builder are used when the `statePredicate` is `true`,
* useful for example when state type is an Optional
* @return A new, mutable, command handler builder
*/
def builder[Command, Event, State](statePredicate: Predicate[State]): CommandHandlerBuilder[Command, Event, State, State] =
new CommandHandlerBuilder(classOf[Any].asInstanceOf[Class[State]], statePredicate)
/**
* INTERNAL API
*/
@InternalApi private final case class CommandHandlerCase[Command, Event, State](
commandPredicate: Command Boolean,
statePredicate: State Boolean,
handler: BiFunction[State, Command, Effect[Event, State]])
}
final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalApi private[persistence] (
val stateClass: Class[S], val statePredicate: Predicate[S]) {
import CommandHandlerBuilder.CommandHandlerCase
private var cases: List[CommandHandlerCase[Command, Event, State]] = Nil
private def addCase(predicate: Command Boolean, handler: BiFunction[S, Command, Effect[Event, State]]): Unit = {
cases = CommandHandlerCase[Command, Event, State](
commandPredicate = predicate,
statePredicate = state stateClass.isAssignableFrom(state.getClass) && statePredicate.test(state.asInstanceOf[S]),
handler.asInstanceOf[BiFunction[State, Command, Effect[Event, State]]]) :: cases
}
/**
* Match any command which the given `predicate` returns true for
*/
def matchCommand(predicate: FI.TypedPredicate[Command], commandToEffect: CommandToEffect[Command, Command, Event, State]): CommandHandlerBuilder[Command, Event, State] = {
addCase(cmd predicate.defined(cmd), commandToEffect)
def matchCommand(predicate: Predicate[Command], handler: BiFunction[S, Command, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = {
addCase(cmd predicate.test(cmd), handler)
this
}
def matchCommand[C <: Command](commandClass: Class[C], commandToEffect: CommandToEffect[Command, C, Event, State]): CommandHandlerBuilder[Command, Event, State] = {
addCase(cmd commandClass.isAssignableFrom(cmd.getClass), commandToEffect.asInstanceOf[CommandToEffect[Command, Command, Event, State]])
def matchCommand[C <: Command](commandClass: Class[C], handler: BiFunction[S, C, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = {
addCase(cmd commandClass.isAssignableFrom(cmd.getClass), handler.asInstanceOf[BiFunction[S, Command, Effect[Event, State]]])
this
}
/**
* Compose this builder with another builder. The handlers in this builder will be tried first followed
* by the handlers in `other`.
*/
def orElse[S2 <: State](other: CommandHandlerBuilder[Command, Event, S2, State]): CommandHandlerBuilder[Command, Event, S2, State] = {
val newBuilder = new CommandHandlerBuilder[Command, Event, S2, State](other.stateClass, other.statePredicate)
// problem with overloaded constructor with `cases` as parameter
newBuilder.cases = other.cases ::: cases
newBuilder
}
/**
* Builds a Command Handler and resets this builder
*/
@ -61,14 +97,15 @@ final class CommandHandlerBuilder[Command, Event, State] @InternalApi private[pe
val builtCases = cases.reverse.toArray
cases = Nil
new CommandHandler[Command, Event, State] {
override def apply(ctx: ActorContext[Command], state: State, command: Command) = {
override def apply(state: State, command: Command): Effect[Event, State] = {
var idx = 0
var effect: OptionVal[Effect[Event, State]] = OptionVal.None
while (idx < builtCases.length && effect.isEmpty) {
val curr = builtCases(idx)
if (curr.predicate(command)) {
effect = OptionVal.Some(curr.handler.apply(ctx, state, command))
if (curr.statePredicate(state) && curr.commandPredicate(command)) {
val x: Effect[Event, State] = curr.handler.apply(state, command)
effect = OptionVal.Some(x)
}
idx += 1
}
@ -82,104 +119,4 @@ final class CommandHandlerBuilder[Command, Event, State] @InternalApi private[pe
}
}
/**
* Mutable builder for nested Java [[CommandHandler]]s where different states should have different command handlers.
* [[CommandHandler]] per state are added with the `match` methods and finally a [[CommandHandler]] is created with [[ByStateCommandHandlerBuilder#build]].
*
* Match statements are appended and evaluated in order, the first one to match is used. If no match is found when
* evaluating the built [[CommandHandler]] for a given state a [[scala.MatchError]] is thrown.
*/
final class ByStateCommandHandlerBuilder[Command, Event, State] @InternalApi private[javadsl] () {
private final case class ByStateCase(predicate: State Boolean, handler: CommandHandler[Command, Event, State])
private var cases: List[ByStateCase] = Nil
private def addCase(predicate: State Boolean, handler: CommandHandler[Command, Event, State]): Unit = {
cases = ByStateCase(predicate, handler) :: cases
}
/**
* Match any state that the `predicate` returns true for.
*/
def matchState(predicate: FI.TypedPredicate[State], commandHandler: CommandHandler[Command, Event, State]): ByStateCommandHandlerBuilder[Command, Event, State] = {
addCase(
predicate.defined,
commandHandler.asInstanceOf[CommandHandler[Command, Event, State]])
this
}
/**
* Match any state which is an instance of `S` or a subtype of `S`
*
* Throws `java.lang.IllegalArgumentException` if `stateClass` is not a subtype of the root `State` this builder was created with
*/
def matchState[S <: State](stateClass: Class[S], commandHandler: CommandHandler[Command, Event, S]): ByStateCommandHandlerBuilder[Command, Event, State] = {
addCase(
state stateClass.isAssignableFrom(state.getClass),
commandHandler.asInstanceOf[CommandHandler[Command, Event, State]])
this
}
/**
* Match any state which is an instance of `S` or a subtype of `S` and for which the `predicate` returns true
*
* Throws `java.lang.IllegalArgumentException` if `stateClass` is not a subtype of the root `State` this builder was created with
*/
def matchState[S <: State](stateClass: Class[S], predicate: FI.TypedPredicate[S], commandHandler: CommandHandler[Command, Event, S]): ByStateCommandHandlerBuilder[Command, Event, State] = {
addCase(
state stateClass.isAssignableFrom(state.getClass) && predicate.defined(state.asInstanceOf[S]),
commandHandler.asInstanceOf[CommandHandler[Command, Event, State]])
this
}
/**
* Match states that are equal to the given `state` instance
*/
def matchExact[S <: State](state: S, commandHandler: CommandHandler[Command, Event, S]): ByStateCommandHandlerBuilder[Command, Event, State] = {
addCase(s s.equals(state), commandHandler.asInstanceOf[CommandHandler[Command, Event, State]])
this
}
/**
* Match any state
*
* Builds and returns the handler since this will not let through any states to subsequent match statements
*/
def matchAny(commandHandler: CommandHandler[Command, Event, State]): CommandHandler[Command, Event, State] = {
addCase(_ true, commandHandler)
build()
}
/**
* Build a command handler from the appended cases. The returned handler will throw a [[scala.MatchError]] if applied to
* a command that has no defined match.
*
* The builder is reset to empty after build has been called.
*/
def build(): CommandHandler[Command, Event, State] = {
val builtCases = cases.reverse.toArray
cases = Nil
new CommandHandler[Command, Event, State] {
override def apply(ctx: ActorContext[Command], state: State, command: Command) = {
var idx = 0
var effect: OptionVal[Effect[Event, State]] = OptionVal.None
while (idx < builtCases.length && effect.isEmpty) {
val curr = builtCases(idx)
if (curr.predicate(state)) {
effect = OptionVal.Some(curr.handler.apply(ctx, state, command))
}
idx += 1
}
effect match {
case OptionVal.None throw new MatchError(s"No match found for command of type [${command.getClass.getName}]")
case OptionVal.Some(e) e.asInstanceOf[EffectImpl[Event, State]]
}
}
}
}
}

View file

@ -6,6 +6,7 @@ package akka.persistence.typed.javadsl
import java.util.function.BiFunction
import akka.annotation.InternalApi
import akka.util.OptionVal
/**
@ -21,16 +22,23 @@ trait EventHandler[Event, State] {
object EventHandlerBuilder {
def builder[Event, State >: Null](): EventHandlerBuilder[Event, State] =
new EventHandlerBuilder[Event, State]()
/**
* INTERNAL API
*/
@InternalApi private final case class EventHandlerCase[Event, State](
eventPredicate: Event Boolean,
statePredicate: State Boolean,
handler: BiFunction[State, Event, State])
}
final class EventHandlerBuilder[Event, State >: Null]() {
import EventHandlerBuilder.EventHandlerCase
private final case class EventHandlerCase(predicate: Event Boolean, handler: BiFunction[State, Event, State])
private var cases: List[EventHandlerCase] = Nil
private var cases: List[EventHandlerCase[Event, State]] = Nil
private def addCase(predicate: Event Boolean, handler: BiFunction[State, Event, State]): Unit = {
cases = EventHandlerCase(predicate, handler) :: cases
cases = EventHandlerCase[Event, State](predicate, _ true, handler) :: cases
}
/**
@ -41,6 +49,16 @@ final class EventHandlerBuilder[Event, State >: Null]() {
this
}
def matchEvent[E <: Event, S <: State](eventClass: Class[E], stateClass: Class[S],
biFunction: BiFunction[S, E, State]): EventHandlerBuilder[Event, State] = {
cases = EventHandlerCase[Event, State](
eventPredicate = e eventClass.isAssignableFrom(e.getClass),
statePredicate = s stateClass.isAssignableFrom(s.getClass),
biFunction.asInstanceOf[BiFunction[State, Event, State]]) :: cases
this
}
/**
* Match any event
*
@ -51,6 +69,17 @@ final class EventHandlerBuilder[Event, State >: Null]() {
build()
}
/**
* Compose this builder with another builder. The handlers in this builder will be tried first followed
* by the handlers in `other`.
*/
def orElse(other: EventHandlerBuilder[Event, State]): EventHandlerBuilder[Event, State] = {
val newBuilder = new EventHandlerBuilder[Event, State]
// problem with overloaded constructor with `cases` as parameter
newBuilder.cases = other.cases ::: cases
newBuilder
}
/**
* Builds and returns a handler from the appended states. The returned [[EventHandler]] will throw a [[scala.MatchError]]
* if applied to an event that has no defined case.
@ -66,7 +95,7 @@ final class EventHandlerBuilder[Event, State >: Null]() {
var idx = 0
while (idx < builtCases.length && result.isEmpty) {
val curr = builtCases(idx)
if (curr.predicate(event)) {
if (curr.statePredicate(state) && curr.eventPredicate(event)) {
result = OptionVal.Some[State](curr.handler.apply(state, event))
}
idx += 1

View file

@ -4,6 +4,7 @@
package akka.persistence.typed.javadsl
import java.util.function.Predicate
import java.util.{ Collections, Optional }
import akka.actor.typed
@ -14,9 +15,10 @@ import akka.annotation.{ ApiMayChange, InternalApi }
import akka.persistence.SnapshotMetadata
import akka.persistence.typed.{ EventAdapter, _ }
import akka.persistence.typed.internal._
import scala.util.{ Failure, Success }
import akka.japi.pf.FI
/** Java API */
@ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends DeferredBehavior[Command] {
@ -40,7 +42,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
* Implement by handling incoming commands and return an `Effect()` to persist or signal other effects
* of the command handling such as stopping the behavior or others.
*
* This method is only invoked when the actor is running (i.e. not replaying).
* The command handlers are only invoked when the actor is running (i.e. not replaying).
* While the actor is persisting events, the incoming messages are stashed and only
* delivered to the handler once persisting them has completed.
*/
@ -49,7 +51,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
/**
* Implement by applying the event to the current state in order to return a new state.
*
* This method invoked during recovery as well as running operation of this behavior,
* The event handlers are invoked during recovery as well as running operation of this behavior,
* in order to keep updating the state state.
*
* For that reason it is strongly discouraged to perform side-effects in this handler;
@ -58,43 +60,38 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
protected def eventHandler(): EventHandler[Event, State]
/**
* @return A new, mutable, by state command handler builder
* @param stateClass The handlers defined by this builder are used when the state is an instance of the `stateClass`
* @return A new, mutable, command handler builder
*/
protected final def commandHandlerBuilder(): CommandHandlerBuilder[Command, Event, State] =
new CommandHandlerBuilder[Command, Event, State]()
protected final def commandHandlerBuilder[S <: State](stateClass: Class[S]): CommandHandlerBuilder[Command, Event, S, State] =
CommandHandlerBuilder.builder[Command, Event, S, State](stateClass)
/**
* @return A new, mutable, by state command handler builder
* @param statePredicate The handlers defined by this builder are used when the `statePredicate` is `true`,
* * useful for example when state type is an Optional
* @return A new, mutable, command handler builder
*/
protected final def byStateCommandHandlerBuilder(): ByStateCommandHandlerBuilder[Command, Event, State] =
new ByStateCommandHandlerBuilder[Command, Event, State]()
protected final def commandHandlerBuilder(statePredicate: Predicate[State]): CommandHandlerBuilder[Command, Event, State, State] =
CommandHandlerBuilder.builder[Command, Event, State](statePredicate)
/**
* @return A new, mutable, event handler builder
*/
protected final def eventHandlerBuilder(): EventHandlerBuilder[Event, State] =
new EventHandlerBuilder[Event, State]()
EventHandlerBuilder.builder[Event, State]()
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {}
def onRecoveryCompleted(state: State): Unit = ()
/**
* Override to get notified when a snapshot is finished.
* The default implementation logs failures at error and success writes at
* debug.
*
* @param result None if successful otherwise contains the exception thrown when snapshotting
*/
def onSnapshot(ctx: ActorContext[Command], meta: SnapshotMetadata, result: Optional[Throwable]): Unit = {
if (result.isPresent) {
ctx.getLog.error(result.get(), "Save snapshot failed, snapshot metadata: [{}]", meta)
} else {
ctx.getLog.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
}
}
def onSnapshot(meta: SnapshotMetadata, result: Optional[Throwable]): Unit = ()
/**
* Override and define that snapshot should be saved every N events.
@ -128,7 +125,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
/**
* INTERNAL API: DeferredBehavior init
*/
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
@InternalApi override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
val snapshotWhen: (State, Event, Long) Boolean = { (state, event, seqNr)
val n = snapshotEvery()
@ -148,13 +145,20 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
scaladsl.PersistentBehaviors.receive[Command, Event, State](
persistenceId,
emptyState,
(c, state, cmd) commandHandler()(c.asJava, state, cmd).asInstanceOf[EffectImpl[Event, State]],
(c, state, cmd) commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]],
eventHandler()(_, _))
.onRecoveryCompleted((ctx, state) onRecoveryCompleted(ctx.asJava, state))
.onRecoveryCompleted((ctx, state) onRecoveryCompleted(state))
.snapshotWhen(snapshotWhen)
.withTagger(tagger)
.onSnapshot((ctx, meta, result) {
onSnapshot(ctx.asJava, meta, result match {
result match {
case Success(_)
ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
case Failure(e)
ctx.log.error(e, "Save snapshot failed, snapshot metadata: [{}]", meta)
}
onSnapshot(meta, result match {
case Success(_) Optional.empty()
case Failure(t) Optional.of(t)
})

View file

@ -5,6 +5,7 @@
package akka.persistence.typed.javadsl;
import akka.actor.Scheduler;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.ActorRef;
import akka.persistence.typed.EventAdapter;
import akka.actor.testkit.typed.javadsl.TestInbox;
@ -96,7 +97,7 @@ public class PersistentActorCompileOnlyTest {
//#command-handler
@Override
public CommandHandler<SimpleCommand, SimpleEvent, SimpleState> commandHandler() {
return (ctx, state, cmd) -> Effect().persist(new SimpleEvent(cmd.data));
return (state, cmd) -> Effect().persist(new SimpleEvent(cmd.data));
}
//#command-handler
@ -156,8 +157,8 @@ public class PersistentActorCompileOnlyTest {
@Override
public CommandHandler<MyCommand, MyEvent, ExampleState> commandHandler() {
return commandHandlerBuilder()
.matchCommand(Cmd.class, (ctx, state, cmd) -> Effect().persist(new Evt(cmd.data))
return commandHandlerBuilder(ExampleState.class)
.matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data))
.andThen(() -> cmd.sender.tell(new Ack())))
.build();
}
@ -254,8 +255,11 @@ public class PersistentActorCompileOnlyTest {
}
class MyPersistentBehavior extends PersistentBehavior<Command, Event, RecoveryComplete.EventsInFlight> {
public MyPersistentBehavior(String persistenceId) {
private final ActorContext<Command> ctx;
public MyPersistentBehavior(String persistenceId, ActorContext<Command> ctx) {
super(persistenceId);
this.ctx = ctx;
}
@Override
@ -265,11 +269,11 @@ public class PersistentActorCompileOnlyTest {
@Override
public CommandHandler<Command, Event, EventsInFlight> commandHandler() {
return commandHandlerBuilder()
return commandHandlerBuilder(EventsInFlight.class)
.matchCommand(DoSideEffect.class,
(ctx, state, cmd) -> Effect().persist(new IntentRecord(state.nextCorrelationId, cmd.data))
(state, cmd) -> Effect().persist(new IntentRecord(state.nextCorrelationId, cmd.data))
.andThen(() -> performSideEffect(ctx.getSelf().narrow(), state.nextCorrelationId, cmd.data, ctx.getSystem().scheduler())))
.matchCommand(AcknowledgeSideEffect.class, (ctx, state, command) -> Effect().persist(new SideEffectAcknowledged(command.correlationId)))
.matchCommand(AcknowledgeSideEffect.class, (state, command) -> Effect().persist(new SideEffectAcknowledged(command.correlationId)))
.build();
}

View file

@ -174,29 +174,29 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
private static String loggingOne = "one";
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId, ActorRef<Pair<State, Incremented>> probe) {
private Behavior<Command> counter(String persistenceId, ActorRef<Pair<State, Incremented>> probe) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, (e) -> Collections.emptySet(), snapshotProbe, new NoOpEventAdapter<>());
}
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId,
ActorRef<Pair<State, Incremented>> probe,
Function<Incremented, Set<String>> tagger) {
private Behavior<Command> counter(String persistenceId,
ActorRef<Pair<State, Incremented>> probe,
Function<Incremented, Set<String>> tagger) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, tagger, snapshotProbe, new NoOpEventAdapter<>());
}
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId,
ActorRef<Pair<State, Incremented>> probe,
EventAdapter<Incremented, ?> transformer) {
private Behavior<Command> counter(String persistenceId,
ActorRef<Pair<State, Incremented>> probe,
EventAdapter<Incremented, ?> transformer) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), snapshotProbe, transformer);
}
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId) {
private Behavior<Command> counter(String persistenceId) {
return counter(persistenceId,
TestProbe.<Pair<State, Incremented>>create(testKit.system()).ref(),
TestProbe.<String>create(testKit.system()).ref(),
@ -207,7 +207,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
);
}
private PersistentBehavior<Command, Incremented, State> counter(
private Behavior<Command> counter(
String persistenceId,
Function3<State, Incremented, Long, Boolean> snapshot,
ActorRef<Optional<Throwable>> snapshotProbe
@ -221,7 +221,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
new NoOpEventAdapter<>());
}
private PersistentBehavior<Command, Incremented, State> counter(
private Behavior<Command> counter(
String persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
ActorRef<String> loggingProbe) {
@ -231,7 +231,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
);
}
private PersistentBehavior<Command, Incremented, State> counter(
private Behavior<Command> counter(
String persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
Function3<State, Incremented, Long, Boolean> snapshot) {
@ -240,7 +240,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
);
}
private <A> PersistentBehavior<Command, Incremented, State> counter(
private <A> Behavior<Command> counter(
String persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
ActorRef<String> loggingProbe,
@ -248,90 +248,93 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
Function<Incremented, Set<String>> tagsFunction,
ActorRef<Optional<Throwable>> snapshotProbe,
EventAdapter<Incremented, A> transformer) {
return new PersistentBehavior<Command, Incremented, State>(persistentId) {
@Override
public CommandHandler<Command, Incremented, State> commandHandler() {
return commandHandlerBuilder()
.matchCommand(Increment.class, (ctx, state, command) -> Effect().persist(new Incremented(1)))
.matchCommand(GetValue.class, (ctx, state, command) -> {
command.replyTo.tell(state);
return Effect().none();
})
.matchCommand(IncrementLater.class, (ctx, state, command) -> {
ActorRef<Object> delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> {
timers.startSingleTimer(Tick.instance, Tick.instance, Duration.ofMillis(10));
return Behaviors.receive((context, o) -> Behaviors.stopped());
}));
ctx.watchWith(delay, new DelayFinished());
return Effect().none();
})
.matchCommand(DelayFinished.class, (ctx, state, finished) -> Effect().persist(new Incremented(10)))
.matchCommand(Increment100OnTimeout.class, (ctx, state, msg) -> {
ctx.setReceiveTimeout(Duration.ofMillis(10), new Timeout());
return Effect().none();
})
.matchCommand(Timeout.class,
(ctx, state, msg) -> Effect().persist(timeoutEvent))
.matchCommand(EmptyEventsListAndThenLog.class, (ctx, state, msg) -> Effect().persist(Collections.emptyList())
.andThen(s -> loggingProbe.tell(loggingOne)))
.matchCommand(StopThenLog.class,
(ctx, state, msg) -> Effect().stop()
.andThen(s -> loggingProbe.tell(loggingOne)))
.matchCommand(IncrementTwiceAndLog.class,
(ctx, state, msg) -> Effect().persist(
Arrays.asList(new Incremented(1), new Incremented(1)))
.andThen(s -> loggingProbe.tell(loggingOne)))
.build();
}
return Behaviors.setup(ctx -> {
return new PersistentBehavior<Command, Incremented, State>(persistentId) {
@Override
public CommandHandler<Command, Incremented, State> commandHandler() {
return commandHandlerBuilder(State.class)
.matchCommand(Increment.class, (state, command) -> Effect().persist(new Incremented(1)))
.matchCommand(GetValue.class, (state, command) -> {
command.replyTo.tell(state);
return Effect().none();
})
.matchCommand(IncrementLater.class, (state, command) -> {
ActorRef<Object> delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> {
timers.startSingleTimer(Tick.instance, Tick.instance, Duration.ofMillis(10));
return Behaviors.receive((context, o) -> Behaviors.stopped());
}));
ctx.watchWith(delay, new DelayFinished());
return Effect().none();
})
.matchCommand(DelayFinished.class, (state, finished) -> Effect().persist(new Incremented(10)))
.matchCommand(Increment100OnTimeout.class, (state, msg) -> {
ctx.setReceiveTimeout(Duration.ofMillis(10), new Timeout());
return Effect().none();
})
.matchCommand(Timeout.class,
(state, msg) -> Effect().persist(timeoutEvent))
.matchCommand(EmptyEventsListAndThenLog.class, (state, msg) -> Effect().persist(Collections.emptyList())
.andThen(s -> loggingProbe.tell(loggingOne)))
.matchCommand(StopThenLog.class,
(state, msg) -> Effect().stop()
.andThen(s -> loggingProbe.tell(loggingOne)))
.matchCommand(IncrementTwiceAndLog.class,
(state, msg) -> Effect().persist(
Arrays.asList(new Incremented(1), new Incremented(1)))
.andThen(s -> loggingProbe.tell(loggingOne)))
.build();
@Override
public EventHandler<Incremented, State> eventHandler() {
return eventHandlerBuilder()
.matchEvent(Incremented.class, (state, event) -> {
List<Integer> newHistory = new ArrayList<>(state.history);
newHistory.add(state.value);
eventProbe.tell(Pair.create(state, event));
return new State(state.value + event.delta, newHistory);
})
.build();
}
@Override
public State emptyState() {
return emptyState;
}
@Override
public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) {
try {
return snapshot.apply(state, event, sequenceNr);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Set<String> tagsFor(Incremented incremented) {
try {
return tagsFunction.apply(incremented);
} catch (Exception e) {
throw new RuntimeException(e);
@Override
public EventHandler<Incremented, State> eventHandler() {
return eventHandlerBuilder()
.matchEvent(Incremented.class, (state, event) -> {
List<Integer> newHistory = new ArrayList<>(state.history);
newHistory.add(state.value);
eventProbe.tell(Pair.create(state, event));
return new State(state.value + event.delta, newHistory);
})
.build();
}
}
@Override
public void onSnapshot(ActorContext<Command> ctx, SnapshotMetadata meta, Optional<Throwable> result) {
snapshotProbe.tell(result);
}
@Override
public State emptyState() {
return emptyState;
}
@Override
public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) {
try {
return snapshot.apply(state, event, sequenceNr);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Set<String> tagsFor(Incremented incremented) {
try {
return tagsFunction.apply(incremented);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void onSnapshot(SnapshotMetadata meta, Optional<Throwable> result) {
snapshotProbe.tell(result);
}
@Override
public EventAdapter<Incremented, A> eventAdapter() {
return transformer;
}
};
@Override
public EventAdapter<Incremented, A> eventAdapter() {
return transformer;
}
};
});
}
@Test
@ -405,7 +408,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void snapshot() {
TestProbe<Optional<Throwable>> snapshotProbe = testKit.createTestProbe();
PersistentBehavior<Command, Incremented, State> snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref());
Behavior<Command> snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref());
ActorRef<Command> c = testKit.spawn(snapshoter);
c.tell(Increment.instance);
c.tell(Increment.instance);

View file

@ -35,7 +35,7 @@ public class BasicPersistentBehaviorsTest {
@Override
public CommandHandler<Command, Event, State> commandHandler() {
return (ctx, state, command) -> {
return (state, command) -> {
throw new RuntimeException("TODO: process the command & return an Effect");
};
}
@ -49,7 +49,7 @@ public class BasicPersistentBehaviorsTest {
//#recovery
@Override
public void onRecoveryCompleted(ActorContext<Command> ctx, State state) {
public void onRecoveryCompleted(State state) {
throw new RuntimeException("TODO: add some end-of-recovery side-effect here");
}
//#recovery

View file

@ -6,7 +6,11 @@ package jdocs.akka.persistence.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
@ -47,27 +51,41 @@ public class InDepthPersistentBehaviorTest {
//#event
//#state
public static class BlogState {
final Optional<PostContent> postContent;
interface BlogState {}
public static class BlankState implements BlogState {}
public static class DraftState implements BlogState {
final PostContent postContent;
final boolean published;
BlogState(Optional<PostContent> postContent, boolean published) {
DraftState(PostContent postContent, boolean published) {
this.postContent = postContent;
this.published = published;
}
public BlogState withContent(PostContent newContent) {
return new BlogState(Optional.of(newContent), this.published);
}
public boolean isEmpty() {
return postContent.isPresent();
public DraftState withContent(PostContent newContent) {
return new DraftState(newContent, this.published);
}
public String postId() {
return postContent.orElseGet(() -> {
throw new IllegalStateException("postId unknown before post is created");
}).postId;
return postContent.postId;
}
}
public static class PublishedState implements BlogState {
final PostContent postContent;
PublishedState(PostContent postContent) {
this.postContent = postContent;
}
public PublishedState withContent(PostContent newContent) {
return new PublishedState(newContent);
}
public String postId() {
return postContent.postId;
}
}
//#state
@ -132,69 +150,105 @@ public class InDepthPersistentBehaviorTest {
//#behavior
public static class BlogBehavior extends PersistentBehavior<BlogCommand, BlogEvent, BlogState> {
//#behavior
//#initial-command-handler
private CommandHandler<BlogCommand, BlogEvent, BlogState> initialCommandHandler = commandHandlerBuilder()
.matchCommand(AddPost.class, (ctx, state, cmd) -> {
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
return Effect().persist(event)
.andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
})
.matchCommand(PassivatePost.class, (ctx, state, cmd) -> Effect().stop())
.build();
//#initial-command-handler
private final ActorContext<BlogCommand> ctx;
//#post-added-command-handler
private CommandHandler<BlogCommand, BlogEvent, BlogState> postCommandHandler = commandHandlerBuilder()
.matchCommand(ChangeBody.class, (ctx, state, cmd) -> {
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance()));
})
.matchCommand(Publish.class, (ctx, state, cmd) -> Effect()
.persist(new Published(state.postId())).andThen(() -> {
System.out.println("Blog post published: " + state.postId());
cmd.replyTo.tell(Done.getInstance());
}))
.matchCommand(GetPost.class, (ctx, state, cmd) -> {
cmd.replyTo.tell(state.postContent.get());
return Effect().none();
})
.matchCommand(AddPost.class, (ctx, state, cmd) -> Effect().unhandled())
.matchCommand(PassivatePost.class, (ctx, state, cmd) -> Effect().stop())
.build();
//#post-added-command-handler
public BlogBehavior(String persistenceId) {
public BlogBehavior(String persistenceId, ActorContext<BlogCommand> ctx) {
super(persistenceId);
this.ctx = ctx;
}
@Override
public BlogState emptyState() {
return new BlogState(Optional.empty(), false);
//#initial-command-handler
private CommandHandlerBuilder<BlogCommand, BlogEvent, BlankState, BlogState> initialCommandHandler() {
return commandHandlerBuilder(BlankState.class)
.matchCommand(AddPost.class, (state, cmd) -> {
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
return Effect().persist(event)
.andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
});
}
//#initial-command-handler
//#post-added-command-handler
private CommandHandlerBuilder<BlogCommand, BlogEvent, DraftState, BlogState> draftCommandHandler() {
return commandHandlerBuilder(DraftState.class)
.matchCommand(ChangeBody.class, (state, cmd) -> {
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance()));
})
.matchCommand(Publish.class, (state, cmd) -> Effect()
.persist(new Published(state.postId())).andThen(() -> {
System.out.println("Blog post published: " + state.postId());
cmd.replyTo.tell(Done.getInstance());
}))
.matchCommand(GetPost.class, (state, cmd) -> {
cmd.replyTo.tell(state.postContent);
return Effect().none();
});
}
//#by-state-command-handler
private CommandHandlerBuilder<BlogCommand, BlogEvent, PublishedState, BlogState> publishedCommandHandler() {
return commandHandlerBuilder(PublishedState.class)
.matchCommand(ChangeBody.class, (state, cmd) -> {
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance()));
})
.matchCommand(GetPost.class, (state, cmd) -> {
cmd.replyTo.tell(state.postContent);
return Effect().none();
});
}
private CommandHandlerBuilder<BlogCommand, BlogEvent, BlogState, BlogState> commonCommandHandler() {
return commandHandlerBuilder(BlogState.class)
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled())
.matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop());
}
//#post-added-command-handler
//#command-handler
@Override
public CommandHandler<BlogCommand, BlogEvent, BlogState> commandHandler() {
return byStateCommandHandlerBuilder()
.matchState(BlogState.class, (state) -> !state.postContent.isPresent(), initialCommandHandler)
.matchState(BlogState.class, (state) -> state.postContent.isPresent(), postCommandHandler)
.build();
return
initialCommandHandler()
.orElse(draftCommandHandler())
.orElse(publishedCommandHandler())
.orElse(commonCommandHandler())
.build();
}
//#by-state-command-handler
//#command-handler
//#event-handler
@Override
public EventHandler<BlogEvent, BlogState> eventHandler() {
return eventHandlerBuilder()
.matchEvent(PostAdded.class, (state, event) -> state.withContent(event.content))
.matchEvent(BodyChanged.class, (state, newBody) ->
new BlogState(state.postContent.map(pc -> new PostContent(pc.postId, pc.title, newBody.newBody)), state.published))
.matchEvent(Published.class, (state, event) -> new BlogState(state.postContent, true))
.build();
.matchEvent(PostAdded.class, (state, event) ->
new DraftState(event.content, false))
.matchEvent(BodyChanged.class, DraftState.class, (state, chg) ->
state.withContent(new PostContent(state.postId(), state.postContent.title, chg.newBody)))
.matchEvent(BodyChanged.class, PublishedState.class, (state, chg) ->
state.withContent(new PostContent(state.postId(), state.postContent.title, chg.newBody)))
.matchEvent(Published.class, DraftState.class, (state, event) ->
new PublishedState(state.postContent))
.build();
}
//#event-handler
//#behavior
public static Behavior<BlogCommand> behavior(String entityId) {
return Behaviors.setup(ctx ->
new BlogBehavior("Blog-" + entityId, ctx)
);
}
@Override
public BlogState emptyState() {
return new BlankState();
}
// commandHandler, eventHandler as in above snippets
}
//#behavior
}

View file

@ -0,0 +1,124 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public class MovieWatchList extends PersistentBehavior<MovieWatchList.Command, MovieWatchList.Event, MovieWatchList.MovieList> {
interface Command {
}
public static class AddMovie implements Command {
public final String movieId;
public AddMovie(String movieId) {
this.movieId = movieId;
}
}
public static class RemoveMovie implements Command {
public final String movieId;
public RemoveMovie(String movieId) {
this.movieId = movieId;
}
}
interface Event {
}
public static class MovieAdded implements Event {
public final String movieId;
public MovieAdded(String movieId) {
this.movieId = movieId;
}
}
public static class MovieRemoved implements Event {
public final String movieId;
public MovieRemoved(String movieId) {
this.movieId = movieId;
}
}
public static class GetMovieList implements Command {
public final ActorRef<MovieList> replyTo;
public GetMovieList(ActorRef<MovieList> replyTo) {
this.replyTo = replyTo;
}
}
public static class MovieList {
public final Set<String> movieIds;
public MovieList(Set<String> movieIds) {
this.movieIds = Collections.unmodifiableSet(movieIds);
}
public MovieList add(String movieId) {
Set<String> newSet = new HashSet<>(movieIds);
newSet.add(movieId);
return new MovieList(newSet);
}
public MovieList remove(String movieId) {
Set<String> newSet = new HashSet<>(movieIds);
newSet.remove(movieId);
return new MovieList(newSet);
}
}
public static Behavior<Command> behavior(String userId) {
return new MovieWatchList("movies-" + userId);
}
public MovieWatchList(String persistenceId) {
super(persistenceId);
}
@Override
public MovieList emptyState() {
return new MovieList(Collections.emptySet());
}
@Override
public CommandHandler<Command, Event, MovieList> commandHandler() {
return commandHandlerBuilder(MovieList.class)
.matchCommand(AddMovie.class, (state, cmd) -> {
return Effect().persist(new MovieAdded(cmd.movieId));
})
.matchCommand(RemoveMovie.class, (state, cmd) -> {
return Effect().persist(new MovieRemoved(cmd.movieId));
})
.matchCommand(GetMovieList.class, (state, cmd) -> {
cmd.replyTo.tell(state);
return Effect().none();
})
.build();
}
@Override
public EventHandler<Event, MovieList> eventHandler() {
return eventHandlerBuilder()
.matchEvent(MovieAdded.class, (state, event) -> state.add(event.movieId))
.matchEvent(MovieRemoved.class, (state, event) -> state.remove(event.movieId))
.build();
}
}

View file

@ -0,0 +1,180 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import java.util.Optional;
public class OptionalBlogState {
interface BlogEvent {
}
public static class PostAdded implements BlogEvent {
private final String postId;
private final PostContent content;
public PostAdded(String postId, PostContent content) {
this.postId = postId;
this.content = content;
}
}
public static class BodyChanged implements BlogEvent {
private final String postId;
private final String newBody;
public BodyChanged(String postId, String newBody) {
this.postId = postId;
this.newBody = newBody;
}
}
public static class Published implements BlogEvent {
private final String postId;
public Published(String postId) {
this.postId = postId;
}
}
public static class BlogState {
final PostContent postContent;
final boolean published;
BlogState(PostContent postContent, boolean published) {
this.postContent = postContent;
this.published = published;
}
public BlogState withContent(PostContent newContent) {
return new BlogState(newContent, this.published);
}
public String postId() {
return postContent.postId;
}
}
public interface BlogCommand {
}
public static class AddPost implements BlogCommand {
final PostContent content;
final ActorRef<AddPostDone> replyTo;
public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) {
this.content = content;
this.replyTo = replyTo;
}
}
public static class AddPostDone implements BlogCommand {
final String postId;
public AddPostDone(String postId) {
this.postId = postId;
}
}
public static class GetPost implements BlogCommand {
final ActorRef<PostContent> replyTo;
public GetPost(ActorRef<PostContent> replyTo) {
this.replyTo = replyTo;
}
}
public static class ChangeBody implements BlogCommand {
final String newBody;
final ActorRef<Done> replyTo;
public ChangeBody(String newBody, ActorRef<Done> replyTo) {
this.newBody = newBody;
this.replyTo = replyTo;
}
}
public static class Publish implements BlogCommand {
final ActorRef<Done> replyTo;
public Publish(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
}
public static class PassivatePost implements BlogCommand {
}
public static class PostContent implements BlogCommand {
final String postId;
final String title;
final String body;
public PostContent(String postId, String title, String body) {
this.postId = postId;
this.title = title;
this.body = body;
}
}
public static class BlogBehavior extends PersistentBehavior<BlogCommand, BlogEvent, Optional<BlogState>> {
private CommandHandlerBuilder<BlogCommand, BlogEvent, Optional<BlogState>, Optional<BlogState>> initialCommandHandler() {
return commandHandlerBuilder(state -> !state.isPresent())
.matchCommand(AddPost.class, (state, cmd) -> {
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
return Effect().persist(event)
.andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
})
.matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop());
}
private CommandHandlerBuilder<BlogCommand, BlogEvent, Optional<BlogState>, Optional<BlogState>> postCommandHandler() {
return commandHandlerBuilder(state -> state.isPresent())
.matchCommand(ChangeBody.class, (state, cmd) -> {
BodyChanged event = new BodyChanged(state.get().postId(), cmd.newBody);
return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance()));
})
.matchCommand(Publish.class, (state, cmd) -> Effect()
.persist(new Published(state.get().postId())).andThen(() -> {
System.out.println("Blog post published: " + state.get().postId());
cmd.replyTo.tell(Done.getInstance());
}))
.matchCommand(GetPost.class, (state, cmd) -> {
cmd.replyTo.tell(state.get().postContent);
return Effect().none();
})
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled())
.matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop());
}
public BlogBehavior(String persistenceId) {
super(persistenceId);
}
@Override
public Optional<BlogState> emptyState() {
return Optional.empty();
}
@Override
public CommandHandler<BlogCommand, BlogEvent, Optional<BlogState>> commandHandler() {
return initialCommandHandler().orElse(postCommandHandler()).build();
}
@Override
public EventHandler<BlogEvent, Optional<BlogState>> eventHandler() {
return eventHandlerBuilder()
.matchEvent(PostAdded.class, (state, event) ->
Optional.of(new BlogState(event.content, false)))
.matchEvent(BodyChanged.class, (state, chg) ->
state.map(blogState -> blogState.withContent(
new PostContent(blogState.postId(), blogState.postContent.title, chg.newBody))))
.matchEvent(Published.class, (state, event) ->
state.map(blogState -> new BlogState(blogState.postContent, true)))
.build();
}
}
}

View file

@ -0,0 +1,98 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
import akka.actor.typed.Behavior
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
object AccountExample1 {
sealed trait AccountCommand
case object CreateAccount extends AccountCommand
case class Deposit(amount: Double) extends AccountCommand
case class Withdraw(amount: Double) extends AccountCommand
case object CloseAccount extends AccountCommand
sealed trait AccountEvent
case object AccountCreated extends AccountEvent
case class Deposited(amount: Double) extends AccountEvent
case class Withdrawn(amount: Double) extends AccountEvent
case object AccountClosed extends AccountEvent
sealed trait Account
case class OpenedAccount(balance: Double) extends Account
case object ClosedAccount extends Account
private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] =
CommandHandler.command {
case CreateAccount Effect.persist(AccountCreated)
case _ Effect.unhandled
}
private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] = {
case (ctx, Some(acc: OpenedAccount), cmd) cmd match {
case Deposit(amount) Effect.persist(Deposited(amount))
case Withdraw(amount)
if ((acc.balance - amount) < 0.0)
Effect.unhandled // TODO replies are missing in this example
else {
Effect
.persist(Withdrawn(amount))
.andThen {
case Some(OpenedAccount(balance))
// do some side-effect using balance
println(balance)
case _ throw new IllegalStateException
}
}
case CloseAccount if acc.balance == 0.0
Effect.persist(AccountClosed)
case CloseAccount
Effect.unhandled
case _
Effect.unhandled
}
case _ throw new IllegalStateException
}
private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] =
CommandHandler.command(_ Effect.unhandled)
private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Option[Account]] =
CommandHandler.byState {
case None initialHandler
case Some(OpenedAccount(_)) openedAccountHandler
case Some(ClosedAccount) closedHandler
}
private val eventHandler: (Option[Account], AccountEvent) Option[Account] = {
case (None, AccountCreated) Some(OpenedAccount(0.0))
case (Some(acc @ OpenedAccount(_)), Deposited(amount))
Some(acc.copy(balance = acc.balance + amount))
case (Some(acc @ OpenedAccount(_)), Withdrawn(amount))
Some(acc.copy(balance = acc.balance - amount))
case (Some(OpenedAccount(_)), AccountClosed)
Some(ClosedAccount)
case (state, event) throw new RuntimeException(s"unexpected event [$event] in state [$state]")
}
def behavior(accountNumber: String): Behavior[AccountCommand] =
PersistentBehaviors.receive[AccountCommand, AccountEvent, Option[Account]](
persistenceId = accountNumber,
emptyState = None,
commandHandler = commandHandler,
eventHandler = eventHandler
)
}

View file

@ -0,0 +1,101 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
import akka.actor.typed.Behavior
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
object AccountExample2 {
sealed trait AccountCommand
case object CreateAccount extends AccountCommand
case class Deposit(amount: Double) extends AccountCommand
case class Withdraw(amount: Double) extends AccountCommand
case object CloseAccount extends AccountCommand
sealed trait AccountEvent
case object AccountCreated extends AccountEvent
case class Deposited(amount: Double) extends AccountEvent
case class Withdrawn(amount: Double) extends AccountEvent
case object AccountClosed extends AccountEvent
sealed trait Account {
def applyEvent(event: AccountEvent): Account
}
case object EmptyAccount extends Account {
override def applyEvent(event: AccountEvent): Account = event match {
case AccountCreated OpenedAccount(0.0)
case _ throw new IllegalStateException(s"unexpected event [$event] in state [EmptyAccount]")
}
}
case class OpenedAccount(balance: Double) extends Account {
override def applyEvent(event: AccountEvent): Account = event match {
case Deposited(amount) copy(balance = balance + amount)
case Withdrawn(amount) copy(balance = balance - amount)
case AccountClosed ClosedAccount
case _ throw new IllegalStateException(s"unexpected event [$event] in state [OpenedAccount]")
}
}
case object ClosedAccount extends Account {
override def applyEvent(event: AccountEvent): Account =
throw new IllegalStateException(s"unexpected event [$event] in state [ClosedAccount]")
}
private val initialHandler: CommandHandler[AccountCommand, AccountEvent, Account] =
CommandHandler.command {
case CreateAccount Effect.persist(AccountCreated)
case _ Effect.unhandled
}
private val openedAccountHandler: CommandHandler[AccountCommand, AccountEvent, Account] = {
case (ctx, acc: OpenedAccount, cmd) cmd match {
case Deposit(amount) Effect.persist(Deposited(amount))
case Withdraw(amount)
if ((acc.balance - amount) < 0.0)
Effect.unhandled // TODO replies are missing in this example
else {
Effect
.persist(Withdrawn(amount))
.andThen {
case OpenedAccount(balance)
// do some side-effect using balance
println(balance)
case _ throw new IllegalStateException
}
}
case CloseAccount if acc.balance == 0.0
Effect.persist(AccountClosed)
case CloseAccount
Effect.unhandled
}
case _ throw new IllegalStateException
}
private val closedHandler: CommandHandler[AccountCommand, AccountEvent, Account] =
CommandHandler.command(_ Effect.unhandled)
private def commandHandler: CommandHandler[AccountCommand, AccountEvent, Account] =
CommandHandler.byState {
case EmptyAccount initialHandler
case OpenedAccount(_) openedAccountHandler
case ClosedAccount closedHandler
}
private val eventHandler: (Account, AccountEvent) Account =
(state, event) state.applyEvent(event)
def behavior(accountNumber: String): Behavior[AccountCommand] =
PersistentBehaviors.receive[AccountCommand, AccountEvent, Account](
persistenceId = accountNumber,
emptyState = EmptyAccount,
commandHandler = commandHandler,
eventHandler = eventHandler
)
}

View file

@ -0,0 +1,54 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
object MovieWatchList {
sealed trait Command
final case class AddMovie(movieId: String) extends Command
final case class RemoveMovie(movieId: String) extends Command
final case class GetMovieList(replyTo: ActorRef[MovieList]) extends Command
sealed trait Event
final case class MovieAdded(movieId: String) extends Event
final case class MovieRemoved(movieId: String) extends Event
final case class MovieList(movieIds: Set[String]) {
def applyEvent(event: Event): MovieList = {
event match {
case MovieAdded(movieId) copy(movieIds = movieIds + movieId)
case MovieRemoved(movieId) copy(movieIds = movieIds + movieId)
}
}
}
private val commandHandler: CommandHandler[Command, Event, MovieList] = {
(ctx, state, cmd)
cmd match {
case AddMovie(movieId)
Effect.persist(MovieAdded(movieId))
case RemoveMovie(movieId)
Effect.persist(MovieRemoved(movieId))
case GetMovieList(replyTo)
replyTo ! state
Effect.none
}
}
def behavior(userId: String): Behavior[Command] = {
PersistentBehaviors.receive[Command, Event, MovieList](
persistenceId = "movies-" + userId,
emptyState = MovieList(Set.empty),
commandHandler,
eventHandler = (state, event) state.applyEvent(event)
)
}
}