Test coverage for active active published events after replay (#29335)

* Test coverage for published events after recovery
* Keep track of origin seqnrs when replaying
This commit is contained in:
Johan Andrén 2020-07-06 15:40:49 +02:00 committed by Christopher Batey
parent b86b6df7c7
commit 82b8d699ca
4 changed files with 159 additions and 28 deletions

View file

@ -23,6 +23,7 @@ object ActiveActiveEventPublishingSpec {
trait Command
case class Add(text: String, replyTo: ActorRef[Done]) extends Command
case class Get(replyTo: ActorRef[Set[String]]) extends Command
case object Stop extends Command
def apply(entityId: String, replicaId: String, allReplicas: Set[String]): Behavior[Command] =
Behaviors.setup { ctx =>
@ -42,6 +43,8 @@ object ActiveActiveEventPublishingSpec {
case Get(replyTo) =>
replyTo ! state
Effect.none
case Stop =>
Effect.stop()
},
(state, string) => state + string))
}
@ -53,12 +56,18 @@ class ActiveActiveEventPublishingSpec
with AnyWordSpecLike
with LogCapturing {
private var idCounter = 0
def nextEntityId(): String = {
idCounter += 1
s"myId$idCounter"
}
import ActiveActiveEventPublishingSpec._
"An active active actor" must {
"move forward when a published event from a replica is received" in {
val actor = spawn(MyActiveActive("myId1", "DC-A", Set("DC-A", "DC-B")))
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
@ -66,7 +75,7 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId("myId1", "DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
1L,
"two",
System.currentTimeMillis())
@ -78,7 +87,8 @@ class ActiveActiveEventPublishingSpec
}
"ignore a published event from a replica is received but the sequence number is unexpected" in {
val actor = spawn(MyActiveActive("myId2", "DC-A", Set("DC-A", "DC-B")))
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
@ -86,7 +96,7 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId("myId2", "DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
2L, // missing 1L
"two",
System.currentTimeMillis())
@ -98,7 +108,8 @@ class ActiveActiveEventPublishingSpec
}
"ignore a published event from an unknown replica" in {
val actor = spawn(MyActiveActive("myId3", "DC-A", Set("DC-A", "DC-B")))
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
@ -106,7 +117,7 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-C"),
PersistenceId.replicatedUniqueId("myId3", "DC-C"),
PersistenceId.replicatedUniqueId(id, "DC-C"),
1L,
"two",
System.currentTimeMillis())
@ -118,7 +129,8 @@ class ActiveActiveEventPublishingSpec
}
"ignore an already seen event from a replica" in {
val actor = spawn(MyActiveActive("myId4", "DC-A", Set("DC-A", "DC-B")))
val id = nextEntityId()
val actor = spawn(MyActiveActive(id, "DC-A", Set("DC-A", "DC-B")))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
@ -133,7 +145,7 @@ class ActiveActiveEventPublishingSpec
// simulate another published event from that replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId("myId4", "DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
1L,
"two-again", // ofc this would be the same in the real world, different just so we can detect
System.currentTimeMillis())
@ -145,6 +157,74 @@ class ActiveActiveEventPublishingSpec
probe.expectMessage(Set("one", "two", "three"))
}
"handle published events after replay" in {
val id = nextEntityId()
val probe = createTestProbe[Any]()
val activeActiveBehavior = MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))
val incarnation1 = spawn(activeActiveBehavior)
incarnation1 ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
incarnation1 ! MyActiveActive.Stop
probe.expectTerminated(incarnation1)
val incarnation2 = spawn(activeActiveBehavior)
incarnation2 ! MyActiveActive.Get(probe.ref)
probe.expectMessage(Set("one"))
// replay completed
// simulate a published event from another replica
incarnation2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
1L,
"two",
System.currentTimeMillis())
incarnation2 ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
incarnation2 ! MyActiveActive.Get(probe.ref)
probe.expectMessage(Set("one", "two", "three"))
}
"handle published events before and after replay" in {
val id = nextEntityId()
val probe = createTestProbe[Any]()
val activeActiveBehaviorA = MyActiveActive(id, "DC-A", Set("DC-A", "DC-B"))
val incarnationA1 = spawn(activeActiveBehaviorA)
incarnationA1 ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
incarnationA1.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
1L,
"two",
System.currentTimeMillis())
incarnationA1 ! MyActiveActive.Stop
probe.expectTerminated(incarnationA1)
val incarnationA2 = spawn(activeActiveBehaviorA)
// simulate a published event from another replica
incarnationA2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
2L,
"three",
System.currentTimeMillis())
incarnationA2 ! MyActiveActive.Add("four", probe.ref)
probe.expectMessage(Done)
incarnationA2 ! MyActiveActive.Get(probe.ref)
probe.expectMessage(Set("one", "two", "three", "four"))
}
}
}

View file

@ -24,6 +24,7 @@ object ActiveActiveSpec {
case class StoreMe(description: String, replyTo: ActorRef[Done]) extends Command
case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done]) extends Command
case class GetReplica(replyTo: ActorRef[(String, Set[String])]) extends Command
case object Stop extends Command
case class State(all: List[String])
def testBehavior(entityId: String, replicaId: String, probe: ActorRef[EventAndContext]): Behavior[Command] =
@ -50,6 +51,8 @@ object ActiveActiveSpec {
Effect.persist(evt).thenRun(_ => ack ! Done)
case StoreUs(evts, replyTo) =>
Effect.persist(evts).thenRun(_ => replyTo ! Done)
case Stop =>
Effect.stop()
},
(state, event) => {
probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning))
@ -104,6 +107,42 @@ class ActiveActiveSpec
}
}
"continue after recovery" in {
val entityId = nextEntityId
val r1Behavior = testBehavior(entityId, "R1")
val r2Behavior = testBehavior(entityId, "R2")
val probe = createTestProbe[Done]()
{
// first incarnation
val r1 = spawn(r1Behavior)
val r2 = spawn(r2Behavior)
r1 ! StoreMe("1 from r1", probe.ref)
r2 ! StoreMe("1 from r2", probe.ref)
r1 ! Stop
r2 ! Stop
probe.expectTerminated(r1)
probe.expectTerminated(r2)
}
{
// second incarnation
val r1 = spawn(r1Behavior)
val r2 = spawn(r2Behavior)
r1 ! StoreMe("2 from r1", probe.ref)
r2 ! StoreMe("2 from r2", probe.ref)
eventually {
val probe = createTestProbe[State]()
r1 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set("1 from r1", "1 from r2", "2 from r1", "2 from r2")
r2 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set("1 from r1", "1 from r2", "2 from r1", "2 from r2")
}
}
}
"have access to replica information" in {
val entityId = nextEntityId
val probe = createTestProbe[(String, Set[String])]()

View file

@ -50,7 +50,8 @@ private[akka] object ReplayingEvents {
eventSeenInInterval: Boolean,
toSeqNr: Long,
receivedPoisonPill: Boolean,
recoveryStartTime: Long)
recoveryStartTime: Long,
seenSeqNrPerReplica: Map[String, Long])
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] =
Behaviors.setup { _ =>
@ -121,20 +122,32 @@ private[akka] final class ReplayingEvents[C, E, S](
eventForErrorReporting = OptionVal.Some(event)
state = state.copy(seqNr = repr.sequenceNr)
setup.activeActive match {
case Some(aa) =>
val meta = repr.metadata match {
case Some(m) => m.asInstanceOf[ReplicatedEventMetaData]
case None =>
throw new IllegalStateException(
s"Active active enabled but existing event has no metadata. Migration isn't supported yet.")
val aaMetaAndSelfReplica: Option[(ReplicatedEventMetaData, String)] =
setup.activeActive match {
case Some(aa) =>
val meta = repr.metadata match {
case Some(m) => m.asInstanceOf[ReplicatedEventMetaData]
case None =>
throw new IllegalStateException(
s"Active active enabled but existing event has no metadata. Migration isn't supported yet.")
}
aa.setContext(recoveryRunning = true, meta.originReplica)
case None =>
}
}
aa.setContext(recoveryRunning = true, meta.originReplica)
Some(meta -> aa.replicaId)
case None => None
}
val newState = setup.eventHandler(state.state, event)
state = state.copy(state = newState, eventSeenInInterval = true)
aaMetaAndSelfReplica match {
case Some((meta, selfReplica)) if meta.originReplica != selfReplica =>
// keep track of highest origin seqnr per other replica
state = state.copy(
state = newState,
eventSeenInInterval = true,
seenSeqNrPerReplica = state.seenSeqNrPerReplica + (meta.originReplica -> meta.originSequenceNr))
case _ =>
state = state.copy(state = newState, eventSeenInInterval = true)
}
}
eventSeq match {
@ -253,10 +266,6 @@ private[akka] final class ReplayingEvents[C, E, S](
if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)
Behaviors.stopped
else {
val seenPerReplica: Map[String, Long] =
setup.activeActive
.map(aa => aa.allReplicas.filterNot(_ == aa.replicaId).map(replica => replica -> 0L).toMap)
.getOrElse(Map.empty)
val running =
Running[C, E, S](
setup,
@ -264,7 +273,7 @@ private[akka] final class ReplayingEvents[C, E, S](
seqNr = state.seqNr,
state = state.state,
receivedPoisonPill = state.receivedPoisonPill,
seenPerReplica = seenPerReplica,
seenPerReplica = state.seenSeqNrPerReplica,
replicationControl = Map.empty))
tryUnstashOne(running)

View file

@ -178,7 +178,10 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
eventSeenInInterval = false,
toSnr,
receivedPoisonPill,
System.nanoTime()))
System.nanoTime(),
// FIXME seqNrs for other replicas needs to come from snapshot
seenSeqNrPerReplica =
setup.activeActive.map(_.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty)))
}
}