Restart replication stream from correct seen seqNr (#29436)

This commit is contained in:
Christopher Batey 2020-07-30 10:03:46 +01:00
parent 4b27bc34a8
commit 742352caba
5 changed files with 72 additions and 21 deletions

View file

@ -13,6 +13,7 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.testkit.scaladsl.PersistenceTestKit
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing, ReplicationContext }
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.Eventually
@ -397,5 +398,28 @@ class ReplicatedEventSourcingSpec
}
}
"restart replication stream" in {
val testkit = PersistenceTestKit(system)
val entityId = nextEntityId
val stateProbe = createTestProbe[State]()
val probe = createTestProbe[Done]()
val eventProbeR1 = createTestProbe[EventAndContext]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2"))
// ensure recovery is complete
r1 ! GetState(stateProbe.ref)
stateProbe.expectMessage(State(Nil))
r2 ! GetState(stateProbe.ref)
stateProbe.expectMessage(State(Nil))
// make reads fail for the replication
testkit.failNextNReads(s"$entityId|R2", 1)
// should restart the replication stream
r2 ! StoreMe("from r2", probe.ref)
eventProbeR1.expectMessageType[EventAndContext].event shouldEqual "from r2"
}
}
}

View file

@ -73,6 +73,11 @@ private[akka] object EventSourcedBehaviorImpl {
*/
final case class GetState[State](replyTo: ActorRef[State]) extends InternalProtocol
/**
* Used to start the replication stream at the correct sequence number
*/
final case class GetSeenSequenceNr(replica: ReplicaId, replyTo: ActorRef[Long]) extends InternalProtocol
}
@InternalApi

View file

@ -20,7 +20,7 @@ import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.SingleEventSeq
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState }
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.util.OptionVal
@ -97,6 +97,7 @@ private[akka] final class ReplayingEvents[C, E, S](
case pe: PublishedEventImpl => onInternalCommand(pe)
case cmd: IncomingCommand[C] => onInternalCommand(cmd)
case get: GetState[S @unchecked] => stashInternal(get)
case get: GetSeenSequenceNr => stashInternal(get)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
}
}

View file

@ -12,7 +12,7 @@ import akka.persistence._
import akka.persistence.SnapshotProtocol.LoadSnapshotFailed
import akka.persistence.SnapshotProtocol.LoadSnapshotResult
import akka.persistence.typed.{ RecoveryFailed, ReplicaId }
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState }
import akka.util.unused
import akka.actor.typed.scaladsl.LoggerOps
@ -71,6 +71,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
} else
onCommand(cmd)
case get: GetState[S @unchecked] => stashInternal(get)
case get: GetSeenSequenceNr => stashInternal(get)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
}
.receiveSignal(returnPermitOnStop.orElse {

View file

@ -48,7 +48,7 @@ import akka.persistence.typed.{
SnapshotMetadata,
SnapshotSelectionCriteria
}
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState }
import akka.persistence.typed.internal.InternalProtocol.ReplicatedEventEnvelope
import akka.persistence.typed.internal.JournalInteractions.EventToPersist
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
@ -128,33 +128,39 @@ private[akka] object Running {
val query = PersistenceQuery(system)
replicationSetup.allReplicas.foldLeft(state) { (state, replicaId) =>
if (replicaId != replicationSetup.replicaId) {
val seqNr = state.seenPerReplica(replicaId)
val pid = PersistenceId.replicatedUniqueId(replicationSetup.aaContext.entityId, replicaId)
val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId)
val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId)
implicit val timeout = Timeout(30.seconds)
implicit val scheduler = setup.context.system.scheduler
implicit val ec = setup.context.system.executionContext
val controlRef = new AtomicReference[ReplicationStreamControl]()
import akka.actor.typed.scaladsl.AskPattern._
val source = RestartSource
.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () =>
replication
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
// from each replica, only get the events that originated there, this prevents most of the event filtering
// the downside is that events can't be received via other replicas in the event of an uneven network partition
.filter(event =>
event.eventMetadata match {
case Some(replicatedMeta: ReplicatedEventMetadata) => replicatedMeta.originReplica == replicaId
case _ =>
throw new IllegalArgumentException(
s"Replication stream from replica ${replicaId} for ${setup.persistenceId} contains event " +
s"(sequence nr ${event.sequenceNr}) without replication metadata. " +
s"Is the persistence id used by a regular event sourced actor there or the journal for that replica (${queryPluginId}) " +
"used that does not support Replicated Event Sourcing?")
})
.viaMat(new FastForwardingFilter)(Keep.right)
.mapMaterializedValue(streamControl => controlRef.set(streamControl))
Source.futureSource {
setup.context.self.ask[Long](replyTo => GetSeenSequenceNr(replicaId, replyTo)).map { seqNr =>
replication
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
// from each replica, only get the events that originated there, this prevents most of the event filtering
// the downside is that events can't be received via other replicas in the event of an uneven network partition
.filter(event =>
event.eventMetadata match {
case Some(replicatedMeta: ReplicatedEventMetadata) => replicatedMeta.originReplica == replicaId
case _ =>
throw new IllegalArgumentException(
s"Replication stream from replica ${replicaId} for ${setup.persistenceId} contains event " +
s"(sequence nr ${event.sequenceNr}) without replication metadata. " +
s"Is the persistence id used by a regular event sourced actor there or the journal for that replica (${queryPluginId}) " +
"used that does not support Replicated Event Sourcing?")
})
.viaMat(new FastForwardingFilter)(Keep.right)
.mapMaterializedValue(streamControl => controlRef.set(streamControl))
}
}
}
// needs to be outside of the restart source so that it actually cancels when terminating the replica
.via(ActorFlow
@ -177,7 +183,7 @@ private[akka] object Running {
source.runWith(Sink.ignore)(SystemMaterializer(system).materializer)
// FIXME support from journal to fast forward https://github.com/akka/akka/issues/29311
// TODO support from journal to fast forward https://github.com/akka/akka/issues/29311
state.copy(
replicationControl =
state.replicationControl.updated(replicaId, new ReplicationStreamControl {
@ -244,6 +250,7 @@ private[akka] object Running {
case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state)
case get: GetState[S @unchecked] => onGetState(get)
case get: GetSeenSequenceNr => onGetSeenSequenceNr(get)
case _ => Behaviors.unhandled
}
@ -386,6 +393,11 @@ private[akka] object Running {
this
}
def onGetSeenSequenceNr(get: GetSeenSequenceNr): Behavior[InternalProtocol] = {
get.replyTo ! state.seenPerReplica(get.replica)
this
}
def withContext[A](aa: ReplicationSetup, withReplication: ReplicationSetup => Unit, f: () => A): A = {
withReplication(aa)
val result = f()
@ -632,6 +644,7 @@ private[akka] object Running {
case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(re)
case pe: PublishedEventImpl => onPublishedEvent(pe)
case get: GetState[S @unchecked] => stashInternal(get)
case getSeqNr: GetSeenSequenceNr => onGetSeenSequenceNr(getSeqNr)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, visibleState.state)
case RecoveryTickEvent(_) => Behaviors.unhandled
case RecoveryPermitGranted => Behaviors.unhandled
@ -648,6 +661,11 @@ private[akka] object Running {
}
}
def onGetSeenSequenceNr(get: GetSeenSequenceNr): PersistingEvents = {
get.replyTo ! state.seenPerReplica(get.replica)
this
}
def onReplicatedEvent(event: InternalProtocol.ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = {
if (state.receivedPoisonPill) {
Behaviors.unhandled
@ -826,6 +844,8 @@ private[akka] object Running {
}
case get: GetState[S @unchecked] =>
stashInternal(get)
case get: GetSeenSequenceNr =>
stashInternal(get)
case _ =>
Behaviors.unhandled
}