diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index bc8d27df53..fb9957a401 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -610,7 +610,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R completed } else { + val hubKillSwitch = KillSwitches.shared("hubKillSwitch") val source = aeronSource(ordinaryStreamId, envelopeBufferPool) + .via(hubKillSwitch.flow) .via(inboundFlow(compression)) .map(env ⇒ (env.recipient, env)) @@ -645,6 +647,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R import system.dispatcher val completed = Future.sequence(completedValues).map(_ ⇒ Done) + // tear down the upstream hub part if downstream lane fails + // lanes are not completed with success by themselves so we don't have to care about onSuccess + completed.onFailure { + case reason: Throwable ⇒ hubKillSwitch.abort(reason) + } + completed } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 0456ceb62b..376a0e5dce 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -45,6 +45,9 @@ import akka.remote.artery.compress.CompressionProtocol._ import akka.stream.scaladsl.MergeHub import akka.actor.DeadLetter import java.util.concurrent.atomic.AtomicBoolean +import akka.stream.KillSwitches +import scala.util.Failure +import scala.util.Success /** * INTERNAL API @@ -465,7 +468,10 @@ private[remote] class Association( wrapper }.toVector + val laneKillSwitch = KillSwitches.shared("outboundLaneKillSwitch") + val lane = Source.fromGraph(new SendQueue[OutboundEnvelope]) + .via(laneKillSwitch.flow) .viaMat(transport.outboundTestFlow(this))(Keep.both) .viaMat(transport.outboundLane(this))(Keep.both) .watchTermination()(Keep.both) @@ -473,7 +479,9 @@ private[remote] class Association( case (((q, m), c), w) ⇒ ((q, m), (c, w)) } - val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer].toMat(transport.aeronSink(this))(Keep.both).run()(materializer) + val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer] + .via(laneKillSwitch.flow) + .toMat(transport.aeronSink(this))(Keep.both).run()(materializer) val values: Vector[((SendQueue.QueueValue[OutboundEnvelope], TestManagementApi), (Encoder.ChangeOutboundCompression, Future[Done]))] = (0 until outboundLanes).map { _ ⇒ @@ -490,6 +498,12 @@ private[remote] class Association( import transport.system.dispatcher val completed = Future.sequence(laneCompletedValues).flatMap(_ ⇒ aeronSinkCompleted) + // tear down all parts if one part fails or completes + completed.onFailure { + case reason: Throwable ⇒ laneKillSwitch.abort(reason) + } + (laneCompletedValues :+ aeronSinkCompleted).foreach(_.onSuccess { case _ ⇒ laneKillSwitch.shutdown() }) + queueValues.zip(wrappers).zipWithIndex.foreach { case ((q, w), i) ⇒ q.inject(w.queue)