Clarifications of cluster singleton docs, see #2895
This commit is contained in:
parent
b0a9b9b9c5
commit
c48d9c058e
3 changed files with 67 additions and 36 deletions
|
|
@ -11,6 +11,7 @@ import com.typesafe.config.ConfigFactory
|
|||
import akka.actor.Actor
|
||||
import akka.actor.ActorLogging
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Address
|
||||
import akka.actor.Props
|
||||
import akka.actor.RootActorPath
|
||||
import akka.cluster.Cluster
|
||||
|
|
@ -141,6 +142,29 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
|
|||
}
|
||||
}
|
||||
|
||||
// documentation of how to keep track of the leader address in user land
|
||||
//#singleton-proxy
|
||||
class ConsumerProxy extends Actor {
|
||||
// subscribe to LeaderChanged, re-subscribe when restart
|
||||
override def preStart(): Unit =
|
||||
Cluster(context.system).subscribe(self, classOf[LeaderChanged])
|
||||
override def postStop(): Unit =
|
||||
Cluster(context.system).unsubscribe(self)
|
||||
|
||||
var leaderAddress: Option[Address] = None
|
||||
|
||||
def receive = {
|
||||
case state: CurrentClusterState ⇒ leaderAddress = state.leader
|
||||
case LeaderChanged(leader) ⇒ leaderAddress = leader
|
||||
case other => consumer foreach { _ forward other }
|
||||
}
|
||||
|
||||
def consumer: Option[ActorRef] =
|
||||
leaderAddress map (a => context.actorFor(RootActorPath(a) /
|
||||
"user" / "singleton" / "consumer"))
|
||||
}
|
||||
//#singleton-proxy
|
||||
|
||||
}
|
||||
|
||||
class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec
|
||||
|
|
@ -158,14 +182,17 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
|
|||
|
||||
override def initialParticipants = roles.size
|
||||
|
||||
//#sort-cluster-roles
|
||||
// Sort the roles in the order used by the cluster.
|
||||
lazy val sortedClusterRoles: immutable.IndexedSeq[RoleName] = {
|
||||
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
|
||||
import Member.addressOrdering
|
||||
def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address)
|
||||
def compare(x: RoleName, y: RoleName) =
|
||||
addressOrdering.compare(node(x).address, node(y).address)
|
||||
}
|
||||
roles.filterNot(_ == controller).toVector.sorted
|
||||
}
|
||||
//#sort-cluster-roles
|
||||
|
||||
def queue: ActorRef = system.actorFor(node(controller) / "user" / "queue")
|
||||
|
||||
|
|
@ -180,18 +207,8 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
|
|||
//#create-singleton-manager
|
||||
}
|
||||
|
||||
def consumer(leader: RoleName): ActorRef = {
|
||||
// the reason for this complicated way of creating the path is to illustrate
|
||||
// in documentation how it's typically done in user land
|
||||
LeaderChanged(Some(node(leader).address)) match {
|
||||
//#singleton-actorFor
|
||||
case LeaderChanged(Some(leaderAddress)) ⇒
|
||||
val path = RootActorPath(leaderAddress) / "user" / "singleton" / "consumer"
|
||||
val consumer = system.actorFor(path)
|
||||
//#singleton-actorFor
|
||||
consumer
|
||||
}
|
||||
}
|
||||
def consumer(leader: RoleName): ActorRef =
|
||||
system.actorFor(RootActorPath(node(leader).address) / "user" / "singleton" / "consumer")
|
||||
|
||||
def verify(leader: RoleName, msg: Int, expectedCurrent: Int): Unit = {
|
||||
enterBarrier("before-" + leader.name + "-verified")
|
||||
|
|
@ -284,12 +301,14 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
|
|||
}
|
||||
|
||||
"hand over when leader leaves in 6 nodes cluster " in within(20 seconds) {
|
||||
//#test-leave
|
||||
val leaveRole = sortedClusterRoles(0)
|
||||
val newLeaderRole = sortedClusterRoles(1)
|
||||
|
||||
runOn(leaveRole) {
|
||||
Cluster(system) leave node(leaveRole).address
|
||||
}
|
||||
//#test-leave
|
||||
|
||||
verify(newLeaderRole, msg = 5, expectedCurrent = 4)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue