From 742352caba180081450aa22169cd4905b7823621 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Thu, 30 Jul 2020 10:03:46 +0100 Subject: [PATCH] Restart replication stream from correct seen seqNr (#29436) --- .../typed/ReplicatedEventSourcingSpec.scala | 24 ++++++++ .../internal/EventSourcedBehaviorImpl.scala | 5 ++ .../typed/internal/ReplayingEvents.scala | 3 +- .../typed/internal/ReplayingSnapshot.scala | 3 +- .../persistence/typed/internal/Running.scala | 58 +++++++++++++------ 5 files changed, 72 insertions(+), 21 deletions(-) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala index 0bbbfb1a39..4a4c19766b 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala @@ -13,6 +13,7 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.testkit.scaladsl.PersistenceTestKit import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing, ReplicationContext } import akka.serialization.jackson.CborSerializable import org.scalatest.concurrent.Eventually @@ -397,5 +398,28 @@ class ReplicatedEventSourcingSpec } } + + "restart replication stream" in { + val testkit = PersistenceTestKit(system) + val entityId = nextEntityId + val stateProbe = createTestProbe[State]() + val probe = createTestProbe[Done]() + val eventProbeR1 = createTestProbe[EventAndContext]() + val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) + val r2 = spawn(testBehavior(entityId, "R2")) + + // ensure recovery is complete + r1 ! GetState(stateProbe.ref) + stateProbe.expectMessage(State(Nil)) + r2 ! GetState(stateProbe.ref) + stateProbe.expectMessage(State(Nil)) + + // make reads fail for the replication + testkit.failNextNReads(s"$entityId|R2", 1) + + // should restart the replication stream + r2 ! StoreMe("from r2", probe.ref) + eventProbeR1.expectMessageType[EventAndContext].event shouldEqual "from r2" + } } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 299cfab3d5..56daa2493d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -73,6 +73,11 @@ private[akka] object EventSourcedBehaviorImpl { */ final case class GetState[State](replyTo: ActorRef[State]) extends InternalProtocol + /** + * Used to start the replication stream at the correct sequence number + */ + final case class GetSeenSequenceNr(replica: ReplicaId, replyTo: ActorRef[Long]) extends InternalProtocol + } @InternalApi diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 992b2a1ca4..037a056041 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -20,7 +20,7 @@ import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.RecoveryFailed import akka.persistence.typed.ReplicaId import akka.persistence.typed.SingleEventSeq -import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState +import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState } import akka.persistence.typed.internal.ReplayingEvents.ReplayingState import akka.persistence.typed.internal.Running.WithSeqNrAccessible import akka.util.OptionVal @@ -97,6 +97,7 @@ private[akka] final class ReplayingEvents[C, E, S]( case pe: PublishedEventImpl => onInternalCommand(pe) case cmd: IncomingCommand[C] => onInternalCommand(cmd) case get: GetState[S @unchecked] => stashInternal(get) + case get: GetSeenSequenceNr => stashInternal(get) case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 5b8d538fab..4257995eb1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -12,7 +12,7 @@ import akka.persistence._ import akka.persistence.SnapshotProtocol.LoadSnapshotFailed import akka.persistence.SnapshotProtocol.LoadSnapshotResult import akka.persistence.typed.{ RecoveryFailed, ReplicaId } -import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState +import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState } import akka.util.unused import akka.actor.typed.scaladsl.LoggerOps @@ -71,6 +71,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup } else onCommand(cmd) case get: GetState[S @unchecked] => stashInternal(get) + case get: GetSeenSequenceNr => stashInternal(get) case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit } .receiveSignal(returnPermitOnStop.orElse { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 5815e80653..0bfff4afed 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -48,7 +48,7 @@ import akka.persistence.typed.{ SnapshotMetadata, SnapshotSelectionCriteria } -import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState +import akka.persistence.typed.internal.EventSourcedBehaviorImpl.{ GetSeenSequenceNr, GetState } import akka.persistence.typed.internal.InternalProtocol.ReplicatedEventEnvelope import akka.persistence.typed.internal.JournalInteractions.EventToPersist import akka.persistence.typed.internal.Running.WithSeqNrAccessible @@ -128,33 +128,39 @@ private[akka] object Running { val query = PersistenceQuery(system) replicationSetup.allReplicas.foldLeft(state) { (state, replicaId) => if (replicaId != replicationSetup.replicaId) { - val seqNr = state.seenPerReplica(replicaId) val pid = PersistenceId.replicatedUniqueId(replicationSetup.aaContext.entityId, replicaId) val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId) val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId) implicit val timeout = Timeout(30.seconds) + implicit val scheduler = setup.context.system.scheduler + implicit val ec = setup.context.system.executionContext val controlRef = new AtomicReference[ReplicationStreamControl]() + import akka.actor.typed.scaladsl.AskPattern._ val source = RestartSource .withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () => - replication - .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) - // from each replica, only get the events that originated there, this prevents most of the event filtering - // the downside is that events can't be received via other replicas in the event of an uneven network partition - .filter(event => - event.eventMetadata match { - case Some(replicatedMeta: ReplicatedEventMetadata) => replicatedMeta.originReplica == replicaId - case _ => - throw new IllegalArgumentException( - s"Replication stream from replica ${replicaId} for ${setup.persistenceId} contains event " + - s"(sequence nr ${event.sequenceNr}) without replication metadata. " + - s"Is the persistence id used by a regular event sourced actor there or the journal for that replica (${queryPluginId}) " + - "used that does not support Replicated Event Sourcing?") - }) - .viaMat(new FastForwardingFilter)(Keep.right) - .mapMaterializedValue(streamControl => controlRef.set(streamControl)) + Source.futureSource { + setup.context.self.ask[Long](replyTo => GetSeenSequenceNr(replicaId, replyTo)).map { seqNr => + replication + .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) + // from each replica, only get the events that originated there, this prevents most of the event filtering + // the downside is that events can't be received via other replicas in the event of an uneven network partition + .filter(event => + event.eventMetadata match { + case Some(replicatedMeta: ReplicatedEventMetadata) => replicatedMeta.originReplica == replicaId + case _ => + throw new IllegalArgumentException( + s"Replication stream from replica ${replicaId} for ${setup.persistenceId} contains event " + + s"(sequence nr ${event.sequenceNr}) without replication metadata. " + + s"Is the persistence id used by a regular event sourced actor there or the journal for that replica (${queryPluginId}) " + + "used that does not support Replicated Event Sourcing?") + }) + .viaMat(new FastForwardingFilter)(Keep.right) + .mapMaterializedValue(streamControl => controlRef.set(streamControl)) + } + } } // needs to be outside of the restart source so that it actually cancels when terminating the replica .via(ActorFlow @@ -177,7 +183,7 @@ private[akka] object Running { source.runWith(Sink.ignore)(SystemMaterializer(system).materializer) - // FIXME support from journal to fast forward https://github.com/akka/akka/issues/29311 + // TODO support from journal to fast forward https://github.com/akka/akka/issues/29311 state.copy( replicationControl = state.replicationControl.updated(replicaId, new ReplicationStreamControl { @@ -244,6 +250,7 @@ private[akka] object Running { case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state) case get: GetState[S @unchecked] => onGetState(get) + case get: GetSeenSequenceNr => onGetSeenSequenceNr(get) case _ => Behaviors.unhandled } @@ -386,6 +393,11 @@ private[akka] object Running { this } + def onGetSeenSequenceNr(get: GetSeenSequenceNr): Behavior[InternalProtocol] = { + get.replyTo ! state.seenPerReplica(get.replica) + this + } + def withContext[A](aa: ReplicationSetup, withReplication: ReplicationSetup => Unit, f: () => A): A = { withReplication(aa) val result = f() @@ -632,6 +644,7 @@ private[akka] object Running { case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(re) case pe: PublishedEventImpl => onPublishedEvent(pe) case get: GetState[S @unchecked] => stashInternal(get) + case getSeqNr: GetSeenSequenceNr => onGetSeenSequenceNr(getSeqNr) case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, visibleState.state) case RecoveryTickEvent(_) => Behaviors.unhandled case RecoveryPermitGranted => Behaviors.unhandled @@ -648,6 +661,11 @@ private[akka] object Running { } } + def onGetSeenSequenceNr(get: GetSeenSequenceNr): PersistingEvents = { + get.replyTo ! state.seenPerReplica(get.replica) + this + } + def onReplicatedEvent(event: InternalProtocol.ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = { if (state.receivedPoisonPill) { Behaviors.unhandled @@ -826,6 +844,8 @@ private[akka] object Running { } case get: GetState[S @unchecked] => stashInternal(get) + case get: GetSeenSequenceNr => + stashInternal(get) case _ => Behaviors.unhandled }