fix serialization of delta ops, #22604

This commit is contained in:
Patrik Nordwall 2017-03-20 10:42:38 +01:00
parent 45f4727625
commit cc7065601a
8 changed files with 54 additions and 24 deletions

View file

@ -25,7 +25,7 @@ object ORMap {
*/ */
def unapply[A, B <: ReplicatedData](m: ORMap[A, B]): Option[Map[A, B]] = Some(m.entries) def unapply[A, B <: ReplicatedData](m: ORMap[A, B]): Option[Map[A, B]] = Some(m.entries)
sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas { sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas with ReplicatedDataSerialization {
type T = DeltaOp type T = DeltaOp
override def zero: DeltaReplicatedData override def zero: DeltaReplicatedData
} }

View file

@ -38,7 +38,7 @@ object ORSet {
*/ */
@InternalApi private[akka]type Dot = VersionVector @InternalApi private[akka]type Dot = VersionVector
sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas { sealed trait DeltaOp extends ReplicatedDelta with RequiresCausalDeliveryOfDeltas with ReplicatedDataSerialization {
type T = DeltaOp type T = DeltaOp
} }

View file

@ -639,7 +639,6 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData]) val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
ORMap.PutDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map.head, zeroTagFromCode(entry.getZeroTag)) ORMap.PutDeltaOp(ORSet.AddDeltaOp(orsetFromProto(entry.getUnderlying)), map.head, zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemove) { } else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemove) {
val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
ORMap.RemoveDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), zeroTagFromCode(entry.getZeroTag)) ORMap.RemoveDeltaOp(ORSet.RemoveDeltaOp(orsetFromProto(entry.getUnderlying)), zeroTagFromCode(entry.getZeroTag))
} else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemoveKey) { } else if (entry.getOperation == rd.ORMapDeltaOp.ORMapRemoveKey) {
val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData]) val map = singleMapEntryFromProto(entry.getEntryData, (v: dm.OtherMessage) otherMessageFromProto(v).asInstanceOf[ReplicatedData])
@ -653,20 +652,20 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
ORMap.DeltaGroup(ops) ORMap.DeltaGroup(ops)
} }
private def ormapPutToProto(addDelta: ORMap.PutDeltaOp[_, _]): rd.ORMapDeltaGroup = { private def ormapPutToProto(deltaOp: ORMap.PutDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp]))) ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
} }
private def ormapRemoveToProto(addDelta: ORMap.RemoveDeltaOp[_, _]): rd.ORMapDeltaGroup = { private def ormapRemoveToProto(deltaOp: ORMap.RemoveDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp]))) ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
} }
private def ormapRemoveKeyToProto(addDelta: ORMap.RemoveKeyDeltaOp[_, _]): rd.ORMapDeltaGroup = { private def ormapRemoveKeyToProto(deltaOp: ORMap.RemoveKeyDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp]))) ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
} }
private def ormapUpdateToProto(addDelta: ORMap.UpdateDeltaOp[_, _]): rd.ORMapDeltaGroup = { private def ormapUpdateToProto(deltaOp: ORMap.UpdateDeltaOp[_, _]): rd.ORMapDeltaGroup = {
ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(addDelta.asInstanceOf[ORMap.DeltaOp]))) ormapDeltaGroupToProto(ORMap.DeltaGroup(scala.collection.immutable.IndexedSeq(deltaOp.asInstanceOf[ORMap.DeltaOp])))
} }
private def ormapDeltaGroupToProto(deltaGroup: ORMap.DeltaGroup[_, _]): rd.ORMapDeltaGroup = { private def ormapDeltaGroupToProto(deltaGroup: ORMap.DeltaGroup[_, _]): rd.ORMapDeltaGroup = {
@ -698,7 +697,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem)
case ORMap.RemoveDeltaOp(op, zt) case ORMap.RemoveDeltaOp(op, zt)
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapRemove, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, Map.empty, zt.value)) b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapRemove, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, Map.empty, zt.value))
case ORMap.RemoveKeyDeltaOp(op, k, zt) case ORMap.RemoveKeyDeltaOp(op, k, zt)
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapRemove, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, Map(k k), zt.value)) b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapRemoveKey, op.asInstanceOf[ORSet.RemoveDeltaOp[_]].underlying, Map(k k), zt.value))
case ORMap.UpdateDeltaOp(op, m, zt) case ORMap.UpdateDeltaOp(op, m, zt)
b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapUpdate, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, m, zt.value)) b.addEntries(createEntry(rd.ORMapDeltaOp.ORMapUpdate, op.asInstanceOf[ORSet.AddDeltaOp[_]].underlying, m, zt.value))
case ORMap.DeltaGroup(u) case ORMap.DeltaGroup(u)

View file

@ -25,6 +25,11 @@ object ReplicatorMapDeltaSpec extends MultiNodeConfig {
akka.loglevel = DEBUG akka.loglevel = DEBUG
akka.actor.provider = "cluster" akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off akka.log-dead-letters-during-shutdown = off
akka.actor {
serialize-messages = off
serialize-creators = off
allow-java-serialization = off
}
""")) """))
testTransport(on = true) testTransport(on = true)
@ -182,8 +187,8 @@ class ReplicatorMapDeltaSpec extends MultiNodeSpec(ReplicatorMapDeltaSpec) with
deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2) deltaReplicator ! Update(key._1, PNCounterMap.empty[String], WriteLocal)(_ increment key._2)
} }
List(KeyD, KeyE, KeyF).foreach { key List(KeyD, KeyE, KeyF).foreach { key
fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2, Set("a"))) fullStateReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 Set("a")))
deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2, Set("a"))) deltaReplicator ! Update(key._1, ORMultiMap.emptyWithValueDeltas[String, String], WriteLocal)(_ + (key._2 Set("a")))
} }
} }
enterBarrier("updated-1") enterBarrier("updated-1")

View file

@ -22,6 +22,11 @@ object ReplicatorORSetDeltaSpec extends MultiNodeConfig {
akka.loglevel = INFO akka.loglevel = INFO
akka.actor.provider = "cluster" akka.actor.provider = "cluster"
akka.log-dead-letters-during-shutdown = off akka.log-dead-letters-during-shutdown = off
akka.actor {
serialize-messages = off
serialize-creators = off
allow-java-serialization = off
}
""")) """))
testTransport(on = true) testTransport(on = true)

View file

@ -91,10 +91,10 @@ class ReplicatorPruningSpec extends MultiNodeSpec(ReplicatorPruningSpec) with ST
replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" } replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" }
expectMsg(UpdateSuccess(KeyC, None)) expectMsg(UpdateSuccess(KeyC, None))
replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a", Set("A")) } replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a" Set("A")) }
expectMsg(UpdateSuccess(KeyD, None)) expectMsg(UpdateSuccess(KeyD, None))
replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a", GSet.empty[String].add("A")) } replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a" GSet.empty[String].add("A")) }
expectMsg(UpdateSuccess(KeyE, None)) expectMsg(UpdateSuccess(KeyE, None))
enterBarrier("updates-done") enterBarrier("updates-done")

View file

@ -17,6 +17,7 @@ import akka.testkit.TestKit
import akka.cluster.UniqueAddress import akka.cluster.UniqueAddress
import akka.remote.RARP import akka.remote.RARP
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.actor.Props
class ReplicatedDataSerializerSpec extends TestKit(ActorSystem( class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
"ReplicatedDataSerializerSpec", "ReplicatedDataSerializerSpec",
@ -24,6 +25,11 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
akka.actor.provider=cluster akka.actor.provider=cluster
akka.remote.netty.tcp.port=0 akka.remote.netty.tcp.port=0
akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.port = 0
akka.actor {
serialize-messages = off
serialize-creators = off
allow-java-serialization = off
}
"""))) with WordSpecLike with Matchers with BeforeAndAfterAll { """))) with WordSpecLike with Matchers with BeforeAndAfterAll {
val serializer = new ReplicatedDataSerializer(system.asInstanceOf[ExtendedActorSystem]) val serializer = new ReplicatedDataSerializer(system.asInstanceOf[ExtendedActorSystem])
@ -34,6 +40,10 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2L) val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2L)
val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3L) val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3L)
val ref1 = system.actorOf(Props.empty, "ref1")
val ref2 = system.actorOf(Props.empty, "ref2")
val ref3 = system.actorOf(Props.empty, "ref3")
override def afterAll { override def afterAll {
shutdown() shutdown()
} }
@ -71,14 +81,14 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
checkSerialization(GSet() + "a" + "b") checkSerialization(GSet() + "a" + "b")
checkSerialization(GSet() + 1 + 2 + 3) checkSerialization(GSet() + 1 + 2 + 3)
checkSerialization(GSet() + address1 + address2) checkSerialization(GSet() + ref1 + ref2)
checkSerialization(GSet() + 1L + "2" + 3 + address1) checkSerialization(GSet() + 1L + "2" + 3 + ref1)
checkSameContent(GSet() + "a" + "b", GSet() + "a" + "b") checkSameContent(GSet() + "a" + "b", GSet() + "a" + "b")
checkSameContent(GSet() + "a" + "b", GSet() + "b" + "a") checkSameContent(GSet() + "a" + "b", GSet() + "b" + "a")
checkSameContent(GSet() + address1 + address2 + address3, GSet() + address2 + address1 + address3) checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref2 + ref1 + ref3)
checkSameContent(GSet() + address1 + address2 + address3, GSet() + address3 + address2 + address1) checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref3 + ref2 + ref1)
} }
"serialize ORSet" in { "serialize ORSet" in {
@ -89,7 +99,7 @@ class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
checkSerialization(ORSet().add(address1, "a").add(address2, "b").remove(address1, "a")) checkSerialization(ORSet().add(address1, "a").add(address2, "b").remove(address1, "a"))
checkSerialization(ORSet().add(address1, 1).add(address2, 2)) checkSerialization(ORSet().add(address1, 1).add(address2, 2))
checkSerialization(ORSet().add(address1, 1L).add(address2, 2L)) checkSerialization(ORSet().add(address1, 1L).add(address2, 2L))
checkSerialization(ORSet().add(address1, "a").add(address2, 2).add(address3, 3L).add(address3, address3)) checkSerialization(ORSet().add(address1, "a").add(address2, 2).add(address3, 3L).add(address3, ref3))
val s1 = ORSet().add(address1, "a").add(address2, "b") val s1 = ORSet().add(address1, "a").add(address2, "b")
val s2 = ORSet().add(address2, "b").add(address1, "a") val s2 = ORSet().add(address2, "b").add(address1, "a")

View file

@ -26,6 +26,8 @@ import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.DurableStore.DurableDataEnvelope import akka.cluster.ddata.DurableStore.DurableDataEnvelope
import akka.cluster.ddata.GCounter import akka.cluster.ddata.GCounter
import akka.cluster.ddata.VersionVector import akka.cluster.ddata.VersionVector
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.ORMultiMap
class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"ReplicatorMessageSerializerSpec", "ReplicatorMessageSerializerSpec",
@ -33,6 +35,11 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
akka.actor.provider=cluster akka.actor.provider=cluster
akka.remote.netty.tcp.port=0 akka.remote.netty.tcp.port=0
akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.port = 0
akka.actor {
serialize-messages = off
serialize-creators = off
allow-java-serialization = off
}
"""))) with WordSpecLike with Matchers with BeforeAndAfterAll { """))) with WordSpecLike with Matchers with BeforeAndAfterAll {
val serializer = new ReplicatorMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) val serializer = new ReplicatorMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
@ -61,8 +68,10 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"serialize Replicator messages" in { "serialize Replicator messages" in {
val ref1 = system.actorOf(Props.empty, "ref1") val ref1 = system.actorOf(Props.empty, "ref1")
val data1 = GSet.empty[String] + "a" val data1 = GSet.empty[String] + "a"
val delta1 = GCounter.empty.increment(address1, 17).increment(address2, 2) val delta1 = GCounter.empty.increment(address1, 17).increment(address2, 2).delta.get
val delta2 = delta1.increment(address2, 1) val delta2 = delta1.increment(address2, 1).delta.get
val delta3 = ORSet.empty[String].add(address1, "a").delta.get
val delta4 = ORMultiMap.empty[String, String].addBinding(address1, "a", "b").delta.get
checkSerialization(Get(keyA, ReadLocal)) checkSerialization(Get(keyA, ReadLocal))
checkSerialization(Get(keyA, ReadMajority(2.seconds), Some("x"))) checkSerialization(Get(keyA, ReadMajority(2.seconds), Some("x")))
@ -92,7 +101,9 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"B" DataEnvelope(GSet() + "b" + "c")), sendBack = true)) "B" DataEnvelope(GSet() + "b" + "c")), sendBack = true))
checkSerialization(DeltaPropagation(address1, reply = true, Map( checkSerialization(DeltaPropagation(address1, reply = true, Map(
"A" Delta(DataEnvelope(delta1), 1L, 1L), "A" Delta(DataEnvelope(delta1), 1L, 1L),
"B" Delta(DataEnvelope(delta2), 3L, 5L)))) "B" Delta(DataEnvelope(delta2), 3L, 5L),
"C" Delta(DataEnvelope(delta3), 1L, 1L),
"DC" Delta(DataEnvelope(delta4), 1L, 1L))))
checkSerialization(new DurableDataEnvelope(data1)) checkSerialization(new DurableDataEnvelope(data1))
val pruning = Map( val pruning = Map(