flush messages on shutdown, #20811

* StreamSupervisor as system actor so that it is
  stopped after ordinary actors
* when the transport is shut down, send a flush message to all
  outbound associations (over the control stream) and wait for ack
  or timeout
This commit is contained in:
Patrik Nordwall 2016-06-23 18:11:56 +02:00
parent d99274a51f
commit a021eb5ff4
8 changed files with 181 additions and 53 deletions

View file

@ -99,9 +99,6 @@ class RemoteRandomSpec(multiNodeConfig: RemoteRandomConfig) extends MultiNodeSpe
// "Terminate" to a shut down node // "Terminate" to a shut down node
system.stop(actor) system.stop(actor)
enterBarrier("done") enterBarrier("done")
// FIXME this test has problems shutting down actor system when running with Artery
// [akka.actor.ActorSystemImpl(RemoteRandomSpec)] Failed to stop [RemoteRandomSpec] within [5 seconds]
} }
} }
} }

View file

@ -69,6 +69,8 @@ import io.aeron.driver.ThreadingMode
import org.agrona.concurrent.BackoffIdleStrategy import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.concurrent.BusySpinIdleStrategy import org.agrona.concurrent.BusySpinIdleStrategy
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.Props
import akka.actor.Actor
/** /**
* INTERNAL API * INTERNAL API
@ -313,6 +315,50 @@ private[akka] trait OutboundContext {
def dummyRecipient: RemoteActorRef def dummyRecipient: RemoteActorRef
} }
/**
 * INTERNAL API
 *
 * Companion of the [[FlushOnShutdown]] actor: holds its `Props` factory and the
 * message the actor schedules to itself when the flush timeout elapses.
 */
private[remote] object FlushOnShutdown {

  /** Self-message delivered to the actor once the flush timeout has passed. */
  case object Timeout

  /**
   * Creates the `Props` for a [[FlushOnShutdown]] actor.
   *
   * @param done           promise handed to the actor
   * @param timeout        delay after which the actor receives [[Timeout]]
   * @param inboundContext provides the local address put into the flush message
   * @param associations   associations to flush; must not be empty
   * @throws IllegalArgumentException if `associations` is empty
   */
  def props(
    done:           Promise[Done],
    timeout:        FiniteDuration,
    inboundContext: InboundContext,
    associations:   Set[Association]): Props = {
    require(associations.nonEmpty)
    Props(new FlushOnShutdown(done, timeout, inboundContext, associations))
  }
}
/**
 * INTERNAL API
 *
 * When started, sends [[ActorSystemTerminating]] to every given association and then
 * waits until an [[ActorSystemTerminatingAck]] has arrived from each remote address
 * that was known at construction time, or until `timeout` has elapsed. In both cases
 * `done` is completed successfully and the actor stops itself.
 *
 * `done` is guaranteed to be completed once this actor is gone: it is completed in
 * `postStop`, and it is failed if sending the flush messages throws in `preStart`.
 * Without that, a caller mapping over the promise's future would never continue.
 *
 * @param done           promise completed when flushing has finished, timed out or failed
 * @param timeout        maximum time to wait for the acknowledgements
 * @param inboundContext provides the local address put into the flush message
 * @param associations   the associations to send the flush message to
 */
private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration,
                                      inboundContext: InboundContext, associations: Set[Association]) extends Actor {

  // Remote addresses that have not acknowledged yet. Associations without a
  // unique remote address value contribute nothing to this set.
  var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue)

  val timeoutTask = context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher)

  override def preStart(): Unit =
    try {
      val msg = ActorSystemTerminating(inboundContext.localAddress)
      // `self` is passed as sender so that the acknowledgement is sent back to this actor
      associations.foreach { a => a.send(msg, OptionVal.Some(self), a.dummyRecipient) }
    } catch {
      case NonFatal(e) =>
        // Do not leave the caller waiting on a promise that nobody will complete.
        timeoutTask.cancel()
        done.tryFailure(e)
        throw e
    }

  override def postStop(): Unit = {
    timeoutTask.cancel()
    // Safety net for every stop path; a no-op when the promise is already completed.
    done.trySuccess(Done)
  }

  def receive = {
    case ActorSystemTerminatingAck(from) =>
      remaining -= from
      if (remaining.isEmpty) {
        done.trySuccess(Done)
        context.stop(self)
      }
    case FlushOnShutdown.Timeout =>
      // Give up waiting; shutdown proceeds as if everything had been flushed.
      done.trySuccess(Done)
      context.stop(self)
  }
}
/** /**
* INTERNAL API * INTERNAL API
*/ */
@ -352,6 +398,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
"handshake-timeout must be > 0") "handshake-timeout must be > 0")
private val injectHandshakeInterval: FiniteDuration = 1.second private val injectHandshakeInterval: FiniteDuration = 1.second
private val giveUpSendAfter: FiniteDuration = 60.seconds private val giveUpSendAfter: FiniteDuration = 60.seconds
private val shutdownFlushTimeout = 1.second
private val remoteDispatcher = system.dispatchers.lookup(remoteSettings.Dispatcher)
private val largeMessageDestinations = private val largeMessageDestinations =
system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardTree[NotUsed]()) { (tree, entry) system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardTree[NotUsed]()) { (tree, entry)
@ -380,8 +429,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
val (afrFileChannel, afrFlie, flightRecorder) = initializeFlightRecorder() val (afrFileChannel, afrFlie, flightRecorder) = initializeFlightRecorder()
/**
 * Supplies the [[EventSink]] handed to the transport's flight recorder call sites.
 * At the moment this is always `IgnoreEventSink`; the real sink is disabled, see the FIXME.
 */
def createFlightRecorderEventSink(): EventSink =
  // FIXME there is some concurrency issue with the FlightRecorder, when shutting down.
  // It crashes the JVM.
  // flightRecorder.createEventSink()
  IgnoreEventSink
// !!! WARNING !!! This is *NOT* thread safe, // !!! WARNING !!! This is *NOT* thread safe,
private val topLevelFREvents = flightRecorder.createEventSink() private val topLevelFREvents = createFlightRecorderEventSink()
private val associationRegistry = new AssociationRegistry( private val associationRegistry = new AssociationRegistry(
remoteAddress new Association(this, materializer, remoteAddress, controlSubject, largeMessageDestinations)) remoteAddress new Association(this, materializer, remoteAddress, controlSubject, largeMessageDestinations))
@ -413,7 +469,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
val materializerSettings = ActorMaterializerSettings( val materializerSettings = ActorMaterializerSettings(
remoteSettings.config.getConfig("akka.remote.artery.advanced.materializer")) remoteSettings.config.getConfig("akka.remote.artery.advanced.materializer"))
materializer = ActorMaterializer(materializerSettings)(system) materializer = ActorMaterializer.systemMaterializer(materializerSettings, "remote", system)
messageDispatcher = new MessageDispatcher(system, provider) messageDispatcher = new MessageDispatcher(system, provider)
topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData) topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData)
@ -563,23 +619,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
controlSubject = ctrl controlSubject = ctrl
// ordinary messages stream
controlSubject.attach(new ControlMessageObserver { controlSubject.attach(new ControlMessageObserver {
override def notify(inboundEnvelope: InboundEnvelope): Unit = { override def notify(inboundEnvelope: InboundEnvelope): Unit = {
inboundEnvelope.message match {
case Quarantined(from, to) if to == localAddress
val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address)
publishLifecycleEvent(lifecycleEvent)
// quarantine the other system from here
association(from.address).quarantine(lifecycleEvent.toString, Some(from.uid))
case _ // not interesting
}
}
})
// compression messages
controlSubject.attach(new ControlMessageObserver {
override def notify(inboundEnvelope: InboundEnvelope): Unit =
inboundEnvelope.message match { inboundEnvelope.message match {
case m: CompressionMessage case m: CompressionMessage
m match { m match {
@ -593,8 +634,22 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
association(from.address).compression.applyClassManifestCompressionTable(table) association(from.address).compression.applyClassManifestCompressionTable(table)
system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table)) system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table))
} }
case _ // not interested in non CompressionMessages
case Quarantined(from, to) if to == localAddress
val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address)
publishLifecycleEvent(lifecycleEvent)
// quarantine the other system from here
association(from.address).quarantine(lifecycleEvent.toString, Some(from.uid))
case _: ActorSystemTerminating
inboundEnvelope.sender match {
case OptionVal.Some(snd) snd.tell(ActorSystemTerminatingAck(localAddress), ActorRef.noSender)
case OptionVal.None log.error("Expected sender for ActorSystemTerminating message")
}
case _ // not interesting
} }
}
}) })
attachStreamRestart("Inbound control stream", completed, () runInboundControlStream(compression)) attachStreamRestart("Inbound control stream", completed, () runInboundControlStream(compression))
@ -661,28 +716,42 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
override def shutdown(): Future[Done] = { override def shutdown(): Future[Done] = {
_shutdown = true _shutdown = true
killSwitch.shutdown() val allAssociations = associationRegistry.allAssociations
topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData) val flushing: Future[Done] =
if (taskRunner != null) { if (allAssociations.isEmpty) Future.successful(Done)
taskRunner.stop() else {
topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) val flushingPromise = Promise[Done]()
system.systemActorOf(FlushOnShutdown.props(flushingPromise, shutdownFlushTimeout,
this, allAssociations).withDispatcher(remoteSettings.Dispatcher), "remoteFlushOnShutdown")
flushingPromise.future
}
implicit val ec = remoteDispatcher
flushing.recover { case _ Done }.map { _
killSwitch.shutdown()
topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData)
if (taskRunner != null) {
taskRunner.stop()
topLevelFREvents.loFreq(Transport_Stopped, NoMetaData)
}
if (aeronErrorLogTask != null) {
aeronErrorLogTask.cancel()
topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData)
}
if (aeron != null) aeron.close()
if (mediaDriver.isDefined) {
stopMediaDriver()
topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData)
}
topLevelFREvents.loFreq(Transport_FlightRecorderClose, NoMetaData)
flightRecorder.close()
afrFileChannel.force(true)
afrFileChannel.close()
// TODO: Be smarter about this in tests and make it always-on-for prod
afrFlie.delete()
Done
} }
if (aeronErrorLogTask != null) {
aeronErrorLogTask.cancel()
topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData)
}
if (aeron != null) aeron.close()
if (mediaDriver.isDefined) {
stopMediaDriver()
topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData)
}
topLevelFREvents.loFreq(Transport_FlightRecorderClose, NoMetaData)
flightRecorder.close()
afrFileChannel.force(true)
afrFileChannel.close()
// TODO: Be smarter about this in tests and make it always-on-for prod
afrFlie.delete()
Future.successful(Done)
} }
private[remote] def isShutdown: Boolean = _shutdown private[remote] def isShutdown: Boolean = _shutdown
@ -742,7 +811,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
.via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval))
.via(encoder(compression)) .via(encoder(compression))
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner,
envelopePool, giveUpSendAfter, flightRecorder.createEventSink()))(Keep.right) envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right)
} }
def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, Future[Done]] = { def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, Future[Done]] = {
@ -750,7 +819,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
.via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval))
.via(createEncoder(largeEnvelopePool, compression)) .via(createEncoder(largeEnvelopePool, compression))
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner,
envelopePool, giveUpSendAfter, flightRecorder.createEventSink()))(Keep.right) envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right)
} }
def outboundControl(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, (OutboundControlIngress, Future[Done])] = { def outboundControl(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, (OutboundControlIngress, Future[Done])] = {
@ -761,7 +830,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
.viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .viaMat(new OutboundControlJunction(outboundContext))(Keep.right)
.via(encoder(compression)) .via(encoder(compression))
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner,
envelopePool, Duration.Inf, flightRecorder.createEventSink()))(Keep.both) envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both)
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
} }
@ -780,7 +849,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] = def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] =
Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool,
flightRecorder.createEventSink())) createFlightRecorderEventSink()))
val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m
messageDispatcher.dispatch(m.recipient.get, m.recipientAddress, m.message, m.sender) messageDispatcher.dispatch(m.recipient.get, m.recipientAddress, m.message, m.sender)

View file

@ -171,7 +171,7 @@ private[remote] class Association(
if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) {
// FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly
message match { message match {
case _: SystemMessage | ClearSystemMessageDelivery case _: SystemMessage | ClearSystemMessageDelivery | _: ControlMessage
val send = Send(message, sender, recipient, None) val send = Send(message, sender, recipient, None)
if (!controlQueue.offer(send)) { if (!controlQueue.offer(send)) {
quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]")

View file

@ -37,6 +37,16 @@ private[akka] trait ControlMessage
*/ */
private[akka] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage // FIXME serialization private[akka] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage // FIXME serialization
/**
 * INTERNAL API
 *
 * Control message announcing that the actor system at `from` is terminating.
 * Declared `final`, like `Quarantined`, since case classes are not meant to be extended.
 */
private[akka] final case class ActorSystemTerminating(from: UniqueAddress) extends ControlMessage // FIXME serialization
/**
 * INTERNAL API
 *
 * Acknowledgement of an `ActorSystemTerminating` message; `from` is the address of
 * the acknowledging system. Declared `final`, since case classes are not meant to be extended.
 */
private[akka] final case class ActorSystemTerminatingAck(from: UniqueAddress) // FIXME serialization
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -169,6 +169,14 @@ private[akka] class SystemMessageDelivery(
case s @ Send(ClearSystemMessageDelivery, _, _, _) case s @ Send(ClearSystemMessageDelivery, _, _, _)
clear() clear()
pull(in) pull(in)
case s @ Send(msg: ControlMessage, _, _, _)
// e.g. ActorSystemTerminating, no need for acked delivery
if (resending.isEmpty && isAvailable(out))
push(out, s)
else {
resending.offer(s)
tryResend()
}
case s @ Send(msg: AnyRef, _, _, _) case s @ Send(msg: AnyRef, _, _, _)
if (unacknowledged.size < maxBufferSize) { if (unacknowledged.size < maxBufferSize) {
seqNo += 1 seqNo += 1

View file

@ -35,6 +35,7 @@ import akka.util.OptionVal
object SystemMessageDeliverySpec { object SystemMessageDeliverySpec {
val config = ConfigFactory.parseString(s""" val config = ConfigFactory.parseString(s"""
akka.loglevel=INFO
akka { akka {
actor.provider = remote actor.provider = remote
remote.artery.enabled = on remote.artery.enabled = on
@ -108,7 +109,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
"System messages" must { "System messages" must {
"be delivered with real actors" in { "be delivered with real actors" in {
val actorOnSystemB = systemB.actorOf(TestActors.echoActorProps, "echo") systemB.actorOf(TestActors.echoActorProps, "echo")
val remoteRef = { val remoteRef = {
system.actorSelection(rootB / "user" / "echo") ! Identify(None) system.actorSelection(rootB / "user" / "echo") ! Identify(None)
@ -120,6 +121,30 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
expectTerminated(remoteRef) expectTerminated(remoteRef)
} }
"be flushed on shutdown" in {
val systemC = ActorSystem("systemC", system.settings.config)
try {
systemC.actorOf(TestActors.echoActorProps, "echo")
val addressC = RARP(systemC).provider.getDefaultAddress
val rootC = RootActorPath(addressC)
val remoteRef = {
system.actorSelection(rootC / "user" / "echo") ! Identify(None)
expectMsgType[ActorIdentity].ref.get
}
watch(remoteRef)
remoteRef ! "hello"
expectMsg("hello")
systemC.terminate()
// DeathWatchNotification is sent from systemC, failure detection takes longer than 3 seconds
expectTerminated(remoteRef, 5.seconds)
} finally {
shutdown(systemC)
}
}
"be resent when some in the middle are lost" in { "be resent when some in the middle are lost" in {
val replyProbe = TestProbe() val replyProbe = TestProbe()
val controlSubject = new TestControlMessageSubject val controlSubject = new TestControlMessageSubject

View file

@ -22,7 +22,7 @@ object HandshakeShouldDropCompressionTableSpec {
val commonConfig = ConfigFactory.parseString(s""" val commonConfig = ConfigFactory.parseString(s"""
akka { akka {
loglevel = INFO loglevel = INFO
actor.provider = "akka.remote.RemoteActorRefProvider" actor.provider = "akka.remote.RemoteActorRefProvider"
remote.artery.enabled = on remote.artery.enabled = on
remote.artery.hostname = localhost remote.artery.hostname = localhost
@ -63,8 +63,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
// listen for compression table events // listen for compression table events
val aProbe = TestProbe() val aProbe = TestProbe()
val a1Probe = TestProbe() val a1Probe = TestProbe()
val aNew2Probe = TestProbe() val b1Probe = TestProbe()(systemB)
val b1Probe = TestProbe()
system.eventStream.subscribe(aProbe.ref, classOf[CompressionProtocol.Events.Event]) system.eventStream.subscribe(aProbe.ref, classOf[CompressionProtocol.Events.Event])
systemB.eventStream.subscribe(b1Probe.ref, classOf[CompressionProtocol.Events.Event]) systemB.eventStream.subscribe(b1Probe.ref, classOf[CompressionProtocol.Events.Event])
@ -91,16 +90,20 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
Thread.sleep(5000) Thread.sleep(5000)
log.warning("SYSTEM READY {}...", systemB) log.warning("SYSTEM READY {}...", systemB)
val aNewProbe = TestProbe()
system.eventStream.subscribe(aNewProbe.ref, classOf[CompressionProtocol.Events.Event])
systemB.actorOf(TestActors.blackholeProps, "void") // start it again systemB.actorOf(TestActors.blackholeProps, "void") // start it again
(1 to messagesToExchange).foreach { i voidSel ! "hello" } // does not reply, but a hot receiver should be advertised (1 to messagesToExchange).foreach { i voidSel ! "hello" } // does not reply, but a hot receiver should be advertised
// compression triggered again // compression triggered again
val a2 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) val a2 = aNewProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds)
info("System [A] received: " + a2) info("System [A] received: " + a2)
assertCompression[ActorRef](a2.table, 1, _.toString should include(testActor.path.name)) assertCompression[ActorRef](a2.table, 1, _.toString should include(testActor.path.name))
val aNew2Probe = TestProbe()
(1 to messagesToExchange).foreach { i voidSel.tell("hello", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised (1 to messagesToExchange).foreach { i voidSel.tell("hello", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised
// compression triggered again // compression triggered again
val a3 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) val a3 = aNewProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds)
info("Received second compression: " + a3) info("Received second compression: " + a3)
assertCompression[ActorRef](a3.table, 2, _.toString should include(aNew2Probe.ref.path.name)) assertCompression[ActorRef](a3.table, 2, _.toString should include(aNew2Probe.ref.path.name))
} }

View file

@ -79,6 +79,22 @@ object ActorMaterializer {
def apply(materializerSettings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = def apply(materializerSettings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer =
apply(Some(materializerSettings), None) apply(Some(materializerSettings), None)
/**
 * INTERNAL API: Builds an [[ActorMaterializer]] whose `StreamSupervisor` is started
 * through `systemActorOf`, i.e. as a system actor.
 *
 * @param materializerSettings settings for the materializer and its supervisor
 * @param namePrefix           prefix used for the names of materialized flows
 * @param system               the actor system in which the supervisor is created
 */
private[akka] def systemMaterializer(materializerSettings: ActorMaterializerSettings, namePrefix: String,
                                     system: ExtendedActorSystem): ActorMaterializer = {
  // The same flag instance is given to both the supervisor and the materializer.
  val shutdownFlag = new AtomicBoolean(false)

  val supervisorProps = StreamSupervisor
    .props(materializerSettings, shutdownFlag)
    .withDispatcher(materializerSettings.dispatcher)
  val supervisor = system.systemActorOf(supervisorProps, StreamSupervisor.nextName())

  val flowNames = FlowNames(system).name.copy(namePrefix)

  new ActorMaterializerImpl(system, materializerSettings, system.dispatchers, supervisor, shutdownFlag, flowNames)
}
/** /**
* Java API: Creates a ActorMaterializer which will execute every step of a transformation * Java API: Creates a ActorMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]