Add overloaded Effect.persist and renaming Effect.done to none #23964

This commit is contained in:
Renato Cavalcanti 2017-11-14 16:48:10 +01:00 committed by Johan Andrén
parent 09d79d5981
commit f92e1c16e7
5 changed files with 128 additions and 32 deletions

View file

@ -57,7 +57,7 @@ object ClusterShardingPersistenceSpec {
case Add(s) Effect.persist(s) case Add(s) Effect.persist(s)
case Get(replyTo) case Get(replyTo)
replyTo ! state replyTo ! state
Effect.done Effect.none
case StopPlz Effect.stop case StopPlz Effect.stop
}), }),
eventHandler = (state, evt) if (state.isEmpty) evt else state + "|" + evt) eventHandler = (state, evt) if (state.isEmpty) evt else state + "|" + evt)

View file

@ -143,13 +143,13 @@ object PersistentActorCompileOnlyTest {
case Happy CommandHandler.command { case Happy CommandHandler.command {
case Greet(whom) case Greet(whom)
println(s"Super happy to meet you $whom!") println(s"Super happy to meet you $whom!")
Effect.done Effect.none
case MoodSwing Effect.persist(MoodChanged(Sad)) case MoodSwing Effect.persist(MoodChanged(Sad))
} }
case Sad CommandHandler.command { case Sad CommandHandler.command {
case Greet(whom) case Greet(whom)
println(s"hi $whom") println(s"hi $whom")
Effect.done Effect.none
case MoodSwing Effect.persist(MoodChanged(Happy)) case MoodSwing Effect.persist(MoodChanged(Happy))
} }
}, },
@ -299,7 +299,8 @@ object PersistentActorCompileOnlyTest {
val adapt = ctx.spawnAdapter((m: MetaData) GotMetaData(m)) val adapt = ctx.spawnAdapter((m: MetaData) GotMetaData(m))
def addItem(id: Id, self: ActorRef[Command]) = def addItem(id: Id, self: ActorRef[Command]) =
Persist[Event, List[Id]](ItemAdded(id)) Effect
.persist[Event, List[Id]](ItemAdded(id))
.andThen(metadataRegistry ! GetMetaData(id, adapt)) .andThen(metadataRegistry ! GetMetaData(id, adapt))
PersistentActor.immutable[Command, Event, List[Id]]( PersistentActor.immutable[Command, Event, List[Id]](
@ -313,10 +314,10 @@ object PersistentActorCompileOnlyTest {
case RemoveItem(id) Effect.persist(ItemRemoved(id)) case RemoveItem(id) Effect.persist(ItemRemoved(id))
case GotMetaData(data) case GotMetaData(data)
basket = basket.updatedWith(data) basket = basket.updatedWith(data)
Effect.done Effect.none
case GetTotalPrice(sender) case GetTotalPrice(sender)
sender ! basket.items.map(_.price).sum sender ! basket.items.map(_.price).sum
Effect.done Effect.none
} }
} }
else CommandHandler { (ctx, state, cmd) else CommandHandler { (ctx, state, cmd)
@ -329,10 +330,10 @@ object PersistentActorCompileOnlyTest {
stash.foreach(ctx.self ! _) stash.foreach(ctx.self ! _)
stash = Nil stash = Nil
} }
Effect.done Effect.none
case cmd: GetTotalPrice case cmd: GetTotalPrice
stash :+= cmd stash :+= cmd
Effect.done Effect.none
} }
}), }),
eventHandler = (state, evt) evt match { eventHandler = (state, evt) evt match {
@ -362,7 +363,7 @@ object PersistentActorCompileOnlyTest {
case class Remembered(memory: String) extends Event case class Remembered(memory: String) extends Event
def changeMoodIfNeeded(currentState: Mood, newMood: Mood): Effect[Event, Mood] = def changeMoodIfNeeded(currentState: Mood, newMood: Mood): Effect[Event, Mood] =
if (currentState == newMood) Effect.done if (currentState == newMood) Effect.none
else Effect.persist(MoodChanged(newMood)) else Effect.persist(MoodChanged(newMood))
PersistentActor.immutable[Command, Event, Mood]( PersistentActor.immutable[Command, Event, Mood](
@ -372,7 +373,7 @@ object PersistentActorCompileOnlyTest {
cmd match { cmd match {
case Greet(whom) case Greet(whom)
println(s"Hi there, I'm $state!") println(s"Hi there, I'm $state!")
Effect.done Effect.none
case CheerUp(sender) case CheerUp(sender)
changeMoodIfNeeded(state, Happy) changeMoodIfNeeded(state, Happy)
.andThen { sender ! Ack } .andThen { sender ! Ack }
@ -380,7 +381,7 @@ object PersistentActorCompileOnlyTest {
// A more elaborate example to show we still have full control over the effects // A more elaborate example to show we still have full control over the effects
// if needed (e.g. when some logic is factored out but you want to add more effects) // if needed (e.g. when some logic is factored out but you want to add more effects)
val commonEffects = changeMoodIfNeeded(state, Happy) val commonEffects = changeMoodIfNeeded(state, Happy)
Effect.persistAll(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects) Effect.persist(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects)
} }
}, },

View file

@ -31,6 +31,9 @@ object PersistentActorSpec {
final case object Increment extends Command final case object Increment extends Command
final case object IncrementLater extends Command final case object IncrementLater extends Command
final case object IncrementAfterReceiveTimeout extends Command final case object IncrementAfterReceiveTimeout extends Command
final case object IncrementTwiceAndThenLog extends Command
final case object DoNothingAndThenLog extends Command
final case object EmptyEventsListAndThenLog extends Command
final case class GetValue(replyTo: ActorRef[State]) extends Command final case class GetValue(replyTo: ActorRef[State]) extends Command
private case object Timeout extends Command private case object Timeout extends Command
@ -41,7 +44,14 @@ object PersistentActorSpec {
case object Tick case object Tick
def counter(persistenceId: String): Behavior[Command] = { val firstLogging = "first logging"
val secondLogging = "second logging"
def counter(persistenceId: String)(implicit actorSystem: ActorSystem[TypedSpec.Command], testSettings: TestKitSettings): Behavior[Command] =
counter(persistenceId, TestProbe[String].ref)
def counter(persistenceId: String, loggingActor: ActorRef[String]): Behavior[Command] = {
PersistentActor.immutable[Command, Event, State]( PersistentActor.immutable[Command, Event, State](
persistenceId, persistenceId,
initialState = State(0, Vector.empty), initialState = State(0, Vector.empty),
@ -50,7 +60,7 @@ object PersistentActorSpec {
Effect.persist(Incremented(1)) Effect.persist(Incremented(1))
case GetValue(replyTo) case GetValue(replyTo)
replyTo ! state replyTo ! state
Effect.done Effect.none
case IncrementLater case IncrementLater
// purpose is to test signals // purpose is to test signals
val delay = ctx.spawnAnonymous(Actor.withTimers[Tick.type] { timers val delay = ctx.spawnAnonymous(Actor.withTimers[Tick.type] { timers
@ -60,13 +70,37 @@ object PersistentActorSpec {
}) })
}) })
ctx.watch(delay) ctx.watch(delay)
Effect.done Effect.none
case IncrementAfterReceiveTimeout case IncrementAfterReceiveTimeout
ctx.setReceiveTimeout(10.millis, Timeout) ctx.setReceiveTimeout(10.millis, Timeout)
Effect.done Effect.none
case Timeout case Timeout
ctx.cancelReceiveTimeout() ctx.cancelReceiveTimeout()
Effect.persist(Incremented(100)) Effect.persist(Incremented(100))
case IncrementTwiceAndThenLog
Effect
.persist(Incremented(1), Incremented(1))
.andThen {
loggingActor ! firstLogging
}
.andThen {
loggingActor ! secondLogging
}
case EmptyEventsListAndThenLog
Effect
.persist(List.empty) // send empty list of events
.andThen {
loggingActor ! firstLogging
}
case DoNothingAndThenLog
Effect
.none
.andThen {
loggingActor ! firstLogging
}
}) })
.onSignal { .onSignal {
case (_, _, Terminated(_)) case (_, _, Terminated(_))
@ -140,6 +174,48 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve
} }
} }
/**
* Verify that all side-effects callbacks are called (in order) and only once.
* The [[IncrementTwiceAndThenLog]] command will emit two Increment events
*/
def `chainable side effects with events`(): Unit = {
val loggingProbe = TestProbe[String]
val c = start(counter("c5", loggingProbe.ref))
val probe = TestProbe[State]
c ! IncrementTwiceAndThenLog
c ! GetValue(probe.ref)
probe.expectMsg(State(2, Vector(0, 1)))
loggingProbe.expectMsg(firstLogging)
loggingProbe.expectMsg(secondLogging)
}
/** Proves that side-effects are called when emitting an empty list of events */
def `chainable side effects without events`(): Unit = {
val loggingProbe = TestProbe[String]
val c = start(counter("c6", loggingProbe.ref))
val probe = TestProbe[State]
c ! EmptyEventsListAndThenLog
c ! GetValue(probe.ref)
probe.expectMsg(State(0, Vector.empty))
loggingProbe.expectMsg(firstLogging)
}
/** Proves that side-effects are called when explicitly calling Effect.none */
def `chainable side effects when doing nothing (Effect.none)`(): Unit = {
val loggingProbe = TestProbe[String]
val c = start(counter("c7", loggingProbe.ref))
val probe = TestProbe[State]
c ! DoNothingAndThenLog
c ! GetValue(probe.ref)
probe.expectMsg(State(0, Vector.empty))
loggingProbe.expectMsg(firstLogging)
}
def `work when wrapped in other behavior`(): Unit = { def `work when wrapped in other behavior`(): Unit = {
// FIXME This is a major problem with current implementation. Since the // FIXME This is a major problem with current implementation. Since the
// behavior is running as an untyped PersistentActor it's not possible to // behavior is running as an untyped PersistentActor it's not possible to

View file

@ -97,28 +97,37 @@ import akka.typed.internal.adapter.ActorRefAdapter
} catch { } catch {
case e: MatchError throw new IllegalStateException( case e: MatchError throw new IllegalStateException(
s"Undefined state [${state.getClass.getName}] or handler for [${msg.getClass.getName} " + s"Undefined state [${state.getClass.getName}] or handler for [${msg.getClass.getName} " +
s"in [${behavior.getClass.getName}] with persistenceId [${persistenceId}]") s"in [${behavior.getClass.getName}] with persistenceId [$persistenceId]")
} }
} }
private def applyEffects(msg: Any, effect: Effect[E, S], sideEffects: Seq[ChainableEffect[_, S]] = Nil): Unit = effect match { private def applyEffects(msg: Any, effect: Effect[E, S], sideEffects: Seq[ChainableEffect[_, S]] = Nil): Unit = effect match {
case CompositeEffect(Some(persist), sideEffects) case CompositeEffect(Some(persist), currentSideEffects)
applyEffects(msg, persist, sideEffects) applyEffects(msg, persist, currentSideEffects ++ sideEffects)
case CompositeEffect(_, sideEffects) case CompositeEffect(_, currentSideEffects)
sideEffects.foreach(applySideEffect) (currentSideEffects ++ sideEffects).foreach(applySideEffect)
case Persist(event) case Persist(event)
// apply the event before persist so that validation exception is handled before persisting // apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler. // the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
state = applyEvent(state, event) state = applyEvent(state, event)
persist(event) { _ persist(event) { _
sideEffects.foreach(applySideEffect) sideEffects.foreach(applySideEffect)
} }
case PersistAll(events) case PersistAll(events)
if (events.nonEmpty) {
// apply the event before persist so that validation exception is handled before persisting // apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler. // the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
var count = events.size
state = events.foldLeft(state)(applyEvent) state = events.foldLeft(state)(applyEvent)
persistAll(scala.collection.immutable.Seq(events)) { _ persistAll(events) { _
count -= 1
if (count == 0) sideEffects.foreach(applySideEffect)
}
} else {
// run side-effects even when no events are emitted
sideEffects.foreach(applySideEffect) sideEffects.foreach(applySideEffect)
} }
case _: PersistNothing.type @unchecked case _: PersistNothing.type @unchecked

View file

@ -41,19 +41,29 @@ object PersistentActor {
* Factories for effects - how a persitent actor reacts on a command * Factories for effects - how a persitent actor reacts on a command
*/ */
object Effect { object Effect {
def persist[Event, State](event: Event): Effect[Event, State] = def persist[Event, State](event: Event): Effect[Event, State] =
new Persist[Event, State](event) Persist(event)
def persistAll[Event, State](events: im.Seq[Event]): Effect[Event, State] = def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] =
new PersistAll[Event, State](events) persist(evt1 :: evt2 :: events.toList)
def persistAll[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] = def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] =
eventOpt match {
case Some(evt) persist[Event, State](evt)
case _ none[Event, State]
}
def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] =
PersistAll(events)
def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] =
new CompositeEffect[Event, State](Some(new PersistAll[Event, State](events)), sideEffects) new CompositeEffect[Event, State](Some(new PersistAll[Event, State](events)), sideEffects)
/** /**
* Do not persist anything * Do not persist anything
*/ */
def done[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]]
/** /**
* This command is not handled, but it is not an error that it isn't. * This command is not handled, but it is not an error that it isn't.
@ -64,7 +74,6 @@ object PersistentActor {
* Stop this persistent actor * Stop this persistent actor
*/ */
def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]]
} }
/** /**
@ -98,7 +107,8 @@ object PersistentActor {
def apply[Event, State](effect: Effect[Event, State], sideEffects: ChainableEffect[Event, State]): Effect[Event, State] = def apply[Event, State](effect: Effect[Event, State], sideEffects: ChainableEffect[Event, State]): Effect[Event, State] =
CompositeEffect[Event, State]( CompositeEffect[Event, State](
if (effect.events.isEmpty) None else Some(effect), if (effect.events.isEmpty) None else Some(effect),
sideEffects :: Nil) sideEffects :: Nil
)
} }
@InternalApi @InternalApi
@ -116,7 +126,7 @@ object PersistentActor {
@InternalApi @InternalApi
private[akka] case class Persist[Event, State](event: Event) extends Effect[Event, State] { private[akka] case class Persist[Event, State](event: Event) extends Effect[Event, State] {
override val events = event :: Nil override def events = event :: Nil
} }
@InternalApi @InternalApi
private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends Effect[Event, State] private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends Effect[Event, State]