diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 4ff8ad80a3..e6f76b6521 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -732,34 +732,43 @@ private[akka] final class FunctionRef( } } - // requires sychronized access because AddressTerminatedTopic must be updated together with this + // watching, _watchedBy and maintainAddressTerminatedSubscription requires sychronized access because + // AddressTerminatedTopic must be updated together with the variables here. + // Important: don't include calls to sendSystemMessage inside the synchronized since that can + // result in deadlock, see issue #26326 private[this] var watching = ActorCell.emptyActorRefSet - // requires sychronized access because AddressTerminatedTopic must be updated together with this private[this] var _watchedBy: OptionVal[Set[ActorRef]] = OptionVal.Some(ActorCell.emptyActorRefSet) override def isTerminated: Boolean = _watchedBy.isEmpty //noinspection EmptyCheck - protected def sendTerminated(): Unit = synchronized { + protected def sendTerminated(): Unit = { def unwatchWatched(watched: ActorRef): Unit = watched.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(watched, this)) - _watchedBy match { - case OptionVal.Some(watchedBy) ⇒ - if (watchedBy.nonEmpty) { - watchedBy foreach sendTerminated(ifLocal = false) - watchedBy foreach sendTerminated(ifLocal = true) - } - - if (watching.nonEmpty) { - watching foreach unwatchWatched + val (toUnwatch, watchedBy) = this.synchronized { + _watchedBy match { + case OptionVal.Some(wBy) ⇒ + val oldWatching = watching watching = Set.empty - } - unsubscribeAddressTerminated() - _watchedBy = OptionVal.None + unsubscribeAddressTerminated() + _watchedBy = OptionVal.None - case OptionVal.None ⇒ + (oldWatching, wBy) + + case OptionVal.None ⇒ + (ActorCell.emptyActorRefSet, ActorCell.emptyActorRefSet) + } + } + + // outside of synchronized block + if (toUnwatch.nonEmpty) + toUnwatch foreach unwatchWatched + + if (watchedBy.nonEmpty) { + watchedBy foreach sendTerminated(ifLocal = false) + watchedBy foreach sendTerminated(ifLocal = true) } } @@ -767,48 +776,61 @@ private[akka] final class FunctionRef( if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(this, existenceConfirmed = true, addressTerminated = false)) - private def addressTerminated(address: Address): Unit = synchronized { - // cleanup watchedBy since we know they are dead - _watchedBy match { - case OptionVal.None ⇒ // terminated - case OptionVal.Some(watchedBy) ⇒ - maintainAddressTerminatedSubscription(OptionVal.None) { - _watchedBy = OptionVal.Some(watchedBy.filterNot(_.path.address == address)) - } - // send DeathWatchNotification to self for all matching subjects - for (a ← watching; if a.path.address == address) { - this.sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true)) - } + private def addressTerminated(address: Address): Unit = { + val toNotify = this.synchronized { + // cleanup watchedBy since we know they are dead + _watchedBy match { + case OptionVal.None ⇒ + // terminated + ActorCell.emptyActorRefSet + case OptionVal.Some(watchedBy) ⇒ + maintainAddressTerminatedSubscription(OptionVal.None) { + _watchedBy = OptionVal.Some(watchedBy.filterNot(_.path.address == address)) + } + watching + } + } + + // outside of synchronized block + // send DeathWatchNotification to self for all matching subjects + for (a ← toNotify; if a.path.address == address) { + this.sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true)) } } override def stop(): Unit = sendTerminated() - private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = synchronized { - _watchedBy match { - case OptionVal.None ⇒ - sendTerminated(ifLocal = true)(watcher) - sendTerminated(ifLocal = false)(watcher) + private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { + val selfTerminated = this.synchronized { + _watchedBy match { + case OptionVal.None ⇒ + true + case OptionVal.Some(watchedBy) ⇒ + val watcheeSelf = watchee == this + val watcherSelf = watcher == this - case OptionVal.Some(watchedBy) ⇒ - val watcheeSelf = watchee == this - val watcherSelf = watcher == this - - if (watcheeSelf && !watcherSelf) { - if (!watchedBy.contains(watcher)) { - maintainAddressTerminatedSubscription(OptionVal.Some(watcher)) { - _watchedBy = OptionVal.Some(watchedBy + watcher) + if (watcheeSelf && !watcherSelf) { + if (!watchedBy.contains(watcher)) { + maintainAddressTerminatedSubscription(OptionVal.Some(watcher)) { + _watchedBy = OptionVal.Some(watchedBy + watcher) + } } + } else if (!watcheeSelf && watcherSelf) { + publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered watch from $watcher to $watchee is illegal on FunctionRef")) + } else { + publish(Logging.Error(path.toString, classOf[FunctionRef], s"BUG: illegal Watch($watchee,$watcher) for $this")) } - } else if (!watcheeSelf && watcherSelf) { - publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered watch from $watcher to $watchee is illegal on FunctionRef")) - } else { - publish(Logging.Error(path.toString, classOf[FunctionRef], s"BUG: illegal Watch($watchee,$watcher) for $this")) - } + false + } + } + // outside of synchronized block + if (selfTerminated) { + sendTerminated(ifLocal = true)(watcher) + sendTerminated(ifLocal = false)(watcher) } } - private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = synchronized { + private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = this.synchronized { _watchedBy match { case OptionVal.None ⇒ // do nothing... case OptionVal.Some(watchedBy) ⇒ @@ -837,10 +859,13 @@ private[akka] final class FunctionRef( * Upon receiving the Terminated message, `unwatch` must be called to avoid resource leak, * which is different from an ordinary actor. */ - def watch(actorRef: ActorRef): Unit = synchronized { - maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) { - watching += actorRef + def watch(actorRef: ActorRef): Unit = { + this.synchronized { + maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) { + watching += actorRef + } } + // outside of synchronized block actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Watch(actorRef.asInstanceOf[InternalActorRef], this)) } @@ -850,17 +875,20 @@ private[akka] final class FunctionRef( * Upon receiving the Terminated message, `unwatch` must be called to avoid resource leak, * which is different from an ordinary actor. */ - def unwatch(actorRef: ActorRef): Unit = synchronized { - maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) { - watching -= actorRef + def unwatch(actorRef: ActorRef): Unit = { + this.synchronized { + maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) { + watching -= actorRef + } } + // outside of synchronized block actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(actorRef.asInstanceOf[InternalActorRef], this)) } /** * Query whether this FunctionRef is currently watching the given Actor. */ - def isWatching(actorRef: ActorRef): Boolean = synchronized { + def isWatching(actorRef: ActorRef): Boolean = this.synchronized { watching.contains(actorRef) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index ff01b29e05..99a24137d3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -20,6 +20,8 @@ import com.typesafe.config.ConfigFactory import akka.actor.CoordinatedShutdown import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent._ +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{ Sink, Source, StreamRefs } import scala.concurrent.Await @@ -171,6 +173,40 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { } } + "terminate ActorSystem via CoordinatedShutdown.run when a stream involving StreamRefs is running" in { + val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString(""" + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.coordinated-shutdown.terminate-actor-system = on + """)) + try { + val probe = TestProbe()(sys2) + Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent]) + probe.expectMsgType[CurrentClusterState] + Cluster(sys2).join(Cluster(sys2).selfAddress) + probe.expectMsgType[MemberUp] + val mat = ActorMaterializer()(sys2) + val sink = Await.result(StreamRefs.sinkRef[String]().to(Sink.ignore).run()(mat), 10.seconds) + Source.tick(1.milli, 10.millis, "tick").to(sink).run()(mat) + + CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason) + probe.expectMsgType[MemberLeft] + // MemberExited might not be published before MemberRemoved + val removed = probe.fishForMessage() { + case _: MemberExited ⇒ false + case _: MemberRemoved ⇒ true + }.asInstanceOf[MemberRemoved] + removed.previousStatus should ===(MemberStatus.Exiting) + + Await.result(sys2.whenTerminated, 10.seconds) + Cluster(sys2).isTerminated should ===(true) + CoordinatedShutdown(sys2).shutdownReason() should ===(Some(CoordinatedShutdown.UnknownReason)) + } finally { + shutdown(sys2) + } + } + "leave via CoordinatedShutdown.run when member status is Joining" in { val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString(""" akka.actor.provider = "cluster"