From 9bf9da79e6a5b67e90ca74ea977a9edff23af24b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 22 Feb 2022 12:43:07 +0100 Subject: [PATCH] Make sure replicated persist is concurrent in LwwSpec #31167 (#31169) --- .../akka/persistence/typed/crdt/LwwSpec.scala | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala index 2d3369ec7d..ac47d05c1d 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/crdt/LwwSpec.scala @@ -4,19 +4,27 @@ package akka.persistence.typed.crdt -import akka.actor.typed.{ ActorRef, Behavior } +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.typed.ReplicationId -import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing } -import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec } +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.ReplicatedEventSourcing +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.ReplicationBaseSpec import akka.serialization.jackson.CborSerializable +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + object LwwSpec { import ReplicationBaseSpec._ sealed trait Command - final case class Update(item: String, timestamp: Long, error: ActorRef[String]) extends Command + final case class Update(item: String, timestamp: Long, error: ActorRef[String], latch: Option[CountDownLatch]) + extends Command final case class Get(replyTo: ActorRef[Registry]) extends Command sealed trait Event extends CborSerializable @@ -36,11 +44,15 @@ object LwwSpec { Registry("", LwwTime(Long.MinValue, replicationContext.replicaId)), (state, command) => command match { - case Update(s, timestmap, error) => + case Update(s, timestmap, error, maybeLatch) => if (s == "") { error ! "bad value" Effect.none } else { + maybeLatch.foreach { l => + l.countDown() + l.await(10, TimeUnit.SECONDS) + } Effect.persist(Changed(s, state.updatedTimestamp.increase(timestmap, replicationContext.replicaId))) } case Get(replyTo) => @@ -75,7 +87,7 @@ class LwwSpec extends ReplicationBaseSpec { "Lww Replicated Event Sourced Behavior" should { "replicate a single event" in new Setup { - r1 ! Update("a1", 1L, r1Probe.ref) + r1 ! Update("a1", 1L, r1Probe.ref, None) eventually { val probe = createTestProbe[Registry]() r2 ! Get(probe.ref) @@ -84,8 +96,8 @@ class LwwSpec extends ReplicationBaseSpec { } "resolve conflict" in new Setup { - r1 ! Update("a1", 1L, r1Probe.ref) - r2 ! Update("b1", 2L, r2Probe.ref) + r1 ! Update("a1", 1L, r1Probe.ref, None) + r2 ! Update("b1", 2L, r2Probe.ref, None) eventually { r1 ! Get(r1GetProbe.ref) r2 ! Get(r2GetProbe.ref) @@ -95,8 +107,15 @@ class LwwSpec extends ReplicationBaseSpec { } "have deterministic tiebreak when the same time" in new Setup { - r1 ! Update("a1", 1L, r1Probe.ref) - r2 ! Update("b1", 1L, r2Probe.ref) + val latch = new CountDownLatch(3) + r1 ! Update("a1", 1L, r1Probe.ref, Some(latch)) + r2 ! Update("b1", 1L, r2Probe.ref, Some(latch)) + + // the commands have arrived in both actors, waiting for the latch, + // so that the persist of the events will be concurrent + latch.countDown() + latch.await(10, TimeUnit.SECONDS) + // R1 < R2 eventually { r1 ! Get(r1GetProbe.ref)