Harden ReplicatedEventSourcingSpec, 30548 (#30613)

This commit is contained in:
Patrik Nordwall 2021-09-02 11:21:39 +02:00 committed by GitHub
parent a73cdf32fb
commit 568877b56c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -4,6 +4,8 @@
package akka.persistence.typed
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
@ -18,7 +20,6 @@ import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, Replicate
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
import akka.testkit.GHExcludeTest
object ReplicatedEventSourcingSpec {
@ -26,8 +27,10 @@ object ReplicatedEventSourcingSpec {
sealed trait Command
case class GetState(replyTo: ActorRef[State]) extends Command
case class StoreMe(description: String, replyTo: ActorRef[Done]) extends Command
case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done]) extends Command
case class StoreMe(description: String, replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(0))
extends Command
case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(0))
extends Command
case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command
case object Stop extends Command
@ -50,9 +53,11 @@ object ReplicatedEventSourcingSpec {
case GetReplica(replyTo) =>
replyTo.tell((replicationContext.replicaId, replicationContext.allReplicas))
Effect.none
case StoreMe(evt, ack) =>
case StoreMe(evt, ack, latch) =>
latch.await(10, TimeUnit.SECONDS)
Effect.persist(evt).thenRun(_ => ack ! Done)
case StoreUs(evts, replyTo) =>
case StoreUs(evts, replyTo, latch) =>
latch.await(10, TimeUnit.SECONDS)
Effect.persist(evts).thenRun(_ => replyTo ! Done)
case Stop =>
Effect.stop()
@ -206,15 +211,22 @@ class ReplicatedEventSourcingSpec
val probe = createTestProbe[Done]()
val eventProbeR1 = createTestProbe[EventAndContext]()
val latch1 = new CountDownLatch(1)
val latch2 = new CountDownLatch(1)
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2"))
r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref)
r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref)
r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref, latch1)
r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref, latch2)
// the commands have arrived in both actors, waiting for the latch,
// so that the persist of the events will be concurrent
latch1.countDown()
latch2.countDown()
probe.receiveMessage()
probe.receiveMessage()
// events at r2 happened concurrently with events at r1
eventProbeR1.expectMessage(EventAndContext("1 from r1", ReplicaId("R1"), false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("2 from r1", ReplicaId("R1"), false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("1 from r2", ReplicaId("R2"), false, concurrent = true))
@ -239,8 +251,15 @@ class ReplicatedEventSourcingSpec
val eventProbeR2 = createTestProbe[EventAndContext]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref))
r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0
r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0
val latch1 = new CountDownLatch(1)
val latch2 = new CountDownLatch(1)
r1 ! StoreMe("from r1", probe.ref, latch1) // R1 0 R2 0 -> R1 1 R2 0
r2 ! StoreMe("from r2", probe.ref, latch2) // R2 0 R1 0 -> R2 1 R1 0
// the commands have arrived in both actors, waiting for the latch,
// so that the persist of the events will be concurrent
latch1.countDown()
latch2.countDown()
// each gets its local event
eventProbeR1.expectMessage(
@ -330,14 +349,22 @@ class ReplicatedEventSourcingSpec
eventProbeR3.expectNoMessage()
}
"set concurrent on replay of events" taggedAs GHExcludeTest in {
"set concurrent on replay of events" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()
val eventProbeR1 = createTestProbe[EventAndContext]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2"))
r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0
r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0
val latch1 = new CountDownLatch(1)
val latch2 = new CountDownLatch(1)
r1 ! StoreMe("from r1", probe.ref, latch1) // R1 0 R2 0 -> R1 1 R2 0
r2 ! StoreMe("from r2", probe.ref, latch2) // R2 0 R1 0 -> R2 1 R1 0
// the commands have arrived in both actors, waiting for the latch,
// so that the persist of the events will be concurrent
latch1.countDown()
latch2.countDown()
// local event isn't concurrent, remote event is
eventProbeR1.expectMessage(
EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = false, concurrent = false))