Abstracted the FailureDetector into a interface trait and added controllable failure detector mock.

- Abstracted a FailureDetector trait.
- Added a FailureDetectorPuppet mock that can be user controllable
- Added option to define a custom failure detector
- Misc minor fixes

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-06-10 16:52:33 +02:00
parent e6ee3e2a95
commit a4499b06bb
7 changed files with 151 additions and 26 deletions

View file

@ -306,7 +306,22 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
override def lookup = Cluster
override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)
override def createExtension(system: ExtendedActorSystem): Cluster = {
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
def createDefaultFD() = new AccrualFailureDetector(system, clusterSettings)
val failureDetector = clusterSettings.FailureDetectorImplementationClass match {
case None createDefaultFD()
case Some(fqcn) system.dynamicAccess.createInstanceFor[FailureDetector](fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match {
case Right(fd) fd
case Left(e)
system.log.error(e, "Could not create custom failure detector - falling back to default")
createDefaultFD()
}
}
new Cluster(system, failureDetector)
}
}
/**
@ -349,7 +364,7 @@ trait ClusterNodeMBean {
* if (Cluster(system).isLeader) { ... }
* }}}
*/
class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode
class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension { clusterNode
/**
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
@ -369,8 +384,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
import clusterSettings._
val selfAddress = remote.transport.address
val failureDetector = new AccrualFailureDetector(
system, selfAddress, FailureDetectorThreshold, FailureDetectorMaxSampleSize)
private val vclockNode = VectorClock.Node(selfAddress.toString)