From 2044c1712be3acf0dab3608169315b9e07cd4560 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 26 Jun 2017 16:28:44 +0200 Subject: [PATCH] Support cross team in ClusterSingletonProxy, #23230 --- .../singleton/ClusterSingletonManager.scala | 9 ++-- .../singleton/ClusterSingletonProxy.scala | 27 +++++++++--- .../singleton/TeamSingletonManagerSpec.scala | 41 +++++++++++++------ 3 files changed, 54 insertions(+), 23 deletions(-) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 3578d22abe..a0e9f1c868 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -30,6 +30,7 @@ import akka.Done import akka.actor.CoordinatedShutdown import akka.pattern.ask import akka.util.Timeout +import akka.cluster.ClusterSettings object ClusterSingletonManagerSettings { @@ -256,12 +257,10 @@ object ClusterSingletonManager { } override def postStop(): Unit = cluster.unsubscribe(self) - private val selfTeam = "team-" + cluster.settings.Team + private val selfTeam = ClusterSettings.TeamRolePrefix + cluster.settings.Team - def matchingRole(member: Member): Boolean = member.hasRole(selfTeam) && (role match { - case None ⇒ true - case Some(r) ⇒ member.hasRole(r) - }) + def matchingRole(member: Member): Boolean = + member.hasRole(selfTeam) && role.forall(member.hasRole) def trackChange(block: () ⇒ Unit): Unit = { val before = membersByAge.headOption diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala index 0cb2c06b56..a31dc53035 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala @@ -19,6 +19,7 @@ import com.typesafe.config.Config import akka.actor.NoSerializationVerificationNeeded import akka.event.Logging import akka.util.MessageBuffer +import akka.cluster.ClusterSettings object ClusterSingletonProxySettings { @@ -63,6 +64,7 @@ object ClusterSingletonProxySettings { /** * @param singletonName The actor name of the singleton actor that is started by the [[ClusterSingletonManager]]. * @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do. + * @param team The team of the cluster nodes where the singleton is running. If None then the same team as current node. * @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance. * @param bufferSize If the location of the singleton is unknown the proxy will buffer this number of messages * and deliver them when the singleton is identified. When the buffer is full old messages will be dropped @@ -72,9 +74,18 @@ object ClusterSingletonProxySettings { final class ClusterSingletonProxySettings( val singletonName: String, val role: Option[String], + val team: Option[String], val singletonIdentificationInterval: FiniteDuration, val bufferSize: Int) extends NoSerializationVerificationNeeded { + // for backwards compatibility + def this( + singletonName: String, + role: Option[String], + singletonIdentificationInterval: FiniteDuration, + bufferSize: Int) = + this(singletonName, role, None, singletonIdentificationInterval, bufferSize) + require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000") def withSingletonName(name: String): ClusterSingletonProxySettings = copy(singletonName = name) @@ -83,6 +94,8 @@ final class ClusterSingletonProxySettings( def withRole(role: Option[String]): ClusterSingletonProxySettings = copy(role = role) + def withTeam(team: String): ClusterSingletonProxySettings = copy(team = Some(team)) + def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings = copy(singletonIdentificationInterval = singletonIdentificationInterval) @@ -92,9 +105,10 @@ final class ClusterSingletonProxySettings( private def copy( singletonName: String = singletonName, role: Option[String] = role, + team: Option[String] = team, singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval, bufferSize: Int = bufferSize): ClusterSingletonProxySettings = - new ClusterSingletonProxySettings(singletonName, role, singletonIdentificationInterval, bufferSize) + new ClusterSingletonProxySettings(singletonName, role, team, singletonIdentificationInterval, bufferSize) } object ClusterSingletonProxy { @@ -162,12 +176,13 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste identifyTimer = None } - private val selfTeam = "team-" + cluster.settings.Team + private val targetTeam = settings.team match { + case Some(t) ⇒ ClusterSettings.TeamRolePrefix + t + case None ⇒ ClusterSettings.TeamRolePrefix + cluster.settings.Team + } - def matchingRole(member: Member): Boolean = member.hasRole(selfTeam) && (role match { - case None ⇒ true - case Some(r) ⇒ member.hasRole(r) - }) + def matchingRole(member: Member): Boolean = + member.hasRole(targetTeam) && role.forall(member.hasRole) def handleInitial(state: CurrentClusterState): Unit = { trackChange { diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala index f3b4ee65c3..dd63e8ae70 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/TeamSingletonManagerSpec.scala @@ -25,19 +25,22 @@ object TeamSingletonManagerSpec extends MultiNodeConfig { akka.actor.serialize-creators = off akka.remote.log-remote-lifecycle-events = off""")) - nodeConfig(controller)( - ConfigFactory.parseString("akka.cluster.team = one\n" + - "akka.cluster.roles = [ ]")) - nodeConfig(first) { - ConfigFactory.parseString("akka.cluster.team = one\n" + - "akka.cluster.roles = [ worker ]") + nodeConfig(controller) { + ConfigFactory.parseString(""" + akka.cluster.team = one + akka.cluster.roles = []""") + } + + nodeConfig(first) { + ConfigFactory.parseString(""" + akka.cluster.team = one + akka.cluster.roles = [ worker ]""") + } + nodeConfig(second, third) { + ConfigFactory.parseString(""" + akka.cluster.team = two + akka.cluster.roles = [ worker ]""") } - nodeConfig(second)( - ConfigFactory.parseString("akka.cluster.team = two\n" + - "akka.cluster.roles = [ worker ]")) - nodeConfig(third)( - ConfigFactory.parseString("akka.cluster.team = two\n" + - "akka.cluster.roles = [ worker ]")) } class TeamSingletonManagerMultiJvmNode1 extends TeamSingletonManagerSpec @@ -106,5 +109,19 @@ abstract class TeamSingletonManagerSpec extends MultiNodeSpec(TeamSingletonManag enterBarrier("after-1") } + "be able to use proxy across different team" in { + runOn(third) { + val proxy = system.actorOf(ClusterSingletonProxy.props( + "/user/singletonManager", + ClusterSingletonProxySettings(system).withRole(worker).withTeam("one"))) + proxy ! TeamSingleton.Ping + val pong = expectMsgType[TeamSingleton.Pong](10.seconds) + pong.fromTeam should ===("one") + pong.roles should contain(worker) + pong.roles should contain("team-one") + } + enterBarrier("after-1") + } + } }