align Effect API between scaladsl and javadsl, #25706
scaladsl: * stop => stop() * andThenStop() => thenStop() javadsl: * andThen => thenRun
This commit is contained in:
parent
751a67ef42
commit
a948f5572b
21 changed files with 79 additions and 54 deletions
|
|
@ -150,7 +150,7 @@ public class HelloWorldPersistentEntityExample {
|
||||||
|
|
||||||
private Effect<Greeted, KnownPeople> greet(KnownPeople state, Greet cmd) {
|
private Effect<Greeted, KnownPeople> greet(KnownPeople state, Greet cmd) {
|
||||||
return Effect().persist(new Greeted(cmd.whom))
|
return Effect().persist(new Greeted(cmd.whom))
|
||||||
.andThen(newState -> cmd.replyTo.tell(new Greeting(cmd.whom, newState.numberOfPeople())));
|
.thenRun(newState -> cmd.replyTo.tell(new Greeting(cmd.whom, newState.numberOfPeople())));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -54,7 +54,7 @@ object ClusterShardingPersistenceSpec {
|
||||||
case Get(replyTo) ⇒
|
case Get(replyTo) ⇒
|
||||||
replyTo ! s"$entityId:$state"
|
replyTo ! s"$entityId:$state"
|
||||||
Effect.none
|
Effect.none
|
||||||
case StopPlz ⇒ Effect.stop
|
case StopPlz ⇒ Effect.stop()
|
||||||
},
|
},
|
||||||
eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt)
|
eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -77,7 +77,7 @@ object HelloWorldPersistentEntityExample {
|
||||||
.thenRun(state ⇒ cmd.replyTo ! Greeting(cmd.whom, state.numberOfPeople))
|
.thenRun(state ⇒ cmd.replyTo ! Greeting(cmd.whom, state.numberOfPeople))
|
||||||
|
|
||||||
private def passivate(): Effect[Greeted, KnownPeople] =
|
private def passivate(): Effect[Greeted, KnownPeople] =
|
||||||
Effect.stop
|
Effect.stop()
|
||||||
|
|
||||||
private val eventHandler: (KnownPeople, Greeted) ⇒ KnownPeople = {
|
private val eventHandler: (KnownPeople, Greeted) ⇒ KnownPeople = {
|
||||||
(state, evt) ⇒ state.add(evt.whom)
|
(state, evt) ⇒ state.add(evt.whom)
|
||||||
|
|
|
||||||
|
|
@ -45,7 +45,7 @@ object ClusterSingletonPersistenceSpec {
|
||||||
case Get(replyTo) ⇒
|
case Get(replyTo) ⇒
|
||||||
replyTo ! state
|
replyTo ! state
|
||||||
Effect.none
|
Effect.none
|
||||||
case StopPlz ⇒ Effect.stop
|
case StopPlz ⇒ Effect.stop()
|
||||||
},
|
},
|
||||||
eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt)
|
eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,9 +7,9 @@ package akka.persistence.typed.internal
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
|
||||||
import akka.persistence.typed.{ SideEffect, javadsl, scaladsl }
|
import akka.persistence.typed.{ SideEffect, javadsl, scaladsl }
|
||||||
|
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.persistence.typed.NoReplyEffectImpl
|
import akka.persistence.typed.NoReplyEffectImpl
|
||||||
|
import akka.persistence.typed.Stop
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
@InternalApi
|
@InternalApi
|
||||||
|
|
@ -23,6 +23,9 @@ private[akka] abstract class EffectImpl[+Event, State] extends javadsl.ReplyEffe
|
||||||
override def thenNoReply(): EffectImpl[Event, State] =
|
override def thenNoReply(): EffectImpl[Event, State] =
|
||||||
CompositeEffect(this, new NoReplyEffectImpl[State])
|
CompositeEffect(this, new NoReplyEffectImpl[State])
|
||||||
|
|
||||||
|
override def thenStop(): EffectImpl[Event, State] =
|
||||||
|
CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
|
|
|
||||||
|
|
@ -7,15 +7,20 @@ package akka.persistence.typed.javadsl
|
||||||
import akka.annotation.DoNotInherit
|
import akka.annotation.DoNotInherit
|
||||||
import akka.japi.function
|
import akka.japi.function
|
||||||
import akka.persistence.typed.internal._
|
import akka.persistence.typed.internal._
|
||||||
import akka.persistence.typed.{ SideEffect, Stop }
|
import akka.persistence.typed.SideEffect
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
import akka.annotation.InternalApi
|
||||||
import akka.persistence.typed.ExpectingReply
|
import akka.persistence.typed.ExpectingReply
|
||||||
|
|
||||||
object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
|
/**
|
||||||
|
* INTERNAL API: see `class EffectFactories`
|
||||||
|
*/
|
||||||
|
@InternalApi private[akka] object EffectFactories extends EffectFactories[Nothing, Nothing, Nothing]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory methods for creating [[Effect]] directives.
|
* Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command.
|
||||||
|
* Created via [[PersistentBehavior.Effect]].
|
||||||
*
|
*
|
||||||
* Not for user extension
|
* Not for user extension
|
||||||
*/
|
*/
|
||||||
|
|
@ -35,17 +40,17 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
|
||||||
/**
|
/**
|
||||||
* Do not persist anything
|
* Do not persist anything
|
||||||
*/
|
*/
|
||||||
def none: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]]
|
def none(): Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop this persistent actor
|
* Stop this persistent actor
|
||||||
*/
|
*/
|
||||||
def stop: Effect[Event, State] = none.thenStop()
|
def stop(): Effect[Event, State] = none().thenStop()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This command is not handled, but it is not an error that it isn't.
|
* This command is not handled, but it is not an error that it isn't.
|
||||||
*/
|
*/
|
||||||
def unhandled: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]]
|
def unhandled(): Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
|
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
|
||||||
|
|
@ -60,7 +65,7 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
|
||||||
* finding mistakes.
|
* finding mistakes.
|
||||||
*/
|
*/
|
||||||
def reply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: ReplyMessage): ReplyEffect[Event, State] =
|
def reply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: ReplyMessage): ReplyEffect[Event, State] =
|
||||||
none.thenReply[ReplyMessage](cmd, new function.Function[State, ReplyMessage] {
|
none().thenReply[ReplyMessage](cmd, new function.Function[State, ReplyMessage] {
|
||||||
override def apply(param: State): ReplyMessage = replyWithMessage
|
override def apply(param: State): ReplyMessage = replyWithMessage
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
@ -70,7 +75,7 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
|
||||||
* sent for a specific command or the reply will be sent later.
|
* sent for a specific command or the reply will be sent later.
|
||||||
*/
|
*/
|
||||||
def noReply(): ReplyEffect[Event, State] =
|
def noReply(): ReplyEffect[Event, State] =
|
||||||
none.thenNoReply()
|
none().thenNoReply()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -78,24 +83,31 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
|
||||||
*
|
*
|
||||||
* Additional side effects can be performed in the callback `andThen`
|
* Additional side effects can be performed in the callback `andThen`
|
||||||
*
|
*
|
||||||
* Instances of `Effect` are available through factories in the respective Java and Scala DSL packages.
|
* Instances of `Effect` are available through factories [[PersistentBehavior.Effect]].
|
||||||
*
|
*
|
||||||
* Not intended for user extension.
|
* Not intended for user extension.
|
||||||
*/
|
*/
|
||||||
@DoNotInherit abstract class Effect[+Event, State] {
|
@DoNotInherit abstract class Effect[+Event, State] {
|
||||||
self: EffectImpl[Event, State] ⇒
|
self: EffectImpl[Event, State] ⇒
|
||||||
/** Convenience method to register a side effect with just a callback function */
|
/**
|
||||||
final def andThen(callback: function.Procedure[State]): Effect[Event, State] =
|
* Run the given callback. Callbacks are run sequentially.
|
||||||
|
*/
|
||||||
|
final def thenRun(callback: function.Procedure[State]): Effect[Event, State] =
|
||||||
CompositeEffect(this, SideEffect[State](s ⇒ callback.apply(s)))
|
CompositeEffect(this, SideEffect[State](s ⇒ callback.apply(s)))
|
||||||
|
|
||||||
/** Convenience method to register a side effect that doesn't need access to state */
|
/**
|
||||||
final def andThen(callback: function.Effect): Effect[Event, State] =
|
* Run the given callback. Callbacks are run sequentially.
|
||||||
|
*/
|
||||||
|
final def thenRun(callback: function.Effect): Effect[Event, State] =
|
||||||
CompositeEffect(this, SideEffect[State]((_: State) ⇒ callback.apply()))
|
CompositeEffect(this, SideEffect[State]((_: State) ⇒ callback.apply()))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run the given callback after the current Effect
|
||||||
|
*/
|
||||||
def andThen(chainedEffect: SideEffect[State]): Effect[Event, State]
|
def andThen(chainedEffect: SideEffect[State]): Effect[Event, State]
|
||||||
|
|
||||||
final def thenStop(): Effect[Event, State] =
|
/** The side effect is to stop the actor */
|
||||||
CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
|
def thenStop(): Effect[Event, State]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
|
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
|
||||||
|
|
|
||||||
|
|
@ -82,6 +82,19 @@ final class EventHandlerBuilder[State >: Null, Event]() {
|
||||||
build()
|
build()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Match any event.
|
||||||
|
*
|
||||||
|
* Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass
|
||||||
|
* the state in a `BiFunction`.
|
||||||
|
*/
|
||||||
|
def matchAny(f: JFunction[Event, State]): EventHandler[State, Event] = {
|
||||||
|
matchAny(new BiFunction[State, Event, State] {
|
||||||
|
override def apply(state: State, event: Event): State = f(event)
|
||||||
|
})
|
||||||
|
build()
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compose this builder with another builder. The handlers in this builder will be tried first followed
|
* Compose this builder with another builder. The handlers in this builder will be tried first followed
|
||||||
* by the handlers in `other`.
|
* by the handlers in `other`.
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,8 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (
|
||||||
*
|
*
|
||||||
* Return effects from your handlers in order to instruct persistence on how to act on the incoming message (i.e. persist events).
|
* Return effects from your handlers in order to instruct persistence on how to act on the incoming message (i.e. persist events).
|
||||||
*/
|
*/
|
||||||
protected final def Effect: EffectFactories[Command, Event, State] = EffectFactory.asInstanceOf[EffectFactories[Command, Event, State]]
|
protected final def Effect: EffectFactories[Command, Event, State] =
|
||||||
|
EffectFactories.asInstanceOf[EffectFactories[Command, Event, State]]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implement by returning the initial empty state object.
|
* Implement by returning the initial empty state object.
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ import akka.persistence.typed.ExpectingReply
|
||||||
import akka.persistence.typed.ReplyEffectImpl
|
import akka.persistence.typed.ReplyEffectImpl
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factories for effects - how a persistent actor reacts on a command
|
* Factory methods for creating [[Effect]] directives - how a persistent actor reacts on a command.
|
||||||
*/
|
*/
|
||||||
object Effect {
|
object Effect {
|
||||||
|
|
||||||
|
|
@ -58,7 +58,7 @@ object Effect {
|
||||||
* Stop this persistent actor
|
* Stop this persistent actor
|
||||||
* Side effects can be chained with `andThen`
|
* Side effects can be chained with `andThen`
|
||||||
*/
|
*/
|
||||||
def stop[Event, State]: Effect[Event, State] = none.andThenStop()
|
def stop[Event, State](): Effect[Event, State] = none.thenStop()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
|
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
|
||||||
|
|
@ -113,9 +113,7 @@ trait Effect[+Event, State] {
|
||||||
CompositeEffect(this, chainedEffects)
|
CompositeEffect(this, chainedEffects)
|
||||||
|
|
||||||
/** The side effect is to stop the actor */
|
/** The side effect is to stop the actor */
|
||||||
def andThenStop(): Effect[Event, State] = {
|
def thenStop(): Effect[Event, State]
|
||||||
CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
|
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
|
||||||
|
|
|
||||||
|
|
@ -176,7 +176,7 @@ public class PersistentActorCompileOnlyTest {
|
||||||
//#commonChainedEffects
|
//#commonChainedEffects
|
||||||
return commandHandlerBuilder(ExampleState.class)
|
return commandHandlerBuilder(ExampleState.class)
|
||||||
.matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data))
|
.matchCommand(Cmd.class, (state, cmd) -> Effect().persist(new Evt(cmd.data))
|
||||||
.andThen(() -> cmd.sender.tell(new Ack()))
|
.thenRun(() -> cmd.sender.tell(new Ack()))
|
||||||
.andThen(commonChainedEffect)
|
.andThen(commonChainedEffect)
|
||||||
)
|
)
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -303,7 +303,7 @@ public class PersistentActorCompileOnlyTest {
|
||||||
return commandHandlerBuilder(EventsInFlight.class)
|
return commandHandlerBuilder(EventsInFlight.class)
|
||||||
.matchCommand(DoSideEffect.class,
|
.matchCommand(DoSideEffect.class,
|
||||||
(state, cmd) -> Effect().persist(new IntentRecord(state.nextCorrelationId, cmd.data))
|
(state, cmd) -> Effect().persist(new IntentRecord(state.nextCorrelationId, cmd.data))
|
||||||
.andThen(() -> performSideEffect(ctx.getSelf().narrow(), state.nextCorrelationId, cmd.data, ctx.getSystem().scheduler())))
|
.thenRun(() -> performSideEffect(ctx.getSelf().narrow(), state.nextCorrelationId, cmd.data, ctx.getSystem().scheduler())))
|
||||||
.matchCommand(AcknowledgeSideEffect.class, (state, command) -> Effect().persist(new SideEffectAcknowledged(command.correlationId)))
|
.matchCommand(AcknowledgeSideEffect.class, (state, command) -> Effect().persist(new SideEffectAcknowledged(command.correlationId)))
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -292,14 +292,14 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
||||||
.matchCommand(Timeout.class,
|
.matchCommand(Timeout.class,
|
||||||
(state, msg) -> Effect().persist(timeoutEvent))
|
(state, msg) -> Effect().persist(timeoutEvent))
|
||||||
.matchCommand(EmptyEventsListAndThenLog.class, (state, msg) -> Effect().persist(Collections.emptyList())
|
.matchCommand(EmptyEventsListAndThenLog.class, (state, msg) -> Effect().persist(Collections.emptyList())
|
||||||
.andThen(s -> loggingProbe.tell(loggingOne)))
|
.thenRun(s -> loggingProbe.tell(loggingOne)))
|
||||||
.matchCommand(StopThenLog.class,
|
.matchCommand(StopThenLog.class,
|
||||||
(state, msg) -> Effect().stop()
|
(state, msg) -> Effect().stop()
|
||||||
.andThen(s -> loggingProbe.tell(loggingOne)))
|
.thenRun(s -> loggingProbe.tell(loggingOne)))
|
||||||
.matchCommand(IncrementTwiceAndLog.class,
|
.matchCommand(IncrementTwiceAndLog.class,
|
||||||
(state, msg) -> Effect().persist(
|
(state, msg) -> Effect().persist(
|
||||||
Arrays.asList(new Incremented(1), new Incremented(1)))
|
Arrays.asList(new Incremented(1), new Incremented(1)))
|
||||||
.andThen(s -> loggingProbe.tell(loggingOne)))
|
.thenRun(s -> loggingProbe.tell(loggingOne)))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -88,7 +88,7 @@ public class AccountExample extends PersistentBehavior<AccountExample.AccountCom
|
||||||
return Effect().unhandled(); // TODO replies are missing in this example
|
return Effect().unhandled(); // TODO replies are missing in this example
|
||||||
} else {
|
} else {
|
||||||
return Effect().persist(new Withdrawn(cmd.amount))
|
return Effect().persist(new Withdrawn(cmd.amount))
|
||||||
.andThen(acc2 -> { // FIXME in scaladsl it's named thenRun, change javadsl also?
|
.thenRun(acc2 -> { // FIXME in scaladsl it's named thenRun, change javadsl also?
|
||||||
// we know this cast is safe, but somewhat ugly
|
// we know this cast is safe, but somewhat ugly
|
||||||
OpenedAccount openAccount = (OpenedAccount) acc2;
|
OpenedAccount openAccount = (OpenedAccount) acc2;
|
||||||
// do some side-effect using balance
|
// do some side-effect using balance
|
||||||
|
|
|
||||||
|
|
@ -15,8 +15,6 @@ import akka.persistence.typed.javadsl.CommandHandlerBuilder;
|
||||||
import akka.persistence.typed.javadsl.EventHandler;
|
import akka.persistence.typed.javadsl.EventHandler;
|
||||||
import akka.persistence.typed.javadsl.PersistentBehavior;
|
import akka.persistence.typed.javadsl.PersistentBehavior;
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
public class BlogPostExample {
|
public class BlogPostExample {
|
||||||
|
|
||||||
//#event
|
//#event
|
||||||
|
|
@ -168,7 +166,7 @@ public class BlogPostExample {
|
||||||
//#reply
|
//#reply
|
||||||
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
|
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
|
||||||
return Effect().persist(event)
|
return Effect().persist(event)
|
||||||
.andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
|
.thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
|
||||||
//#reply
|
//#reply
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -179,10 +177,10 @@ public class BlogPostExample {
|
||||||
return commandHandlerBuilder(DraftState.class)
|
return commandHandlerBuilder(DraftState.class)
|
||||||
.matchCommand(ChangeBody.class, (state, cmd) -> {
|
.matchCommand(ChangeBody.class, (state, cmd) -> {
|
||||||
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
|
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
|
||||||
return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance()));
|
return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
|
||||||
})
|
})
|
||||||
.matchCommand(Publish.class, (state, cmd) -> Effect()
|
.matchCommand(Publish.class, (state, cmd) -> Effect()
|
||||||
.persist(new Published(state.postId())).andThen(() -> {
|
.persist(new Published(state.postId())).thenRun(() -> {
|
||||||
System.out.println("Blog post published: " + state.postId());
|
System.out.println("Blog post published: " + state.postId());
|
||||||
cmd.replyTo.tell(Done.getInstance());
|
cmd.replyTo.tell(Done.getInstance());
|
||||||
}))
|
}))
|
||||||
|
|
@ -196,7 +194,7 @@ public class BlogPostExample {
|
||||||
return commandHandlerBuilder(PublishedState.class)
|
return commandHandlerBuilder(PublishedState.class)
|
||||||
.matchCommand(ChangeBody.class, (state, cmd) -> {
|
.matchCommand(ChangeBody.class, (state, cmd) -> {
|
||||||
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
|
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
|
||||||
return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance()));
|
return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
|
||||||
})
|
})
|
||||||
.matchCommand(GetPost.class, (state, cmd) -> {
|
.matchCommand(GetPost.class, (state, cmd) -> {
|
||||||
cmd.replyTo.tell(state.postContent);
|
cmd.replyTo.tell(state.postContent);
|
||||||
|
|
|
||||||
|
|
@ -127,7 +127,7 @@ public class NullBlogState {
|
||||||
.matchCommand(AddPost.class, cmd -> {
|
.matchCommand(AddPost.class, cmd -> {
|
||||||
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
|
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
|
||||||
return Effect().persist(event)
|
return Effect().persist(event)
|
||||||
.andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
|
.thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
|
||||||
})
|
})
|
||||||
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
|
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
|
||||||
}
|
}
|
||||||
|
|
@ -136,10 +136,10 @@ public class NullBlogState {
|
||||||
return commandHandlerBuilder(Objects::nonNull)
|
return commandHandlerBuilder(Objects::nonNull)
|
||||||
.matchCommand(ChangeBody.class, (state, cmd) -> {
|
.matchCommand(ChangeBody.class, (state, cmd) -> {
|
||||||
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
|
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
|
||||||
return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance()));
|
return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
|
||||||
})
|
})
|
||||||
.matchCommand(Publish.class, (state, cmd) -> Effect()
|
.matchCommand(Publish.class, (state, cmd) -> Effect()
|
||||||
.persist(new Published(state.postId())).andThen(() -> {
|
.persist(new Published(state.postId())).thenRun(() -> {
|
||||||
System.out.println("Blog post published: " + state.postId());
|
System.out.println("Blog post published: " + state.postId());
|
||||||
cmd.replyTo.tell(Done.getInstance());
|
cmd.replyTo.tell(Done.getInstance());
|
||||||
}))
|
}))
|
||||||
|
|
|
||||||
|
|
@ -127,7 +127,7 @@ public class OptionalBlogState {
|
||||||
.matchCommand(AddPost.class, (state, cmd) -> {
|
.matchCommand(AddPost.class, (state, cmd) -> {
|
||||||
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
|
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
|
||||||
return Effect().persist(event)
|
return Effect().persist(event)
|
||||||
.andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
|
.thenRun(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
|
||||||
})
|
})
|
||||||
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
|
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
|
||||||
}
|
}
|
||||||
|
|
@ -136,10 +136,10 @@ public class OptionalBlogState {
|
||||||
return commandHandlerBuilder(state -> state.isPresent())
|
return commandHandlerBuilder(state -> state.isPresent())
|
||||||
.matchCommand(ChangeBody.class, (state, cmd) -> {
|
.matchCommand(ChangeBody.class, (state, cmd) -> {
|
||||||
BodyChanged event = new BodyChanged(state.get().postId(), cmd.newBody);
|
BodyChanged event = new BodyChanged(state.get().postId(), cmd.newBody);
|
||||||
return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance()));
|
return Effect().persist(event).thenRun(() -> cmd.replyTo.tell(Done.getInstance()));
|
||||||
})
|
})
|
||||||
.matchCommand(Publish.class, (state, cmd) -> Effect()
|
.matchCommand(Publish.class, (state, cmd) -> Effect()
|
||||||
.persist(new Published(state.get().postId())).andThen(() -> {
|
.persist(new Published(state.get().postId())).thenRun(() -> {
|
||||||
System.out.println("Blog post published: " + state.get().postId());
|
System.out.println("Blog post published: " + state.get().postId());
|
||||||
cmd.replyTo.tell(Done.getInstance());
|
cmd.replyTo.tell(Done.getInstance());
|
||||||
}))
|
}))
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ object RecoveryPermitterSpec {
|
||||||
persistenceId = PersistenceId(name),
|
persistenceId = PersistenceId(name),
|
||||||
emptyState = EmptyState,
|
emptyState = EmptyState,
|
||||||
commandHandler = CommandHandler.command {
|
commandHandler = CommandHandler.command {
|
||||||
case StopActor ⇒ Effect.stop
|
case StopActor ⇒ Effect.stop()
|
||||||
case command ⇒ commandProbe.ref ! command; Effect.none
|
case command ⇒ commandProbe.ref ! command; Effect.none
|
||||||
},
|
},
|
||||||
eventHandler = { (state, event) ⇒ eventProbe.ref ! event; state }
|
eventHandler = { (state, event) ⇒ eventProbe.ref ! event; state }
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ class NullEmptyStateSpec extends ScalaTestWithActorTestKit(NullEmptyStateSpec.co
|
||||||
emptyState = null,
|
emptyState = null,
|
||||||
commandHandler = (_, command) ⇒ {
|
commandHandler = (_, command) ⇒ {
|
||||||
if (command == "stop")
|
if (command == "stop")
|
||||||
Effect.stop
|
Effect.stop()
|
||||||
else
|
else
|
||||||
Effect.persist(command)
|
Effect.persist(command)
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -398,7 +398,7 @@ object PersistentActorCompileOnlyTest {
|
||||||
case Enough ⇒
|
case Enough ⇒
|
||||||
Effect.persist(Done)
|
Effect.persist(Done)
|
||||||
.thenRun((_: State) ⇒ println("yay"))
|
.thenRun((_: State) ⇒ println("yay"))
|
||||||
.andThenStop
|
.thenStop
|
||||||
}
|
}
|
||||||
|
|
||||||
private val eventHandler: (State, Event) ⇒ State = {
|
private val eventHandler: (State, Event) ⇒ State = {
|
||||||
|
|
|
||||||
|
|
@ -139,14 +139,14 @@ object PersistentBehaviorSpec {
|
||||||
.thenRun { (_: State) ⇒
|
.thenRun { (_: State) ⇒
|
||||||
loggingActor ! firstLogging
|
loggingActor ! firstLogging
|
||||||
}
|
}
|
||||||
.andThenStop
|
.thenStop
|
||||||
|
|
||||||
case IncrementTwiceThenLogThenStop ⇒
|
case IncrementTwiceThenLogThenStop ⇒
|
||||||
Effect.persist(Incremented(1), Incremented(2))
|
Effect.persist(Incremented(1), Incremented(2))
|
||||||
.thenRun { (_: State) ⇒
|
.thenRun { (_: State) ⇒
|
||||||
loggingActor ! firstLogging
|
loggingActor ! firstLogging
|
||||||
}
|
}
|
||||||
.andThenStop
|
.thenStop
|
||||||
|
|
||||||
case IncrementWithPersistAll(n) ⇒
|
case IncrementWithPersistAll(n) ⇒
|
||||||
Effect.persist((0 until n).map(_ ⇒ Incremented(1)))
|
Effect.persist((0 until n).map(_ ⇒ Incremented(1)))
|
||||||
|
|
@ -210,7 +210,7 @@ object PersistentBehaviorSpec {
|
||||||
.thenRun { _ ⇒
|
.thenRun { _ ⇒
|
||||||
loggingActor ! firstLogging
|
loggingActor ! firstLogging
|
||||||
}
|
}
|
||||||
.andThenStop
|
.thenStop
|
||||||
},
|
},
|
||||||
eventHandler = (state, evt) ⇒ evt match {
|
eventHandler = (state, evt) ⇒ evt match {
|
||||||
case Incremented(delta) ⇒
|
case Incremented(delta) ⇒
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.co
|
||||||
emptyState = 0,
|
emptyState = 0,
|
||||||
commandHandler = (_, command) ⇒ {
|
commandHandler = (_, command) ⇒ {
|
||||||
if (command < 0)
|
if (command < 0)
|
||||||
Effect.stop
|
Effect.stop()
|
||||||
else
|
else
|
||||||
Effect.persist(command)
|
Effect.persist(command)
|
||||||
},
|
},
|
||||||
|
|
|
||||||
|
|
@ -70,7 +70,7 @@ object BlogPostExample {
|
||||||
|
|
||||||
case BlankState ⇒ command match {
|
case BlankState ⇒ command match {
|
||||||
case cmd: AddPost ⇒ addPost(cmd)
|
case cmd: AddPost ⇒ addPost(cmd)
|
||||||
case PassivatePost ⇒ Effect.stop
|
case PassivatePost ⇒ Effect.stop()
|
||||||
case _ ⇒ Effect.unhandled
|
case _ ⇒ Effect.unhandled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -79,12 +79,12 @@ object BlogPostExample {
|
||||||
case Publish(replyTo) ⇒ publish(draftState, replyTo)
|
case Publish(replyTo) ⇒ publish(draftState, replyTo)
|
||||||
case GetPost(replyTo) ⇒ getPost(draftState, replyTo)
|
case GetPost(replyTo) ⇒ getPost(draftState, replyTo)
|
||||||
case _: AddPost ⇒ Effect.unhandled
|
case _: AddPost ⇒ Effect.unhandled
|
||||||
case PassivatePost ⇒ Effect.stop
|
case PassivatePost ⇒ Effect.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
case publishedState: PublishedState ⇒ command match {
|
case publishedState: PublishedState ⇒ command match {
|
||||||
case GetPost(replyTo) ⇒ getPost(publishedState, replyTo)
|
case GetPost(replyTo) ⇒ getPost(publishedState, replyTo)
|
||||||
case PassivatePost ⇒ Effect.stop
|
case PassivatePost ⇒ Effect.stop()
|
||||||
case _ ⇒ Effect.unhandled
|
case _ ⇒ Effect.unhandled
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue