Flush messages before DeathWatchNotification, #28695 (#28940)

* Since DeathWatchNotification is sent over the control channel it may overtake
  other messages that have been sent from the same actor before it stopped.
* It can be confusing that Terminated can't be used as an end-of-conversation marker.
* In classic Remoting we didn't have this problem because all messages were sent over
  the same connection.

* don't send DeathWatchNotification when system is terminating
* when using Cluster we can rely on that the other side will publish AddressTerminated
  when the member has been removed
* it's actually already a race condition that often will result in that the DeathWatchNotification
  is never sent from the terminating side
  * in DeathWatch.scala it will remove the watchedBy when receiving AddressTerminated, and that
    may (sometimes) happen before tellWatchersWeDied

* same for Unwatch
* to avoid sending many Unwatch messages when watcher's ActorSystem is terminated
* same race exists for Unwatch as for DeathWatchNotification, if RemoteWatcher publishAddressTerminated
  before the watcher is terminated

* config for the flush timeout, and possibility to disable
This commit is contained in:
Patrik Nordwall 2020-07-03 09:54:35 +02:00 committed by GitHub
parent 5f70286b18
commit f6ceb4d49a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 599 additions and 89 deletions

View file

@ -0,0 +1,2 @@
# #28695 added isTerminating to ExtendedActorSystem
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ExtendedActorSystem.isTerminating")

View file

@ -780,6 +780,11 @@ abstract class ExtendedActorSystem extends ActorSystem {
*/
@InternalApi private[akka] def finalTerminate(): Unit
/**
* INTERNAL API: Tells whether this actor system is on its way down.
* The `ActorSystemImpl` implementation reports true once termination or abort has
* started, or a `CoordinatedShutdown` reason has been set.
*/
@InternalApi private[akka] def isTerminating(): Boolean
}
/**
@ -1063,6 +1068,10 @@ private[akka] class ActorSystemImpl(
guardian.stop()
}
/**
 * INTERNAL API: true when this system is terminating or aborting, or when
 * CoordinatedShutdown already has a shutdown reason. The CoordinatedShutdown
 * extension is only consulted when neither of the two flags is set.
 */
override private[akka] def isTerminating(): Boolean =
  if (terminating || aborting) true
  else CoordinatedShutdown(this).shutdownReason().nonEmpty
@volatile var aborting = false
/**

View file

@ -0,0 +1,168 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import scala.concurrent.duration._
import akka.actor._
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.remote.artery.ArteryMultiNodeSpec
import akka.remote.artery.ArterySpecSupport
/**
* Shared configuration and helper actor for the `ClusterDeathWatchNotificationSpec` test class.
*/
object ClusterDeathWatchNotificationSpec {
// Cluster actor provider with both remoting ports set to 0, falling back to the Artery spec defaults.
val config = ConfigFactory.parseString(s"""
akka {
loglevel = INFO
actor {
provider = cluster
}
}
akka.remote.classic.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
""").withFallback(ArterySpecSupport.defaultConfig)
// Factory for the Sender test actor.
object Sender {
def props(receiver: ActorRef, sendOnStop: Vector[String]): Props =
Props(new Sender(receiver, sendOnStop))
}
/**
* Echoes every received message back to its sender. When the actor stops it sends each
* element of `sendOnStop`, in order, to `receiver`.
*/
class Sender(receiver: ActorRef, sendOnStop: Vector[String]) extends Actor {
override def receive: Receive = {
case msg => sender() ! msg
}
// Messages sent here are the ones the watcher must see before Terminated.
override def postStop(): Unit = {
sendOnStop.foreach(receiver ! _)
}
}
}
/**
* Verifies, for systems joined in a Cluster, that a watcher receives all messages that a
* remote actor sent from `postStop` before it receives `Terminated` for that actor.
*/
class ClusterDeathWatchNotificationSpec
extends ArteryMultiNodeSpec(ClusterDeathWatchNotificationSpec.config)
with ImplicitSender {
import ClusterDeathWatchNotificationSpec.Sender
// system1 is the spec's own system and hosts the watching probes.
private def system1: ActorSystem = system
// Two additional systems started with the same ActorSystem name as system1.
private val system2 = newRemoteSystem(name = Some(system.name))
private val system3 = newRemoteSystem(name = Some(system.name))
private val systems = Vector(system1, system2, system3)
// The 100 messages ("1" to "100") each Sender emits from postStop.
private val messages = (1 to 100).map(_.toString).toVector
/**
* Resolves `receiverProbe` (living in system1) from `sys` via Identify, asserts that the
* resolved reference has global scope, and starts a `Sender` named `name` in `sys`
* that targets it.
*/
private def setupSender(sys: ActorSystem, receiverProbe: TestProbe, name: String): Unit = {
val receiverPath = receiverProbe.ref.path.toStringWithAddress(address(system1))
val otherProbe = TestProbe()(sys)
sys.actorSelection(receiverPath).tell(Identify(None), otherProbe.ref)
val receiver = otherProbe.expectMsgType[ActorIdentity](5.seconds).ref.get
receiver.path.address.hasGlobalScope should ===(true) // should be remote
sys.actorOf(Sender.props(receiver, messages), name)
}
/**
* Looks up the actor `/user/<name>` of `sys` from system1 and returns its reference.
* The ActorIdentity reply is expected on the spec's implicit sender.
*/
private def identifySender(sys: ActorSystem, name: String): ActorRef = {
system1.actorSelection(rootActorPath(sys) / "user" / name) ! Identify(None)
val sender = expectMsgType[ActorIdentity](5.seconds).ref.get
sender
}
// All three systems join via system1 and must see every member as Up before the other tests run.
"join cluster" in within(10.seconds) {
systems.foreach { sys =>
Cluster(sys).join(Cluster(system1).selfAddress)
}
awaitAssert {
systems.foreach { sys =>
Cluster(sys).state.members.size should ===(systems.size)
Cluster(sys).state.members.iterator.map(_.status).toSet should ===(Set(MemberStatus.Up))
}
}
}
// Stops a single remote sender with PoisonPill.
"receive Terminated after ordinary messages" in {
val receiverProbe = TestProbe()
setupSender(system2, receiverProbe, "sender")
val sender = identifySender(system2, "sender")
receiverProbe.watch(sender)
// make it likely that the watch has been established
sender.tell("echo", receiverProbe.ref)
receiverProbe.expectMsg("echo")
sender ! PoisonPill
// All postStop messages must arrive, in order, before Terminated.
receiverProbe.receiveN(messages.size).toVector shouldBe messages
receiverProbe.expectTerminated(sender)
}
// Stops three remote senders at once by terminating their whole actor system.
"receive Terminated after ordinary messages when system is shutdown" in {
val receiverProbe1 = TestProbe()
setupSender(system2, receiverProbe1, "sender1")
val sender1 = identifySender(system2, "sender1")
val receiverProbe2 = TestProbe()
setupSender(system2, receiverProbe2, "sender2")
val sender2 = identifySender(system2, "sender2")
val receiverProbe3 = TestProbe()
setupSender(system2, receiverProbe3, "sender3")
val sender3 = identifySender(system2, "sender3")
receiverProbe1.watch(sender1)
receiverProbe2.watch(sender2)
receiverProbe3.watch(sender3)
// make it likely that the watch has been established
sender1.tell("echo1", receiverProbe1.ref)
receiverProbe1.expectMsg("echo1")
sender2.tell("echo2", receiverProbe2.ref)
receiverProbe2.expectMsg("echo2")
sender3.tell("echo3", receiverProbe3.ref)
receiverProbe3.expectMsg("echo3")
system2.log.debug("terminating")
system2.terminate()
// Only the first probe gets an explicit 5 second window; the others use the default timeout.
receiverProbe1.receiveN(messages.size, 5.seconds).toVector shouldBe messages
receiverProbe1.expectTerminated(sender1)
receiverProbe2.receiveN(messages.size).toVector shouldBe messages
receiverProbe2.expectTerminated(sender2)
receiverProbe3.receiveN(messages.size).toVector shouldBe messages
receiverProbe3.expectTerminated(sender3)
}
// Same scenario, but system3 is made to leave the cluster, with the leave issued from system1.
"receive Terminated after ordinary messages when system is leaving" in {
val receiverProbe1 = TestProbe()
setupSender(system3, receiverProbe1, "sender1")
val sender1 = identifySender(system3, "sender1")
val receiverProbe2 = TestProbe()
setupSender(system3, receiverProbe2, "sender2")
val sender2 = identifySender(system3, "sender2")
val receiverProbe3 = TestProbe()
setupSender(system3, receiverProbe3, "sender3")
val sender3 = identifySender(system3, "sender3")
receiverProbe1.watch(sender1)
receiverProbe2.watch(sender2)
receiverProbe3.watch(sender3)
// make it likely that the watch has been established
sender1.tell("echo1", receiverProbe1.ref)
receiverProbe1.expectMsg("echo1")
sender2.tell("echo2", receiverProbe2.ref)
receiverProbe2.expectMsg("echo2")
sender3.tell("echo3", receiverProbe3.ref)
receiverProbe3.expectMsg("echo3")
system3.log.debug("leaving")
Cluster(system1).leave(Cluster(system3).selfAddress)
receiverProbe1.receiveN(messages.size, 5.seconds).toVector shouldBe messages
receiverProbe1.expectTerminated(sender1)
receiverProbe2.receiveN(messages.size).toVector shouldBe messages
receiverProbe2.expectTerminated(sender2)
receiverProbe3.receiveN(messages.size).toVector shouldBe messages
receiverProbe3.expectTerminated(sender3)
}
}

View file

@ -0,0 +1,6 @@
# #28695 flush before sending DeathWatchNotification
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.FlushOnShutdown.props")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.FlushOnShutdown.timeoutTask")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.FlushOnShutdown.this")
ProblemFilters.exclude[FinalClassProblem]("akka.remote.artery.ActorSystemTerminating")
ProblemFilters.exclude[FinalClassProblem]("akka.remote.artery.ActorSystemTerminatingAck")

View file

@ -980,6 +980,12 @@ akka {
# remote messages has been completed
shutdown-flush-timeout = 1 second
# Before sending notification of terminated actor (DeathWatchNotification) other messages
# will be flushed to make sure that the Terminated message arrives after other messages.
# It will wait this long for the flush acknowledgement before continuing.
# The flushing can be disabled by setting this to `off`.
death-watch-notification-flush-timeout = 3 seconds
# See 'inbound-max-restarts'
inbound-restart-timeout = 5 seconds

View file

@ -193,8 +193,10 @@ private[akka] class RemoteWatcher(
}
}
def publishAddressTerminated(address: Address): Unit =
def publishAddressTerminated(address: Address): Unit = {
log.debug("Publish AddressTerminated [{}]", address)
AddressTerminatedTopic(context.system).publish(AddressTerminated(address))
}
def quarantine(address: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = {
remoteProvider.transport match {

View file

@ -152,7 +152,18 @@ private[akka] final class ArterySettings private (config: Config) {
val ShutdownFlushTimeout: FiniteDuration =
config
.getMillisDuration("shutdown-flush-timeout")
.requiring(interval => interval > Duration.Zero, "shutdown-flush-timeout must be more than zero")
.requiring(timeout => timeout > Duration.Zero, "shutdown-flush-timeout must be more than zero")
/**
 * Value of `death-watch-notification-flush-timeout`. The literal `off` (compared after
 * lower-casing) is represented as `Duration.Zero`; any other value must be a duration
 * greater than zero.
 */
val DeathWatchNotificationFlushTimeout: FiniteDuration = {
  val key = "death-watch-notification-flush-timeout"
  val switchedOff = toRootLowerCase(config.getString(key)) == "off"
  if (switchedOff) Duration.Zero
  else
    config
      .getMillisDuration(key)
      .requiring(
        timeout => timeout > Duration.Zero,
        "death-watch-notification-flush-timeout must be more than zero, or off")
}
val InboundRestartTimeout: FiniteDuration =
config
.getMillisDuration("inbound-restart-timeout")

View file

@ -16,15 +16,12 @@ import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal
import com.github.ghik.silencer.silent
import akka.Done
import akka.NotUsed
import akka.actor._
import akka.actor.Actor
import akka.actor.Props
import akka.annotation.InternalStableApi
import akka.dispatch.Dispatchers
import akka.event.Logging
@ -50,7 +47,6 @@ import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.util.OptionVal
import akka.util.WildcardIndex
import akka.util.unused
/**
* INTERNAL API
@ -249,79 +245,6 @@ private[remote] trait OutboundContext {
}
/**
 * INTERNAL API
 */
private[remote] object FlushOnShutdown {

  /** Message the actor schedules to itself to mark that the flush timeout has elapsed. */
  case object Timeout

  /**
   * Props for a [[FlushOnShutdown]] actor. At least one association is required.
   */
  def props(
      done: Promise[Done],
      timeout: FiniteDuration,
      inboundContext: InboundContext,
      associations: Set[Association]): Props = {
    require(associations.nonEmpty)
    Props(new FlushOnShutdown(done, timeout, inboundContext, associations))
  }
}
/**
 * INTERNAL API
 *
 * Sends a termination hint over every given association and completes `done` when all
 * expected acknowledgements have arrived, when the timeout fires, or when the actor stops
 * for any other reason. `done` is failed if sending the hints throws.
 */
private[remote] class FlushOnShutdown(
    done: Promise[Done],
    timeout: FiniteDuration,
    @unused inboundContext: InboundContext,
    associations: Set[Association])
    extends Actor {

  // Outstanding acknowledgements per remote system. Only systems from which at least one
  // ack is expected are tracked, so that an empty map means "nothing left to wait for".
  var remaining = Map.empty[UniqueAddress, Int]

  val timeoutTask = context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher)

  override def preStart(): Unit = {
    try {
      associations.foreach { a =>
        val acksExpected = a.sendTerminationHint(self)
        a.associationState.uniqueRemoteAddress() match {
          case Some(address) if acksExpected > 0 => remaining += address -> acksExpected
          case _                                 =>
          // Ignore: either the handshake was not completed on this association, or no hint
          // was sent on it. Recording a zero count would keep `remaining` non-empty forever
          // and force a wait for the full timeout.
        }
      }
      if (remaining.isEmpty) {
        done.trySuccess(Done)
        context.stop(self)
      }
    } catch {
      case NonFatal(e) =>
        // sendTerminationHint may throw
        done.tryFailure(e)
        throw e
    }
  }

  // Always releases the waiting shutdown, whatever made the actor stop.
  override def postStop(): Unit = {
    timeoutTask.cancel()
    done.trySuccess(Done)
  }

  def receive = {
    case ActorSystemTerminatingAck(from) =>
      // Just treat unexpected acks as systems from which zero acks are expected
      val acksRemaining = remaining.getOrElse(from, 0)
      if (acksRemaining <= 1) {
        remaining -= from
      } else {
        remaining = remaining.updated(from, acksRemaining - 1)
      }

      if (remaining.isEmpty)
        context.stop(self)
    case FlushOnShutdown.Timeout =>
      context.stop(self)
  }
}
/**
* INTERNAL API
*/
@ -679,7 +602,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
val flushingPromise = Promise[Done]()
system.systemActorOf(
FlushOnShutdown
.props(flushingPromise, settings.Advanced.ShutdownFlushTimeout, this, allAssociations)
.props(flushingPromise, settings.Advanced.ShutdownFlushTimeout, allAssociations)
.withDispatcher(Dispatchers.InternalDispatcherId),
"remoteFlushOnShutdown")
flushingPromise.future
@ -925,10 +848,30 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
}
}
// Intercepts Flush messages: each one is answered with a FlushAck to the envelope's sender
// and then removed from the stream. Every other envelope passes through unchanged.
// Purpose of this stage is flushing, the sender can wait for the acks in an attempt to
// flush pending messages.
val flushReplier: Flow[InboundEnvelope, InboundEnvelope, NotUsed] =
  Flow[InboundEnvelope].filter { inbound =>
    val isFlush = Flush == inbound.message
    if (isFlush) {
      inbound.sender match {
        case OptionVal.Some(replyTo) =>
          replyTo.tell(FlushAck, ActorRef.noSender)
        case OptionVal.None =>
          // Without a sender there is nobody to acknowledge to; the Flush is still dropped.
          log.error("Expected sender for Flush message from [{}]", inbound.association)
      }
    }
    !isFlush
  }
def inboundSink(bufferPool: EnvelopeBufferPool): Sink[InboundEnvelope, Future[Done]] =
Flow[InboundEnvelope]
.via(createDeserializer(bufferPool))
.via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope])
.via(flushReplier)
.via(terminationHintReplier(inControlStream = false))
.via(new InboundHandshake(this, inControlStream = false))
.via(new InboundQuarantineCheck(this))
@ -948,6 +891,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
Flow[InboundEnvelope]
.via(createDeserializer(envelopeBufferPool))
.via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope])
.via(flushReplier)
.via(terminationHintReplier(inControlStream = true))
.via(new InboundHandshake(this, inControlStream = true))
.via(new InboundQuarantineCheck(this))

View file

@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
@ -27,7 +28,10 @@ import akka.actor.ActorSelectionMessage
import akka.actor.Address
import akka.actor.Cancellable
import akka.actor.Dropped
import akka.dispatch.Dispatchers
import akka.dispatch.sysmsg.DeathWatchNotification
import akka.dispatch.sysmsg.SystemMessage
import akka.dispatch.sysmsg.Unwatch
import akka.event.Logging
import akka.remote.DaemonMsgCreate
import akka.remote.PriorityMessage
@ -147,6 +151,7 @@ private[remote] class Association(
override def settings = transport.settings
private def advancedSettings = transport.settings.Advanced
private val deathWatchNotificationFlushEnabled = advancedSettings.DeathWatchNotificationFlushTimeout > Duration.Zero
private val restartCounter =
new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout)
@ -352,6 +357,34 @@ private[remote] class Association(
deadletters ! env
}
// An Unwatch is withheld only when Cluster is in use and this actor system is terminating.
def shouldSendUnwatch(): Boolean =
  !(transport.provider.settings.HasCluster && transport.system.isTerminating())
// A notification flagged addressTerminated is always sent. Any other notification is
// withheld only when Cluster is in use and this actor system is terminating.
def shouldSendDeathWatchNotification(d: DeathWatchNotification): Boolean =
  if (d.addressTerminated) true
  else !(transport.provider.settings.HasCluster && transport.system.isTerminating())
// Offers a system message to the control queue, except for an Unwatch or a
// DeathWatchNotification that must be suppressed (see shouldSendUnwatch and
// shouldSendDeathWatchNotification); those are only logged at debug level.
// If the control queue is full the association is quarantined and the envelope
// is reported as dropped.
def sendSystemMessage(outboundEnvelope: OutboundEnvelope): Unit = {
  outboundEnvelope.message match {
    // The guard must be negated: the message is skipped when it should NOT be sent.
    case u: Unwatch if !shouldSendUnwatch() =>
      log.debug(
        "Not sending Unwatch of {} to {} because it will be notified when this member " +
        "has been removed from Cluster.",
        u.watcher,
        u.watchee)
    case d: DeathWatchNotification if !shouldSendDeathWatchNotification(d) =>
      log.debug(
        "Not sending DeathWatchNotification of {} to {} because it will be notified when this member " +
        "has been removed from Cluster.",
        d.actor,
        outboundEnvelope.recipient.getOrElse("unknown"))
    case _ =>
      if (!controlQueue.offer(outboundEnvelope)) {
        quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
        dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
      }
  }
}
val state = associationState
val quarantined = state.isQuarantined()
val messageIsClearSystemMessageDelivery = message.isInstanceOf[ClearSystemMessageDelivery]
@ -368,11 +401,18 @@ private[remote] class Association(
try {
val outboundEnvelope = createOutboundEnvelope()
message match {
case d: DeathWatchNotification if deathWatchNotificationFlushEnabled && shouldSendDeathWatchNotification(d) =>
val flushingPromise = Promise[Done]()
transport.system.systemActorOf(
FlushBeforeDeathWatchNotification
.props(flushingPromise, settings.Advanced.DeathWatchNotificationFlushTimeout, this)
.withDispatcher(Dispatchers.InternalDispatcherId),
FlushBeforeDeathWatchNotification.nextName())
flushingPromise.future.onComplete { _ =>
sendSystemMessage(outboundEnvelope)
}(materializer.executionContext)
case _: SystemMessage =>
if (!controlQueue.offer(outboundEnvelope)) {
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
sendSystemMessage(outboundEnvelope)
case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | _: ClearSystemMessageDelivery =>
// ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating
if (!controlQueue.offer(outboundEnvelope)) {
@ -447,11 +487,17 @@ private[remote] class Association(
}
}
def sendTerminationHint(replyTo: ActorRef): Int = {
def sendTerminationHint(replyTo: ActorRef): Int =
sendToAllQueues(ActorSystemTerminating(localAddress), replyTo, excludeControlQueue = false)
// Sends a Flush through sendToAllQueues with `replyTo` as the reply address, optionally
// leaving out the control queue, and returns whatever count sendToAllQueues reports.
def sendFlush(replyTo: ActorRef, excludeControlQueue: Boolean): Int =
  sendToAllQueues(msg = Flush, replyTo = replyTo, excludeControlQueue = excludeControlQueue)
def sendToAllQueues(msg: ControlMessage, replyTo: ActorRef, excludeControlQueue: Boolean): Int = {
if (!associationState.isQuarantined()) {
val msg = ActorSystemTerminating(localAddress)
var sent = 0
queues.iterator.filter(q => q.isEnabled && !q.isInstanceOf[LazyQueueWrapper]).foreach { queue =>
val queuesIter = if (excludeControlQueue) queues.iterator.drop(1) else queues.iterator
queuesIter.filter(q => q.isEnabled && !q.isInstanceOf[LazyQueueWrapper]).foreach { queue =>
try {
val envelope = outboundEnvelopePool.acquire().init(OptionVal.None, msg, OptionVal.Some(replyTo))

View file

@ -11,6 +11,7 @@ import scala.concurrent.Promise
import scala.util.Try
import akka.Done
import akka.annotation.InternalApi
import akka.event.Logging
import akka.remote.UniqueAddress
import akka.stream.Attributes
@ -21,11 +22,13 @@ import akka.stream.stage._
import akka.util.OptionVal
/** INTERNAL API: marker trait for protobuf-serializable artery messages */
@InternalApi
private[remote] trait ArteryMessage extends Serializable
/**
* INTERNAL API: Marker trait for reply messages
*/
@InternalApi
private[remote] trait Reply extends ControlMessage
/**
@ -33,26 +36,43 @@ private[remote] trait Reply extends ControlMessage
* Marker trait for control messages that can be sent via the system message sub-channel
* but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`.
*/
@InternalApi
private[remote] trait ControlMessage extends ArteryMessage
/**
* INTERNAL API
*/
@InternalApi
private[remote] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage
/**
* INTERNAL API
*/
private[remote] case class ActorSystemTerminating(from: UniqueAddress) extends ControlMessage
@InternalApi
private[remote] final case class ActorSystemTerminating(from: UniqueAddress) extends ControlMessage
/**
* INTERNAL API
*/
private[remote] case class ActorSystemTerminatingAck(from: UniqueAddress) extends ArteryMessage
@InternalApi
private[remote] final case class ActorSystemTerminatingAck(from: UniqueAddress) extends ArteryMessage
/**
* INTERNAL API
*/
@InternalApi
private[remote] case object Flush extends ControlMessage
/**
* INTERNAL API
*/
@InternalApi
private[remote] case object FlushAck extends ArteryMessage
/**
* INTERNAL API
*/
@InternalApi
private[remote] object InboundControlJunction {
/**
@ -87,6 +107,7 @@ private[remote] object InboundControlJunction {
/**
* INTERNAL API
*/
@InternalApi
private[remote] class InboundControlJunction
extends GraphStageWithMaterializedValue[
FlowShape[InboundEnvelope, InboundEnvelope],
@ -150,6 +171,7 @@ private[remote] class InboundControlJunction
/**
* INTERNAL API
*/
@InternalApi
private[remote] object OutboundControlJunction {
private[remote] trait OutboundControlIngress {
def sendControlMessage(message: ControlMessage): Unit
@ -159,6 +181,7 @@ private[remote] object OutboundControlJunction {
/**
* INTERNAL API
*/
@InternalApi
private[remote] class OutboundControlJunction(
outboundContext: OutboundContext,
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope])

View file

@ -0,0 +1,83 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import akka.Done
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import akka.annotation.InternalApi
/**
 * INTERNAL API
 */
@InternalApi
private[remote] object FlushBeforeDeathWatchNotification {

  // Message the actor schedules to itself to mark that the flush timeout has elapsed.
  private case object Timeout

  // Source of the numeric suffix that keeps the generated actor names distinct.
  private val nameCounter = new AtomicLong(0L)

  /** Next actor name of the form `flush-<n>`, where n increases with every call. */
  def nextName(): String = s"flush-${nameCounter.incrementAndGet()}"

  /** Props for a [[FlushBeforeDeathWatchNotification]] actor working on `association`. */
  def props(done: Promise[Done], timeout: FiniteDuration, association: Association): Props =
    Props(new FlushBeforeDeathWatchNotification(done, timeout, association))
}
/**
 * INTERNAL API
 *
 * Sends a Flush on the association's queues (control queue excluded) and completes `done`
 * when every expected FlushAck has arrived, when the timeout fires, or when the actor
 * stops for any other reason. `done` is failed if sending the flush throws.
 */
@InternalApi
private[remote] class FlushBeforeDeathWatchNotification(
    done: Promise[Done],
    timeout: FiniteDuration,
    association: Association)
    extends Actor
    with ActorLogging {
  import FlushBeforeDeathWatchNotification.Timeout

  // Number of FlushAck messages still expected.
  var remaining = 0

  private val timeoutTask =
    context.system.scheduler.scheduleOnce(timeout, self, Timeout)(context.dispatcher)

  override def preStart(): Unit =
    try {
      remaining = association.sendFlush(self, excludeControlQueue = true)
      val nothingToWaitFor = remaining == 0
      if (nothingToWaitFor) {
        done.trySuccess(Done)
        context.stop(self)
      }
    } catch {
      case NonFatal(cause) =>
        // sendFlush may throw
        done.tryFailure(cause)
        // will log and stop
        throw cause
    }

  // Always releases whoever waits on the promise, whatever made the actor stop.
  override def postStop(): Unit = {
    timeoutTask.cancel()
    done.trySuccess(Done)
  }

  def receive: Receive = {
    case FlushAck => onAck()
    case Timeout =>
      log.debug("Flush timeout, [{}] remaining", remaining)
      context.stop(self)
  }

  // Counts one acknowledgement and stops once the last expected one is in.
  private def onAck(): Unit = {
    remaining -= 1
    log.debug("Flush acknowledged, [{}] remaining", remaining)
    if (remaining == 0)
      context.stop(self)
  }
}

View file

@ -0,0 +1,83 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import akka.Done
import akka.actor.Actor
import akka.actor.Props
import akka.annotation.InternalApi
import akka.remote.UniqueAddress
/**
 * INTERNAL API
 */
@InternalApi
private[remote] object FlushOnShutdown {

  // Message the actor schedules to itself to mark that the flush timeout has elapsed.
  private case object Timeout

  /**
   * Props for a [[FlushOnShutdown]] actor. At least one association is required.
   */
  def props(done: Promise[Done], timeout: FiniteDuration, associations: Set[Association]): Props = {
    require(associations.nonEmpty)
    Props(new FlushOnShutdown(done, timeout, associations))
  }
}
/**
 * INTERNAL API
 *
 * Sends a termination hint over every given association and completes `done` when all
 * expected acknowledgements have arrived, when the timeout fires, or when the actor stops
 * for any other reason. `done` is failed if sending the hints throws.
 */
@InternalApi
private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, associations: Set[Association])
    extends Actor {

  // Outstanding acknowledgements per remote system. Only systems from which at least one
  // ack is expected are tracked, so that an empty map means "nothing left to wait for".
  var remaining = Map.empty[UniqueAddress, Int]

  private val timeoutTask =
    context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher)

  override def preStart(): Unit = {
    try {
      associations.foreach { a =>
        val acksExpected = a.sendTerminationHint(self)
        a.associationState.uniqueRemoteAddress() match {
          case Some(address) if acksExpected > 0 => remaining += address -> acksExpected
          case _                                 =>
          // Ignore: either the handshake was not completed on this association, or no hint
          // was sent on it. Recording a zero count would keep `remaining` non-empty forever
          // and force a wait for the full timeout.
        }
      }
      if (remaining.isEmpty) {
        done.trySuccess(Done)
        context.stop(self)
      }
    } catch {
      case NonFatal(e) =>
        // sendTerminationHint may throw
        done.tryFailure(e)
        throw e
    }
  }

  // Always releases the waiting shutdown, whatever made the actor stop.
  override def postStop(): Unit = {
    timeoutTask.cancel()
    done.trySuccess(Done)
  }

  def receive: Receive = {
    case ActorSystemTerminatingAck(from) =>
      // Just treat unexpected acks as systems from which zero acks are expected
      val acksRemaining = remaining.getOrElse(from, 0)
      if (acksRemaining <= 1) {
        remaining -= from
      } else {
        remaining = remaining.updated(from, acksRemaining - 1)
      }

      if (remaining.isEmpty)
        context.stop(self)
    case FlushOnShutdown.Timeout =>
      context.stop(self)
  }
}

View file

@ -15,6 +15,8 @@ import akka.remote.artery.OutboundHandshake.{ HandshakeReq, HandshakeRsp }
import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable }
import akka.remote.artery.compress.CompressionProtocol._
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
import akka.remote.artery.Flush
import akka.remote.artery.FlushAck
/** INTERNAL API */
private[akka] object ArteryMessageSerializer {
@ -34,6 +36,9 @@ private[akka] object ArteryMessageSerializer {
private val ArteryHeartbeatManifest = "m"
private val ArteryHeartbeatRspManifest = "n"
private val FlushManifest = "o"
private val FlushAckManifest = "p"
private final val DeadLettersRepresentation = ""
}
@ -54,6 +59,8 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
case _: RemoteWatcher.ArteryHeartbeatRsp => ArteryHeartbeatRspManifest
case _: SystemMessageDelivery.Nack => SystemMessageDeliveryNackManifest
case _: Quarantined => QuarantinedManifest
case Flush => FlushManifest
case FlushAck => FlushAckManifest
case _: ActorSystemTerminating => ActorSystemTerminatingManifest
case _: ActorSystemTerminatingAck => ActorSystemTerminatingAckManifest
case _: CompressionProtocol.ActorRefCompressionAdvertisement => ActorRefCompressionAdvertisementManifest
@ -74,6 +81,8 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
case RemoteWatcher.ArteryHeartbeatRsp(from) => serializeArteryHeartbeatRsp(from).toByteArray
case SystemMessageDelivery.Nack(seqNo, from) => serializeSystemMessageDeliveryAck(seqNo, from).toByteArray
case q: Quarantined => serializeQuarantined(q).toByteArray
case Flush => Array.emptyByteArray
case FlushAck => Array.emptyByteArray
case ActorSystemTerminating(from) => serializeWithAddress(from).toByteArray
case ActorSystemTerminatingAck(from) => serializeWithAddress(from).toByteArray
case adv: ActorRefCompressionAdvertisement => serializeActorRefCompressionAdvertisement(adv).toByteArray
@ -92,6 +101,8 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
case HandshakeRspManifest => deserializeWithFromAddress(bytes, HandshakeRsp)
case SystemMessageDeliveryNackManifest => deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Nack)
case QuarantinedManifest => deserializeQuarantined(ArteryControlFormats.Quarantined.parseFrom(bytes))
case FlushManifest => Flush
case FlushAckManifest => FlushAck
case ActorSystemTerminatingManifest => deserializeWithFromAddress(bytes, ActorSystemTerminating)
case ActorSystemTerminatingAckManifest => deserializeWithFromAddress(bytes, ActorSystemTerminatingAck)
case ActorRefCompressionAdvertisementManifest => deserializeActorRefCompressionAdvertisement(bytes)

View file

@ -0,0 +1,112 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor._
import akka.testkit._
import com.typesafe.config.ConfigFactory
/**
* Shared configuration and helper actor for the `DeathWatchNotificationSpec` test class.
*/
object DeathWatchNotificationSpec {
// Remote (non-cluster) actor provider with the unsafe remote features switched on and a
// 3s acceptable heartbeat pause for the watch failure detector, falling back to the Artery spec defaults.
val config = ConfigFactory.parseString(s"""
akka {
loglevel = INFO
actor {
provider = remote
}
remote.use-unsafe-remote-features-outside-cluster = on
remote.watch-failure-detector.acceptable-heartbeat-pause = 3s
}
""").withFallback(ArterySpecSupport.defaultConfig)
// Factory for the Sender test actor.
object Sender {
def props(receiver: ActorRef, sendOnStop: Vector[String]): Props =
Props(new Sender(receiver, sendOnStop))
}
/**
* Echoes every received message back to its sender. When the actor stops it sends each
* element of `sendOnStop`, in order, to `receiver`.
*/
class Sender(receiver: ActorRef, sendOnStop: Vector[String]) extends Actor {
override def receive: Receive = {
case msg => sender() ! msg
}
// Messages sent here are the ones the watcher must see before Terminated.
override def postStop(): Unit = {
sendOnStop.foreach(receiver ! _)
}
}
}
/**
* Verifies, for plain remoting without Cluster, that a watcher receives all messages that
* a remote actor sent from `postStop` before it receives `Terminated` for that actor.
*/
class DeathWatchNotificationSpec extends ArteryMultiNodeSpec(DeathWatchNotificationSpec.config) with ImplicitSender {
import DeathWatchNotificationSpec.Sender
// Second actor system, named "other", that hosts the Sender actors.
private val otherSystem = newRemoteSystem(name = Some("other"))
// The 100 messages ("1" to "100") each Sender emits from postStop.
private val messages = (1 to 100).map(_.toString).toVector
/**
* Resolves `receiverProbe` (living in `system`) from `otherSystem` via Identify, asserts
* that the resolved reference has global scope, and starts a `Sender` named `name` in
* `otherSystem` that targets it.
*/
private def setupSender(receiverProbe: TestProbe, name: String): Unit = {
val receiverPath = receiverProbe.ref.path.toStringWithAddress(address(system))
val otherProbe = TestProbe()(otherSystem)
otherSystem.actorSelection(receiverPath).tell(Identify(None), otherProbe.ref)
val receiver = otherProbe.expectMsgType[ActorIdentity](5.seconds).ref.get
receiver.path.address.hasGlobalScope should ===(true) // should be remote
otherSystem.actorOf(Sender.props(receiver, messages), name)
}
/**
* Looks up the actor `/user/<name>` of `otherSystem` from `system` and returns its reference.
* The ActorIdentity reply is expected on the spec's implicit sender.
*/
private def identifySender(name: String): ActorRef = {
system.actorSelection(rootActorPath(otherSystem) / "user" / name) ! Identify(None)
val sender = expectMsgType[ActorIdentity](5.seconds).ref.get
sender
}
// Stops a single remote sender with PoisonPill.
"receive Terminated after ordinary messages" in {
val receiverProbe = TestProbe()
setupSender(receiverProbe, "sender")
val sender = identifySender("sender")
receiverProbe.watch(sender)
// make it likely that the watch has been established
sender.tell("echo", receiverProbe.ref)
receiverProbe.expectMsg("echo")
sender ! PoisonPill
// All postStop messages must arrive, in order, before Terminated.
receiverProbe.receiveN(messages.size).toVector shouldBe messages
receiverProbe.expectTerminated(sender)
}
// Stops three remote senders at once by terminating their whole actor system.
"receive Terminated after ordinary messages when system is shutdown" in {
val receiverProbe1 = TestProbe()
setupSender(receiverProbe1, "sender1")
val sender1 = identifySender("sender1")
val receiverProbe2 = TestProbe()
setupSender(receiverProbe2, "sender2")
val sender2 = identifySender("sender2")
val receiverProbe3 = TestProbe()
setupSender(receiverProbe3, "sender3")
val sender3 = identifySender("sender3")
receiverProbe1.watch(sender1)
receiverProbe2.watch(sender2)
receiverProbe3.watch(sender3)
// make it likely that the watch has been established
sender1.tell("echo1", receiverProbe1.ref)
receiverProbe1.expectMsg("echo1")
sender2.tell("echo2", receiverProbe2.ref)
receiverProbe2.expectMsg("echo2")
sender3.tell("echo3", receiverProbe3.ref)
receiverProbe3.expectMsg("echo3")
otherSystem.terminate()
// Each Terminated gets an explicit 5 second window here.
receiverProbe1.receiveN(messages.size, 5.seconds).toVector shouldBe messages
receiverProbe1.expectTerminated(sender1, 5.seconds)
receiverProbe2.receiveN(messages.size).toVector shouldBe messages
receiverProbe2.expectTerminated(sender2, 5.seconds)
receiverProbe3.receiveN(messages.size).toVector shouldBe messages
receiverProbe3.expectTerminated(sender3, 5.seconds)
}
}

View file

@ -7,6 +7,8 @@ package akka.remote.serialization
import java.io.NotSerializableException
import akka.actor._
import akka.remote.artery.Flush
import akka.remote.artery.FlushAck
import akka.remote.{ RemoteWatcher, UniqueAddress }
import akka.remote.artery.{ ActorSystemTerminating, ActorSystemTerminatingAck, Quarantined, SystemMessageDelivery }
import akka.remote.artery.OutboundHandshake.{ HandshakeReq, HandshakeRsp }
@ -29,6 +31,8 @@ class ArteryMessageSerializerSpec extends AkkaSpec {
"Quarantined" -> Quarantined(uniqueAddress(), uniqueAddress()),
"ActorSystemTerminating" -> ActorSystemTerminating(uniqueAddress()),
"ActorSystemTerminatingAck" -> ActorSystemTerminatingAck(uniqueAddress()),
"Flush" -> Flush,
"FlushAck" -> FlushAck,
"HandshakeReq" -> HandshakeReq(uniqueAddress(), uniqueAddress().address),
"HandshakeRsp" -> HandshakeRsp(uniqueAddress()),
"ActorRefCompressionAdvertisement" -> ActorRefCompressionAdvertisement(