ReplicatedEventSourcingSpec race fix #30661 (#30735)

This commit is contained in:
Johan Andrén 2021-10-07 14:44:02 +02:00 committed by GitHub
parent 10dff2e839
commit 5da7dc6c19
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -27,9 +27,9 @@ object ReplicatedEventSourcingSpec {
sealed trait Command
case class GetState(replyTo: ActorRef[State]) extends Command
case class StoreMe(description: String, replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(0))
case class StoreMe(description: String, replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(1))
extends Command
case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(0))
case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(1))
extends Command
case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command
case object Stop extends Command
@ -54,9 +54,11 @@ object ReplicatedEventSourcingSpec {
replyTo.tell((replicationContext.replicaId, replicationContext.allReplicas))
Effect.none
case StoreMe(evt, ack, latch) =>
latch.countDown()
latch.await(10, TimeUnit.SECONDS)
Effect.persist(evt).thenRun(_ => ack ! Done)
case StoreUs(evts, replyTo, latch) =>
latch.countDown()
latch.await(10, TimeUnit.SECONDS)
Effect.persist(evts).thenRun(_ => replyTo ! Done)
case Stop =>
@ -211,18 +213,17 @@ class ReplicatedEventSourcingSpec
val probe = createTestProbe[Done]()
val eventProbeR1 = createTestProbe[EventAndContext]()
val latch1 = new CountDownLatch(1)
val latch2 = new CountDownLatch(1)
val latch = new CountDownLatch(3)
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2"))
r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref, latch1)
r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref, latch2)
r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref, latch)
r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref, latch)
// the commands have arrived in both actors, waiting for the latch,
// so that the persist of the events will be concurrent
latch1.countDown()
latch2.countDown()
latch.countDown()
latch.await(10, TimeUnit.SECONDS)
probe.receiveMessage()
probe.receiveMessage()
@ -355,15 +356,14 @@ class ReplicatedEventSourcingSpec
val eventProbeR1 = createTestProbe[EventAndContext]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2"))
val latch1 = new CountDownLatch(1)
val latch2 = new CountDownLatch(1)
r1 ! StoreMe("from r1", probe.ref, latch1) // R1 0 R2 0 -> R1 1 R2 0
r2 ! StoreMe("from r2", probe.ref, latch2) // R2 0 R1 0 -> R2 1 R1 0
val latch = new CountDownLatch(3)
r1 ! StoreMe("from r1", probe.ref, latch) // R1 0 R2 0 -> R1 1 R2 0
r2 ! StoreMe("from r2", probe.ref, latch) // R2 0 R1 0 -> R2 1 R1 0
// the commands have arrived in both actors, waiting for the latch,
// so that the persist of the events will be concurrent
latch1.countDown()
latch2.countDown()
latch.countDown()
latch.await(10, TimeUnit.SECONDS)
// local event isn't concurrent, remote event is
eventProbeR1.expectMessage(