fix a race condition in ClusterReadView #24710 (#26922)

* remove lazy initialization for _cachedSelf
* don't update _state and friends after close()
This commit is contained in:
Patrik Nordwall 2019-05-23 14:08:26 +02:00 committed by Arnout Engelen
parent 3015f197f1
commit a6f717c9b0
5 changed files with 21 additions and 46 deletions

View file

@ -0,0 +1,2 @@
# #24710 remove internal ClusterReadView.refreshCurrentState
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.refreshCurrentState")

View file

@ -7,12 +7,16 @@ package akka.cluster
import java.io.Closeable import java.io.Closeable
import scala.collection.immutable import scala.collection.immutable
import akka.actor.{ Actor, ActorRef, Address, Props }
import akka.cluster.ClusterEvent._ import akka.actor.Actor
import akka.actor.PoisonPill import akka.actor.ActorRef
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.actor.Address
import akka.actor.Deploy import akka.actor.Deploy
import akka.util.OptionVal import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.ClusterEvent._
import akka.dispatch.RequiresMessageQueue
import akka.dispatch.UnboundedMessageQueueSemantics
/** /**
* INTERNAL API * INTERNAL API
@ -32,10 +36,9 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
@volatile @volatile
private var _reachability: Reachability = Reachability.empty private var _reachability: Reachability = Reachability.empty
// lazy init below, updated when state is updated
@volatile @volatile
private var _cachedSelf: OptionVal[Member] = OptionVal.None private var _cachedSelf: Member =
Member(cluster.selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed)
@volatile @volatile
private var _closed: Boolean = false private var _closed: Boolean = false
@ -45,17 +48,16 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
@volatile @volatile
private var _latestStats = CurrentInternalStats(GossipStats(), VectorClockStats()) private var _latestStats = CurrentInternalStats(GossipStats(), VectorClockStats())
val selfAddress = cluster.selfAddress val selfAddress: Address = cluster.selfAddress
// create actor that subscribes to the cluster eventBus to update current read view state // create actor that subscribes to the cluster eventBus to update current read view state
private val eventBusListener: ActorRef = { private val eventBusListener: ActorRef = {
cluster.system cluster.system
.systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { .systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
override def postStop(): Unit = cluster.unsubscribe(self)
def receive: Receive = { def receive: Receive = {
case e: ClusterDomainEvent => case e: ClusterDomainEvent if !_closed =>
e match { e match {
case SeenChanged(_, seenBy) => case SeenChanged(_, seenBy) =>
_state = _state.copy(seenBy = seenBy) _state = _state.copy(seenBy = seenBy)
@ -90,12 +92,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
e match { e match {
case e: MemberEvent if e.member.address == selfAddress => case e: MemberEvent if e.member.address == selfAddress =>
_cachedSelf match { _cachedSelf = e.member
case OptionVal.Some(s) if s.status == MemberStatus.Removed && _closed =>
// ignore as Cluster.close has been called
case _ =>
_cachedSelf = OptionVal.Some(e.member)
}
case _ => case _ =>
} }
@ -107,29 +104,16 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
logInfo("event {}", event) logInfo("event {}", event)
} }
case s: CurrentClusterState => _state = s case s: CurrentClusterState if !_closed =>
_state = s
_cachedSelf = s.members.find(_.uniqueAddress == cluster.selfUniqueAddress).getOrElse(_cachedSelf)
} }
}).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener") }).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener")
} }
def state: CurrentClusterState = _state def state: CurrentClusterState = _state
def self: Member = { def self: Member = _cachedSelf
_cachedSelf match {
case OptionVal.None =>
// lazy initialization here, later updated from elsewhere
_cachedSelf = OptionVal.Some(selfFromStateOrPlaceholder)
_cachedSelf.get
case OptionVal.Some(member) => member
}
}
private def selfFromStateOrPlaceholder = {
import cluster.selfUniqueAddress
state.members
.find(_.uniqueAddress == selfUniqueAddress)
.getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed))
}
/** /**
* Returns true if this cluster instance has be shutdown. * Returns true if this cluster instance has be shutdown.
@ -183,12 +167,6 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def reachability: Reachability = _reachability def reachability: Reachability = _reachability
/**
* INTERNAL API
*/
private[cluster] def refreshCurrentState(): Unit =
cluster.sendCurrentClusterState(eventBusListener)
/** /**
* INTERNAL API * INTERNAL API
* The nodes that has seen current version of the Gossip. * The nodes that has seen current version of the Gossip.
@ -205,7 +183,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
*/ */
def close(): Unit = { def close(): Unit = {
_closed = true _closed = true
_cachedSelf = OptionVal.Some(self.copy(MemberStatus.Removed)) _cachedSelf = self.copy(MemberStatus.Removed)
if (!eventBusListener.isTerminated) if (!eventBusListener.isTerminated)
eventBusListener ! PoisonPill eventBusListener ! PoisonPill
} }

View file

@ -118,7 +118,6 @@ abstract class MinMembersBeforeUpBase(multiNodeConfig: MultiNodeConfig)
runOn(first) { runOn(first) {
cluster.join(myself) cluster.join(myself)
awaitAssert { awaitAssert {
clusterView.refreshCurrentState()
clusterView.status should ===(Joining) clusterView.status should ===(Joining)
} }
} }
@ -132,7 +131,6 @@ abstract class MinMembersBeforeUpBase(multiNodeConfig: MultiNodeConfig)
runOn(first, second) { runOn(first, second) {
val expectedAddresses = Set(first, second).map(address) val expectedAddresses = Set(first, second).map(address)
awaitAssert { awaitAssert {
clusterView.refreshCurrentState()
clusterView.members.map(_.address) should ===(expectedAddresses) clusterView.members.map(_.address) should ===(expectedAddresses)
} }
clusterView.members.unsorted.map(_.status) should ===(Set(Joining)) clusterView.members.unsorted.map(_.status) should ===(Set(Joining))

View file

@ -251,7 +251,6 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
cluster.join(joinNode) cluster.join(joinNode)
awaitCond( awaitCond(
{ {
clusterView.refreshCurrentState()
if (memberInState(joinNode, List(MemberStatus.Up)) && if (memberInState(joinNode, List(MemberStatus.Up)) &&
memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up))) memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
true true

View file

@ -65,12 +65,10 @@ abstract class TransitionSpec
} }
def awaitMembers(addresses: Address*): Unit = awaitAssert { def awaitMembers(addresses: Address*): Unit = awaitAssert {
clusterView.refreshCurrentState()
memberAddresses should ===(addresses.toSet) memberAddresses should ===(addresses.toSet)
} }
def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitAssert { def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitAssert {
clusterView.refreshCurrentState()
memberStatus(address) should ===(status) memberStatus(address) should ===(status)
} }