diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala index c48b8f4750..93a4f68cde 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala @@ -30,6 +30,7 @@ import java.io.NotSerializableException import akka.actor.Address import akka.cluster.ddata.VersionVector import akka.annotation.InternalApi +import akka.cluster.ddata.PruningState.PruningPerformed /** * INTERNAL API @@ -424,10 +425,8 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) Changed(key)(data) } - private def dataEnvelopeToProto(dataEnvelope: DataEnvelope): dm.DataEnvelope = { - val dataEnvelopeBuilder = dm.DataEnvelope.newBuilder(). - setData(otherMessageToProto(dataEnvelope.data)) - dataEnvelope.pruning.foreach { + private def pruningToProto(entries: Map[UniqueAddress, PruningState]): Iterable[dm.DataEnvelope.PruningEntry] = { + entries.map { case (removedAddress, state) ⇒ val b = dm.DataEnvelope.PruningEntry.newBuilder(). setRemovedAddress(uniqueAddressToProto(removedAddress)) @@ -442,8 +441,15 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) // wire backwards compatibility with 2.4.16 (required field) b.setOwnerAddress(uniqueAddressToProto(dummyAddress)) } - dataEnvelopeBuilder.addPruning(b) + b.build() } + } + + private def dataEnvelopeToProto(dataEnvelope: DataEnvelope): dm.DataEnvelope = { + val dataEnvelopeBuilder = dm.DataEnvelope.newBuilder(). + setData(otherMessageToProto(dataEnvelope.data)) + + dataEnvelopeBuilder.addAllPruning(pruningToProto(dataEnvelope.pruning).asJava) if (!dataEnvelope.deltaVersions.isEmpty) dataEnvelopeBuilder.setDeltaVersions(versionVectorToProto(dataEnvelope.deltaVersions)) @@ -517,9 +523,18 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) } private def durableDataEnvelopeToProto(durableDataEnvelope: DurableDataEnvelope): dm.DurableDataEnvelope = { - dm.DurableDataEnvelope.newBuilder() + // only keep the PruningPerformed entries + val pruning = durableDataEnvelope.dataEnvelope.pruning.filter { + case (_, _: PruningPerformed) ⇒ true + case _ ⇒ false + } + + val builder = dm.DurableDataEnvelope.newBuilder() .setData(otherMessageToProto(durableDataEnvelope.data)) - .build() + + builder.addAllPruning(pruningToProto(pruning).asJava) + + builder.build() } private def durableDataEnvelopeFromBinary(bytes: Array[Byte]): DurableDataEnvelope = @@ -529,7 +544,7 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) val data = otherMessageFromProto(durableDataEnvelope.getData).asInstanceOf[ReplicatedData] val pruning = pruningFromProto(durableDataEnvelope.getPruningList) - new DurableDataEnvelope(data) + new DurableDataEnvelope(DataEnvelope(data, pruning)) } } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala index d920da41af..b1c4d03ea0 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala @@ -25,6 +25,7 @@ import akka.remote.RARP import com.typesafe.config.ConfigFactory import akka.cluster.ddata.DurableStore.DurableDataEnvelope import akka.cluster.ddata.GCounter +import akka.cluster.ddata.VersionVector class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( "ReplicatorMessageSerializerSpec", @@ -48,10 +49,11 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( shutdown() } - def checkSerialization(obj: AnyRef): Unit = { + def checkSerialization[T <: AnyRef](obj: T): T = { val blob = serializer.toBinary(obj) - val ref = serializer.fromBinary(blob, serializer.manifest(obj)) - ref should be(obj) + val deserialized = serializer.fromBinary(blob, serializer.manifest(obj)) + deserialized should be(obj) + deserialized.asInstanceOf[T] } "ReplicatorMessageSerializer" must { @@ -90,10 +92,22 @@ class ReplicatorMessageSerializerSpec extends TestKit(ActorSystem( checkSerialization(DeltaPropagation(address1, Map( "A" → Delta(DataEnvelope(delta1), 1L, 1L), "B" → Delta(DataEnvelope(delta2), 3L, 5L)))) + checkSerialization(new DurableDataEnvelope(data1)) - checkSerialization(new DurableDataEnvelope(DataEnvelope(data1, pruning = Map( + val pruning = Map( address1 → PruningPerformed(System.currentTimeMillis()), - address3 → PruningInitialized(address2, Set(address1.address)))))) + address3 → PruningInitialized(address2, Set(address1.address))) + val deserializedDurableDataEnvelope = + checkSerialization(new DurableDataEnvelope(DataEnvelope(data1, pruning, + deltaVersions = VersionVector(address1, 13L)))) + // equals of DurableDataEnvelope is only checking the data, PruningPerformed + // should be serialized + val expectedPruning = pruning.filter { + case (_, _: PruningPerformed) ⇒ true + case _ ⇒ false + } + deserializedDurableDataEnvelope.dataEnvelope.pruning should ===(expectedPruning) + deserializedDurableDataEnvelope.dataEnvelope.deltaVersions.size should ===(0) } }