diff --git a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala index 5688bcedcf..2b6730dd60 100644 --- a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala +++ b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala @@ -4,6 +4,7 @@ import org.jgroups.{JChannel, View => JG_VIEW, Address, Message => JG_MSG, Exten /** * Clustering support via JGroups. + * @Author Viktor Klang */ class JGroupsClusterActor extends BasicClusterActor { import ClusterActor._ diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 7e98f41d1d..f355cf5ced 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -28,10 +28,20 @@ trait Cluster { def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] } +/** + * Base trait for Cluster implementations + * + * @author Viktor Klang + */ trait ClusterActor extends Actor with Cluster { val name = config.getString("akka.remote.cluster.name") getOrElse "default" } +/** + * Companion object to ClusterActor that defines some common messages + * + * @author Viktor Klang + */ private[remote] object ClusterActor { sealed trait ClusterMessage @@ -42,6 +52,8 @@ private[remote] object ClusterActor { /** * Base class for cluster actor implementations. + * Provides most of the behavior out of the box + * only needs to be gives hooks into the underlaying cluster impl. */ abstract class BasicClusterActor extends ClusterActor { import ClusterActor._ @@ -130,32 +142,59 @@ abstract class BasicClusterActor extends ClusterActor { } } + /** + * Implement this in a subclass to add node-to-node messaging + */ protected def toOneNode(dest : ADDR_T, msg : Array[Byte]) : Unit + + /** + * Implement this in a subclass to add node-to-many-nodes messaging + */ protected def toAllNodes(msg : Array[Byte]) : Unit + /** + * Sends the specified message to the given recipients using the serializer + * that's been set in the akka-conf + */ protected def broadcast[T <: AnyRef](recipients: Iterable[ADDR_T], msg: T): Unit = { lazy val m = Cluster.serializer out msg for (r <- recipients) toOneNode(r,m) } + /** + * Sends the specified message toall other nodes using the serializer + * that's been set in the akka-conf + */ protected def broadcast[T <: AnyRef](msg: T): Unit = if (!remotes.isEmpty) toAllNodes(Cluster.serializer out msg) + /** + * Applies the given PartialFunction to all known RemoteAddresses + */ def lookup[T](handleRemoteAddress: PartialFunction[RemoteAddress, T]): Option[T] = remotes.values.toList.flatMap(_.endpoints).find(handleRemoteAddress isDefinedAt _).map(handleRemoteAddress) + /** + * Registers a local endpoint + */ def registerLocalNode(hostname: String, port: Int): Unit = send(RegisterLocalNode(RemoteAddress(hostname, port))) + /** + * Deregisters a local endpoint + */ def deregisterLocalNode(hostname: String, port: Int): Unit = send(DeregisterLocalNode(RemoteAddress(hostname, port))) + /** + * Broadcasts the specified message to all Actors of type Class on all known Nodes + */ def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = send(RelayedMessage(to.getName, msg)) } /** - * A singleton representing the Cluster. + * A singleton representing the Cluster. *

* Loads a specified ClusterActor and delegates to that instance. */