=cls #18287 failed: ClusterShardingSpec ddata mode
parent 3d4b5f57b0
commit 9926658f7e

1 changed file with 34 additions and 33 deletions
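Note: the single changed file is the sharding coordinator (the hunk context names ShardCoordinator and DDataShardCoordinator). The fix makes the coordinator subscribe to MemberEvent, keep a `members` set of known node addresses, and accept Register only from regions running on nodes it knows about; the membership bookkeeping sits in a shared `common` receive that every coordinator state chains in with orElse. Below is a minimal, self-contained sketch of that bookkeeping; the actor name MembershipTracker is made up for illustration and this is not the production coordinator.

import akka.actor.{ Actor, Address }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

// Illustration of the bookkeeping the commit adds to the coordinator:
// track node addresses from MemberUp / MemberRemoved events.
class MembershipTracker extends Actor {
  var members = Set.empty[Address]

  // InitialStateAsEvents replays the current members as individual MemberUp
  // events, so the set is filled without handling CurrentClusterState.
  Cluster(context.system).subscribe(
    self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent])

  override def postStop(): Unit = Cluster(context.system).unsubscribe(self)

  // Handlers that must be available in every state of the coordinator.
  def common: Receive = {
    case MemberUp(m)         ⇒ members += m.address
    case MemberRemoved(m, _) ⇒ members -= m.address
    case _: MemberEvent      ⇒ // other membership transitions are ignored
  }

  def receive: Receive = common
}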
@@ -9,19 +9,9 @@ import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Success
-import akka.actor.Actor
-import akka.actor.ActorLogging
-import akka.actor.ActorRef
-import akka.actor.Cancellable
-import akka.actor.Deploy
-import akka.actor.NoSerializationVerificationNeeded
-import akka.actor.Props
-import akka.actor.ReceiveTimeout
-import akka.actor.Stash
-import akka.actor.Terminated
+import akka.actor._
 import akka.cluster.Cluster
-import akka.cluster.ClusterEvent.ClusterShuttingDown
-import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.ClusterEvent._
 import akka.cluster.ddata.LWWRegisterKey
 import akka.cluster.ddata.LWWRegister
 import akka.cluster.ddata.Replicator._
@@ -391,11 +381,12 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   // regions that have requested handoff, for graceful shutdown
   var gracefulShutdownInProgress = Set.empty[ActorRef]
   var aliveRegions = Set.empty[ActorRef]
+  var members = Set.empty[Address]
 
   import context.dispatcher
   val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
 
-  Cluster(context.system).subscribe(self, ClusterShuttingDown.getClass)
+  Cluster(context.system).subscribe(self, initialStateMode = InitialStateAsEvents, ClusterShuttingDown.getClass, classOf[MemberEvent])
 
   override def postStop(): Unit = {
     super.postStop()
@@ -403,23 +394,33 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
     Cluster(context.system).unsubscribe(self)
   }
 
-  def active: Receive = {
-    case Register(region) ⇒
-      log.debug("ShardRegion registered: [{}]", region)
-      aliveRegions += region
-      if (state.regions.contains(region))
-        region ! RegisterAck(self)
-      else {
-        gracefulShutdownInProgress -= region
-        update(ShardRegionRegistered(region)) { evt ⇒
-          val firstRegion = state.regions.isEmpty
-          state = state.updated(evt)
-          context.watch(region)
-          region ! RegisterAck(self)
+  def common: Receive = {
+    case MemberUp(m)         ⇒ members += m.address
+    case MemberRemoved(m, _) ⇒ members -= m.address
+    case _: MemberEvent      ⇒
+  }
 
-          if (firstRegion)
-            allocateShardHomes()
+  def active: Receive = ({
+    case Register(region) ⇒
+      if (region.path.address == self.path.address || members(region.path.address)) {
+        log.debug("ShardRegion registered: [{}]", region)
+        aliveRegions += region
+        if (state.regions.contains(region))
+          region ! RegisterAck(self)
+        else {
+          gracefulShutdownInProgress -= region
+          update(ShardRegionRegistered(region)) { evt ⇒
+            val firstRegion = state.regions.isEmpty
+            state = state.updated(evt)
+            context.watch(region)
+            region ! RegisterAck(self)
+
+            if (firstRegion)
+              allocateShardHomes()
          }
        }
+      } else {
+        log.warning("ShardRegion {} was not registered since the coordinator currently does not know about a node of that region", region)
+      }
 
     case RegisterProxy(proxy) ⇒
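One detail of the guard above: `members(region.path.address)` can only match regions on remote nodes, since a local ActorRef carries the system's local address (no host and port) and therefore never equals a cluster member address; the extra `region.path.address == self.path.address` disjunct is presumably what keeps a region living in the coordinator's own ActorSystem registrable. A sketch of the same check pulled out into a standalone predicate; the object and method names are hypothetical, not part of the commit.

import akka.actor.{ ActorRef, Address }

object RegisterGuard {
  // Hypothetical helper mirroring the commit's guard: accept a region that
  // runs on the coordinator's own node, or on any node the coordinator has
  // seen a MemberUp for.
  def isKnownRegion(region: ActorRef, coordinator: ActorRef, members: Set[Address]): Boolean =
    region.path.address == coordinator.path.address || members(region.path.address)
}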
@@ -554,7 +555,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
       sender() ! reply
 
     case _: CurrentClusterState ⇒
-  }
+  }: Receive) orElse common
 
   def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit
 
@@ -730,7 +731,7 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   override def receive: Receive = waitingForState
 
   // This state will drop all other messages since they will be retried
-  def waitingForState: Receive = {
+  def waitingForState: Receive = ({
     case g @ GetSuccess(CoordinatorStateKey, _) ⇒
       state = g.get(CoordinatorStateKey).value
       state.regionProxies.foreach(context.watch)
@@ -746,9 +747,9 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
       getState()
 
     case NotFound(CoordinatorStateKey, _) ⇒ activate()
-  }
+  }: Receive) orElse common
 
-  def waitingForUpdate[E <: DomainEvent](evt: E): Receive = {
+  def waitingForUpdate[E <: DomainEvent](evt: E): Receive = ({
     case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒
       log.debug("The coordinator state was successfully updated with {}", evt)
       updateSuccess(evt)
@@ -768,7 +769,7 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
         evt)
       throw cause
     case _ ⇒ stash()
-  }
+  }: Receive) orElse common
 
   def activate() = {
     context.become(active)
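The recurring `({ ... }: Receive) orElse common` shape in the hunks above is plain partial-function composition: Receive is an alias for PartialFunction[Any, Unit], the `: Receive` ascription gives the pattern-matching literal an expected type so orElse can be invoked on it, and `common` picks up whatever the state-specific cases do not match. A tiny stand-alone demonstration with made-up handlers, not taken from the commit:

object ReceiveComposition extends App {
  type Receive = PartialFunction[Any, Unit]

  val common: Receive = {
    case n: Int ⇒ println(s"common handled number $n")
  }

  // Same shape as the coordinator's `({ ... }: Receive) orElse common`:
  // the ascription makes orElse available on the literal.
  val active: Receive = ({
    case s: String ⇒ println(s"active handled string $s")
  }: Receive) orElse common

  active("register") // handled by the state-specific case
  active(42)         // falls through to common
}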