From 9fd359042a2264e6645a7d3b3ab014fb164282a2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 6 Sep 2016 16:57:30 +0200 Subject: [PATCH] add missing killSwitch for parallel outbound lanes, #21381 * it caused the shutdown to stall, since the part after MergeHub was never stopped * tear down parts upstream and downstream of the hub toghether --- .../akka/remote/artery/ArteryTransport.scala | 8 ++++++++ .../scala/akka/remote/artery/Association.scala | 16 +++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index ba5fd1f4b1..a4d02673aa 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -607,7 +607,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R completed } else { + val hubKillSwitch = KillSwitches.shared("hubKillSwitch") val source = aeronSource(ordinaryStreamId, envelopeBufferPool) + .via(hubKillSwitch.flow) .via(inboundFlow(compression)) .map(env ⇒ (env.recipient, env)) @@ -642,6 +644,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R import system.dispatcher val completed = Future.sequence(completedValues).map(_ ⇒ Done) + // tear down the upstream hub part if downstream lane fails + // lanes are not completed with success by themselves so we don't have to care about onSuccess + completed.onFailure { + case reason: Throwable ⇒ hubKillSwitch.abort(reason) + } + completed } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index cd361840d0..92608b00e4 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -45,6 +45,9 @@ import akka.remote.artery.compress.CompressionProtocol._ import akka.stream.scaladsl.MergeHub import akka.actor.DeadLetter import java.util.concurrent.atomic.AtomicBoolean +import akka.stream.KillSwitches +import scala.util.Failure +import scala.util.Success /** * INTERNAL API @@ -465,7 +468,10 @@ private[remote] class Association( wrapper }.toVector + val laneKillSwitch = KillSwitches.shared("outboundLaneKillSwitch") + val lane = Source.fromGraph(new SendQueue[OutboundEnvelope]) + .via(laneKillSwitch.flow) .viaMat(transport.outboundTestFlow(this))(Keep.both) .viaMat(transport.outboundLane(this))(Keep.both) .watchTermination()(Keep.both) @@ -473,7 +479,9 @@ private[remote] class Association( case (((q, m), c), w) ⇒ ((q, m), (c, w)) } - val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer].toMat(transport.aeronSink(this))(Keep.both).run()(materializer) + val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer] + .via(laneKillSwitch.flow) + .toMat(transport.aeronSink(this))(Keep.both).run()(materializer) val values: Vector[((SendQueue.QueueValue[OutboundEnvelope], TestManagementApi), (Encoder.ChangeOutboundCompression, Future[Done]))] = (0 until outboundLanes).map { _ ⇒ @@ -490,6 +498,12 @@ private[remote] class Association( import transport.system.dispatcher val completed = Future.sequence(laneCompletedValues).flatMap(_ ⇒ aeronSinkCompleted) + // tear down all parts if one part fails or completes + completed.onFailure { + case reason: Throwable ⇒ laneKillSwitch.abort(reason) + } + (laneCompletedValues :+ aeronSinkCompleted).foreach(_.onSuccess { case _ ⇒ laneKillSwitch.shutdown() }) + queueValues.zip(wrappers).zipWithIndex.foreach { case ((q, w), i) ⇒ q.inject(w.queue)