Redis tests now passes with new STM + misc minor changes to Cluster

This commit is contained in:
Jonas Bonér 2010-03-04 19:02:23 +01:00
parent 73a0648292
commit 36c0266a5d
11 changed files with 131 additions and 96 deletions

View file

@ -60,19 +60,18 @@ private[remote] object ClusterActor {
abstract class BasicClusterActor extends ClusterActor {
import ClusterActor._
case class Message(sender : ADDR_T,msg : Array[Byte])
case class Message(sender : ADDR_T, msg : Array[Byte])
case object PapersPlease extends ClusterMessage
case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
case object Block extends ClusterMessage
case object Unblock extends ClusterMessage
case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage
case class View(othersPresent: Set[ADDR_T]) extends ClusterMessage
case class Zombie(address: ADDR_T) extends ClusterMessage
case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
type ADDR_T
@volatile private var local: Node = Node(Nil)
@volatile private var remotes: Map[ADDR_T, Node] = Map()
@ -206,17 +205,21 @@ abstract class BasicClusterActor extends ClusterActor {
* Loads a specified ClusterActor and delegates to that instance.
*/
object Cluster extends Cluster with Logging {
lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
@volatile private[remote] var clusterActor: Option[ClusterActor] = None
@volatile private[remote] var supervisor: Option[Supervisor] = None
// FIXME Use the supervisor member field
private[remote] lazy val serializer: Serializer = {
val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName)
Class.forName(className).newInstance.asInstanceOf[Serializer]
}
private[remote] lazy val serializer: Serializer =
Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
.newInstance.asInstanceOf[Serializer]
private[remote] def createClusterActor : Option[ClusterActor] = {
private[remote] def createClusterActor: Option[ClusterActor] = {
val name = config.getString("akka.remote.cluster.actor")
if (name.isEmpty) throw new IllegalArgumentException(
"Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
try {
name map { fqn =>
val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
@ -225,7 +228,7 @@ object Cluster extends Cluster with Logging {
}
}
catch {
case e => log.error(e,"Couldn't load Cluster provider: [%s]",name.getOrElse("Not specified")); None
case e => log.error(e, "Couldn't load Cluster provider: [%s]", name.getOrElse("Not specified")); None
}
}
@ -250,10 +253,11 @@ object Cluster extends Cluster with Logging {
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg))
def foreach(f : (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f))
def foreach(f: (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f))
def start : Unit = synchronized {
if(supervisor.isEmpty) {
def start: Unit = synchronized {
log.info("Starting up Cluster Service...")
if (supervisor.isEmpty) {
for(actor <- createClusterActor;
sup <- createSupervisor(actor)) {
clusterActor = Some(actor)
@ -262,7 +266,8 @@ object Cluster extends Cluster with Logging {
}
}
def shutdown : Unit = synchronized {
def shutdown: Unit = synchronized {
log.info("Shutting down Cluster Service...")
supervisor.foreach(_.stop)
supervisor = None
clusterActor = None