Active active journal support changes (#29421)

* Add missing serializers (and rename classes because metadata is a word)
* Add test instances for journal and snapshot support
* Method to add metadata to existing snapshotmetadata
* Better error message if replicated stream does not have metadata
* Snapshot store tck support for metadata
* Docs for tck coverage of AA metadata
This commit is contained in:
Johan Andrén 2020-07-28 16:13:18 +02:00 committed by Christopher Batey
parent e4f5781d65
commit b8a1584e10
16 changed files with 3970 additions and 622 deletions

View file

@ -290,7 +290,9 @@ to fast forward the stream of events for the origin replica. (With additional po
## Journal Support ## Journal Support
For a journal plugin to support active active it needs to store and read metadata for each event if it is defined in the @apiref[PersistentRepr] For a journal plugin to support active active it needs to store and read metadata for each event if it is defined in the @apiref[PersistentRepr]
`metadata` field. To attach the metadata after writing it, `PersistentRepr.withMetadata` is used. `metadata` field. To attach the metadata after writing it, `PersistentRepr.withMetadata` is used. The @apidoc[JournalSpec] in the Persistence TCK provides
a capability flag `supportsMetadata` to toggle verification that metadata is handled correctly.
For a snapshot plugin to support active active it needs to store and read metadata for the snapshot if it is defined in the @apiref[akka.persistence.SnapshotMetadata] `metadata` field. For a snapshot plugin to support active active it needs to store and read metadata for the snapshot if it is defined in the @apiref[akka.persistence.SnapshotMetadata] `metadata` field.
To attach the metadata when reading the snapshot the `akka.persistence.SnapshotMetadata.apply` factory overload taking a `metadata` parameter is used. To attach the metadata when reading the snapshot the `akka.persistence.SnapshotMetadata.apply` factory overload taking a `metadata` parameter is used.
The @apidoc[SnapshotStoreSpec] in the Persistence TCK provides a capability flag `supportsMetadata` to toggle verification that metadata is handled correctly.

View file

@ -69,5 +69,11 @@ trait SnapshotStoreCapabilityFlags extends CapabilityFlags {
* deserialize snapshots. * deserialize snapshots.
*/ */
protected def supportsSerialization: CapabilityFlag protected def supportsSerialization: CapabilityFlag
/**
* When `true` enables tests which check if the snapshot store properly stores and
* loads metadata (needed for Active Active) along with the snapshots
*/
protected def supportsMetadata: CapabilityFlag
} }
//#snapshot-store-flags //#snapshot-store-flags

View file

@ -323,7 +323,7 @@ abstract class JournalSpec(config: Config)
AtomicWrite( AtomicWrite(
PersistentRepr( PersistentRepr(
payload = event, payload = event,
sequenceNr = 1L, sequenceNr = 6L,
persistenceId = pid, persistenceId = pid,
sender = Actor.noSender, sender = Actor.noSender,
writerUuid = writerUuid).withMetadata(meta)) writerUuid = writerUuid).withMetadata(meta))
@ -335,7 +335,7 @@ abstract class JournalSpec(config: Config)
val WriterUuid = writerUuid val WriterUuid = writerUuid
probe.expectMsgPF() { probe.expectMsgPF() {
case WriteMessageSuccess( case WriteMessageSuccess(
PersistentImpl(payload, 1L, Pid, _, _, Actor.noSender, WriterUuid, _, Some(`meta`)), PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _, Some(`meta`)),
_) => _) =>
payload should be(event) payload should be(event)
} }

View file

@ -52,6 +52,7 @@ abstract class SnapshotStoreSpec(config: Config)
private var metadata: Seq[SnapshotMetadata] = Nil private var metadata: Seq[SnapshotMetadata] = Nil
override protected def supportsSerialization: CapabilityFlag = true override protected def supportsSerialization: CapabilityFlag = true
override protected def supportsMetadata: CapabilityFlag = false
override protected def beforeEach(): Unit = { override protected def beforeEach(): Unit = {
super.beforeEach() super.beforeEach()
@ -199,5 +200,27 @@ abstract class SnapshotStoreSpec(config: Config)
} }
} }
} }
optional(flag = supportsMetadata) {
"store metadata" in {
// we do not have the actual ReplicatedSnapshot metadata on classpath, but since
// the plugin should defer to serialization defined by Akka, so in general the type
// should not really be important to the plugin
val fictionalMeta = "fictional metadata"
val metadata = SnapshotMetadata(pid, 100).withMetadata(fictionalMeta)
val snap = "snap"
snapshotStore.tell(SaveSnapshot(metadata, snap), senderProbe.ref)
senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) => md }
val Pid = pid
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref)
senderProbe.expectMsgPF() {
case LoadSnapshotResult(
Some(SelectedSnapshot(meta @ SnapshotMetadata(Pid, 100, _), payload)),
Long.MaxValue) =>
payload should be(snap)
meta.metadata should ===(Some(fictionalMeta))
}
}
}
} }
} }

View file

@ -48,3 +48,19 @@ message VersionVector {
} }
repeated Entry entries = 1; repeated Entry entries = 1;
} }
message ReplicatedEventMetadata {
required string originReplica = 1;
required int64 originSequenceNr = 2;
required VersionVector versionVector = 3;
required bool concurrent = 4;
}
message ReplicatedSnapshotMetadata {
message Seen {
required string replicaId = 1;
required int64 sequenceNr = 2;
}
required VersionVector version = 1;
repeated Seen seenPerReplica = 2;
}

View file

@ -1,15 +1,17 @@
akka.actor { akka.actor {
serialization-identifiers."akka.persistence.typed.serialization.CrdtSerializer" = 40 serialization-identifiers."akka.persistence.typed.serialization.ActiveActiveSerializer" = 40
serializers.replicated-crdts = "akka.persistence.typed.serialization.CrdtSerializer" serializers.active-active = "akka.persistence.typed.serialization.ActiveActiveSerializer"
serialization-bindings { serialization-bindings {
"akka.persistence.typed.crdt.Counter" = replicated-crdts "akka.persistence.typed.internal.VersionVector" = active-active
"akka.persistence.typed.crdt.Counter$Updated" = replicated-crdts "akka.persistence.typed.crdt.Counter" = active-active
"akka.persistence.typed.internal.VersionVector" = replicated-crdts "akka.persistence.typed.crdt.Counter$Updated" = active-active
"akka.persistence.typed.crdt.ORSet" = replicated-crdts "akka.persistence.typed.crdt.ORSet" = active-active
"akka.persistence.typed.crdt.ORSet$DeltaOp" = replicated-crdts "akka.persistence.typed.crdt.ORSet$DeltaOp" = active-active
"akka.persistence.typed.internal.ReplicatedEventMetadata" = active-active
"akka.persistence.typed.internal.ReplicatedSnapshotMetadata" = active-active
} }
} }

View file

@ -271,7 +271,16 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
} }
// FIXME serializer object ReplicatedEventMetadata {
/**
* For a journal supporting active active needing to add test coverage, use this instance as metadata and defer
* to the built in serializer for serialization format
*/
@ApiMayChange
def instanceForJournalTest: Any = ReplicatedEventMetadata(ReplicaId("DC-A"), 1L, VersionVector.empty + "DC-A", true)
}
/** /**
* @param originReplica Where the event originally was created * @param originReplica Where the event originally was created
* @param originSequenceNr The original sequenceNr in the origin DC * @param originSequenceNr The original sequenceNr in the origin DC
@ -279,15 +288,28 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
* at each location as they are received at different times * at each location as they are received at different times
*/ */
@InternalApi @InternalApi
private[akka] final case class ReplicatedEventMetaData( private[akka] final case class ReplicatedEventMetadata(
originReplica: ReplicaId, originReplica: ReplicaId,
originSequenceNr: Long, originSequenceNr: Long,
version: VersionVector, version: VersionVector,
concurrent: Boolean) // whether when the event handler was executed the event was concurrent concurrent: Boolean) // whether when the event handler was executed the event was concurrent
// FIXME serializer object ReplicatedSnapshotMetadata {
/**
* For a snapshot store supporting active active needing to add test coverage, use this instance as metadata and defer
* to the built in serializer for serialization format
*/
@ApiMayChange
def instanceForSnapshotStoreTest: Any =
ReplicatedSnapshotMetadata(
VersionVector.empty + "DC-B" + "DC-A",
Map(ReplicaId("DC-A") -> 1L, ReplicaId("DC-B") -> 1L))
}
@InternalApi @InternalApi
private[akka] final case class ReplicatedSnapshotMetaData(version: VersionVector, seenPerReplica: Map[ReplicaId, Long]) private[akka] final case class ReplicatedSnapshotMetadata(version: VersionVector, seenPerReplica: Map[ReplicaId, Long])
/** /**
* An event replicated from a different replica. * An event replicated from a different replica.

View file

@ -29,7 +29,7 @@ private[akka] object JournalInteractions {
final case class EventToPersist( final case class EventToPersist(
adaptedEvent: EventOrTaggedOrReplicated, adaptedEvent: EventOrTaggedOrReplicated,
manifest: String, manifest: String,
metadata: Option[ReplicatedEventMetaData]) metadata: Option[ReplicatedEventMetadata])
} }
@ -193,7 +193,7 @@ private[akka] trait SnapshotInteractions[C, E, S] {
else { else {
val meta = setup.activeActive match { val meta = setup.activeActive match {
case Some(_) => case Some(_) =>
val m = ReplicatedSnapshotMetaData(state.version, state.seenPerReplica) val m = ReplicatedSnapshotMetadata(state.version, state.seenPerReplica)
Some(m) Some(m)
case None => None case None => None
} }

View file

@ -123,11 +123,11 @@ private[akka] final class ReplayingEvents[C, E, S](
eventForErrorReporting = OptionVal.Some(event) eventForErrorReporting = OptionVal.Some(event)
state = state.copy(seqNr = repr.sequenceNr) state = state.copy(seqNr = repr.sequenceNr)
val aaMetaAndSelfReplica: Option[(ReplicatedEventMetaData, ReplicaId, ActiveActive)] = val aaMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ActiveActive)] =
setup.activeActive match { setup.activeActive match {
case Some(aa) => case Some(aa) =>
val meta = repr.metadata match { val meta = repr.metadata match {
case Some(m) => m.asInstanceOf[ReplicatedEventMetaData] case Some(m) => m.asInstanceOf[ReplicatedEventMetadata]
case None => case None =>
throw new IllegalStateException( throw new IllegalStateException(
s"Active active enabled but existing event has no metadata. Migration isn't supported yet.") s"Active active enabled but existing event has no metadata. Migration isn't supported yet.")

View file

@ -152,7 +152,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
state = setup.snapshotAdapter.fromJournal(snapshot) state = setup.snapshotAdapter.fromJournal(snapshot)
setup.context.log.debug("Loaded snapshot with metadata {}", metadata) setup.context.log.debug("Loaded snapshot with metadata {}", metadata)
metadata.metadata match { metadata.metadata match {
case Some(rm: ReplicatedSnapshotMetaData) => (metadata.sequenceNr, rm.seenPerReplica, rm.version) case Some(rm: ReplicatedSnapshotMetadata) => (metadata.sequenceNr, rm.seenPerReplica, rm.version)
case _ => (metadata.sequenceNr, Map.empty.withDefaultValue(0L), VersionVector.empty) case _ => (metadata.sequenceNr, Map.empty.withDefaultValue(0L), VersionVector.empty)
} }
case None => (0L, Map.empty.withDefaultValue(0L), VersionVector.empty) case None => (0L, Map.empty.withDefaultValue(0L), VersionVector.empty)

View file

@ -144,7 +144,16 @@ private[akka] object Running {
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
// from each replica, only get the events that originated there, this prevents most of the event filtering // from each replica, only get the events that originated there, this prevents most of the event filtering
// the downside is that events can't be received via other replicas in the event of an uneven network partition // the downside is that events can't be received via other replicas in the event of an uneven network partition
.filter(_.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originReplica == replicaId) .filter(event =>
event.eventMetadata match {
case Some(replicatedMeta: ReplicatedEventMetadata) => replicatedMeta.originReplica == replicaId
case _ =>
throw new IllegalArgumentException(
s"Replication stream from replica ${replicaId} for ${setup.persistenceId} contains event " +
s"(sequence nr ${event.sequenceNr}) without replication metadata. " +
s"Is the persistence id used by a regular event sourced actor there or the journal for that replica (${queryPluginId}) " +
"used that does not support active active?")
})
.viaMat(new FastForwardingFilter)(Keep.right) .viaMat(new FastForwardingFilter)(Keep.right)
.mapMaterializedValue(streamControl => controlRef.set(streamControl)) .mapMaterializedValue(streamControl => controlRef.set(streamControl))
} }
@ -152,7 +161,7 @@ private[akka] object Running {
.via(ActorFlow .via(ActorFlow
.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { (eventEnvelope, replyTo) => .ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { (eventEnvelope, replyTo) =>
// Need to handle this not being available migration from non-active-active is supported // Need to handle this not being available migration from non-active-active is supported
val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData] val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetadata]
val re = val re =
ReplicatedEvent[E]( ReplicatedEvent[E](
eventEnvelope.event.asInstanceOf[E], eventEnvelope.event.asInstanceOf[E],
@ -412,7 +421,7 @@ private[akka] object Running {
event.event, event.event,
"", "",
OptionVal.Some( OptionVal.Some(
ReplicatedEventMetaData(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent))) ReplicatedEventMetadata(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent)))
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr)
// FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259 // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259
val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr)
@ -457,7 +466,7 @@ private[akka] object Running {
eventToPersist, eventToPersist,
eventAdapterManifest, eventAdapterManifest,
OptionVal.Some( OptionVal.Some(
ReplicatedEventMetaData(aa.replicaId, _currentSequenceNumber, updatedVersion, concurrent = false))) ReplicatedEventMetadata(aa.replicaId, _currentSequenceNumber, updatedVersion, concurrent = false)))
.copy(version = updatedVersion) .copy(version = updatedVersion)
if (setup.log.isTraceEnabled()) if (setup.log.isTraceEnabled())
@ -485,10 +494,10 @@ private[akka] object Running {
// also, ensure that there is an event handler for each single event // also, ensure that there is an event handler for each single event
_currentSequenceNumber = state.seqNr _currentSequenceNumber = state.seqNr
val metadataTemplate: Option[ReplicatedEventMetaData] = setup.activeActive match { val metadataTemplate: Option[ReplicatedEventMetadata] = setup.activeActive match {
case Some(aa) => case Some(aa) =>
aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) // local events are never concurrent aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) // local events are never concurrent
Some(ReplicatedEventMetaData(aa.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later Some(ReplicatedEventMetadata(aa.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later
case None => None case None => None
} }

View file

@ -10,7 +10,10 @@ import java.{ lang => jl }
import akka.actor.ExtendedActorSystem import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.{ Counter, ORSet } import akka.persistence.typed.crdt.{ Counter, ORSet }
import akka.persistence.typed.internal.ReplicatedEventMetadata
import akka.persistence.typed.internal.ReplicatedSnapshotMetadata
import akka.persistence.typed.internal.VersionVector import akka.persistence.typed.internal.VersionVector
import akka.protobufv3.internal.ByteString import akka.protobufv3.internal.ByteString
import akka.remote.ContainerFormats.Payload import akka.remote.ContainerFormats.Payload
@ -21,7 +24,7 @@ import scala.annotation.tailrec
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.immutable.TreeMap import scala.collection.immutable.TreeMap
object CrdtSerializer { object ActiveActiveSerializer {
object Comparator extends Comparator[Payload] { object Comparator extends Comparator[Payload] {
override def compare(a: Payload, b: Payload): Int = { override def compare(a: Payload, b: Payload): Int = {
val aByteString = a.getEnclosedMessage val aByteString = a.getEnclosedMessage
@ -50,7 +53,7 @@ object CrdtSerializer {
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] final class CrdtSerializer(val system: ExtendedActorSystem) @InternalApi private[akka] final class ActiveActiveSerializer(val system: ExtendedActorSystem)
extends SerializerWithStringManifest extends SerializerWithStringManifest
with BaseSerializer { with BaseSerializer {
@ -67,6 +70,9 @@ object CrdtSerializer {
private val VersionVectorManifest = "DA" private val VersionVectorManifest = "DA"
private val ReplicatedEventMetadataManifest = "RE"
private val ReplicatedSnapshotMetadataManifest = "RS"
def manifest(o: AnyRef) = o match { def manifest(o: AnyRef) = o match {
case _: ORSet[_] => ORSetManifest case _: ORSet[_] => ORSetManifest
case _: ORSet.AddDeltaOp[_] => ORSetAddManifest case _: ORSet.AddDeltaOp[_] => ORSetAddManifest
@ -78,11 +84,19 @@ object CrdtSerializer {
case _: Counter.Updated => CrdtCounterUpdatedManifest case _: Counter.Updated => CrdtCounterUpdatedManifest
case _: VersionVector => VersionVectorManifest case _: VersionVector => VersionVectorManifest
case _: ReplicatedEventMetadata => ReplicatedEventMetadataManifest
case _: ReplicatedSnapshotMetadata => ReplicatedSnapshotMetadataManifest
case _ => case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
} }
def toBinary(o: AnyRef) = o match { def toBinary(o: AnyRef) = o match {
case m: ReplicatedEventMetadata => replicatedEventMetadataToProtoByteArray(m)
case m: ReplicatedSnapshotMetadata => replicatedSnapshotMetadataToByteArray(m)
case m: VersionVector => versionVectorToProto(m).toByteArray
case m: ORSet[_] => orsetToProto(m).toByteArray case m: ORSet[_] => orsetToProto(m).toByteArray
case m: ORSet.AddDeltaOp[_] => orsetToProto(m.underlying).toByteArray case m: ORSet.AddDeltaOp[_] => orsetToProto(m.underlying).toByteArray
case m: ORSet.RemoveDeltaOp[_] => orsetToProto(m.underlying).toByteArray case m: ORSet.RemoveDeltaOp[_] => orsetToProto(m.underlying).toByteArray
@ -91,12 +105,18 @@ object CrdtSerializer {
case m: Counter => counterToProtoByteArray(m) case m: Counter => counterToProtoByteArray(m)
case m: Counter.Updated => counterUpdatedToProtoBufByteArray(m) case m: Counter.Updated => counterUpdatedToProtoBufByteArray(m)
case m: VersionVector => versionVectorToProto(m).toByteArray
case _ => case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
} }
def fromBinary(bytes: Array[Byte], manifest: String) = manifest match { def fromBinary(bytes: Array[Byte], manifest: String) = manifest match {
case ReplicatedEventMetadataManifest => replicatedEventMetadataFromBinary(bytes)
case ReplicatedSnapshotMetadataManifest => replicatedSnapshotMetadataFromBinary(bytes)
case VersionVectorManifest => versionVectorFromBinary(bytes)
case ORSetManifest => orsetFromBinary(bytes) case ORSetManifest => orsetFromBinary(bytes)
case ORSetAddManifest => orsetAddFromBinary(bytes) case ORSetAddManifest => orsetAddFromBinary(bytes)
case ORSetRemoveManifest => orsetRemoveFromBinary(bytes) case ORSetRemoveManifest => orsetRemoveFromBinary(bytes)
@ -106,29 +126,29 @@ object CrdtSerializer {
case CrdtCounterManifest => counterFromBinary(bytes) case CrdtCounterManifest => counterFromBinary(bytes)
case CrdtCounterUpdatedManifest => counterUpdatedFromBinary(bytes) case CrdtCounterUpdatedManifest => counterUpdatedFromBinary(bytes)
case VersionVectorManifest => versionVectorFromBinary(bytes)
case _ => case _ =>
throw new NotSerializableException( throw new NotSerializableException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
} }
def counterFromBinary(bytes: Array[Byte]): Counter = def counterFromBinary(bytes: Array[Byte]): Counter =
Counter(BigInt(Crdts.Counter.parseFrom(bytes).getValue.toByteArray)) Counter(BigInt(ActiveActive.Counter.parseFrom(bytes).getValue.toByteArray))
def counterUpdatedFromBinary(bytes: Array[Byte]): Counter.Updated = def counterUpdatedFromBinary(bytes: Array[Byte]): Counter.Updated =
Counter.Updated(BigInt(Crdts.CounterUpdate.parseFrom(bytes).getDelta.toByteArray)) Counter.Updated(BigInt(ActiveActive.CounterUpdate.parseFrom(bytes).getDelta.toByteArray))
def counterToProtoByteArray(counter: Counter): Array[Byte] = def counterToProtoByteArray(counter: Counter): Array[Byte] =
Crdts.Counter.newBuilder().setValue(ByteString.copyFrom(counter.value.toByteArray)).build().toByteArray ActiveActive.Counter.newBuilder().setValue(ByteString.copyFrom(counter.value.toByteArray)).build().toByteArray
def counterUpdatedToProtoBufByteArray(updated: Counter.Updated): Array[Byte] = def counterUpdatedToProtoBufByteArray(updated: Counter.Updated): Array[Byte] =
Crdts.CounterUpdate.newBuilder().setDelta(ByteString.copyFrom(updated.delta.toByteArray)).build().toByteArray ActiveActive.CounterUpdate.newBuilder().setDelta(ByteString.copyFrom(updated.delta.toByteArray)).build().toByteArray
def orsetToProto(orset: ORSet[_]): Crdts.ORSet = def orsetToProto(orset: ORSet[_]): ActiveActive.ORSet =
orsetToProtoImpl(orset.asInstanceOf[ORSet[Any]]) orsetToProtoImpl(orset.asInstanceOf[ORSet[Any]])
private def orsetToProtoImpl(orset: ORSet[Any]): Crdts.ORSet = { private def orsetToProtoImpl(orset: ORSet[Any]): ActiveActive.ORSet = {
val b = Crdts.ORSet.newBuilder().setOriginDc(orset.originReplica).setVvector(versionVectorToProto(orset.vvector)) val b =
ActiveActive.ORSet.newBuilder().setOriginDc(orset.originReplica).setVvector(versionVectorToProto(orset.vvector))
// using java collections and sorting for performance (avoid conversions) // using java collections and sorting for performance (avoid conversions)
val stringElements = new ArrayList[String] val stringElements = new ArrayList[String]
val intElements = new ArrayList[Integer] val intElements = new ArrayList[Integer]
@ -174,7 +194,7 @@ object CrdtSerializer {
addDots(longElements) addDots(longElements)
} }
if (!otherElements.isEmpty) { if (!otherElements.isEmpty) {
Collections.sort(otherElements, CrdtSerializer.Comparator) Collections.sort(otherElements, ActiveActiveSerializer.Comparator)
b.addAllOtherElements(otherElements) b.addAllOtherElements(otherElements)
addDots(otherElements) addDots(otherElements)
} }
@ -182,31 +202,55 @@ object CrdtSerializer {
b.build() b.build()
} }
def replicatedEventMetadataToProtoByteArray(rem: ReplicatedEventMetadata): Array[Byte] = {
ActiveActive.ReplicatedEventMetadata
.newBuilder()
.setOriginSequenceNr(rem.originSequenceNr)
.setConcurrent(rem.concurrent)
.setOriginReplica(rem.originReplica.id)
.setVersionVector(versionVectorToProto(rem.version))
.build()
.toByteArray
}
def replicatedSnapshotMetadataToByteArray(rsm: ReplicatedSnapshotMetadata): Array[Byte] = {
ActiveActive.ReplicatedSnapshotMetadata
.newBuilder()
.setVersion(versionVectorToProto(rsm.version))
.addAllSeenPerReplica(rsm.seenPerReplica.map(seenToProto).asJava)
.build()
.toByteArray
}
def seenToProto(t: (ReplicaId, Long)): ActiveActive.ReplicatedSnapshotMetadata.Seen = {
ActiveActive.ReplicatedSnapshotMetadata.Seen.newBuilder().setReplicaId(t._1.id).setSequenceNr(t._2).build()
}
def orsetFromBinary(bytes: Array[Byte]): ORSet[Any] = def orsetFromBinary(bytes: Array[Byte]): ORSet[Any] =
orsetFromProto(Crdts.ORSet.parseFrom(bytes)) orsetFromProto(ActiveActive.ORSet.parseFrom(bytes))
private def orsetAddFromBinary(bytes: Array[Byte]): ORSet.AddDeltaOp[Any] = private def orsetAddFromBinary(bytes: Array[Byte]): ORSet.AddDeltaOp[Any] =
new ORSet.AddDeltaOp(orsetFromProto(Crdts.ORSet.parseFrom(bytes))) new ORSet.AddDeltaOp(orsetFromProto(ActiveActive.ORSet.parseFrom(bytes)))
private def orsetRemoveFromBinary(bytes: Array[Byte]): ORSet.RemoveDeltaOp[Any] = private def orsetRemoveFromBinary(bytes: Array[Byte]): ORSet.RemoveDeltaOp[Any] =
new ORSet.RemoveDeltaOp(orsetFromProto(Crdts.ORSet.parseFrom(bytes))) new ORSet.RemoveDeltaOp(orsetFromProto(ActiveActive.ORSet.parseFrom(bytes)))
private def orsetFullFromBinary(bytes: Array[Byte]): ORSet.FullStateDeltaOp[Any] = private def orsetFullFromBinary(bytes: Array[Byte]): ORSet.FullStateDeltaOp[Any] =
new ORSet.FullStateDeltaOp(orsetFromProto(Crdts.ORSet.parseFrom(bytes))) new ORSet.FullStateDeltaOp(orsetFromProto(ActiveActive.ORSet.parseFrom(bytes)))
private def orsetDeltaGroupToProto(deltaGroup: ORSet.DeltaGroup[_]): Crdts.ORSetDeltaGroup = { private def orsetDeltaGroupToProto(deltaGroup: ORSet.DeltaGroup[_]): ActiveActive.ORSetDeltaGroup = {
def createEntry(opType: Crdts.ORSetDeltaOp, u: ORSet[_]) = { def createEntry(opType: ActiveActive.ORSetDeltaOp, u: ORSet[_]) = {
Crdts.ORSetDeltaGroup.Entry.newBuilder().setOperation(opType).setUnderlying(orsetToProto(u)) ActiveActive.ORSetDeltaGroup.Entry.newBuilder().setOperation(opType).setUnderlying(orsetToProto(u))
} }
val b = Crdts.ORSetDeltaGroup.newBuilder() val b = ActiveActive.ORSetDeltaGroup.newBuilder()
deltaGroup.ops.foreach { deltaGroup.ops.foreach {
case ORSet.AddDeltaOp(u) => case ORSet.AddDeltaOp(u) =>
b.addEntries(createEntry(Crdts.ORSetDeltaOp.Add, u)) b.addEntries(createEntry(ActiveActive.ORSetDeltaOp.Add, u))
case ORSet.RemoveDeltaOp(u) => case ORSet.RemoveDeltaOp(u) =>
b.addEntries(createEntry(Crdts.ORSetDeltaOp.Remove, u)) b.addEntries(createEntry(ActiveActive.ORSetDeltaOp.Remove, u))
case ORSet.FullStateDeltaOp(u) => case ORSet.FullStateDeltaOp(u) =>
b.addEntries(createEntry(Crdts.ORSetDeltaOp.Full, u)) b.addEntries(createEntry(ActiveActive.ORSetDeltaOp.Full, u))
case ORSet.DeltaGroup(_) => case ORSet.DeltaGroup(_) =>
throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested") throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested")
} }
@ -214,14 +258,14 @@ object CrdtSerializer {
} }
private def orsetDeltaGroupFromBinary(bytes: Array[Byte]): ORSet.DeltaGroup[Any] = { private def orsetDeltaGroupFromBinary(bytes: Array[Byte]): ORSet.DeltaGroup[Any] = {
val deltaGroup = Crdts.ORSetDeltaGroup.parseFrom(bytes) val deltaGroup = ActiveActive.ORSetDeltaGroup.parseFrom(bytes)
val ops: Vector[ORSet.DeltaOp] = val ops: Vector[ORSet.DeltaOp] =
deltaGroup.getEntriesList.asScala.map { entry => deltaGroup.getEntriesList.asScala.map { entry =>
if (entry.getOperation == Crdts.ORSetDeltaOp.Add) if (entry.getOperation == ActiveActive.ORSetDeltaOp.Add)
ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)) ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying))
else if (entry.getOperation == Crdts.ORSetDeltaOp.Remove) else if (entry.getOperation == ActiveActive.ORSetDeltaOp.Remove)
ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)) ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying))
else if (entry.getOperation == Crdts.ORSetDeltaOp.Full) else if (entry.getOperation == ActiveActive.ORSetDeltaOp.Full)
ORSet.FullStateDeltaOp(orsetFromProto(entry.getUnderlying)) ORSet.FullStateDeltaOp(orsetFromProto(entry.getUnderlying))
else else
throw new NotSerializableException(s"Unknow ORSet delta operation ${entry.getOperation}") throw new NotSerializableException(s"Unknow ORSet delta operation ${entry.getOperation}")
@ -229,7 +273,7 @@ object CrdtSerializer {
ORSet.DeltaGroup(ops) ORSet.DeltaGroup(ops)
} }
def orsetFromProto(orset: Crdts.ORSet): ORSet[Any] = { def orsetFromProto(orset: ActiveActive.ORSet): ORSet[Any] = {
val elements: Iterator[Any] = val elements: Iterator[Any] =
(orset.getStringElementsList.iterator.asScala ++ (orset.getStringElementsList.iterator.asScala ++
orset.getIntElementsList.iterator.asScala ++ orset.getIntElementsList.iterator.asScala ++
@ -242,18 +286,18 @@ object CrdtSerializer {
new ORSet(orset.getOriginDc, elementsMap, vvector = versionVectorFromProto(orset.getVvector)) new ORSet(orset.getOriginDc, elementsMap, vvector = versionVectorFromProto(orset.getVvector))
} }
def versionVectorToProto(versionVector: VersionVector): Crdts.VersionVector = { def versionVectorToProto(versionVector: VersionVector): ActiveActive.VersionVector = {
val b = Crdts.VersionVector.newBuilder() val b = ActiveActive.VersionVector.newBuilder()
versionVector.versionsIterator.foreach { versionVector.versionsIterator.foreach {
case (key, value) => b.addEntries(Crdts.VersionVector.Entry.newBuilder().setKey(key).setVersion(value)) case (key, value) => b.addEntries(ActiveActive.VersionVector.Entry.newBuilder().setKey(key).setVersion(value))
} }
b.build() b.build()
} }
def versionVectorFromBinary(bytes: Array[Byte]): VersionVector = def versionVectorFromBinary(bytes: Array[Byte]): VersionVector =
versionVectorFromProto(Crdts.VersionVector.parseFrom(bytes)) versionVectorFromProto(ActiveActive.VersionVector.parseFrom(bytes))
def versionVectorFromProto(versionVector: Crdts.VersionVector): VersionVector = { def versionVectorFromProto(versionVector: ActiveActive.VersionVector): VersionVector = {
val entries = versionVector.getEntriesList val entries = versionVector.getEntriesList
if (entries.isEmpty) if (entries.isEmpty)
VersionVector.empty VersionVector.empty
@ -266,4 +310,20 @@ object CrdtSerializer {
} }
} }
def replicatedEventMetadataFromBinary(bytes: Array[Byte]): ReplicatedEventMetadata = {
val parsed = ActiveActive.ReplicatedEventMetadata.parseFrom(bytes)
ReplicatedEventMetadata(
ReplicaId(parsed.getOriginReplica),
parsed.getOriginSequenceNr,
versionVectorFromProto(parsed.getVersionVector),
parsed.getConcurrent)
}
def replicatedSnapshotMetadataFromBinary(bytes: Array[Byte]): ReplicatedSnapshotMetadata = {
val parsed = ActiveActive.ReplicatedSnapshotMetadata.parseFrom(bytes)
ReplicatedSnapshotMetadata(
versionVectorFromProto(parsed.getVersion),
parsed.getSeenPerReplicaList.asScala.map(seen => ReplicaId(seen.getReplicaId) -> seen.getSequenceNr).toMap)
}
} }

View file

@ -0,0 +1,40 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.typed.crdt.Counter
import akka.persistence.typed.crdt.ORSet
import akka.persistence.typed.internal.ReplicatedEventMetadata
import akka.persistence.typed.internal.ReplicatedSnapshotMetadata
import akka.persistence.typed.internal.VersionVector
import org.scalatest.wordspec.AnyWordSpecLike
class ActiveActiveSerializationSpec
extends ScalaTestWithActorTestKit(ClusterSingletonPersistenceSpec.config)
with AnyWordSpecLike
with LogCapturing {
"The ActiveActive components that needs to be serializable" must {
"be serializable" in {
serializationTestKit.verifySerialization(
ReplicatedEventMetadata(ReplicaId("DC-A"), 2L, VersionVector.empty.increment("DC-B"), true))
serializationTestKit.verifySerialization(
ReplicatedSnapshotMetadata(
VersionVector.empty.increment("DC-B"),
Map(ReplicaId("DC-A") -> 1L, ReplicaId("DC-B") -> 2L)))
serializationTestKit.verifySerialization(Counter(BigInt(24)))
serializationTestKit.verifySerialization(Counter.Updated(BigInt(1)))
serializationTestKit.verifySerialization(ORSet(ReplicaId("DC-A")))
serializationTestKit.verifySerialization(ORSet.AddDeltaOp(ORSet(ReplicaId("DC-A"))))
// FIXME DeltaGroup?
}
}
}

View file

@ -30,6 +30,9 @@ final class SnapshotMetadata(
this(persistenceId, sequenceNr, 0L, meta) this(persistenceId, sequenceNr, 0L, meta)
} }
def withMetadata(metadata: Any): SnapshotMetadata =
new SnapshotMetadata(persistenceId, sequenceNr, timestamp, Some(metadata))
// for bincompat, used to be a case class // for bincompat, used to be a case class
def copy( def copy(
persistenceId: String = this.persistenceId, persistenceId: String = this.persistenceId,

View file

@ -65,15 +65,14 @@ object OSGi {
val protobuf = exports(Seq("akka.protobuf.*")) val protobuf = exports(Seq("akka.protobuf.*"))
val protobufV3 = osgiSettings ++ Seq( val protobufV3 = osgiSettings ++ Seq(
OsgiKeys.importPackage := Seq( OsgiKeys.importPackage := Seq(
"!sun.misc", "!sun.misc",
scalaJava8CompatImport(), scalaJava8CompatImport(),
scalaVersion(scalaImport).value, scalaVersion(scalaImport).value,
configImport(), configImport(),
"*"), "*"),
OsgiKeys.exportPackage := Seq("akka.protobufv3.internal.*"), OsgiKeys.exportPackage := Seq("akka.protobufv3.internal.*"),
OsgiKeys.privatePackage := Seq("google.protobuf.*") OsgiKeys.privatePackage := Seq("google.protobuf.*"))
)
val jackson = exports(Seq("akka.serialization.jackson.*")) val jackson = exports(Seq("akka.serialization.jackson.*"))