From 58db22ca1e703c79e14094e90d680cda95d38efa Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Tue, 4 Jul 2017 05:52:03 -0700 Subject: [PATCH] Introduce missing team role if necessary (#23276) * Introduce missing team role if necessary (#23243) When receiving gossip from a node that did not contain any team information (such as gossip from a node running a previous version of Akka), add the default team role during deserialization. * Simpler implementation of adding default role * More efficient `rolesFromProto` Now actually outperforms the previous implementation. Still room for improvement as this probably checks for duplicates in the set on each add, but creating our own array-backed set here is probably going overboard :). * Fixes following rebase --- .../protobuf/ClusterMessageSerializer.scala | 18 ++++++++++++- .../scala/akka/cluster/QuickRestartSpec.scala | 2 +- .../ClusterMessageSerializerSpec.scala | 26 ++++++++++++------- 3 files changed, 35 insertions(+), 11 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 4896c706e3..25a9e69eef 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -346,7 +346,23 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri def memberFromProto(member: cm.Member) = new Member(addressMapping(member.getAddressIndex), member.getUpNumber, memberStatusFromInt(member.getStatus.getNumber), - member.getRolesIndexesList.asScala.map(roleMapping(_))(breakOut)) + rolesFromProto(member.getRolesIndexesList.asScala)) + + def rolesFromProto(roleIndexes: Seq[Integer]): Set[String] = { + var containsDc = false + var roles = Set.empty[String] + + for { + roleIndex ← roleIndexes + role = roleMapping(roleIndex) + } { + if (role.startsWith(ClusterSettings.TeamRolePrefix)) containsDc = true + roles += role + } + + if (!containsDc) roles + (ClusterSettings.TeamRolePrefix + "default") + else roles + } def tombstoneFromProto(tombstone: cm.Tombstone): (UniqueAddress, Long) = (addressMapping(tombstone.getAddressIndex), tombstone.getTimestamp) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala index df14986ac1..9da57760ca 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala @@ -91,7 +91,7 @@ abstract class QuickRestartSpec Cluster(system).state.members.size should ===(totalNumberOfNodes) Cluster(system).state.members.map(_.status == MemberStatus.Up) // use the role to test that it is the new incarnation that joined, sneaky - Cluster(system).state.members.flatMap(_.roles) should ===(Set(s"round-$n", "team-default")) + Cluster(system).state.members.flatMap(_.roles) should ===(Set(s"round-$n", ClusterSettings.TeamRolePrefix + "default")) } } enterBarrier("members-up-" + n) diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index db327a7fbc..fd4c323309 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -16,16 +16,18 @@ class ClusterMessageSerializerSpec extends AkkaSpec( val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) - def checkSerialization(obj: AnyRef): Unit = { + def roundtrip[T <: AnyRef](obj: T): T = { val blob = serializer.toBinary(obj) - val ref = serializer.fromBinary(blob, obj.getClass) - obj match { - case env: GossipEnvelope ⇒ - val env2 = ref.asInstanceOf[GossipEnvelope] + serializer.fromBinary(blob, obj.getClass).asInstanceOf[T] + } + + def checkSerialization(obj: AnyRef): Unit = { + (obj, roundtrip(obj)) match { + case (env: GossipEnvelope, env2: GossipEnvelope) ⇒ env2.from should ===(env.from) env2.to should ===(env.to) env2.gossip should ===(env.gossip) - case _ ⇒ + case (_, ref) ⇒ ref should ===(obj) } @@ -35,10 +37,10 @@ class ClusterMessageSerializerSpec extends AkkaSpec( val a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty) val b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set("r1")) - val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, Set("r2")) - val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Exiting, Set("r1", "r2")) + val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty, "foo") + val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Exiting, Set("r1"), "foo") val e1 = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, Set("r3")) - val f1 = TestMember(Address("akka.tcp", "sys", "f", 2552), Removed, Set("r2", "r3")) + val f1 = TestMember(Address("akka.tcp", "sys", "f", 2552), Removed, Set("r3"), "foo") "ClusterMessages" must { @@ -77,6 +79,12 @@ class ClusterMessageSerializerSpec extends AkkaSpec( checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2)) } + + "add a default team role if none is present" in { + val env = roundtrip(GossipEnvelope(a1.uniqueAddress, d1.uniqueAddress, Gossip(SortedSet(a1, d1)))) + env.gossip.members.head.roles should be(Set(ClusterSettings.TeamRolePrefix + "default")) + env.gossip.members.tail.head.roles should be(Set("r1", ClusterSettings.TeamRolePrefix + "foo")) + } } "Cluster router pool" must { "be serializable" in {