diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java index 1f25776e93..2cbf071334 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java @@ -9846,17 +9846,17 @@ public final class ReplicatedDataMessages { akka.protobuf.ByteString getStringKeyBytes(); - // required .akka.cluster.ddata.OtherMessage value = 2; + // optional .akka.cluster.ddata.OtherMessage value = 2; /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ boolean hasValue(); /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage getValue(); /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getValueOrBuilder(); @@ -10069,23 +10069,23 @@ public final class ReplicatedDataMessages { } } - // required .akka.cluster.ddata.OtherMessage value = 2; + // optional .akka.cluster.ddata.OtherMessage value = 2; public static final int VALUE_FIELD_NUMBER = 2; private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage value_; /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public boolean hasValue() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage getValue() { return value_; } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getValueOrBuilder() { return value_; @@ -10157,13 +10157,11 @@ public final class ReplicatedDataMessages { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (!hasValue()) { - memoizedIsInitialized = 0; - return false; - } - if (!getValue().isInitialized()) { - memoizedIsInitialized = 0; - return false; + if (hasValue()) { + if (!getValue().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } } if (hasOtherKey()) { if (!getOtherKey().isInitialized()) { @@ -10452,13 +10450,11 @@ public final class ReplicatedDataMessages { } public final boolean isInitialized() { - if (!hasValue()) { - - return false; - } - if (!getValue().isInitialized()) { - - return false; + if (hasValue()) { + if (!getValue().isInitialized()) { + + return false; + } } if (hasOtherKey()) { if (!getOtherKey().isInitialized()) { @@ -10562,18 +10558,18 @@ public final class ReplicatedDataMessages { return this; } - // required .akka.cluster.ddata.OtherMessage value = 2; + // optional .akka.cluster.ddata.OtherMessage value = 2; private akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage value_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance(); private akka.protobuf.SingleFieldBuilder< akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder> valueBuilder_; /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public boolean hasValue() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage getValue() { if (valueBuilder_ == null) { @@ -10583,7 +10579,7 @@ public final class ReplicatedDataMessages { } } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public Builder setValue(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage value) { if (valueBuilder_ == null) { @@ -10599,7 +10595,7 @@ public final class ReplicatedDataMessages { return this; } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public Builder setValue( akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder builderForValue) { @@ -10613,7 +10609,7 @@ public final class ReplicatedDataMessages { return this; } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public Builder mergeValue(akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage value) { if (valueBuilder_ == null) { @@ -10632,7 +10628,7 @@ public final class ReplicatedDataMessages { return this; } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public Builder clearValue() { if (valueBuilder_ == null) { @@ -10645,7 +10641,7 @@ public final class ReplicatedDataMessages { return this; } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder getValueBuilder() { bitField0_ |= 0x00000002; @@ -10653,7 +10649,7 @@ public final class ReplicatedDataMessages { return getValueFieldBuilder().getBuilder(); } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ public akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getValueOrBuilder() { if (valueBuilder_ != null) { @@ -10663,7 +10659,7 @@ public final class ReplicatedDataMessages { } } /** - * required .akka.cluster.ddata.OtherMessage value = 2; + * optional .akka.cluster.ddata.OtherMessage value = 2; */ private akka.protobuf.SingleFieldBuilder< akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.Builder, akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder> @@ -18396,7 +18392,7 @@ public final class ReplicatedDataMessages { "age\"\263\003\n\017ORMapDeltaGroup\022:\n\007entries\030\001 \003(\013" + "2).akka.cluster.ddata.ORMapDeltaGroup.En" + "try\032\243\001\n\010MapEntry\022\021\n\tstringKey\030\001 \001(\t\022/\n\005v" + - "alue\030\002 \002(\0132 .akka.cluster.ddata.OtherMes" + + "alue\030\002 \001(\0132 .akka.cluster.ddata.OtherMes" + "sage\022\016\n\006intKey\030\003 \001(\021\022\017\n\007longKey\030\004 \001(\022\0222\n" + "\010otherKey\030\005 \001(\0132 .akka.cluster.ddata.Oth" + "erMessage\032\275\001\n\005Entry\0223\n\toperation\030\001 \002(\0162 ", diff --git a/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto index 33efe55f5a..ca6b2fbdca 100644 --- a/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto +++ b/akka-distributed-data/src/main/protobuf/ReplicatedDataMessages.proto @@ -78,7 +78,7 @@ message ORMap { message ORMapDeltaGroup { message MapEntry { optional string stringKey = 1; - required OtherMessage value = 2; + optional OtherMessage value = 2; optional sint32 intKey = 3; optional sint64 longKey = 4; optional OtherMessage otherKey = 5; diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala index 1240e7fdbd..b8336f2239 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala @@ -25,7 +25,7 @@ object ORMap { */ def unapply[A, B <: ReplicatedData](m: ORMap[A, B]): Option[Map[A, B]] = Some(m.entries) - sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas { + sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas with ReplicatedDataSerialization { type T = DeltaOp override def zero: DeltaReplicatedData } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala index fa3eacd744..7f449ccfee 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORSet.scala @@ -38,7 +38,7 @@ object ORSet { */ @InternalApi private[akka]type Dot = VersionVector - sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas { + sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas with ReplicatedDataSerialization { type T = DeltaOp } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala index cd9eff03b5..8d713e4633 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala @@ -585,6 +585,13 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) } } + def singleKeyEntryFromProto[PEntry <: GeneratedMessage, A <: GeneratedMessage](entry: PEntry)(implicit eh: ProtoMapEntryReader[PEntry, A]): Any = + if (eh.hasStringKey(entry)) eh.getStringKey(entry) + else if (eh.hasIntKey(entry)) eh.getIntKey(entry) + else if (eh.hasLongKey(entry)) eh.getLongKey(entry) + else if (eh.hasOtherKey(entry)) otherMessageFromProto(eh.getOtherKey(entry)) + else throw new IllegalArgumentException(s"Can't deserialize the key in the ORMap delta") + // wire protocol is always DeltaGroup private def ormapPutFromBinary(bytes: Array[Byte]): ORMap.PutDeltaOp[Any, ReplicatedData] = { val group = ormapDeltaGroupFromBinary(bytes) @@ -639,11 +646,10 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData]) ORMap.PutDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map.head, zeroTagFromCode(entry.getZeroTag)) } else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemove) { - val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData]) ORMap.RemoveDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), zeroTagFromCode(entry.getZeroTag)) } else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemoveKey) { - val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedData]) - ORMap.RemoveKeyDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), map.keySet.head, zeroTagFromCode(entry.getZeroTag)) + val elem = singleKeyEntryFromProto(entry.getEntryData) + ORMap.RemoveKeyDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), elem, zeroTagFromCode(entry.getZeroTag)) } else if (entry.getOperation == rd.ORMapDeltaOp.ORMapUpdate) { val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) ⇒ otherMessageFromProto(v).asInstanceOf[ReplicatedDelta]) ORMap.UpdateDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map, zeroTagFromCode(entry.getZeroTag)) @@ -653,20 +659,20 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) ORMap.DeltaGroup(ops) } - private def ormapPutToProto(addDelta: ORMap.PutDeltaOp[_, _]): rd.ORMapDeltaGroup = { - ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp]))) + private def ormapPutToProto(deltaOp: ORMap.PutDeltaOp[_, _]): rd.ORMapDeltaGroup = { + ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))) } - private def ormapRemoveToProto(addDelta: ORMap.RemoveDeltaOp[_, _]): rd.ORMapDeltaGroup = { - ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp]))) + private def ormapRemoveToProto(deltaOp: ORMap.RemoveDeltaOp[_, _]): rd.ORMapDeltaGroup = { + ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))) } - private def ormapRemoveKeyToProto(addDelta: ORMap.RemoveKeyDeltaOp[_, _]): rd.ORMapDeltaGroup = { - ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp]))) + private def ormapRemoveKeyToProto(deltaOp: ORMap.RemoveKeyDeltaOp[_, _]): rd.ORMapDeltaGroup = { + ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))) } - private def ormapUpdateToProto(addDelta: ORMap.UpdateDeltaOp[_, _]): rd.ORMapDeltaGroup = { - ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp]))) + private def ormapUpdateToProto(deltaOp: ORMap.UpdateDeltaOp[_, _]): rd.ORMapDeltaGroup = { + ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp]))) } private def ormapDeltaGroupToProto(deltaGroup: ORMap.DeltaGroup[_, _]): rd.ORMapDeltaGroup = { @@ -691,6 +697,22 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) } } + def createEntryWithKey(opType: rd.ORMapDeltaOp, u: ORSet[_], k: Any, zt: Int) = { + val entryDataBuilder = rd.ORMapDeltaGroup.MapEntry.newBuilder() + k match { + case key: String ⇒ entryDataBuilder.setStringKey(key) + case key: Int ⇒ entryDataBuilder.setIntKey(key) + case key: Long ⇒ entryDataBuilder.setLongKey(key) + case key ⇒ entryDataBuilder.setOtherKey(otherMessageToProto(key)) + } + val builder = rd.ORMapDeltaGroup.Entry.newBuilder() + .setOperation(opType) + .setUnderlying(orsetToProto(u)) + .setZeroTag(zt) + builder.setEntryData(entryDataBuilder.build()) + builder + } + val b = rd.ORMapDeltaGroup.newBuilder() deltaGroup.ops.foreach { case ORMap.PutDeltaOp(op, pair, zt) ⇒ @@ -698,7 +720,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) case ORMap.RemoveDeltaOp(op, zt) ⇒ b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapRemove, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, Map.empty, zt.value)) case ORMap.RemoveKeyDeltaOp(op, k, zt) ⇒ - b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapRemove, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, Map(k → k), zt.value)) + b.addEntries(createEntryWithKey(rd.ORMapDeltaOp.ORMapRemoveKey, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, k, zt.value)) case ORMap.UpdateDeltaOp(op, m, zt) ⇒ b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapUpdate, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, m, zt.value)) case ORMap.DeltaGroup(u) ⇒ diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala index f083fdbf30..f0597e79db 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorMapDeltaSpec.scala @@ -25,6 +25,11 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig { akka.loglevel = DEBUG akka.actor.provider = "cluster" akka.log-dead-letters-during-shutdown = off + akka.actor { + serialize-messages = off + serialize-creators = off + allow-java-serialization = off + } """)) testTransport(on = true) @@ -182,8 +187,8 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2) } List(KeyD, KeyE, KeyF).foreach { key ⇒ - fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2, Set("a"))) - deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2, Set("a"))) + fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 → Set("a"))) + deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 → Set("a"))) } } enterBarrier("updated-1") diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala index 3d1a9581a8..6925b80044 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorORSetDeltaSpec.scala @@ -22,6 +22,11 @@ object ReplicatorORSetDeltaSpec extends MultiNodeConfig { akka.loglevel = INFO akka.actor.provider = "cluster" akka.log-dead-letters-during-shutdown = off + akka.actor { + serialize-messages = off + serialize-creators = off + allow-java-serialization = off + } """)) testTransport(on = true) diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala index da67a36434..b05bb5fe50 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorPruningSpec.scala @@ -91,10 +91,10 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" } expectMsg(UpdateSuccess(KeyC, None)) - replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a", Set("A")) } + replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a" → Set("A")) } expectMsg(UpdateSuccess(KeyD, None)) - replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a", GSet.empty[String].add("A")) } + replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a" → GSet.empty[String].add("A")) } expectMsg(UpdateSuccess(KeyE, None)) enterBarrier("updates-done") diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala index b77c363f2b..da86a68dc2 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala @@ -17,6 +17,7 @@ import akka.testkit.TestKit import akka.cluster.UniqueAddress import akka.remote.RARP import com.typesafe.config.ConfigFactory +import akka.actor.Props class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( "ReplicatedDataSerializerSpec", @@ -24,6 +25,11 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( akka.actor.provider=cluster akka.remote.netty.tcp.port=0 akka.remote.artery.canonical.port = 0 + akka.actor { + serialize-messages = off + serialize-creators = off + allow-java-serialization = off + } """))) with WordSpecLike with Matchers with BeforeAndAfterAll { val serializer = new ReplicatedDataSerializer(system.asInstanceOf[ExtendedActorSystem]) @@ -34,6 +40,10 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2L) val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3L) + val ref1 = system.actorOf(Props.empty, "ref1") + val ref2 = system.actorOf(Props.empty, "ref2") + val ref3 = system.actorOf(Props.empty, "ref3") + override def afterAll { shutdown() } @@ -71,14 +81,14 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( checkSerialization(GSet() + "a" + "b") checkSerialization(GSet() + 1 + 2 + 3) - checkSerialization(GSet() + address1 + address2) + checkSerialization(GSet() + ref1 + ref2) - checkSerialization(GSet() + 1L + "2" + 3 + address1) + checkSerialization(GSet() + 1L + "2" + 3 + ref1) checkSameContent(GSet() + "a" + "b", GSet() + "a" + "b") checkSameContent(GSet() + "a" + "b", GSet() + "b" + "a") - checkSameContent(GSet() + address1 + address2 + address3, GSet() + address2 + address1 + address3) - checkSameContent(GSet() + address1 + address2 + address3, GSet() + address3 + address2 + address1) + checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref2 + ref1 + ref3) + checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref3 + ref2 + ref1) } "serialize ORSet" in { @@ -89,7 +99,7 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( checkSerialization(ORSet().add(address1, "a").add(address2, "b").remove(address1, "a")) checkSerialization(ORSet().add(address1, 1).add(address2, 2)) checkSerialization(ORSet().add(address1, 1L).add(address2, 2L)) - checkSerialization(ORSet().add(address1, "a").add(address2, 2).add(address3, 3L).add(address3, address3)) + checkSerialization(ORSet().add(address1, "a").add(address2, 2).add(address3, 3L).add(address3, ref3)) val s1 = ORSet().add(address1, "a").add(address2, "b") val s2 = ORSet().add(address2, "b").add(address1, "a") @@ -267,6 +277,7 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, 1, "A")) checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, 1L, "A")) checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, Flag(), "A")) + checkSerialization(ORMultiMap.emptyWithValueDeltas[String, String].addBinding(address1, "a", "A").remove(address1, "a").delta.get) checkSerialization(ORMultiMap.emptyWithValueDeltas[String, String] .addBinding(address1, "a", "A1") .put(address2, "b", Set("B1", "B2", "B3")) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala index 7bec0ef523..22fb7d772a 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala @@ -26,6 +26,8 @@ import com.typesafe.config.ConfigFactory import akka.cluster.ddata.DurableStore.DurableDataEnvelope import akka.cluster.ddata.GCounter import akka.cluster.ddata.VersionVector +import akka.cluster.ddata.ORSet +import akka.cluster.ddata.ORMultiMap class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( "ReplicatorMessageSerializerSpec", @@ -33,6 +35,11 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( akka.actor.provider=cluster akka.remote.netty.tcp.port=0 akka.remote.artery.canonical.port = 0 + akka.actor { + serialize-messages = off + serialize-creators = off + allow-java-serialization = off + } """))) with WordSpecLike with Matchers with BeforeAndAfterAll { val serializer = new ReplicatorMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) @@ -61,8 +68,10 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( "serialize Replicator messages" in { val ref1 = system.actorOf(Props.empty, "ref1") val data1 = GSet.empty[String] + "a" - val delta1 = GCounter.empty.increment(address1, 17).increment(address2, 2) - val delta2 = delta1.increment(address2, 1) + val delta1 = GCounter.empty.increment(address1, 17).increment(address2, 2).delta.get + val delta2 = delta1.increment(address2, 1).delta.get + val delta3 = ORSet.empty[String].add(address1, "a").delta.get + val delta4 = ORMultiMap.empty[String, String].addBinding(address1, "a", "b").delta.get checkSerialization(Get(keyA, ReadLocal)) checkSerialization(Get(keyA, ReadMajority(2.seconds), Some("x"))) @@ -92,7 +101,9 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( "B" → DataEnvelope(GSet() + "b" + "c")), sendBack = true)) checkSerialization(DeltaPropagation(address1, reply = true, Map( "A" → Delta(DataEnvelope(delta1), 1L, 1L), - "B" → Delta(DataEnvelope(delta2), 3L, 5L)))) + "B" → Delta(DataEnvelope(delta2), 3L, 5L), + "C" → Delta(DataEnvelope(delta3), 1L, 1L), + "DC" → Delta(DataEnvelope(delta4), 1L, 1L)))) checkSerialization(new DurableDataEnvelope(data1)) val pruning = Map(