From f63ca66e56faf2a2ac45afa57f3131ee7915bc88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 22 Jul 2020 16:22:12 +0200 Subject: [PATCH] Active active docs mention plugin support (#29418) * Mention specific support for active active is needed in plugins * Align metadata name journal vs snapshot --- .../paradox/typed/persistence-active-active.md | 14 ++++++++++++-- .../typed/internal/ReplayingSnapshot.scala | 2 +- .../scala/akka/persistence/SnapshotProtocol.scala | 9 +++++---- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/akka-docs/src/main/paradox/typed/persistence-active-active.md b/akka-docs/src/main/paradox/typed/persistence-active-active.md index dcf79528b4..108cf7f04a 100644 --- a/akka-docs/src/main/paradox/typed/persistence-active-active.md +++ b/akka-docs/src/main/paradox/typed/persistence-active-active.md @@ -33,7 +33,9 @@ there is no longer the single writer principle as there is with a normal `EventS The state of an active-active `EventSourcedBehavior` is **eventually consistent**. Event replication may be delayed due to network partitions and outages and the event handler and those reading the state must be designed to handle this. -## Relaxing the single writer p`rinciple for availability +To be able to use active active the journal and snapshot store used is required to have specific support for the metadata that active active needs (see @ref[Journal Support](#journal-support)) + +## Relaxing the single writer principle for availability Taking the example of using active-active to run a replica per data center. @@ -283,4 +285,12 @@ and then enable direct replication through `withDirectReplication()` on @apidoc[ The "event publishing" feature publishes each event to the local system event bus as a side effect after it has been written, the `ActiveActiveShardingDirectReplication` actor subscribes to these events and forwards them to the replicas allowing them -to fast forward the stream of events for the origin replica. (With additional potential future support in journals for fast forwarding [#29311](https://github.com/akka/akka/issues/29311)). \ No newline at end of file +to fast forward the stream of events for the origin replica. (With additional potential future support in journals for fast forwarding [#29311](https://github.com/akka/akka/issues/29311)). + +## Journal Support + +For a journal plugin to support active active it needs to store and read metadata for each event if it is defined in the @apiref[PersistentRepr] + `metadata` field. To attach the metadata after writing it, `PersistentRepr.withMetadata` is used. + +For a snapshot plugin to support active active it needs to store and read metadata for the snapshot if it is defined in the @apiref[akka.persistence.SnapshotMetadata] `metadata` field. +To attach the metadata when reading the snapshot the `akka.persistence.SnapshotMetadata.apply` factory overload taking a `metadata` parameter is used. \ No newline at end of file diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 524901b342..d4c6481dda 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -151,7 +151,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup case Some(SelectedSnapshot(metadata, snapshot)) => state = setup.snapshotAdapter.fromJournal(snapshot) setup.context.log.debug("Loaded snapshot with metadata {}", metadata) - metadata.meta match { + metadata.metadata match { case Some(rm: ReplicatedSnapshotMetaData) => (metadata.sequenceNr, rm.seenPerReplica, rm.version) case _ => (metadata.sequenceNr, Map.empty.withDefaultValue(0L), VersionVector.empty) } diff --git a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala index 91d9b25755..c737aa21f8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala @@ -11,14 +11,14 @@ import scala.runtime.AbstractFunction3 * @param persistenceId id of persistent actor from which the snapshot was taken. * @param sequenceNr sequence number at which the snapshot was taken. * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown. - * @param meta a journal can optionally support persisting metadata separate to the domain state + * @param metadata a journal can optionally support persisting metadata separate to the domain state, used for active active support */ @SerialVersionUID(1L) final class SnapshotMetadata( val persistenceId: String, val sequenceNr: Long, val timestamp: Long, - val meta: Option[Any]) + val metadata: Option[Any]) extends Product3[String, Long, Long] with Serializable { @@ -34,9 +34,10 @@ final class SnapshotMetadata( def copy( persistenceId: String = this.persistenceId, sequenceNr: Long = this.sequenceNr, - timestamp: Long = this.timestamp): SnapshotMetadata = SnapshotMetadata(persistenceId, sequenceNr, timestamp, meta) + timestamp: Long = this.timestamp): SnapshotMetadata = + SnapshotMetadata(persistenceId, sequenceNr, timestamp, metadata) - override def toString = s"SnapshotMetadata($persistenceId, $sequenceNr, $timestamp, $meta)" + override def toString = s"SnapshotMetadata($persistenceId, $sequenceNr, $timestamp, $metadata)" // Product 3 override def productPrefix = "SnapshotMetadata"