Merge pull request #19895 from drewhk/wip-19892-killswitch-drewhk

#19892: KillSwitch (externally controllable stream completion)
This commit is contained in:
drewhk 2016-03-31 12:10:58 +02:00
commit cc5adc632c
6 changed files with 713 additions and 110 deletions

View file

@ -3,7 +3,7 @@
*/ */
package akka.http.impl.engine.ws package akka.http.impl.engine.ws
import scala.concurrent.Await import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationInt
import org.scalactic.ConversionCheckedTripleEquals import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
@ -18,7 +18,10 @@ import akka.stream.testkit._
import akka.stream.scaladsl.GraphDSL.Implicits._ import akka.stream.scaladsl.GraphDSL.Implicits._
import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.Done
import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
import akka.util.ByteString import akka.util.ByteString
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.{ AkkaSpec, EventFilter } import akka.testkit.{ AkkaSpec, EventFilter }
@ -69,12 +72,35 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.
val binding = Await.result(bindingFuture, 3.seconds) val binding = Await.result(bindingFuture, 3.seconds)
val myPort = binding.localAddress.getPort val myPort = binding.localAddress.getPort
val completeOnlySwitch: Flow[ByteString, ByteString, Promise[Done]] = Flow.fromGraph(
new GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Promise[Done]] {
override val shape: FlowShape[ByteString, ByteString] =
FlowShape(Inlet("completeOnlySwitch.in"), Outlet("completeOnlySwitch.out"))
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Promise[Done]) = {
val promise = Promise[Done]
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(shape.out, grab(shape.in))
override def onPull(): Unit = pull(shape.in)
override def preStart(): Unit = {
promise.future.foreach(_ getAsyncCallback[Done](_ complete(shape.out)).invoke(Done))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
}
setHandlers(shape.in, shape.out, this)
}
(logic, promise)
}
})
val ((response, breaker), sink) = val ((response, breaker), sink) =
Source.empty Source.empty
.viaMat { .viaMat {
Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort)) Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort))
.atop(TLSPlacebo()) .atop(TLSPlacebo())
.joinMat(Flow.fromGraph(GraphStages.breaker[ByteString]).via( .joinMat(completeOnlySwitch.via(
Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)))(Keep.both) Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)))(Keep.both)
}(Keep.right) }(Keep.right)
.toMat(TestSink.probe[Message])(Keep.both) .toMat(TestSink.probe[Message])(Keep.both)
@ -85,7 +111,7 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.
.request(10) .request(10)
.expectNoMsg(1500.millis) .expectNoMsg(1500.millis)
breaker.value.get.get.complete() breaker.trySuccess(Done)
source source
.sendNext(TextMessage("hello")) .sendNext(TextMessage("hello"))
@ -149,20 +175,20 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.
val myPort = binding.localAddress.getPort val myPort = binding.localAddress.getPort
@volatile var messages = 0 @volatile var messages = 0
val (breaker, completion) = val (switch, completion) =
Source.maybe Source.maybe
.viaMat { .viaMat {
Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort)) Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort))
.atop(TLSPlacebo()) .atop(TLSPlacebo())
// the resource leak of #19398 existed only for severed websocket connections // the resource leak of #19398 existed only for severed websocket connections
.atopMat(GraphStages.bidiBreaker[ByteString, ByteString])(Keep.right) .atopMat(KillSwitches.singleBidi[ByteString, ByteString])(Keep.right)
.join(Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)) .join(Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true))
}(Keep.right) }(Keep.right)
.toMat(Sink.foreach(_ messages += 1))(Keep.both) .toMat(Sink.foreach(_ messages += 1))(Keep.both)
.run() .run()
eventually(messages should ===(N)) eventually(messages should ===(N))
// breaker should have been fulfilled long ago // breaker should have been fulfilled long ago
breaker.value.get.get.completeAndCancel() switch.shutdown()
completion.futureValue completion.futureValue
binding.unbind() binding.unbind()

View file

@ -0,0 +1,102 @@
package akka.stream.javadsl;
import akka.Done;
import akka.stream.*;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.Utils;
import akka.testkit.AkkaSpec;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class KillSwitchTest extends StreamTest {
public KillSwitchTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest",
AkkaSpec.testConf());
@Test
public void beAbleToUseKillSwitch() throws Exception {
final TestPublisher.Probe<Integer> upstream = TestPublisher.probe(0, system);
final TestSubscriber.Probe<Integer> downstream = TestSubscriber.probe(system);
final SharedKillSwitch killSwitch = KillSwitches.shared("testSwitch");
final SharedKillSwitch k =
Source.fromPublisher(upstream)
.viaMat(killSwitch.flow(), Keep.right())
.to(Sink.fromSubscriber(downstream)).run(materializer);
final CompletionStage<Done> completionStage =
Source.single(1)
.via(killSwitch.flow())
.runWith(Sink.ignore(), materializer);
downstream.request(1);
upstream.sendNext(1);
downstream.expectNext(1);
assertEquals(killSwitch, k);
killSwitch.shutdown();
upstream.expectCancellation();
downstream.expectComplete();
assertEquals(completionStage.toCompletableFuture().get(3, TimeUnit.SECONDS), Done.getInstance());
}
@Test
public void beAbleToUseKillSwitchAbort() throws Exception {
final TestPublisher.Probe<Integer> upstream = TestPublisher.probe(0, system);
final TestSubscriber.Probe<Integer> downstream = TestSubscriber.probe(system);
final SharedKillSwitch killSwitch = KillSwitches.shared("testSwitch");
Source.fromPublisher(upstream)
.viaMat(killSwitch.flow(), Keep.right())
.runWith(Sink.fromSubscriber(downstream), materializer);
downstream.request(1);
upstream.sendNext(1);
downstream.expectNext(1);
final Exception te = new Utils.TE("Testy");
killSwitch.abort(te);
upstream.expectCancellation();
final Throwable te2 = downstream.expectError();
assertEquals(te, te2);
}
@Test
public void beAbleToUseSingleKillSwitch() throws Exception {
final TestPublisher.Probe<Integer> upstream = TestPublisher.probe(0, system);
final TestSubscriber.Probe<Integer> downstream = TestSubscriber.probe(system);
final Graph<FlowShape<Integer, Integer>, UniqueKillSwitch> killSwitchFlow = KillSwitches.single();
final UniqueKillSwitch killSwitch =
Source.fromPublisher(upstream)
.viaMat(killSwitchFlow, Keep.right())
.to(Sink.fromSubscriber(downstream)).run(materializer);
downstream.request(1);
upstream.sendNext(1);
downstream.expectNext(1);
killSwitch.shutdown();
upstream.expectCancellation();
downstream.expectComplete();
}
}

View file

@ -0,0 +1,307 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.Done
import akka.stream.{ ActorMaterializer, ClosedShape, KillSwitch, KillSwitches }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped }
import akka.testkit.{ AkkaSpec, EventFilter }
import scala.concurrent.duration._
class FlowKillSwitchSpec extends AkkaSpec {
implicit val mat = ActorMaterializer()
"A UniqueKillSwitch" must {
"stop a stream if requested" in {
val ((upstream, switch), downstream) =
TestSource.probe[Int].viaMat(KillSwitches.single)(Keep.both).toMat(TestSink.probe)(Keep.both).run()
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
switch.shutdown()
upstream.expectCancellation()
downstream.expectComplete()
}
"fail a stream if requested" in {
val ((upstream, switch), downstream) =
TestSource.probe[Int].viaMat(KillSwitches.single)(Keep.both).toMat(TestSink.probe)(Keep.both).run()
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
switch.abort(TE("Abort"))
upstream.expectCancellation()
downstream.expectError(TE("Abort"))
}
"work if used multiple times in a flow" in {
val (((upstream, switch1), switch2), downstream) =
TestSource.probe[Int]
.viaMat(KillSwitches.single)(Keep.both)
.recover { case TE(_) -1 }
.viaMat(KillSwitches.single)(Keep.both)
.toMat(TestSink.probe)(Keep.both).run()
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
switch1.abort(TE("Abort"))
upstream.expectCancellation()
downstream.requestNext(-1)
switch2.shutdown()
downstream.expectComplete()
}
"ignore completion after already completed" in {
val ((upstream, switch), downstream) =
TestSource.probe[Int].viaMat(KillSwitches.single)(Keep.both).toMat(TestSink.probe)(Keep.both).run()
upstream.ensureSubscription()
downstream.ensureSubscription()
switch.shutdown()
upstream.expectCancellation()
downstream.expectComplete()
switch.abort(TE("Won't happen"))
upstream.expectNoMsg(100.millis)
downstream.expectNoMsg(100.millis)
}
}
"A SharedKillSwitches" must {
"stop a stream if requested" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
switch.shutdown()
upstream.expectCancellation()
downstream.expectComplete()
}
"fail a stream if requested" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
switch.abort(TE("Abort"))
upstream.expectCancellation()
downstream.expectError(TE("Abort"))
}
"pass through all elements unmodified" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
Source(1 to 100).via(switch.flow).runWith(Sink.seq).futureValue should ===(1 to 100)
}
"provide a flow that if materialized multiple times (with multiple types) stops all streams if requested" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
val (upstream1, downstream1) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
val (upstream2, downstream2) = TestSource.probe[String].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
downstream1.request(1)
upstream1.sendNext(1)
downstream1.expectNext(1)
downstream2.request(2)
upstream2.sendNext("A").sendNext("B")
downstream2.expectNext("A", "B")
switch.shutdown()
upstream1.expectCancellation()
upstream2.expectCancellation()
downstream1.expectComplete()
downstream2.expectComplete()
}
"provide a flow that if materialized multiple times (with multiple types) fails all streams if requested" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
val (upstream1, downstream1) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
val (upstream2, downstream2) = TestSource.probe[String].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
downstream1.request(1)
upstream1.sendNext(1)
downstream1.expectNext(1)
downstream2.request(2)
upstream2.sendNext("A").sendNext("B")
downstream2.expectNext("A", "B")
switch.abort(TE("Abort"))
upstream1.expectCancellation()
upstream2.expectCancellation()
downstream1.expectError(TE("Abort"))
downstream2.expectError(TE("Abort"))
}
"ignore subsequent aborts and shutdowns after shutdown" in {
val switch = KillSwitches.shared("switch")
val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
switch.shutdown()
upstream.expectCancellation()
downstream.expectComplete()
switch.shutdown()
upstream.expectNoMsg(100.millis)
downstream.expectNoMsg(100.millis)
switch.abort(TE("Abort"))
upstream.expectNoMsg(100.millis)
downstream.expectNoMsg(100.millis)
}
"ignore subsequent aborts and shutdowns after abort" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
switch.abort(TE("Abort"))
upstream.expectCancellation()
downstream.expectError(TE("Abort"))
switch.shutdown()
upstream.expectNoMsg(100.millis)
downstream.expectNoMsg(100.millis)
switch.abort(TE("Abort_Late"))
upstream.expectNoMsg(100.millis)
downstream.expectNoMsg(100.millis)
}
"complete immediately flows materialized after switch shutdown" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
switch.shutdown()
val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
upstream.expectCancellation()
downstream.expectSubscriptionAndComplete()
}
"fail immediately flows materialized after switch failure" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
switch.abort(TE("Abort"))
val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run()
upstream.expectCancellation()
downstream.expectSubscriptionAndError(TE("Abort"))
}
"should not cause problems if switch is shutdown after Flow completed normally" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
Source(1 to 10).via(switch.flow).runWith(Sink.seq).futureValue should ===(1 to 10)
switch.shutdown()
}
"provide flows that materialize to its owner KillSwitch" in assertAllStagesStopped {
val switch = KillSwitches.shared("switch")
val (switch2, completion) = Source.maybe[Int].viaMat(switch.flow)(Keep.right).toMat(Sink.ignore)(Keep.both).run()
(switch2 eq switch) should be(true)
switch2.shutdown()
completion.futureValue should ===(Done)
}
"not affect streams corresponding to another KillSwitch" in assertAllStagesStopped {
val switch1 = KillSwitches.shared("switch")
val switch2 = KillSwitches.shared("switch")
switch1 should !==(switch2)
val (upstream1, downstream1) = TestSource.probe[Int].via(switch1.flow).toMat(TestSink.probe)(Keep.both).run()
val (upstream2, downstream2) = TestSource.probe[Int].via(switch2.flow).toMat(TestSink.probe)(Keep.both).run()
downstream1.request(1)
upstream1.sendNext(1)
downstream1.expectNext(1)
downstream2.request(1)
upstream2.sendNext(2)
downstream2.expectNext(2)
switch1.shutdown()
upstream1.expectCancellation()
downstream1.expectComplete()
upstream2.expectNoMsg(100.millis)
downstream2.expectNoMsg(100.millis)
switch2.abort(TE("Abort"))
upstream1.expectNoMsg(100.millis)
downstream1.expectNoMsg(100.millis)
upstream2.expectCancellation()
downstream2.expectError(TE("Abort"))
}
"allow using multiple KillSwitch in one graph" in assertAllStagesStopped {
val switch1 = KillSwitches.shared("switch")
val switch2 = KillSwitches.shared("switch")
val downstream = RunnableGraph.fromGraph(GraphDSL.create(TestSink.probe[Int]) { implicit b
snk
import GraphDSL.Implicits._
val merge = b.add(Merge[Int](2))
Source.maybe[Int].via(switch1.flow) ~> merge ~> snk
Source.maybe[Int].via(switch2.flow) ~> merge
ClosedShape
}).run()
downstream.ensureSubscription()
downstream.expectNoMsg(100.millis)
switch1.shutdown()
downstream.expectNoMsg(100.millis)
switch2.shutdown()
downstream.expectComplete()
}
"use its name on the flows it hands out" in assertAllStagesStopped {
pending // toString does not work for me after rebase
val switch = KillSwitches.shared("mySwitchName")
switch.toString should ===("KillSwitch(mySwitchName)")
switch.flow.toString should ===("Flow(KillSwitchFlow(switch: mySwitchName))")
}
}
}

View file

@ -0,0 +1,259 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream
import akka.Done
import akka.stream.stage._
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try }
/**
* Creates shared or single kill switches which can be used to control completion of graphs from the outside.
* - The factory ``shared()`` returns a [[SharedKillSwitch]] which provides a [[Graph]] of [[FlowShape]] that can be
* used in arbitrary number of graphs and materializations. The switch simultaneously
* controls completion in all of those graphs.
* - The factory ``single()`` returns a [[Graph]] of [[FlowShape]] that materializes to a [[UniqueKillSwitch]]
* which is always unique
* to that materialized Flow itself.
*
* Creates a [[SharedKillSwitch]] that can be used to externally control the completion of various streams.
*
*/
object KillSwitches {
/**
* Creates a new [[SharedKillSwitch]] with the given name that can be used to control the completion of multiple
* streams from the outside simultaneously.
*
* @see SharedKillSwitch
*/
def shared(name: String): SharedKillSwitch = new SharedKillSwitch(name)
/**
* Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion
* of that unique materialization. Different materializations result in different, independent switches.
*
* For a Bidi version see [[KillSwitch#singleBidi]]
*/
def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch] =
UniqueKillSwitchStage.asInstanceOf[Graph[FlowShape[T, T], UniqueKillSwitch]]
/**
* Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion
* of that unique materialization. Different materializations result in different, independent switches.
*
* For a Flow version see [[KillSwitch#single]]
*/
def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch] =
UniqueBidiKillSwitchStage.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]]
abstract class KillableGraphStageLogic(val terminationSignal: Future[Done], _shape: Shape) extends GraphStageLogic(_shape) {
override def preStart(): Unit = {
terminationSignal.value match {
case Some(status) onSwitch(status)
case _
// callback.invoke is a simple actor send, so it is fine to run on the invoking thread
terminationSignal.onComplete(getAsyncCallback[Try[Done]](onSwitch).invoke)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
}
}
private def onSwitch(mode: Try[Done]): Unit = mode match {
case Success(_) completeStage()
case Failure(ex) failStage(ex)
}
}
private[stream] object UniqueKillSwitchStage extends GraphStageWithMaterializedValue[FlowShape[Any, Any], UniqueKillSwitch] {
override val initialAttributes = Attributes.name("breaker")
override val shape = FlowShape(Inlet[Any]("KillSwitch.in"), Outlet[Any]("KillSwitch.out"))
override def toString: String = "UniqueKillSwitchFlow"
override def createLogicAndMaterializedValue(attr: Attributes) = {
val promise = Promise[Done]
val switch = new UniqueKillSwitch(promise)
val logic = new KillableGraphStageLogic(promise.future, shape) with InHandler with OutHandler {
override def onPush(): Unit = push(shape.out, grab(shape.in))
override def onPull(): Unit = pull(shape.in)
setHandler(shape.in, this)
setHandler(shape.out, this)
}
(logic, switch)
}
}
private[stream] object UniqueBidiKillSwitchStage extends GraphStageWithMaterializedValue[BidiShape[Any, Any, Any, Any], UniqueKillSwitch] {
override val initialAttributes = Attributes.name("breaker")
override val shape = BidiShape(
Inlet[Any]("KillSwitchBidi.in1"), Outlet[Any]("KillSwitchBidi .out1"),
Inlet[Any]("KillSwitchBidi.in2"), Outlet[Any]("KillSwitchBidi.out2"))
override def toString: String = "UniqueKillSwitchBidi"
override def createLogicAndMaterializedValue(attr: Attributes) = {
val promise = Promise[Done]
val switch = new UniqueKillSwitch(promise)
val logic = new KillableGraphStageLogic(promise.future, shape) {
setHandler(shape.in1, new InHandler {
override def onPush(): Unit = push(shape.out1, grab(shape.in1))
override def onUpstreamFinish(): Unit = complete(shape.out1)
override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out1, ex)
})
setHandler(shape.in2, new InHandler {
override def onPush(): Unit = push(shape.out2, grab(shape.in2))
override def onUpstreamFinish(): Unit = complete(shape.out2)
override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out2, ex)
})
setHandler(shape.out1, new OutHandler {
override def onPull(): Unit = pull(shape.in1)
override def onDownstreamFinish(): Unit = cancel(shape.in1)
})
setHandler(shape.out2, new OutHandler {
override def onPull(): Unit = pull(shape.in2)
override def onDownstreamFinish(): Unit = cancel(shape.in2)
})
}
(logic, switch)
}
}
}
/**
* A [[KillSwitch]] allows completion of [[Graph]]s from the outside by completing [[Graph]]s of [[FlowShape]] linked
* to the switch. Depending on whether the [[KillSwitch]] is a [[UniqueKillSwitch]] or a [[SharedKillSwitch]] one or
* multiple streams might be linked with the switch. For details see the documentation of the concrete subclasses of
* this interface.
*/
trait KillSwitch {
/**
* After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally.
*/
def shutdown(): Unit
/**
* After calling [[KillSwitch#abort()]] the linked [[Graph]]s of [[FlowShape]] are failed.
*/
def abort(ex: Throwable): Unit
}
/**
* A [[UniqueKillSwitch]] is always a result of a materialization (unlike [[SharedKillSwitch]] which is constructed
* before any materialization) and it always controls that graph and stage which yielded the materialized value.
*
* After calling [[UniqueKillSwitch#shutdown()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* [[UniqueKillSwitch]] will complete its downstream and cancel its upstream (unless if finished or failed already in which
* case the command is ignored). Subsequent invocations of completion commands will be ignored.
*
* After calling [[UniqueKillSwitch#abort()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* [[UniqueKillSwitch]] will fail its downstream with the provided exception and cancel its upstream
* (unless if finished or failed already in which case the command is ignored). Subsequent invocations of
* completion commands will be ignored.
*
* It is also possible to individually cancel, complete or fail upstream and downstream parts by calling the corresponding
* methods.
*/
final class UniqueKillSwitch private[stream] (private val promise: Promise[Done]) {
/**
* After calling [[UniqueKillSwitch#shutdown()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* [[UniqueKillSwitch]] will complete its downstream and cancel its upstream (unless if finished or failed already in which
* case the command is ignored). Subsequent invocations of completion commands will be ignored.
*/
def shutdown(): Unit = promise.trySuccess(Done)
/**
* After calling [[UniqueKillSwitch#abort()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the
* [[UniqueKillSwitch]] will fail its downstream with the provided exception and cancel its upstream
* (unless if finished or failed already in which case the command is ignored). Subsequent invocations of
* completion commands will be ignored.
*/
def abort(ex: Throwable): Unit = promise.tryFailure(ex)
override def toString: String = s"SingleKillSwitch($hashCode)"
}
/**
* A [[SharedKillSwitch]] is a provider for [[Graph]]s of [[FlowShape]] that can be completed or failed from the outside.
* A [[Graph]] returned by the switch can be materialized arbitrary amount of times: every newly materialized [[Graph]]
* belongs to the switch from which it was aquired. Multiple [[SharedKillSwitch]] instances are isolated from each other,
* shutting down or aborting on instance does not affect the [[Graph]]s provided by another instance.
*
* After calling [[SharedKillSwitch#shutdown()]] all materialized, running instances of all [[Graph]]s provided by the
* [[SharedKillSwitch]] will complete their downstreams and cancel their upstreams (unless if finished or failed already in which
* case the command is ignored). Subsequent invocations of [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be
* ignored.
*
* After calling [[SharedKillSwitch#abort()]] all materialized, running instances of all [[Graph]]s provided by the
* [[SharedKillSwitch]] will fail their downstreams with the provided exception and cancel their upstreams
* (unless it finished or failed already in which case the command is ignored). Subsequent invocations of
* [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be ignored.
*
* The [[Graph]]s provided by the [[SharedKillSwitch]] do not modify the passed through elements in any way or affect
* backpressure in the stream. All provided [[Graph]]s provide the parent [[SharedKillSwitch]] as materialized value.
*
* This class is thread-safe, the instance can be passed safely among threads and its methods may be invoked concurrently.
*/
final class SharedKillSwitch private[stream] (val name: String) {
private[this] val shutdownPromise = Promise[Done]
private[this] val _flow: Graph[FlowShape[Any, Any], SharedKillSwitch] = new SharedKillSwitchFlow
/**
* After calling [[SharedKillSwitch#shutdown()]] all materialized, running instances of all [[Graph]]s provided by the
* [[SharedKillSwitch]] will complete their downstreams and cancel their upstreams (unless if finished or failed already in which
* case the command is ignored). Subsequent invocations of [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be
* ignored.
*/
def shutdown(): Unit = shutdownPromise.trySuccess(Done)
/**
* After calling [[SharedKillSwitch#abort()]] all materialized, running instances of all [[Graph]]s provided by the
* [[SharedKillSwitch]] will fail their downstreams with the provided exception and cancel their upstreams
* (unless it finished or failed already in which case the command is ignored). Subsequent invocations of
* [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be ignored.
*
* These provided [[Graph]]s materialize to their owning switch. This might make certain integrations simpler than
* passing around the switch instance itself.
*
* @param reason The exception to be used for failing the linked [[Graph]]s
*/
def abort(reason: Throwable): Unit = shutdownPromise.tryFailure(reason)
/**
* Retrurns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking
* [[SharedKillSwitch#shutdown()]] or [[SharedKillSwitch#abort()]] all running instances of all provided [[Graph]]s by this
* switch will be stopped normally or failed.
*
* @tparam T Type of the elements the Flow will forward
* @return A reusable [[Graph]] that is linked with the switch. The materialized value provided is this switch itself.
*/
def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch] = _flow.asInstanceOf[Graph[FlowShape[T, T], SharedKillSwitch]]
override def toString: String = s"KillSwitch($name)"
private class SharedKillSwitchFlow extends GraphStageWithMaterializedValue[FlowShape[Any, Any], SharedKillSwitch] {
override val shape: FlowShape[Any, Any] = FlowShape(Inlet[Any]("KillSwitch.in"), Outlet[Any]("KillSwitch.out"))
override def toString: String = s"SharedKillSwitchFlow(switch: $name)"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, SharedKillSwitch) = {
val logic = new KillSwitches.KillableGraphStageLogic(shutdownPromise.future, shape) with InHandler with OutHandler {
setHandler(shape.in, this)
setHandler(shape.out, this)
override def onPush(): Unit = push(shape.out, grab(shape.in))
override def onPull(): Unit = pull(shape.in)
}
(logic, SharedKillSwitch.this)
}
}
}

View file

@ -114,109 +114,6 @@ object GraphStages {
private val _detacher = new Detacher[Any] private val _detacher = new Detacher[Any]
def detacher[T]: GraphStage[FlowShape[T, T]] = _detacher.asInstanceOf[GraphStage[FlowShape[T, T]]] def detacher[T]: GraphStage[FlowShape[T, T]] = _detacher.asInstanceOf[GraphStage[FlowShape[T, T]]]
final class Breaker(callback: Breaker.Operation Unit) {
import Breaker._
def complete(): Unit = callback(Complete)
def cancel(): Unit = callback(Cancel)
def fail(ex: Throwable): Unit = callback(Fail(ex))
def completeAndCancel(): Unit = callback(CompleteAndCancel)
def failAndCancel(ex: Throwable): Unit = callback(FailAndCancel(ex))
}
object Breaker extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Breaker]] {
sealed trait Operation
case object Complete extends Operation
case object Cancel extends Operation
case class Fail(ex: Throwable) extends Operation
case object CompleteAndCancel extends Operation
case class FailAndCancel(ex: Throwable) extends Operation
override val initialAttributes = Attributes.name("breaker")
override val shape = FlowShape(Inlet[Any]("breaker.in"), Outlet[Any]("breaker.out"))
override def toString: String = "Breaker"
override def createLogicAndMaterializedValue(attr: Attributes) = {
val promise = Promise[Breaker]
val logic = new GraphStageLogic(shape) {
passAlong(shape.in, shape.out)
setHandler(shape.out, eagerTerminateOutput)
override def preStart(): Unit = {
pull(shape.in)
promise.success(new Breaker(getAsyncCallback[Operation] {
case Complete complete(shape.out)
case Cancel cancel(shape.in)
case Fail(ex) fail(shape.out, ex)
case CompleteAndCancel completeStage()
case FailAndCancel(ex) failStage(ex)
}.invoke))
}
}
(logic, promise.future)
}
}
def breaker[T]: Graph[FlowShape[T, T], Future[Breaker]] = Breaker.asInstanceOf[Graph[FlowShape[T, T], Future[Breaker]]]
object BidiBreaker extends GraphStageWithMaterializedValue[BidiShape[Any, Any, Any, Any], Future[Breaker]] {
import Breaker._
override val initialAttributes = Attributes.name("breaker")
override val shape = BidiShape(
Inlet[Any]("breaker.in1"), Outlet[Any]("breaker.out1"),
Inlet[Any]("breaker.in2"), Outlet[Any]("breaker.out2"))
override def toString: String = "BidiBreaker"
override def createLogicAndMaterializedValue(attr: Attributes) = {
val promise = Promise[Breaker]
val logic = new GraphStageLogic(shape) {
setHandler(shape.in1, new InHandler {
override def onPush(): Unit = push(shape.out1, grab(shape.in1))
override def onUpstreamFinish(): Unit = complete(shape.out1)
override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out1, ex)
})
setHandler(shape.in2, new InHandler {
override def onPush(): Unit = push(shape.out2, grab(shape.in2))
override def onUpstreamFinish(): Unit = complete(shape.out2)
override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out2, ex)
})
setHandler(shape.out1, new OutHandler {
override def onPull(): Unit = pull(shape.in1)
override def onDownstreamFinish(): Unit = cancel(shape.in1)
})
setHandler(shape.out2, new OutHandler {
override def onPull(): Unit = pull(shape.in2)
override def onDownstreamFinish(): Unit = cancel(shape.in2)
})
override def preStart(): Unit = {
promise.success(new Breaker(getAsyncCallback[Operation] {
case Complete
complete(shape.out1)
complete(shape.out2)
case Cancel
cancel(shape.in1)
cancel(shape.in2)
case Fail(ex)
fail(shape.out1, ex)
fail(shape.out2, ex)
case CompleteAndCancel completeStage()
case FailAndCancel(ex) failStage(ex)
}.invoke))
}
}
(logic, promise.future)
}
}
def bidiBreaker[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]] = BidiBreaker.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]]]
private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] { private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] {
val in = Inlet[Any]("terminationWatcher.in") val in = Inlet[Any]("terminationWatcher.in")
val out = Outlet[Any]("terminationWatcher.out") val out = Outlet[Any]("terminationWatcher.out")

View file

@ -763,7 +763,19 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$DropWhile$"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$DropWhile$"),
FilterAnyProblemStartingWith("akka.stream.impl.fusing.Collect"), FilterAnyProblemStartingWith("akka.stream.impl.fusing.Collect"),
FilterAnyProblemStartingWith("akka.stream.impl.fusing.DropWhile"), FilterAnyProblemStartingWith("akka.stream.impl.fusing.DropWhile"),
FilterAnyProblemStartingWith("akka.stream.impl.fusing.LimitWeighted") FilterAnyProblemStartingWith("akka.stream.impl.fusing.LimitWeighted"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$"),
// #19892 Removed internal Breaker classes from akka.stream.impl.fusing
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphStages.breaker"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphStages.bidiBreaker"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail$"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel$"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Operation"),
ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail")
) )
) )
} }