diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 3db36d2230..7af9bfb55a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -37,9 +37,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri classOf[InternalClusterAction.Join] → { case bytes ⇒ val m = cm.Join.parseFrom(bytes) + val roles = Set.empty[String] ++ m.getRolesList.asScala InternalClusterAction.Join( uniqueAddressFromProto(m.getNode), - Set.empty[String] ++ m.getRolesList.asScala) + if (roles.find(_.startsWith(ClusterSettings.DcRolePrefix)).isDefined) roles + else roles + (ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter) + ) }, classOf[InternalClusterAction.Welcome] → { case bytes ⇒ @@ -363,7 +366,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri roles += role } - if (!containsDc) roles + (ClusterSettings.DcRolePrefix + "default") + if (!containsDc) roles + (ClusterSettings.DcRolePrefix + ClusterSettings.DefaultDataCenter) else roles } diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index d1aeeda936..26f84b678b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -49,7 +49,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec( val uniqueAddress = UniqueAddress(address, 17L) val address2 = Address("akka.tcp", "system", "other.host.org", 4711) val uniqueAddress2 = UniqueAddress(address2, 18L) - checkSerialization(InternalClusterAction.Join(uniqueAddress, Set("foo", "bar"))) + checkSerialization(InternalClusterAction.Join(uniqueAddress, Set("foo", "bar", "dc-A"))) checkSerialization(ClusterUserAction.Leave(address)) checkSerialization(ClusterUserAction.Down(address)) checkSerialization(InternalClusterAction.InitJoin) @@ -115,11 +115,16 @@ class ClusterMessageSerializerSpec extends AkkaSpec( } - "add a default data center role if none is present" in { + "add a default data center role to gossip if none is present" in { val env = roundtrip(GossipEnvelope(a1.uniqueAddress, d1.uniqueAddress, Gossip(SortedSet(a1, d1)))) env.gossip.members.head.roles should be(Set(ClusterSettings.DcRolePrefix + "default")) env.gossip.members.tail.head.roles should be(Set("r1", ClusterSettings.DcRolePrefix + "foo")) } + + "add a default data center role to internal join action if none is present" in { + val join = roundtrip(InternalClusterAction.Join(a1.uniqueAddress, Set())) + join.roles should be(Set(ClusterSettings.DcRolePrefix + "default")) + } } "Cluster router pool" must { "be serializable with no role" in {