Merge pull request #29502 from chbatey/reintroduce-flush-on-terminate

Reintroduce flush on terminate
This commit is contained in:
Christopher Batey 2020-09-25 16:14:56 +01:00 committed by GitHub
commit 50924e56ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 522 additions and 111 deletions

View file

@ -0,0 +1,2 @@
# #28695 added isTerminated to ExtendedActorSystem
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ExtendedActorSystem.isTerminating")

View file

@ -780,6 +780,11 @@ abstract class ExtendedActorSystem extends ActorSystem {
*/ */
@InternalApi private[akka] def finalTerminate(): Unit @InternalApi private[akka] def finalTerminate(): Unit
/**
* INTERNAL API
*/
@InternalApi private[akka] def isTerminating(): Boolean
} }
/** /**
@ -1067,6 +1072,10 @@ private[akka] class ActorSystemImpl(
guardian.stop() guardian.stop()
} }
override private[akka] def isTerminating(): Boolean = {
terminating || aborting || CoordinatedShutdown(this).shutdownReason().isDefined
}
@volatile var aborting = false @volatile var aborting = false
/** /**

View file

@ -7,7 +7,6 @@ package akka.cluster.sharding.typed
import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.ThreadLocalRandom
import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
@ -36,10 +35,12 @@ import akka.cluster.sharding.typed.ReplicatedShardingSpec.MyReplicatedStringSet
import akka.persistence.typed.ReplicationId import akka.persistence.typed.ReplicationId
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.util.ccompat._ import akka.util.ccompat._
import org.scalatest.time.Span
@ccompatUsedUntil213 @ccompatUsedUntil213
object ReplicatedShardingSpec { object ReplicatedShardingSpec {
def commonConfig = ConfigFactory.parseString(""" def commonConfig = ConfigFactory.parseString("""
akka.loglevel = INFO akka.loglevel = DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.actor.provider = "cluster" akka.actor.provider = "cluster"
akka.remote.classic.netty.tcp.port = 0 akka.remote.classic.netty.tcp.port = 0
@ -176,18 +177,16 @@ object ProxyActor {
case class ForwardToAllInt(entityId: String, msg: MyReplicatedIntSet.Command) extends Command case class ForwardToAllInt(entityId: String, msg: MyReplicatedIntSet.Command) extends Command
def apply(replicationType: ReplicationType): Behavior[Command] = Behaviors.setup { context => def apply(replicationType: ReplicationType): Behavior[Command] = Behaviors.setup { context =>
val replicatedShardingStringSet = val replicatedShardingStringSet: ReplicatedSharding[MyReplicatedStringSet.Command] =
ReplicatedShardingExtension(context.system).init(MyReplicatedStringSet.provider(replicationType)) ReplicatedShardingExtension(context.system).init(MyReplicatedStringSet.provider(replicationType))
val replicatedShardingIntSet = val replicatedShardingIntSet: ReplicatedSharding[MyReplicatedIntSet.Command] =
ReplicatedShardingExtension(context.system).init(MyReplicatedIntSet.provider(replicationType)) ReplicatedShardingExtension(context.system).init(MyReplicatedIntSet.provider(replicationType))
Behaviors.setup { ctx => Behaviors.setup { ctx =>
Behaviors.receiveMessage { Behaviors.receiveMessage {
case ForwardToAllString(entityId, cmd) => case ForwardToAllString(entityId, cmd) =>
val entityRefs = replicatedShardingStringSet.entityRefsFor(entityId) val entityRefs = replicatedShardingStringSet.entityRefsFor(entityId)
ctx.log.infoN("Entity refs {}", entityRefs) ctx.log.infoN("Entity refs {}", entityRefs)
entityRefs.foreach { entityRefs.foreach {
case (replica, ref) => case (replica, ref) =>
ctx.log.infoN("Forwarding to replica {} ref {}", replica, ref) ctx.log.infoN("Forwarding to replica {} ref {}", replica, ref)
@ -226,8 +225,11 @@ class DataCenterReplicatedShardingSpec
abstract class ReplicatedShardingSpec(replicationType: ReplicationType, configA: Config, configB: Config) abstract class ReplicatedShardingSpec(replicationType: ReplicationType, configA: Config, configB: Config)
extends ScalaTestWithActorTestKit(configA) extends ScalaTestWithActorTestKit(configA)
with AnyWordSpecLike with AnyWordSpecLike {
with LogCapturing {
// don't retry quite so quickly
override implicit val patience: PatienceConfig =
PatienceConfig(testKit.testKitSettings.DefaultTimeout.duration, Span(500, org.scalatest.time.Millis))
val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = configB) val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = configB)
@ -265,7 +267,7 @@ abstract class ReplicatedShardingSpec(replicationType: ReplicationType, configA:
} }
"forward to replicas" in { "forward to replicas" in {
val proxy = spawn(ProxyActor(replicationType)) val proxy: ActorRef[ProxyActor.Command] = spawn(ProxyActor(replicationType))
proxy ! ProxyActor.ForwardToAllString("id1", MyReplicatedStringSet.Add("to-all")) proxy ! ProxyActor.ForwardToAllString("id1", MyReplicatedStringSet.Add("to-all"))
proxy ! ProxyActor.ForwardToRandomString("id1", MyReplicatedStringSet.Add("to-random")) proxy ! ProxyActor.ForwardToRandomString("id1", MyReplicatedStringSet.Add("to-random"))

View file

@ -537,7 +537,7 @@ object ShardCoordinator {
*/ */
private[akka] class RebalanceWorker( private[akka] class RebalanceWorker(
shard: String, shard: String,
from: ActorRef, shardRegionFrom: ActorRef,
handOffTimeout: FiniteDuration, handOffTimeout: FiniteDuration,
regions: Set[ActorRef]) regions: Set[ActorRef])
extends Actor extends Actor
@ -554,15 +554,15 @@ object ShardCoordinator {
timers.startSingleTimer("hand-off-timeout", ReceiveTimeout, handOffTimeout) timers.startSingleTimer("hand-off-timeout", ReceiveTimeout, handOffTimeout)
def receive = { def receive: Receive = {
case BeginHandOffAck(`shard`) => case BeginHandOffAck(`shard`) =>
log.debug("BeginHandOffAck for shard [{}] received from [{}].", shard, sender()) log.debug("BeginHandOffAck for shard [{}] received from [{}].", shard, sender())
acked(sender()) acked(sender())
case ShardRegionTerminated(shardRegion) => case RebalanceWorker.ShardRegionTerminated(shardRegion) =>
log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard) log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
acked(shardRegion) acked(shardRegion)
case ReceiveTimeout => case ReceiveTimeout =>
log.debug("Rebalance of [{}] from [{}] timed out", shard, from) log.debug("Rebalance of shard [{}] from [{}] timed out", shard, shardRegionFrom)
done(ok = false) done(ok = false)
} }
@ -570,14 +570,19 @@ object ShardCoordinator {
remaining -= shardRegion remaining -= shardRegion
if (remaining.isEmpty) { if (remaining.isEmpty) {
log.debug("All shard regions acked, handing off shard [{}].", shard) log.debug("All shard regions acked, handing off shard [{}].", shard)
from ! HandOff(shard) shardRegionFrom ! HandOff(shard)
context.become(stoppingShard, discardOld = true) context.become(stoppingShard, discardOld = true)
} else {
log.debug("Remaining shard regions: {}", remaining.size)
} }
} }
def stoppingShard: Receive = { def stoppingShard: Receive = {
case ShardStopped(`shard`) => done(ok = true) case ShardStopped(`shard`) => done(ok = true)
case ReceiveTimeout => done(ok = false) case ReceiveTimeout => done(ok = false)
case RebalanceWorker.ShardRegionTerminated(`shardRegionFrom`) =>
log.debug("ShardRegion [{}] terminated while waiting for ShardStopped for shard [{}].", shardRegionFrom, shard)
done(ok = true)
} }
def done(ok: Boolean): Unit = { def done(ok: Boolean): Unit = {
@ -588,10 +593,10 @@ object ShardCoordinator {
private[akka] def rebalanceWorkerProps( private[akka] def rebalanceWorkerProps(
shard: String, shard: String,
from: ActorRef, shardRegionFrom: ActorRef,
handOffTimeout: FiniteDuration, handOffTimeout: FiniteDuration,
regions: Set[ActorRef]): Props = { regions: Set[ActorRef]): Props = {
Props(new RebalanceWorker(shard, from, handOffTimeout, regions)) Props(new RebalanceWorker(shard, shardRegionFrom, handOffTimeout, regions))
} }
} }
@ -1000,13 +1005,15 @@ abstract class ShardCoordinator(
} }
} }
def regionProxyTerminated(ref: ActorRef): Unit = def regionProxyTerminated(ref: ActorRef): Unit = {
rebalanceWorkers.foreach(_ ! RebalanceWorker.ShardRegionTerminated(ref))
if (state.regionProxies.contains(ref)) { if (state.regionProxies.contains(ref)) {
log.debug("ShardRegion proxy terminated: [{}]", ref) log.debug("ShardRegion proxy terminated: [{}]", ref)
update(ShardRegionProxyTerminated(ref)) { evt => update(ShardRegionProxyTerminated(ref)) { evt =>
state = state.updated(evt) state = state.updated(evt)
} }
} }
}
def shuttingDown: Receive = { def shuttingDown: Receive = {
case _ => // ignore all case _ => // ignore all

View file

@ -0,0 +1,168 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import scala.concurrent.duration._
import akka.actor._
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.remote.artery.ArteryMultiNodeSpec
import akka.remote.artery.ArterySpecSupport
object ClusterDeathWatchNotificationSpec {
val config = ConfigFactory.parseString(s"""
akka {
loglevel = INFO
actor {
provider = cluster
}
}
akka.remote.classic.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
""").withFallback(ArterySpecSupport.defaultConfig)
object Sender {
def props(receiver: ActorRef, sendOnStop: Vector[String]): Props =
Props(new Sender(receiver, sendOnStop))
}
class Sender(receiver: ActorRef, sendOnStop: Vector[String]) extends Actor {
override def receive: Receive = {
case msg => sender() ! msg
}
override def postStop(): Unit = {
sendOnStop.foreach(receiver ! _)
}
}
}
class ClusterDeathWatchNotificationSpec
extends ArteryMultiNodeSpec(ClusterDeathWatchNotificationSpec.config)
with ImplicitSender {
import ClusterDeathWatchNotificationSpec.Sender
private def system1: ActorSystem = system
private val system2 = newRemoteSystem(name = Some(system.name))
private val system3 = newRemoteSystem(name = Some(system.name))
private val systems = Vector(system1, system2, system3)
private val messages = (1 to 100).map(_.toString).toVector
private def setupSender(sys: ActorSystem, receiverProbe: TestProbe, name: String): Unit = {
val receiverPath = receiverProbe.ref.path.toStringWithAddress(address(system1))
val otherProbe = TestProbe()(sys)
sys.actorSelection(receiverPath).tell(Identify(None), otherProbe.ref)
val receiver = otherProbe.expectMsgType[ActorIdentity](5.seconds).ref.get
receiver.path.address.hasGlobalScope should ===(true) // should be remote
sys.actorOf(Sender.props(receiver, messages), name)
}
private def identifySender(sys: ActorSystem, name: String): ActorRef = {
system1.actorSelection(rootActorPath(sys) / "user" / name) ! Identify(None)
val sender = expectMsgType[ActorIdentity](5.seconds).ref.get
sender
}
"join cluster" in within(10.seconds) {
systems.foreach { sys =>
Cluster(sys).join(Cluster(system1).selfAddress)
}
awaitAssert {
systems.foreach { sys =>
Cluster(sys).state.members.size should ===(systems.size)
Cluster(sys).state.members.iterator.map(_.status).toSet should ===(Set(MemberStatus.Up))
}
}
}
"receive Terminated after ordinary messages" in {
val receiverProbe = TestProbe()
setupSender(system2, receiverProbe, "sender")
val sender = identifySender(system2, "sender")
receiverProbe.watch(sender)
// make it likely that the watch has been established
sender.tell("echo", receiverProbe.ref)
receiverProbe.expectMsg("echo")
sender ! PoisonPill
receiverProbe.receiveN(messages.size).toVector shouldBe messages
receiverProbe.expectTerminated(sender)
}
"receive Terminated after ordinary messages when system is shutdown" in {
val receiverProbe1 = TestProbe()
setupSender(system2, receiverProbe1, "sender1")
val sender1 = identifySender(system2, "sender1")
val receiverProbe2 = TestProbe()
setupSender(system2, receiverProbe2, "sender2")
val sender2 = identifySender(system2, "sender2")
val receiverProbe3 = TestProbe()
setupSender(system2, receiverProbe3, "sender3")
val sender3 = identifySender(system2, "sender3")
receiverProbe1.watch(sender1)
receiverProbe2.watch(sender2)
receiverProbe3.watch(sender3)
// make it likely that the watch has been established
sender1.tell("echo1", receiverProbe1.ref)
receiverProbe1.expectMsg("echo1")
sender2.tell("echo2", receiverProbe2.ref)
receiverProbe2.expectMsg("echo2")
sender3.tell("echo3", receiverProbe3.ref)
receiverProbe3.expectMsg("echo3")
system2.log.debug("terminating")
system2.terminate()
receiverProbe1.receiveN(messages.size, 5.seconds).toVector shouldBe messages
receiverProbe1.expectTerminated(sender1)
receiverProbe2.receiveN(messages.size).toVector shouldBe messages
receiverProbe2.expectTerminated(sender2)
receiverProbe3.receiveN(messages.size).toVector shouldBe messages
receiverProbe3.expectTerminated(sender3)
}
"receive Terminated after ordinary messages when system is leaving" in {
val receiverProbe1 = TestProbe()
setupSender(system3, receiverProbe1, "sender1")
val sender1 = identifySender(system3, "sender1")
val receiverProbe2 = TestProbe()
setupSender(system3, receiverProbe2, "sender2")
val sender2 = identifySender(system3, "sender2")
val receiverProbe3 = TestProbe()
setupSender(system3, receiverProbe3, "sender3")
val sender3 = identifySender(system3, "sender3")
receiverProbe1.watch(sender1)
receiverProbe2.watch(sender2)
receiverProbe3.watch(sender3)
// make it likely that the watch has been established
sender1.tell("echo1", receiverProbe1.ref)
receiverProbe1.expectMsg("echo1")
sender2.tell("echo2", receiverProbe2.ref)
receiverProbe2.expectMsg("echo2")
sender3.tell("echo3", receiverProbe3.ref)
receiverProbe3.expectMsg("echo3")
system3.log.debug("leaving")
Cluster(system1).leave(Cluster(system3).selfAddress)
receiverProbe1.receiveN(messages.size, 5.seconds).toVector shouldBe messages
receiverProbe1.expectTerminated(sender1)
receiverProbe2.receiveN(messages.size).toVector shouldBe messages
receiverProbe2.expectTerminated(sender2)
receiverProbe3.receiveN(messages.size).toVector shouldBe messages
receiverProbe3.expectTerminated(sender3)
}
}

View file

@ -0,0 +1,6 @@
# #28695 flush before sending DeathWatchNotification
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.FlushOnShutdown.props")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.FlushOnShutdown.timeoutTask")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.FlushOnShutdown.this")
ProblemFilters.exclude[FinalClassProblem]("akka.remote.artery.ActorSystemTerminating")
ProblemFilters.exclude[FinalClassProblem]("akka.remote.artery.ActorSystemTerminatingAck")

View file

@ -982,6 +982,12 @@ akka {
# remote messages has been completed # remote messages has been completed
shutdown-flush-timeout = 1 second shutdown-flush-timeout = 1 second
# Before sending notificaiton of terminated actor (DeathWatchNotification) other messages
# will be flushed to make sure that the Terminated message arrives after other messages.
# It will wait this long for the flush acknowledgement before continuing.
# The flushing can be disabled by setting this to `off`.
death-watch-notification-flush-timeout = 3 seconds
# See 'inbound-max-restarts' # See 'inbound-max-restarts'
inbound-restart-timeout = 5 seconds inbound-restart-timeout = 5 seconds

View file

@ -193,8 +193,10 @@ private[akka] class RemoteWatcher(
} }
} }
def publishAddressTerminated(address: Address): Unit = def publishAddressTerminated(address: Address): Unit = {
log.debug("Publish AddressTerminated [{}]", address)
AddressTerminatedTopic(context.system).publish(AddressTerminated(address)) AddressTerminatedTopic(context.system).publish(AddressTerminated(address))
}
def quarantine(address: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = { def quarantine(address: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = {
remoteProvider.transport match { remoteProvider.transport match {

View file

@ -152,7 +152,18 @@ private[akka] final class ArterySettings private (config: Config) {
val ShutdownFlushTimeout: FiniteDuration = val ShutdownFlushTimeout: FiniteDuration =
config config
.getMillisDuration("shutdown-flush-timeout") .getMillisDuration("shutdown-flush-timeout")
.requiring(interval => interval > Duration.Zero, "shutdown-flush-timeout must be more than zero") .requiring(timeout => timeout > Duration.Zero, "shutdown-flush-timeout must be more than zero")
val DeathWatchNotificationFlushTimeout: FiniteDuration = {
toRootLowerCase(config.getString("death-watch-notification-flush-timeout")) match {
case "off" => Duration.Zero
case _ =>
config
.getMillisDuration("death-watch-notification-flush-timeout")
.requiring(
interval => interval > Duration.Zero,
"death-watch-notification-flush-timeout must be more than zero, or off")
}
}
val InboundRestartTimeout: FiniteDuration = val InboundRestartTimeout: FiniteDuration =
config config
.getMillisDuration("inbound-restart-timeout") .getMillisDuration("inbound-restart-timeout")

View file

@ -16,15 +16,12 @@ import scala.concurrent.Promise
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Try import scala.util.Try
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import scala.util.control.NonFatal
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
import akka.Done import akka.Done
import akka.NotUsed import akka.NotUsed
import akka.actor._ import akka.actor._
import akka.actor.Actor
import akka.actor.Props
import akka.annotation.InternalStableApi import akka.annotation.InternalStableApi
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.event.Logging import akka.event.Logging
@ -50,7 +47,6 @@ import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Sink
import akka.util.OptionVal import akka.util.OptionVal
import akka.util.WildcardIndex import akka.util.WildcardIndex
import akka.util.unused
/** /**
* INTERNAL API * INTERNAL API
@ -249,84 +245,6 @@ private[remote] trait OutboundContext {
} }
/**
* INTERNAL API
*/
private[remote] object FlushOnShutdown {
def props(
done: Promise[Done],
timeout: FiniteDuration,
inboundContext: InboundContext,
associations: Set[Association]): Props = {
require(associations.nonEmpty)
Props(new FlushOnShutdown(done, timeout, inboundContext, associations))
}
case object Timeout
}
/**
* INTERNAL API
*/
private[remote] class FlushOnShutdown(
done: Promise[Done],
timeout: FiniteDuration,
@unused inboundContext: InboundContext,
associations: Set[Association])
extends Actor
with ActorLogging {
var remaining = Map.empty[UniqueAddress, Int]
val timeoutTask = context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher)
override def preStart(): Unit = {
try {
associations.foreach { a =>
val acksExpected = a.sendTerminationHint(self)
a.associationState.uniqueRemoteAddress() match {
case Some(address) => remaining += address -> acksExpected
case None => // Ignore, handshake was not completed on this association
}
}
if (remaining.valuesIterator.sum == 0) {
done.trySuccess(Done)
context.stop(self)
}
} catch {
case NonFatal(e) =>
// sendTerminationHint may throw
done.tryFailure(e)
throw e
}
}
override def postStop(): Unit = {
timeoutTask.cancel()
done.trySuccess(Done)
}
def receive = {
case ActorSystemTerminatingAck(from) =>
// Just treat unexpected acks as systems from which zero acks are expected
val acksRemaining = remaining.getOrElse(from, 0)
if (acksRemaining <= 1) {
remaining -= from
} else {
remaining = remaining.updated(from, acksRemaining - 1)
}
if (remaining.isEmpty)
context.stop(self)
case FlushOnShutdown.Timeout =>
log.debug(
"Flush of remote transport timed out after [{}]. Remaining [{}] associations.",
timeout.toCoarsest,
remaining.size)
context.stop(self)
}
}
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -684,7 +602,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
val flushingPromise = Promise[Done]() val flushingPromise = Promise[Done]()
system.systemActorOf( system.systemActorOf(
FlushOnShutdown FlushOnShutdown
.props(flushingPromise, settings.Advanced.ShutdownFlushTimeout, this, allAssociations) .props(flushingPromise, settings.Advanced.ShutdownFlushTimeout, allAssociations)
.withDispatcher(Dispatchers.InternalDispatcherId), .withDispatcher(Dispatchers.InternalDispatcherId),
"remoteFlushOnShutdown") "remoteFlushOnShutdown")
flushingPromise.future flushingPromise.future
@ -930,10 +848,30 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
} }
} }
// Checks for termination hint messages and sends an ACK for those (not processing them further)
// Purpose of this stage is flushing, the sender can wait for the ACKs up to try flushing
// pending messages.
val flushReplier: Flow[InboundEnvelope, InboundEnvelope, NotUsed] = {
Flow[InboundEnvelope].filter { envelope =>
envelope.message match {
case Flush =>
envelope.sender match {
case OptionVal.Some(snd) =>
snd.tell(FlushAck, ActorRef.noSender)
case OptionVal.None =>
log.error("Expected sender for Flush message from [{}]", envelope.association)
}
false
case _ => true
}
}
}
def inboundSink(bufferPool: EnvelopeBufferPool): Sink[InboundEnvelope, Future[Done]] = def inboundSink(bufferPool: EnvelopeBufferPool): Sink[InboundEnvelope, Future[Done]] =
Flow[InboundEnvelope] Flow[InboundEnvelope]
.via(createDeserializer(bufferPool)) .via(createDeserializer(bufferPool))
.via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope]) .via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope])
.via(flushReplier)
.via(terminationHintReplier(inControlStream = false)) .via(terminationHintReplier(inControlStream = false))
.via(new InboundHandshake(this, inControlStream = false)) .via(new InboundHandshake(this, inControlStream = false))
.via(new InboundQuarantineCheck(this)) .via(new InboundQuarantineCheck(this))
@ -953,6 +891,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
Flow[InboundEnvelope] Flow[InboundEnvelope]
.via(createDeserializer(envelopeBufferPool)) .via(createDeserializer(envelopeBufferPool))
.via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope]) .via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope])
.via(flushReplier)
.via(terminationHintReplier(inControlStream = true)) .via(terminationHintReplier(inControlStream = true))
.via(new InboundHandshake(this, inControlStream = true)) .via(new InboundHandshake(this, inControlStream = true))
.via(new InboundQuarantineCheck(this)) .via(new InboundQuarantineCheck(this))

View file

@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
@ -27,7 +28,10 @@ import akka.actor.ActorSelectionMessage
import akka.actor.Address import akka.actor.Address
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.actor.Dropped import akka.actor.Dropped
import akka.dispatch.Dispatchers
import akka.dispatch.sysmsg.DeathWatchNotification
import akka.dispatch.sysmsg.SystemMessage import akka.dispatch.sysmsg.SystemMessage
import akka.dispatch.sysmsg.Unwatch
import akka.event.Logging import akka.event.Logging
import akka.remote.DaemonMsgCreate import akka.remote.DaemonMsgCreate
import akka.remote.PriorityMessage import akka.remote.PriorityMessage
@ -147,6 +151,7 @@ private[remote] class Association(
override def settings = transport.settings override def settings = transport.settings
private def advancedSettings = transport.settings.Advanced private def advancedSettings = transport.settings.Advanced
private val deathWatchNotificationFlushEnabled = advancedSettings.DeathWatchNotificationFlushTimeout > Duration.Zero && transport.provider.settings.HasCluster
private val restartCounter = private val restartCounter =
new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout) new RestartCounter(advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout)
@ -352,6 +357,34 @@ private[remote] class Association(
deadletters ! env deadletters ! env
} }
def shouldSendUnwatch(): Boolean =
!transport.provider.settings.HasCluster || !transport.system.isTerminating()
def shouldSendDeathWatchNotification(d: DeathWatchNotification): Boolean =
d.addressTerminated || !transport.provider.settings.HasCluster || !transport.system.isTerminating()
def sendSystemMessage(outboundEnvelope: OutboundEnvelope): Unit = {
outboundEnvelope.message match {
case u: Unwatch if shouldSendUnwatch() =>
log.debug(
"Not sending Unwatch of {} to {} because it will be notified when this member " +
"has been removed from Cluster.",
u.watcher,
u.watchee)
case d: DeathWatchNotification if !shouldSendDeathWatchNotification(d) =>
log.debug(
"Not sending DeathWatchNotification of {} to {} because it will be notified when this member " +
"has been removed from Cluster.",
d.actor,
outboundEnvelope.recipient.getOrElse("unknown"))
case _ =>
if (!controlQueue.offer(outboundEnvelope)) {
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
}
}
val state = associationState val state = associationState
val quarantined = state.isQuarantined() val quarantined = state.isQuarantined()
val messageIsClearSystemMessageDelivery = message.isInstanceOf[ClearSystemMessageDelivery] val messageIsClearSystemMessageDelivery = message.isInstanceOf[ClearSystemMessageDelivery]
@ -368,11 +401,20 @@ private[remote] class Association(
try { try {
val outboundEnvelope = createOutboundEnvelope() val outboundEnvelope = createOutboundEnvelope()
message match { message match {
case d: DeathWatchNotification if deathWatchNotificationFlushEnabled && shouldSendDeathWatchNotification(d) =>
val flushingPromise = Promise[Done]()
log.debug("Delaying death watch notification until flush has been sent. {}", d)
transport.system.systemActorOf(
FlushBeforeDeathWatchNotification
.props(flushingPromise, settings.Advanced.DeathWatchNotificationFlushTimeout, this)
.withDispatcher(Dispatchers.InternalDispatcherId),
FlushBeforeDeathWatchNotification.nextName())
flushingPromise.future.onComplete { _ =>
log.debug("Sending death watch notification as flush is complete. {}", d)
sendSystemMessage(outboundEnvelope)
}(materializer.executionContext)
case _: SystemMessage => case _: SystemMessage =>
if (!controlQueue.offer(outboundEnvelope)) { sendSystemMessage(outboundEnvelope)
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")
dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope)
}
case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | _: ClearSystemMessageDelivery => case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | _: ClearSystemMessageDelivery =>
// ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating // ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating
if (!controlQueue.offer(outboundEnvelope)) { if (!controlQueue.offer(outboundEnvelope)) {
@ -448,10 +490,18 @@ private[remote] class Association(
} }
def sendTerminationHint(replyTo: ActorRef): Int = { def sendTerminationHint(replyTo: ActorRef): Int = {
log.debug("Sending ActorSystemTerminating to all queues")
sendToAllQueues(ActorSystemTerminating(localAddress), replyTo, excludeControlQueue = false)
}
def sendFlush(replyTo: ActorRef, excludeControlQueue: Boolean): Int =
sendToAllQueues(Flush, replyTo, excludeControlQueue)
def sendToAllQueues(msg: ControlMessage, replyTo: ActorRef, excludeControlQueue: Boolean): Int = {
if (!associationState.isQuarantined()) { if (!associationState.isQuarantined()) {
val msg = ActorSystemTerminating(localAddress)
var sent = 0 var sent = 0
queues.iterator.filter(q => q.isEnabled && !q.isInstanceOf[LazyQueueWrapper]).foreach { queue => val queuesIter = if (excludeControlQueue) queues.iterator.drop(1) else queues.iterator
queuesIter.filter(q => q.isEnabled && !q.isInstanceOf[LazyQueueWrapper]).foreach { queue =>
try { try {
val envelope = outboundEnvelopePool.acquire().init(OptionVal.None, msg, OptionVal.Some(replyTo)) val envelope = outboundEnvelopePool.acquire().init(OptionVal.None, msg, OptionVal.Some(replyTo))

View file

@ -11,6 +11,7 @@ import scala.concurrent.Promise
import scala.util.Try import scala.util.Try
import akka.Done import akka.Done
import akka.annotation.InternalApi
import akka.event.Logging import akka.event.Logging
import akka.remote.UniqueAddress import akka.remote.UniqueAddress
import akka.stream.Attributes import akka.stream.Attributes
@ -21,11 +22,13 @@ import akka.stream.stage._
import akka.util.OptionVal import akka.util.OptionVal
/** INTERNAL API: marker trait for protobuf-serializable artery messages */ /** INTERNAL API: marker trait for protobuf-serializable artery messages */
@InternalApi
private[remote] trait ArteryMessage extends Serializable private[remote] trait ArteryMessage extends Serializable
/** /**
* INTERNAL API: Marker trait for reply messages * INTERNAL API: Marker trait for reply messages
*/ */
@InternalApi
private[remote] trait Reply extends ControlMessage private[remote] trait Reply extends ControlMessage
/** /**
@ -33,26 +36,43 @@ private[remote] trait Reply extends ControlMessage
* Marker trait for control messages that can be sent via the system message sub-channel * Marker trait for control messages that can be sent via the system message sub-channel
* but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`. * but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`.
*/ */
@InternalApi
private[remote] trait ControlMessage extends ArteryMessage private[remote] trait ControlMessage extends ArteryMessage
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi
private[remote] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage private[remote] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[remote] case class ActorSystemTerminating(from: UniqueAddress) extends ControlMessage @InternalApi
private[remote] final case class ActorSystemTerminating(from: UniqueAddress) extends ControlMessage
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[remote] case class ActorSystemTerminatingAck(from: UniqueAddress) extends ArteryMessage @InternalApi
private[remote] final case class ActorSystemTerminatingAck(from: UniqueAddress) extends ArteryMessage
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi
private[remote] case object Flush extends ControlMessage
/**
* INTERNAL API
*/
@InternalApi
private[remote] case object FlushAck extends ArteryMessage
/**
* INTERNAL API
*/
@InternalApi
private[remote] object InboundControlJunction { private[remote] object InboundControlJunction {
/** /**
@ -87,6 +107,7 @@ private[remote] object InboundControlJunction {
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi
private[remote] class InboundControlJunction private[remote] class InboundControlJunction
extends GraphStageWithMaterializedValue[ extends GraphStageWithMaterializedValue[
FlowShape[InboundEnvelope, InboundEnvelope], FlowShape[InboundEnvelope, InboundEnvelope],
@ -150,6 +171,7 @@ private[remote] class InboundControlJunction
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi
private[remote] object OutboundControlJunction { private[remote] object OutboundControlJunction {
private[remote] trait OutboundControlIngress { private[remote] trait OutboundControlIngress {
def sendControlMessage(message: ControlMessage): Unit def sendControlMessage(message: ControlMessage): Unit
@ -159,6 +181,7 @@ private[remote] object OutboundControlJunction {
/** /**
* INTERNAL API * INTERNAL API
*/ */
@InternalApi
private[remote] class OutboundControlJunction( private[remote] class OutboundControlJunction(
outboundContext: OutboundContext, outboundContext: OutboundContext,
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope]) outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope])

View file

@ -0,0 +1,83 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import akka.Done
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Props
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi
private[remote] object FlushBeforeDeathWatchNotification {
private val nameCounter = new AtomicLong(0L)
def props(done: Promise[Done], timeout: FiniteDuration, association: Association): Props = {
Props(new FlushBeforeDeathWatchNotification(done, timeout, association))
}
def nextName(): String = s"flush-${nameCounter.incrementAndGet()}"
private case object Timeout
}
/**
* INTERNAL API
*/
@InternalApi
private[remote] class FlushBeforeDeathWatchNotification(
done: Promise[Done],
timeout: FiniteDuration,
association: Association)
extends Actor
with ActorLogging {
import FlushBeforeDeathWatchNotification.Timeout
var remaining = 0
private val timeoutTask =
context.system.scheduler.scheduleOnce(timeout, self, Timeout)(context.dispatcher)
override def preStart(): Unit = {
try {
remaining = association.sendFlush(self, excludeControlQueue = true)
if (remaining == 0) {
done.trySuccess(Done)
context.stop(self)
}
} catch {
case NonFatal(e) =>
// sendFlush may throw
done.tryFailure(e)
// will log and stop
throw e
}
}
override def postStop(): Unit = {
timeoutTask.cancel()
done.trySuccess(Done)
}
def receive: Receive = {
case FlushAck =>
remaining -= 1
log.debug("Flush acknowledged, [{}] remaining", remaining)
if (remaining == 0)
context.stop(self)
case Timeout =>
log.debug("Flush timeout, [{}] remaining", remaining)
context.stop(self)
}
}

View file

@ -0,0 +1,87 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import akka.Done
import akka.actor.{ Actor, ActorLogging, Props }
import akka.annotation.InternalApi
import akka.remote.UniqueAddress
/**
* INTERNAL API
*/
@InternalApi
private[remote] object FlushOnShutdown {
def props(done: Promise[Done], timeout: FiniteDuration, associations: Set[Association]): Props = {
require(associations.nonEmpty)
Props(new FlushOnShutdown(done, timeout, associations))
}
private case object Timeout
}
/**
* INTERNAL API
*/
@InternalApi
private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, associations: Set[Association])
extends Actor
with ActorLogging {
var remaining = Map.empty[UniqueAddress, Int]
private val timeoutTask =
context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher)
override def preStart(): Unit = {
try {
associations.foreach { a =>
val acksExpected = a.sendTerminationHint(self)
a.associationState.uniqueRemoteAddress() match {
case Some(address) => remaining += address -> acksExpected
case None => // Ignore, handshake was not completed on this association
}
}
if (remaining.valuesIterator.sum == 0) {
done.trySuccess(Done)
context.stop(self)
}
} catch {
case NonFatal(e) =>
// sendTerminationHint may throw
done.tryFailure(e)
throw e
}
}
override def postStop(): Unit = {
timeoutTask.cancel()
done.trySuccess(Done)
}
def receive: Receive = {
case ActorSystemTerminatingAck(from) =>
log.debug("ActorSystemTerminatingAck from [{}]", from)
// Just treat unexpected acks as systems from which zero acks are expected
val acksRemaining = remaining.getOrElse(from, 0)
if (acksRemaining <= 1) {
remaining -= from
} else {
remaining = remaining.updated(from, acksRemaining - 1)
}
if (remaining.isEmpty)
context.stop(self)
case FlushOnShutdown.Timeout =>
log.debug(
"Flush of remote transport timed out after [{}]. Remaining [{}] associations.",
timeout.toCoarsest,
remaining.size)
context.stop(self)
}
}

View file

@ -15,6 +15,8 @@ import akka.remote.artery.OutboundHandshake.{ HandshakeReq, HandshakeRsp }
import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable } import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable }
import akka.remote.artery.compress.CompressionProtocol._ import akka.remote.artery.compress.CompressionProtocol._
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest } import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
import akka.remote.artery.Flush
import akka.remote.artery.FlushAck
/** INTERNAL API */ /** INTERNAL API */
private[akka] object ArteryMessageSerializer { private[akka] object ArteryMessageSerializer {
@ -34,6 +36,9 @@ private[akka] object ArteryMessageSerializer {
private val ArteryHeartbeatManifest = "m" private val ArteryHeartbeatManifest = "m"
private val ArteryHeartbeatRspManifest = "n" private val ArteryHeartbeatRspManifest = "n"
private val FlushManifest = "o"
private val FlushAckManifest = "p"
private final val DeadLettersRepresentation = "" private final val DeadLettersRepresentation = ""
} }
@ -54,6 +59,8 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
case _: RemoteWatcher.ArteryHeartbeatRsp => ArteryHeartbeatRspManifest case _: RemoteWatcher.ArteryHeartbeatRsp => ArteryHeartbeatRspManifest
case _: SystemMessageDelivery.Nack => SystemMessageDeliveryNackManifest case _: SystemMessageDelivery.Nack => SystemMessageDeliveryNackManifest
case _: Quarantined => QuarantinedManifest case _: Quarantined => QuarantinedManifest
case Flush => FlushManifest
case FlushAck => FlushAckManifest
case _: ActorSystemTerminating => ActorSystemTerminatingManifest case _: ActorSystemTerminating => ActorSystemTerminatingManifest
case _: ActorSystemTerminatingAck => ActorSystemTerminatingAckManifest case _: ActorSystemTerminatingAck => ActorSystemTerminatingAckManifest
case _: CompressionProtocol.ActorRefCompressionAdvertisement => ActorRefCompressionAdvertisementManifest case _: CompressionProtocol.ActorRefCompressionAdvertisement => ActorRefCompressionAdvertisementManifest
@ -74,6 +81,8 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
case RemoteWatcher.ArteryHeartbeatRsp(from) => serializeArteryHeartbeatRsp(from).toByteArray case RemoteWatcher.ArteryHeartbeatRsp(from) => serializeArteryHeartbeatRsp(from).toByteArray
case SystemMessageDelivery.Nack(seqNo, from) => serializeSystemMessageDeliveryAck(seqNo, from).toByteArray case SystemMessageDelivery.Nack(seqNo, from) => serializeSystemMessageDeliveryAck(seqNo, from).toByteArray
case q: Quarantined => serializeQuarantined(q).toByteArray case q: Quarantined => serializeQuarantined(q).toByteArray
case Flush => Array.emptyByteArray
case FlushAck => Array.emptyByteArray
case ActorSystemTerminating(from) => serializeWithAddress(from).toByteArray case ActorSystemTerminating(from) => serializeWithAddress(from).toByteArray
case ActorSystemTerminatingAck(from) => serializeWithAddress(from).toByteArray case ActorSystemTerminatingAck(from) => serializeWithAddress(from).toByteArray
case adv: ActorRefCompressionAdvertisement => serializeActorRefCompressionAdvertisement(adv).toByteArray case adv: ActorRefCompressionAdvertisement => serializeActorRefCompressionAdvertisement(adv).toByteArray
@ -92,6 +101,8 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
case HandshakeRspManifest => deserializeWithFromAddress(bytes, HandshakeRsp) case HandshakeRspManifest => deserializeWithFromAddress(bytes, HandshakeRsp)
case SystemMessageDeliveryNackManifest => deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Nack) case SystemMessageDeliveryNackManifest => deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Nack)
case QuarantinedManifest => deserializeQuarantined(ArteryControlFormats.Quarantined.parseFrom(bytes)) case QuarantinedManifest => deserializeQuarantined(ArteryControlFormats.Quarantined.parseFrom(bytes))
case FlushManifest => Flush
case FlushAckManifest => FlushAck
case ActorSystemTerminatingManifest => deserializeWithFromAddress(bytes, ActorSystemTerminating) case ActorSystemTerminatingManifest => deserializeWithFromAddress(bytes, ActorSystemTerminating)
case ActorSystemTerminatingAckManifest => deserializeWithFromAddress(bytes, ActorSystemTerminatingAck) case ActorSystemTerminatingAckManifest => deserializeWithFromAddress(bytes, ActorSystemTerminatingAck)
case ActorRefCompressionAdvertisementManifest => deserializeActorRefCompressionAdvertisement(bytes) case ActorRefCompressionAdvertisementManifest => deserializeActorRefCompressionAdvertisement(bytes)

View file

@ -37,7 +37,8 @@ import scala.util.control.NonFatal
// This is a simplification Spec. It doesn't rely on changing files. // This is a simplification Spec. It doesn't rely on changing files.
class RotatingProviderWithStaticKeysSpec class RotatingProviderWithStaticKeysSpec
extends RotatingKeysSSLEngineProviderSpec(RotatingKeysSSLEngineProviderSpec.resourcesConfig) { extends RotatingKeysSSLEngineProviderSpec(RotatingKeysSSLEngineProviderSpec.resourcesConfig) {
"Artery with TLS/TCP with RotatingKeysSSLEngine" must { // FIXME this is a flaky test and don't want the noise on the repeat branch
"Artery with TLS/TCP with RotatingKeysSSLEngine" ignore {
"rebuild the SSLContext" in { "rebuild the SSLContext" in {
if (!arteryTcpTlsEnabled()) if (!arteryTcpTlsEnabled())

View file

@ -7,6 +7,8 @@ package akka.remote.serialization
import java.io.NotSerializableException import java.io.NotSerializableException
import akka.actor._ import akka.actor._
import akka.remote.artery.Flush
import akka.remote.artery.FlushAck
import akka.remote.{ RemoteWatcher, UniqueAddress } import akka.remote.{ RemoteWatcher, UniqueAddress }
import akka.remote.artery.{ ActorSystemTerminating, ActorSystemTerminatingAck, Quarantined, SystemMessageDelivery } import akka.remote.artery.{ ActorSystemTerminating, ActorSystemTerminatingAck, Quarantined, SystemMessageDelivery }
import akka.remote.artery.OutboundHandshake.{ HandshakeReq, HandshakeRsp } import akka.remote.artery.OutboundHandshake.{ HandshakeReq, HandshakeRsp }
@ -29,6 +31,8 @@ class ArteryMessageSerializerSpec extends AkkaSpec {
"Quarantined" -> Quarantined(uniqueAddress(), uniqueAddress()), "Quarantined" -> Quarantined(uniqueAddress(), uniqueAddress()),
"ActorSystemTerminating" -> ActorSystemTerminating(uniqueAddress()), "ActorSystemTerminating" -> ActorSystemTerminating(uniqueAddress()),
"ActorSystemTerminatingAck" -> ActorSystemTerminatingAck(uniqueAddress()), "ActorSystemTerminatingAck" -> ActorSystemTerminatingAck(uniqueAddress()),
"Flush" -> Flush,
"FlushAck" -> FlushAck,
"HandshakeReq" -> HandshakeReq(uniqueAddress(), uniqueAddress().address), "HandshakeReq" -> HandshakeReq(uniqueAddress(), uniqueAddress().address),
"HandshakeRsp" -> HandshakeRsp(uniqueAddress()), "HandshakeRsp" -> HandshakeRsp(uniqueAddress()),
"ActorRefCompressionAdvertisement" -> ActorRefCompressionAdvertisement( "ActorRefCompressionAdvertisement" -> ActorRefCompressionAdvertisement(