Harden PersistentBehaviorSpec (#25301)

As the snapshot is async it might not be ready for the next part of the
test. Use a probe to make sure it is done.

Fixes #25296
This commit is contained in:
Christopher Batey 2018-07-03 12:58:10 +01:00 committed by Arnout Engelen
parent 29cf96b90c
commit 85754e8a2a

View file

@ -96,6 +96,9 @@ object PersistentBehaviorSpec {
def counter(persistenceId: String, logging: ActorRef[String])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref)
def counterWithProbe(persistenceId: String, probe: ActorRef[(State, Event)], snapshotProbe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(persistenceId, TestProbe[String].ref, probe, snapshotProbe)
def counterWithProbe(persistenceId: String, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(persistenceId, TestProbe[String].ref, probe, TestProbe[Try[Done]].ref)
@ -423,10 +426,13 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
// no snapshot should have happened
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(counterWithProbe(pid, probeC2.ref).snapshotEvery(2))
val snapshotProbe = TestProbe[Try[Done]]()
val c2 = spawn(counterWithProbe(pid, probeC2.ref, snapshotProbe.ref)
.snapshotEvery(2))
probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1)))
val watchProbeC2 = watcher(c2)
c2 ! Increment
snapshotProbe.expectMessage(Try(Done))
c2 ! LogThenStop
watchProbeC2.expectMessage("Terminated")
@ -440,13 +446,15 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
"snapshot every N sequence nrs when persisting multiple events" in {
val pid = nextPid
val c = spawn(counter(pid).snapshotEvery(2))
val snapshotProbe = TestProbe[Try[Done]]()
val c = spawn(counterWithSnapshotProbe(pid, snapshotProbe.ref).snapshotEvery(2))
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(3)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
snapshotProbe.expectMessage(Try(Done))
c ! LogThenStop
watchProbe.expectMessage("Terminated")