From 8c0d81a37474d867cb61589a9a4ec242b64054a8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 21 Sep 2020 16:35:19 +0200 Subject: [PATCH] support async reply and noReply in EventSourcedBehaviorTestKit, #29602 #29581 --- .../EventSourcedBehaviorTestKitImpl.scala | 80 ++++++++++++------- .../javadsl/EventSourcedBehaviorTestKit.scala | 6 ++ .../EventSourcedBehaviorTestKit.scala | 5 ++ .../EventSourcedBehaviorTestKitSpec.scala | 47 ++++++++++- 4 files changed, 106 insertions(+), 32 deletions(-) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala index 02bea0a62d..c25ea0d52e 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala @@ -101,36 +101,15 @@ import akka.persistence.typed.internal.EventSourcedBehaviorImpl private var emptyStateVerified = false override def runCommand(command: Command): CommandResult[Command, Event, State] = { - if (serializationSettings.enabled && serializationSettings.verifyCommands) - verifySerializationAndThrow(command, "Command") - - if (serializationSettings.enabled && !emptyStateVerified) { - internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref) - val emptyState = stateProbe.receiveMessage() - verifySerializationAndThrow(emptyState, "Empty State") - emptyStateVerified = true - } - - // FIXME we can expand the api of persistenceTestKit to read from storage from a seqNr instead - val oldEvents = - persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event]) + preCommandCheck(command) + val oldEvents = getEvents(dropOldEvents = 0) actor ! command - internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref) - val newState = stateProbe.receiveMessage() + val newState = getState() + val newEvents = getEvents(oldEvents.size) - val newEvents = - persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event]).drop(oldEvents.size) - - if (serializationSettings.enabled) { - if (serializationSettings.verifyEvents) { - newEvents.foreach(verifySerializationAndThrow(_, "Event")) - } - - if (serializationSettings.verifyState) - verifySerializationAndThrow(newState, "State") - } + postCommandCheck(newEvents, newState, reply = None) CommandResultImpl[Command, Event, State, Nothing](command, newEvents, newState, None) } @@ -138,7 +117,10 @@ import akka.persistence.typed.internal.EventSourcedBehaviorImpl override def runCommand[R](creator: ActorRef[R] => Command): CommandResultWithReply[Command, Event, State, R] = { val replyProbe = actorTestKit.createTestProbe[R]() val command = creator(replyProbe.ref) - val result = runCommand(command) + preCommandCheck(command) + val oldEvents = getEvents(dropOldEvents = 0) + + actor ! command val reply = try { replyProbe.receiveMessage() @@ -149,10 +131,48 @@ import akka.persistence.typed.internal.EventSourcedBehaviorImpl replyProbe.stop() } - if (serializationSettings.enabled && serializationSettings.verifyCommands) - verifySerializationAndThrow(reply, "Reply") + val newState = getState() + val newEvents = getEvents(oldEvents.size) - CommandResultImpl[Command, Event, State, R](result.command, result.events, result.state, Some(reply)) + postCommandCheck(newEvents, newState, Some(reply)) + + CommandResultImpl[Command, Event, State, R](command, newEvents, newState, Some(reply)) + } + + private def getEvents[R](dropOldEvents: Int): immutable.Seq[Event] = { + // FIXME we can expand the api of persistenceTestKit to read from storage from a seqNr instead + persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event]).drop(dropOldEvents) + } + + override def getState(): State = { + internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref) + stateProbe.receiveMessage() + } + + private def preCommandCheck(command: Command): Unit = { + if (serializationSettings.enabled && serializationSettings.verifyCommands) + verifySerializationAndThrow(command, "Command") + + if (serializationSettings.enabled && !emptyStateVerified) { + val emptyState = getState() + verifySerializationAndThrow(emptyState, "Empty State") + emptyStateVerified = true + } + } + + private def postCommandCheck(newEvents: immutable.Seq[Event], newState: State, reply: Option[Any]): Unit = { + if (serializationSettings.enabled) { + if (serializationSettings.verifyEvents) { + newEvents.foreach(verifySerializationAndThrow(_, "Event")) + } + + if (serializationSettings.verifyState) + verifySerializationAndThrow(newState, "State") + + if (serializationSettings.verifyCommands) { + reply.foreach(verifySerializationAndThrow(_, "Reply")) + } + } } override def restart(): RestartResult[State] = { diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/EventSourcedBehaviorTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/EventSourcedBehaviorTestKit.scala index 3446225e76..98d9914c63 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/EventSourcedBehaviorTestKit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/EventSourcedBehaviorTestKit.scala @@ -223,6 +223,12 @@ final class EventSourcedBehaviorTestKit[Command, Event, State]( def runCommand[R](creator: JFunction[ActorRef[R], Command]): CommandResultWithReply[Command, Event, State, R] = new CommandResultWithReply(delegate.runCommand(replyTo => creator.apply(replyTo))) + /** + * Retrieve the current state of the Behavior. + */ + def getState(): State = + delegate.getState() + /** * Restart the behavior, which will then recover from stored snapshot and events. Can be used for testing * that the recovery is correct. diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKit.scala index 2aed301b88..1a19cce956 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKit.scala @@ -204,6 +204,11 @@ object EventSourcedBehaviorTestKit { */ def runCommand[R](creator: ActorRef[R] => Command): CommandResultWithReply[Command, Event, State, R] + /** + * Retrieve the current state of the Behavior. + */ + def getState(): State + /** * Restart the behavior, which will then recover from stored snapshot and events. Can be used for testing * that the recovery is correct. diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala index 38ce74b8ea..6d8a470b40 100644 --- a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala @@ -24,7 +24,6 @@ import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.serialization.DisabledJavaSerializer import akka.serialization.jackson.CborSerializable -import akka.util.unused object EventSourcedBehaviorTestKitSpec { @@ -32,9 +31,13 @@ object EventSourcedBehaviorTestKitSpec { sealed trait Command case object Increment extends Command with CborSerializable final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command with CborSerializable + final case class IncrementWithNoReply(replyTo: ActorRef[Done]) extends Command with CborSerializable + final case class IncrementWithAsyncReply(replyTo: ActorRef[Done]) extends Command with CborSerializable case class IncrementSeveral(n: Int) extends Command with CborSerializable final case class GetValue(replyTo: ActorRef[State]) extends Command with CborSerializable + private case class AsyncReply(replyTo: ActorRef[Done]) extends Command with CborSerializable + sealed trait Event final case class Incremented(delta: Int) extends Event with CborSerializable @@ -62,7 +65,7 @@ object EventSourcedBehaviorTestKitSpec { Behaviors.setup(ctx => counter(ctx, persistenceId, emptyState)) private def counter( - @unused ctx: ActorContext[Command], + ctx: ActorContext[Command], persistenceId: PersistenceId, emptyState: State): EventSourcedBehavior[Command, Event, State] = { EventSourcedBehavior.withEnforcedReplies[Command, Event, State]( @@ -76,6 +79,16 @@ object EventSourcedBehaviorTestKitSpec { case IncrementWithConfirmation(replyTo) => Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done) + case IncrementWithAsyncReply(replyTo) => + ctx.self ! AsyncReply(replyTo) + Effect.noReply + + case AsyncReply(replyTo) => + Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done) + + case IncrementWithNoReply(_) => + Effect.persist(Incremented(1)).thenNoReply() + case IncrementSeveral(n: Int) => val events = (1 to n).map(_ => Incremented(1)) Effect.persist(events).thenNoReply() @@ -187,6 +200,36 @@ class EventSourcedBehaviorTestKitSpec result.state should ===(TestCounter.RealState(1, Vector(0))) } + "run command with async reply" in { + val eventSourcedTestKit = createTestKit() + val result = eventSourcedTestKit.runCommand[Done](replyTo => TestCounter.IncrementWithAsyncReply(replyTo)) + result.event should ===(TestCounter.Incremented(1)) + result.reply should ===(Done) + result.state should ===(TestCounter.RealState(1, Vector(0))) + } + + "run command with no reply" in { + // This is a fictive usage scenario. If the command has a replyTo the Behavior should (eventually) reply, + // but here we verify that it doesn't reply. + val eventSourcedTestKit = createTestKit() + val replyToProbe = createTestProbe[Done]() + val result = eventSourcedTestKit.runCommand(TestCounter.IncrementWithNoReply(replyToProbe.ref)) + result.event should ===(TestCounter.Incremented(1)) + result.state should ===(TestCounter.RealState(1, Vector(0))) + replyToProbe.expectNoMessage() + } + + "give access to current state" in { + val eventSourcedTestKit = createTestKit() + + // initial state + eventSourcedTestKit.getState() should ===(TestCounter.RealState(0, Vector.empty)) + + // state after command + eventSourcedTestKit.runCommand(TestCounter.Increment) + eventSourcedTestKit.getState() should ===(TestCounter.RealState(1, Vector(0))) + } + "detect non-serializable events" in { val eventSourcedTestKit = createTestKit()