Make sure replicated persist is concurrent in LwwSpec #31167 (#31169)

This commit is contained in:
Johan Andrén 2022-02-22 12:43:07 +01:00 committed by GitHub
parent 5acc75f663
commit 9bf9da79e6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -4,19 +4,27 @@
package akka.persistence.typed.crdt package akka.persistence.typed.crdt
import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicationId import akka.persistence.typed.ReplicationId
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing } import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.{ ReplicaId, ReplicationBaseSpec } import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationBaseSpec
import akka.serialization.jackson.CborSerializable import akka.serialization.jackson.CborSerializable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
object LwwSpec { object LwwSpec {
import ReplicationBaseSpec._ import ReplicationBaseSpec._
sealed trait Command sealed trait Command
final case class Update(item: String, timestamp: Long, error: ActorRef[String]) extends Command final case class Update(item: String, timestamp: Long, error: ActorRef[String], latch: Option[CountDownLatch])
extends Command
final case class Get(replyTo: ActorRef[Registry]) extends Command final case class Get(replyTo: ActorRef[Registry]) extends Command
sealed trait Event extends CborSerializable sealed trait Event extends CborSerializable
@ -36,11 +44,15 @@ object LwwSpec {
Registry("", LwwTime(Long.MinValue, replicationContext.replicaId)), Registry("", LwwTime(Long.MinValue, replicationContext.replicaId)),
(state, command) => (state, command) =>
command match { command match {
case Update(s, timestmap, error) => case Update(s, timestmap, error, maybeLatch) =>
if (s == "") { if (s == "") {
error ! "bad value" error ! "bad value"
Effect.none Effect.none
} else { } else {
maybeLatch.foreach { l =>
l.countDown()
l.await(10, TimeUnit.SECONDS)
}
Effect.persist(Changed(s, state.updatedTimestamp.increase(timestmap, replicationContext.replicaId))) Effect.persist(Changed(s, state.updatedTimestamp.increase(timestmap, replicationContext.replicaId)))
} }
case Get(replyTo) => case Get(replyTo) =>
@ -75,7 +87,7 @@ class LwwSpec extends ReplicationBaseSpec {
"Lww Replicated Event Sourced Behavior" should { "Lww Replicated Event Sourced Behavior" should {
"replicate a single event" in new Setup { "replicate a single event" in new Setup {
r1 ! Update("a1", 1L, r1Probe.ref) r1 ! Update("a1", 1L, r1Probe.ref, None)
eventually { eventually {
val probe = createTestProbe[Registry]() val probe = createTestProbe[Registry]()
r2 ! Get(probe.ref) r2 ! Get(probe.ref)
@ -84,8 +96,8 @@ class LwwSpec extends ReplicationBaseSpec {
} }
"resolve conflict" in new Setup { "resolve conflict" in new Setup {
r1 ! Update("a1", 1L, r1Probe.ref) r1 ! Update("a1", 1L, r1Probe.ref, None)
r2 ! Update("b1", 2L, r2Probe.ref) r2 ! Update("b1", 2L, r2Probe.ref, None)
eventually { eventually {
r1 ! Get(r1GetProbe.ref) r1 ! Get(r1GetProbe.ref)
r2 ! Get(r2GetProbe.ref) r2 ! Get(r2GetProbe.ref)
@ -95,8 +107,15 @@ class LwwSpec extends ReplicationBaseSpec {
} }
"have deterministic tiebreak when the same time" in new Setup { "have deterministic tiebreak when the same time" in new Setup {
r1 ! Update("a1", 1L, r1Probe.ref) val latch = new CountDownLatch(3)
r2 ! Update("b1", 1L, r2Probe.ref) r1 ! Update("a1", 1L, r1Probe.ref, Some(latch))
r2 ! Update("b1", 1L, r2Probe.ref, Some(latch))
// the commands have arrived in both actors, waiting for the latch,
// so that the persist of the events will be concurrent
latch.countDown()
latch.await(10, TimeUnit.SECONDS)
// R1 < R2 // R1 < R2
eventually { eventually {
r1 ! Get(r1GetProbe.ref) r1 ! Get(r1GetProbe.ref)