Terminate StreamRef on node failure, #25960

* manage AddressTerminated subscription in FunctionRef
* implementation can be compared with akka/actor/dungeon/DeathWatch.scala
* use synchronized access to the watch state and AddressTerminatedTopic
* use OptionVal for _watchedBy
This commit is contained in:
Patrik Nordwall 2019-01-17 16:53:17 +01:00 committed by GitHub
parent dcb17b1d0c
commit 96692b2c04
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 410 additions and 62 deletions

View file

@ -61,8 +61,14 @@ class FunctionRefSpec extends AkkaSpec with ImplicitSender {
s ! GetForwarder(testActor)
val f = expectMsgType[FunctionRef]
forwarder.watch(f)
forwarder.isWatching(f) should ===(true)
s ! DropForwarder(f)
expectMsg(Forwarded(Terminated(f)(true, false), f))
// Upon receiving the Terminated message, unwatch() must be called, which is different from an ordinary actor.
forwarder.isWatching(f) should ===(true)
forwarder.unwatch(f)
forwarder.isWatching(f) should ===(false)
}
"terminate when their parent terminates" in {
@ -87,7 +93,7 @@ class FunctionRefSpec extends AkkaSpec with ImplicitSender {
"not registered" must {
"not be found" in {
val provider = system.asInstanceOf[ExtendedActorSystem].provider
val ref = new FunctionRef(testActor.path / "blabla", provider, system.eventStream, (x, y) ())
val ref = new FunctionRef(testActor.path / "blabla", provider, system, (_, _) ())
EventFilter[SerializationCheckFailedException](start = "Failed to serialize and deserialize message of type akka.actor.FunctionRefSpec", occurrences = 1) intercept {
// needs to be something that fails when the deserialized form is not a FunctionRef
// this relies upon serialize-messages during tests

View file

@ -6,6 +6,10 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.dns.internal.AsyncD
ProblemFilters.exclude[MissingClassProblem]("akka.io.dns.internal.AsyncDnsResolver$Ipv6Type$")
ProblemFilters.exclude[MissingClassProblem]("akka.io.dns.internal.AsyncDnsResolver$Ipv4Type$")
# #25960 AddressTerminated in FunctionRef
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.FunctionRef.eventStream")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.FunctionRef.this")
# Changes related to adding Scala 2.13.0-M5 support
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.dungeon.ChildrenContainer#ChildrenIterable.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.actor.dungeon.ChildrenContainer#ChildRestartsIterable.this")

View file

@ -4,20 +4,22 @@
package akka.actor
import scala.collection.immutable
import akka.dispatch._
import akka.dispatch.sysmsg._
import java.lang.{ IllegalStateException, UnsupportedOperationException }
import akka.serialization.{ JavaSerializer, Serialization }
import akka.event.{ EventStream, Logging, MarkerLoggingAdapter }
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.dispatch._
import akka.dispatch.sysmsg._
import akka.event.AddressTerminatedTopic
import akka.event.EventStream
import akka.event.Logging
import akka.event.MarkerLoggingAdapter
import akka.serialization.JavaSerializer
import akka.serialization.Serialization
import akka.util.OptionVal
object ActorRef {
/**
@ -702,17 +704,22 @@ private[akka] class VirtualPathContainer(
* and do not prevent the parent from terminating. FunctionRef is properly
* registered for remote lookup and ActorSelection.
*
* When using the watch() feature you must ensure that upon reception of the
* Terminated message the watched actorRef is unwatch()ed.
* It can both be watched by other actors and also [[FunctionRef#watch]] other actors.
* When watching other actors and upon receiving the Terminated message,
* [[FunctionRef#unwatch]] must be called to avoid a resource leak, which is different
* from an ordinary actor.
*/
private[akka] final class FunctionRef(
override val path: ActorPath,
override val provider: ActorRefProvider,
val eventStream: EventStream,
system: ActorSystem,
f: (ActorRef, Any) Unit) extends MinimalActorRef {
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
f(sender, message)
message match {
case AddressTerminated(address) addressTerminated(address)
case _ f(sender, message)
}
}
override def sendSystemMessage(message: SystemMessage): Unit = {
@ -725,23 +732,34 @@ private[akka] final class FunctionRef(
}
}
// requires sychronized access because AddressTerminatedTopic must be updated together with this
private[this] var watching = ActorCell.emptyActorRefSet
private[this] val _watchedBy = new AtomicReference[Set[ActorRef]](ActorCell.emptyActorRefSet)
// requires sychronized access because AddressTerminatedTopic must be updated together with this
private[this] var _watchedBy: OptionVal[Set[ActorRef]] = OptionVal.Some(ActorCell.emptyActorRefSet)
override def isTerminated = _watchedBy.get() == null
override def isTerminated: Boolean = _watchedBy.isEmpty
//noinspection EmptyCheck
protected def sendTerminated(): Unit = {
val watchedBy = _watchedBy.getAndSet(null)
if (watchedBy != null) {
protected def sendTerminated(): Unit = synchronized {
def unwatchWatched(watched: ActorRef): Unit =
watched.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(watched, this))
_watchedBy match {
case OptionVal.Some(watchedBy)
if (watchedBy.nonEmpty) {
watchedBy foreach sendTerminated(ifLocal = false)
watchedBy foreach sendTerminated(ifLocal = true)
}
if (watching.nonEmpty) {
watching foreach unwatchWatched
watching = Set.empty
}
unsubscribeAddressTerminated()
_watchedBy = OptionVal.None
case OptionVal.None
}
}
@ -749,43 +767,60 @@ private[akka] final class FunctionRef(
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal)
watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(this, existenceConfirmed = true, addressTerminated = false))
private def unwatchWatched(watched: ActorRef): Unit =
watched.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(watched, this))
private def addressTerminated(address: Address): Unit = synchronized {
// cleanup watchedBy since we know they are dead
_watchedBy match {
case OptionVal.None // terminated
case OptionVal.Some(watchedBy)
maintainAddressTerminatedSubscription(OptionVal.None) {
_watchedBy = OptionVal.Some(watchedBy.filterNot(_.path.address == address))
}
// send DeathWatchNotification to self for all matching subjects
for (a watching; if a.path.address == address) {
this.sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true))
}
}
}
override def stop(): Unit = sendTerminated()
@tailrec private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit =
_watchedBy.get() match {
case null
private def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = synchronized {
_watchedBy match {
case OptionVal.None
sendTerminated(ifLocal = true)(watcher)
sendTerminated(ifLocal = false)(watcher)
case watchedBy
case OptionVal.Some(watchedBy)
val watcheeSelf = watchee == this
val watcherSelf = watcher == this
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher))
if (!_watchedBy.compareAndSet(watchedBy, watchedBy + watcher))
addWatcher(watchee, watcher) // try again
if (!watchedBy.contains(watcher)) {
maintainAddressTerminatedSubscription(OptionVal.Some(watcher)) {
_watchedBy = OptionVal.Some(watchedBy + watcher)
}
}
} else if (!watcheeSelf && watcherSelf) {
publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered watch from $watcher to $watchee is illegal on FunctionRef"))
} else {
publish(Logging.Error(path.toString, classOf[FunctionRef], s"BUG: illegal Watch($watchee,$watcher) for $this"))
}
}
}
@tailrec private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
_watchedBy.get() match {
case null // do nothing...
case watchedBy
private def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = synchronized {
_watchedBy match {
case OptionVal.None // do nothing...
case OptionVal.Some(watchedBy)
val watcheeSelf = watchee == this
val watcherSelf = watcher == this
if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher))
if (!_watchedBy.compareAndSet(watchedBy, watchedBy - watcher))
remWatcher(watchee, watcher) // try again
if (watchedBy.contains(watcher)) {
maintainAddressTerminatedSubscription(OptionVal.Some(watcher)) {
_watchedBy = OptionVal.Some(watchedBy - watcher)
}
}
} else if (!watcheeSelf && watcherSelf) {
publish(Logging.Warning(path.toString, classOf[FunctionRef], s"externally triggered unwatch from $watcher to $watchee is illegal on FunctionRef"))
} else {
@ -794,35 +829,79 @@ private[akka] final class FunctionRef(
}
}
private def publish(e: Logging.LogEvent): Unit = try eventStream.publish(e) catch { case NonFatal(_) }
private def publish(e: Logging.LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) }
/**
* Have this FunctionRef watch the given Actor. This method must not be
* called concurrently from different threads, it should only be called by
* its parent Actor.
* Have this FunctionRef watch the given Actor.
*
* Upon receiving the Terminated message, unwatch() must be called from a
* safe context (i.e. normally from the parent Actor).
* Upon receiving the Terminated message, `unwatch` must be called to avoid resource leak,
* which is different from an ordinary actor.
*/
def watch(actorRef: ActorRef): Unit = {
def watch(actorRef: ActorRef): Unit = synchronized {
maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) {
watching += actorRef
}
actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Watch(actorRef.asInstanceOf[InternalActorRef], this))
}
/**
* Have this FunctionRef unwatch the given Actor. This method must not be
* called concurrently from different threads, it should only be called by
* its parent Actor.
* Have this FunctionRef unwatch the given Actor.
*
* Upon receiving the Terminated message, `unwatch` must be called to avoid resource leak,
* which is different from an ordinary actor.
*/
def unwatch(actorRef: ActorRef): Unit = {
def unwatch(actorRef: ActorRef): Unit = synchronized {
maintainAddressTerminatedSubscription(OptionVal.Some(actorRef)) {
watching -= actorRef
}
actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(actorRef.asInstanceOf[InternalActorRef], this))
}
/**
* Query whether this FunctionRef is currently watching the given Actor. This
* method must not be called concurrently from different threads, it should
* only be called by its parent Actor.
* Query whether this FunctionRef is currently watching the given Actor.
*/
def isWatching(actorRef: ActorRef): Boolean = watching.contains(actorRef)
def isWatching(actorRef: ActorRef): Boolean = synchronized {
watching.contains(actorRef)
}
/**
* Starts subscription to AddressTerminated if not already subscribing and the
* block adds a non-local ref to watching or watchedBy.
* Ends subscription to AddressTerminated if subscribing and the
* block removes the last non-local ref from watching and watchedBy.
*
* This method must only be used from synchronized methods because AddressTerminatedTopic
* must be updated together with changes to watching or watchedBy.
*/
private def maintainAddressTerminatedSubscription[T](change: OptionVal[ActorRef])(block: T): T = {
def isNonLocal(ref: ActorRef) = ref match {
case a: InternalActorRef if !a.isLocal true
case _ false
}
def watchedByOrEmpty: Set[ActorRef] =
_watchedBy match {
case OptionVal.Some(watchedBy) watchedBy
case OptionVal.None ActorCell.emptyActorRefSet
}
change match {
case OptionVal.Some(ref) if !isNonLocal(ref)
// AddressTerminatedTopic update not needed
block
case _
def hasNonLocalAddress: Boolean = (watching exists isNonLocal) || (watchedByOrEmpty exists isNonLocal)
val had = hasNonLocalAddress
val result = block
val has = hasNonLocalAddress
if (had && !has) unsubscribeAddressTerminated()
else if (!had && has) subscribeAddressTerminated()
result
}
}
private def unsubscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).unsubscribe(this)
private def subscribeAddressTerminated(): Unit = AddressTerminatedTopic(system).subscribe(this)
}

View file

@ -64,7 +64,7 @@ private[akka] trait Children { this: ActorCell ⇒
val r = randomName(new java.lang.StringBuilder("$$"))
val n = if (name != "") s"$r-$name" else r
val childPath = new ChildActorPath(self.path, n, ActorCell.newUid())
val ref = new FunctionRef(childPath, provider, system.eventStream, f)
val ref = new FunctionRef(childPath, provider, system, f)
@tailrec def rec(): Unit = {
val old = functionRefs

View file

@ -0,0 +1,259 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import akka.Done
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.Props
import akka.pattern.pipe
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.stream.ActorMaterializer
import akka.stream.RemoteStreamRefActorTerminatedException
import akka.stream.SinkRef
import akka.stream.SourceRef
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.StreamRefs
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit._
import com.typesafe.config.ConfigFactory
object StreamRefSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster {
auto-down-unreachable-after = 1s
}""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
testTransport(on = true)
case class RequestLogs(streamId: Int)
case class LogsOffer(streamId: Int, sourceRef: SourceRef[String])
object DataSource {
def props(streamLifecycleProbe: ActorRef): Props =
Props(new DataSource(streamLifecycleProbe))
}
class DataSource(streamLifecycleProbe: ActorRef) extends Actor {
import context.dispatcher
implicit val mat = ActorMaterializer()(context)
def receive = {
case RequestLogs(streamId)
// materialize the SourceRef:
val (done: Future[Done], ref: Future[SourceRef[String]]) =
Source.fromIterator(() Iterator.from(1))
.map(n s"elem-$n")
.watchTermination()(Keep.right)
.toMat(StreamRefs.sourceRef())(Keep.both)
.mapMaterializedValue { m
streamLifecycleProbe ! s"started-$streamId"
m
}
.run()
done.onComplete {
case Success(_) streamLifecycleProbe ! s"completed-$streamId"
case Failure(_) streamLifecycleProbe ! s"failed-$streamId"
}
// wrap the SourceRef in some domain message, such that the sender knows what source it is
val reply: Future[LogsOffer] = ref.map(LogsOffer(streamId, _))
// reply to sender
reply pipeTo sender()
}
}
case class PrepareUpload(id: String)
case class MeasurementsSinkReady(id: String, sinkRef: SinkRef[String])
object DataReceiver {
def props(streamLifecycleProbe: ActorRef): Props =
Props(new DataReceiver(streamLifecycleProbe))
}
class DataReceiver(streamLifecycleProbe: ActorRef) extends Actor {
import context.dispatcher
implicit val mat = ActorMaterializer()(context)
def receive = {
case PrepareUpload(nodeId)
// materialize the SinkRef (the remote is like a source of data for us):
val (ref: Future[SinkRef[String]], done: Future[Done]) =
StreamRefs.sinkRef[String]()
.throttle(1, 1.second)
.toMat(Sink.ignore)(Keep.both)
.mapMaterializedValue { m
streamLifecycleProbe ! s"started-$nodeId"
m
}
.run()
done.onComplete {
case Success(_) streamLifecycleProbe ! s"completed-$nodeId"
case Failure(_) streamLifecycleProbe ! s"failed-$nodeId"
}
// wrap the SinkRef in some domain message, such that the sender knows what source it is
val reply: Future[MeasurementsSinkReady] = ref.map(MeasurementsSinkReady(nodeId, _))
// reply to sender
reply pipeTo sender()
}
}
}
class StreamRefMultiJvmNode1 extends StreamRefSpec
class StreamRefMultiJvmNode2 extends StreamRefSpec
class StreamRefMultiJvmNode3 extends StreamRefSpec
abstract class StreamRefSpec extends MultiNodeSpec(StreamRefSpec)
with MultiNodeClusterSpec with ImplicitSender {
import StreamRefSpec._
private implicit val mat: ActorMaterializer = ActorMaterializer()
"A cluster with Stream Refs" must {
"join" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
enterBarrier("after-1")
}
"stop stream with SourceRef after downing and removal" taggedAs LongRunningTest in {
val dataSourceLifecycle = TestProbe()
runOn(second) {
system.actorOf(DataSource.props(dataSourceLifecycle.ref), "dataSource")
}
enterBarrier("actor-started")
// only used from first
var destinationForSource: TestSubscriber.Probe[String] = null
runOn(first) {
system.actorSelection(node(second) / "user" / "dataSource") ! Identify(None)
val ref = expectMsgType[ActorIdentity].ref.get
ref ! RequestLogs(1337)
val dataSourceRef = expectMsgType[LogsOffer].sourceRef
destinationForSource = dataSourceRef.runWith(TestSink.probe)
destinationForSource
.request(3)
.expectNext("elem-1")
.expectNext("elem-2")
.expectNext("elem-3")
}
runOn(second) {
dataSourceLifecycle.expectMsg("started-1337")
}
enterBarrier("streams-started")
runOn(first) {
testConductor.blackhole(first, second, Direction.Both).await
testConductor.blackhole(third, second, Direction.Both).await
}
enterBarrier("after-split")
// auto-down
runOn(first, third) {
awaitMembersUp(2, Set(second) map address)
}
runOn(second) {
awaitMembersUp(1, Set(first, third) map address)
}
enterBarrier("members-removed")
runOn(first) {
destinationForSource.expectError().getClass should ===(classOf[RemoteStreamRefActorTerminatedException])
}
runOn(second) {
// it will be cancelled, i.e. competed
dataSourceLifecycle.expectMsg("completed-1337")
}
enterBarrier("after-2")
}
"stop stream with SinkRef after downing and removal" taggedAs LongRunningTest in {
import system.dispatcher
val streamLifecycle1 = TestProbe()
val streamLifecycle3 = TestProbe()
runOn(third) {
system.actorOf(DataReceiver.props(streamLifecycle3.ref), "dataReceiver")
}
enterBarrier("actor-started")
runOn(first) {
system.actorSelection(node(third) / "user" / "dataReceiver") ! Identify(None)
val ref = expectMsgType[ActorIdentity].ref.get
ref ! PrepareUpload("system-42-tmp")
val ready = expectMsgType[MeasurementsSinkReady]
Source.fromIterator(() Iterator.from(1))
.map(n s"elem-$n")
.watchTermination()(Keep.right)
.to(ready.sinkRef)
.run()
.onComplete {
case Success(_) streamLifecycle1.ref ! s"completed-system-42-tmp"
case Failure(_) streamLifecycle1.ref ! s"failed-system-42-tmp"
}
}
runOn(third) {
streamLifecycle3.expectMsg("started-system-42-tmp")
}
enterBarrier("streams-started")
runOn(first) {
testConductor.blackhole(first, third, Direction.Both).await
}
enterBarrier("after-split")
// auto-down
runOn(first) {
awaitMembersUp(1, Set(third) map address)
}
runOn(third) {
awaitMembersUp(1, Set(first) map address)
}
enterBarrier("members-removed")
runOn(first) {
streamLifecycle1.expectMsg("completed-system-42-tmp")
}
runOn(third) {
streamLifecycle3.expectMsg("failed-system-42-tmp")
}
enterBarrier("after-3")
}
}
}