Merge pull request #26346 from akka/wip-26326-FunctionRef-deadlock-patriknw

Fix deadlock in FunctionRef, #26326
This commit is contained in:
Patrik Nordwall 2019-02-12 15:56:28 +01:00 committed by GitHub
commit e0a3fb28c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 118 additions and 54 deletions

View file

@ -732,34 +732,43 @@ private[akka] final class FunctionRef(
}
}
// requires sychronized access because AddressTerminatedTopic must be updated together with this
// watching, _watchedBy and maintainAddressTerminatedSubscription requires sychronized access because
// AddressTerminatedTopic must be updated together with the variables here.
// Important: don't include calls to sendSystemMessage inside the synchronized since that can
// result in deadlock, see issue #26326
private[this] var watching = ActorCell.emptyActorRefSet
// requires sychronized access because AddressTerminatedTopic must be updated together with this
private[this] var _watchedBy: OptionVal[Set[ActorRef]] = OptionVal.Some(ActorCell.emptyActorRefSet)
override def isTerminated: Boolean = _watchedBy.isEmpty
//noinspection EmptyCheck
protected def sendTerminated(): Unit = synchronized {
protected def sendTerminated(): Unit = {
def unwatchWatched(watched: ActorRef): Unit =
watched.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(watched, this))
_watchedBy match {
case OptionVal.Some(watchedBy)
if (watchedBy.nonEmpty) {
watchedBy foreach sendTerminated(ifLocal = false)
watchedBy foreach sendTerminated(ifLocal = true)
}
if (watching.nonEmpty) {
watching foreach unwatchWatched
val (toUnwatch, watchedBy) = this.synchronized {
_watchedBy match {
case OptionVal.Some(wBy)
val oldWatching = watching
watching = Set.empty
}
unsubscribeAddressTerminated()
_watchedBy = OptionVal.None
unsubscribeAddressTerminated()
_watchedBy = OptionVal.None
case OptionVal.None
(oldWatching, wBy)
case OptionVal.None
(ActorCell.emptyActorRefSet, ActorCell.emptyActorRefSet)
}
}
// outside of synchronized block
if (toUnwatch.nonEmpty)
toUnwatch foreach unwatchWatched
if (watchedBy.nonEmpty) {
watchedBy foreach sendTerminated(ifLocal = false)
watchedBy foreach sendTerminated(ifLocal = true)
}
}
@ -767,48 +776,61 @@ private[akka] final class FunctionRef(
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal)
watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(this, existenceConfirmed = true, addressTerminated = false))
private def addressTerminated(address: Address): Unit = synchronized {
// cleanup watchedBy since we know they are dead
_watchedBy match {
case OptionVal.None // terminated
case OptionVal.Some(watchedBy)
maintainAddressTerminatedSubscription(OptionVal.None) {
_watchedBy = OptionVal.Some(watchedBy.filterNot(_.path.address == address))
}
// send DeathWatchNotification to self for all matching subjects
for (a watching; if a.path.address == address) {
this.sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true))
}
private def addressTerminated(address: Address): Unit = {
val toNotify = this.synchronized {
// cleanup watchedBy since we know they are dead
_watchedBy match {
case OptionVal.None
// terminated
ActorCell.emptyActorRefSet
case OptionVal.Some(watchedBy)
maintainAddressTerminatedSubscription(OptionVal.None) {
_watchedBy = OptionVal.Some(watchedBy.filterNot(_.path.address == address))
}
watching
}
}
// outside of synchronized block
// send DeathWatchNotification to self for all matching subjects
for (a toNotify; if a.path.address == address) {
this.sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true))
}
}
override def stop(): Unit = sendTerminated()
private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = synchronized {
_watchedBy match {
case OptionVal.None
sendTerminated(ifLocal = true)(watcher)
sendTerminated(ifLocal = false)(watcher)
private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val selfTerminated = this.synchronized {
_watchedBy match {
case OptionVal.None
true
case OptionVal.Some(watchedBy)
val watcheeSelf = watchee == this
val watcherSelf = watcher == this
case OptionVal.Some(watchedBy)
val watcheeSelf = watchee == this
val watcherSelf = watcher == this
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) {
maintainAddressTerminatedSubscription(OptionVal.Some(watcher)) {
_watchedBy = OptionVal.Some(watchedBy + watcher)
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) {
maintainAddressTerminatedSubscription(OptionVal.Some(watcher)) {
_watchedBy = OptionVal.Some(watchedBy + watcher)
}
}
} else if (!watcheeSelf && watcherSelf) {
publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered watch from $watcher to $watchee is illegal on FunctionRef"))
} else {
publish(Logging.Error(path.toString, classOf[FunctionRef], s"BUG: illegal Watch($watchee,$watcher) for $this"))
}
} else if (!watcheeSelf && watcherSelf) {
publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered watch from $watcher to $watchee is illegal on FunctionRef"))
} else {
publish(Logging.Error(path.toString, classOf[FunctionRef], s"BUG: illegal Watch($watchee,$watcher) for $this"))
}
false
}
}
// outside of synchronized block
if (selfTerminated) {
sendTerminated(ifLocal = true)(watcher)
sendTerminated(ifLocal = false)(watcher)
}
}
private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = synchronized {
private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = this.synchronized {
_watchedBy match {
case OptionVal.None // do nothing...
case OptionVal.Some(watchedBy)
@ -837,10 +859,13 @@ private[akka] final class FunctionRef(
* Upon receiving the Terminated message, `unwatch` must be called to avoid resource leak,
* which is different from an ordinary actor.
*/
def watch(actorRef: ActorRef): Unit = synchronized {
maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) {
watching += actorRef
def watch(actorRef: ActorRef): Unit = {
this.synchronized {
maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) {
watching += actorRef
}
}
// outside of synchronized block
actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Watch(actorRef.asInstanceOf[InternalActorRef], this))
}
@ -850,17 +875,20 @@ private[akka] final class FunctionRef(
* Upon receiving the Terminated message, `unwatch` must be called to avoid resource leak,
* which is different from an ordinary actor.
*/
def unwatch(actorRef: ActorRef): Unit = synchronized {
maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) {
watching -= actorRef
def unwatch(actorRef: ActorRef): Unit = {
this.synchronized {
maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) {
watching -= actorRef
}
}
// outside of synchronized block
actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(actorRef.asInstanceOf[InternalActorRef], this))
}
/**
* Query whether this FunctionRef is currently watching the given Actor.
*/
def isWatching(actorRef: ActorRef): Boolean = synchronized {
def isWatching(actorRef: ActorRef): Boolean = this.synchronized {
watching.contains(actorRef)
}

View file

@ -20,6 +20,8 @@ import com.typesafe.config.ConfigFactory
import akka.actor.CoordinatedShutdown
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source, StreamRefs }
import scala.concurrent.Await
@ -171,6 +173,40 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
}
}
"terminate ActorSystem via CoordinatedShutdown.run when a stream involving StreamRefs is running" in {
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.coordinated-shutdown.terminate-actor-system = on
"""))
try {
val probe = TestProbe()(sys2)
Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent])
probe.expectMsgType[CurrentClusterState]
Cluster(sys2).join(Cluster(sys2).selfAddress)
probe.expectMsgType[MemberUp]
val mat = ActorMaterializer()(sys2)
val sink = Await.result(StreamRefs.sinkRef[String]().to(Sink.ignore).run()(mat), 10.seconds)
Source.tick(1.milli, 10.millis, "tick").to(sink).run()(mat)
CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason)
probe.expectMsgType[MemberLeft]
// MemberExited might not be published before MemberRemoved
val removed = probe.fishForMessage() {
case _: MemberExited false
case _: MemberRemoved true
}.asInstanceOf[MemberRemoved]
removed.previousStatus should ===(MemberStatus.Exiting)
Await.result(sys2.whenTerminated, 10.seconds)
Cluster(sys2).isTerminated should ===(true)
CoordinatedShutdown(sys2).shutdownReason() should ===(Some(CoordinatedShutdown.UnknownReason))
} finally {
shutdown(sys2)
}
}
"leave via CoordinatedShutdown.run when member status is Joining" in {
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
akka.actor.provider = "cluster"