Cluster singleton manager: don't send member events to FSM during shutdown (#24236)

There exists a race where a cluter node that is being downed seens its
self as the oldest node (as it has had the other nodes removed) and it
takes over the singleton manager sending the real oldest node to go into
the End state meaning that cluster singletons never work again.

This fix simply prevents Member events being given to the Cluster
Manager FSM during a shut down, instread relying on SelfExiting.

This also hardens the test by not downing the node that the current
sharding coordinator is running on as well as fixing a bug in the
probes.
This commit is contained in:
Christopher Batey 2018-01-05 08:47:43 +00:00 committed by Arnout Engelen
parent 1d14c387a0
commit 0380cc517a
4 changed files with 26 additions and 22 deletions

View file

@ -407,7 +407,7 @@ final class CoordinatedShutdown private[akka] (
remainingPhases match { remainingPhases match {
case Nil Future.successful(Done) case Nil Future.successful(Done)
case phase :: remaining case phase :: remaining
val phaseResult = (tasks.get(phase) match { val phaseResult = tasks.get(phase) match {
case null case null
if (debugEnabled) log.debug("Performing phase [{}] with [0] tasks", phase) if (debugEnabled) log.debug("Performing phase [{}] with [0] tasks", phase)
Future.successful(Done) Future.successful(Done)
@ -459,7 +459,7 @@ final class CoordinatedShutdown private[akka] (
result result
} }
Future.firstCompletedOf(List(result, timeoutFut)) Future.firstCompletedOf(List(result, timeoutFut))
}) }
if (remaining.isEmpty) if (remaining.isEmpty)
phaseResult // avoid flatMap when system terminated in last phase phaseResult // avoid flatMap when system terminated in last phase
else else

View file

@ -61,7 +61,7 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi
Future.successful(Done) Future.successful(Done)
} }
CoordinatedShutdown(sys3).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () CoordinatedShutdown(sys3).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { ()
probe1.ref ! "CS-unbind-3" probe3.ref ! "CS-unbind-3"
Future.successful(Done) Future.successful(Done)
} }
@ -70,13 +70,14 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi
shutdown(sys2) shutdown(sys2)
} }
// Using region 2 as it is not shutdown in either test
def pingEntities(): Unit = { def pingEntities(): Unit = {
region3.tell(1, probe3.ref) region2.tell(1, probe2.ref)
probe3.expectMsg(10.seconds, 1) probe2.expectMsg(10.seconds, 1)
region3.tell(2, probe3.ref) region2.tell(2, probe2.ref)
probe3.expectMsg(2) probe2.expectMsg(10.seconds, 2)
region3.tell(3, probe3.ref) region2.tell(3, probe2.ref)
probe3.expectMsg(3) probe2.expectMsg(10.seconds, 3)
} }
"Sharding and CoordinatedShutdown" must { "Sharding and CoordinatedShutdown" must {
@ -134,18 +135,19 @@ class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardi
} }
"run coordinated shutdown when downing" in { "run coordinated shutdown when downing" in {
Cluster(sys3).down(Cluster(sys2).selfAddress) // coordinator is on sys2
probe2.expectMsg("CS-unbind-2") Cluster(sys2).down(Cluster(sys3).selfAddress)
probe3.expectMsg("CS-unbind-3")
within(10.seconds) { within(10.seconds) {
awaitAssert { awaitAssert {
Cluster(system).state.members.size should ===(1) Cluster(sys2).state.members.size should ===(1)
} }
} }
within(10.seconds) { within(10.seconds) {
awaitAssert { awaitAssert {
Cluster(sys2).isTerminated should ===(true) Cluster(sys3).isTerminated should ===(true)
sys2.whenTerminated.isCompleted should ===(true) sys3.whenTerminated.isCompleted should ===(true)
} }
} }

View file

@ -303,9 +303,12 @@ object ClusterSingletonManager {
} }
def sendFirstChange(): Unit = { def sendFirstChange(): Unit = {
val event = changes.head // don't send cluster change events if this node is shutting its self down, just wait for SelfExiting
changes = changes.tail if (!cluster.isTerminated) {
context.parent ! event val event = changes.head
changes = changes.tail
context.parent ! event
}
} }
def receive = { def receive = {
@ -331,7 +334,7 @@ object ClusterSingletonManager {
context.unbecome() context.unbecome()
case MemberUp(m) case MemberUp(m)
add(m) add(m)
deliverChanges deliverChanges()
case MemberRemoved(m, _) case MemberRemoved(m, _)
remove(m) remove(m)
deliverChanges() deliverChanges()
@ -357,9 +360,7 @@ object ClusterSingletonManager {
case _ super.unhandled(msg) case _ super.unhandled(msg)
} }
} }
} }
} }
} }
@ -763,7 +764,7 @@ class ClusterSingletonManager(
case (Event(Terminated(ref), HandingOverData(singleton, handOverTo))) if ref == singleton case (Event(Terminated(ref), HandingOverData(singleton, handOverTo))) if ref == singleton
handOverDone(handOverTo) handOverDone(handOverTo)
case Event(HandOverToMe, d @ HandingOverData(singleton, handOverTo)) if handOverTo == Some(sender()) case Event(HandOverToMe, HandingOverData(singleton, handOverTo)) if handOverTo == Some(sender())
// retry // retry
sender() ! HandOverInProgress sender() ! HandOverInProgress
stay stay

View file

@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.ConfigurationException import akka.ConfigurationException
import akka.actor._ import akka.actor._
import akka.annotation.InternalApi
import akka.cluster.ClusterSettings.DataCenter import akka.cluster.ClusterSettings.DataCenter
import akka.dispatch.MonitorableThreadFactory import akka.dispatch.MonitorableThreadFactory
import akka.event.{ Logging, LoggingAdapter } import akka.event.{ Logging, LoggingAdapter }
@ -409,7 +410,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
* Should not called by the user. The user can issue a LEAVE command which will tell the node * Should not called by the user. The user can issue a LEAVE command which will tell the node
* to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`. * to go through graceful handoff process `LEAVE -> EXITING -> REMOVED -> SHUTDOWN`.
*/ */
private[cluster] def shutdown(): Unit = { @InternalApi private[cluster] def shutdown(): Unit = {
if (_isTerminated.compareAndSet(false, true)) { if (_isTerminated.compareAndSet(false, true)) {
logInfo("Shutting down...") logInfo("Shutting down...")