Optimized serializer for ORSet[ActorRef], #23703

* ORSet[ActorRef] is used by Akka Typed receptionist
* and similar for GSet[ActorRef]

ORSetSerializationBenchmark
This commit is contained in:
Patrik Nordwall 2018-06-05 14:15:46 +02:00
parent a05170c419
commit f754705c9c
7 changed files with 742 additions and 84 deletions

View file

@ -0,0 +1,78 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.ddata
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.cluster.Cluster
import akka.serialization.SerializationExtension
import akka.serialization.Serializers
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations.Benchmark
import org.openjdk.jmh.annotations.BenchmarkMode
import org.openjdk.jmh.annotations.Fork
import org.openjdk.jmh.annotations.Measurement
import org.openjdk.jmh.annotations.Mode
import org.openjdk.jmh.annotations.OutputTimeUnit
import org.openjdk.jmh.annotations.State
import org.openjdk.jmh.annotations.TearDown
import org.openjdk.jmh.annotations.Warmup
import org.openjdk.jmh.annotations.{ Scope JmhScope }
@Fork(2)
@State(JmhScope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@Warmup(iterations = 4)
@Measurement(iterations = 5)
@OutputTimeUnit(TimeUnit.SECONDS)
class ORSetSerializationBenchmark {
private val config = ConfigFactory.parseString(
"""
akka.actor.provider=cluster
akka.remote.netty.tcp.port=0
akka.remote.artery.canonical.port = 0
akka.actor {
serialize-messages = off
allow-java-serialization = off
}
"""
)
private val system1 = ActorSystem("ORSetSerializationBenchmark", config)
private val system2 = ActorSystem("ORSetSerializationBenchmark", config)
private val ref1 = (1 to 10).map(n system1.actorOf(Props.empty, s"ref1-$n"))
private val ref2 = (1 to 10).map(n system2.actorOf(Props.empty, s"ref2-$n"))
private val orSet = {
val set1 = ref1.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) acc.add(Cluster(system1), r) }
val set2 = ref2.foldLeft(ORSet.empty[ActorRef]) { case (acc, r) acc.add(Cluster(system2), r) }
set1.merge(set2)
}
private val serialization = SerializationExtension(system1)
private val serializerId = serialization.findSerializerFor(orSet).identifier
private val manifest = Serializers.manifestFor(serialization.findSerializerFor(orSet), orSet)
@TearDown
def shutdown(): Unit = {
Await.result(system1.terminate(), 5.seconds)
Await.result(system2.terminate(), 5.seconds)
}
@Benchmark
def serializeRoundtrip: ORSet[ActorRef] = {
val bytes = serialization.serialize(orSet).get
serialization.deserialize(bytes, serializerId, manifest).get.asInstanceOf[ORSet[ActorRef]]
}
}

View file

@ -278,6 +278,42 @@ public final class ReplicatedDataMessages {
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getOtherElementsOrBuilder(
int index);
// repeated string actorRefElements = 5;
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
java.util.List<java.lang.String>
getActorRefElementsList();
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
int getActorRefElementsCount();
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
java.lang.String getActorRefElements(int index);
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
akka.protobuf.ByteString
getActorRefElementsBytes(int index);
}
/**
* Protobuf type {@code akka.cluster.ddata.GSet}
@ -388,6 +424,14 @@ public final class ReplicatedDataMessages {
otherElements_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.PARSER, extensionRegistry));
break;
}
case 42: {
if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
actorRefElements_ = new akka.protobuf.LazyStringArrayList();
mutable_bitField0_ |= 0x00000010;
}
actorRefElements_.add(input.readBytes());
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@ -408,6 +452,9 @@ public final class ReplicatedDataMessages {
if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
otherElements_ = java.util.Collections.unmodifiableList(otherElements_);
}
if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
actorRefElements_ = new akka.protobuf.UnmodifiableLazyStringList(actorRefElements_);
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@ -553,11 +600,58 @@ public final class ReplicatedDataMessages {
return otherElements_.get(index);
}
// repeated string actorRefElements = 5;
public static final int ACTORREFELEMENTS_FIELD_NUMBER = 5;
private akka.protobuf.LazyStringList actorRefElements_;
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public java.util.List<java.lang.String>
getActorRefElementsList() {
return actorRefElements_;
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public int getActorRefElementsCount() {
return actorRefElements_.size();
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public java.lang.String getActorRefElements(int index) {
return actorRefElements_.get(index);
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public akka.protobuf.ByteString
getActorRefElementsBytes(int index) {
return actorRefElements_.getByteString(index);
}
private void initFields() {
stringElements_ = akka.protobuf.LazyStringArrayList.EMPTY;
intElements_ = java.util.Collections.emptyList();
longElements_ = java.util.Collections.emptyList();
otherElements_ = java.util.Collections.emptyList();
actorRefElements_ = akka.protobuf.LazyStringArrayList.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -597,6 +691,9 @@ public final class ReplicatedDataMessages {
for (int i = 0; i < otherElements_.size(); i++) {
output.writeMessage(4, otherElements_.get(i));
}
for (int i = 0; i < actorRefElements_.size(); i++) {
output.writeBytes(5, actorRefElements_.getByteString(i));
}
getUnknownFields().writeTo(output);
}
@ -647,6 +744,15 @@ public final class ReplicatedDataMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(4, otherElements_.get(i));
}
{
int dataSize = 0;
for (int i = 0; i < actorRefElements_.size(); i++) {
dataSize += akka.protobuf.CodedOutputStream
.computeBytesSizeNoTag(actorRefElements_.getByteString(i));
}
size += dataSize;
size += 1 * getActorRefElementsList().size();
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -776,6 +882,8 @@ public final class ReplicatedDataMessages {
} else {
otherElementsBuilder_.clear();
}
actorRefElements_ = akka.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@ -828,6 +936,12 @@ public final class ReplicatedDataMessages {
} else {
result.otherElements_ = otherElementsBuilder_.build();
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
actorRefElements_ = new akka.protobuf.UnmodifiableLazyStringList(
actorRefElements_);
bitField0_ = (bitField0_ & ~0x00000010);
}
result.actorRefElements_ = actorRefElements_;
onBuilt();
return result;
}
@ -899,6 +1013,16 @@ public final class ReplicatedDataMessages {
}
}
}
if (!other.actorRefElements_.isEmpty()) {
if (actorRefElements_.isEmpty()) {
actorRefElements_ = other.actorRefElements_;
bitField0_ = (bitField0_ & ~0x00000010);
} else {
ensureActorRefElementsIsMutable();
actorRefElements_.addAll(other.actorRefElements_);
}
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -1397,6 +1521,135 @@ public final class ReplicatedDataMessages {
return otherElementsBuilder_;
}
// repeated string actorRefElements = 5;
private akka.protobuf.LazyStringList actorRefElements_ = akka.protobuf.LazyStringArrayList.EMPTY;
private void ensureActorRefElementsIsMutable() {
if (!((bitField0_ & 0x00000010) == 0x00000010)) {
actorRefElements_ = new akka.protobuf.LazyStringArrayList(actorRefElements_);
bitField0_ |= 0x00000010;
}
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public java.util.List<java.lang.String>
getActorRefElementsList() {
return java.util.Collections.unmodifiableList(actorRefElements_);
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public int getActorRefElementsCount() {
return actorRefElements_.size();
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public java.lang.String getActorRefElements(int index) {
return actorRefElements_.get(index);
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public akka.protobuf.ByteString
getActorRefElementsBytes(int index) {
return actorRefElements_.getByteString(index);
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder setActorRefElements(
int index, java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureActorRefElementsIsMutable();
actorRefElements_.set(index, value);
onChanged();
return this;
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder addActorRefElements(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureActorRefElementsIsMutable();
actorRefElements_.add(value);
onChanged();
return this;
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder addAllActorRefElements(
java.lang.Iterable<java.lang.String> values) {
ensureActorRefElementsIsMutable();
super.addAll(values, actorRefElements_);
onChanged();
return this;
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder clearActorRefElements() {
actorRefElements_ = akka.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000010);
onChanged();
return this;
}
/**
* <code>repeated string actorRefElements = 5;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder addActorRefElementsBytes(
akka.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
ensureActorRefElementsIsMutable();
actorRefElements_.add(value);
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.GSet)
}
@ -1522,6 +1775,42 @@ public final class ReplicatedDataMessages {
*/
akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getOtherElementsOrBuilder(
int index);
// repeated string actorRefElements = 7;
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
java.util.List<java.lang.String>
getActorRefElementsList();
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
int getActorRefElementsCount();
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
java.lang.String getActorRefElements(int index);
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
akka.protobuf.ByteString
getActorRefElementsBytes(int index);
}
/**
* Protobuf type {@code akka.cluster.ddata.ORSet}
@ -1653,6 +1942,14 @@ public final class ReplicatedDataMessages {
otherElements_.add(input.readMessage(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.PARSER, extensionRegistry));
break;
}
case 58: {
if (!((mutable_bitField0_ & 0x00000040) == 0x00000040)) {
actorRefElements_ = new akka.protobuf.LazyStringArrayList();
mutable_bitField0_ |= 0x00000040;
}
actorRefElements_.add(input.readBytes());
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@ -1676,6 +1973,9 @@ public final class ReplicatedDataMessages {
if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) {
otherElements_ = java.util.Collections.unmodifiableList(otherElements_);
}
if (((mutable_bitField0_ & 0x00000040) == 0x00000040)) {
actorRefElements_ = new akka.protobuf.UnmodifiableLazyStringList(actorRefElements_);
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@ -1880,6 +2180,52 @@ public final class ReplicatedDataMessages {
return otherElements_.get(index);
}
// repeated string actorRefElements = 7;
public static final int ACTORREFELEMENTS_FIELD_NUMBER = 7;
private akka.protobuf.LazyStringList actorRefElements_;
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public java.util.List<java.lang.String>
getActorRefElementsList() {
return actorRefElements_;
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public int getActorRefElementsCount() {
return actorRefElements_.size();
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public java.lang.String getActorRefElements(int index) {
return actorRefElements_.get(index);
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public akka.protobuf.ByteString
getActorRefElementsBytes(int index) {
return actorRefElements_.getByteString(index);
}
private void initFields() {
vvector_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.VersionVector.getDefaultInstance();
dots_ = java.util.Collections.emptyList();
@ -1887,6 +2233,7 @@ public final class ReplicatedDataMessages {
intElements_ = java.util.Collections.emptyList();
longElements_ = java.util.Collections.emptyList();
otherElements_ = java.util.Collections.emptyList();
actorRefElements_ = akka.protobuf.LazyStringArrayList.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -1946,6 +2293,9 @@ public final class ReplicatedDataMessages {
for (int i = 0; i < otherElements_.size(); i++) {
output.writeMessage(6, otherElements_.get(i));
}
for (int i = 0; i < actorRefElements_.size(); i++) {
output.writeBytes(7, actorRefElements_.getByteString(i));
}
getUnknownFields().writeTo(output);
}
@ -2004,6 +2354,15 @@ public final class ReplicatedDataMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(6, otherElements_.get(i));
}
{
int dataSize = 0;
for (int i = 0; i < actorRefElements_.size(); i++) {
dataSize += akka.protobuf.CodedOutputStream
.computeBytesSizeNoTag(actorRefElements_.getByteString(i));
}
size += dataSize;
size += 1 * getActorRefElementsList().size();
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -2147,6 +2506,8 @@ public final class ReplicatedDataMessages {
} else {
otherElementsBuilder_.clear();
}
actorRefElements_ = akka.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@ -2217,6 +2578,12 @@ public final class ReplicatedDataMessages {
} else {
result.otherElements_ = otherElementsBuilder_.build();
}
if (((bitField0_ & 0x00000040) == 0x00000040)) {
actorRefElements_ = new akka.protobuf.UnmodifiableLazyStringList(
actorRefElements_);
bitField0_ = (bitField0_ & ~0x00000040);
}
result.actorRefElements_ = actorRefElements_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -2318,6 +2685,16 @@ public final class ReplicatedDataMessages {
}
}
}
if (!other.actorRefElements_.isEmpty()) {
if (actorRefElements_.isEmpty()) {
actorRefElements_ = other.actorRefElements_;
bitField0_ = (bitField0_ & ~0x00000040);
} else {
ensureActorRefElementsIsMutable();
actorRefElements_.addAll(other.actorRefElements_);
}
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -3187,6 +3564,135 @@ public final class ReplicatedDataMessages {
return otherElementsBuilder_;
}
// repeated string actorRefElements = 7;
private akka.protobuf.LazyStringList actorRefElements_ = akka.protobuf.LazyStringArrayList.EMPTY;
private void ensureActorRefElementsIsMutable() {
if (!((bitField0_ & 0x00000040) == 0x00000040)) {
actorRefElements_ = new akka.protobuf.LazyStringArrayList(actorRefElements_);
bitField0_ |= 0x00000040;
}
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public java.util.List<java.lang.String>
getActorRefElementsList() {
return java.util.Collections.unmodifiableList(actorRefElements_);
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public int getActorRefElementsCount() {
return actorRefElements_.size();
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public java.lang.String getActorRefElements(int index) {
return actorRefElements_.get(index);
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public akka.protobuf.ByteString
getActorRefElementsBytes(int index) {
return actorRefElements_.getByteString(index);
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder setActorRefElements(
int index, java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureActorRefElementsIsMutable();
actorRefElements_.set(index, value);
onChanged();
return this;
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder addActorRefElements(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureActorRefElementsIsMutable();
actorRefElements_.add(value);
onChanged();
return this;
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder addAllActorRefElements(
java.lang.Iterable<java.lang.String> values) {
ensureActorRefElementsIsMutable();
super.addAll(values, actorRefElements_);
onChanged();
return this;
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder clearActorRefElements() {
actorRefElements_ = akka.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000040);
onChanged();
return this;
}
/**
* <code>repeated string actorRefElements = 7;</code>
*
* <pre>
* added in Akka 2.5.14
* </pre>
*/
public Builder addActorRefElementsBytes(
akka.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
ensureActorRefElementsIsMutable();
actorRefElements_.add(value);
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:akka.cluster.ddata.ORSet)
}
@ -18531,75 +19037,76 @@ public final class ReplicatedDataMessages {
static {
java.lang.String[] descriptorData = {
"\n\034ReplicatedDataMessages.proto\022\022akka.clu" +
"ster.ddata\032\030ReplicatorMessages.proto\"\212\001\n" +
"ster.ddata\032\030ReplicatorMessages.proto\"\244\001\n" +
"\004GSet\022\026\n\016stringElements\030\001 \003(\t\022\027\n\013intElem" +
"ents\030\002 \003(\021B\002\020\001\022\030\n\014longElements\030\003 \003(\022B\002\020\001" +
"\0227\n\rotherElements\030\004 \003(\0132 .akka.cluster.d" +
"data.OtherMessage\"\360\001\n\005ORSet\0222\n\007vvector\030\001" +
" \002(\0132!.akka.cluster.ddata.VersionVector\022" +
"/\n\004dots\030\002 \003(\0132!.akka.cluster.ddata.Versi" +
"onVector\022\026\n\016stringElements\030\003 \003(\t\022\027\n\013intE" +
"lements\030\004 \003(\021B\002\020\001\022\030\n\014longElements\030\005 \003(\022B",
"\002\020\001\0227\n\rotherElements\030\006 \003(\0132 .akka.cluste" +
"r.ddata.OtherMessage\"\272\001\n\017ORSetDeltaGroup" +
"\022:\n\007entries\030\001 \003(\0132).akka.cluster.ddata.O" +
"RSetDeltaGroup.Entry\032k\n\005Entry\0223\n\toperati" +
"on\030\001 \002(\0162 .akka.cluster.ddata.ORSetDelta" +
"Op\022-\n\nunderlying\030\002 \002(\0132\031.akka.cluster.dd" +
"ata.ORSet\"\027\n\004Flag\022\017\n\007enabled\030\001 \002(\010\"\202\001\n\013L" +
"WWRegister\022\021\n\ttimestamp\030\001 \002(\022\022/\n\004node\030\002 " +
"\002(\0132!.akka.cluster.ddata.UniqueAddress\022/" +
"\n\005state\030\003 \002(\0132 .akka.cluster.ddata.Other",
"Message\"\210\001\n\010GCounter\0223\n\007entries\030\001 \003(\0132\"." +
"akka.cluster.ddata.GCounter.Entry\032G\n\005Ent" +
"ry\022/\n\004node\030\001 \002(\0132!.akka.cluster.ddata.Un" +
"iqueAddress\022\r\n\005value\030\002 \002(\014\"o\n\tPNCounter\022" +
"0\n\nincrements\030\001 \002(\0132\034.akka.cluster.ddata" +
".GCounter\0220\n\ndecrements\030\002 \002(\0132\034.akka.clu" +
"ster.ddata.GCounter\"\205\002\n\005ORMap\022\'\n\004keys\030\001 " +
"\002(\0132\031.akka.cluster.ddata.ORSet\0220\n\007entrie" +
"s\030\002 \003(\0132\037.akka.cluster.ddata.ORMap.Entry" +
"\032\240\001\n\005Entry\022\021\n\tstringKey\030\001 \001(\t\022/\n\005value\030\002",
" \002(\0132 .akka.cluster.ddata.OtherMessage\022\016" +
"\n\006intKey\030\003 \001(\021\022\017\n\007longKey\030\004 \001(\022\0222\n\010other" +
"Key\030\005 \001(\0132 .akka.cluster.ddata.OtherMess" +
"age\"\263\003\n\017ORMapDeltaGroup\022:\n\007entries\030\001 \003(\013" +
"2).akka.cluster.ddata.ORMapDeltaGroup.En" +
"try\032\243\001\n\010MapEntry\022\021\n\tstringKey\030\001 \001(\t\022/\n\005v" +
"alue\030\002 \001(\0132 .akka.cluster.ddata.OtherMes" +
"sage\022\016\n\006intKey\030\003 \001(\021\022\017\n\007longKey\030\004 \001(\022\0222\n" +
"\010otherKey\030\005 \001(\0132 .akka.cluster.ddata.Oth" +
"erMessage\032\275\001\n\005Entry\0223\n\toperation\030\001 \002(\0162 ",
".akka.cluster.ddata.ORMapDeltaOp\022-\n\nunde" +
"rlying\030\002 \002(\0132\031.akka.cluster.ddata.ORSet\022" +
"\017\n\007zeroTag\030\003 \002(\021\022?\n\tentryData\030\004 \003(\0132,.ak" +
"ka.cluster.ddata.ORMapDeltaGroup.MapEntr" +
"y\"\206\002\n\006LWWMap\022\'\n\004keys\030\001 \002(\0132\031.akka.cluste" +
"r.ddata.ORSet\0221\n\007entries\030\002 \003(\0132 .akka.cl" +
"uster.ddata.LWWMap.Entry\032\237\001\n\005Entry\022\021\n\tst" +
"ringKey\030\001 \001(\t\022.\n\005value\030\002 \002(\0132\037.akka.clus" +
"ter.ddata.LWWRegister\022\016\n\006intKey\030\003 \001(\021\022\017\n" +
"\007longKey\030\004 \001(\022\0222\n\010otherKey\030\005 \001(\0132 .akka.",
"cluster.ddata.OtherMessage\"\220\002\n\014PNCounter" +
"Map\022\'\n\004keys\030\001 \002(\0132\031.akka.cluster.ddata.O" +
"RSet\0227\n\007entries\030\002 \003(\0132&.akka.cluster.dda" +
"ta.PNCounterMap.Entry\032\235\001\n\005Entry\022\021\n\tstrin" +
"gKey\030\001 \001(\t\022,\n\005value\030\002 \002(\0132\035.akka.cluster" +
".ddata.PNCounter\022\016\n\006intKey\030\003 \001(\021\022\017\n\007long" +
"Key\030\004 \001(\022\0222\n\010otherKey\030\005 \001(\0132 .akka.clust" +
"er.ddata.OtherMessage\"\241\002\n\nORMultiMap\022\'\n\004" +
"keys\030\001 \002(\0132\031.akka.cluster.ddata.ORSet\0225\n" +
"\007entries\030\002 \003(\0132$.akka.cluster.ddata.ORMu",
"ltiMap.Entry\022\027\n\017withValueDeltas\030\003 \001(\010\032\231\001" +
"\n\005Entry\022\021\n\tstringKey\030\001 \001(\t\022(\n\005value\030\002 \002(" +
"\0132\031.akka.cluster.ddata.ORSet\022\016\n\006intKey\030\003" +
"data.OtherMessage\022\030\n\020actorRefElements\030\005 " +
"\003(\t\"\212\002\n\005ORSet\0222\n\007vvector\030\001 \002(\0132!.akka.cl" +
"uster.ddata.VersionVector\022/\n\004dots\030\002 \003(\0132" +
"!.akka.cluster.ddata.VersionVector\022\026\n\016st" +
"ringElements\030\003 \003(\t\022\027\n\013intElements\030\004 \003(\021B",
"\002\020\001\022\030\n\014longElements\030\005 \003(\022B\002\020\001\0227\n\rotherEl" +
"ements\030\006 \003(\0132 .akka.cluster.ddata.OtherM" +
"essage\022\030\n\020actorRefElements\030\007 \003(\t\"\272\001\n\017ORS" +
"etDeltaGroup\022:\n\007entries\030\001 \003(\0132).akka.clu" +
"ster.ddata.ORSetDeltaGroup.Entry\032k\n\005Entr" +
"y\0223\n\toperation\030\001 \002(\0162 .akka.cluster.ddat" +
"a.ORSetDeltaOp\022-\n\nunderlying\030\002 \002(\0132\031.akk" +
"a.cluster.ddata.ORSet\"\027\n\004Flag\022\017\n\007enabled" +
"\030\001 \002(\010\"\202\001\n\013LWWRegister\022\021\n\ttimestamp\030\001 \002(" +
"\022\022/\n\004node\030\002 \002(\0132!.akka.cluster.ddata.Uni",
"queAddress\022/\n\005state\030\003 \002(\0132 .akka.cluster" +
".ddata.OtherMessage\"\210\001\n\010GCounter\0223\n\007entr" +
"ies\030\001 \003(\0132\".akka.cluster.ddata.GCounter." +
"Entry\032G\n\005Entry\022/\n\004node\030\001 \002(\0132!.akka.clus" +
"ter.ddata.UniqueAddress\022\r\n\005value\030\002 \002(\014\"o" +
"\n\tPNCounter\0220\n\nincrements\030\001 \002(\0132\034.akka.c" +
"luster.ddata.GCounter\0220\n\ndecrements\030\002 \002(" +
"\0132\034.akka.cluster.ddata.GCounter\"\205\002\n\005ORMa" +
"p\022\'\n\004keys\030\001 \002(\0132\031.akka.cluster.ddata.ORS" +
"et\0220\n\007entries\030\002 \003(\0132\037.akka.cluster.ddata",
".ORMap.Entry\032\240\001\n\005Entry\022\021\n\tstringKey\030\001 \001(" +
"\t\022/\n\005value\030\002 \002(\0132 .akka.cluster.ddata.Ot" +
"herMessage\022\016\n\006intKey\030\003 \001(\021\022\017\n\007longKey\030\004 " +
"\001(\022\0222\n\010otherKey\030\005 \001(\0132 .akka.cluster.dda" +
"ta.OtherMessage\"\263\003\n\017ORMapDeltaGroup\022:\n\007e" +
"ntries\030\001 \003(\0132).akka.cluster.ddata.ORMapD" +
"eltaGroup.Entry\032\243\001\n\010MapEntry\022\021\n\tstringKe" +
"y\030\001 \001(\t\022/\n\005value\030\002 \001(\0132 .akka.cluster.dd" +
"ata.OtherMessage\022\016\n\006intKey\030\003 \001(\021\022\017\n\007long" +
"Key\030\004 \001(\022\0222\n\010otherKey\030\005 \001(\0132 .akka.clust",
"er.ddata.OtherMessage\032\275\001\n\005Entry\0223\n\topera" +
"tion\030\001 \002(\0162 .akka.cluster.ddata.ORMapDel" +
"taOp\022-\n\nunderlying\030\002 \002(\0132\031.akka.cluster." +
"ddata.ORSet\022\017\n\007zeroTag\030\003 \002(\021\022?\n\tentryDat" +
"a\030\004 \003(\0132,.akka.cluster.ddata.ORMapDeltaG" +
"roup.MapEntry\"\206\002\n\006LWWMap\022\'\n\004keys\030\001 \002(\0132\031" +
".akka.cluster.ddata.ORSet\0221\n\007entries\030\002 \003" +
"(\0132 .akka.cluster.ddata.LWWMap.Entry\032\237\001\n" +
"\005Entry\022\021\n\tstringKey\030\001 \001(\t\022.\n\005value\030\002 \002(\013" +
"2\037.akka.cluster.ddata.LWWRegister\022\016\n\006int",
"Key\030\003 \001(\021\022\017\n\007longKey\030\004 \001(\022\0222\n\010otherKey\030\005" +
" \001(\0132 .akka.cluster.ddata.OtherMessage\"\220" +
"\002\n\014PNCounterMap\022\'\n\004keys\030\001 \002(\0132\031.akka.clu" +
"ster.ddata.ORSet\0227\n\007entries\030\002 \003(\0132&.akka" +
".cluster.ddata.PNCounterMap.Entry\032\235\001\n\005En" +
"try\022\021\n\tstringKey\030\001 \001(\t\022,\n\005value\030\002 \002(\0132\035." +
"akka.cluster.ddata.PNCounter\022\016\n\006intKey\030\003" +
" \001(\021\022\017\n\007longKey\030\004 \001(\022\0222\n\010otherKey\030\005 \001(\0132" +
" .akka.cluster.ddata.OtherMessage*-\n\014ORS" +
"etDeltaOp\022\007\n\003Add\020\000\022\n\n\006Remove\020\001\022\010\n\004Full\020\002" +
"*R\n\014ORMapDeltaOp\022\014\n\010ORMapPut\020\000\022\017\n\013ORMapR" +
"emove\020\001\022\022\n\016ORMapRemoveKey\020\002\022\017\n\013ORMapUpda" +
"te\020\003B#\n\037akka.cluster.ddata.protobuf.msgH" +
"\001"
" .akka.cluster.ddata.OtherMessage\"\241\002\n\nOR" +
"MultiMap\022\'\n\004keys\030\001 \002(\0132\031.akka.cluster.dd",
"ata.ORSet\0225\n\007entries\030\002 \003(\0132$.akka.cluste" +
"r.ddata.ORMultiMap.Entry\022\027\n\017withValueDel" +
"tas\030\003 \001(\010\032\231\001\n\005Entry\022\021\n\tstringKey\030\001 \001(\t\022(" +
"\n\005value\030\002 \002(\0132\031.akka.cluster.ddata.ORSet" +
"\022\016\n\006intKey\030\003 \001(\021\022\017\n\007longKey\030\004 \001(\022\0222\n\010oth" +
"erKey\030\005 \001(\0132 .akka.cluster.ddata.OtherMe" +
"ssage*-\n\014ORSetDeltaOp\022\007\n\003Add\020\000\022\n\n\006Remove" +
"\020\001\022\010\n\004Full\020\002*R\n\014ORMapDeltaOp\022\014\n\010ORMapPut" +
"\020\000\022\017\n\013ORMapRemove\020\001\022\022\n\016ORMapRemoveKey\020\002\022" +
"\017\n\013ORMapUpdate\020\003B#\n\037akka.cluster.ddata.p",
"rotobuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -18611,13 +19118,13 @@ public final class ReplicatedDataMessages {
internal_static_akka_cluster_ddata_GSet_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_GSet_descriptor,
new java.lang.String[] { "StringElements", "IntElements", "LongElements", "OtherElements", });
new java.lang.String[] { "StringElements", "IntElements", "LongElements", "OtherElements", "ActorRefElements", });
internal_static_akka_cluster_ddata_ORSet_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_akka_cluster_ddata_ORSet_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_ddata_ORSet_descriptor,
new java.lang.String[] { "Vvector", "Dots", "StringElements", "IntElements", "LongElements", "OtherElements", });
new java.lang.String[] { "Vvector", "Dots", "StringElements", "IntElements", "LongElements", "OtherElements", "ActorRefElements", });
internal_static_akka_cluster_ddata_ORSetDeltaGroup_descriptor =
getDescriptor().getMessageTypes().get(2);
internal_static_akka_cluster_ddata_ORSetDeltaGroup_fieldAccessorTable = new

View file

@ -0,0 +1,10 @@
# #23703 Optimized serializer for ORSet[ActorRef]
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#GSetOrBuilder.getActorRefElementsCount")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#GSetOrBuilder.getActorRefElementsList")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#GSetOrBuilder.getActorRefElements")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#GSetOrBuilder.getActorRefElementsBytes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#ORSetOrBuilder.getActorRefElementsCount")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#ORSetOrBuilder.getActorRefElementsList")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#ORSetOrBuilder.getActorRefElements")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#ORSetOrBuilder.getActorRefElementsBytes")

View file

@ -12,6 +12,7 @@ message GSet {
repeated sint32 intElements = 2 [packed=true];
repeated sint64 longElements = 3 [packed=true];
repeated OtherMessage otherElements = 4;
repeated string actorRefElements = 5; // added in Akka 2.5.14
}
message ORSet {
@ -21,6 +22,7 @@ message ORSet {
repeated sint32 intElements = 4 [packed=true];
repeated sint64 longElements = 5 [packed=true];
repeated OtherMessage otherElements = 6;
repeated string actorRefElements = 7; // added in Akka 2.5.14
}
message ORSetDeltaGroup {

View file

@ -13,6 +13,7 @@ import java.util.TreeSet
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.breakOut
import akka.actor.ExtendedActorSystem
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator.Internal._
@ -22,10 +23,11 @@ import akka.serialization.SerializerWithStringManifest
import akka.serialization.BaseSerializer
import akka.protobuf.{ ByteString, GeneratedMessage }
import akka.util.ByteString.UTF_8
import java.io.NotSerializableException
import akka.actor.ActorRef
import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage
import akka.serialization.Serialization
private object ReplicatedDataSerializer {
/*
@ -321,10 +323,12 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val intElements = new ArrayList[Integer]
val longElements = new ArrayList[jl.Long]
val otherElements = new ArrayList[dm.OtherMessage]
val actorRefElements = new ArrayList[String]
gset.elements.foreach {
case s: String stringElements.add(s)
case i: Int intElements.add(i)
case l: Long longElements.add(l)
case ref: ActorRef actorRefElements.add(Serialization.serializedActorPath(ref))
case other otherElements.add(otherMessageToProto(other))
}
if (!stringElements.isEmpty) {
@ -343,17 +347,26 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
Collections.sort(otherElements, OtherMessageComparator)
b.addAllOtherElements(otherElements)
}
if (!actorRefElements.isEmpty) {
Collections.sort(actorRefElements)
b.addAllActorRefElements(actorRefElements)
}
b.build()
}
def gsetFromBinary(bytes: Array[Byte]): GSet[_] =
gsetFromProto(rd.GSet.parseFrom(bytes))
def gsetFromProto(gset: rd.GSet): GSet[Any] =
GSet(gset.getStringElementsList.iterator.asScala.toSet ++
def gsetFromProto(gset: rd.GSet): GSet[Any] = {
val elements: Iterator[Any] = {
gset.getStringElementsList.iterator.asScala ++
gset.getIntElementsList.iterator.asScala ++
gset.getLongElementsList.iterator.asScala ++
gset.getOtherElementsList.iterator.asScala.map(otherMessageFromProto))
gset.getOtherElementsList.iterator.asScala.map(otherMessageFromProto) ++
gset.getActorRefElementsList.iterator.asScala.map(resolveActorRef)
}
GSet(elements.toSet)
}
def orsetToProto(orset: ORSet[_]): rd.ORSet =
orsetToProtoImpl(orset.asInstanceOf[ORSet[Any]])
@ -366,10 +379,12 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val longElements = new ArrayList[jl.Long]
val otherElements = new ArrayList[dm.OtherMessage]
var otherElementsMap = Map.empty[dm.OtherMessage, Any]
val actorRefElements = new ArrayList[ActorRef]
orset.elementsMap.keysIterator.foreach {
case s: String stringElements.add(s)
case i: Int intElements.add(i)
case l: Long longElements.add(l)
case ref: ActorRef actorRefElements.add(ref)
case other
val enclosedMsg = otherMessageToProto(other)
otherElements.add(enclosedMsg)
@ -409,6 +424,14 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
b.addAllOtherElements(otherElements)
addDots(otherElements)
}
if (!actorRefElements.isEmpty) {
Collections.sort(actorRefElements)
val iter = actorRefElements.iterator
while (iter.hasNext) {
b.addActorRefElements(Serialization.serializedActorPath(iter.next()))
}
addDots(actorRefElements)
}
b.build()
}
@ -463,11 +486,13 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
}
def orsetFromProto(orset: rd.ORSet): ORSet[Any] = {
val elements: Iterator[Any] =
(orset.getStringElementsList.iterator.asScala ++
val elements: Iterator[Any] = {
orset.getStringElementsList.iterator.asScala ++
orset.getIntElementsList.iterator.asScala ++
orset.getLongElementsList.iterator.asScala ++
orset.getOtherElementsList.iterator.asScala.map(otherMessageFromProto))
orset.getOtherElementsList.iterator.asScala.map(otherMessageFromProto) ++
orset.getActorRefElementsList.iterator.asScala.map(resolveActorRef)
}
val dots = orset.getDotsList.asScala.map(versionVectorFromProto).iterator
val elementsMap = elements.zip(dots).toMap

View file

@ -6,12 +6,15 @@ package akka.cluster.ddata.protobuf
import java.util.Base64
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Identify
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator.Internal._
import akka.testkit.TestKit
@ -19,10 +22,14 @@ import akka.cluster.UniqueAddress
import akka.remote.RARP
import com.typesafe.config.ConfigFactory
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.testkit.TestActors
class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
"ReplicatedDataSerializerSpec",
ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.actor.provider=cluster
akka.remote.netty.tcp.port=0
akka.remote.artery.canonical.port = 0
@ -109,6 +116,35 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
val s3 = ORSet().add(address1, "a").add(address2, 17).remove(address3, 17)
val s4 = ORSet().add(address2, 17).remove(address3, 17).add(address1, "a")
checkSameContent(s3.merge(s4), s4.merge(s3))
// ORSet with ActorRef
checkSerialization(ORSet().add(address1, ref1))
checkSerialization(ORSet().add(address1, ref1).add(address1, ref2))
checkSerialization(ORSet().add(address1, ref1).add(address1, "a").add(address2, ref2) add (address2, "b"))
val s5 = ORSet().add(address1, "a").add(address2, ref1)
val s6 = ORSet().add(address2, ref1).add(address1, "a")
checkSameContent(s5.merge(s6), s6.merge(s5))
}
"serialize ORSet with ActorRef message sent between two systems" in {
val system2 = ActorSystem(system.name, system.settings.config)
try {
val echo1 = system.actorOf(TestActors.echoActorProps, "echo1")
system2.actorOf(TestActors.echoActorProps, "echo2")
system.actorSelection(RootActorPath(Cluster(system2).selfAddress) / "user" / "echo2").tell(
Identify("2"), testActor)
val echo2 = expectMsgType[ActorIdentity].ref.get
val msg = ORSet.empty[ActorRef].add(Cluster(system), echo1).add(Cluster(system), echo2)
echo2.tell(msg, testActor)
val reply = expectMsgType[ORSet[ActorRef]]
reply.elements should ===(Set(echo1, echo2))
} finally {
shutdown(system2)
}
}
"serialize ORSet delta" in {

View file

@ -91,7 +91,7 @@ lazy val benchJmh = akkaModule("akka-bench-jmh")
actor,
stream, streamTests,
persistence, persistenceTyped,
distributedData,
distributedData, clusterTyped,
testkit
).map(_ % "compile->compile;compile->test"): _*
)