fix serialization of pruning in DurableDataEnvelope

This commit is contained in:
Patrik Nordwall 2017-02-22 15:17:30 +01:00
parent 40b883cda7
commit f2fe8582fa
2 changed files with 42 additions and 13 deletions

View file

@ -30,6 +30,7 @@ import java.io.NotSerializableException
import akka.actor.Address import akka.actor.Address
import akka.cluster.ddata.VersionVector import akka.cluster.ddata.VersionVector
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.ddata.PruningState.PruningPerformed
/** /**
* INTERNAL API * INTERNAL API
@ -424,10 +425,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
Changed(key)(data) Changed(key)(data)
} }
private def dataEnvelopeToProto(dataEnvelope: DataEnvelope): dm.DataEnvelope = { private def pruningToProto(entries: Map[UniqueAddress, PruningState]): Iterable[dm.DataEnvelope.PruningEntry] = {
val dataEnvelopeBuilder = dm.DataEnvelope.newBuilder(). entries.map {
setData(otherMessageToProto(dataEnvelope.data))
dataEnvelope.pruning.foreach {
case (removedAddress, state) case (removedAddress, state)
val b = dm.DataEnvelope.PruningEntry.newBuilder(). val b = dm.DataEnvelope.PruningEntry.newBuilder().
setRemovedAddress(uniqueAddressToProto(removedAddress)) setRemovedAddress(uniqueAddressToProto(removedAddress))
@ -442,8 +441,15 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
// wire backwards compatibility with 2.4.16 (required field) // wire backwards compatibility with 2.4.16 (required field)
b.setOwnerAddress(uniqueAddressToProto(dummyAddress)) b.setOwnerAddress(uniqueAddressToProto(dummyAddress))
} }
dataEnvelopeBuilder.addPruning(b) b.build()
} }
}
private def dataEnvelopeToProto(dataEnvelope: DataEnvelope): dm.DataEnvelope = {
val dataEnvelopeBuilder = dm.DataEnvelope.newBuilder().
setData(otherMessageToProto(dataEnvelope.data))
dataEnvelopeBuilder.addAllPruning(pruningToProto(dataEnvelope.pruning).asJava)
if (!dataEnvelope.deltaVersions.isEmpty) if (!dataEnvelope.deltaVersions.isEmpty)
dataEnvelopeBuilder.setDeltaVersions(versionVectorToProto(dataEnvelope.deltaVersions)) dataEnvelopeBuilder.setDeltaVersions(versionVectorToProto(dataEnvelope.deltaVersions))
@ -517,9 +523,18 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
} }
private def durableDataEnvelopeToProto(durableDataEnvelope: DurableDataEnvelope): dm.DurableDataEnvelope = { private def durableDataEnvelopeToProto(durableDataEnvelope: DurableDataEnvelope): dm.DurableDataEnvelope = {
dm.DurableDataEnvelope.newBuilder() // only keep the PruningPerformed entries
val pruning = durableDataEnvelope.dataEnvelope.pruning.filter {
case (_, _: PruningPerformed) true
case _ false
}
val builder = dm.DurableDataEnvelope.newBuilder()
.setData(otherMessageToProto(durableDataEnvelope.data)) .setData(otherMessageToProto(durableDataEnvelope.data))
.build()
builder.addAllPruning(pruningToProto(pruning).asJava)
builder.build()
} }
private def durableDataEnvelopeFromBinary(bytes: Array[Byte]): DurableDataEnvelope = private def durableDataEnvelopeFromBinary(bytes: Array[Byte]): DurableDataEnvelope =
@ -529,7 +544,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
val data = otherMessageFromProto(durableDataEnvelope.getData).asInstanceOf[ReplicatedData] val data = otherMessageFromProto(durableDataEnvelope.getData).asInstanceOf[ReplicatedData]
val pruning = pruningFromProto(durableDataEnvelope.getPruningList) val pruning = pruningFromProto(durableDataEnvelope.getPruningList)
new DurableDataEnvelope(data) new DurableDataEnvelope(DataEnvelope(data, pruning))
} }
} }

View file

@ -25,6 +25,7 @@ import akka.remote.RARP
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.cluster.ddata.DurableStore.DurableDataEnvelope import akka.cluster.ddata.DurableStore.DurableDataEnvelope
import akka.cluster.ddata.GCounter import akka.cluster.ddata.GCounter
import akka.cluster.ddata.VersionVector
class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
"ReplicatorMessageSerializerSpec", "ReplicatorMessageSerializerSpec",
@ -48,10 +49,11 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
shutdown() shutdown()
} }
def checkSerialization(obj: AnyRef): Unit = { def checkSerialization[T <: AnyRef](obj: T): T = {
val blob = serializer.toBinary(obj) val blob = serializer.toBinary(obj)
val ref = serializer.fromBinary(blob, serializer.manifest(obj)) val deserialized = serializer.fromBinary(blob, serializer.manifest(obj))
ref should be(obj) deserialized should be(obj)
deserialized.asInstanceOf[T]
} }
"ReplicatorMessageSerializer" must { "ReplicatorMessageSerializer" must {
@ -90,10 +92,22 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem(
checkSerialization(DeltaPropagation(address1, Map( checkSerialization(DeltaPropagation(address1, Map(
"A" Delta(DataEnvelope(delta1), 1L, 1L), "A" Delta(DataEnvelope(delta1), 1L, 1L),
"B" Delta(DataEnvelope(delta2), 3L, 5L)))) "B" Delta(DataEnvelope(delta2), 3L, 5L))))
checkSerialization(new DurableDataEnvelope(data1)) checkSerialization(new DurableDataEnvelope(data1))
checkSerialization(new DurableDataEnvelope(DataEnvelope(data1, pruning = Map( val pruning = Map(
address1 PruningPerformed(System.currentTimeMillis()), address1 PruningPerformed(System.currentTimeMillis()),
address3 PruningInitialized(address2, Set(address1.address)))))) address3 PruningInitialized(address2, Set(address1.address)))
val deserializedDurableDataEnvelope =
checkSerialization(new DurableDataEnvelope(DataEnvelope(data1, pruning,
deltaVersions = VersionVector(address1, 13L))))
// equals of DurableDataEnvelope is only checking the data, PruningPerformed
// should be serialized
val expectedPruning = pruning.filter {
case (_, _: PruningPerformed) true
case _ false
}
deserializedDurableDataEnvelope.dataEnvelope.pruning should ===(expectedPruning)
deserializedDurableDataEnvelope.dataEnvelope.deltaVersions.size should ===(0)
} }
} }