// pekko/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala

/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.cluster.ddata.protobuf
import java.util.Base64
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator.Internal._
import akka.testkit.TestKit
import akka.cluster.UniqueAddress
import akka.remote.RARP
import com.typesafe.config.ConfigFactory
import akka.actor.Props
/**
 * Round-trip and backwards-compatibility tests for [[ReplicatedDataSerializer]].
 *
 * Every test serializes one or more replicated data values with the serializer,
 * reads them back and checks that the result equals the input. Some tests also
 * check that two logically equal values built in a different order produce the
 * same bytes, and that blobs written by a previous serializer version can still
 * be read and re-written unchanged.
 *
 * The actor system is started with Java serialization disabled.
 */
class ReplicatedDataSerializerSpec extends TestKit(ActorSystem(
  "ReplicatedDataSerializerSpec",
  ConfigFactory.parseString("""
    akka.actor.provider=cluster
    akka.remote.netty.tcp.port=0
    akka.remote.artery.canonical.port = 0
    akka.actor {
      serialize-messages = off
      serialize-creators = off
      allow-java-serialization = off
    }
    """))) with WordSpecLike with Matchers with BeforeAndAfterAll {

  val serializer = new ReplicatedDataSerializer(system.asInstanceOf[ExtendedActorSystem])

  // The address protocol depends on whether the Artery remoting is enabled.
  val Protocol = if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp"

  val address1 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4711), 1L)
  val address2 = UniqueAddress(Address(Protocol, system.name, "other.host.org", 4711), 2L)
  val address3 = UniqueAddress(Address(Protocol, system.name, "some.host.org", 4712), 3L)

  // Actor references used as set elements in the tests below.
  val ref1 = system.actorOf(Props.empty, "ref1")
  val ref2 = system.actorOf(Props.empty, "ref2")
  val ref3 = system.actorOf(Props.empty, "ref3")

  /** Shuts down the test actor system once all tests have run. */
  override def afterAll(): Unit =
    shutdown()

  /**
   * Given a blob created with the previous serializer (with only string keys for maps). If we deserialize it and then
   * serialize it again and arrive at the same BLOB we can assume that we are compatible in both directions.
   *
   * @param oldBlobAsBase64 Base64 encoded bytes produced by the previous serializer
   * @param obj             a value of the same type as the blob, only used to look up the manifest
   */
  def checkCompatibility(oldBlobAsBase64: String, obj: AnyRef): Unit = {
    val oldBlob = Base64.getDecoder.decode(oldBlobAsBase64)
    val deserialized = serializer.fromBinary(oldBlob, serializer.manifest(obj))
    val newBlob = serializer.toBinary(deserialized)
    newBlob should equal(oldBlob)
  }

  /**
   * Serializes `obj`, deserializes the bytes again and asserts that the result equals `obj`.
   *
   * @param obj the value to round-trip through the serializer
   * @return the number of bytes in the serialized form
   */
  def checkSerialization(obj: AnyRef): Int = {
    val blob = serializer.toBinary(obj)
    val ref = serializer.fromBinary(blob, serializer.manifest(obj))
    ref should be(obj)
    blob.length
  }

  /**
   * Asserts that `a` and `b` are equal and that both serialize to exactly the same bytes.
   *
   * @param a first value
   * @param b second value, expected to be equal to `a`
   */
  def checkSameContent(a: AnyRef, b: AnyRef): Unit = {
    a should be(b)
    val blobA = serializer.toBinary(a)
    val blobB = serializer.toBinary(b)
    // compare as Seq to get element-wise comparison of the byte arrays
    blobA.toSeq should be(blobB.toSeq)
  }

  "ReplicatedDataSerializer" must {

    "serialize GSet" in {
      checkSerialization(GSet())
      checkSerialization(GSet() + "a")
      checkSerialization(GSet() + "a" + "b")
      checkSerialization(GSet() + 1 + 2 + 3)
      checkSerialization(GSet() + ref1 + ref2)

      checkSerialization(GSet() + 1L + "2" + 3 + ref1)

      checkSameContent(GSet() + "a" + "b", GSet() + "a" + "b")
      checkSameContent(GSet() + "a" + "b", GSet() + "b" + "a")
      checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref2 + ref1 + ref3)
      checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref3 + ref2 + ref1)
    }

    "serialize ORSet" in {
      checkSerialization(ORSet())
      checkSerialization(ORSet().add(address1, "a"))
      checkSerialization(ORSet().add(address1, "a").add(address2, "a"))
      checkSerialization(ORSet().add(address1, "a").remove(address2, "a"))
      checkSerialization(ORSet().add(address1, "a").add(address2, "b").remove(address1, "a"))
      checkSerialization(ORSet().add(address1, 1).add(address2, 2))
      checkSerialization(ORSet().add(address1, 1L).add(address2, 2L))
      checkSerialization(ORSet().add(address1, "a").add(address2, 2).add(address3, 3L).add(address3, ref3))

      val s1 = ORSet().add(address1, "a").add(address2, "b")
      val s2 = ORSet().add(address2, "b").add(address1, "a")
      checkSameContent(s1.merge(s2), s2.merge(s1))

      val s3 = ORSet().add(address1, "a").add(address2, 17).remove(address3, 17)
      val s4 = ORSet().add(address2, 17).remove(address3, 17).add(address1, "a")
      checkSameContent(s3.merge(s4), s4.merge(s3))
    }

    "serialize ORSet delta" in {
      checkSerialization(ORSet().add(address1, "a").delta.get)
      checkSerialization(ORSet().add(address1, "a").resetDelta.remove(address2, "a").delta.get)
      checkSerialization(ORSet().add(address1, "a").remove(address2, "a").delta.get)
      checkSerialization(ORSet().add(address1, "a").resetDelta.clear(address2).delta.get)
      checkSerialization(ORSet().add(address1, "a").clear(address2).delta.get)
    }

    "serialize large GSet" in {
      val largeSet = (10000 until 20000).foldLeft(GSet.empty[String]) {
        case (acc, n) => acc.resetDelta.add(n.toString)
      }
      val numberOfBytes = checkSerialization(largeSet)
      info(s"size of GSet with ${largeSet.size} elements: $numberOfBytes bytes")
      numberOfBytes should be <= (80000)
    }

    "serialize large ORSet" in {
      val largeSet = (10000 until 20000).foldLeft(ORSet.empty[String]) {
        case (acc, n) =>
          // spread the elements over the three addresses
          val address = (n % 3) match {
            case 0 => address1
            case 1 => address2
            case 2 => address3
          }
          acc.resetDelta.add(address, n.toString)
      }
      val numberOfBytes = checkSerialization(largeSet)
      // note that ORSet is compressed, and therefore smaller than GSet
      info(s"size of ORSet with ${largeSet.size} elements: $numberOfBytes bytes")
      numberOfBytes should be <= (50000)
    }

    "serialize Flag" in {
      checkSerialization(Flag())
      checkSerialization(Flag().switchOn)
    }

    "serialize LWWRegister" in {
      checkSerialization(LWWRegister(address1, "value1", LWWRegister.defaultClock))
      checkSerialization(LWWRegister(address1, "value2", LWWRegister.defaultClock[String])
        .withValue(address2, "value3", LWWRegister.defaultClock[String]))
    }

    "serialize GCounter" in {
      checkSerialization(GCounter())
      checkSerialization(GCounter().increment(address1, 3))
      checkSerialization(GCounter().increment(address1, 2).increment(address2, 5))

      checkSameContent(
        GCounter().increment(address1, 2).increment(address2, 5),
        GCounter().increment(address2, 5).increment(address1, 1).increment(address1, 1))
      checkSameContent(
        GCounter().increment(address1, 2).increment(address3, 5),
        GCounter().increment(address3, 5).increment(address1, 2))
    }

    "serialize PNCounter" in {
      checkSerialization(PNCounter())
      checkSerialization(PNCounter().increment(address1, 3))
      checkSerialization(PNCounter().increment(address1, 3).decrement(address1, 1))
      checkSerialization(PNCounter().increment(address1, 2).increment(address2, 5))
      checkSerialization(PNCounter().increment(address1, 2).increment(address2, 5).decrement(address1, 1))

      checkSameContent(
        PNCounter().increment(address1, 2).increment(address2, 5),
        PNCounter().increment(address2, 5).increment(address1, 1).increment(address1, 1))
      checkSameContent(
        PNCounter().increment(address1, 2).increment(address3, 5),
        PNCounter().increment(address3, 5).increment(address1, 2))
      checkSameContent(
        PNCounter().increment(address1, 2).decrement(address1, 1).increment(address3, 5),
        PNCounter().increment(address3, 5).increment(address1, 2).decrement(address1, 1))
    }

    "serialize ORMap" in {
      checkSerialization(ORMap().put(address1, "a", GSet() + "A").put(address2, "b", GSet() + "B"))
      checkSerialization(ORMap().put(address1, 1, GSet() + "A"))
      checkSerialization(ORMap().put(address1, 1L, GSet() + "A"))
      // use Flag for this test as object key because it is serializable
      checkSerialization(ORMap().put(address1, Flag(), GSet() + "A"))
    }

    "serialize ORMap delta" in {
      checkSerialization(ORMap().put(address1, "a", GSet() + "A").put(address2, "b", GSet() + "B").delta.get)
      checkSerialization(ORMap().put(address1, "a", GSet() + "A").resetDelta.remove(address2, "a").delta.get)
      checkSerialization(ORMap().put(address1, "a", GSet() + "A").remove(address2, "a").delta.get)
      checkSerialization(ORMap().put(address1, 1, GSet() + "A").delta.get)
      checkSerialization(ORMap().put(address1, 1L, GSet() + "A").delta.get)
      checkSerialization(ORMap.empty[String, ORSet[String]]
        .put(address1, "a", ORSet.empty[String].add(address1, "A"))
        .put(address2, "b", ORSet.empty[String].add(address2, "B"))
        .updated(address1, "a", ORSet.empty[String])(_.add(address1, "C")).delta.get)
      checkSerialization(ORMap.empty[String, ORSet[String]]
        .resetDelta
        .updated(address1, "a", ORSet.empty[String])(_.add(address1, "C")).delta.get)
      // use Flag for this test as object key because it is serializable
      checkSerialization(ORMap().put(address1, Flag(), GSet() + "A").delta.get)
    }

    "be compatible with old ORMap serialization" in {
      // Below blob was created with previous version of the serializer
      val oldBlobAsBase64 = "H4sIAAAAAAAAAOOax8jlyaXMJc8lzMWXX5KRWqSXkV9copdflC7wXEWUiYGBQRaIGQQkuJS45LiEuHiL83NTUdQwwtWIC6kQpUqVKAulGBOlGJOE+LkYE4W4uJi5GB0FuJUYnUACSRABJ7AAAOLO3C3DAAAA"
      checkCompatibility(oldBlobAsBase64, ORMap())
    }

    "serialize LWWMap" in {
      checkSerialization(LWWMap())
      checkSerialization(LWWMap().put(address1, "a", "value1", LWWRegister.defaultClock[Any]))
      checkSerialization(LWWMap().put(address1, 1, "value1", LWWRegister.defaultClock[Any]))
      checkSerialization(LWWMap().put(address1, 1L, "value1", LWWRegister.defaultClock[Any]))
      checkSerialization(LWWMap().put(address1, Flag(), "value1", LWWRegister.defaultClock[Any]))
      checkSerialization(LWWMap().put(address1, "a", "value1", LWWRegister.defaultClock[Any])
        .put(address2, "b", 17, LWWRegister.defaultClock[Any]))
    }

    "be compatible with old LWWMap serialization" in {
      // Below blob was created with previous version of the serializer
      val oldBlobAsBase64 = "H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwC0kJEqZJiTBSy4AISxhwzrl2fuyRMiIAWKS4utrLEnNJUQwERAD96/peLAAAA"
      checkCompatibility(oldBlobAsBase64, LWWMap())
    }

    "serialize PNCounterMap" in {
      checkSerialization(PNCounterMap())
      checkSerialization(PNCounterMap().increment(address1, "a", 3))
      checkSerialization(PNCounterMap().increment(address1, 1, 3))
      checkSerialization(PNCounterMap().increment(address1, 1L, 3))
      checkSerialization(PNCounterMap().increment(address1, Flag(), 3))
      checkSerialization(PNCounterMap().increment(address1, "a", 3).decrement(address2, "a", 2)
        .increment(address2, "b", 5))
    }

    "be compatible with old PNCounterMap serialization" in {
      // Below blob was created with previous version of the serializer
      val oldBlobAsBase64 = "H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwC8kJEqZJiTBTS4wISmlyqXMqE1AsxMgsxAADYQs/9gQAAAA=="
      checkCompatibility(oldBlobAsBase64, PNCounterMap())
    }

    "serialize ORMultiMap" in {
      checkSerialization(ORMultiMap())
      checkSerialization(ORMultiMap().addBinding(address1, "a", "A"))
      checkSerialization(ORMultiMap().addBinding(address1, 1, "A"))
      checkSerialization(ORMultiMap().addBinding(address1, 1L, "A"))
      checkSerialization(ORMultiMap().addBinding(address1, Flag(), "A"))
      checkSerialization(ORMultiMap.empty[String, String]
        .addBinding(address1, "a", "A1")
        .put(address2, "b", Set("B1", "B2", "B3"))
        .addBinding(address2, "a", "A2"))

      val m1 = ORMultiMap.empty[String, String].addBinding(address1, "a", "A1").addBinding(address2, "a", "A2")
      val m2 = ORMultiMap.empty[String, String].put(address2, "b", Set("B1", "B2", "B3"))
      checkSameContent(m1.merge(m2), m2.merge(m1))
    }

    "be compatible with old ORMultiMap serialization" in {
      // Below blob was created with previous version of the serializer
      val oldBlobAsBase64 = "H4sIAAAAAAAAAOPy51LhUuKS4xLi4i3Oz03Vy8gvLtHLL0oXeK4iysjAwCALxAwCakJEqZJiTBQK4QISxJmqSpSpqlKMjgDlsHjDpwAAAA=="
      checkCompatibility(oldBlobAsBase64, ORMultiMap())
    }

    "serialize ORMultiMap withValueDeltas" in {
      checkSerialization(ORMultiMap._emptyWithValueDeltas)
      checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, "a", "A"))
      checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, 1, "A"))
      checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, 1L, "A"))
      checkSerialization(ORMultiMap._emptyWithValueDeltas.addBinding(address1, Flag(), "A"))
      checkSerialization(ORMultiMap.emptyWithValueDeltas[String, String].addBinding(address1, "a", "A").remove(address1, "a").delta.get)
      checkSerialization(ORMultiMap.emptyWithValueDeltas[String, String]
        .addBinding(address1, "a", "A1")
        .put(address2, "b", Set("B1", "B2", "B3"))
        .addBinding(address2, "a", "A2"))

      val m1 = ORMultiMap.emptyWithValueDeltas[String, String].addBinding(address1, "a", "A1").addBinding(address2, "a", "A2")
      val m2 = ORMultiMap.emptyWithValueDeltas[String, String].put(address2, "b", Set("B1", "B2", "B3"))
      checkSameContent(m1.merge(m2), m2.merge(m1))
    }

    "serialize DeletedData" in {
      checkSerialization(DeletedData)
    }

    "serialize VersionVector" in {
      checkSerialization(VersionVector())
      checkSerialization(VersionVector().increment(address1))
      checkSerialization(VersionVector().increment(address1).increment(address2))

      val v1 = VersionVector().increment(address1).increment(address1)
      val v2 = VersionVector().increment(address2)
      checkSameContent(v1.merge(v2), v2.merge(v1))
    }

  }
}