Metadata for snapshots for active active (#29362)

This commit is contained in:
Christopher Batey 2020-07-16 16:06:04 +01:00
parent 2c0e837183
commit 7e91428428
8 changed files with 256 additions and 65 deletions

View file

@ -566,9 +566,11 @@ Scala
Java
: @@snip [LambdaPersistenceDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #save-snapshot }
where `metadata` is of type `SnapshotMetadata`:
where `metadata` is of type `SnapshotMetadata` and contains:
@@snip [SnapshotProtocol.scala](/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala) { #snapshot-metadata }
* persistenceId
* sequenceNr
* timestamp
During recovery, the persistent actor is offered the latest saved snapshot via a `SnapshotOffer` message from
which it can initialize internal state.

View file

@ -0,0 +1,113 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.{ PersistenceTestKitPlugin, PersistenceTestKitSnapshotPlugin }
import akka.persistence.testkit.scaladsl.{ PersistenceTestKit, SnapshotTestKit }
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.internal.{ ReplicatedPublishedEventMetaData, VersionVector }
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object ActiveActiveSnapshotSpec {
import ActiveActiveSpec._
def behaviorWithSnapshotting(entityId: String, replicaId: ReplicaId): Behavior[Command] =
behaviorWithSnapshotting(entityId, replicaId, None)
def behaviorWithSnapshotting(
entityId: String,
replicaId: ReplicaId,
eventProbe: ActorRef[EventAndContext]): Behavior[Command] =
behaviorWithSnapshotting(entityId, replicaId, Some(eventProbe))
def behaviorWithSnapshotting(
entityId: String,
replicaId: ReplicaId,
probe: Option[ActorRef[EventAndContext]]): Behavior[Command] = {
ActiveActiveEventSourcing.withSharedJournal(
entityId,
replicaId,
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(aaContext =>
eventSourcedBehavior(aaContext, probe).snapshotWhen((_, _, sequenceNr) => sequenceNr % 2 == 0))
}
}
class ActiveActiveSnapshotSpec
extends ScalaTestWithActorTestKit(
PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config))
with AnyWordSpecLike
with LogCapturing
with Eventually {
import ActiveActiveSpec._
import ActiveActiveSnapshotSpec._
val ids = new AtomicInteger(0)
def nextEntityId = s"e-${ids.getAndIncrement()}"
val snapshotTestKit = SnapshotTestKit(system)
val persistenceTestKit = PersistenceTestKit(system)
val R1 = ReplicaId("R1")
val R2 = ReplicaId("R2")
"ActiveActive" should {
"recover state from snapshots" in {
val entityId = nextEntityId
val persistenceIdR1 = s"$entityId|R1"
val persistenceIdR2 = s"$entityId|R2"
val probe = createTestProbe[Done]()
val r2EventProbe = createTestProbe[EventAndContext]()
{
val r1 = spawn(behaviorWithSnapshotting(entityId, R1))
val r2 = spawn(behaviorWithSnapshotting(entityId, R2, r2EventProbe.ref))
r1 ! StoreMe("r1 1", probe.ref)
r1 ! StoreMe("r1 2", probe.ref)
r2EventProbe.expectMessageType[EventAndContext]
r2EventProbe.expectMessageType[EventAndContext]
snapshotTestKit.expectNextPersisted(persistenceIdR1, State(List("r1 2", "r1 1")))
snapshotTestKit.expectNextPersisted(persistenceIdR2, State(List("r1 2", "r1 1")))
r2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId(entityId, R1),
1L,
"two-again",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)))
// r2 should now filter out that event if it receives it again
r2EventProbe.expectNoMessage()
}
// restart r2 from a snapshot, the event should still be filtered
{
val r2 = spawn(behaviorWithSnapshotting(entityId, R2, r2EventProbe.ref))
r2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
PersistenceId.replicatedUniqueId(entityId, R1),
1L,
"two-again",
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData(R1, VersionVector.empty)))
r2EventProbe.expectNoMessage()
val stateProbe = createTestProbe[State]()
r2 ! GetState(stateProbe.ref)
stateProbe.expectMessage(State(List("r1 2", "r1 1")))
}
}
}
}

View file

@ -13,9 +13,8 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
@ -30,20 +29,14 @@ object ActiveActiveSpec {
case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command
case object Stop extends Command
case class State(all: List[String])
case class State(all: List[String]) extends CborSerializable
def testBehavior(entityId: String, replicaId: String, probe: ActorRef[EventAndContext]): Behavior[Command] =
testBehavior(entityId, replicaId, Some(probe))
def testBehavior(
entityId: String,
replicaId: String,
probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] =
ActiveActiveEventSourcing.withSharedJournal(
entityId,
ReplicaId(replicaId),
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(
aaContext =>
def eventSourcedBehavior(
aaContext: ActiveActiveContext,
probe: Option[ActorRef[EventAndContext]]): EventSourcedBehavior[Command, String, State] = {
EventSourcedBehavior[Command, String, State](
aaContext.persistenceId,
State(Nil),
@ -65,7 +58,18 @@ object ActiveActiveSpec {
(state, event) => {
probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning, aaContext.concurrent))
state.copy(all = event :: state.all)
}))
})
}
def testBehavior(
entityId: String,
replicaId: String,
probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] =
ActiveActiveEventSourcing.withSharedJournal(
entityId,
ReplicaId(replicaId),
AllReplicas,
PersistenceTestKitReadJournal.Identifier)(aaContext => eventSourcedBehavior(aaContext, probe))
}
@ -338,6 +342,5 @@ class ActiveActiveSpec
eventProbeR1Take2.expectMessage(
EventAndContext("from r2", ReplicaId("R2"), recoveryRunning = true, concurrent = true))
}
}
}

View file

@ -287,6 +287,10 @@ private[akka] final case class ReplicatedEventMetaData(
version: VersionVector,
concurrent: Boolean) // whether when the event handler was executed the event was concurrent
// FIXME serializer
@InternalApi
private[akka] final case class ReplicatedSnapshotMetaData(version: VersionVector, seenPerReplica: Map[ReplicaId, Long])
/**
* An event replicated from a different replica.
*

View file

@ -18,7 +18,6 @@ import akka.annotation.InternalStableApi
import akka.persistence._
import akka.persistence.JournalProtocol.ReplayMessages
import akka.persistence.SnapshotProtocol.LoadSnapshot
import akka.persistence.typed.internal.JournalInteractions.EventOrTaggedOrReplicated
import akka.util.{ unused, OptionVal }
/** INTERNAL API */
@ -191,13 +190,20 @@ private[akka] trait SnapshotInteractions[C, E, S] {
setup.log.debug("Saving snapshot sequenceNr [{}]", state.seqNr)
if (state.state == null)
throw new IllegalStateException("A snapshot must not be a null state.")
else
else {
val meta = setup.activeActive match {
case Some(_) =>
val m = ReplicatedSnapshotMetaData(state.version, state.seenPerReplica)
Some(m)
case None => None
}
setup.snapshotStore.tell(
SnapshotProtocol.SaveSnapshot(
SnapshotMetadata(setup.persistenceId.id, state.seqNr),
new SnapshotMetadata(setup.persistenceId.id, state.seqNr, meta),
setup.snapshotAdapter.toJournal(state.state)),
setup.selfClassic)
}
}
/** Deletes the snapshots up to and including the `sequenceNr`. */
protected def internalDeleteSnapshots(fromSequenceNr: Long, toSequenceNr: Long): Unit = {

View file

@ -11,9 +11,10 @@ import akka.annotation.{ InternalApi, InternalStableApi }
import akka.persistence._
import akka.persistence.SnapshotProtocol.LoadSnapshotFailed
import akka.persistence.SnapshotProtocol.LoadSnapshotResult
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.{ RecoveryFailed, ReplicaId }
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.util.unused
import akka.actor.typed.scaladsl.LoggerOps
/**
* INTERNAL API
@ -146,14 +147,32 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
case LoadSnapshotResult(sso, toSnr) =>
var state: S = setup.emptyState
val seqNr: Long = sso match {
val (seqNr: Long, seenPerReplica: Map[ReplicaId, Long], version: VersionVector) = sso match {
case Some(SelectedSnapshot(metadata, snapshot)) =>
state = setup.snapshotAdapter.fromJournal(snapshot)
metadata.sequenceNr
case None => 0 // from the beginning please
setup.context.log.debug("Loaded snapshot with metadata {}", metadata)
metadata.meta match {
case Some(rm: ReplicatedSnapshotMetaData) => (metadata.sequenceNr, rm.seenPerReplica, rm.version)
case _ => (metadata.sequenceNr, Map.empty.withDefaultValue(0L), VersionVector.empty)
}
case None => (0L, Map.empty.withDefaultValue(0L), VersionVector.empty)
}
becomeReplayingEvents(state, seqNr, toSnr, receivedPoisonPill)
setup.context.log.debugN("Snapshot recovered from {} {} {}", seqNr, seenPerReplica, version)
setup.cancelRecoveryTimer()
ReplayingEvents[C, E, S](
setup,
ReplayingEvents.ReplayingState(
seqNr,
state,
eventSeenInInterval = false,
toSnr,
receivedPoisonPill,
System.nanoTime(),
version,
seenPerReplica))
case LoadSnapshotFailed(cause) =>
onRecoveryFailure(cause)
@ -163,26 +182,4 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
}
}
private def becomeReplayingEvents(
state: S,
lastSequenceNr: Long,
toSnr: Long,
receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
setup.cancelRecoveryTimer()
ReplayingEvents[C, E, S](
setup,
ReplayingEvents.ReplayingState(
lastSequenceNr,
state,
eventSeenInInterval = false,
toSnr,
receivedPoisonPill,
System.nanoTime(),
VersionVector.empty,
// FIXME seqNrs for other replicas needs to come from snapshot.
seenSeqNrPerReplica =
setup.activeActive.map(_.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty)))
}
}

View file

@ -2,3 +2,10 @@
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.journal.inmem.InmemMessages.*")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.journal.inmem.InmemJournal.*")
# marked as do not inherit
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentRepr.metadata")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentRepr.withMetadata")
# changes to internal class
ProblemFilters.exclude[Problem]("akka.persistence.PersistentImpl*")

View file

@ -3,6 +3,7 @@
*/
package akka.persistence
import scala.runtime.AbstractFunction3
/**
* Snapshot metadata.
@ -10,12 +11,54 @@ package akka.persistence
* @param persistenceId id of persistent actor from which the snapshot was taken.
* @param sequenceNr sequence number at which the snapshot was taken.
* @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
* @param meta a journal can optionally support persisting metadata separate to the domain state
*/
@SerialVersionUID(1L) //#snapshot-metadata
final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)
//#snapshot-metadata
@SerialVersionUID(1L)
final class SnapshotMetadata(
val persistenceId: String,
val sequenceNr: Long,
val timestamp: Long,
val meta: Option[Any])
extends Product3[String, Long, Long]
with Serializable {
object SnapshotMetadata {
def this(persistenceId: String, sequenceNr: Long, timestamp: Long) = {
this(persistenceId, sequenceNr, timestamp, None)
}
private[akka] def this(persistenceId: String, sequenceNr: Long, meta: Option[Any]) = {
this(persistenceId, sequenceNr, 0L, meta)
}
// for bincompat, used to be a case class
def copy(
persistenceId: String = this.persistenceId,
sequenceNr: Long = this.sequenceNr,
timestamp: Long = this.timestamp): SnapshotMetadata = SnapshotMetadata(persistenceId, sequenceNr, timestamp, meta)
override def toString = s"SnapshotMetadata($persistenceId, $sequenceNr, $timestamp, $meta)"
// Product 3
override def productPrefix = "SnapshotMetadata"
override def _1: String = persistenceId
override def _2: Long = sequenceNr
override def _3: Long = timestamp
override def canEqual(that: Any): Boolean = that.isInstanceOf[SnapshotMetadata]
override def equals(other: Any): Boolean = other match {
case that: SnapshotMetadata =>
persistenceId == that.persistenceId &&
sequenceNr == that.sequenceNr &&
timestamp == that.timestamp
case _ => false
}
override def hashCode(): Int = {
val state = Seq[Any](persistenceId, sequenceNr, timestamp)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
object SnapshotMetadata extends AbstractFunction3[String, Long, Long, SnapshotMetadata] {
implicit val ordering: Ordering[SnapshotMetadata] = Ordering.fromLessThan[SnapshotMetadata] { (a, b) =>
if (a eq b) false
else if (a.persistenceId != b.persistenceId) a.persistenceId.compareTo(b.persistenceId) < 0
@ -23,6 +66,22 @@ object SnapshotMetadata {
else if (a.timestamp != b.timestamp) a.timestamp < b.timestamp
else false
}
def apply(persistenceId: String, sequenceNr: Long, timestamp: Long, meta: Option[Any]): SnapshotMetadata =
new SnapshotMetadata(persistenceId, sequenceNr, timestamp, meta)
def apply(persistenceId: String, sequenceNr: Long, timestamp: Long): SnapshotMetadata =
new SnapshotMetadata(persistenceId, sequenceNr, timestamp, None)
def apply(persistenceId: String, sequenceNr: Long): SnapshotMetadata =
new SnapshotMetadata(persistenceId, sequenceNr, 0, None)
def unapply(sm: SnapshotMetadata): Option[(String, Long, Long)] =
Some((sm.persistenceId, sm.sequenceNr, sm.timestamp))
def apply$default$3(): Long = 0L
def `<init>$default$3`: Long = 0L
}
/**