From a6f717c9b0900c520bb1c08e74d5cc3e2965881d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 23 May 2019 14:08:26 +0200 Subject: [PATCH] fix a race condition in ClusterReadView #24710 (#26922) * remove lazy initialization for _cachedSelf * don't update _state and friends after close() --- .../mima-filters/2.5.x.backwards.excludes | 2 + .../scala/akka/cluster/ClusterReadView.scala | 60 ++++++------------- .../akka/cluster/MinMembersBeforeUpSpec.scala | 2 - .../akka/cluster/MultiNodeClusterSpec.scala | 1 - .../scala/akka/cluster/TransitionSpec.scala | 2 - 5 files changed, 21 insertions(+), 46 deletions(-) create mode 100644 akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes diff --git a/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes b/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes new file mode 100644 index 0000000000..c36e04e81f --- /dev/null +++ b/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes @@ -0,0 +1,2 @@ +# #24710 remove internal ClusterReadView.refreshCurrentState +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterReadView.refreshCurrentState") diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 487e2db400..dd579fa8d0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -7,12 +7,16 @@ package akka.cluster import java.io.Closeable import scala.collection.immutable -import akka.actor.{ Actor, ActorRef, Address, Props } -import akka.cluster.ClusterEvent._ -import akka.actor.PoisonPill -import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Address import akka.actor.Deploy -import akka.util.OptionVal +import akka.actor.PoisonPill +import akka.actor.Props +import akka.cluster.ClusterEvent._ +import akka.dispatch.RequiresMessageQueue +import akka.dispatch.UnboundedMessageQueueSemantics /** * INTERNAL API @@ -32,10 +36,9 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { @volatile private var _reachability: Reachability = Reachability.empty - // lazy init below, updated when state is updated @volatile - private var _cachedSelf: OptionVal[Member] = OptionVal.None - + private var _cachedSelf: Member = + Member(cluster.selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed) @volatile private var _closed: Boolean = false @@ -45,17 +48,16 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { @volatile private var _latestStats = CurrentInternalStats(GossipStats(), VectorClockStats()) - val selfAddress = cluster.selfAddress + val selfAddress: Address = cluster.selfAddress // create actor that subscribes to the cluster eventBus to update current read view state private val eventBusListener: ActorRef = { cluster.system .systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) - override def postStop(): Unit = cluster.unsubscribe(self) def receive: Receive = { - case e: ClusterDomainEvent => + case e: ClusterDomainEvent if !_closed => e match { case SeenChanged(_, seenBy) => _state = _state.copy(seenBy = seenBy) @@ -90,12 +92,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { e match { case e: MemberEvent if e.member.address == selfAddress => - _cachedSelf match { - case OptionVal.Some(s) if s.status == MemberStatus.Removed && _closed => - // ignore as Cluster.close has been called - case _ => - _cachedSelf = OptionVal.Some(e.member) - } + _cachedSelf = e.member case _ => } @@ -107,29 +104,16 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { logInfo("event {}", event) } - case s: CurrentClusterState => _state = s + case s: CurrentClusterState if !_closed => + _state = s + _cachedSelf = s.members.find(_.uniqueAddress == cluster.selfUniqueAddress).getOrElse(_cachedSelf) } }).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener") } def state: CurrentClusterState = _state - def self: Member = { - _cachedSelf match { - case OptionVal.None => - // lazy initialization here, later updated from elsewhere - _cachedSelf = OptionVal.Some(selfFromStateOrPlaceholder) - _cachedSelf.get - case OptionVal.Some(member) => member - } - } - - private def selfFromStateOrPlaceholder = { - import cluster.selfUniqueAddress - state.members - .find(_.uniqueAddress == selfUniqueAddress) - .getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed)) - } + def self: Member = _cachedSelf /** * Returns true if this cluster instance has be shutdown. @@ -183,12 +167,6 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { def reachability: Reachability = _reachability - /** - * INTERNAL API - */ - private[cluster] def refreshCurrentState(): Unit = - cluster.sendCurrentClusterState(eventBusListener) - /** * INTERNAL API * The nodes that has seen current version of the Gossip. @@ -205,7 +183,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { */ def close(): Unit = { _closed = true - _cachedSelf = OptionVal.Some(self.copy(MemberStatus.Removed)) + _cachedSelf = self.copy(MemberStatus.Removed) if (!eventBusListener.isTerminated) eventBusListener ! PoisonPill } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala index 97b1b6ef9c..0fabedf16b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MinMembersBeforeUpSpec.scala @@ -118,7 +118,6 @@ abstract class MinMembersBeforeUpBase(multiNodeConfig: MultiNodeConfig) runOn(first) { cluster.join(myself) awaitAssert { - clusterView.refreshCurrentState() clusterView.status should ===(Joining) } } @@ -132,7 +131,6 @@ abstract class MinMembersBeforeUpBase(multiNodeConfig: MultiNodeConfig) runOn(first, second) { val expectedAddresses = Set(first, second).map(address) awaitAssert { - clusterView.refreshCurrentState() clusterView.members.map(_.address) should ===(expectedAddresses) } clusterView.members.unsorted.map(_.status) should ===(Set(Joining)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 1a1d1ff55e..ce38a0bf71 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -251,7 +251,6 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro cluster.join(joinNode) awaitCond( { - clusterView.refreshCurrentState() if (memberInState(joinNode, List(MemberStatus.Up)) && memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up))) true diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 8f3b9197f9..77663a6eff 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -65,12 +65,10 @@ abstract class TransitionSpec } def awaitMembers(addresses: Address*): Unit = awaitAssert { - clusterView.refreshCurrentState() memberAddresses should ===(addresses.toSet) } def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitAssert { - clusterView.refreshCurrentState() memberStatus(address) should ===(status) }