EventSourcedBehaviorTestKit initial state setup (#31193)
* Add initialization from state and events to EventSourcedBehaviorTestKit. * Make SnapshotTestKit optional for EventSourcedBehaviorTestKit, add Java API. * Add test for EventSourcedBehaviorTestKit with snapshots not enabled. * Change EventSourcedBehaviorNoSnapshotTestKitSpec header.
This commit is contained in:
parent
15c9cc0543
commit
fccb0ca220
6 changed files with 145 additions and 4 deletions
|
|
@ -16,6 +16,7 @@ import akka.actor.typed.Behavior
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.persistence.query.PersistenceQuery
|
import akka.persistence.query.PersistenceQuery
|
||||||
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
|
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
|
||||||
|
import akka.persistence.testkit.SnapshotMeta
|
||||||
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
|
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
|
||||||
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
|
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
|
||||||
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.CommandResult
|
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.CommandResult
|
||||||
|
|
@ -23,6 +24,7 @@ import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.CommandResu
|
||||||
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.RestartResult
|
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.RestartResult
|
||||||
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.SerializationSettings
|
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.SerializationSettings
|
||||||
import akka.persistence.testkit.scaladsl.PersistenceTestKit
|
import akka.persistence.testkit.scaladsl.PersistenceTestKit
|
||||||
|
import akka.persistence.testkit.scaladsl.SnapshotTestKit
|
||||||
import akka.persistence.typed.PersistenceId
|
import akka.persistence.typed.PersistenceId
|
||||||
import akka.persistence.typed.internal.EventSourcedBehaviorImpl
|
import akka.persistence.typed.internal.EventSourcedBehaviorImpl
|
||||||
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetStateReply
|
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetStateReply
|
||||||
|
|
@ -95,6 +97,12 @@ import akka.stream.scaladsl.Sink
|
||||||
override val persistenceTestKit: PersistenceTestKit = PersistenceTestKit(system)
|
override val persistenceTestKit: PersistenceTestKit = PersistenceTestKit(system)
|
||||||
persistenceTestKit.clearAll()
|
persistenceTestKit.clearAll()
|
||||||
|
|
||||||
|
override val snapshotTestKit: Option[SnapshotTestKit] =
|
||||||
|
if (system.settings.config.getString("akka.persistence.snapshot-store.plugin") != "")
|
||||||
|
Some(SnapshotTestKit(system))
|
||||||
|
else None
|
||||||
|
snapshotTestKit.foreach(_.clearAll())
|
||||||
|
|
||||||
private val queries =
|
private val queries =
|
||||||
PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery](PersistenceTestKitReadJournal.Identifier)
|
PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdQuery](PersistenceTestKitReadJournal.Identifier)
|
||||||
|
|
||||||
|
|
@ -221,6 +229,7 @@ import akka.stream.scaladsl.Sink
|
||||||
|
|
||||||
override def clear(): Unit = {
|
override def clear(): Unit = {
|
||||||
persistenceTestKit.clearByPersistenceId(persistenceId.id)
|
persistenceTestKit.clearByPersistenceId(persistenceId.id)
|
||||||
|
snapshotTestKit.foreach(_.clearByPersistenceId(persistenceId.id))
|
||||||
restart()
|
restart()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -233,4 +242,21 @@ import akka.stream.scaladsl.Sink
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def initialize(state: State, events: Event*): Unit = internalInitialize(Some(state), events: _*)
|
||||||
|
|
||||||
|
override def initialize(events: Event*): Unit = internalInitialize(None, events: _*)
|
||||||
|
|
||||||
|
private def internalInitialize(stateOption: Option[State], events: Event*) = {
|
||||||
|
clear()
|
||||||
|
|
||||||
|
stateOption.foreach { state =>
|
||||||
|
snapshotTestKit match {
|
||||||
|
case Some(kit) => kit.persistForRecovery(persistenceId.id, (SnapshotMeta(0), state))
|
||||||
|
case _ => throw new IllegalArgumentException("Cannot initialize from state when snapshots are not used.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
persistenceTestKit.persistForRecovery(persistenceId.id, events)
|
||||||
|
|
||||||
|
restart()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,8 +4,10 @@
|
||||||
|
|
||||||
package akka.persistence.testkit.javadsl
|
package akka.persistence.testkit.javadsl
|
||||||
|
|
||||||
|
import java.util.Optional
|
||||||
import java.util.{ List => JList }
|
import java.util.{ List => JList }
|
||||||
import java.util.function.{ Function => JFunction }
|
import java.util.function.{ Function => JFunction }
|
||||||
|
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import akka.actor.typed.ActorRef
|
import akka.actor.typed.ActorRef
|
||||||
|
|
@ -16,6 +18,8 @@ import akka.annotation.DoNotInherit
|
||||||
import akka.persistence.testkit.scaladsl
|
import akka.persistence.testkit.scaladsl
|
||||||
import akka.util.ccompat.JavaConverters._
|
import akka.util.ccompat.JavaConverters._
|
||||||
|
|
||||||
|
import scala.annotation.varargs
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Testing of [[akka.persistence.typed.javadsl.EventSourcedBehavior]] implementations.
|
* Testing of [[akka.persistence.typed.javadsl.EventSourcedBehavior]] implementations.
|
||||||
* It supports running one command at a time and you can assert that the synchronously returned result is as expected.
|
* It supports running one command at a time and you can assert that the synchronously returned result is as expected.
|
||||||
|
|
@ -205,6 +209,10 @@ final class EventSourcedBehaviorTestKit[Command, Event, State](
|
||||||
import EventSourcedBehaviorTestKit._
|
import EventSourcedBehaviorTestKit._
|
||||||
|
|
||||||
private val _persistenceTestKit = new PersistenceTestKit(delegate.persistenceTestKit)
|
private val _persistenceTestKit = new PersistenceTestKit(delegate.persistenceTestKit)
|
||||||
|
private val _snapshotTestKit = {
|
||||||
|
import scala.compat.java8.OptionConverters._
|
||||||
|
delegate.snapshotTestKit.map(new SnapshotTestKit(_)).asJava
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run one command through the behavior. The returned result contains emitted events and the state
|
* Run one command through the behavior. The returned result contains emitted events and the state
|
||||||
|
|
@ -240,10 +248,25 @@ final class EventSourcedBehaviorTestKit[Command, Event, State](
|
||||||
delegate.clear()
|
delegate.clear()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The underlying `PersistenceTestKit` for the in-memory journal and snapshot storage.
|
* Initializes behavior from provided state and/or events.
|
||||||
|
*/
|
||||||
|
@varargs
|
||||||
|
def initialize(state: State, events: Event*): Unit = delegate.initialize(state, events: _*)
|
||||||
|
@varargs
|
||||||
|
def initialize(events: Event*): Unit = delegate.initialize(events: _*)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The underlying `PersistenceTestKit` for the in-memory journal.
|
||||||
* Can be useful for advanced testing scenarios, such as simulating failures or
|
* Can be useful for advanced testing scenarios, such as simulating failures or
|
||||||
* populating the journal with events that are used for replay.
|
* populating the journal with events that are used for replay.
|
||||||
*/
|
*/
|
||||||
def persistenceTestKit: PersistenceTestKit =
|
def persistenceTestKit: PersistenceTestKit =
|
||||||
_persistenceTestKit
|
_persistenceTestKit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The underlying `SnapshotTestKit` for snapshot storage. Present only if snapshots are enabled.
|
||||||
|
* Can be useful for advanced testing scenarios, such as simulating failures or
|
||||||
|
* populating the storage with snapshots that are used for replay.
|
||||||
|
*/
|
||||||
|
def snapshotTestKit: Optional[SnapshotTestKit] = _snapshotTestKit
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,9 +20,9 @@ import akka.util.ccompat.JavaConverters._
|
||||||
* Class for testing persisted snapshots in persistent actors.
|
* Class for testing persisted snapshots in persistent actors.
|
||||||
*/
|
*/
|
||||||
@ApiMayChange
|
@ApiMayChange
|
||||||
class SnapshotTestKit(system: ActorSystem) {
|
class SnapshotTestKit(scalaTestkit: ScalaTestKit) {
|
||||||
|
|
||||||
private val scalaTestkit = new ScalaTestKit(system)
|
def this(system: ActorSystem) = this(new ScalaTestKit(system))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check that nothing has been saved in the storage.
|
* Check that nothing has been saved in the storage.
|
||||||
|
|
|
||||||
|
|
@ -221,9 +221,22 @@ object EventSourcedBehaviorTestKit {
|
||||||
def clear(): Unit
|
def clear(): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The underlying `PersistenceTestKit` for the in-memory journal and snapshot storage.
|
* The underlying `PersistenceTestKit` for the in-memory journal.
|
||||||
* Can be useful for advanced testing scenarios, such as simulating failures or
|
* Can be useful for advanced testing scenarios, such as simulating failures or
|
||||||
* populating the journal with events that are used for replay.
|
* populating the journal with events that are used for replay.
|
||||||
*/
|
*/
|
||||||
def persistenceTestKit: PersistenceTestKit
|
def persistenceTestKit: PersistenceTestKit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The underlying `SnapshotTestKit` for snapshot storage. Present only if snapshots are enabled.
|
||||||
|
* Can be useful for advanced testing scenarios, such as simulating failures or
|
||||||
|
* populating the storage with snapshots that are used for replay.
|
||||||
|
*/
|
||||||
|
def snapshotTestKit: Option[SnapshotTestKit]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes behavior from provided state and/or events.
|
||||||
|
*/
|
||||||
|
def initialize(state: State, events: Event*): Unit
|
||||||
|
def initialize(events: Event*): Unit
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,55 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.persistence.testkit.scaladsl
|
||||||
|
|
||||||
|
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||||
|
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||||
|
import akka.persistence.testkit.PersistenceTestKitPlugin
|
||||||
|
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKitSpec.TestCounter
|
||||||
|
import akka.persistence.typed.PersistenceId
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import org.scalatest.wordspec.AnyWordSpecLike
|
||||||
|
|
||||||
|
class EventSourcedBehaviorNoSnapshotTestKitSpec
|
||||||
|
extends ScalaTestWithActorTestKit(ConfigFactory.parseString("""
|
||||||
|
akka.persistence.testkit.events.serialize = off
|
||||||
|
akka.persistence.testkit.snapshots.serialize = off
|
||||||
|
""").withFallback(PersistenceTestKitPlugin.config))
|
||||||
|
with AnyWordSpecLike
|
||||||
|
with LogCapturing {
|
||||||
|
|
||||||
|
private def createTestKit() = {
|
||||||
|
EventSourcedBehaviorTestKit[TestCounter.Command, TestCounter.Event, TestCounter.State](
|
||||||
|
system,
|
||||||
|
TestCounter(PersistenceId.ofUniqueId("test")))
|
||||||
|
}
|
||||||
|
|
||||||
|
"EventSourcedBehaviorTestKit" when {
|
||||||
|
"snapshots are not enabled" must {
|
||||||
|
"not provide SnapshotTestKit" in {
|
||||||
|
val eventSourcedTestKit = createTestKit()
|
||||||
|
|
||||||
|
eventSourcedTestKit.snapshotTestKit shouldBe empty
|
||||||
|
}
|
||||||
|
|
||||||
|
"fail initializing from snapshot" in {
|
||||||
|
val eventSourcedTestKit = createTestKit()
|
||||||
|
|
||||||
|
val ex = intercept[IllegalArgumentException] {
|
||||||
|
eventSourcedTestKit.initialize(TestCounter.RealState(1, Vector(0)))
|
||||||
|
}
|
||||||
|
ex.getMessage shouldEqual "Cannot initialize from state when snapshots are not used."
|
||||||
|
}
|
||||||
|
|
||||||
|
"initialize from event" in {
|
||||||
|
val eventSourcedTestKit = createTestKit()
|
||||||
|
eventSourcedTestKit.initialize(TestCounter.Incremented(1))
|
||||||
|
|
||||||
|
val result = eventSourcedTestKit.runCommand[TestCounter.State](TestCounter.GetValue(_))
|
||||||
|
result.reply shouldEqual TestCounter.RealState(1, Vector(0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -354,6 +354,30 @@ class EventSourcedBehaviorTestKitSpec
|
||||||
eventSourcedTestKit.clear()
|
eventSourcedTestKit.clear()
|
||||||
eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(TestCounter.RealState(1, Vector(0)))
|
eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(TestCounter.RealState(1, Vector(0)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"initialize from snapshot" in {
|
||||||
|
val eventSourcedTestKit = createTestKit()
|
||||||
|
eventSourcedTestKit.initialize(TestCounter.RealState(1, Vector(0)))
|
||||||
|
|
||||||
|
val result = eventSourcedTestKit.runCommand[TestCounter.State](TestCounter.GetValue(_))
|
||||||
|
result.reply shouldEqual TestCounter.RealState(1, Vector(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
"initialize from event" in {
|
||||||
|
val eventSourcedTestKit = createTestKit()
|
||||||
|
eventSourcedTestKit.initialize(TestCounter.Incremented(1))
|
||||||
|
|
||||||
|
val result = eventSourcedTestKit.runCommand[TestCounter.State](TestCounter.GetValue(_))
|
||||||
|
result.reply shouldEqual TestCounter.RealState(1, Vector(0))
|
||||||
|
}
|
||||||
|
|
||||||
|
"initialize from snapshot and event" in {
|
||||||
|
val eventSourcedTestKit = createTestKit()
|
||||||
|
eventSourcedTestKit.initialize(TestCounter.RealState(1, Vector(0)), TestCounter.Incremented(1))
|
||||||
|
|
||||||
|
val result = eventSourcedTestKit.runCommand[TestCounter.State](TestCounter.GetValue(_))
|
||||||
|
result.reply shouldEqual TestCounter.RealState(2, Vector(0, 1))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue