=con #13869 Speedup startup of Cluster singleton

* avoid the hand-over/take-over attempts when starting the
  ClusterSingletonManager for the normal clase when the cluster is
  in a good shape, i.e. no exiting member that might run previous
  singleton instance
This commit is contained in:
Patrik Nordwall 2015-02-09 15:34:58 +01:00
parent ea2219950d
commit 2ecfa42801
3 changed files with 260 additions and 7 deletions

View file

@ -145,7 +145,7 @@ object ClusterSingletonManager {
/**
* The first event, corresponding to CurrentClusterState.
*/
final case class InitialOldestState(oldest: Option[Address], memberCount: Int)
final case class InitialOldestState(oldest: Option[Address], safeToBeOldest: Boolean)
final case class OldestChanged(oldest: Option[Address])
}
@ -191,8 +191,9 @@ object ClusterSingletonManager {
def handleInitial(state: CurrentClusterState): Unit = {
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m
m.status == MemberStatus.Up && matchingRole(m))
val initial = InitialOldestState(membersByAge.headOption.map(_.address), membersByAge.size)
(m.status == MemberStatus.Up || m.status == MemberStatus.Leaving) && matchingRole(m))
val safeToBeOldest = !state.members.exists { m (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) }
val initial = InitialOldestState(membersByAge.headOption.map(_.address), safeToBeOldest)
changes :+= initial
}
@ -421,10 +422,10 @@ class ClusterSingletonManager(
getNextOldestChanged()
stay
case Event(InitialOldestState(oldestOption, memberCount), _)
case Event(InitialOldestState(oldestOption, safeToBeOldest), _)
oldestChangedReceived = true
if (oldestOption == selfAddressOption && memberCount == 1)
// alone, oldest immediately
if (oldestOption == selfAddressOption && safeToBeOldest)
// oldest immediately
gotoOldest()
else if (oldestOption == selfAddressOption)
goto(BecomingOldest) using BecomingOldestData(None)
@ -592,7 +593,10 @@ class ClusterSingletonManager(
val newOldest = handOverTo.map(_.path.address)
logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest)
handOverTo foreach { _ ! HandOverDone }
if (selfExited || removed.contains(cluster.selfAddress))
if (removed.contains(cluster.selfAddress)) {
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
} else if (selfExited)
goto(End) using EndData
else
goto(Younger) using YoungerData(newOldest)
@ -612,6 +616,9 @@ class ClusterSingletonManager(
logInfo("Exited [{}]", m.address)
}
stay
case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress && !selfExited
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
case Event(MemberRemoved(m, _), _)
if (!selfExited) logInfo("Member removed [{}]", m.address)
addRemoved(m.address)

View file

@ -0,0 +1,135 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Terminated
import akka.actor.ActorSelection
import akka.cluster.MemberStatus
object ClusterSingletonManagerLeaveSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = off
"""))
case object EchoStarted
/**
* The singleton actor
*/
class Echo(testActor: ActorRef) extends Actor {
override def postStop(): Unit = {
testActor ! "stopped"
}
def receive = {
case _
sender() ! self
}
}
}
class ClusterSingletonManagerLeaveMultiJvmNode1 extends ClusterSingletonManagerLeaveSpec
class ClusterSingletonManagerLeaveMultiJvmNode2 extends ClusterSingletonManagerLeaveSpec
class ClusterSingletonManagerLeaveMultiJvmNode3 extends ClusterSingletonManagerLeaveSpec
class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonManagerLeaveSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterSingletonManagerLeaveSpec._
override def initialParticipants = roles.size
lazy val cluster = Cluster(system)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
createSingleton()
}
}
def createSingleton(): ActorRef = {
system.actorOf(ClusterSingletonManager.props(
singletonProps = Props(classOf[Echo], testActor),
singletonName = "echo",
terminationMessage = PoisonPill,
role = None),
name = "singleton")
}
lazy val echoProxy: ActorRef = {
system.actorOf(ClusterSingletonProxy.props(
singletonPath = "/user/singleton/echo",
role = None),
name = "echoProxy")
}
"Leaving ClusterSingletonManager" must {
"hand-over to new instance" in {
join(first, first)
runOn(first) {
echoProxy ! "hello"
expectMsgType[ActorRef](5.seconds)
}
enterBarrier("first-active")
join(second, first)
join(third, first)
within(10.seconds) {
awaitAssert(cluster.state.members.count(m m.status == MemberStatus.Up) should be(3))
}
runOn(second) {
cluster.leave(node(first).address)
}
runOn(first) {
expectMsg(10.seconds, "stopped")
}
enterBarrier("first-stopped")
runOn(second, third) {
val p = TestProbe()
val firstAddress = node(first).address
p.within(10.seconds) {
p.awaitAssert {
echoProxy.tell("hello2", p.ref)
p.expectMsgType[ActorRef](1.seconds).path.address should not be (firstAddress)
}
}
}
enterBarrier("hand-over-done")
}
}
}

View file

@ -0,0 +1,111 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Terminated
import akka.actor.ActorSelection
import akka.cluster.MemberStatus
object ClusterSingletonManagerStartupSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
"""))
case object EchoStarted
/**
* The singleton actor
*/
class Echo(testActor: ActorRef) extends Actor {
def receive = {
case _
sender() ! self
}
}
}
class ClusterSingletonManagerStartupMultiJvmNode1 extends ClusterSingletonManagerStartupSpec
class ClusterSingletonManagerStartupMultiJvmNode2 extends ClusterSingletonManagerStartupSpec
class ClusterSingletonManagerStartupMultiJvmNode3 extends ClusterSingletonManagerStartupSpec
class ClusterSingletonManagerStartupSpec extends MultiNodeSpec(ClusterSingletonManagerStartupSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterSingletonManagerStartupSpec._
override def initialParticipants = roles.size
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
createSingleton()
}
}
def createSingleton(): ActorRef = {
system.actorOf(ClusterSingletonManager.props(
singletonProps = Props(classOf[Echo], testActor),
singletonName = "echo",
terminationMessage = PoisonPill,
role = None),
name = "singleton")
}
lazy val echoProxy: ActorRef = {
system.actorOf(ClusterSingletonProxy.props(
singletonPath = "/user/singleton/echo",
role = None),
name = "echoProxy")
}
"Startup of Cluster Singleton" must {
"be quick" in {
join(first, first)
join(second, first)
join(third, first)
within(7.seconds) {
awaitAssert {
val members = Cluster(system).state.members
members.size should be(3)
members.forall(_.status == MemberStatus.Up) should be(true)
}
}
enterBarrier("all-up")
// the singleton instance is expected to start "instantly"
echoProxy ! "hello"
expectMsgType[ActorRef](3.seconds)
enterBarrier("done")
}
}
}