From f92e1c16e70782196e8fcd705f843ab8b562146b Mon Sep 17 00:00:00 2001 From: Renato Cavalcanti Date: Tue, 14 Nov 2017 16:48:10 +0100 Subject: [PATCH] Add overloaded Effect.persist and renaming Effect.done to none #23964 --- .../ClusterShardingPersistenceSpec.scala | 2 +- .../PersistentActorCompileOnlyTest.scala | 21 ++--- .../scaladsl/PersistentActorSpec.scala | 84 ++++++++++++++++++- .../internal/PersistentActorImpl.scala | 27 ++++-- .../scaladsl/PersistentActor.scala | 26 ++++-- 5 files changed, 128 insertions(+), 32 deletions(-) diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala index 5c55e7ebbf..6ba51a7f2f 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingPersistenceSpec.scala @@ -57,7 +57,7 @@ object ClusterShardingPersistenceSpec { case Add(s) ⇒ Effect.persist(s) case Get(replyTo) ⇒ replyTo ! state - Effect.done + Effect.none case StopPlz ⇒ Effect.stop }), eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt) diff --git a/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala index 9fec9205fe..4e116c3cc4 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorCompileOnlyTest.scala @@ -143,13 +143,13 @@ object PersistentActorCompileOnlyTest { case Happy ⇒ CommandHandler.command { case Greet(whom) ⇒ println(s"Super happy to meet you $whom!") - Effect.done + Effect.none case MoodSwing ⇒ Effect.persist(MoodChanged(Sad)) } case Sad ⇒ CommandHandler.command { case Greet(whom) ⇒ println(s"hi $whom") - Effect.done + Effect.none case MoodSwing ⇒ Effect.persist(MoodChanged(Happy)) } }, @@ -299,7 +299,8 @@ object PersistentActorCompileOnlyTest { val adapt = ctx.spawnAdapter((m: MetaData) ⇒ GotMetaData(m)) def addItem(id: Id, self: ActorRef[Command]) = - Persist[Event, List[Id]](ItemAdded(id)) + Effect + .persist[Event, List[Id]](ItemAdded(id)) .andThen(metadataRegistry ! GetMetaData(id, adapt)) PersistentActor.immutable[Command, Event, List[Id]]( @@ -313,10 +314,10 @@ object PersistentActorCompileOnlyTest { case RemoveItem(id) ⇒ Effect.persist(ItemRemoved(id)) case GotMetaData(data) ⇒ basket = basket.updatedWith(data) - Effect.done + Effect.none case GetTotalPrice(sender) ⇒ sender ! basket.items.map(_.price).sum - Effect.done + Effect.none } } else CommandHandler { (ctx, state, cmd) ⇒ @@ -329,10 +330,10 @@ object PersistentActorCompileOnlyTest { stash.foreach(ctx.self ! _) stash = Nil } - Effect.done + Effect.none case cmd: GetTotalPrice ⇒ stash :+= cmd - Effect.done + Effect.none } }), eventHandler = (state, evt) ⇒ evt match { @@ -362,7 +363,7 @@ object PersistentActorCompileOnlyTest { case class Remembered(memory: String) extends Event def changeMoodIfNeeded(currentState: Mood, newMood: Mood): Effect[Event, Mood] = - if (currentState == newMood) Effect.done + if (currentState == newMood) Effect.none else Effect.persist(MoodChanged(newMood)) PersistentActor.immutable[Command, Event, Mood]( @@ -372,7 +373,7 @@ object PersistentActorCompileOnlyTest { cmd match { case Greet(whom) ⇒ println(s"Hi there, I'm $state!") - Effect.done + Effect.none case CheerUp(sender) ⇒ changeMoodIfNeeded(state, Happy) .andThen { sender ! Ack } @@ -380,7 +381,7 @@ object PersistentActorCompileOnlyTest { // A more elaborate example to show we still have full control over the effects // if needed (e.g. when some logic is factored out but you want to add more effects) val commonEffects = changeMoodIfNeeded(state, Happy) - Effect.persistAll(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects) + Effect.persist(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects) } }, diff --git a/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala index 3bd07189e8..cd411392dc 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/persistence/scaladsl/PersistentActorSpec.scala @@ -31,6 +31,9 @@ object PersistentActorSpec { final case object Increment extends Command final case object IncrementLater extends Command final case object IncrementAfterReceiveTimeout extends Command + final case object IncrementTwiceAndThenLog extends Command + final case object DoNothingAndThenLog extends Command + final case object EmptyEventsListAndThenLog extends Command final case class GetValue(replyTo: ActorRef[State]) extends Command private case object Timeout extends Command @@ -41,7 +44,14 @@ object PersistentActorSpec { case object Tick - def counter(persistenceId: String): Behavior[Command] = { + val firstLogging = "first logging" + val secondLogging = "second logging" + + def counter(persistenceId: String)(implicit actorSystem: ActorSystem[TypedSpec.Command], testSettings: TestKitSettings): Behavior[Command] = + counter(persistenceId, TestProbe[String].ref) + + def counter(persistenceId: String, loggingActor: ActorRef[String]): Behavior[Command] = { + PersistentActor.immutable[Command, Event, State]( persistenceId, initialState = State(0, Vector.empty), @@ -50,7 +60,7 @@ object PersistentActorSpec { Effect.persist(Incremented(1)) case GetValue(replyTo) ⇒ replyTo ! state - Effect.done + Effect.none case IncrementLater ⇒ // purpose is to test signals val delay = ctx.spawnAnonymous(Actor.withTimers[Tick.type] { timers ⇒ @@ -60,13 +70,37 @@ object PersistentActorSpec { }) }) ctx.watch(delay) - Effect.done + Effect.none case IncrementAfterReceiveTimeout ⇒ ctx.setReceiveTimeout(10.millis, Timeout) - Effect.done + Effect.none case Timeout ⇒ ctx.cancelReceiveTimeout() Effect.persist(Incremented(100)) + + case IncrementTwiceAndThenLog ⇒ + Effect + .persist(Incremented(1), Incremented(1)) + .andThen { + loggingActor ! firstLogging + } + .andThen { + loggingActor ! secondLogging + } + + case EmptyEventsListAndThenLog ⇒ + Effect + .persist(List.empty) // send empty list of events + .andThen { + loggingActor ! firstLogging + } + + case DoNothingAndThenLog ⇒ + Effect + .none + .andThen { + loggingActor ! firstLogging + } }) .onSignal { case (_, _, Terminated(_)) ⇒ @@ -140,6 +174,48 @@ class PersistentActorSpec extends TypedSpec(PersistentActorSpec.config) with Eve } } + /** + * Verify that all side-effects callbacks are called (in order) and only once. + * The [[IncrementTwiceAndThenLog]] command will emit two Increment events + */ + def `chainable side effects with events`(): Unit = { + val loggingProbe = TestProbe[String] + val c = start(counter("c5", loggingProbe.ref)) + + val probe = TestProbe[State] + + c ! IncrementTwiceAndThenLog + c ! GetValue(probe.ref) + probe.expectMsg(State(2, Vector(0, 1))) + + loggingProbe.expectMsg(firstLogging) + loggingProbe.expectMsg(secondLogging) + } + + /** Proves that side-effects are called when emitting an empty list of events */ + def `chainable side effects without events`(): Unit = { + val loggingProbe = TestProbe[String] + val c = start(counter("c6", loggingProbe.ref)) + + val probe = TestProbe[State] + c ! EmptyEventsListAndThenLog + c ! GetValue(probe.ref) + probe.expectMsg(State(0, Vector.empty)) + loggingProbe.expectMsg(firstLogging) + } + + /** Proves that side-effects are called when explicitly calling Effect.none */ + def `chainable side effects when doing nothing (Effect.none)`(): Unit = { + val loggingProbe = TestProbe[String] + val c = start(counter("c7", loggingProbe.ref)) + + val probe = TestProbe[State] + c ! DoNothingAndThenLog + c ! GetValue(probe.ref) + probe.expectMsg(State(0, Vector.empty)) + loggingProbe.expectMsg(firstLogging) + } + def `work when wrapped in other behavior`(): Unit = { // FIXME This is a major problem with current implementation. Since the // behavior is running as an untyped PersistentActor it's not possible to diff --git a/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala b/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala index 1112ddc63e..5985571103 100644 --- a/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/persistence/internal/PersistentActorImpl.scala @@ -97,28 +97,37 @@ import akka.typed.internal.adapter.ActorRefAdapter } catch { case e: MatchError ⇒ throw new IllegalStateException( s"Undefined state [${state.getClass.getName}] or handler for [${msg.getClass.getName} " + - s"in [${behavior.getClass.getName}] with persistenceId [${persistenceId}]") + s"in [${behavior.getClass.getName}] with persistenceId [$persistenceId]") } } private def applyEffects(msg: Any, effect: Effect[E, S], sideEffects: Seq[ChainableEffect[_, S]] = Nil): Unit = effect match { - case CompositeEffect(Some(persist), sideEffects) ⇒ - applyEffects(msg, persist, sideEffects) - case CompositeEffect(_, sideEffects) ⇒ - sideEffects.foreach(applySideEffect) + case CompositeEffect(Some(persist), currentSideEffects) ⇒ + applyEffects(msg, persist, currentSideEffects ++ sideEffects) + case CompositeEffect(_, currentSideEffects) ⇒ + (currentSideEffects ++ sideEffects).foreach(applySideEffect) case Persist(event) ⇒ // apply the event before persist so that validation exception is handled before persisting // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event state = applyEvent(state, event) persist(event) { _ ⇒ sideEffects.foreach(applySideEffect) } case PersistAll(events) ⇒ - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - state = events.foldLeft(state)(applyEvent) - persistAll(scala.collection.immutable.Seq(events)) { _ ⇒ + if (events.nonEmpty) { + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + var count = events.size + state = events.foldLeft(state)(applyEvent) + persistAll(events) { _ ⇒ + count -= 1 + if (count == 0) sideEffects.foreach(applySideEffect) + } + } else { + // run side-effects even when no events are emitted sideEffects.foreach(applySideEffect) } case _: PersistNothing.type @unchecked ⇒ diff --git a/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala b/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala index a51bc6ef00..5f3292bcf9 100644 --- a/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala +++ b/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala @@ -41,19 +41,29 @@ object PersistentActor { * Factories for effects - how a persitent actor reacts on a command */ object Effect { + def persist[Event, State](event: Event): Effect[Event, State] = - new Persist[Event, State](event) + Persist(event) - def persistAll[Event, State](events: im.Seq[Event]): Effect[Event, State] = - new PersistAll[Event, State](events) + def persist[Event, A <: Event, B <: Event, State](evt1: A, evt2: B, events: Event*): Effect[Event, State] = + persist(evt1 :: evt2 :: events.toList) - def persistAll[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] = + def persist[Event, State](eventOpt: Option[Event]): Effect[Event, State] = + eventOpt match { + case Some(evt) ⇒ persist[Event, State](evt) + case _ ⇒ none[Event, State] + } + + def persist[Event, State](events: im.Seq[Event]): Effect[Event, State] = + PersistAll(events) + + def persist[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] = new CompositeEffect[Event, State](Some(new PersistAll[Event, State](events)), sideEffects) /** * Do not persist anything */ - def done[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] + def none[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]] /** * This command is not handled, but it is not an error that it isn't. @@ -64,7 +74,6 @@ object PersistentActor { * Stop this persistent actor */ def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]] - } /** @@ -98,7 +107,8 @@ object PersistentActor { def apply[Event, State](effect: Effect[Event, State], sideEffects: ChainableEffect[Event, State]): Effect[Event, State] = CompositeEffect[Event, State]( if (effect.events.isEmpty) None else Some(effect), - sideEffects :: Nil) + sideEffects :: Nil + ) } @InternalApi @@ -116,7 +126,7 @@ object PersistentActor { @InternalApi private[akka] case class Persist[Event, State](event: Event) extends Effect[Event, State] { - override val events = event :: Nil + override def events = event :: Nil } @InternalApi private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends Effect[Event, State]