diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala index aa8a90cf59..663b7785ad 100644 --- a/akka-actors/src/main/scala/nio/Cluster.scala +++ b/akka-actors/src/main/scala/nio/Cluster.scala @@ -1,20 +1,20 @@ -package se.scalablesolutions.akka.actor +package se.scalablesolutions.akka.nio import se.scalablesolutions.akka.Config.config import se.scalablesolutions.akka.util.Logging import org.jgroups.{JChannel,View,Address,Message,ExtendedMembershipListener,Receiver,SetStateEvent} -import se.scalablesolutions.akka.serialization.Serializer.Protobuf -import se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress +import se.scalablesolutions.akka.serialization.Serializer +import se.scalablesolutions.akka.config.ScalaConfig._ import scala.collection.immutable.{Map,HashMap,HashSet} import org.jgroups.util.Util -import se.scalablesolutions.akka.nio.RemoteServer -import se.scalablesolutions.akka.actor.Cluster.Node +import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor} +import se.scalablesolutions.akka.nio.Cluster.Node trait Cluster { def members : List[Node] def name : String - def registerLocalNode(server : RemoteAddress) : Unit - def deregisterLocalNode(server : RemoteAddress) : Unit + def registerLocalNode(hostname : String, port : Int) : Unit + def deregisterLocalNode(hostname : String, port : Int) : Unit } abstract class ClusterActor(val name : String) extends Actor with Cluster @@ -24,17 +24,26 @@ object Cluster extends Cluster { lazy val impl : Option[ClusterActor] = { config.getString("akka.remote.cluster.actor") map ( name => { - Class.forName(name) - .getDeclaredConstructor(Array(classOf[String]): _*) - .newInstance(config.getString("akka.remote.cluster.name") getOrElse "default") - .asInstanceOf[ClusterActor] + val actor = Class.forName(name) + .getDeclaredConstructor(Array(classOf[String]): _*) + .newInstance(config.getString("akka.remote.cluster.name") getOrElse "default") + .asInstanceOf[ClusterActor] + + SupervisorFactory( + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), + Supervise(actor, LifeCycle(Permanent)):: Nil + ) + ).newInstance.start + actor !! Init(None) + actor }) } def name = impl.map(_.name).getOrElse("No cluster") def members = impl.map(_.members).getOrElse(Nil) - def registerLocalNode(hostname : String, port : Int) : Unit = impl.map(_.registerLocalNode(RemoteAddress(hostname,port))) - def deregisterLocalNode(hostname : String, port : Int) : Unit = impl.map(_.deregisterLocalNode(RemoteAddress(hostname,port))) + def registerLocalNode(hostname : String, port : Int) : Unit = impl.map(_.registerLocalNode(hostname,port)) + def deregisterLocalNode(hostname : String, port : Int) : Unit = impl.map(_.deregisterLocalNode(hostname,port)) } object JGroupsClusterActor { @@ -54,13 +63,14 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) import org.scala_tools.javautils.Implicits._ private var local : Node = Node(Nil) - private var channel : JChannel = null + private var channel : Option[JChannel] = None private var remotes : Map[Address,Node] = Map() override def init(config : AnyRef) = { + log info "Initiating cluster actor" remotes = new HashMap[Address,Node] val me = this - channel = new JChannel { + channel = Some(new JChannel { setReceiver(new Receiver with ExtendedMembershipListener { def getState : Array[Byte] = null def setState(state : Array[Byte]) : Unit = () @@ -70,17 +80,20 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) def block : Unit = me ! Block def unblock : Unit = me ! Unblock }) - } - channel connect name + }) + channel.map(_.connect(name)) } - protected def serializer = Protobuf //FIXME make this configurable + protected def serializer = Serializer.Java //FIXME make this configurable + def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes + def registerLocalNode(hostname : String, port : Int) : Unit = this ! RegisterLocalNode(RemoteAddress(hostname,port)) + def deregisterLocalNode(hostname : String, port : Int) : Unit = this ! DeregisterLocalNode(RemoteAddress(hostname,port)) - private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) = { - recipients.foreach( to => channel.send(new Message(to,null,serializer out msg))) - } + private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) : Unit = + for(c <- channel; r <- recipients) c.send(new Message(r,null,serializer out msg)) - private def broadcast[T <: AnyRef](msg : T) : Unit = channel.send(new Message(null,null,serializer out msg)) + private def broadcast[T <: AnyRef](msg : T) : Unit = + channel.map( _.send(new Message(null,null,serializer out msg))) override def receive = { case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead @@ -102,12 +115,16 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) remotes = remotes -- zombies } - case m : Message if m.getSrc != channel.getAddress => { - ( serializer in(m.getRawBuffer,None) ) match { - case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints)) - case Papers(x) => remotes = remotes + (m.getSrc -> Node(x)) - case unknown => log info unknown.toString - } + case m : Message => { + + if(m.getSrc != channel.map(_.getAddress).getOrElse(null)) + ( serializer in(m.getRawBuffer,None) ) match { + case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints)) + case Papers(x) => remotes = remotes + (m.getSrc -> Node(x)) + case unknown => log info unknown.toString + } + else + log info "Self-originating message: " + m } case RegisterLocalNode(s) => { @@ -126,12 +143,9 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) case Unblock => log info "Asked to unblock" //TODO HotSwap back and flush the buffer } - def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes - def registerLocalNode(server : RemoteAddress) : Unit = this ! RegisterLocalNode(server) - def deregisterLocalNode(server : RemoteAddress) : Unit = this ! DeregisterLocalNode(server) - override def shutdown = { + channel.map(_.close) remotes = Map() - channel.close + channel = None } } \ No newline at end of file diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 04c8f37128..8c6601d25b 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -36,11 +36,11 @@ compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 - - + name = "default" - actor = "se.scalablesolutions.akka.cluster.JGroupsClusterActor" - + actor = "se.scalablesolutions.akka.nio.JGroupsClusterActor" + + service = on hostname = "localhost" port = 9999