Merge pull request #699 from akka/wip-2438-send-cluster-state-patriknw
Request send/publish of CurrentClusterState, see #2438
This commit is contained in:
commit
00b6bb660b
5 changed files with 57 additions and 31 deletions
|
|
@ -189,6 +189,24 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
|||
def unsubscribe(subscriber: ActorRef): Unit =
|
||||
clusterCore ! InternalClusterAction.Unsubscribe(subscriber)
|
||||
|
||||
/**
|
||||
* Publish current (full) state of the cluster to subscribers,
|
||||
* that are subscribing to [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
|
||||
* or [[akka.cluster.ClusterEvent.CurrentClusterState]].
|
||||
* If you want this to happen periodically you need to schedule a call to
|
||||
* this method yourself.
|
||||
*/
|
||||
def publishCurrentClusterState(): Unit =
|
||||
clusterCore ! InternalClusterAction.PublishCurrentClusterState(None)
|
||||
|
||||
/**
|
||||
* Publish current (full) state of the cluster to the specified
|
||||
* receiver. If you want this to happen periodically you need to schedule
|
||||
* a call to this method yourself.
|
||||
*/
|
||||
def sendCurrentClusterState(receiver: ActorRef): Unit =
|
||||
clusterCore ! InternalClusterAction.PublishCurrentClusterState(Some(receiver))
|
||||
|
||||
/**
|
||||
* Try to join this cluster node with the node specified by 'address'.
|
||||
* A 'Join(thisNodeAddress)' command is sent to the node to join.
|
||||
|
|
|
|||
|
|
@ -106,13 +106,15 @@ private[cluster] object InternalClusterAction {
|
|||
sealed trait SubscriptionMessage
|
||||
case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage
|
||||
case class Unsubscribe(subscriber: ActorRef) extends SubscriptionMessage
|
||||
/**
|
||||
* @param receiver if `receiver` is defined the event will only be sent to that
|
||||
* actor, otherwise it will be sent to all subscribers via the `eventStream`.
|
||||
*/
|
||||
case class PublishCurrentClusterState(receiver: Option[ActorRef]) extends SubscriptionMessage
|
||||
|
||||
case class PublishChanges(oldGossip: Gossip, newGossip: Gossip)
|
||||
case object PublishDone
|
||||
|
||||
case class Ping(timestamp: Long = System.currentTimeMillis) extends ClusterMessage
|
||||
case class Pong(ping: Ping, timestamp: Long = System.currentTimeMillis) extends ClusterMessage
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -255,7 +257,6 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
|
|||
case Remove(address) ⇒ removing(address)
|
||||
case SendGossipTo(address) ⇒ gossipTo(address)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
case p: Ping ⇒ ping(p)
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -831,7 +832,6 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
|
|||
|
||||
def publishInternalStats(): Unit = publisher ! CurrentInternalStats(stats)
|
||||
|
||||
def ping(p: Ping): Unit = sender ! Pong(p)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -177,6 +177,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
def receive = {
|
||||
case PublishChanges(oldGossip, newGossip) ⇒ publishChanges(oldGossip, newGossip)
|
||||
case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats)
|
||||
case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver)
|
||||
case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to)
|
||||
case Unsubscribe(subscriber) ⇒ unsubscribe(subscriber)
|
||||
case PublishDone ⇒ sender ! PublishDone
|
||||
|
|
@ -184,13 +185,21 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
|
|||
|
||||
def eventStream: EventStream = context.system.eventStream
|
||||
|
||||
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
|
||||
subscriber ! CurrentClusterState(
|
||||
def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = {
|
||||
val state = CurrentClusterState(
|
||||
members = latestGossip.members,
|
||||
unreachable = latestGossip.overview.unreachable,
|
||||
convergence = latestGossip.convergence,
|
||||
seenBy = latestGossip.seenBy,
|
||||
leader = latestGossip.leader)
|
||||
receiver match {
|
||||
case Some(ref) ⇒ ref ! state
|
||||
case None ⇒ eventStream publish state
|
||||
}
|
||||
}
|
||||
|
||||
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
|
||||
publishCurrentClusterState(Some(subscriber))
|
||||
eventStream.subscribe(subscriber, to)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -67,21 +67,11 @@ abstract class TransitionSpec
|
|||
memberStatus(address) == status
|
||||
}
|
||||
|
||||
def leaderActions(): Unit = {
|
||||
def leaderActions(): Unit =
|
||||
cluster.clusterCore ! LeaderActionsTick
|
||||
awaitPing()
|
||||
}
|
||||
|
||||
def reapUnreachable(): Unit = {
|
||||
def reapUnreachable(): Unit =
|
||||
cluster.clusterCore ! ReapUnreachableTick
|
||||
awaitPing()
|
||||
}
|
||||
|
||||
def awaitPing(): Unit = {
|
||||
val ping = Ping()
|
||||
cluster.clusterCore ! ping
|
||||
expectMsgPF() { case pong @ Pong(`ping`, _) ⇒ pong }
|
||||
}
|
||||
|
||||
// DSL sugar for `role1 gossipTo role2`
|
||||
implicit def roleExtras(role: RoleName): RoleWrapper = new RoleWrapper(role)
|
||||
|
|
|
|||
|
|
@ -6,10 +6,8 @@ package akka.cluster
|
|||
|
||||
import language.postfixOps
|
||||
import language.reflectiveCalls
|
||||
|
||||
import scala.concurrent.util.duration._
|
||||
import scala.concurrent.util.Duration
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.ImplicitSender
|
||||
import akka.actor.ExtendedActorSystem
|
||||
|
|
@ -17,6 +15,7 @@ import akka.actor.Address
|
|||
import akka.cluster.InternalClusterAction._
|
||||
import java.lang.management.ManagementFactory
|
||||
import javax.management.ObjectName
|
||||
import akka.actor.ActorRef
|
||||
|
||||
object ClusterSpec {
|
||||
val config = """
|
||||
|
|
@ -45,16 +44,8 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
|||
val cluster = Cluster(system)
|
||||
def clusterView = cluster.readView
|
||||
|
||||
def leaderActions(): Unit = {
|
||||
def leaderActions(): Unit =
|
||||
cluster.clusterCore ! LeaderActionsTick
|
||||
awaitPing()
|
||||
}
|
||||
|
||||
def awaitPing(): Unit = {
|
||||
val ping = Ping()
|
||||
cluster.clusterCore ! ping
|
||||
expectMsgPF() { case pong @ Pong(`ping`, _) ⇒ pong }
|
||||
}
|
||||
|
||||
"A Cluster" must {
|
||||
|
||||
|
|
@ -79,7 +70,25 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
|||
clusterView.status must be(MemberStatus.Joining)
|
||||
clusterView.convergence must be(true)
|
||||
leaderActions()
|
||||
clusterView.status must be(MemberStatus.Up)
|
||||
awaitCond(clusterView.status == MemberStatus.Up)
|
||||
}
|
||||
|
||||
"publish CurrentClusterState to subscribers when requested" in {
|
||||
try {
|
||||
cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent])
|
||||
// first, is in response to the subscription
|
||||
expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent])
|
||||
|
||||
cluster.publishCurrentClusterState()
|
||||
expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent])
|
||||
} finally {
|
||||
cluster.unsubscribe(testActor)
|
||||
}
|
||||
}
|
||||
|
||||
"send CurrentClusterState to one receiver when requested" in {
|
||||
cluster.sendCurrentClusterState(testActor)
|
||||
expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent])
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue