Added more documentation
This commit is contained in:
parent
97c272328b
commit
7e4bd90cc9
2 changed files with 41 additions and 1 deletions
|
|
@ -4,6 +4,7 @@ import org.jgroups.{JChannel, View => JG_VIEW, Address, Message => JG_MSG, Exten
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clustering support via JGroups.
|
* Clustering support via JGroups.
|
||||||
|
* @Author Viktor Klang
|
||||||
*/
|
*/
|
||||||
class JGroupsClusterActor extends BasicClusterActor {
|
class JGroupsClusterActor extends BasicClusterActor {
|
||||||
import ClusterActor._
|
import ClusterActor._
|
||||||
|
|
|
||||||
|
|
@ -28,10 +28,20 @@ trait Cluster {
|
||||||
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T]
|
def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base trait for Cluster implementations
|
||||||
|
*
|
||||||
|
* @author Viktor Klang
|
||||||
|
*/
|
||||||
trait ClusterActor extends Actor with Cluster {
|
trait ClusterActor extends Actor with Cluster {
|
||||||
val name = config.getString("akka.remote.cluster.name") getOrElse "default"
|
val name = config.getString("akka.remote.cluster.name") getOrElse "default"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Companion object to ClusterActor that defines some common messages
|
||||||
|
*
|
||||||
|
* @author Viktor Klang
|
||||||
|
*/
|
||||||
private[remote] object ClusterActor {
|
private[remote] object ClusterActor {
|
||||||
sealed trait ClusterMessage
|
sealed trait ClusterMessage
|
||||||
|
|
||||||
|
|
@ -42,6 +52,8 @@ private[remote] object ClusterActor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for cluster actor implementations.
|
* Base class for cluster actor implementations.
|
||||||
|
* Provides most of the behavior out of the box
|
||||||
|
* only needs to be gives hooks into the underlaying cluster impl.
|
||||||
*/
|
*/
|
||||||
abstract class BasicClusterActor extends ClusterActor {
|
abstract class BasicClusterActor extends ClusterActor {
|
||||||
import ClusterActor._
|
import ClusterActor._
|
||||||
|
|
@ -130,26 +142,53 @@ abstract class BasicClusterActor extends ClusterActor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implement this in a subclass to add node-to-node messaging
|
||||||
|
*/
|
||||||
protected def toOneNode(dest : ADDR_T, msg : Array[Byte]) : Unit
|
protected def toOneNode(dest : ADDR_T, msg : Array[Byte]) : Unit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implement this in a subclass to add node-to-many-nodes messaging
|
||||||
|
*/
|
||||||
protected def toAllNodes(msg : Array[Byte]) : Unit
|
protected def toAllNodes(msg : Array[Byte]) : Unit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends the specified message to the given recipients using the serializer
|
||||||
|
* that's been set in the akka-conf
|
||||||
|
*/
|
||||||
protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = {
|
protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = {
|
||||||
lazy val m = Cluster.serializer out msg
|
lazy val m = Cluster.serializer out msg
|
||||||
for (r <- recipients) toOneNode(r,m)
|
for (r <- recipients) toOneNode(r,m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends the specified message toall other nodes using the serializer
|
||||||
|
* that's been set in the akka-conf
|
||||||
|
*/
|
||||||
protected def broadcast[T <: AnyRef](msg: T): Unit =
|
protected def broadcast[T <: AnyRef](msg: T): Unit =
|
||||||
if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg)
|
if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Applies the given PartialFunction to all known RemoteAddresses
|
||||||
|
*/
|
||||||
def lookup[T](handleRemoteAddress: PartialFunction[RemoteAddress, T]): Option[T] =
|
def lookup[T](handleRemoteAddress: PartialFunction[RemoteAddress, T]): Option[T] =
|
||||||
remotes.values.toList.flatMap(_.endpoints).find(handleRemoteAddress isDefinedAt _).map(handleRemoteAddress)
|
remotes.values.toList.flatMap(_.endpoints).find(handleRemoteAddress isDefinedAt _).map(handleRemoteAddress)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers a local endpoint
|
||||||
|
*/
|
||||||
def registerLocalNode(hostname: String, port: Int): Unit =
|
def registerLocalNode(hostname: String, port: Int): Unit =
|
||||||
send(RegisterLocalNode(RemoteAddress(hostname, port)))
|
send(RegisterLocalNode(RemoteAddress(hostname, port)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deregisters a local endpoint
|
||||||
|
*/
|
||||||
def deregisterLocalNode(hostname: String, port: Int): Unit =
|
def deregisterLocalNode(hostname: String, port: Int): Unit =
|
||||||
send(DeregisterLocalNode(RemoteAddress(hostname, port)))
|
send(DeregisterLocalNode(RemoteAddress(hostname, port)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Broadcasts the specified message to all Actors of type Class on all known Nodes
|
||||||
|
*/
|
||||||
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit =
|
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit =
|
||||||
send(RelayedMessage(to.getName, msg))
|
send(RelayedMessage(to.getName, msg))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue