Introduce missing team role if necessary (#23276)
* Introduce missing team role if necessary (#23243) When receiving gossip from a node that did not contain any team information (such as gossip from a node running a previous version of Akka), add the default team role during deserialization. * Simpler implementation of adding default role * More efficient `rolesFromProto` Now actually outperforms the previous implementation. Still room for improvement as this probably checks for duplicates in the set on each add, but creating our own array-backed set here is probably going overboard :). * Fixes following rebase
This commit is contained in:
parent
165831b064
commit
58db22ca1e
3 changed files with 35 additions and 11 deletions
|
|
@ -346,7 +346,23 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
|
||||||
|
|
||||||
def memberFromProto(member: cm.Member) =
|
def memberFromProto(member: cm.Member) =
|
||||||
new Member(addressMapping(member.getAddressIndex), member.getUpNumber, memberStatusFromInt(member.getStatus.getNumber),
|
new Member(addressMapping(member.getAddressIndex), member.getUpNumber, memberStatusFromInt(member.getStatus.getNumber),
|
||||||
member.getRolesIndexesList.asScala.map(roleMapping(_))(breakOut))
|
rolesFromProto(member.getRolesIndexesList.asScala))
|
||||||
|
|
||||||
|
def rolesFromProto(roleIndexes: Seq[Integer]): Set[String] = {
|
||||||
|
var containsDc = false
|
||||||
|
var roles = Set.empty[String]
|
||||||
|
|
||||||
|
for {
|
||||||
|
roleIndex ← roleIndexes
|
||||||
|
role = roleMapping(roleIndex)
|
||||||
|
} {
|
||||||
|
if (role.startsWith(ClusterSettings.TeamRolePrefix)) containsDc = true
|
||||||
|
roles += role
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!containsDc) roles + (ClusterSettings.TeamRolePrefix + "default")
|
||||||
|
else roles
|
||||||
|
}
|
||||||
|
|
||||||
def tombstoneFromProto(tombstone: cm.Tombstone): (UniqueAddress, Long) =
|
def tombstoneFromProto(tombstone: cm.Tombstone): (UniqueAddress, Long) =
|
||||||
(addressMapping(tombstone.getAddressIndex), tombstone.getTimestamp)
|
(addressMapping(tombstone.getAddressIndex), tombstone.getTimestamp)
|
||||||
|
|
|
||||||
|
|
@ -91,7 +91,7 @@ abstract class QuickRestartSpec
|
||||||
Cluster(system).state.members.size should ===(totalNumberOfNodes)
|
Cluster(system).state.members.size should ===(totalNumberOfNodes)
|
||||||
Cluster(system).state.members.map(_.status == MemberStatus.Up)
|
Cluster(system).state.members.map(_.status == MemberStatus.Up)
|
||||||
// use the role to test that it is the new incarnation that joined, sneaky
|
// use the role to test that it is the new incarnation that joined, sneaky
|
||||||
Cluster(system).state.members.flatMap(_.roles) should ===(Set(s"round-$n", "team-default"))
|
Cluster(system).state.members.flatMap(_.roles) should ===(Set(s"round-$n", ClusterSettings.TeamRolePrefix + "default"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
enterBarrier("members-up-" + n)
|
enterBarrier("members-up-" + n)
|
||||||
|
|
|
||||||
|
|
@ -16,16 +16,18 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
|
||||||
|
|
||||||
val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
|
val serializer = new ClusterMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
|
||||||
|
|
||||||
def checkSerialization(obj: AnyRef): Unit = {
|
def roundtrip[T <: AnyRef](obj: T): T = {
|
||||||
val blob = serializer.toBinary(obj)
|
val blob = serializer.toBinary(obj)
|
||||||
val ref = serializer.fromBinary(blob, obj.getClass)
|
serializer.fromBinary(blob, obj.getClass).asInstanceOf[T]
|
||||||
obj match {
|
}
|
||||||
case env: GossipEnvelope ⇒
|
|
||||||
val env2 = ref.asInstanceOf[GossipEnvelope]
|
def checkSerialization(obj: AnyRef): Unit = {
|
||||||
|
(obj, roundtrip(obj)) match {
|
||||||
|
case (env: GossipEnvelope, env2: GossipEnvelope) ⇒
|
||||||
env2.from should ===(env.from)
|
env2.from should ===(env.from)
|
||||||
env2.to should ===(env.to)
|
env2.to should ===(env.to)
|
||||||
env2.gossip should ===(env.gossip)
|
env2.gossip should ===(env.gossip)
|
||||||
case _ ⇒
|
case (_, ref) ⇒
|
||||||
ref should ===(obj)
|
ref should ===(obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -35,10 +37,10 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
|
||||||
|
|
||||||
val a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty)
|
val a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty)
|
||||||
val b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set("r1"))
|
val b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set("r1"))
|
||||||
val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, Set("r2"))
|
val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty, "foo")
|
||||||
val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Exiting, Set("r1", "r2"))
|
val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Exiting, Set("r1"), "foo")
|
||||||
val e1 = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, Set("r3"))
|
val e1 = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, Set("r3"))
|
||||||
val f1 = TestMember(Address("akka.tcp", "sys", "f", 2552), Removed, Set("r2", "r3"))
|
val f1 = TestMember(Address("akka.tcp", "sys", "f", 2552), Removed, Set("r3"), "foo")
|
||||||
|
|
||||||
"ClusterMessages" must {
|
"ClusterMessages" must {
|
||||||
|
|
||||||
|
|
@ -77,6 +79,12 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
|
||||||
|
|
||||||
checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2))
|
checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"add a default team role if none is present" in {
|
||||||
|
val env = roundtrip(GossipEnvelope(a1.uniqueAddress, d1.uniqueAddress, Gossip(SortedSet(a1, d1))))
|
||||||
|
env.gossip.members.head.roles should be(Set(ClusterSettings.TeamRolePrefix + "default"))
|
||||||
|
env.gossip.members.tail.head.roles should be(Set("r1", ClusterSettings.TeamRolePrefix + "foo"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
"Cluster router pool" must {
|
"Cluster router pool" must {
|
||||||
"be serializable" in {
|
"be serializable" in {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue