From 7d7db8d621f73adb5d4f306f217b11abe6389825 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 13 Dec 2009 15:40:01 +0100 Subject: [PATCH] Sprinkling extra output for debugging --- akka-actors/src/main/scala/nio/Cluster.scala | 50 +++++++++++++++----- config/akka-reference.conf | 5 +- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/akka-actors/src/main/scala/nio/Cluster.scala b/akka-actors/src/main/scala/nio/Cluster.scala index 663b7785ad..15a941d1b0 100644 --- a/akka-actors/src/main/scala/nio/Cluster.scala +++ b/akka-actors/src/main/scala/nio/Cluster.scala @@ -7,20 +7,22 @@ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.config.ScalaConfig._ import scala.collection.immutable.{Map,HashMap,HashSet} import org.jgroups.util.Util -import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor} -import se.scalablesolutions.akka.nio.Cluster.Node +import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor,ActorRegistry} +import se.scalablesolutions.akka.nio.Cluster.{Node,RelayedMessage} trait Cluster { def members : List[Node] def name : String def registerLocalNode(hostname : String, port : Int) : Unit def deregisterLocalNode(hostname : String, port : Int) : Unit + def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit } abstract class ClusterActor(val name : String) extends Actor with Cluster object Cluster extends Cluster { case class Node(endpoints : List[RemoteAddress]) + case class RelayedMessage(actorClass : Class[_ <: Actor],msg : AnyRef) lazy val impl : Option[ClusterActor] = { config.getString("akka.remote.cluster.actor") map ( name => { @@ -35,7 +37,7 @@ object Cluster extends Cluster { Supervise(actor, LifeCycle(Permanent)):: Nil ) ).newInstance.start - actor !! Init(None) + actor !! Init(None) // FIXME for some reason the actor isn't init:ed here actor }) } @@ -44,6 +46,7 @@ object Cluster extends Cluster { def members = impl.map(_.members).getOrElse(Nil) def registerLocalNode(hostname : String, port : Int) : Unit = impl.map(_.registerLocalNode(hostname,port)) def deregisterLocalNode(hostname : String, port : Int) : Unit = impl.map(_.deregisterLocalNode(hostname,port)) + def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit = impl.map(_.send(to,msg)) } object JGroupsClusterActor { @@ -86,9 +89,11 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) protected def serializer = Serializer.Java //FIXME make this configurable def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes + def registerLocalNode(hostname : String, port : Int) : Unit = this ! RegisterLocalNode(RemoteAddress(hostname,port)) def deregisterLocalNode(hostname : String, port : Int) : Unit = this ! DeregisterLocalNode(RemoteAddress(hostname,port)) - + def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit = this ! RelayedMessage(to,msg) + private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) : Unit = for(c <- channel; r <- recipients) c.send(new Message(r,null,serializer out msg)) @@ -110,21 +115,42 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name) val zombies = Set[Address]() ++ remotes.keySet -- members val unknown = members -- remotes.keySet + log info "Updating view, zombies: " + zombies + log info " , unknown: " + unknown + log info " , members: " + members + log info " , known: " + remotes.keySet + //Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead broadcast(zombies ++ unknown, PapersPlease) remotes = remotes -- zombies } case m : Message => { - - if(m.getSrc != channel.map(_.getAddress).getOrElse(null)) - ( serializer in(m.getRawBuffer,None) ) match { - case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints)) - case Papers(x) => remotes = remotes + (m.getSrc -> Node(x)) - case unknown => log info unknown.toString + val payload = serializer in(m.getRawBuffer,None) + if(m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) //handle non-own messages only, and only if we're connected + payload match { + case PapersPlease => { + log info "Asked for papers by " + m.getSrc + broadcast(m.getSrc :: Nil,Papers(local.endpoints)) + } + case Papers(x) => { + log info "Got papers from " + m.getSrc + remotes = remotes + (m.getSrc -> Node(x)) + log info "Installed nodes: " + remotes.keySet + } + case RelayedMessage(c,m) => { + log info "Relaying [" + m + "] to ["+c.getName+"]" + ActorRegistry.actorsFor(c).firstOption.map(_ ! m) + } + case unknown => log info "Unknown message: "+unknown.toString } - else - log info "Self-originating message: " + m + //else + // log info "Self-originating message: " + m + " msg: " + payload + } + + case rm@RelayedMessage => { + log info "Relaying message: " + rm + broadcast(rm) } case RegisterLocalNode(s) => { diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 8c6601d25b..381fc55496 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -40,10 +40,11 @@ name = "default" actor = "se.scalablesolutions.akka.nio.JGroupsClusterActor" + service = on hostname = "localhost" - port = 9999 + port = 9991 connection-timeout = 1000 # in millis (1 sec default) @@ -56,7 +57,7 @@ service = on hostname = "localhost" - port = 9998 + port = 9992 filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]" # List with all jersey filters to use authenticator = "se.scalablesolutions.akka.security.samples.BasicAuthenticationService" # The authentication service to use