Merge pull request #21813 from akka/wip-21518-patriknw
minor cleanup of cluster.subscribe usage
This commit is contained in:
commit
a7eed00948
2 changed files with 4 additions and 3 deletions
|
|
@ -21,6 +21,7 @@ import akka.cluster.ddata.Replicator._
|
||||||
import akka.dispatch.ExecutionContexts
|
import akka.dispatch.ExecutionContexts
|
||||||
import akka.pattern.{ AskTimeoutException, pipe }
|
import akka.pattern.{ AskTimeoutException, pipe }
|
||||||
import akka.persistence._
|
import akka.persistence._
|
||||||
|
import akka.cluster.ClusterEvent
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see [[ClusterSharding$ ClusterSharding extension]]
|
* @see [[ClusterSharding$ ClusterSharding extension]]
|
||||||
|
|
@ -840,7 +841,7 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
|
||||||
val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState")
|
val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState")
|
||||||
val initEmptyState = State.empty.withRememberEntities(settings.rememberEntities)
|
val initEmptyState = State.empty.withRememberEntities(settings.rememberEntities)
|
||||||
|
|
||||||
node.subscribe(self, ClusterShuttingDown.getClass)
|
node.subscribe(self, ClusterEvent.InitialStateAsEvents, ClusterShuttingDown.getClass)
|
||||||
|
|
||||||
// get state from ddata replicator, repeat until GetSuccess
|
// get state from ddata replicator, repeat until GetSuccess
|
||||||
getState()
|
getState()
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import akka.cluster.MemberStatus
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
import akka.actor.NoSerializationVerificationNeeded
|
import akka.actor.NoSerializationVerificationNeeded
|
||||||
import akka.cluster.UniqueAddress
|
import akka.cluster.UniqueAddress
|
||||||
|
import akka.cluster.ClusterEvent
|
||||||
|
|
||||||
object ClusterSingletonManagerSettings {
|
object ClusterSingletonManagerSettings {
|
||||||
|
|
||||||
|
|
@ -435,7 +436,7 @@ class ClusterSingletonManager(
|
||||||
require(!cluster.isTerminated, "Cluster node must not be terminated")
|
require(!cluster.isTerminated, "Cluster node must not be terminated")
|
||||||
|
|
||||||
// subscribe to cluster changes, re-subscribe when restart
|
// subscribe to cluster changes, re-subscribe when restart
|
||||||
cluster.subscribe(self, classOf[MemberExited], classOf[MemberRemoved])
|
cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberExited], classOf[MemberRemoved])
|
||||||
|
|
||||||
setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true)
|
setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true)
|
||||||
|
|
||||||
|
|
@ -715,7 +716,6 @@ class ClusterSingletonManager(
|
||||||
}
|
}
|
||||||
|
|
||||||
whenUnhandled {
|
whenUnhandled {
|
||||||
case Event(_: CurrentClusterState, _) ⇒ stay
|
|
||||||
case Event(MemberExited(m), _) ⇒
|
case Event(MemberExited(m), _) ⇒
|
||||||
if (m.uniqueAddress == cluster.selfUniqueAddress) {
|
if (m.uniqueAddress == cluster.selfUniqueAddress) {
|
||||||
selfExited = true
|
selfExited = true
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue