From 568877b56cd85affae726ca9f1dcad7de388d649 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 2 Sep 2021 11:21:39 +0200 Subject: [PATCH] Harden ReplicatedEventSourcingSpec, 30548 (#30613) --- .../typed/ReplicatedEventSourcingSpec.scala | 53 ++++++++++++++----- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala index 0b88db3d80..d345a27027 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala @@ -4,6 +4,8 @@ package akka.persistence.typed +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import akka.Done @@ -18,7 +20,6 @@ import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, Replicate import akka.serialization.jackson.CborSerializable import org.scalatest.concurrent.Eventually import org.scalatest.wordspec.AnyWordSpecLike -import akka.testkit.GHExcludeTest object ReplicatedEventSourcingSpec { @@ -26,8 +27,10 @@ object ReplicatedEventSourcingSpec { sealed trait Command case class GetState(replyTo: ActorRef[State]) extends Command - case class StoreMe(description: String, replyTo: ActorRef[Done]) extends Command - case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done]) extends Command + case class StoreMe(description: String, replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(0)) + extends Command + case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(0)) + extends Command case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command case object Stop extends Command @@ -50,9 +53,11 @@ object ReplicatedEventSourcingSpec { case GetReplica(replyTo) => replyTo.tell((replicationContext.replicaId, replicationContext.allReplicas)) Effect.none - case StoreMe(evt, ack) => + case StoreMe(evt, ack, latch) => + latch.await(10, TimeUnit.SECONDS) Effect.persist(evt).thenRun(_ => ack ! Done) - case StoreUs(evts, replyTo) => + case StoreUs(evts, replyTo, latch) => + latch.await(10, TimeUnit.SECONDS) Effect.persist(evts).thenRun(_ => replyTo ! Done) case Stop => Effect.stop() @@ -206,15 +211,22 @@ class ReplicatedEventSourcingSpec val probe = createTestProbe[Done]() val eventProbeR1 = createTestProbe[EventAndContext]() + val latch1 = new CountDownLatch(1) + val latch2 = new CountDownLatch(1) + val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) val r2 = spawn(testBehavior(entityId, "R2")) - r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref) - r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref) + r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref, latch1) + r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref, latch2) + + // the commands have arrived in both actors, waiting for the latch, + // so that the persist of the events will be concurrent + latch1.countDown() + latch2.countDown() probe.receiveMessage() probe.receiveMessage() // events at r2 happened concurrently with events at r1 - eventProbeR1.expectMessage(EventAndContext("1 from r1", ReplicaId("R1"), false, concurrent = false)) eventProbeR1.expectMessage(EventAndContext("2 from r1", ReplicaId("R1"), false, concurrent = false)) eventProbeR1.expectMessage(EventAndContext("1 from r2", ReplicaId("R2"), false, concurrent = true)) @@ -239,8 +251,15 @@ class ReplicatedEventSourcingSpec val eventProbeR2 = createTestProbe[EventAndContext]() val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref)) - r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0 - r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0 + val latch1 = new CountDownLatch(1) + val latch2 = new CountDownLatch(1) + r1 ! StoreMe("from r1", probe.ref, latch1) // R1 0 R2 0 -> R1 1 R2 0 + r2 ! StoreMe("from r2", probe.ref, latch2) // R2 0 R1 0 -> R2 1 R1 0 + + // the commands have arrived in both actors, waiting for the latch, + // so that the persist of the events will be concurrent + latch1.countDown() + latch2.countDown() // each gets its local event eventProbeR1.expectMessage( @@ -330,14 +349,22 @@ class ReplicatedEventSourcingSpec eventProbeR3.expectNoMessage() } - "set concurrent on replay of events" taggedAs GHExcludeTest in { + "set concurrent on replay of events" in { val entityId = nextEntityId val probe = createTestProbe[Done]() val eventProbeR1 = createTestProbe[EventAndContext]() val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) val r2 = spawn(testBehavior(entityId, "R2")) - r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0 - r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0 + val latch1 = new CountDownLatch(1) + val latch2 = new CountDownLatch(1) + r1 ! StoreMe("from r1", probe.ref, latch1) // R1 0 R2 0 -> R1 1 R2 0 + r2 ! StoreMe("from r2", probe.ref, latch2) // R2 0 R1 0 -> R2 1 R1 0 + + // the commands have arrived in both actors, waiting for the latch, + // so that the persist of the events will be concurrent + latch1.countDown() + latch2.countDown() + // local event isn't concurrent, remote event is eventProbeR1.expectMessage( EventAndContext("from r1", ReplicaId("R1"), recoveryRunning = false, concurrent = false))