Allow initial state to be null while using EventSourcedBehaviorTestKit (#30823)

* Allow initial state to be null while using EventSourcedBehaviorTestKit
This commit is contained in:
Sebastian Alfers 2021-10-26 16:41:18 +02:00 committed by GitHub
parent 4d14c6f977
commit 659eb40146
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 36 additions and 16 deletions

View file

@ -8,7 +8,6 @@ import scala.collection.immutable
import scala.concurrent.Await
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.SerializationTestKit
import akka.actor.typed.ActorRef
@ -26,6 +25,7 @@ import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.Serializati
import akka.persistence.testkit.scaladsl.PersistenceTestKit
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.internal.EventSourcedBehaviorImpl
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetStateReply
import akka.stream.scaladsl.Sink
/**
@ -99,7 +99,7 @@ import akka.stream.scaladsl.Sink
PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery](PersistenceTestKitReadJournal.Identifier)
private val probe = actorTestKit.createTestProbe[Any]()
private val stateProbe = actorTestKit.createTestProbe[State]()
private val stateProbe = actorTestKit.createTestProbe[GetStateReply[State]]()
private var actor: ActorRef[Command] = actorTestKit.spawn(behavior)
private def internalActor = actor.unsafeUpcast[Any]
private val persistenceId: PersistenceId = {
@ -175,7 +175,7 @@ import akka.stream.scaladsl.Sink
override def getState(): State = {
internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
stateProbe.receiveMessage()
stateProbe.receiveMessage().currentState
}
private def preCommandCheck(command: Command): Unit = {
@ -212,7 +212,7 @@ import akka.stream.scaladsl.Sink
internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
try {
val state = stateProbe.receiveMessage()
RestartResultImpl(state)
RestartResultImpl(state.currentState)
} catch {
case NonFatal(_) =>
throw new IllegalStateException("Could not restart. Maybe exception from event handler. See logs.")

View file

@ -6,11 +6,8 @@ package akka.persistence.testkit.javadsl
import java.util.{ List => JList }
import java.util.function.{ Function => JFunction }
import scala.reflect.ClassTag
import com.typesafe.config.Config
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior

View file

@ -6,10 +6,8 @@ package akka.persistence.testkit.scaladsl
import scala.collection.immutable
import scala.reflect.ClassTag
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem

View file

@ -5,9 +5,7 @@
package akka.persistence.testkit.scaladsl
import java.io.NotSerializableException
import org.scalatest.wordspec.AnyWordSpecLike
import akka.Done
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.LogCapturing
@ -17,7 +15,7 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKitSpec.TestCounter.NotSerializableState
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKitSpec.TestCounter.{ NotSerializableState, NullState }
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.internal.JournalFailureException
import akka.persistence.typed.scaladsl.Effect
@ -43,6 +41,7 @@ object EventSourcedBehaviorTestKitSpec {
sealed trait State
final case class RealState(value: Int, history: Vector[Int]) extends State with CborSerializable
final case class NullState() extends State with CborSerializable
case object IncrementWithNotSerializableEvent extends Command with CborSerializable
final case class NotSerializableEvent(delta: Int) extends Event
@ -120,6 +119,8 @@ object EventSourcedBehaviorTestKitSpec {
NotSerializableState(value + delta, history :+ value)
case (state: NotSerializableState, _) =>
throw new IllegalStateException(state.toString)
case (null, _) => NullState()
case (NullState(), _) => NullState()
})
}
}
@ -129,17 +130,30 @@ class EventSourcedBehaviorTestKitSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
with AnyWordSpecLike
with LogCapturing {
import EventSourcedBehaviorTestKitSpec._
private val persistenceId = PersistenceId.ofUniqueId("test")
private val behavior = TestCounter(persistenceId)
private def createTestKitNull() = {
EventSourcedBehaviorTestKit[TestCounter.Command, TestCounter.Event, TestCounter.State](
system,
TestCounter(persistenceId, null))
}
private def createTestKit() = {
EventSourcedBehaviorTestKit[TestCounter.Command, TestCounter.Event, TestCounter.State](system, behavior)
}
"EventSourcedBehaviorTestKit" must {
"handle null state" in {
val eventSourcedTestKit = createTestKitNull()
val result = eventSourcedTestKit.runCommand(TestCounter.Increment)
result.state shouldBe NullState()
}
"run commands" in {
val eventSourcedTestKit = createTestKit()

View file

@ -0,0 +1,7 @@
# this is an internal api to avoid sending state as null vioa an Actor message
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl#GetState.this")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl#GetState.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl#GetState.unapply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl#GetState.replyTo")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl#GetState.copy")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl#GetState.copy$default$1")

View file

@ -72,7 +72,12 @@ private[akka] object EventSourcedBehaviorImpl {
* Used by EventSourcedBehaviorTestKit to retrieve the state.
* Can't be a Signal because those are not stashed.
*/
final case class GetState[State](replyTo: ActorRef[State]) extends InternalProtocol
final case class GetState[State](replyTo: ActorRef[GetStateReply[State]]) extends InternalProtocol
/**
* Used to send a state being `null` as an Actor message
*/
final case class GetStateReply[State](currentState: State)
/**
* Used to start the replication stream at the correct sequence number

View file

@ -9,7 +9,6 @@ import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
import akka.actor.UnhandledMessage
@ -49,7 +48,7 @@ import akka.persistence.typed.{
SnapshotMetadata,
SnapshotSelectionCriteria
}
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState }
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState, GetStateReply }
import akka.persistence.typed.internal.InternalProtocol.ReplicatedEventEnvelope
import akka.persistence.typed.internal.JournalInteractions.EventToPersist
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
@ -384,7 +383,7 @@ private[akka] object Running {
// Used by EventSourcedBehaviorTestKit to retrieve the state.
def onGetState(get: GetState[S]): Behavior[InternalProtocol] = {
get.replyTo ! state.state
get.replyTo ! GetStateReply(state.state)
this
}