From 9926658f7efa177d8dbecd96a5f38cd370a8bbff Mon Sep 17 00:00:00 2001
From: Ostapenko Evgeniy
Date: Fri, 21 Aug 2015 17:28:35 +0300
Subject: [PATCH] =cls #18287 failed: ClusterShardingSpec ddata mode

---
 .../cluster/sharding/ShardCoordinator.scala | 67 ++++++++++---------
 1 file changed, 34 insertions(+), 33 deletions(-)

diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 1239234f26..d1ae87073b 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -9,19 +9,9 @@ import scala.collection.immutable
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Success
-import akka.actor.Actor
-import akka.actor.ActorLogging
-import akka.actor.ActorRef
-import akka.actor.Cancellable
-import akka.actor.Deploy
-import akka.actor.NoSerializationVerificationNeeded
-import akka.actor.Props
-import akka.actor.ReceiveTimeout
-import akka.actor.Stash
-import akka.actor.Terminated
+import akka.actor._
 import akka.cluster.Cluster
-import akka.cluster.ClusterEvent.ClusterShuttingDown
-import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.cluster.ClusterEvent._
 import akka.cluster.ddata.LWWRegisterKey
 import akka.cluster.ddata.LWWRegister
 import akka.cluster.ddata.Replicator._
@@ -391,11 +381,12 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
   // regions that have requested handoff, for graceful shutdown
   var gracefulShutdownInProgress = Set.empty[ActorRef]
   var aliveRegions = Set.empty[ActorRef]
+  var members = Set.empty[Address]
 
   import context.dispatcher
   val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
 
-  Cluster(context.system).subscribe(self, ClusterShuttingDown.getClass)
+  Cluster(context.system).subscribe(self, initialStateMode = InitialStateAsEvents, ClusterShuttingDown.getClass, classOf[MemberEvent])
 
   override def postStop(): Unit = {
     super.postStop()
@@ -403,23 +394,33 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
     Cluster(context.system).unsubscribe(self)
   }
 
-  def active: Receive = {
-    case Register(region) ⇒
-      log.debug("ShardRegion registered: [{}]", region)
-      aliveRegions += region
-      if (state.regions.contains(region))
-        region ! RegisterAck(self)
-      else {
-        gracefulShutdownInProgress -= region
-        update(ShardRegionRegistered(region)) { evt ⇒
-          val firstRegion = state.regions.isEmpty
-          state = state.updated(evt)
-          context.watch(region)
-          region ! RegisterAck(self)
+  def common: Receive = {
+    case MemberUp(m)         ⇒ members += m.address
+    case MemberRemoved(m, _) ⇒ members -= m.address
+    case _: MemberEvent      ⇒
+  }
 
-          if (firstRegion)
-            allocateShardHomes()
+  def active: Receive = ({
+    case Register(region) ⇒
+      if (region.path.address == self.path.address || members(region.path.address)) {
+        log.debug("ShardRegion registered: [{}]", region)
+        aliveRegions += region
+        if (state.regions.contains(region))
+          region ! RegisterAck(self)
+        else {
+          gracefulShutdownInProgress -= region
+          update(ShardRegionRegistered(region)) { evt ⇒
+            val firstRegion = state.regions.isEmpty
+            state = state.updated(evt)
+            context.watch(region)
+            region ! RegisterAck(self)
+
+            if (firstRegion)
+              allocateShardHomes()
+          }
         }
+      } else {
+        log.warning("ShardRegion {} was not registered since the coordinator currently does not know about a node of that region", region)
       }
 
     case RegisterProxy(proxy) ⇒
@@ -554,7 +555,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti
         sender() ! reply
 
     case _: CurrentClusterState ⇒
-  }
+  }: Receive) orElse common
 
   def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit
 
@@ -730,7 +731,7 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   override def receive: Receive = waitingForState
 
   // This state will drop all other messages since they will be retried
-  def waitingForState: Receive = {
+  def waitingForState: Receive = ({
     case g @ GetSuccess(CoordinatorStateKey, _) ⇒
       state = g.get(CoordinatorStateKey).value
       state.regionProxies.foreach(context.watch)
@@ -746,9 +747,9 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
       getState()
     case NotFound(CoordinatorStateKey, _) ⇒
       activate()
-  }
+  }: Receive) orElse common
 
-  def waitingForUpdate[E <: DomainEvent](evt: E): Receive = {
+  def waitingForUpdate[E <: DomainEvent](evt: E): Receive = ({
     case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒
       log.debug("The coordinator state was successfully updated with {}", evt)
      updateSuccess(evt)
@@ -768,7 +769,7 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
         evt)
       throw cause
     case _ ⇒ stash()
-  }
+  }: Receive) orElse common
 
   def activate() = {
     context.become(active)
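
A minimal sketch (illustrative only, not part of the commit) of the `({ ... }: Receive) orElse common` idiom this patch applies in `active`, `waitingForState`, and `waitingForUpdate`: each state-specific behavior is ascribed to `Receive` so the bare pattern-match literal type-checks, then chained onto a shared handler with `orElse`, so the membership bookkeeping in `common` keeps running in every state. The actor and message names below are invented for the example; only the composition shape comes from the patch.

import akka.actor.{ Actor, ActorSystem, Props }

class ComposedActor extends Actor {
  var members = Set.empty[String]

  // Shared handler, analogous to `common` in the patch: appended to every
  // state's Receive so the membership bookkeeping never stops running.
  def common: Receive = {
    case ("up", node: String)   ⇒ members += node
    case ("down", node: String) ⇒ members -= node
  }

  // The `: Receive` ascription gives the pattern-match literal an expected
  // type; without it the compiler cannot infer a type for `{ case ... }`
  // when `orElse` is invoked on it directly.
  def active: Receive = ({
    case "ping" ⇒ sender() ! "pong"
  }: Receive) orElse common

  override def receive: Receive = active
}

object ComposedActorDemo extends App {
  val system = ActorSystem("sketch")
  val ref = system.actorOf(Props[ComposedActor], "composed")
  val up = ("up", "nodeA")
  ref ! up     // handled by `common`, even while the actor is `active`
  ref ! "ping" // handled by the state-specific part of the behavior
  system.terminate()
}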