diff --git a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala index 2b6730dd60..12d93ef272 100644 --- a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala +++ b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala @@ -44,7 +44,8 @@ class JGroupsClusterActor extends BasicClusterActor { log debug "UNSUPPORTED: JGroupsClusterActor::unblock" //TODO HotSwap back and flush the buffer }) }) - channel.map(_.connect(name)) + + channel.foreach(_.connect(name)) } protected def toOneNode(dest : Address, msg: Array[Byte]): Unit = diff --git a/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala b/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala index c656bd4b81..3d83a46ef3 100644 --- a/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala +++ b/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala @@ -85,6 +85,7 @@ class ShoalClusterActor extends BasicClusterActor { */ protected def createCallback : CallBack = { import org.scala_tools.javautils.Imports._ + import ClusterActor._ val me = this new CallBack { def processNotification(signal : Signal) { @@ -92,10 +93,10 @@ class ShoalClusterActor extends BasicClusterActor { signal.acquire() if(isActive) { signal match { - case ms : MessageSignal => me send Message(ms.getMemberToken,ms.getMessage) - case jns : JoinNotificationSignal => me send View(Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName) - case fss : FailureSuspectedSignal => me send Zombie(fss.getMemberToken) - case fns : FailureNotificationSignal => me send Zombie(fns.getMemberToken) + case ms : MessageSignal => me send Message[ADDR_T](ms.getMemberToken,ms.getMessage) + case jns : JoinNotificationSignal => me send View[ADDR_T](Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName) + case fss : FailureSuspectedSignal => me send Zombie[ADDR_T](fss.getMemberToken) + case fns : FailureNotificationSignal => me send Zombie[ADDR_T](fns.getMemberToken) case _ => log.debug("Unhandled signal: [%s]",signal) } } diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 1b56dcea5a..6c3183ef8c 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -34,14 +34,21 @@ trait BootableRemoteActorService extends Bootable with Logging { super.onLoad } - + abstract override def onUnload = { - super.onUnload - if (remoteServerThread.isAlive) { - log.info("Shutting down Remote Actors Service") - RemoteNode.shutdown - remoteServerThread.join(1000) - } + super.onUnload + + log.info("Shutting down Remote Actors Service") + + RemoteNode.shutdown + + if (remoteServerThread.isAlive) + remoteServerThread.join(1000) + + log.info("Shutting down Cluster") Cluster.shutdown + + log.info("Remote Actors Service has been shut down") } + } \ No newline at end of file diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 294ce5bd94..fb14b6b357 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -26,8 +26,8 @@ trait Cluster { def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] - - def foreach(f : (RemoteAddress) => Unit) : Unit + + def foreach(f: (RemoteAddress) => Unit): Unit } /** @@ -48,7 +48,15 @@ private[remote] object ClusterActor { sealed trait ClusterMessage private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage - + private[remote] case class Message[ADDR_T](sender: ADDR_T, msg: Array[Byte]) + private[remote] case object PapersPlease extends ClusterMessage + private[remote] case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage + private[remote] case object Block extends ClusterMessage + private[remote] case object Unblock extends ClusterMessage + private[remote] case class View[ADDR_T](othersPresent: Set[ADDR_T]) extends ClusterMessage + private[remote] case class Zombie[ADDR_T](address: ADDR_T) extends ClusterMessage + private[remote] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage + private[remote] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage private[remote] case class Node(endpoints: List[RemoteAddress]) } @@ -59,75 +67,65 @@ private[remote] object ClusterActor { */ abstract class BasicClusterActor extends ClusterActor { import ClusterActor._ - - case class Message(sender : ADDR_T, msg : Array[Byte]) - case object PapersPlease extends ClusterMessage - case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage - case object Block extends ClusterMessage - case object Unblock extends ClusterMessage - case class View(othersPresent: Set[ADDR_T]) extends ClusterMessage - case class Zombie(address: ADDR_T) extends ClusterMessage - case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage - case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage - type ADDR_T @volatile private var local: Node = Node(Nil) @volatile private var remotes: Map[ADDR_T, Node] = Map() override def init = { - remotes = new HashMap[ADDR_T, Node] + remotes = new HashMap[ADDR_T, Node] } override def shutdown = { - remotes = Map() + remotes = Map() } def receive = { - case v @ View(members) => { + case v: View[ADDR_T] => { // Not present in the cluster anymore = presumably zombies // Nodes we have no prior knowledge existed = unknowns - val zombies = Set[ADDR_T]() ++ remotes.keySet -- members - val unknown = members -- remotes.keySet + val zombies = Set[ADDR_T]() ++ remotes.keySet -- v.othersPresent + val unknown = v.othersPresent -- remotes.keySet log debug ("Updating view") - log debug ("Other memebers: [%s]",members) - log debug ("Zombies: [%s]",zombies) - log debug ("Unknowns: [%s]",unknown) + log debug ("Other memebers: [%s]", v.othersPresent) + log debug ("Zombies: [%s]", zombies) + log debug ("Unknowns: [%s]", unknown) // Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead broadcast(zombies ++ unknown, PapersPlease) remotes = remotes -- zombies } - case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead - log debug ("Killing Zombie Node: %s", x) - broadcast(x :: Nil, PapersPlease) - remotes = remotes - x + case z: Zombie[ADDR_T] => { //Ask the presumed zombie for papers and prematurely treat it as dead + log debug ("Killing Zombie Node: %s", z.address) + broadcast(z.address :: Nil, PapersPlease) + remotes = remotes - z.address } - case rm @ RelayedMessage(_, _) => { + case rm@RelayedMessage(_, _) => { log debug ("Relaying message: %s", rm) broadcast(rm) } - case m @ Message(src,msg) => { - (Cluster.serializer in (msg, None)) match { + case m: Message[ADDR_T] => { + val (src, msg) = (m.sender, m.msg) + (Cluster.serializer in (msg, None)) match { - case PapersPlease => { - log debug ("Asked for papers by %s", src) - broadcast(src :: Nil, Papers(local.endpoints)) + case PapersPlease => { + log debug ("Asked for papers by %s", src) + broadcast(src :: Nil, Papers(local.endpoints)) - if (remotes.get(src).isEmpty) // If we were asked for papers from someone we don't know, ask them! - broadcast(src :: Nil, PapersPlease) - } - - case Papers(x) => remotes = remotes + (src -> Node(x)) - - case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).foreach(_ send m) - - case unknown => log debug ("Unknown message: %s", unknown.toString) + if (remotes.get(src).isEmpty) // If we were asked for papers from someone we don't know, ask them! + broadcast(src :: Nil, PapersPlease) } + + case Papers(x) => remotes = remotes + (src -> Node(x)) + + case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).foreach(_ send m) + + case unknown => log debug ("Unknown message: %s", unknown.toString) + } } case RegisterLocalNode(s) => { @@ -146,12 +144,12 @@ abstract class BasicClusterActor extends ClusterActor { /** * Implement this in a subclass to add node-to-node messaging */ - protected def toOneNode(dest : ADDR_T, msg : Array[Byte]) : Unit + protected def toOneNode(dest: ADDR_T, msg: Array[Byte]): Unit /** * Implement this in a subclass to add node-to-many-nodes messaging */ - protected def toAllNodes(msg : Array[Byte]) : Unit + protected def toAllNodes(msg: Array[Byte]): Unit /** * Sends the specified message to the given recipients using the serializer @@ -159,7 +157,7 @@ abstract class BasicClusterActor extends ClusterActor { */ protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = { lazy val m = Cluster.serializer out msg - for (r <- recipients) toOneNode(r,m) + for (r <- recipients) toOneNode(r, m) } /** @@ -174,11 +172,11 @@ abstract class BasicClusterActor extends ClusterActor { */ def lookup[T](handleRemoteAddress: PartialFunction[RemoteAddress, T]): Option[T] = remotes.values.toList.flatMap(_.endpoints).find(handleRemoteAddress isDefinedAt _).map(handleRemoteAddress) - + /** * Applies the given function to all remote addresses known */ - def foreach(f : (RemoteAddress) => Unit) : Unit = remotes.values.toList.flatMap(_.endpoints).foreach(f) + def foreach(f: (RemoteAddress) => Unit): Unit = remotes.values.toList.flatMap(_.endpoints).foreach(f) /** * Registers a local endpoint @@ -206,25 +204,24 @@ abstract class BasicClusterActor extends ClusterActor { */ object Cluster extends Cluster with Logging { lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName - + @volatile private[remote] var clusterActor: Option[ClusterActor] = None - @volatile private[remote] var supervisor: Option[Supervisor] = None // FIXME Use the supervisor member field - + @volatile private[remote] var supervisor: Option[Supervisor] = None + private[remote] lazy val serializer: Serializer = - Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) - .newInstance.asInstanceOf[Serializer] + Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) + .newInstance.asInstanceOf[Serializer] private[remote] def createClusterActor: Option[ClusterActor] = { val name = config.getString("akka.remote.cluster.actor") if (name.isEmpty) throw new IllegalArgumentException( "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") try { - name map { fqn => - val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] - a.start - a + name map { + fqn => + Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] } } catch { @@ -232,13 +229,12 @@ object Cluster extends Cluster with Logging { } } - private[remote] def createSupervisor(actor : ClusterActor) : Option[Supervisor] = { + private[remote] def createSupervisor(actor: ClusterActor): Option[Supervisor] = { val sup = SupervisorFactory( SupervisorConfig( RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), Supervise(actor, LifeCycle(Permanent)) :: Nil) ).newInstance - sup.start Some(sup) } @@ -252,16 +248,17 @@ object Cluster extends Cluster with Logging { def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.foreach(_.deregisterLocalNode(hostname, port)) def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg)) - - def foreach(f: (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f)) + + def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f)) def start: Unit = synchronized { log.info("Starting up Cluster Service...") if (supervisor.isEmpty) { - for(actor <- createClusterActor; - sup <- createSupervisor(actor)) { - clusterActor = Some(actor) - supervisor = Some(sup) + for (actor <- createClusterActor; + sup <- createSupervisor(actor)) { + clusterActor = Some(actor) + supervisor = Some(sup) + sup.start } } } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 6da2ceea99..02cf98bcd2 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer */ object RemoteServer { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9999) + val PORT = config.getInt("akka.remote.server.port", 9966) val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)