From 5da7dc6c19ffcedaaa71410484a02955511a0348 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 7 Oct 2021 14:44:02 +0200 Subject: [PATCH] ReplicatedEventSourcingSpec race fix #30661 (#30735) --- .../typed/ReplicatedEventSourcingSpec.scala | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala index d345a27027..efe9b8415e 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala @@ -27,9 +27,9 @@ object ReplicatedEventSourcingSpec { sealed trait Command case class GetState(replyTo: ActorRef[State]) extends Command - case class StoreMe(description: String, replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(0)) + case class StoreMe(description: String, replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(1)) extends Command - case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(0)) + case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(1)) extends Command case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command case object Stop extends Command @@ -54,9 +54,11 @@ object ReplicatedEventSourcingSpec { replyTo.tell((replicationContext.replicaId, replicationContext.allReplicas)) Effect.none case StoreMe(evt, ack, latch) => + latch.countDown() latch.await(10, TimeUnit.SECONDS) Effect.persist(evt).thenRun(_ => ack ! Done) case StoreUs(evts, replyTo, latch) => + latch.countDown() latch.await(10, TimeUnit.SECONDS) Effect.persist(evts).thenRun(_ => replyTo ! Done) case Stop => @@ -211,18 +213,17 @@ class ReplicatedEventSourcingSpec val probe = createTestProbe[Done]() val eventProbeR1 = createTestProbe[EventAndContext]() - val latch1 = new CountDownLatch(1) - val latch2 = new CountDownLatch(1) + val latch = new CountDownLatch(3) val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) val r2 = spawn(testBehavior(entityId, "R2")) - r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref, latch1) - r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref, latch2) + r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref, latch) + r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref, latch) // the commands have arrived in both actors, waiting for the latch, // so that the persist of the events will be concurrent - latch1.countDown() - latch2.countDown() + latch.countDown() + latch.await(10, TimeUnit.SECONDS) probe.receiveMessage() probe.receiveMessage() @@ -355,15 +356,14 @@ class ReplicatedEventSourcingSpec val eventProbeR1 = createTestProbe[EventAndContext]() val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) val r2 = spawn(testBehavior(entityId, "R2")) - val latch1 = new CountDownLatch(1) - val latch2 = new CountDownLatch(1) - r1 ! StoreMe("from r1", probe.ref, latch1) // R1 0 R2 0 -> R1 1 R2 0 - r2 ! StoreMe("from r2", probe.ref, latch2) // R2 0 R1 0 -> R2 1 R1 0 + val latch = new CountDownLatch(3) + r1 ! StoreMe("from r1", probe.ref, latch) // R1 0 R2 0 -> R1 1 R2 0 + r2 ! StoreMe("from r2", probe.ref, latch) // R2 0 R1 0 -> R2 1 R1 0 // the commands have arrived in both actors, waiting for the latch, // so that the persist of the events will be concurrent - latch1.countDown() - latch2.countDown() + latch.countDown() + latch.await(10, TimeUnit.SECONDS) // local event isn't concurrent, remote event is eventProbeR1.expectMessage(