Sprinkling extra output for debugging
This commit is contained in:
parent
81151db50f
commit
7d7db8d621
2 changed files with 41 additions and 14 deletions
|
|
@ -7,20 +7,22 @@ import se.scalablesolutions.akka.serialization.Serializer
|
|||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
import scala.collection.immutable.{Map,HashMap,HashSet}
|
||||
import org.jgroups.util.Util
|
||||
import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor}
|
||||
import se.scalablesolutions.akka.nio.Cluster.Node
|
||||
import se.scalablesolutions.akka.actor.{Init,SupervisorFactory,Actor,ActorRegistry}
|
||||
import se.scalablesolutions.akka.nio.Cluster.{Node,RelayedMessage}
|
||||
|
||||
trait Cluster {
|
||||
def members : List[Node]
|
||||
def name : String
|
||||
def registerLocalNode(hostname : String, port : Int) : Unit
|
||||
def deregisterLocalNode(hostname : String, port : Int) : Unit
|
||||
def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit
|
||||
}
|
||||
|
||||
abstract class ClusterActor(val name : String) extends Actor with Cluster
|
||||
|
||||
object Cluster extends Cluster {
|
||||
case class Node(endpoints : List[RemoteAddress])
|
||||
case class RelayedMessage(actorClass : Class[_ <: Actor],msg : AnyRef)
|
||||
|
||||
lazy val impl : Option[ClusterActor] = {
|
||||
config.getString("akka.remote.cluster.actor") map ( name => {
|
||||
|
|
@ -35,7 +37,7 @@ object Cluster extends Cluster {
|
|||
Supervise(actor, LifeCycle(Permanent)):: Nil
|
||||
)
|
||||
).newInstance.start
|
||||
actor !! Init(None)
|
||||
actor !! Init(None) // FIXME for some reason the actor isn't init:ed here
|
||||
actor
|
||||
})
|
||||
}
|
||||
|
|
@ -44,6 +46,7 @@ object Cluster extends Cluster {
|
|||
def members = impl.map(_.members).getOrElse(Nil)
|
||||
def registerLocalNode(hostname : String, port : Int) : Unit = impl.map(_.registerLocalNode(hostname,port))
|
||||
def deregisterLocalNode(hostname : String, port : Int) : Unit = impl.map(_.deregisterLocalNode(hostname,port))
|
||||
def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit = impl.map(_.send(to,msg))
|
||||
}
|
||||
|
||||
object JGroupsClusterActor {
|
||||
|
|
@ -86,9 +89,11 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
|
|||
|
||||
protected def serializer = Serializer.Java //FIXME make this configurable
|
||||
def members = remotes.values.toList //FIXME We probably want to make this a !! InstalledRemotes
|
||||
|
||||
def registerLocalNode(hostname : String, port : Int) : Unit = this ! RegisterLocalNode(RemoteAddress(hostname,port))
|
||||
def deregisterLocalNode(hostname : String, port : Int) : Unit = this ! DeregisterLocalNode(RemoteAddress(hostname,port))
|
||||
|
||||
def relayMessage(to : Class[_ <: Actor],msg : AnyRef) : Unit = this ! RelayedMessage(to,msg)
|
||||
|
||||
private def broadcast[T <: AnyRef](recipients : Iterable[Address],msg : T) : Unit =
|
||||
for(c <- channel; r <- recipients) c.send(new Message(r,null,serializer out msg))
|
||||
|
||||
|
|
@ -110,21 +115,42 @@ class JGroupsClusterActor(name : String) extends ClusterActor(name)
|
|||
val zombies = Set[Address]() ++ remotes.keySet -- members
|
||||
val unknown = members -- remotes.keySet
|
||||
|
||||
log info "Updating view, zombies: " + zombies
|
||||
log info " , unknown: " + unknown
|
||||
log info " , members: " + members
|
||||
log info " , known: " + remotes.keySet
|
||||
|
||||
//Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead
|
||||
broadcast(zombies ++ unknown, PapersPlease)
|
||||
remotes = remotes -- zombies
|
||||
}
|
||||
|
||||
case m : Message => {
|
||||
|
||||
if(m.getSrc != channel.map(_.getAddress).getOrElse(null))
|
||||
( serializer in(m.getRawBuffer,None) ) match {
|
||||
case PapersPlease => broadcast(m.getSrc :: Nil,Papers(local.endpoints))
|
||||
case Papers(x) => remotes = remotes + (m.getSrc -> Node(x))
|
||||
case unknown => log info unknown.toString
|
||||
val payload = serializer in(m.getRawBuffer,None)
|
||||
if(m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) //handle non-own messages only, and only if we're connected
|
||||
payload match {
|
||||
case PapersPlease => {
|
||||
log info "Asked for papers by " + m.getSrc
|
||||
broadcast(m.getSrc :: Nil,Papers(local.endpoints))
|
||||
}
|
||||
case Papers(x) => {
|
||||
log info "Got papers from " + m.getSrc
|
||||
remotes = remotes + (m.getSrc -> Node(x))
|
||||
log info "Installed nodes: " + remotes.keySet
|
||||
}
|
||||
case RelayedMessage(c,m) => {
|
||||
log info "Relaying [" + m + "] to ["+c.getName+"]"
|
||||
ActorRegistry.actorsFor(c).firstOption.map(_ ! m)
|
||||
}
|
||||
case unknown => log info "Unknown message: "+unknown.toString
|
||||
}
|
||||
else
|
||||
log info "Self-originating message: " + m
|
||||
//else
|
||||
// log info "Self-originating message: " + m + " msg: " + payload
|
||||
}
|
||||
|
||||
case rm@RelayedMessage => {
|
||||
log info "Relaying message: " + rm
|
||||
broadcast(rm)
|
||||
}
|
||||
|
||||
case RegisterLocalNode(s) => {
|
||||
|
|
|
|||
|
|
@ -40,10 +40,11 @@
|
|||
name = "default"
|
||||
actor = "se.scalablesolutions.akka.nio.JGroupsClusterActor"
|
||||
</cluster>
|
||||
|
||||
<server>
|
||||
service = on
|
||||
hostname = "localhost"
|
||||
port = 9999
|
||||
port = 9991
|
||||
connection-timeout = 1000 # in millis (1 sec default)
|
||||
<server>
|
||||
|
||||
|
|
@ -56,7 +57,7 @@
|
|||
<rest>
|
||||
service = on
|
||||
hostname = "localhost"
|
||||
port = 9998
|
||||
port = 9992
|
||||
filters = "[se.scalablesolutions.akka.security.AkkaSecurityFilterFactory]" # List with all jersey filters to use
|
||||
authenticator = "se.scalablesolutions.akka.security.samples.BasicAuthenticationService" # The authentication service to use
|
||||
</rest>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue