From 96692b2c0476517481bfa6014c0b8a42fee0ea35 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 17 Jan 2019 16:53:17 +0100 Subject: [PATCH] Terminate StreamRef on node failure, #25960 * manage AddressTerminated subscription in FunctionRef * implementation can be compared with akka/actor/dungeon/DeathWatch.scala * use synchronized access to the watch state and AddressTerminatedTopic * use OptionVal for _watchedBy --- .../scala/akka/actor/FunctionRefSpec.scala | 8 +- .../mima-filters/2.5.19.backwards.excludes | 4 + .../src/main/scala/akka/actor/ActorRef.scala | 199 ++++++++++---- .../scala/akka/actor/dungeon/Children.scala | 2 +- .../scala/akka/cluster/StreamRefSpec.scala | 259 ++++++++++++++++++ 5 files changed, 410 insertions(+), 62 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/FunctionRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FunctionRefSpec.scala index d84100036c..6334c7e26f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FunctionRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FunctionRefSpec.scala @@ -61,8 +61,14 @@ class FunctionRefSpec extends AkkaSpec with ImplicitSender { s ! GetForwarder(testActor) val f = expectMsgType[FunctionRef] forwarder.watch(f) + forwarder.isWatching(f) should ===(true) s ! DropForwarder(f) expectMsg(Forwarded(Terminated(f)(true, false), f)) + + // Upon receiving the Terminated message, unwatch() must be called, which is different from an ordinary actor. + forwarder.isWatching(f) should ===(true) + forwarder.unwatch(f) + forwarder.isWatching(f) should ===(false) } "terminate when their parent terminates" in { @@ -87,7 +93,7 @@ class FunctionRefSpec extends AkkaSpec with ImplicitSender { "not registered" must { "not be found" in { val provider = system.asInstanceOf[ExtendedActorSystem].provider - val ref = new FunctionRef(testActor.path / "blabla", provider, system.eventStream, (x, y) ⇒ ()) + val ref = new FunctionRef(testActor.path / "blabla", provider, system, (_, _) ⇒ ()) EventFilter[SerializationCheckFailedException](start = "Failed to serialize and deserialize message of type akka.actor.FunctionRefSpec", occurrences = 1) intercept { // needs to be something that fails when the deserialized form is not a FunctionRef // this relies upon serialize-messages during tests diff --git a/akka-actor/src/main/mima-filters/2.5.19.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.19.backwards.excludes index 37980de964..903cd3bd2e 100644 --- a/akka-actor/src/main/mima-filters/2.5.19.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.19.backwards.excludes @@ -6,6 +6,10 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.dns.internal.AsyncD ProblemFilters.exclude[MissingClassProblem]("akka.io.dns.internal.AsyncDnsResolver$Ipv6Type$") ProblemFilters.exclude[MissingClassProblem]("akka.io.dns.internal.AsyncDnsResolver$Ipv4Type$") +# #25960 AddressTerminated in FunctionRef +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.FunctionRef.eventStream") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.FunctionRef.this") + # Changes related to adding Scala 2.13.0-M5 support ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.dungeon.ChildrenContainer#ChildrenIterable.this") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.dungeon.ChildrenContainer#ChildRestartsIterable.this") diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala 
b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 46799b187b..4ff8ad80a3 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -4,20 +4,22 @@ package akka.actor -import scala.collection.immutable -import akka.dispatch._ -import akka.dispatch.sysmsg._ -import java.lang.{ IllegalStateException, UnsupportedOperationException } - -import akka.serialization.{ JavaSerializer, Serialization } -import akka.event.{ EventStream, Logging, MarkerLoggingAdapter } +import java.util.concurrent.ConcurrentHashMap import scala.annotation.tailrec -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicReference - +import scala.collection.immutable import scala.util.control.NonFatal +import akka.dispatch._ +import akka.dispatch.sysmsg._ +import akka.event.AddressTerminatedTopic +import akka.event.EventStream +import akka.event.Logging +import akka.event.MarkerLoggingAdapter +import akka.serialization.JavaSerializer +import akka.serialization.Serialization +import akka.util.OptionVal + object ActorRef { /** @@ -702,17 +704,22 @@ private[akka] class VirtualPathContainer( * and do not prevent the parent from terminating. FunctionRef is properly * registered for remote lookup and ActorSelection. * - * When using the watch() feature you must ensure that upon reception of the - * Terminated message the watched actorRef is unwatch()ed. + * It can both be watched by other actors and also [[FunctionRef#watch]] other actors. + * When watching other actors and upon receiving the Terminated message, + * [[FunctionRef#unwatch]] must be called to avoid a resource leak, which is different + * from an ordinary actor. */ private[akka] final class FunctionRef( override val path: ActorPath, override val provider: ActorRefProvider, - val eventStream: EventStream, + system: ActorSystem, f: (ActorRef, Any) ⇒ Unit) extends MinimalActorRef { override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { - f(sender, message) + message match { + case AddressTerminated(address) ⇒ addressTerminated(address) + case _ ⇒ f(sender, message) + } } override def sendSystemMessage(message: SystemMessage): Unit = { @@ -725,23 +732,34 @@ private[akka] final class FunctionRef( } } + // requires sychronized access because AddressTerminatedTopic must be updated together with this private[this] var watching = ActorCell.emptyActorRefSet - private[this] val _watchedBy = new AtomicReference[Set[ActorRef]](ActorCell.emptyActorRefSet) + // requires sychronized access because AddressTerminatedTopic must be updated together with this + private[this] var _watchedBy: OptionVal[Set[ActorRef]] = OptionVal.Some(ActorCell.emptyActorRefSet) - override def isTerminated = _watchedBy.get() == null + override def isTerminated: Boolean = _watchedBy.isEmpty //noinspection EmptyCheck - protected def sendTerminated(): Unit = { - val watchedBy = _watchedBy.getAndSet(null) - if (watchedBy != null) { - if (watchedBy.nonEmpty) { - watchedBy foreach sendTerminated(ifLocal = false) - watchedBy foreach sendTerminated(ifLocal = true) - } - if (watching.nonEmpty) { - watching foreach unwatchWatched - watching = Set.empty - } + protected def sendTerminated(): Unit = synchronized { + def unwatchWatched(watched: ActorRef): Unit = + watched.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(watched, this)) + + _watchedBy match { + case OptionVal.Some(watchedBy) ⇒ + if (watchedBy.nonEmpty) { + watchedBy foreach sendTerminated(ifLocal = false) + 
watchedBy foreach sendTerminated(ifLocal = true) + } + + if (watching.nonEmpty) { + watching foreach unwatchWatched + watching = Set.empty + } + + unsubscribeAddressTerminated() + _watchedBy = OptionVal.None + + case OptionVal.None ⇒ } } @@ -749,43 +767,60 @@ private[akka] final class FunctionRef( if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(this, existenceConfirmed = true, addressTerminated = false)) - private def unwatchWatched(watched: ActorRef): Unit = - watched.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(watched, this)) + private def addressTerminated(address: Address): Unit = synchronized { + // cleanup watchedBy since we know they are dead + _watchedBy match { + case OptionVal.None ⇒ // terminated + case OptionVal.Some(watchedBy) ⇒ + maintainAddressTerminatedSubscription(OptionVal.None) { + _watchedBy = OptionVal.Some(watchedBy.filterNot(_.path.address == address)) + } + // send DeathWatchNotification to self for all matching subjects + for (a ← watching; if a.path.address == address) { + this.sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true)) + } + } + } override def stop(): Unit = sendTerminated() - @tailrec private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = - _watchedBy.get() match { - case null ⇒ + private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = synchronized { + _watchedBy match { + case OptionVal.None ⇒ sendTerminated(ifLocal = true)(watcher) sendTerminated(ifLocal = false)(watcher) - case watchedBy ⇒ + case OptionVal.Some(watchedBy) ⇒ val watcheeSelf = watchee == this val watcherSelf = watcher == this if (watcheeSelf && !watcherSelf) { - if (!watchedBy.contains(watcher)) - if (!_watchedBy.compareAndSet(watchedBy, watchedBy + watcher)) - addWatcher(watchee, watcher) // try again + if (!watchedBy.contains(watcher)) { + maintainAddressTerminatedSubscription(OptionVal.Some(watcher)) { + _watchedBy = OptionVal.Some(watchedBy + watcher) + } + } } else if (!watcheeSelf && watcherSelf) { publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered watch from $watcher to $watchee is illegal on FunctionRef")) } else { publish(Logging.Error(path.toString, classOf[FunctionRef], s"BUG: illegal Watch($watchee,$watcher) for $this")) } } + } - @tailrec private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { - _watchedBy.get() match { - case null ⇒ // do nothing... - case watchedBy ⇒ + private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = synchronized { + _watchedBy match { + case OptionVal.None ⇒ // do nothing... 
+ case OptionVal.Some(watchedBy) ⇒ val watcheeSelf = watchee == this val watcherSelf = watcher == this if (watcheeSelf && !watcherSelf) { - if (watchedBy.contains(watcher)) - if (!_watchedBy.compareAndSet(watchedBy, watchedBy - watcher)) - remWatcher(watchee, watcher) // try again + if (watchedBy.contains(watcher)) { + maintainAddressTerminatedSubscription(OptionVal.Some(watcher)) { + _watchedBy = OptionVal.Some(watchedBy - watcher) + } + } } else if (!watcheeSelf && watcherSelf) { publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered unwatch from $watcher to $watchee is illegal on FunctionRef")) } else { @@ -794,35 +829,79 @@ private[akka] final class FunctionRef( } } - private def publish(e: Logging.LogEvent): Unit = try eventStream.publish(e) catch { case NonFatal(_) ⇒ } + private def publish(e: Logging.LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) ⇒ } /** - * Have this FunctionRef watch the given Actor. This method must not be - * called concurrently from different threads, it should only be called by - * its parent Actor. + * Have this FunctionRef watch the given Actor. * - * Upon receiving the Terminated message, unwatch() must be called from a - * safe context (i.e. normally from the parent Actor). + * Upon receiving the Terminated message, `unwatch` must be called to avoid resource leak, + * which is different from an ordinary actor. */ - def watch(actorRef: ActorRef): Unit = { - watching += actorRef + def watch(actorRef: ActorRef): Unit = synchronized { + maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) { + watching += actorRef + } actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Watch(actorRef.asInstanceOf[InternalActorRef], this)) } /** - * Have this FunctionRef unwatch the given Actor. This method must not be - * called concurrently from different threads, it should only be called by - * its parent Actor. + * Have this FunctionRef unwatch the given Actor. + * + * Upon receiving the Terminated message, `unwatch` must be called to avoid resource leak, + * which is different from an ordinary actor. */ - def unwatch(actorRef: ActorRef): Unit = { - watching -= actorRef + def unwatch(actorRef: ActorRef): Unit = synchronized { + maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) { + watching -= actorRef + } actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(actorRef.asInstanceOf[InternalActorRef], this)) } /** - * Query whether this FunctionRef is currently watching the given Actor. This - * method must not be called concurrently from different threads, it should - * only be called by its parent Actor. + * Query whether this FunctionRef is currently watching the given Actor. */ - def isWatching(actorRef: ActorRef): Boolean = watching.contains(actorRef) + def isWatching(actorRef: ActorRef): Boolean = synchronized { + watching.contains(actorRef) + } + + /** + * Starts subscription to AddressTerminated if not already subscribing and the + * block adds a non-local ref to watching or watchedBy. + * Ends subscription to AddressTerminated if subscribing and the + * block removes the last non-local ref from watching and watchedBy. + * + * This method must only be used from synchronized methods because AddressTerminatedTopic + * must be updated together with changes to watching or watchedBy. 
+ */ + private def maintainAddressTerminatedSubscription[T](change: OptionVal[ActorRef])(block: ⇒ T): T = { + def isNonLocal(ref: ActorRef) = ref match { + case a: InternalActorRef if !a.isLocal ⇒ true + case _ ⇒ false + } + + def watchedByOrEmpty: Set[ActorRef] = + _watchedBy match { + case OptionVal.Some(watchedBy) ⇒ watchedBy + case OptionVal.None ⇒ ActorCell.emptyActorRefSet + } + + change match { + case OptionVal.Some(ref) if !isNonLocal(ref) ⇒ + // AddressTerminatedTopic update not needed + block + case _ ⇒ + def hasNonLocalAddress: Boolean = (watching exists isNonLocal) || (watchedByOrEmpty exists isNonLocal) + + val had = hasNonLocalAddress + val result = block + val has = hasNonLocalAddress + if (had && !has) unsubscribeAddressTerminated() + else if (!had && has) subscribeAddressTerminated() + result + } + } + + private def unsubscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).unsubscribe(this) + + private def subscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).subscribe(this) } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 93328272cc..60e9733195 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -64,7 +64,7 @@ private[akka] trait Children { this: ActorCell ⇒ val r = randomName(new java.lang.StringBuilder("$$")) val n = if (name != "") s"$r-$name" else r val childPath = new ChildActorPath(self.path, n, ActorCell.newUid()) - val ref = new FunctionRef(childPath, provider, system.eventStream, f) + val ref = new FunctionRef(childPath, provider, system, f) @tailrec def rec(): Unit = { val old = functionRefs diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala new file mode 100644 index 0000000000..d6d2d3770f --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala @@ -0,0 +1,259 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.cluster + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success + +import akka.Done +import akka.actor.Actor +import akka.actor.ActorIdentity +import akka.actor.ActorRef +import akka.actor.Identify +import akka.actor.Props +import akka.pattern.pipe +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.stream.ActorMaterializer +import akka.stream.RemoteStreamRefActorTerminatedException +import akka.stream.SinkRef +import akka.stream.SourceRef +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.StreamRefs +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +object StreamRefSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster { + auto-down-unreachable-after = 1s + }""")). 
+ withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) + + case class RequestLogs(streamId: Int) + case class LogsOffer(streamId: Int, sourceRef: SourceRef[String]) + + object DataSource { + def props(streamLifecycleProbe: ActorRef): Props = + Props(new DataSource(streamLifecycleProbe)) + } + + class DataSource(streamLifecycleProbe: ActorRef) extends Actor { + import context.dispatcher + implicit val mat = ActorMaterializer()(context) + + def receive = { + case RequestLogs(streamId) ⇒ + // materialize the SourceRef: + val (done: Future[Done], ref: Future[SourceRef[String]]) = + Source.fromIterator(() ⇒ Iterator.from(1)) + .map(n ⇒ s"elem-$n") + .watchTermination()(Keep.right) + .toMat(StreamRefs.sourceRef())(Keep.both) + .mapMaterializedValue { m ⇒ + streamLifecycleProbe ! s"started-$streamId" + m + } + .run() + + done.onComplete { + case Success(_) ⇒ streamLifecycleProbe ! s"completed-$streamId" + case Failure(_) ⇒ streamLifecycleProbe ! s"failed-$streamId" + } + + // wrap the SourceRef in some domain message, such that the sender knows what source it is + val reply: Future[LogsOffer] = ref.map(LogsOffer(streamId, _)) + + // reply to sender + reply pipeTo sender() + } + + } + + case class PrepareUpload(id: String) + case class MeasurementsSinkReady(id: String, sinkRef: SinkRef[String]) + + object DataReceiver { + def props(streamLifecycleProbe: ActorRef): Props = + Props(new DataReceiver(streamLifecycleProbe)) + } + + class DataReceiver(streamLifecycleProbe: ActorRef) extends Actor { + + import context.dispatcher + implicit val mat = ActorMaterializer()(context) + + def receive = { + case PrepareUpload(nodeId) ⇒ + + // materialize the SinkRef (the remote is like a source of data for us): + val (ref: Future[SinkRef[String]], done: Future[Done]) = + StreamRefs.sinkRef[String]() + .throttle(1, 1.second) + .toMat(Sink.ignore)(Keep.both) + .mapMaterializedValue { m ⇒ + streamLifecycleProbe ! s"started-$nodeId" + m + } + .run() + + done.onComplete { + case Success(_) ⇒ streamLifecycleProbe ! s"completed-$nodeId" + case Failure(_) ⇒ streamLifecycleProbe ! s"failed-$nodeId" + } + + // wrap the SinkRef in some domain message, such that the sender knows what source it is + val reply: Future[MeasurementsSinkReady] = ref.map(MeasurementsSinkReady(nodeId, _)) + + // reply to sender + reply pipeTo sender() + } + + } + +} + +class StreamRefMultiJvmNode1 extends StreamRefSpec +class StreamRefMultiJvmNode2 extends StreamRefSpec +class StreamRefMultiJvmNode3 extends StreamRefSpec + +abstract class StreamRefSpec extends MultiNodeSpec(StreamRefSpec) + with MultiNodeClusterSpec with ImplicitSender { + import StreamRefSpec._ + + private implicit val mat: ActorMaterializer = ActorMaterializer() + + "A cluster with Stream Refs" must { + + "join" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third) + + enterBarrier("after-1") + } + + "stop stream with SourceRef after downing and removal" taggedAs LongRunningTest in { + val dataSourceLifecycle = TestProbe() + runOn(second) { + system.actorOf(DataSource.props(dataSourceLifecycle.ref), "dataSource") + } + enterBarrier("actor-started") + + // only used from first + var destinationForSource: TestSubscriber.Probe[String] = null + + runOn(first) { + system.actorSelection(node(second) / "user" / "dataSource") ! Identify(None) + val ref = expectMsgType[ActorIdentity].ref.get + ref ! 
RequestLogs(1337) + val dataSourceRef = expectMsgType[LogsOffer].sourceRef + destinationForSource = dataSourceRef.runWith(TestSink.probe) + destinationForSource + .request(3) + .expectNext("elem-1") + .expectNext("elem-2") + .expectNext("elem-3") + } + runOn(second) { + dataSourceLifecycle.expectMsg("started-1337") + } + enterBarrier("streams-started") + + runOn(first) { + testConductor.blackhole(first, second, Direction.Both).await + testConductor.blackhole(third, second, Direction.Both).await + } + enterBarrier("after-split") + + // auto-down + runOn(first, third) { + awaitMembersUp(2, Set(second) map address) + } + runOn(second) { + awaitMembersUp(1, Set(first, third) map address) + } + enterBarrier("members-removed") + + runOn(first) { + destinationForSource.expectError().getClass should ===(classOf[RemoteStreamRefActorTerminatedException]) + } + runOn(second) { + // it will be cancelled, i.e. competed + dataSourceLifecycle.expectMsg("completed-1337") + } + + enterBarrier("after-2") + } + + "stop stream with SinkRef after downing and removal" taggedAs LongRunningTest in { + import system.dispatcher + val streamLifecycle1 = TestProbe() + val streamLifecycle3 = TestProbe() + runOn(third) { + system.actorOf(DataReceiver.props(streamLifecycle3.ref), "dataReceiver") + } + enterBarrier("actor-started") + + runOn(first) { + system.actorSelection(node(third) / "user" / "dataReceiver") ! Identify(None) + val ref = expectMsgType[ActorIdentity].ref.get + ref ! PrepareUpload("system-42-tmp") + val ready = expectMsgType[MeasurementsSinkReady] + + Source.fromIterator(() ⇒ Iterator.from(1)) + .map(n ⇒ s"elem-$n") + .watchTermination()(Keep.right) + .to(ready.sinkRef) + .run() + .onComplete { + case Success(_) ⇒ streamLifecycle1.ref ! s"completed-system-42-tmp" + case Failure(_) ⇒ streamLifecycle1.ref ! s"failed-system-42-tmp" + } + } + runOn(third) { + streamLifecycle3.expectMsg("started-system-42-tmp") + } + enterBarrier("streams-started") + + runOn(first) { + testConductor.blackhole(first, third, Direction.Both).await + } + enterBarrier("after-split") + + // auto-down + runOn(first) { + awaitMembersUp(1, Set(third) map address) + } + runOn(third) { + awaitMembersUp(1, Set(first) map address) + } + enterBarrier("members-removed") + + runOn(first) { + streamLifecycle1.expectMsg("completed-system-42-tmp") + } + runOn(third) { + streamLifecycle3.expectMsg("failed-system-42-tmp") + } + + enterBarrier("after-3") + } + + } + +}
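
Not part of the patch itself, but the subscription bookkeeping that maintainAddressTerminatedSubscription introduces can be read in isolation: subscribe to the topic when the first non-local ref enters the watch state, unsubscribe when the last one leaves, and perform the state change and the topic update under the same lock. Below is a minimal plain-Scala sketch of that pattern; Topic and Watcher are hypothetical stand-ins for AddressTerminatedTopic and FunctionRef, refs are modelled as strings with a "remote:" prefix marking non-local ones, and only the watching side is shown (the real code applies the same rule to _watchedBy).

object SubscriptionMaintenanceSketch {

  // Stands in for AddressTerminatedTopic: something that can be subscribed to and unsubscribed from.
  final class Topic {
    private[this] var subscribed = false
    def subscribe(): Unit = { subscribed = true; println("subscribed") }
    def unsubscribe(): Unit = { subscribed = false; println("unsubscribed") }
    def isSubscribed: Boolean = subscribed
  }

  // Stands in for FunctionRef; only the `watching` set is modelled here.
  final class Watcher(topic: Topic) {
    private[this] var watching = Set.empty[String]

    private def isNonLocal(ref: String): Boolean = ref.startsWith("remote:")

    // Subscribe when the first non-local ref appears, unsubscribe when the last one disappears;
    // the mutation in `block` and the topic update happen while holding the same lock.
    private def maintainSubscription[T](changed: Option[String])(block: ⇒ T): T = synchronized {
      changed match {
        case Some(ref) if !isNonLocal(ref) ⇒
          block // a purely local change never requires a topic update
        case _ ⇒
          def hasNonLocal: Boolean = watching exists isNonLocal
          val had = hasNonLocal
          val result = block
          val has = hasNonLocal
          if (had && !has) topic.unsubscribe()
          else if (!had && has) topic.subscribe()
          result
      }
    }

    def watch(ref: String): Unit = maintainSubscription(Some(ref)) { watching += ref }
    def unwatch(ref: String): Unit = maintainSubscription(Some(ref)) { watching -= ref }
  }

  def main(args: Array[String]): Unit = {
    val topic = new Topic
    val w = new Watcher(topic)
    w.watch("local:a")    // still only local refs, no subscription needed
    w.watch("remote:b")   // first non-local ref: prints "subscribed"
    w.unwatch("remote:b") // last non-local ref removed: prints "unsubscribed"
  }
}

Comparing hasNonLocal before and after the block is what lets one helper cover watch, unwatch, addressTerminated cleanup and sendTerminated alike, without each call site having to reason about the current subscription state.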