This commit is contained in:
parent
b28d77b316
commit
8c0d81a374
4 changed files with 106 additions and 32 deletions
|
|
@ -101,36 +101,15 @@ import akka.persistence.typed.internal.EventSourcedBehaviorImpl
|
|||
private var emptyStateVerified = false
|
||||
|
||||
override def runCommand(command: Command): CommandResult[Command, Event, State] = {
|
||||
if (serializationSettings.enabled && serializationSettings.verifyCommands)
|
||||
verifySerializationAndThrow(command, "Command")
|
||||
|
||||
if (serializationSettings.enabled && !emptyStateVerified) {
|
||||
internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
|
||||
val emptyState = stateProbe.receiveMessage()
|
||||
verifySerializationAndThrow(emptyState, "Empty State")
|
||||
emptyStateVerified = true
|
||||
}
|
||||
|
||||
// FIXME we can expand the api of persistenceTestKit to read from storage from a seqNr instead
|
||||
val oldEvents =
|
||||
persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event])
|
||||
preCommandCheck(command)
|
||||
val oldEvents = getEvents(dropOldEvents = 0)
|
||||
|
||||
actor ! command
|
||||
|
||||
internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
|
||||
val newState = stateProbe.receiveMessage()
|
||||
val newState = getState()
|
||||
val newEvents = getEvents(oldEvents.size)
|
||||
|
||||
val newEvents =
|
||||
persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event]).drop(oldEvents.size)
|
||||
|
||||
if (serializationSettings.enabled) {
|
||||
if (serializationSettings.verifyEvents) {
|
||||
newEvents.foreach(verifySerializationAndThrow(_, "Event"))
|
||||
}
|
||||
|
||||
if (serializationSettings.verifyState)
|
||||
verifySerializationAndThrow(newState, "State")
|
||||
}
|
||||
postCommandCheck(newEvents, newState, reply = None)
|
||||
|
||||
CommandResultImpl[Command, Event, State, Nothing](command, newEvents, newState, None)
|
||||
}
|
||||
|
|
@ -138,7 +117,10 @@ import akka.persistence.typed.internal.EventSourcedBehaviorImpl
|
|||
override def runCommand[R](creator: ActorRef[R] => Command): CommandResultWithReply[Command, Event, State, R] = {
|
||||
val replyProbe = actorTestKit.createTestProbe[R]()
|
||||
val command = creator(replyProbe.ref)
|
||||
val result = runCommand(command)
|
||||
preCommandCheck(command)
|
||||
val oldEvents = getEvents(dropOldEvents = 0)
|
||||
|
||||
actor ! command
|
||||
|
||||
val reply = try {
|
||||
replyProbe.receiveMessage()
|
||||
|
|
@ -149,10 +131,48 @@ import akka.persistence.typed.internal.EventSourcedBehaviorImpl
|
|||
replyProbe.stop()
|
||||
}
|
||||
|
||||
if (serializationSettings.enabled && serializationSettings.verifyCommands)
|
||||
verifySerializationAndThrow(reply, "Reply")
|
||||
val newState = getState()
|
||||
val newEvents = getEvents(oldEvents.size)
|
||||
|
||||
CommandResultImpl[Command, Event, State, R](result.command, result.events, result.state, Some(reply))
|
||||
postCommandCheck(newEvents, newState, Some(reply))
|
||||
|
||||
CommandResultImpl[Command, Event, State, R](command, newEvents, newState, Some(reply))
|
||||
}
|
||||
|
||||
private def getEvents[R](dropOldEvents: Int): immutable.Seq[Event] = {
|
||||
// FIXME we can expand the api of persistenceTestKit to read from storage from a seqNr instead
|
||||
persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event]).drop(dropOldEvents)
|
||||
}
|
||||
|
||||
override def getState(): State = {
|
||||
internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
|
||||
stateProbe.receiveMessage()
|
||||
}
|
||||
|
||||
private def preCommandCheck(command: Command): Unit = {
|
||||
if (serializationSettings.enabled && serializationSettings.verifyCommands)
|
||||
verifySerializationAndThrow(command, "Command")
|
||||
|
||||
if (serializationSettings.enabled && !emptyStateVerified) {
|
||||
val emptyState = getState()
|
||||
verifySerializationAndThrow(emptyState, "Empty State")
|
||||
emptyStateVerified = true
|
||||
}
|
||||
}
|
||||
|
||||
private def postCommandCheck(newEvents: immutable.Seq[Event], newState: State, reply: Option[Any]): Unit = {
|
||||
if (serializationSettings.enabled) {
|
||||
if (serializationSettings.verifyEvents) {
|
||||
newEvents.foreach(verifySerializationAndThrow(_, "Event"))
|
||||
}
|
||||
|
||||
if (serializationSettings.verifyState)
|
||||
verifySerializationAndThrow(newState, "State")
|
||||
|
||||
if (serializationSettings.verifyCommands) {
|
||||
reply.foreach(verifySerializationAndThrow(_, "Reply"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def restart(): RestartResult[State] = {
|
||||
|
|
|
|||
|
|
@ -223,6 +223,12 @@ final class EventSourcedBehaviorTestKit[Command, Event, State](
|
|||
def runCommand[R](creator: JFunction[ActorRef[R], Command]): CommandResultWithReply[Command, Event, State, R] =
|
||||
new CommandResultWithReply(delegate.runCommand(replyTo => creator.apply(replyTo)))
|
||||
|
||||
/**
|
||||
* Retrieve the current state of the Behavior.
|
||||
*/
|
||||
def getState(): State =
|
||||
delegate.getState()
|
||||
|
||||
/**
|
||||
* Restart the behavior, which will then recover from stored snapshot and events. Can be used for testing
|
||||
* that the recovery is correct.
|
||||
|
|
|
|||
|
|
@ -204,6 +204,11 @@ object EventSourcedBehaviorTestKit {
|
|||
*/
|
||||
def runCommand[R](creator: ActorRef[R] => Command): CommandResultWithReply[Command, Event, State, R]
|
||||
|
||||
/**
|
||||
* Retrieve the current state of the Behavior.
|
||||
*/
|
||||
def getState(): State
|
||||
|
||||
/**
|
||||
* Restart the behavior, which will then recover from stored snapshot and events. Can be used for testing
|
||||
* that the recovery is correct.
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import akka.persistence.typed.scaladsl.Effect
|
|||
import akka.persistence.typed.scaladsl.EventSourcedBehavior
|
||||
import akka.serialization.DisabledJavaSerializer
|
||||
import akka.serialization.jackson.CborSerializable
|
||||
import akka.util.unused
|
||||
|
||||
object EventSourcedBehaviorTestKitSpec {
|
||||
|
||||
|
|
@ -32,9 +31,13 @@ object EventSourcedBehaviorTestKitSpec {
|
|||
sealed trait Command
|
||||
case object Increment extends Command with CborSerializable
|
||||
final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command with CborSerializable
|
||||
final case class IncrementWithNoReply(replyTo: ActorRef[Done]) extends Command with CborSerializable
|
||||
final case class IncrementWithAsyncReply(replyTo: ActorRef[Done]) extends Command with CborSerializable
|
||||
case class IncrementSeveral(n: Int) extends Command with CborSerializable
|
||||
final case class GetValue(replyTo: ActorRef[State]) extends Command with CborSerializable
|
||||
|
||||
private case class AsyncReply(replyTo: ActorRef[Done]) extends Command with CborSerializable
|
||||
|
||||
sealed trait Event
|
||||
final case class Incremented(delta: Int) extends Event with CborSerializable
|
||||
|
||||
|
|
@ -62,7 +65,7 @@ object EventSourcedBehaviorTestKitSpec {
|
|||
Behaviors.setup(ctx => counter(ctx, persistenceId, emptyState))
|
||||
|
||||
private def counter(
|
||||
@unused ctx: ActorContext[Command],
|
||||
ctx: ActorContext[Command],
|
||||
persistenceId: PersistenceId,
|
||||
emptyState: State): EventSourcedBehavior[Command, Event, State] = {
|
||||
EventSourcedBehavior.withEnforcedReplies[Command, Event, State](
|
||||
|
|
@ -76,6 +79,16 @@ object EventSourcedBehaviorTestKitSpec {
|
|||
case IncrementWithConfirmation(replyTo) =>
|
||||
Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done)
|
||||
|
||||
case IncrementWithAsyncReply(replyTo) =>
|
||||
ctx.self ! AsyncReply(replyTo)
|
||||
Effect.noReply
|
||||
|
||||
case AsyncReply(replyTo) =>
|
||||
Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done)
|
||||
|
||||
case IncrementWithNoReply(_) =>
|
||||
Effect.persist(Incremented(1)).thenNoReply()
|
||||
|
||||
case IncrementSeveral(n: Int) =>
|
||||
val events = (1 to n).map(_ => Incremented(1))
|
||||
Effect.persist(events).thenNoReply()
|
||||
|
|
@ -187,6 +200,36 @@ class EventSourcedBehaviorTestKitSpec
|
|||
result.state should ===(TestCounter.RealState(1, Vector(0)))
|
||||
}
|
||||
|
||||
"run command with async reply" in {
|
||||
val eventSourcedTestKit = createTestKit()
|
||||
val result = eventSourcedTestKit.runCommand[Done](replyTo => TestCounter.IncrementWithAsyncReply(replyTo))
|
||||
result.event should ===(TestCounter.Incremented(1))
|
||||
result.reply should ===(Done)
|
||||
result.state should ===(TestCounter.RealState(1, Vector(0)))
|
||||
}
|
||||
|
||||
"run command with no reply" in {
|
||||
// This is a fictive usage scenario. If the command has a replyTo the Behavior should (eventually) reply,
|
||||
// but here we verify that it doesn't reply.
|
||||
val eventSourcedTestKit = createTestKit()
|
||||
val replyToProbe = createTestProbe[Done]()
|
||||
val result = eventSourcedTestKit.runCommand(TestCounter.IncrementWithNoReply(replyToProbe.ref))
|
||||
result.event should ===(TestCounter.Incremented(1))
|
||||
result.state should ===(TestCounter.RealState(1, Vector(0)))
|
||||
replyToProbe.expectNoMessage()
|
||||
}
|
||||
|
||||
"give access to current state" in {
|
||||
val eventSourcedTestKit = createTestKit()
|
||||
|
||||
// initial state
|
||||
eventSourcedTestKit.getState() should ===(TestCounter.RealState(0, Vector.empty))
|
||||
|
||||
// state after command
|
||||
eventSourcedTestKit.runCommand(TestCounter.Increment)
|
||||
eventSourcedTestKit.getState() should ===(TestCounter.RealState(1, Vector(0)))
|
||||
}
|
||||
|
||||
"detect non-serializable events" in {
|
||||
val eventSourcedTestKit = createTestKit()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue