diff --git a/akka-docs/src/main/paradox/persistence.md b/akka-docs/src/main/paradox/persistence.md index 1aa95c1858..6c58f2f6dd 100644 --- a/akka-docs/src/main/paradox/persistence.md +++ b/akka-docs/src/main/paradox/persistence.md @@ -566,9 +566,11 @@ Scala Java : @@snip [LambdaPersistenceDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #save-snapshot } -where `metadata` is of type `SnapshotMetadata`: +where `metadata` is of type `SnapshotMetadata` and contains: -@@snip [SnapshotProtocol.scala](/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala) { #snapshot-metadata } +* persistenceId +* sequenceNr +* timestamp During recovery, the persistent actor is offered the latest saved snapshot via a `SnapshotOffer` message from which it can initialize internal state. diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSnapshotSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSnapshotSpec.scala new file mode 100644 index 0000000000..8eeda55c15 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSnapshotSpec.scala @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed +import java.util.concurrent.atomic.AtomicInteger + +import akka.Done +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } +import akka.actor.typed.{ ActorRef, Behavior } +import akka.persistence.testkit.{ PersistenceTestKitPlugin, PersistenceTestKitSnapshotPlugin } +import akka.persistence.testkit.scaladsl.{ PersistenceTestKit, SnapshotTestKit } +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.internal.{ ReplicatedPublishedEventMetaData, VersionVector } +import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing +import org.scalatest.concurrent.Eventually +import org.scalatest.wordspec.AnyWordSpecLike + +object ActiveActiveSnapshotSpec { + + import ActiveActiveSpec._ + + def behaviorWithSnapshotting(entityId: String, replicaId: ReplicaId): Behavior[Command] = + behaviorWithSnapshotting(entityId, replicaId, None) + + def behaviorWithSnapshotting( + entityId: String, + replicaId: ReplicaId, + eventProbe: ActorRef[EventAndContext]): Behavior[Command] = + behaviorWithSnapshotting(entityId, replicaId, Some(eventProbe)) + + def behaviorWithSnapshotting( + entityId: String, + replicaId: ReplicaId, + probe: Option[ActorRef[EventAndContext]]): Behavior[Command] = { + ActiveActiveEventSourcing.withSharedJournal( + entityId, + replicaId, + AllReplicas, + PersistenceTestKitReadJournal.Identifier)(aaContext => + eventSourcedBehavior(aaContext, probe).snapshotWhen((_, _, sequenceNr) => sequenceNr % 2 == 0)) + + } +} + +class ActiveActiveSnapshotSpec + extends ScalaTestWithActorTestKit( + PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config)) + with AnyWordSpecLike + with LogCapturing + with Eventually { + import ActiveActiveSpec._ + import ActiveActiveSnapshotSpec._ + + val ids = new AtomicInteger(0) + def nextEntityId = s"e-${ids.getAndIncrement()}" + + val snapshotTestKit = SnapshotTestKit(system) + val persistenceTestKit = PersistenceTestKit(system) + + val R1 = ReplicaId("R1") + val R2 = ReplicaId("R2") + + "ActiveActive" should { + "recover state from snapshots" in { + val entityId = nextEntityId + val persistenceIdR1 = s"$entityId|R1" + val persistenceIdR2 = s"$entityId|R2" + val probe = createTestProbe[Done]() + val r2EventProbe = createTestProbe[EventAndContext]() + + { + val r1 = spawn(behaviorWithSnapshotting(entityId, R1)) + val r2 = spawn(behaviorWithSnapshotting(entityId, R2, r2EventProbe.ref)) + r1 ! StoreMe("r1 1", probe.ref) + r1 ! StoreMe("r1 2", probe.ref) + r2EventProbe.expectMessageType[EventAndContext] + r2EventProbe.expectMessageType[EventAndContext] + + snapshotTestKit.expectNextPersisted(persistenceIdR1, State(List("r1 2", "r1 1"))) + snapshotTestKit.expectNextPersisted(persistenceIdR2, State(List("r1 2", "r1 1"))) + + r2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + PersistenceId.replicatedUniqueId(entityId, R1), + 1L, + "two-again", + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty))) + + // r2 should now filter out that event if it receives it again + r2EventProbe.expectNoMessage() + } + + // restart r2 from a snapshot, the event should still be filtered + { + val r2 = spawn(behaviorWithSnapshotting(entityId, R2, r2EventProbe.ref)) + r2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + PersistenceId.replicatedUniqueId(entityId, R1), + 1L, + "two-again", + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty))) + r2EventProbe.expectNoMessage() + + val stateProbe = createTestProbe[State]() + r2 ! GetState(stateProbe.ref) + stateProbe.expectMessage(State(List("r1 2", "r1 1"))) + } + + } + } + +} diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala index e9f33b0bc2..5ae2cf87a0 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala @@ -13,9 +13,8 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal -import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing -import akka.persistence.typed.scaladsl.Effect -import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior } +import akka.serialization.jackson.CborSerializable import org.scalatest.concurrent.Eventually import org.scalatest.wordspec.AnyWordSpecLike @@ -30,10 +29,38 @@ object ActiveActiveSpec { case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command case object Stop extends Command - case class State(all: List[String]) + case class State(all: List[String]) extends CborSerializable + def testBehavior(entityId: String, replicaId: String, probe: ActorRef[EventAndContext]): Behavior[Command] = testBehavior(entityId, replicaId, Some(probe)) + def eventSourcedBehavior( + aaContext: ActiveActiveContext, + probe: Option[ActorRef[EventAndContext]]): EventSourcedBehavior[Command, String, State] = { + EventSourcedBehavior[Command, String, State]( + aaContext.persistenceId, + State(Nil), + (state, command) => + command match { + case GetState(replyTo) => + replyTo ! state + Effect.none + case GetReplica(replyTo) => + replyTo.tell((aaContext.replicaId, aaContext.allReplicas)) + Effect.none + case StoreMe(evt, ack) => + Effect.persist(evt).thenRun(_ => ack ! Done) + case StoreUs(evts, replyTo) => + Effect.persist(evts).thenRun(_ => replyTo ! Done) + case Stop => + Effect.stop() + }, + (state, event) => { + probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning, aaContext.concurrent)) + state.copy(all = event :: state.all) + }) + } + def testBehavior( entityId: String, replicaId: String, @@ -42,30 +69,7 @@ object ActiveActiveSpec { entityId, ReplicaId(replicaId), AllReplicas, - PersistenceTestKitReadJournal.Identifier)( - aaContext => - EventSourcedBehavior[Command, String, State]( - aaContext.persistenceId, - State(Nil), - (state, command) => - command match { - case GetState(replyTo) => - replyTo ! state - Effect.none - case GetReplica(replyTo) => - replyTo.tell((aaContext.replicaId, aaContext.allReplicas)) - Effect.none - case StoreMe(evt, ack) => - Effect.persist(evt).thenRun(_ => ack ! Done) - case StoreUs(evts, replyTo) => - Effect.persist(evts).thenRun(_ => replyTo ! Done) - case Stop => - Effect.stop() - }, - (state, event) => { - probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning, aaContext.concurrent)) - state.copy(all = event :: state.all) - })) + PersistenceTestKitReadJournal.Identifier)(aaContext => eventSourcedBehavior(aaContext, probe)) } @@ -338,6 +342,5 @@ class ActiveActiveSpec eventProbeR1Take2.expectMessage( EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = true, concurrent = true)) } - } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 95e474860e..b9f032786d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -287,6 +287,10 @@ private[akka] final case class ReplicatedEventMetaData( version: VersionVector, concurrent: Boolean) // whether when the event handler was executed the event was concurrent +// FIXME serializer +@InternalApi +private[akka] final case class ReplicatedSnapshotMetaData(version: VersionVector, seenPerReplica: Map[ReplicaId, Long]) + /** * An event replicated from a different replica. * diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index 3fd33ab510..510099c1ed 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -18,7 +18,6 @@ import akka.annotation.InternalStableApi import akka.persistence._ import akka.persistence.JournalProtocol.ReplayMessages import akka.persistence.SnapshotProtocol.LoadSnapshot -import akka.persistence.typed.internal.JournalInteractions.EventOrTaggedOrReplicated import akka.util.{ unused, OptionVal } /** INTERNAL API */ @@ -191,12 +190,19 @@ private[akka] trait SnapshotInteractions[C, E, S] { setup.log.debug("Saving snapshot sequenceNr [{}]", state.seqNr) if (state.state == null) throw new IllegalStateException("A snapshot must not be a null state.") - else + else { + val meta = setup.activeActive match { + case Some(_) => + val m = ReplicatedSnapshotMetaData(state.version, state.seenPerReplica) + Some(m) + case None => None + } setup.snapshotStore.tell( SnapshotProtocol.SaveSnapshot( - SnapshotMetadata(setup.persistenceId.id, state.seqNr), + new SnapshotMetadata(setup.persistenceId.id, state.seqNr, meta), setup.snapshotAdapter.toJournal(state.state)), setup.selfClassic) + } } /** Deletes the snapshots up to and including the `sequenceNr`. */ diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 9f2d464ec3..524901b342 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -11,9 +11,10 @@ import akka.annotation.{ InternalApi, InternalStableApi } import akka.persistence._ import akka.persistence.SnapshotProtocol.LoadSnapshotFailed import akka.persistence.SnapshotProtocol.LoadSnapshotResult -import akka.persistence.typed.RecoveryFailed +import akka.persistence.typed.{ RecoveryFailed, ReplicaId } import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState import akka.util.unused +import akka.actor.typed.scaladsl.LoggerOps /** * INTERNAL API @@ -146,14 +147,32 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup case LoadSnapshotResult(sso, toSnr) => var state: S = setup.emptyState - val seqNr: Long = sso match { + val (seqNr: Long, seenPerReplica: Map[ReplicaId, Long], version: VersionVector) = sso match { case Some(SelectedSnapshot(metadata, snapshot)) => state = setup.snapshotAdapter.fromJournal(snapshot) - metadata.sequenceNr - case None => 0 // from the beginning please + setup.context.log.debug("Loaded snapshot with metadata {}", metadata) + metadata.meta match { + case Some(rm: ReplicatedSnapshotMetaData) => (metadata.sequenceNr, rm.seenPerReplica, rm.version) + case _ => (metadata.sequenceNr, Map.empty.withDefaultValue(0L), VersionVector.empty) + } + case None => (0L, Map.empty.withDefaultValue(0L), VersionVector.empty) } - becomeReplayingEvents(state, seqNr, toSnr, receivedPoisonPill) + setup.context.log.debugN("Snapshot recovered from {} {} {}", seqNr, seenPerReplica, version) + + setup.cancelRecoveryTimer() + + ReplayingEvents[C, E, S]( + setup, + ReplayingEvents.ReplayingState( + seqNr, + state, + eventSeenInInterval = false, + toSnr, + receivedPoisonPill, + System.nanoTime(), + version, + seenPerReplica)) case LoadSnapshotFailed(cause) => onRecoveryFailure(cause) @@ -163,26 +182,4 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup } } - private def becomeReplayingEvents( - state: S, - lastSequenceNr: Long, - toSnr: Long, - receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { - setup.cancelRecoveryTimer() - - ReplayingEvents[C, E, S]( - setup, - ReplayingEvents.ReplayingState( - lastSequenceNr, - state, - eventSeenInInterval = false, - toSnr, - receivedPoisonPill, - System.nanoTime(), - VersionVector.empty, - // FIXME seqNrs for other replicas needs to come from snapshot. - seenSeqNrPerReplica = - setup.activeActive.map(_.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty))) - } - } diff --git a/akka-persistence/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes b/akka-persistence/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes index 91d5ea01ed..3e13002376 100644 --- a/akka-persistence/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes +++ b/akka-persistence/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes @@ -2,3 +2,10 @@ ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.journal.inmem.InmemMessages.*") ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.journal.inmem.InmemJournal.*") + +# marked as do not inherit +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentRepr.metadata") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentRepr.withMetadata") + +# changes to internal class +ProblemFilters.exclude[Problem]("akka.persistence.PersistentImpl*") diff --git a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala index ec4559ce1b..91d9b25755 100644 --- a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala @@ -3,6 +3,7 @@ */ package akka.persistence +import scala.runtime.AbstractFunction3 /** * Snapshot metadata. @@ -10,12 +11,54 @@ package akka.persistence * @param persistenceId id of persistent actor from which the snapshot was taken. * @param sequenceNr sequence number at which the snapshot was taken. * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown. + * @param meta a journal can optionally support persisting metadata separate to the domain state */ -@SerialVersionUID(1L) //#snapshot-metadata -final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L) -//#snapshot-metadata +@SerialVersionUID(1L) +final class SnapshotMetadata( + val persistenceId: String, + val sequenceNr: Long, + val timestamp: Long, + val meta: Option[Any]) + extends Product3[String, Long, Long] + with Serializable { -object SnapshotMetadata { + def this(persistenceId: String, sequenceNr: Long, timestamp: Long) = { + this(persistenceId, sequenceNr, timestamp, None) + } + + private[akka] def this(persistenceId: String, sequenceNr: Long, meta: Option[Any]) = { + this(persistenceId, sequenceNr, 0L, meta) + } + + // for bincompat, used to be a case class + def copy( + persistenceId: String = this.persistenceId, + sequenceNr: Long = this.sequenceNr, + timestamp: Long = this.timestamp): SnapshotMetadata = SnapshotMetadata(persistenceId, sequenceNr, timestamp, meta) + + override def toString = s"SnapshotMetadata($persistenceId, $sequenceNr, $timestamp, $meta)" + + // Product 3 + override def productPrefix = "SnapshotMetadata" + override def _1: String = persistenceId + override def _2: Long = sequenceNr + override def _3: Long = timestamp + override def canEqual(that: Any): Boolean = that.isInstanceOf[SnapshotMetadata] + + override def equals(other: Any): Boolean = other match { + case that: SnapshotMetadata => + persistenceId == that.persistenceId && + sequenceNr == that.sequenceNr && + timestamp == that.timestamp + case _ => false + } + override def hashCode(): Int = { + val state = Seq[Any](persistenceId, sequenceNr, timestamp) + state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) + } +} + +object SnapshotMetadata extends AbstractFunction3[String, Long, Long, SnapshotMetadata] { implicit val ordering: Ordering[SnapshotMetadata] = Ordering.fromLessThan[SnapshotMetadata] { (a, b) => if (a eq b) false else if (a.persistenceId != b.persistenceId) a.persistenceId.compareTo(b.persistenceId) < 0 @@ -23,6 +66,22 @@ object SnapshotMetadata { else if (a.timestamp != b.timestamp) a.timestamp < b.timestamp else false } + + def apply(persistenceId: String, sequenceNr: Long, timestamp: Long, meta: Option[Any]): SnapshotMetadata = + new SnapshotMetadata(persistenceId, sequenceNr, timestamp, meta) + + def apply(persistenceId: String, sequenceNr: Long, timestamp: Long): SnapshotMetadata = + new SnapshotMetadata(persistenceId, sequenceNr, timestamp, None) + + def apply(persistenceId: String, sequenceNr: Long): SnapshotMetadata = + new SnapshotMetadata(persistenceId, sequenceNr, 0, None) + + def unapply(sm: SnapshotMetadata): Option[(String, Long, Long)] = + Some((sm.persistenceId, sm.sequenceNr, sm.timestamp)) + + def apply$default$3(): Long = 0L + + def `$default$3`: Long = 0L } /**