Fix leaking this in constructor of Cluster, see #2473

* Major refactoring to remove the need to use special
  Cluster instance for testing. Use default Cluster
  extension instead. Most of it is trivial changes.
* Used failure-detector.implementation-class from config
  to swap to Puppet
* Removed FailureDetectorStrategy, since it doesn't add any value
* Added Cluster.joinSeedNodes to be able to test seedNodes when Addresses
  are unknown before startup time.
* Removed ClusterEnvironment that was passed around among the actors,
  instead they use the ordinary Cluster extension.
* Overall much cleaner design
This commit is contained in:
Patrik Nordwall 2012-09-06 21:48:40 +02:00
parent 806b5efcdf
commit bd6c39178c
33 changed files with 313 additions and 340 deletions

View file

@ -58,6 +58,12 @@ private[cluster] object InternalClusterAction {
*/
case class JoinTo(address: Address) extends ClusterMessage
/**
* Command to initiate the process to join the specified
* seed nodes.
*/
case class JoinSeedNodes(seedNodes: IndexedSeq[Address])
/**
* Start message of the process to join one of the seed nodes.
* The node sends `InitJoin` to all seed nodes, which replies
@ -128,33 +134,21 @@ private[cluster] object ClusterLeaderAction {
case class Remove(address: Address) extends ClusterMessage
}
/**
* INTERNAL API
*
* The contextual pieces that ClusterDaemon actors need.
* Makes it easier to test the actors without using the Cluster extension.
*/
private[cluster] trait ClusterEnvironment {
private[cluster] def settings: ClusterSettings
private[cluster] def failureDetector: FailureDetector
private[cluster] def selfAddress: Address
private[cluster] def scheduler: Scheduler
private[cluster] def seedNodes: IndexedSeq[Address]
private[cluster] def shutdown(): Unit
}
/**
* INTERNAL API.
*
* Supervisor managing the different Cluster daemons.
*/
private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging {
private[cluster] final class ClusterDaemon extends Actor with ActorLogging {
val configuredDispatcher = environment.settings.UseDispatcher
val core = context.actorOf(Props(new ClusterCoreDaemon(environment)).
withDispatcher(configuredDispatcher), name = "core")
val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(environment)).
withDispatcher(configuredDispatcher), name = "heartbeat")
// Important - don't use Cluster(context.system) here because that would
// cause deadlock. The Cluster extension is currently being created and is waiting
// for response from GetClusterCoreRef in its constructor.
val core = context.actorOf(Props[ClusterCoreDaemon].
withDispatcher(context.props.dispatcher), name = "core")
val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon].
withDispatcher(context.props.dispatcher), name = "heartbeat")
def receive = {
case InternalClusterAction.GetClusterCoreRef sender ! core
@ -165,16 +159,16 @@ private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) exte
/**
* INTERNAL API.
*/
private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging {
private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
import ClusterLeaderAction._
import InternalClusterAction._
import ClusterHeartbeatSender._
def selfAddress = environment.selfAddress
def clusterScheduler = environment.scheduler
def failureDetector = environment.failureDetector
val settings = environment.settings
import settings._
val cluster = Cluster(context.system)
def selfAddress = cluster.selfAddress
def clusterScheduler = cluster.scheduler
def failureDetector = cluster.failureDetector
import cluster.settings._
val vclockNode = VectorClock.Node(selfAddress.toString)
val selfHeartbeat = Heartbeat(selfAddress)
@ -186,11 +180,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
var stats = ClusterStats()
val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(environment)).
val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)).
val coreSender = context.actorOf(Props[ClusterCoreSender].
withDispatcher(UseDispatcher), name = "coreSender")
val publisher = context.actorOf(Props(new ClusterDomainEventPublisher(environment)).
val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
withDispatcher(UseDispatcher), name = "publisher")
import context.dispatcher
@ -227,14 +221,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
})
override def preStart(): Unit = {
if (AutoJoin) {
// only the node which is named first in the list of seed nodes will join itself
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
self ! JoinTo(selfAddress)
else
context.actorOf(Props(new JoinSeedNodeProcess(environment)).
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
}
if (AutoJoin) self ! JoinSeedNodes(SeedNodes)
}
override def postStop(): Unit = {
@ -248,6 +235,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def uninitialized: Actor.Receive = {
case InitJoin // skip, not ready yet
case JoinTo(address) join(address)
case JoinSeedNodes(seedNodes) joinSeedNodes(seedNodes)
case msg: SubscriptionMessage publisher forward msg
case _: Tick // ignore periodic tasks until initialized
}
@ -282,6 +270,15 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = {
// only the node which is named first in the list of seed nodes will join itself
if (seedNodes.isEmpty || seedNodes.head == selfAddress)
self ! JoinTo(selfAddress)
else
context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)).
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
}
/**
* Try to join this cluster node with the node specified by 'address'.
* A 'Join(thisNodeAddress)' command is sent to the node to join.
@ -393,7 +390,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
// make sure the final (removed) state is published
// before shutting down
implicit val timeout = Timeout(5 seconds)
publisher ? PublishDone onComplete { case _ environment.shutdown() }
publisher ? PublishDone onComplete { case _ cluster.shutdown() }
}
/**
@ -796,8 +793,6 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
}
}
def seedNodes: IndexedSeq[Address] = environment.seedNodes
def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
if (addresses.isEmpty) None
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
@ -865,22 +860,22 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
* 5. seed3 retries the join procedure and gets acks from seed2 first, and then joins to seed2
*
*/
private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging {
private[cluster] final class JoinSeedNodeProcess(seedNodes: IndexedSeq[Address]) extends Actor with ActorLogging {
import InternalClusterAction._
def selfAddress = environment.selfAddress
def selfAddress = Cluster(context.system).selfAddress
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
if (seedNodes.isEmpty || seedNodes.head == selfAddress)
throw new IllegalArgumentException("Join seed node should not be done")
context.setReceiveTimeout(environment.settings.SeedNodeTimeout)
context.setReceiveTimeout(Cluster(context.system).settings.SeedNodeTimeout)
override def preStart(): Unit = self ! JoinSeedNode
def receive = {
case JoinSeedNode
// send InitJoin to all seed nodes (except myself)
environment.seedNodes.collect {
seedNodes.collect {
case a if a != selfAddress context.system.actorFor(context.parent.path.toStringWithAddress(a))
} foreach { _ ! InitJoin }
case InitJoinAck(address)
@ -901,9 +896,11 @@ private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment
/**
* INTERNAL API.
*/
private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Actor with ActorLogging {
private[cluster] final class ClusterCoreSender extends Actor with ActorLogging {
import InternalClusterAction._
val selfAddress = Cluster(context.system).selfAddress
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/