From 82b8d699ca7d1ccbc70e76c87d6665317f038fb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 6 Jul 2020 15:40:49 +0200 Subject: [PATCH] Test coverage for active active published events after replay (#29335) * Test coverage for published events after recovery * Keep track of origin seqnrs when replaying --- .../ActiveActiveEventPublishingSpec.scala | 98 +++++++++++++++++-- .../persistence/typed/ActiveActiveSpec.scala | 39 ++++++++ .../typed/internal/ReplayingEvents.scala | 45 +++++---- .../typed/internal/ReplayingSnapshot.scala | 5 +- 4 files changed, 159 insertions(+), 28 deletions(-) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala index c1238b2531..1acb5e67e7 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala @@ -23,6 +23,7 @@ object ActiveActiveEventPublishingSpec { trait Command case class Add(text: String, replyTo: ActorRef[Done]) extends Command case class Get(replyTo: ActorRef[Set[String]]) extends Command + case object Stop extends Command def apply(entityId: String, replicaId: String, allReplicas: Set[String]): Behavior[Command] = Behaviors.setup { ctx => @@ -42,6 +43,8 @@ object ActiveActiveEventPublishingSpec { case Get(replyTo) => replyTo ! state Effect.none + case Stop => + Effect.stop() }, (state, string) => state + string)) } @@ -53,12 +56,18 @@ class ActiveActiveEventPublishingSpec with AnyWordSpecLike with LogCapturing { + private var idCounter = 0 + def nextEntityId(): String = { + idCounter += 1 + s"myId$idCounter" + } + import ActiveActiveEventPublishingSpec._ "An active active actor" must { "move forward when a published event from a replica is received" in { - - val actor = spawn(MyActiveActive("myId1", "DC-A", Set("DC-A", "DC-B"))) + val id = nextEntityId() + val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))) val probe = createTestProbe[Any]() actor ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) @@ -66,7 +75,7 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( Some("DC-B"), - PersistenceId.replicatedUniqueId("myId1", "DC-B"), + PersistenceId.replicatedUniqueId(id, "DC-B"), 1L, "two", System.currentTimeMillis()) @@ -78,7 +87,8 @@ class ActiveActiveEventPublishingSpec } "ignore a published event from a replica is received but the sequence number is unexpected" in { - val actor = spawn(MyActiveActive("myId2", "DC-A", Set("DC-A", "DC-B"))) + val id = nextEntityId() + val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))) val probe = createTestProbe[Any]() actor ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) @@ -86,7 +96,7 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( Some("DC-B"), - PersistenceId.replicatedUniqueId("myId2", "DC-B"), + PersistenceId.replicatedUniqueId(id, "DC-B"), 2L, // missing 1L "two", System.currentTimeMillis()) @@ -98,7 +108,8 @@ class ActiveActiveEventPublishingSpec } "ignore a published event from an unknown replica" in { - val actor = spawn(MyActiveActive("myId3", "DC-A", Set("DC-A", "DC-B"))) + val id = nextEntityId() + val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))) val probe = createTestProbe[Any]() actor ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) @@ -106,7 +117,7 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( Some("DC-C"), - PersistenceId.replicatedUniqueId("myId3", "DC-C"), + PersistenceId.replicatedUniqueId(id, "DC-C"), 1L, "two", System.currentTimeMillis()) @@ -118,7 +129,8 @@ class ActiveActiveEventPublishingSpec } "ignore an already seen event from a replica" in { - val actor = spawn(MyActiveActive("myId4", "DC-A", Set("DC-A", "DC-B"))) + val id = nextEntityId() + val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))) val probe = createTestProbe[Any]() actor ! MyActiveActive.Add("one", probe.ref) probe.expectMessage(Done) @@ -133,7 +145,7 @@ class ActiveActiveEventPublishingSpec // simulate another published event from that replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( Some("DC-B"), - PersistenceId.replicatedUniqueId("myId4", "DC-B"), + PersistenceId.replicatedUniqueId(id, "DC-B"), 1L, "two-again", // ofc this would be the same in the real world, different just so we can detect System.currentTimeMillis()) @@ -145,6 +157,74 @@ class ActiveActiveEventPublishingSpec probe.expectMessage(Set("one", "two", "three")) } + "handle published events after replay" in { + val id = nextEntityId() + val probe = createTestProbe[Any]() + val activeActiveBehavior = MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")) + val incarnation1 = spawn(activeActiveBehavior) + incarnation1 ! MyActiveActive.Add("one", probe.ref) + probe.expectMessage(Done) + + incarnation1 ! MyActiveActive.Stop + probe.expectTerminated(incarnation1) + + val incarnation2 = spawn(activeActiveBehavior) + + incarnation2 ! MyActiveActive.Get(probe.ref) + probe.expectMessage(Set("one")) + // replay completed + + // simulate a published event from another replica + incarnation2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + Some("DC-B"), + PersistenceId.replicatedUniqueId(id, "DC-B"), + 1L, + "two", + System.currentTimeMillis()) + + incarnation2 ! MyActiveActive.Add("three", probe.ref) + probe.expectMessage(Done) + + incarnation2 ! MyActiveActive.Get(probe.ref) + probe.expectMessage(Set("one", "two", "three")) + } + + "handle published events before and after replay" in { + val id = nextEntityId() + val probe = createTestProbe[Any]() + val activeActiveBehaviorA = MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")) + val incarnationA1 = spawn(activeActiveBehaviorA) + incarnationA1 ! MyActiveActive.Add("one", probe.ref) + probe.expectMessage(Done) + + // simulate a published event from another replica + incarnationA1.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + Some("DC-B"), + PersistenceId.replicatedUniqueId(id, "DC-B"), + 1L, + "two", + System.currentTimeMillis()) + + incarnationA1 ! MyActiveActive.Stop + probe.expectTerminated(incarnationA1) + + val incarnationA2 = spawn(activeActiveBehaviorA) + + // simulate a published event from another replica + incarnationA2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + Some("DC-B"), + PersistenceId.replicatedUniqueId(id, "DC-B"), + 2L, + "three", + System.currentTimeMillis()) + + incarnationA2 ! MyActiveActive.Add("four", probe.ref) + probe.expectMessage(Done) + + incarnationA2 ! MyActiveActive.Get(probe.ref) + probe.expectMessage(Set("one", "two", "three", "four")) + } + } } diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala index 15606caa5d..5b7dba7ba9 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala @@ -24,6 +24,7 @@ object ActiveActiveSpec { case class StoreMe(description: String, replyTo: ActorRef[Done]) extends Command case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done]) extends Command case class GetReplica(replyTo: ActorRef[(String, Set[String])]) extends Command + case object Stop extends Command case class State(all: List[String]) def testBehavior(entityId: String, replicaId: String, probe: ActorRef[EventAndContext]): Behavior[Command] = @@ -50,6 +51,8 @@ object ActiveActiveSpec { Effect.persist(evt).thenRun(_ => ack ! Done) case StoreUs(evts, replyTo) => Effect.persist(evts).thenRun(_ => replyTo ! Done) + case Stop => + Effect.stop() }, (state, event) => { probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning)) @@ -104,6 +107,42 @@ class ActiveActiveSpec } } + "continue after recovery" in { + val entityId = nextEntityId + val r1Behavior = testBehavior(entityId, "R1") + val r2Behavior = testBehavior(entityId, "R2") + val probe = createTestProbe[Done]() + + { + // first incarnation + val r1 = spawn(r1Behavior) + val r2 = spawn(r2Behavior) + r1 ! StoreMe("1 from r1", probe.ref) + r2 ! StoreMe("1 from r2", probe.ref) + r1 ! Stop + r2 ! Stop + probe.expectTerminated(r1) + probe.expectTerminated(r2) + } + + { + // second incarnation + val r1 = spawn(r1Behavior) + val r2 = spawn(r2Behavior) + + r1 ! StoreMe("2 from r1", probe.ref) + r2 ! StoreMe("2 from r2", probe.ref) + + eventually { + val probe = createTestProbe[State]() + r1 ! GetState(probe.ref) + probe.expectMessageType[State].all.toSet shouldEqual Set("1 from r1", "1 from r2", "2 from r1", "2 from r2") + r2 ! GetState(probe.ref) + probe.expectMessageType[State].all.toSet shouldEqual Set("1 from r1", "1 from r2", "2 from r1", "2 from r2") + } + } + } + "have access to replica information" in { val entityId = nextEntityId val probe = createTestProbe[(String, Set[String])]() diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index c229c842a1..a4502bb699 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -50,7 +50,8 @@ private[akka] object ReplayingEvents { eventSeenInInterval: Boolean, toSeqNr: Long, receivedPoisonPill: Boolean, - recoveryStartTime: Long) + recoveryStartTime: Long, + seenSeqNrPerReplica: Map[String, Long]) def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] = Behaviors.setup { _ => @@ -121,20 +122,32 @@ private[akka] final class ReplayingEvents[C, E, S]( eventForErrorReporting = OptionVal.Some(event) state = state.copy(seqNr = repr.sequenceNr) - setup.activeActive match { - case Some(aa) => - val meta = repr.metadata match { - case Some(m) => m.asInstanceOf[ReplicatedEventMetaData] - case None => - throw new IllegalStateException( - s"Active active enabled but existing event has no metadata. Migration isn't supported yet.") + val aaMetaAndSelfReplica: Option[(ReplicatedEventMetaData, String)] = + setup.activeActive match { + case Some(aa) => + val meta = repr.metadata match { + case Some(m) => m.asInstanceOf[ReplicatedEventMetaData] + case None => + throw new IllegalStateException( + s"Active active enabled but existing event has no metadata. Migration isn't supported yet.") - } - aa.setContext(recoveryRunning = true, meta.originReplica) - case None => - } + } + aa.setContext(recoveryRunning = true, meta.originReplica) + Some(meta -> aa.replicaId) + case None => None + } val newState = setup.eventHandler(state.state, event) - state = state.copy(state = newState, eventSeenInInterval = true) + + aaMetaAndSelfReplica match { + case Some((meta, selfReplica)) if meta.originReplica != selfReplica => + // keep track of highest origin seqnr per other replica + state = state.copy( + state = newState, + eventSeenInInterval = true, + seenSeqNrPerReplica = state.seenSeqNrPerReplica + (meta.originReplica -> meta.originSequenceNr)) + case _ => + state = state.copy(state = newState, eventSeenInInterval = true) + } } eventSeq match { @@ -253,10 +266,6 @@ private[akka] final class ReplayingEvents[C, E, S]( if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped else { - val seenPerReplica: Map[String, Long] = - setup.activeActive - .map(aa => aa.allReplicas.filterNot(_ == aa.replicaId).map(replica => replica -> 0L).toMap) - .getOrElse(Map.empty) val running = Running[C, E, S]( setup, @@ -264,7 +273,7 @@ private[akka] final class ReplayingEvents[C, E, S]( seqNr = state.seqNr, state = state.state, receivedPoisonPill = state.receivedPoisonPill, - seenPerReplica = seenPerReplica, + seenPerReplica = state.seenSeqNrPerReplica, replicationControl = Map.empty)) tryUnstashOne(running) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index cfabb7a43c..4977e36d78 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -178,7 +178,10 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup eventSeenInInterval = false, toSnr, receivedPoisonPill, - System.nanoTime())) + System.nanoTime(), + // FIXME seqNrs for other replicas needs to come from snapshot + seenSeqNrPerReplica = + setup.activeActive.map(_.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty))) } }