diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRandomSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRandomSpec.scala index 0b9331b8e4..318b71a4ad 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRandomSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRandomSpec.scala @@ -99,9 +99,6 @@ class RemoteRandomSpec(multiNodeConfig: RemoteRandomConfig) extends MultiNodeSpe // "Terminate" to a shut down node system.stop(actor) enterBarrier("done") - - // FIXME this test has problems shutting down actor system when running with Artery - // [akka.actor.ActorSystemImpl(RemoteRandomSpec)] Failed to stop [RemoteRandomSpec] within [5 seconds] } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 10417fdc2f..4e1c158f69 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -69,6 +69,8 @@ import io.aeron.driver.ThreadingMode import org.agrona.concurrent.BackoffIdleStrategy import org.agrona.concurrent.BusySpinIdleStrategy import scala.util.control.NonFatal +import akka.actor.Props +import akka.actor.Actor /** * INTERNAL API @@ -313,6 +315,50 @@ private[akka] trait OutboundContext { def dummyRecipient: RemoteActorRef } +/** + * INTERNAL API + */ +private[remote] object FlushOnShutdown { + def props(done: Promise[Done], timeout: FiniteDuration, + inboundContext: InboundContext, associations: Set[Association]): Props = { + require(associations.nonEmpty) + Props(new FlushOnShutdown(done, timeout, inboundContext, associations)) + } + + case object Timeout +} + +/** + * INTERNAL API + */ +private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDuration, + inboundContext: InboundContext, associations: Set[Association]) extends Actor { + + var remaining = associations.flatMap(_.associationState.uniqueRemoteAddressValue) + + val timeoutTask = context.system.scheduler.scheduleOnce(timeout, self, FlushOnShutdown.Timeout)(context.dispatcher) + + override def preStart(): Unit = { + val msg = ActorSystemTerminating(inboundContext.localAddress) + associations.foreach { a ⇒ a.send(msg, OptionVal.Some(self), a.dummyRecipient) } + } + + override def postStop(): Unit = + timeoutTask.cancel() + + def receive = { + case ActorSystemTerminatingAck(from) ⇒ + remaining -= from + if (remaining.isEmpty) { + done.trySuccess(Done) + context.stop(self) + } + case FlushOnShutdown.Timeout ⇒ + done.trySuccess(Done) + context.stop(self) + } +} + /** * INTERNAL API */ @@ -352,6 +398,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R "handshake-timeout must be > 0") private val injectHandshakeInterval: FiniteDuration = 1.second private val giveUpSendAfter: FiniteDuration = 60.seconds + private val shutdownFlushTimeout = 1.second + + private val remoteDispatcher = system.dispatchers.lookup(remoteSettings.Dispatcher) private val largeMessageDestinations = system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardTree[NotUsed]()) { (tree, entry) ⇒ @@ -380,8 +429,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R val (afrFileChannel, afrFlie, flightRecorder) = initializeFlightRecorder() + def createFlightRecorderEventSink(): EventSink = { + // FIXME there is some concurrency issue with the FlightRecorder, when shutting down. + // It crashes the JVM. + // flightRecorder.createEventSink() + IgnoreEventSink + } + // !!! WARNING !!! This is *NOT* thread safe, - private val topLevelFREvents = flightRecorder.createEventSink() + private val topLevelFREvents = createFlightRecorderEventSink() private val associationRegistry = new AssociationRegistry( remoteAddress ⇒ new Association(this, materializer, remoteAddress, controlSubject, largeMessageDestinations)) @@ -413,7 +469,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R val materializerSettings = ActorMaterializerSettings( remoteSettings.config.getConfig("akka.remote.artery.advanced.materializer")) - materializer = ActorMaterializer(materializerSettings)(system) + materializer = ActorMaterializer.systemMaterializer(materializerSettings, "remote", system) messageDispatcher = new MessageDispatcher(system, provider) topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData) @@ -563,23 +619,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R controlSubject = ctrl - // ordinary messages stream controlSubject.attach(new ControlMessageObserver { override def notify(inboundEnvelope: InboundEnvelope): Unit = { - inboundEnvelope.message match { - case Quarantined(from, to) if to == localAddress ⇒ - val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address) - publishLifecycleEvent(lifecycleEvent) - // quarantine the other system from here - association(from.address).quarantine(lifecycleEvent.toString, Some(from.uid)) - case _ ⇒ // not interesting - } - } - }) - - // compression messages - controlSubject.attach(new ControlMessageObserver { - override def notify(inboundEnvelope: InboundEnvelope): Unit = inboundEnvelope.message match { case m: CompressionMessage ⇒ m match { @@ -593,8 +634,22 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R association(from.address).compression.applyClassManifestCompressionTable(table) system.eventStream.publish(CompressionProtocol.Events.ReceivedCompressionTable(from, table)) } - case _ ⇒ // not interested in non CompressionMessages + + case Quarantined(from, to) if to == localAddress ⇒ + val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address) + publishLifecycleEvent(lifecycleEvent) + // quarantine the other system from here + association(from.address).quarantine(lifecycleEvent.toString, Some(from.uid)) + + case _: ActorSystemTerminating ⇒ + inboundEnvelope.sender match { + case OptionVal.Some(snd) ⇒ snd.tell(ActorSystemTerminatingAck(localAddress), ActorRef.noSender) + case OptionVal.None ⇒ log.error("Expected sender for ActorSystemTerminating message") + } + + case _ ⇒ // not interesting } + } }) attachStreamRestart("Inbound control stream", completed, () ⇒ runInboundControlStream(compression)) @@ -661,28 +716,42 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def shutdown(): Future[Done] = { _shutdown = true - killSwitch.shutdown() - topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData) - if (taskRunner != null) { - taskRunner.stop() - topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) + val allAssociations = associationRegistry.allAssociations + val flushing: Future[Done] = + if (allAssociations.isEmpty) Future.successful(Done) + else { + val flushingPromise = Promise[Done]() + system.systemActorOf(FlushOnShutdown.props(flushingPromise, shutdownFlushTimeout, + this, allAssociations).withDispatcher(remoteSettings.Dispatcher), "remoteFlushOnShutdown") + flushingPromise.future + } + implicit val ec = remoteDispatcher + flushing.recover { case _ ⇒ Done }.map { _ ⇒ + killSwitch.shutdown() + + topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData) + if (taskRunner != null) { + taskRunner.stop() + topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) + } + if (aeronErrorLogTask != null) { + aeronErrorLogTask.cancel() + topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) + } + if (aeron != null) aeron.close() + if (mediaDriver.isDefined) { + stopMediaDriver() + topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData) + } + topLevelFREvents.loFreq(Transport_FlightRecorderClose, NoMetaData) + + flightRecorder.close() + afrFileChannel.force(true) + afrFileChannel.close() + // TODO: Be smarter about this in tests and make it always-on-for prod + afrFlie.delete() + Done } - if (aeronErrorLogTask != null) { - aeronErrorLogTask.cancel() - topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) - } - if (aeron != null) aeron.close() - if (mediaDriver.isDefined) { - stopMediaDriver() - topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData) - } - topLevelFREvents.loFreq(Transport_FlightRecorderClose, NoMetaData) - flightRecorder.close() - afrFileChannel.force(true) - afrFileChannel.close() - // TODO: Be smarter about this in tests and make it always-on-for prod - afrFlie.delete() - Future.successful(Done) } private[remote] def isShutdown: Boolean = _shutdown @@ -742,7 +811,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) .via(encoder(compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, - envelopePool, giveUpSendAfter, flightRecorder.createEventSink()))(Keep.right) + envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right) } def outboundLarge(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, Future[Done]] = { @@ -750,7 +819,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .via(new OutboundHandshake(system, outboundContext, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) .via(createEncoder(largeEnvelopePool, compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, - envelopePool, giveUpSendAfter, flightRecorder.createEventSink()))(Keep.right) + envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right) } def outboundControl(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[Send, (OutboundControlIngress, Future[Done])] = { @@ -761,7 +830,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .via(encoder(compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, - envelopePool, Duration.Inf, flightRecorder.createEventSink()))(Keep.both) + envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both) // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } @@ -780,7 +849,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, NotUsed] = Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, - flightRecorder.createEventSink())) + createFlightRecorderEventSink())) val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ messageDispatcher.dispatch(m.recipient.get, m.recipientAddress, m.message, m.sender) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 523da7ff8f..af5660fd21 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -171,7 +171,7 @@ private[remote] class Association( if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { - case _: SystemMessage | ClearSystemMessageDelivery ⇒ + case _: SystemMessage | ClearSystemMessageDelivery | _: ControlMessage ⇒ val send = Send(message, sender, recipient, None) if (!controlQueue.offer(send)) { quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index c88d63a43a..f22a7a5d3c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -37,6 +37,16 @@ private[akka] trait ControlMessage */ private[akka] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage // FIXME serialization +/** + * INTERNAL API + */ +private[akka] case class ActorSystemTerminating(from: UniqueAddress) extends ControlMessage // FIXME serialization + +/** + * INTERNAL API + */ +private[akka] case class ActorSystemTerminatingAck(from: UniqueAddress) // FIXME serialization + /** * INTERNAL API */ diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 12bb70cec0..320c00603b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -169,6 +169,14 @@ private[akka] class SystemMessageDelivery( case s @ Send(ClearSystemMessageDelivery, _, _, _) ⇒ clear() pull(in) + case s @ Send(msg: ControlMessage, _, _, _) ⇒ + // e.g. ActorSystemTerminating, no need for acked delivery + if (resending.isEmpty && isAvailable(out)) + push(out, s) + else { + resending.offer(s) + tryResend() + } case s @ Send(msg: AnyRef, _, _, _) ⇒ if (unacknowledged.size < maxBufferSize) { seqNo += 1 diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 5df50600d1..2ebedb00c3 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -35,6 +35,7 @@ import akka.util.OptionVal object SystemMessageDeliverySpec { val config = ConfigFactory.parseString(s""" + akka.loglevel=INFO akka { actor.provider = remote remote.artery.enabled = on @@ -108,7 +109,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi "System messages" must { "be delivered with real actors" in { - val actorOnSystemB = systemB.actorOf(TestActors.echoActorProps, "echo") + systemB.actorOf(TestActors.echoActorProps, "echo") val remoteRef = { system.actorSelection(rootB / "user" / "echo") ! Identify(None) @@ -120,6 +121,30 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi expectTerminated(remoteRef) } + "be flushed on shutdown" in { + val systemC = ActorSystem("systemC", system.settings.config) + try { + systemC.actorOf(TestActors.echoActorProps, "echo") + + val addressC = RARP(systemC).provider.getDefaultAddress + val rootC = RootActorPath(addressC) + + val remoteRef = { + system.actorSelection(rootC / "user" / "echo") ! Identify(None) + expectMsgType[ActorIdentity].ref.get + } + + watch(remoteRef) + remoteRef ! "hello" + expectMsg("hello") + systemC.terminate() + // DeathWatchNotification is sent from systemC, failure detection takes longer than 3 seconds + expectTerminated(remoteRef, 5.seconds) + } finally { + shutdown(systemC) + } + } + "be resent when some in the middle are lost" in { val replyProbe = TestProbe() val controlSubject = new TestControlMessageSubject diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala index 433c2c5acc..673f70cbcc 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/HandshakeShouldDropCompressionTableSpec.scala @@ -22,7 +22,7 @@ object HandshakeShouldDropCompressionTableSpec { val commonConfig = ConfigFactory.parseString(s""" akka { loglevel = INFO - + actor.provider = "akka.remote.RemoteActorRefProvider" remote.artery.enabled = on remote.artery.hostname = localhost @@ -63,8 +63,7 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr // listen for compression table events val aProbe = TestProbe() val a1Probe = TestProbe() - val aNew2Probe = TestProbe() - val b1Probe = TestProbe() + val b1Probe = TestProbe()(systemB) system.eventStream.subscribe(aProbe.ref, classOf[CompressionProtocol.Events.Event]) systemB.eventStream.subscribe(b1Probe.ref, classOf[CompressionProtocol.Events.Event]) @@ -91,16 +90,20 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr Thread.sleep(5000) log.warning("SYSTEM READY {}...", systemB) + val aNewProbe = TestProbe() + system.eventStream.subscribe(aNewProbe.ref, classOf[CompressionProtocol.Events.Event]) + systemB.actorOf(TestActors.blackholeProps, "void") // start it again (1 to messagesToExchange).foreach { i ⇒ voidSel ! "hello" } // does not reply, but a hot receiver should be advertised // compression triggered again - val a2 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) + val a2 = aNewProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) info("System [A] received: " + a2) assertCompression[ActorRef](a2.table, 1, _.toString should include(testActor.path.name)) + val aNew2Probe = TestProbe() (1 to messagesToExchange).foreach { i ⇒ voidSel.tell("hello", aNew2Probe.ref) } // does not reply, but a hot receiver should be advertised // compression triggered again - val a3 = aProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) + val a3 = aNewProbe.expectMsgType[Events.ReceivedCompressionTable[ActorRef]](10.seconds) info("Received second compression: " + a3) assertCompression[ActorRef](a3.table, 2, _.toString should include(aNew2Probe.ref.path.name)) } diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 1e7ede7b60..0c2d7cebaa 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -79,6 +79,22 @@ object ActorMaterializer { def apply(materializerSettings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = apply(Some(materializerSettings), None) + /** + * INTERNAL API: Creates the `StreamSupervisor` as a system actor. + */ + private[akka] def systemMaterializer(materializerSettings: ActorMaterializerSettings, namePrefix: String, + system: ExtendedActorSystem): ActorMaterializer = { + val haveShutDown = new AtomicBoolean(false) + new ActorMaterializerImpl( + system, + materializerSettings, + system.dispatchers, + system.systemActorOf(StreamSupervisor.props(materializerSettings, haveShutDown) + .withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()), + haveShutDown, + FlowNames(system).name.copy(namePrefix)) + } + /** * Java API: Creates a ActorMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]