PersistentRepr serializer support for metadata (#29434)

And fixing the warnings for the 2.13 build, not sure why PR
validation passes for these

The write side of the JDBC plugin works now as it
persists the PR
This commit is contained in:
Christopher Batey 2020-07-29 07:31:17 +01:00
parent b8a1584e10
commit 0b11ae362c
14 changed files with 289 additions and 34 deletions

View file

@ -15,7 +15,7 @@ import akka.annotation.InternalApi
import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.ReplicaId
import scala.collection.JavaConverters._
import akka.util.ccompat.JavaConverters._
/**
* Used when sharding Active Active entities in multiple instances of sharding, for example one per DC in a Multi DC

View file

@ -11,7 +11,7 @@ import akka.persistence.typed.ReplicaId
import scala.collection.immutable
import scala.reflect.ClassTag
import scala.collection.JavaConverters._
import akka.util.ccompat.JavaConverters._
import java.util.{ Set => JSet }
import akka.annotation.ApiMayChange

View file

@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory
import akka.actor.typed.scaladsl.LoggerOps
import akka.cluster.sharding.typed.ActiveActiveShardingDirectReplication
import scala.collection.JavaConverters._
import akka.util.ccompat.JavaConverters._
/**
* INTERNAL API

View file

@ -339,6 +339,14 @@ abstract class JournalSpec(config: Config)
_) =>
payload should be(event)
}
journal ! ReplayMessages(6, 6, 1, Pid, receiverProbe.ref)
receiverProbe.expectMsgPF() {
case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _, Some(`meta`))) =>
payload should be(event)
}
receiverProbe.expectMsg(RecoverySuccess(6L))
}
}
}

View file

@ -295,7 +295,7 @@ final class ORSet[A] private[akka] (
* Java API
*/
def getElements(): java.util.Set[A] = {
import scala.collection.JavaConverters._
import akka.util.ccompat.JavaConverters._
elements.asJava
}
@ -324,7 +324,7 @@ final class ORSet[A] private[akka] (
* `elems` must not be empty.
*/
def addAll(elems: java.util.Set[A]): ORSet.DeltaOp = {
import scala.collection.JavaConverters._
import akka.util.ccompat.JavaConverters._
addAll(elems.asScala.toSet)
}
@ -366,7 +366,7 @@ final class ORSet[A] private[akka] (
* `elems` must not be empty.
*/
def removeAll(elems: java.util.Set[A]): ORSet.DeltaOp = {
import scala.collection.JavaConverters._
import akka.util.ccompat.JavaConverters._
removeAll(elems.asScala.toSet)
}

View file

@ -147,15 +147,15 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
case LoadSnapshotResult(sso, toSnr) =>
var state: S = setup.emptyState
val (seqNr: Long, seenPerReplica: Map[ReplicaId, Long], version: VersionVector) = sso match {
val (seqNr: Long, seenPerReplica, version) = sso match {
case Some(SelectedSnapshot(metadata, snapshot)) =>
state = setup.snapshotAdapter.fromJournal(snapshot)
setup.context.log.debug("Loaded snapshot with metadata {}", metadata)
setup.context.log.debug("Loaded snapshot with metadata [{}]", metadata)
metadata.metadata match {
case Some(rm: ReplicatedSnapshotMetadata) => (metadata.sequenceNr, rm.seenPerReplica, rm.version)
case _ => (metadata.sequenceNr, Map.empty.withDefaultValue(0L), VersionVector.empty)
case _ => (metadata.sequenceNr, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty)
}
case None => (0L, Map.empty.withDefaultValue(0L), VersionVector.empty)
case None => (0L, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty)
}
setup.context.log.debugN("Snapshot recovered from {} {} {}", seqNr, seenPerReplica, version)

View file

@ -13,7 +13,7 @@ import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.ActiveActiveContextImpl
import scala.collection.JavaConverters._
import akka.util.ccompat.JavaConverters._
/**
* Provides access to Active Active specific state

View file

@ -8,7 +8,7 @@ import akka.persistence.typed.PersistenceId
import akka.persistence.typed.ReplicaId
import akka.util.{ OptionVal, WallClock }
import scala.collection.JavaConverters._
import akka.util.ccompat.JavaConverters._
// FIXME docs
trait ActiveActiveContext {

View file

@ -21,7 +21,7 @@ import akka.remote.serialization.WrappedPayloadSupport
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import akka.util.ccompat.JavaConverters._
import scala.collection.immutable.TreeMap
object ActiveActiveSerializer {

View file

@ -169,6 +169,21 @@ public final class MessageFormats {
* @return The timestamp.
*/
long getTimestamp();
/**
* <code>optional .PersistentPayload metadata = 15;</code>
* @return Whether the metadata field is set.
*/
boolean hasMetadata();
/**
* <code>optional .PersistentPayload metadata = 15;</code>
* @return The metadata.
*/
akka.persistence.serialization.MessageFormats.PersistentPayload getMetadata();
/**
* <code>optional .PersistentPayload metadata = 15;</code>
*/
akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getMetadataOrBuilder();
}
/**
* Protobuf type {@code PersistentMessage}
@ -272,6 +287,19 @@ public final class MessageFormats {
timestamp_ = input.readSInt64();
break;
}
case 122: {
akka.persistence.serialization.MessageFormats.PersistentPayload.Builder subBuilder = null;
if (((bitField0_ & 0x00000100) != 0)) {
subBuilder = metadata_.toBuilder();
}
metadata_ = input.readMessage(akka.persistence.serialization.MessageFormats.PersistentPayload.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(metadata_);
metadata_ = subBuilder.buildPartial();
}
bitField0_ |= 0x00000100;
break;
}
default: {
if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) {
@ -591,6 +619,29 @@ public final class MessageFormats {
return timestamp_;
}
public static final int METADATA_FIELD_NUMBER = 15;
private akka.persistence.serialization.MessageFormats.PersistentPayload metadata_;
/**
* <code>optional .PersistentPayload metadata = 15;</code>
* @return Whether the metadata field is set.
*/
public boolean hasMetadata() {
return ((bitField0_ & 0x00000100) != 0);
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
* @return The metadata.
*/
public akka.persistence.serialization.MessageFormats.PersistentPayload getMetadata() {
return metadata_ == null ? akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance() : metadata_;
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getMetadataOrBuilder() {
return metadata_ == null ? akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance() : metadata_;
}
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@ -604,6 +655,12 @@ public final class MessageFormats {
return false;
}
}
if (hasMetadata()) {
if (!getMetadata().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
}
memoizedIsInitialized = 1;
return true;
}
@ -635,6 +692,9 @@ public final class MessageFormats {
if (((bitField0_ & 0x00000080) != 0)) {
output.writeSInt64(14, timestamp_);
}
if (((bitField0_ & 0x00000100) != 0)) {
output.writeMessage(15, getMetadata());
}
unknownFields.writeTo(output);
}
@ -672,6 +732,10 @@ public final class MessageFormats {
size += akka.protobufv3.internal.CodedOutputStream
.computeSInt64Size(14, timestamp_);
}
if (((bitField0_ & 0x00000100) != 0)) {
size += akka.protobufv3.internal.CodedOutputStream
.computeMessageSize(15, getMetadata());
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@ -727,6 +791,11 @@ public final class MessageFormats {
if (getTimestamp()
!= other.getTimestamp()) return false;
}
if (hasMetadata() != other.hasMetadata()) return false;
if (hasMetadata()) {
if (!getMetadata()
.equals(other.getMetadata())) return false;
}
if (!unknownFields.equals(other.unknownFields)) return false;
return true;
}
@ -773,6 +842,10 @@ public final class MessageFormats {
hash = (53 * hash) + akka.protobufv3.internal.Internal.hashLong(
getTimestamp());
}
if (hasMetadata()) {
hash = (37 * hash) + METADATA_FIELD_NUMBER;
hash = (53 * hash) + getMetadata().hashCode();
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@ -902,6 +975,7 @@ public final class MessageFormats {
if (akka.protobufv3.internal.GeneratedMessageV3
.alwaysUseFieldBuilders) {
getPayloadFieldBuilder();
getMetadataFieldBuilder();
}
}
@java.lang.Override
@ -927,6 +1001,12 @@ public final class MessageFormats {
bitField0_ = (bitField0_ & ~0x00000040);
timestamp_ = 0L;
bitField0_ = (bitField0_ & ~0x00000080);
if (metadataBuilder_ == null) {
metadata_ = null;
} else {
metadataBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000100);
return this;
}
@ -991,6 +1071,14 @@ public final class MessageFormats {
result.timestamp_ = timestamp_;
to_bitField0_ |= 0x00000080;
}
if (((from_bitField0_ & 0x00000100) != 0)) {
if (metadataBuilder_ == null) {
result.metadata_ = metadata_;
} else {
result.metadata_ = metadataBuilder_.build();
}
to_bitField0_ |= 0x00000100;
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -1072,6 +1160,9 @@ public final class MessageFormats {
if (other.hasTimestamp()) {
setTimestamp(other.getTimestamp());
}
if (other.hasMetadata()) {
mergeMetadata(other.getMetadata());
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@ -1084,6 +1175,11 @@ public final class MessageFormats {
return false;
}
}
if (hasMetadata()) {
if (!getMetadata().isInitialized()) {
return false;
}
}
return true;
}
@ -1737,6 +1833,126 @@ public final class MessageFormats {
onChanged();
return this;
}
private akka.persistence.serialization.MessageFormats.PersistentPayload metadata_;
private akka.protobufv3.internal.SingleFieldBuilderV3<
akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder> metadataBuilder_;
/**
* <code>optional .PersistentPayload metadata = 15;</code>
* @return Whether the metadata field is set.
*/
public boolean hasMetadata() {
return ((bitField0_ & 0x00000100) != 0);
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
* @return The metadata.
*/
public akka.persistence.serialization.MessageFormats.PersistentPayload getMetadata() {
if (metadataBuilder_ == null) {
return metadata_ == null ? akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance() : metadata_;
} else {
return metadataBuilder_.getMessage();
}
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
*/
public Builder setMetadata(akka.persistence.serialization.MessageFormats.PersistentPayload value) {
if (metadataBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
metadata_ = value;
onChanged();
} else {
metadataBuilder_.setMessage(value);
}
bitField0_ |= 0x00000100;
return this;
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
*/
public Builder setMetadata(
akka.persistence.serialization.MessageFormats.PersistentPayload.Builder builderForValue) {
if (metadataBuilder_ == null) {
metadata_ = builderForValue.build();
onChanged();
} else {
metadataBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000100;
return this;
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
*/
public Builder mergeMetadata(akka.persistence.serialization.MessageFormats.PersistentPayload value) {
if (metadataBuilder_ == null) {
if (((bitField0_ & 0x00000100) != 0) &&
metadata_ != null &&
metadata_ != akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance()) {
metadata_ =
akka.persistence.serialization.MessageFormats.PersistentPayload.newBuilder(metadata_).mergeFrom(value).buildPartial();
} else {
metadata_ = value;
}
onChanged();
} else {
metadataBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000100;
return this;
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
*/
public Builder clearMetadata() {
if (metadataBuilder_ == null) {
metadata_ = null;
onChanged();
} else {
metadataBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000100);
return this;
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentPayload.Builder getMetadataBuilder() {
bitField0_ |= 0x00000100;
onChanged();
return getMetadataFieldBuilder().getBuilder();
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getMetadataOrBuilder() {
if (metadataBuilder_ != null) {
return metadataBuilder_.getMessageOrBuilder();
} else {
return metadata_ == null ?
akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance() : metadata_;
}
}
/**
* <code>optional .PersistentPayload metadata = 15;</code>
*/
private akka.protobufv3.internal.SingleFieldBuilderV3<
akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder>
getMetadataFieldBuilder() {
if (metadataBuilder_ == null) {
metadataBuilder_ = new akka.protobufv3.internal.SingleFieldBuilderV3<
akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder>(
getMetadata(),
getParentForChildren(),
isClean());
metadata_ = null;
}
return metadataBuilder_;
}
@java.lang.Override
public final Builder setUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
@ -7059,27 +7275,28 @@ public final class MessageFormats {
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\024MessageFormats.proto\"\275\001\n\021PersistentMes" +
"\n\024MessageFormats.proto\"\343\001\n\021PersistentMes" +
"sage\022#\n\007payload\030\001 \001(\0132\022.PersistentPayloa" +
"d\022\022\n\nsequenceNr\030\002 \001(\003\022\025\n\rpersistenceId\030\003" +
" \001(\t\022\017\n\007deleted\030\004 \001(\010\022\016\n\006sender\030\013 \001(\t\022\020\n" +
"\010manifest\030\014 \001(\t\022\022\n\nwriterUuid\030\r \001(\t\022\021\n\tt" +
"imestamp\030\016 \001(\022\"S\n\021PersistentPayload\022\024\n\014s" +
"erializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017pa" +
"yloadManifest\030\003 \001(\014\"2\n\013AtomicWrite\022#\n\007pa" +
"yload\030\001 \003(\0132\022.PersistentMessage\"\356\001\n\033AtLe" +
"astOnceDeliverySnapshot\022\031\n\021currentDelive" +
"ryId\030\001 \002(\003\022O\n\025unconfirmedDeliveries\030\002 \003(" +
"\01320.AtLeastOnceDeliverySnapshot.Unconfir" +
"medDelivery\032c\n\023UnconfirmedDelivery\022\022\n\nde" +
"liveryId\030\001 \002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007p" +
"ayload\030\003 \002(\0132\022.PersistentPayload\"\\\n\032Pers" +
"istentStateChangeEvent\022\027\n\017stateIdentifie" +
"r\030\001 \002(\t\022\017\n\007timeout\030\002 \001(\t\022\024\n\014timeoutNanos" +
"\030\003 \001(\003\"h\n\025PersistentFSMSnapshot\022\027\n\017state" +
"Identifier\030\001 \002(\t\022 \n\004data\030\002 \002(\0132\022.Persist" +
"entPayload\022\024\n\014timeoutNanos\030\003 \001(\003B\"\n\036akka" +
".persistence.serializationH\001"
"imestamp\030\016 \001(\022\022$\n\010metadata\030\017 \001(\0132\022.Persi" +
"stentPayload\"S\n\021PersistentPayload\022\024\n\014ser" +
"ializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017payl" +
"oadManifest\030\003 \001(\014\"2\n\013AtomicWrite\022#\n\007payl" +
"oad\030\001 \003(\0132\022.PersistentMessage\"\356\001\n\033AtLeas" +
"tOnceDeliverySnapshot\022\031\n\021currentDelivery" +
"Id\030\001 \002(\003\022O\n\025unconfirmedDeliveries\030\002 \003(\0132" +
"0.AtLeastOnceDeliverySnapshot.Unconfirme" +
"dDelivery\032c\n\023UnconfirmedDelivery\022\022\n\ndeli" +
"veryId\030\001 \002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007pay" +
"load\030\003 \002(\0132\022.PersistentPayload\"\\\n\032Persis" +
"tentStateChangeEvent\022\027\n\017stateIdentifier\030" +
"\001 \002(\t\022\017\n\007timeout\030\002 \001(\t\022\024\n\014timeoutNanos\030\003" +
" \001(\003\"h\n\025PersistentFSMSnapshot\022\027\n\017stateId" +
"entifier\030\001 \002(\t\022 \n\004data\030\002 \002(\0132\022.Persisten" +
"tPayload\022\024\n\014timeoutNanos\030\003 \001(\003B\"\n\036akka.p" +
"ersistence.serializationH\001"
};
descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@ -7090,7 +7307,7 @@ public final class MessageFormats {
internal_static_PersistentMessage_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_PersistentMessage_descriptor,
new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", "WriterUuid", "Timestamp", });
new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", "WriterUuid", "Timestamp", "Metadata", });
internal_static_PersistentPayload_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_PersistentPayload_fieldAccessorTable = new

View file

@ -21,6 +21,7 @@ message PersistentMessage {
optional string manifest = 12;
optional string writerUuid = 13;
optional sint64 timestamp = 14;
optional PersistentPayload metadata = 15;
}
message PersistentPayload {

View file

@ -243,6 +243,6 @@ private[persistence] final case class PersistentImpl(
}
override def toString: String = {
s"PersistentRepr($persistenceId,$sequenceNr,$writerUuid,$timestamp)"
s"PersistentRepr($persistenceId,$sequenceNr,$writerUuid,$timestamp,$metadata)"
}
}

View file

@ -166,6 +166,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
if (persistent.manifest != PersistentRepr.Undefined) builder.setManifest(persistent.manifest)
builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef]))
persistent.metadata match {
case Some(meta) =>
builder.setMetadata(persistentPayloadBuilder(meta.asInstanceOf[AnyRef]))
case _ =>
}
builder.setSequenceNr(persistent.sequenceNr)
// deleted is not used in new records from 2.4
if (persistent.writerUuid != Undefined) builder.setWriterUuid(persistent.writerUuid)
@ -199,7 +205,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
//
private def persistent(persistentMessage: mf.PersistentMessage): PersistentRepr = {
val repr = PersistentRepr(
var repr = PersistentRepr(
payload(persistentMessage.getPayload),
persistentMessage.getSequenceNr,
if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined,
@ -209,7 +215,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
else Actor.noSender,
if (persistentMessage.hasWriterUuid) persistentMessage.getWriterUuid else Undefined)
if (persistentMessage.hasTimestamp) repr.withTimestamp(persistentMessage.getTimestamp) else repr
repr = if (persistentMessage.hasTimestamp) repr.withTimestamp(persistentMessage.getTimestamp) else repr
if (persistentMessage.hasMetadata) repr.withMetadata(payload(persistentMessage.getMetadata)) else repr
}
private def atomicWrite(atomicWrite: mf.AtomicWrite): AtomicWrite = {

View file

@ -0,0 +1,22 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.serialization
import akka.persistence.PersistentRepr
import akka.serialization.SerializationExtension
import akka.testkit.AkkaSpec
class MessageSerializerSpec extends AkkaSpec {
"Message serializer" should {
"serialize metadata for persistent repr" in {
val pr = PersistentRepr("payload", 1L, "pid1").withMetadata("meta")
val serialization = SerializationExtension(system)
val deserialzied = serialization.deserialize(serialization.serialize(pr).get, classOf[PersistentRepr]).get
deserialzied.metadata shouldEqual Some("meta")
}
}
}