Merge pull request #21383 from akka/wip-21381-killSwitch-patriknw

add missing killSwitch for parallel outbound lanes, #21381
This commit is contained in:
Patrik Nordwall 2016-09-07 11:15:01 +02:00 committed by GitHub
commit f1e4e7a657
2 changed files with 23 additions and 1 deletions

View file

@ -610,7 +610,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
completed
} else {
val hubKillSwitch = KillSwitches.shared("hubKillSwitch")
val source = aeronSource(ordinaryStreamId, envelopeBufferPool)
.via(hubKillSwitch.flow)
.via(inboundFlow(compression))
.map(env (env.recipient, env))
@ -645,6 +647,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
import system.dispatcher
val completed = Future.sequence(completedValues).map(_ Done)
// tear down the upstream hub part if downstream lane fails
// lanes are not completed with success by themselves so we don't have to care about onSuccess
completed.onFailure {
case reason: Throwable hubKillSwitch.abort(reason)
}
completed
}

View file

@ -45,6 +45,9 @@ import akka.remote.artery.compress.CompressionProtocol._
import akka.stream.scaladsl.MergeHub
import akka.actor.DeadLetter
import java.util.concurrent.atomic.AtomicBoolean
import akka.stream.KillSwitches
import scala.util.Failure
import scala.util.Success
/**
* INTERNAL API
@ -465,7 +468,10 @@ private[remote] class Association(
wrapper
}.toVector
val laneKillSwitch = KillSwitches.shared("outboundLaneKillSwitch")
val lane = Source.fromGraph(new SendQueue[OutboundEnvelope])
.via(laneKillSwitch.flow)
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.viaMat(transport.outboundLane(this))(Keep.both)
.watchTermination()(Keep.both)
@ -473,7 +479,9 @@ private[remote] class Association(
case (((q, m), c), w) ((q, m), (c, w))
}
val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer].toMat(transport.aeronSink(this))(Keep.both).run()(materializer)
val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer]
.via(laneKillSwitch.flow)
.toMat(transport.aeronSink(this))(Keep.both).run()(materializer)
val values: Vector[((SendQueue.QueueValue[OutboundEnvelope], TestManagementApi), (Encoder.ChangeOutboundCompression, Future[Done]))] =
(0 until outboundLanes).map { _
@ -490,6 +498,12 @@ private[remote] class Association(
import transport.system.dispatcher
val completed = Future.sequence(laneCompletedValues).flatMap(_ aeronSinkCompleted)
// tear down all parts if one part fails or completes
completed.onFailure {
case reason: Throwable laneKillSwitch.abort(reason)
}
(laneCompletedValues :+ aeronSinkCompleted).foreach(_.onSuccess { case _ laneKillSwitch.shutdown() })
queueValues.zip(wrappers).zipWithIndex.foreach {
case ((q, w), i)
q.inject(w.queue)