diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala index 5e1ba61e80..7073984f3e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala @@ -15,7 +15,7 @@ import akka.annotation.InternalApi import akka.persistence.typed.PublishedEvent import akka.persistence.typed.ReplicaId -import scala.collection.JavaConverters._ +import akka.util.ccompat.JavaConverters._ /** * Used when sharding Active Active entities in multiple instances of sharding, for example one per DC in a Multi DC diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingSettings.scala index 09ae1fcb8c..d74d88aaf3 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingSettings.scala @@ -11,7 +11,7 @@ import akka.persistence.typed.ReplicaId import scala.collection.immutable import scala.reflect.ClassTag -import scala.collection.JavaConverters._ +import akka.util.ccompat.JavaConverters._ import java.util.{ Set => JSet } import akka.annotation.ApiMayChange diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ActiveActiveShardingExtensionImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ActiveActiveShardingExtensionImpl.scala index 2d8eea6155..9c57b9f960 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ActiveActiveShardingExtensionImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ActiveActiveShardingExtensionImpl.scala @@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory import akka.actor.typed.scaladsl.LoggerOps import akka.cluster.sharding.typed.ActiveActiveShardingDirectReplication -import scala.collection.JavaConverters._ +import akka.util.ccompat.JavaConverters._ /** * INTERNAL API diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 3380126b73..b34fcfa708 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -339,6 +339,14 @@ abstract class JournalSpec(config: Config) _) => payload should be(event) } + + journal ! ReplayMessages(6, 6, 1, Pid, receiverProbe.ref) + receiverProbe.expectMsgPF() { + case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _, Some(`meta`))) => + payload should be(event) + } + receiverProbe.expectMsg(RecoverySuccess(6L)) + } } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala index 099e9ed358..f881e1d433 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala @@ -295,7 +295,7 @@ final class ORSet[A] private[akka] ( * Java API */ def getElements(): java.util.Set[A] = { - import scala.collection.JavaConverters._ + import akka.util.ccompat.JavaConverters._ elements.asJava } @@ -324,7 +324,7 @@ final class ORSet[A] private[akka] ( * `elems` must not be empty. */ def addAll(elems: java.util.Set[A]): ORSet.DeltaOp = { - import scala.collection.JavaConverters._ + import akka.util.ccompat.JavaConverters._ addAll(elems.asScala.toSet) } @@ -366,7 +366,7 @@ final class ORSet[A] private[akka] ( * `elems` must not be empty. */ def removeAll(elems: java.util.Set[A]): ORSet.DeltaOp = { - import scala.collection.JavaConverters._ + import akka.util.ccompat.JavaConverters._ removeAll(elems.asScala.toSet) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 1ead8a0af6..5b8d538fab 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -147,15 +147,15 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup case LoadSnapshotResult(sso, toSnr) => var state: S = setup.emptyState - val (seqNr: Long, seenPerReplica: Map[ReplicaId, Long], version: VersionVector) = sso match { + val (seqNr: Long, seenPerReplica, version) = sso match { case Some(SelectedSnapshot(metadata, snapshot)) => state = setup.snapshotAdapter.fromJournal(snapshot) - setup.context.log.debug("Loaded snapshot with metadata {}", metadata) + setup.context.log.debug("Loaded snapshot with metadata [{}]", metadata) metadata.metadata match { case Some(rm: ReplicatedSnapshotMetadata) => (metadata.sequenceNr, rm.seenPerReplica, rm.version) - case _ => (metadata.sequenceNr, Map.empty.withDefaultValue(0L), VersionVector.empty) + case _ => (metadata.sequenceNr, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty) } - case None => (0L, Map.empty.withDefaultValue(0L), VersionVector.empty) + case None => (0L, Map.empty[ReplicaId, Long].withDefaultValue(0L), VersionVector.empty) } setup.context.log.debugN("Snapshot recovered from {} {} {}", seqNr, seenPerReplica, version) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcing.scala index 67761d410a..d59b812bc6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/ActiveActiveEventSourcing.scala @@ -13,7 +13,7 @@ import akka.persistence.typed.PersistenceId import akka.persistence.typed.ReplicaId import akka.persistence.typed.scaladsl.ActiveActiveContextImpl -import scala.collection.JavaConverters._ +import akka.util.ccompat.JavaConverters._ /** * Provides access to Active Active specific state diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala index eec1a91976..7570fa3b31 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala @@ -8,7 +8,7 @@ import akka.persistence.typed.PersistenceId import akka.persistence.typed.ReplicaId import akka.util.{ OptionVal, WallClock } -import scala.collection.JavaConverters._ +import akka.util.ccompat.JavaConverters._ // FIXME docs trait ActiveActiveContext { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/serialization/ActiveActiveSerializer.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/serialization/ActiveActiveSerializer.scala index 28aa020c4e..51633d6eff 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/serialization/ActiveActiveSerializer.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/serialization/ActiveActiveSerializer.scala @@ -21,7 +21,7 @@ import akka.remote.serialization.WrappedPayloadSupport import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } import scala.annotation.tailrec -import scala.collection.JavaConverters._ +import akka.util.ccompat.JavaConverters._ import scala.collection.immutable.TreeMap object ActiveActiveSerializer { diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index 9cb54077c0..52a61acb07 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -169,6 +169,21 @@ public final class MessageFormats { * @return The timestamp. */ long getTimestamp(); + + /** + * optional .PersistentPayload metadata = 15; + * @return Whether the metadata field is set. + */ + boolean hasMetadata(); + /** + * optional .PersistentPayload metadata = 15; + * @return The metadata. + */ + akka.persistence.serialization.MessageFormats.PersistentPayload getMetadata(); + /** + * optional .PersistentPayload metadata = 15; + */ + akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getMetadataOrBuilder(); } /** * Protobuf type {@code PersistentMessage} @@ -272,6 +287,19 @@ public final class MessageFormats { timestamp_ = input.readSInt64(); break; } + case 122: { + akka.persistence.serialization.MessageFormats.PersistentPayload.Builder subBuilder = null; + if (((bitField0_ & 0x00000100) != 0)) { + subBuilder = metadata_.toBuilder(); + } + metadata_ = input.readMessage(akka.persistence.serialization.MessageFormats.PersistentPayload.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(metadata_); + metadata_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000100; + break; + } default: { if (!parseUnknownField( input, unknownFields, extensionRegistry, tag)) { @@ -591,6 +619,29 @@ public final class MessageFormats { return timestamp_; } + public static final int METADATA_FIELD_NUMBER = 15; + private akka.persistence.serialization.MessageFormats.PersistentPayload metadata_; + /** + * optional .PersistentPayload metadata = 15; + * @return Whether the metadata field is set. + */ + public boolean hasMetadata() { + return ((bitField0_ & 0x00000100) != 0); + } + /** + * optional .PersistentPayload metadata = 15; + * @return The metadata. + */ + public akka.persistence.serialization.MessageFormats.PersistentPayload getMetadata() { + return metadata_ == null ? akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance() : metadata_; + } + /** + * optional .PersistentPayload metadata = 15; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getMetadataOrBuilder() { + return metadata_ == null ? akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance() : metadata_; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -604,6 +655,12 @@ public final class MessageFormats { return false; } } + if (hasMetadata()) { + if (!getMetadata().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -635,6 +692,9 @@ public final class MessageFormats { if (((bitField0_ & 0x00000080) != 0)) { output.writeSInt64(14, timestamp_); } + if (((bitField0_ & 0x00000100) != 0)) { + output.writeMessage(15, getMetadata()); + } unknownFields.writeTo(output); } @@ -672,6 +732,10 @@ public final class MessageFormats { size += akka.protobufv3.internal.CodedOutputStream .computeSInt64Size(14, timestamp_); } + if (((bitField0_ & 0x00000100) != 0)) { + size += akka.protobufv3.internal.CodedOutputStream + .computeMessageSize(15, getMetadata()); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -727,6 +791,11 @@ public final class MessageFormats { if (getTimestamp() != other.getTimestamp()) return false; } + if (hasMetadata() != other.hasMetadata()) return false; + if (hasMetadata()) { + if (!getMetadata() + .equals(other.getMetadata())) return false; + } if (!unknownFields.equals(other.unknownFields)) return false; return true; } @@ -773,6 +842,10 @@ public final class MessageFormats { hash = (53 * hash) + akka.protobufv3.internal.Internal.hashLong( getTimestamp()); } + if (hasMetadata()) { + hash = (37 * hash) + METADATA_FIELD_NUMBER; + hash = (53 * hash) + getMetadata().hashCode(); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -902,6 +975,7 @@ public final class MessageFormats { if (akka.protobufv3.internal.GeneratedMessageV3 .alwaysUseFieldBuilders) { getPayloadFieldBuilder(); + getMetadataFieldBuilder(); } } @java.lang.Override @@ -927,6 +1001,12 @@ public final class MessageFormats { bitField0_ = (bitField0_ & ~0x00000040); timestamp_ = 0L; bitField0_ = (bitField0_ & ~0x00000080); + if (metadataBuilder_ == null) { + metadata_ = null; + } else { + metadataBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -991,6 +1071,14 @@ public final class MessageFormats { result.timestamp_ = timestamp_; to_bitField0_ |= 0x00000080; } + if (((from_bitField0_ & 0x00000100) != 0)) { + if (metadataBuilder_ == null) { + result.metadata_ = metadata_; + } else { + result.metadata_ = metadataBuilder_.build(); + } + to_bitField0_ |= 0x00000100; + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1072,6 +1160,9 @@ public final class MessageFormats { if (other.hasTimestamp()) { setTimestamp(other.getTimestamp()); } + if (other.hasMetadata()) { + mergeMetadata(other.getMetadata()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -1084,6 +1175,11 @@ public final class MessageFormats { return false; } } + if (hasMetadata()) { + if (!getMetadata().isInitialized()) { + return false; + } + } return true; } @@ -1737,6 +1833,126 @@ public final class MessageFormats { onChanged(); return this; } + + private akka.persistence.serialization.MessageFormats.PersistentPayload metadata_; + private akka.protobufv3.internal.SingleFieldBuilderV3< + akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder> metadataBuilder_; + /** + * optional .PersistentPayload metadata = 15; + * @return Whether the metadata field is set. + */ + public boolean hasMetadata() { + return ((bitField0_ & 0x00000100) != 0); + } + /** + * optional .PersistentPayload metadata = 15; + * @return The metadata. + */ + public akka.persistence.serialization.MessageFormats.PersistentPayload getMetadata() { + if (metadataBuilder_ == null) { + return metadata_ == null ? akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance() : metadata_; + } else { + return metadataBuilder_.getMessage(); + } + } + /** + * optional .PersistentPayload metadata = 15; + */ + public Builder setMetadata(akka.persistence.serialization.MessageFormats.PersistentPayload value) { + if (metadataBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + metadata_ = value; + onChanged(); + } else { + metadataBuilder_.setMessage(value); + } + bitField0_ |= 0x00000100; + return this; + } + /** + * optional .PersistentPayload metadata = 15; + */ + public Builder setMetadata( + akka.persistence.serialization.MessageFormats.PersistentPayload.Builder builderForValue) { + if (metadataBuilder_ == null) { + metadata_ = builderForValue.build(); + onChanged(); + } else { + metadataBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000100; + return this; + } + /** + * optional .PersistentPayload metadata = 15; + */ + public Builder mergeMetadata(akka.persistence.serialization.MessageFormats.PersistentPayload value) { + if (metadataBuilder_ == null) { + if (((bitField0_ & 0x00000100) != 0) && + metadata_ != null && + metadata_ != akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance()) { + metadata_ = + akka.persistence.serialization.MessageFormats.PersistentPayload.newBuilder(metadata_).mergeFrom(value).buildPartial(); + } else { + metadata_ = value; + } + onChanged(); + } else { + metadataBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000100; + return this; + } + /** + * optional .PersistentPayload metadata = 15; + */ + public Builder clearMetadata() { + if (metadataBuilder_ == null) { + metadata_ = null; + onChanged(); + } else { + metadataBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000100); + return this; + } + /** + * optional .PersistentPayload metadata = 15; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayload.Builder getMetadataBuilder() { + bitField0_ |= 0x00000100; + onChanged(); + return getMetadataFieldBuilder().getBuilder(); + } + /** + * optional .PersistentPayload metadata = 15; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getMetadataOrBuilder() { + if (metadataBuilder_ != null) { + return metadataBuilder_.getMessageOrBuilder(); + } else { + return metadata_ == null ? + akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance() : metadata_; + } + } + /** + * optional .PersistentPayload metadata = 15; + */ + private akka.protobufv3.internal.SingleFieldBuilderV3< + akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder> + getMetadataFieldBuilder() { + if (metadataBuilder_ == null) { + metadataBuilder_ = new akka.protobufv3.internal.SingleFieldBuilderV3< + akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder>( + getMetadata(), + getParentForChildren(), + isClean()); + metadata_ = null; + } + return metadataBuilder_; + } @java.lang.Override public final Builder setUnknownFields( final akka.protobufv3.internal.UnknownFieldSet unknownFields) { @@ -7059,27 +7275,28 @@ public final class MessageFormats { descriptor; static { java.lang.String[] descriptorData = { - "\n\024MessageFormats.proto\"\275\001\n\021PersistentMes" + + "\n\024MessageFormats.proto\"\343\001\n\021PersistentMes" + "sage\022#\n\007payload\030\001 \001(\0132\022.PersistentPayloa" + "d\022\022\n\nsequenceNr\030\002 \001(\003\022\025\n\rpersistenceId\030\003" + " \001(\t\022\017\n\007deleted\030\004 \001(\010\022\016\n\006sender\030\013 \001(\t\022\020\n" + "\010manifest\030\014 \001(\t\022\022\n\nwriterUuid\030\r \001(\t\022\021\n\tt" + - "imestamp\030\016 \001(\022\"S\n\021PersistentPayload\022\024\n\014s" + - "erializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017pa" + - "yloadManifest\030\003 \001(\014\"2\n\013AtomicWrite\022#\n\007pa" + - "yload\030\001 \003(\0132\022.PersistentMessage\"\356\001\n\033AtLe" + - "astOnceDeliverySnapshot\022\031\n\021currentDelive" + - "ryId\030\001 \002(\003\022O\n\025unconfirmedDeliveries\030\002 \003(" + - "\01320.AtLeastOnceDeliverySnapshot.Unconfir" + - "medDelivery\032c\n\023UnconfirmedDelivery\022\022\n\nde" + - "liveryId\030\001 \002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007p" + - "ayload\030\003 \002(\0132\022.PersistentPayload\"\\\n\032Pers" + - "istentStateChangeEvent\022\027\n\017stateIdentifie" + - "r\030\001 \002(\t\022\017\n\007timeout\030\002 \001(\t\022\024\n\014timeoutNanos" + - "\030\003 \001(\003\"h\n\025PersistentFSMSnapshot\022\027\n\017state" + - "Identifier\030\001 \002(\t\022 \n\004data\030\002 \002(\0132\022.Persist" + - "entPayload\022\024\n\014timeoutNanos\030\003 \001(\003B\"\n\036akka" + - ".persistence.serializationH\001" + "imestamp\030\016 \001(\022\022$\n\010metadata\030\017 \001(\0132\022.Persi" + + "stentPayload\"S\n\021PersistentPayload\022\024\n\014ser" + + "ializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017payl" + + "oadManifest\030\003 \001(\014\"2\n\013AtomicWrite\022#\n\007payl" + + "oad\030\001 \003(\0132\022.PersistentMessage\"\356\001\n\033AtLeas" + + "tOnceDeliverySnapshot\022\031\n\021currentDelivery" + + "Id\030\001 \002(\003\022O\n\025unconfirmedDeliveries\030\002 \003(\0132" + + "0.AtLeastOnceDeliverySnapshot.Unconfirme" + + "dDelivery\032c\n\023UnconfirmedDelivery\022\022\n\ndeli" + + "veryId\030\001 \002(\003\022\023\n\013destination\030\002 \002(\t\022#\n\007pay" + + "load\030\003 \002(\0132\022.PersistentPayload\"\\\n\032Persis" + + "tentStateChangeEvent\022\027\n\017stateIdentifier\030" + + "\001 \002(\t\022\017\n\007timeout\030\002 \001(\t\022\024\n\014timeoutNanos\030\003" + + " \001(\003\"h\n\025PersistentFSMSnapshot\022\027\n\017stateId" + + "entifier\030\001 \002(\t\022 \n\004data\030\002 \002(\0132\022.Persisten" + + "tPayload\022\024\n\014timeoutNanos\030\003 \001(\003B\"\n\036akka.p" + + "ersistence.serializationH\001" }; descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -7090,7 +7307,7 @@ public final class MessageFormats { internal_static_PersistentMessage_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_PersistentMessage_descriptor, - new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", "WriterUuid", "Timestamp", }); + new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Sender", "Manifest", "WriterUuid", "Timestamp", "Metadata", }); internal_static_PersistentPayload_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_PersistentPayload_fieldAccessorTable = new diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index e5954743f1..3bdc13b5f1 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -21,6 +21,7 @@ message PersistentMessage { optional string manifest = 12; optional string writerUuid = 13; optional sint64 timestamp = 14; + optional PersistentPayload metadata = 15; } message PersistentPayload { diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index b7285058f9..159b3ae5d3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -243,6 +243,6 @@ private[persistence] final case class PersistentImpl( } override def toString: String = { - s"PersistentRepr($persistenceId,$sequenceNr,$writerUuid,$timestamp)" + s"PersistentRepr($persistenceId,$sequenceNr,$writerUuid,$timestamp,$metadata)" } } diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 8916b0bb7a..11a5540fc4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -166,6 +166,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer if (persistent.manifest != PersistentRepr.Undefined) builder.setManifest(persistent.manifest) builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef])) + persistent.metadata match { + case Some(meta) => + builder.setMetadata(persistentPayloadBuilder(meta.asInstanceOf[AnyRef])) + case _ => + } + builder.setSequenceNr(persistent.sequenceNr) // deleted is not used in new records from 2.4 if (persistent.writerUuid != Undefined) builder.setWriterUuid(persistent.writerUuid) @@ -199,7 +205,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer // private def persistent(persistentMessage: mf.PersistentMessage): PersistentRepr = { - val repr = PersistentRepr( + var repr = PersistentRepr( payload(persistentMessage.getPayload), persistentMessage.getSequenceNr, if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined, @@ -209,7 +215,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer else Actor.noSender, if (persistentMessage.hasWriterUuid) persistentMessage.getWriterUuid else Undefined) - if (persistentMessage.hasTimestamp) repr.withTimestamp(persistentMessage.getTimestamp) else repr + repr = if (persistentMessage.hasTimestamp) repr.withTimestamp(persistentMessage.getTimestamp) else repr + if (persistentMessage.hasMetadata) repr.withMetadata(payload(persistentMessage.getMetadata)) else repr } private def atomicWrite(atomicWrite: mf.AtomicWrite): AtomicWrite = { diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/MessageSerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/MessageSerializerSpec.scala new file mode 100644 index 0000000000..added09810 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/MessageSerializerSpec.scala @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.serialization + +import akka.persistence.PersistentRepr +import akka.serialization.SerializationExtension +import akka.testkit.AkkaSpec + +class MessageSerializerSpec extends AkkaSpec { + + "Message serializer" should { + "serialize metadata for persistent repr" in { + val pr = PersistentRepr("payload", 1L, "pid1").withMetadata("meta") + val serialization = SerializationExtension(system) + val deserialzied = serialization.deserialize(serialization.serialize(pr).get, classOf[PersistentRepr]).get + deserialzied.metadata shouldEqual Some("meta") + } + } + +}