diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index 079de2f5f1..aaa899cac7 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -59,6 +59,10 @@ abstract class MultiNodeConfig { log-received-messages = on log-sent-messages = on } + akka.remote.artery { + log-received-messages = on + log-sent-messages = on + } akka.actor.debug { receive = on fsm = on diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 2e17a80c19..7a0407471c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -426,6 +426,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private lazy val shutdownHook = new Thread { override def run(): Unit = { if (hasBeenShutdown.compareAndSet(false, true)) { + log.debug("Shutting down [{}] via shutdownHook", localAddress) Await.result(internalShutdown(), 20.seconds) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 106d7ddc9d..8226b6c1f1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -191,7 +191,9 @@ private[remote] class Association( materializing.await(10, TimeUnit.SECONDS) _outboundControlIngress match { case OptionVal.Some(o) ⇒ o - case OptionVal.None ⇒ throw new IllegalStateException("outboundControlIngress not initialized yet") + case OptionVal.None ⇒ + if (transport.isShutdown) throw ShuttingDown + else throw new IllegalStateException("outboundControlIngress not initialized yet") } } } @@ -265,13 +267,14 @@ private[remote] class Association( // OutboundContext override def sendControl(message: ControlMessage): Unit = { try { - if (!transport.isShutdown) + if (!transport.isShutdown) { if (associationState.isQuarantined()) { log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message), remoteAddress) startIdleTimer() } - outboundControlIngress.sendControlMessage(message) + outboundControlIngress.sendControlMessage(message) + } } catch { case ShuttingDown ⇒ // silence it } @@ -624,11 +627,16 @@ private[remote] class Association( implicit val ec = materializer.executionContext updateStreamCompletion(streamName, (streamKillSwitch, streamCompleted.recover { case _ ⇒ Done })) streamCompleted.onFailure { - case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected - case _: AeronTerminated ⇒ // shutdown already in progress + case ArteryTransport.ShutdownSignal ⇒ + // shutdown as expected + // countDown the latch in case threads are waiting on the latch in outboundControlIngress method + materializing.countDown() + case _: AeronTerminated ⇒ // shutdown already in progress case cause if transport.isShutdown ⇒ // don't restart after shutdown, but log some details so we notice log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage) + // countDown the latch in case threads are waiting on the latch in outboundControlIngress method + materializing.countDown() case _: AbruptTerminationException ⇒ // ActorSystem shutdown case OutboundStreamStopSignal ⇒ // stop as expected due to quarantine