diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala index 0b60a94492..5fcecd8f2f 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala @@ -3,7 +3,7 @@ */ package akka.http.impl.engine.ws -import scala.concurrent.Await +import scala.concurrent.{ Await, Promise } import scala.concurrent.duration.DurationInt import org.scalactic.ConversionCheckedTripleEquals import org.scalatest.concurrent.ScalaFutures @@ -18,7 +18,10 @@ import akka.stream.testkit._ import akka.stream.scaladsl.GraphDSL.Implicits._ import org.scalatest.concurrent.Eventually import java.net.InetSocketAddress + +import akka.Done import akka.stream.impl.fusing.GraphStages +import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler } import akka.util.ByteString import akka.stream.testkit.scaladsl.TestSink import akka.testkit.{ AkkaSpec, EventFilter } @@ -69,12 +72,35 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug. val binding = Await.result(bindingFuture, 3.seconds) val myPort = binding.localAddress.getPort + val completeOnlySwitch: Flow[ByteString, ByteString, Promise[Done]] = Flow.fromGraph( + new GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Promise[Done]] { + override val shape: FlowShape[ByteString, ByteString] = + FlowShape(Inlet("completeOnlySwitch.in"), Outlet("completeOnlySwitch.out")) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Promise[Done]) = { + val promise = Promise[Done] + + val logic = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(shape.out, grab(shape.in)) + override def onPull(): Unit = pull(shape.in) + + override def preStart(): Unit = { + promise.future.foreach(_ ⇒ getAsyncCallback[Done](_ ⇒ complete(shape.out)).invoke(Done))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + } + + setHandlers(shape.in, shape.out, this) + } + + (logic, promise) + } + }) + val ((response, breaker), sink) = Source.empty .viaMat { Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort)) .atop(TLSPlacebo()) - .joinMat(Flow.fromGraph(GraphStages.breaker[ByteString]).via( + .joinMat(completeOnlySwitch.via( Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)))(Keep.both) }(Keep.right) .toMat(TestSink.probe[Message])(Keep.both) @@ -85,7 +111,7 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug. .request(10) .expectNoMsg(1500.millis) - breaker.value.get.get.complete() + breaker.trySuccess(Done) source .sendNext(TextMessage("hello")) @@ -149,20 +175,20 @@ class WebSocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug. val myPort = binding.localAddress.getPort @volatile var messages = 0 - val (breaker, completion) = + val (switch, completion) = Source.maybe .viaMat { Http().webSocketClientLayer(WebSocketRequest("ws://localhost:" + myPort)) .atop(TLSPlacebo()) // the resource leak of #19398 existed only for severed websocket connections - .atopMat(GraphStages.bidiBreaker[ByteString, ByteString])(Keep.right) + .atopMat(KillSwitches.singleBidi[ByteString, ByteString])(Keep.right) .join(Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)) }(Keep.right) .toMat(Sink.foreach(_ ⇒ messages += 1))(Keep.both) .run() eventually(messages should ===(N)) // breaker should have been fulfilled long ago - breaker.value.get.get.completeAndCancel() + switch.shutdown() completion.futureValue binding.unbind() diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/KillSwitchTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/KillSwitchTest.java new file mode 100644 index 0000000000..a31705335c --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/KillSwitchTest.java @@ -0,0 +1,102 @@ +package akka.stream.javadsl; + +import akka.Done; +import akka.stream.*; +import akka.stream.testkit.TestPublisher; +import akka.stream.testkit.TestSubscriber; +import akka.stream.testkit.Utils; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class KillSwitchTest extends StreamTest { + public KillSwitchTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", + AkkaSpec.testConf()); + + @Test + public void beAbleToUseKillSwitch() throws Exception { + final TestPublisher.Probe upstream = TestPublisher.probe(0, system); + final TestSubscriber.Probe downstream = TestSubscriber.probe(system); + final SharedKillSwitch killSwitch = KillSwitches.shared("testSwitch"); + + final SharedKillSwitch k = + Source.fromPublisher(upstream) + .viaMat(killSwitch.flow(), Keep.right()) + .to(Sink.fromSubscriber(downstream)).run(materializer); + + final CompletionStage completionStage = + Source.single(1) + .via(killSwitch.flow()) + .runWith(Sink.ignore(), materializer); + + downstream.request(1); + upstream.sendNext(1); + downstream.expectNext(1); + + assertEquals(killSwitch, k); + + killSwitch.shutdown(); + + upstream.expectCancellation(); + downstream.expectComplete(); + + assertEquals(completionStage.toCompletableFuture().get(3, TimeUnit.SECONDS), Done.getInstance()); + } + + @Test + public void beAbleToUseKillSwitchAbort() throws Exception { + final TestPublisher.Probe upstream = TestPublisher.probe(0, system); + final TestSubscriber.Probe downstream = TestSubscriber.probe(system); + final SharedKillSwitch killSwitch = KillSwitches.shared("testSwitch"); + + Source.fromPublisher(upstream) + .viaMat(killSwitch.flow(), Keep.right()) + .runWith(Sink.fromSubscriber(downstream), materializer); + + downstream.request(1); + upstream.sendNext(1); + downstream.expectNext(1); + + final Exception te = new Utils.TE("Testy"); + killSwitch.abort(te); + + upstream.expectCancellation(); + final Throwable te2 = downstream.expectError(); + + assertEquals(te, te2); + } + + + @Test + public void beAbleToUseSingleKillSwitch() throws Exception { + final TestPublisher.Probe upstream = TestPublisher.probe(0, system); + final TestSubscriber.Probe downstream = TestSubscriber.probe(system); + final Graph, UniqueKillSwitch> killSwitchFlow = KillSwitches.single(); + + final UniqueKillSwitch killSwitch = + Source.fromPublisher(upstream) + .viaMat(killSwitchFlow, Keep.right()) + .to(Sink.fromSubscriber(downstream)).run(materializer); + + + downstream.request(1); + upstream.sendNext(1); + downstream.expectNext(1); + + killSwitch.shutdown(); + + upstream.expectCancellation(); + downstream.expectComplete(); + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowKillSwitchSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowKillSwitchSpec.scala new file mode 100644 index 0000000000..f32ac774e1 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowKillSwitchSpec.scala @@ -0,0 +1,307 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.Done +import akka.stream.{ ActorMaterializer, ClosedShape, KillSwitch, KillSwitches } +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } +import akka.testkit.{ AkkaSpec, EventFilter } + +import scala.concurrent.duration._ + +class FlowKillSwitchSpec extends AkkaSpec { + + implicit val mat = ActorMaterializer() + + "A UniqueKillSwitch" must { + + "stop a stream if requested" in { + val ((upstream, switch), downstream) = + TestSource.probe[Int].viaMat(KillSwitches.single)(Keep.both).toMat(TestSink.probe)(Keep.both).run() + + downstream.request(1) + upstream.sendNext(1) + downstream.expectNext(1) + + switch.shutdown() + upstream.expectCancellation() + downstream.expectComplete() + } + + "fail a stream if requested" in { + val ((upstream, switch), downstream) = + TestSource.probe[Int].viaMat(KillSwitches.single)(Keep.both).toMat(TestSink.probe)(Keep.both).run() + + downstream.request(1) + upstream.sendNext(1) + downstream.expectNext(1) + + switch.abort(TE("Abort")) + upstream.expectCancellation() + downstream.expectError(TE("Abort")) + } + + "work if used multiple times in a flow" in { + val (((upstream, switch1), switch2), downstream) = + TestSource.probe[Int] + .viaMat(KillSwitches.single)(Keep.both) + .recover { case TE(_) ⇒ -1 } + .viaMat(KillSwitches.single)(Keep.both) + .toMat(TestSink.probe)(Keep.both).run() + + downstream.request(1) + upstream.sendNext(1) + downstream.expectNext(1) + + switch1.abort(TE("Abort")) + upstream.expectCancellation() + downstream.requestNext(-1) + + switch2.shutdown() + downstream.expectComplete() + + } + + "ignore completion after already completed" in { + val ((upstream, switch), downstream) = + TestSource.probe[Int].viaMat(KillSwitches.single)(Keep.both).toMat(TestSink.probe)(Keep.both).run() + + upstream.ensureSubscription() + downstream.ensureSubscription() + + switch.shutdown() + upstream.expectCancellation() + downstream.expectComplete() + + switch.abort(TE("Won't happen")) + upstream.expectNoMsg(100.millis) + downstream.expectNoMsg(100.millis) + } + + } + + "A SharedKillSwitches" must { + + "stop a stream if requested" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + + val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + + downstream.request(1) + upstream.sendNext(1) + downstream.expectNext(1) + + switch.shutdown() + upstream.expectCancellation() + downstream.expectComplete() + } + + "fail a stream if requested" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + + val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + + downstream.request(1) + upstream.sendNext(1) + downstream.expectNext(1) + + switch.abort(TE("Abort")) + upstream.expectCancellation() + downstream.expectError(TE("Abort")) + } + + "pass through all elements unmodified" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + Source(1 to 100).via(switch.flow).runWith(Sink.seq).futureValue should ===(1 to 100) + } + + "provide a flow that if materialized multiple times (with multiple types) stops all streams if requested" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + + val (upstream1, downstream1) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + val (upstream2, downstream2) = TestSource.probe[String].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + + downstream1.request(1) + upstream1.sendNext(1) + downstream1.expectNext(1) + + downstream2.request(2) + upstream2.sendNext("A").sendNext("B") + downstream2.expectNext("A", "B") + + switch.shutdown() + upstream1.expectCancellation() + upstream2.expectCancellation() + downstream1.expectComplete() + downstream2.expectComplete() + } + + "provide a flow that if materialized multiple times (with multiple types) fails all streams if requested" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + + val (upstream1, downstream1) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + val (upstream2, downstream2) = TestSource.probe[String].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + + downstream1.request(1) + upstream1.sendNext(1) + downstream1.expectNext(1) + + downstream2.request(2) + upstream2.sendNext("A").sendNext("B") + downstream2.expectNext("A", "B") + + switch.abort(TE("Abort")) + upstream1.expectCancellation() + upstream2.expectCancellation() + downstream1.expectError(TE("Abort")) + downstream2.expectError(TE("Abort")) + + } + + "ignore subsequent aborts and shutdowns after shutdown" in { + val switch = KillSwitches.shared("switch") + + val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + + downstream.request(1) + upstream.sendNext(1) + downstream.expectNext(1) + + switch.shutdown() + upstream.expectCancellation() + downstream.expectComplete() + + switch.shutdown() + upstream.expectNoMsg(100.millis) + downstream.expectNoMsg(100.millis) + + switch.abort(TE("Abort")) + upstream.expectNoMsg(100.millis) + downstream.expectNoMsg(100.millis) + } + + "ignore subsequent aborts and shutdowns after abort" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + + val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + + downstream.request(1) + upstream.sendNext(1) + downstream.expectNext(1) + + switch.abort(TE("Abort")) + upstream.expectCancellation() + downstream.expectError(TE("Abort")) + + switch.shutdown() + upstream.expectNoMsg(100.millis) + downstream.expectNoMsg(100.millis) + + switch.abort(TE("Abort_Late")) + upstream.expectNoMsg(100.millis) + downstream.expectNoMsg(100.millis) + } + + "complete immediately flows materialized after switch shutdown" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + switch.shutdown() + + val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + upstream.expectCancellation() + downstream.expectSubscriptionAndComplete() + } + + "fail immediately flows materialized after switch failure" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + switch.abort(TE("Abort")) + + val (upstream, downstream) = TestSource.probe[Int].via(switch.flow).toMat(TestSink.probe)(Keep.both).run() + upstream.expectCancellation() + downstream.expectSubscriptionAndError(TE("Abort")) + } + + "should not cause problems if switch is shutdown after Flow completed normally" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + Source(1 to 10).via(switch.flow).runWith(Sink.seq).futureValue should ===(1 to 10) + + switch.shutdown() + } + + "provide flows that materialize to its owner KillSwitch" in assertAllStagesStopped { + val switch = KillSwitches.shared("switch") + val (switch2, completion) = Source.maybe[Int].viaMat(switch.flow)(Keep.right).toMat(Sink.ignore)(Keep.both).run() + + (switch2 eq switch) should be(true) + switch2.shutdown() + completion.futureValue should ===(Done) + } + + "not affect streams corresponding to another KillSwitch" in assertAllStagesStopped { + val switch1 = KillSwitches.shared("switch") + val switch2 = KillSwitches.shared("switch") + + switch1 should !==(switch2) + + val (upstream1, downstream1) = TestSource.probe[Int].via(switch1.flow).toMat(TestSink.probe)(Keep.both).run() + val (upstream2, downstream2) = TestSource.probe[Int].via(switch2.flow).toMat(TestSink.probe)(Keep.both).run() + + downstream1.request(1) + upstream1.sendNext(1) + downstream1.expectNext(1) + + downstream2.request(1) + upstream2.sendNext(2) + downstream2.expectNext(2) + + switch1.shutdown() + upstream1.expectCancellation() + downstream1.expectComplete() + upstream2.expectNoMsg(100.millis) + downstream2.expectNoMsg(100.millis) + + switch2.abort(TE("Abort")) + upstream1.expectNoMsg(100.millis) + downstream1.expectNoMsg(100.millis) + upstream2.expectCancellation() + downstream2.expectError(TE("Abort")) + } + + "allow using multiple KillSwitch in one graph" in assertAllStagesStopped { + val switch1 = KillSwitches.shared("switch") + val switch2 = KillSwitches.shared("switch") + + val downstream = RunnableGraph.fromGraph(GraphDSL.create(TestSink.probe[Int]) { implicit b ⇒ + snk ⇒ + import GraphDSL.Implicits._ + val merge = b.add(Merge[Int](2)) + + Source.maybe[Int].via(switch1.flow) ~> merge ~> snk + Source.maybe[Int].via(switch2.flow) ~> merge + + ClosedShape + }).run() + + downstream.ensureSubscription() + downstream.expectNoMsg(100.millis) + + switch1.shutdown() + downstream.expectNoMsg(100.millis) + + switch2.shutdown() + downstream.expectComplete() + } + + "use its name on the flows it hands out" in assertAllStagesStopped { + pending // toString does not work for me after rebase + val switch = KillSwitches.shared("mySwitchName") + + switch.toString should ===("KillSwitch(mySwitchName)") + switch.flow.toString should ===("Flow(KillSwitchFlow(switch: mySwitchName))") + + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/KillSwitch.scala b/akka-stream/src/main/scala/akka/stream/KillSwitch.scala new file mode 100644 index 0000000000..fa5e8e1d1b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/KillSwitch.scala @@ -0,0 +1,259 @@ +/** + * Copyright (C) 2015-2016 Lightbend Inc. + */ +package akka.stream + +import akka.Done +import akka.stream.stage._ + +import scala.concurrent.{ Future, Promise } +import scala.util.{ Failure, Success, Try } + +/** + * Creates shared or single kill switches which can be used to control completion of graphs from the outside. + * - The factory ``shared()`` returns a [[SharedKillSwitch]] which provides a [[Graph]] of [[FlowShape]] that can be + * used in arbitrary number of graphs and materializations. The switch simultaneously + * controls completion in all of those graphs. + * - The factory ``single()`` returns a [[Graph]] of [[FlowShape]] that materializes to a [[UniqueKillSwitch]] + * which is always unique + * to that materialized Flow itself. + * + * Creates a [[SharedKillSwitch]] that can be used to externally control the completion of various streams. + * + */ +object KillSwitches { + + /** + * Creates a new [[SharedKillSwitch]] with the given name that can be used to control the completion of multiple + * streams from the outside simultaneously. + * + * @see SharedKillSwitch + */ + def shared(name: String): SharedKillSwitch = new SharedKillSwitch(name) + + /** + * Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion + * of that unique materialization. Different materializations result in different, independent switches. + * + * For a Bidi version see [[KillSwitch#singleBidi]] + */ + def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch] = + UniqueKillSwitchStage.asInstanceOf[Graph[FlowShape[T, T], UniqueKillSwitch]] + + /** + * Creates a new [[Graph]] of [[FlowShape]] that materializes to an external switch that allows external completion + * of that unique materialization. Different materializations result in different, independent switches. + * + * For a Flow version see [[KillSwitch#single]] + */ + def singleBidi[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch] = + UniqueBidiKillSwitchStage.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], UniqueKillSwitch]] + + abstract class KillableGraphStageLogic(val terminationSignal: Future[Done], _shape: Shape) extends GraphStageLogic(_shape) { + override def preStart(): Unit = { + terminationSignal.value match { + case Some(status) ⇒ onSwitch(status) + case _ ⇒ + // callback.invoke is a simple actor send, so it is fine to run on the invoking thread + terminationSignal.onComplete(getAsyncCallback[Try[Done]](onSwitch).invoke)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + } + } + + private def onSwitch(mode: Try[Done]): Unit = mode match { + case Success(_) ⇒ completeStage() + case Failure(ex) ⇒ failStage(ex) + } + } + + private[stream] object UniqueKillSwitchStage extends GraphStageWithMaterializedValue[FlowShape[Any, Any], UniqueKillSwitch] { + override val initialAttributes = Attributes.name("breaker") + override val shape = FlowShape(Inlet[Any]("KillSwitch.in"), Outlet[Any]("KillSwitch.out")) + override def toString: String = "UniqueKillSwitchFlow" + + override def createLogicAndMaterializedValue(attr: Attributes) = { + val promise = Promise[Done] + val switch = new UniqueKillSwitch(promise) + + val logic = new KillableGraphStageLogic(promise.future, shape) with InHandler with OutHandler { + override def onPush(): Unit = push(shape.out, grab(shape.in)) + override def onPull(): Unit = pull(shape.in) + setHandler(shape.in, this) + setHandler(shape.out, this) + } + + (logic, switch) + } + } + + private[stream] object UniqueBidiKillSwitchStage extends GraphStageWithMaterializedValue[BidiShape[Any, Any, Any, Any], UniqueKillSwitch] { + + override val initialAttributes = Attributes.name("breaker") + override val shape = BidiShape( + Inlet[Any]("KillSwitchBidi.in1"), Outlet[Any]("KillSwitchBidi .out1"), + Inlet[Any]("KillSwitchBidi.in2"), Outlet[Any]("KillSwitchBidi.out2")) + override def toString: String = "UniqueKillSwitchBidi" + + override def createLogicAndMaterializedValue(attr: Attributes) = { + val promise = Promise[Done] + val switch = new UniqueKillSwitch(promise) + + val logic = new KillableGraphStageLogic(promise.future, shape) { + + setHandler(shape.in1, new InHandler { + override def onPush(): Unit = push(shape.out1, grab(shape.in1)) + override def onUpstreamFinish(): Unit = complete(shape.out1) + override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out1, ex) + }) + setHandler(shape.in2, new InHandler { + override def onPush(): Unit = push(shape.out2, grab(shape.in2)) + override def onUpstreamFinish(): Unit = complete(shape.out2) + override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out2, ex) + }) + setHandler(shape.out1, new OutHandler { + override def onPull(): Unit = pull(shape.in1) + override def onDownstreamFinish(): Unit = cancel(shape.in1) + }) + setHandler(shape.out2, new OutHandler { + override def onPull(): Unit = pull(shape.in2) + override def onDownstreamFinish(): Unit = cancel(shape.in2) + }) + + } + + (logic, switch) + } + } + +} + +/** + * A [[KillSwitch]] allows completion of [[Graph]]s from the outside by completing [[Graph]]s of [[FlowShape]] linked + * to the switch. Depending on whether the [[KillSwitch]] is a [[UniqueKillSwitch]] or a [[SharedKillSwitch]] one or + * multiple streams might be linked with the switch. For details see the documentation of the concrete subclasses of + * this interface. + */ +trait KillSwitch { + /** + * After calling [[KillSwitch#shutdown()]] the linked [[Graph]]s of [[FlowShape]] are completed normally. + */ + def shutdown(): Unit + /** + * After calling [[KillSwitch#abort()]] the linked [[Graph]]s of [[FlowShape]] are failed. + */ + def abort(ex: Throwable): Unit +} + +/** + * A [[UniqueKillSwitch]] is always a result of a materialization (unlike [[SharedKillSwitch]] which is constructed + * before any materialization) and it always controls that graph and stage which yielded the materialized value. + * + * After calling [[UniqueKillSwitch#shutdown()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the + * [[UniqueKillSwitch]] will complete its downstream and cancel its upstream (unless if finished or failed already in which + * case the command is ignored). Subsequent invocations of completion commands will be ignored. + * + * After calling [[UniqueKillSwitch#abort()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the + * [[UniqueKillSwitch]] will fail its downstream with the provided exception and cancel its upstream + * (unless if finished or failed already in which case the command is ignored). Subsequent invocations of + * completion commands will be ignored. + * + * It is also possible to individually cancel, complete or fail upstream and downstream parts by calling the corresponding + * methods. + */ +final class UniqueKillSwitch private[stream] (private val promise: Promise[Done]) { + + /** + * After calling [[UniqueKillSwitch#shutdown()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the + * [[UniqueKillSwitch]] will complete its downstream and cancel its upstream (unless if finished or failed already in which + * case the command is ignored). Subsequent invocations of completion commands will be ignored. + */ + def shutdown(): Unit = promise.trySuccess(Done) + + /** + * After calling [[UniqueKillSwitch#abort()]] the running instance of the [[Graph]] of [[FlowShape]] that materialized to the + * [[UniqueKillSwitch]] will fail its downstream with the provided exception and cancel its upstream + * (unless if finished or failed already in which case the command is ignored). Subsequent invocations of + * completion commands will be ignored. + */ + def abort(ex: Throwable): Unit = promise.tryFailure(ex) + + override def toString: String = s"SingleKillSwitch($hashCode)" +} + +/** + * A [[SharedKillSwitch]] is a provider for [[Graph]]s of [[FlowShape]] that can be completed or failed from the outside. + * A [[Graph]] returned by the switch can be materialized arbitrary amount of times: every newly materialized [[Graph]] + * belongs to the switch from which it was aquired. Multiple [[SharedKillSwitch]] instances are isolated from each other, + * shutting down or aborting on instance does not affect the [[Graph]]s provided by another instance. + * + * After calling [[SharedKillSwitch#shutdown()]] all materialized, running instances of all [[Graph]]s provided by the + * [[SharedKillSwitch]] will complete their downstreams and cancel their upstreams (unless if finished or failed already in which + * case the command is ignored). Subsequent invocations of [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be + * ignored. + * + * After calling [[SharedKillSwitch#abort()]] all materialized, running instances of all [[Graph]]s provided by the + * [[SharedKillSwitch]] will fail their downstreams with the provided exception and cancel their upstreams + * (unless it finished or failed already in which case the command is ignored). Subsequent invocations of + * [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be ignored. + * + * The [[Graph]]s provided by the [[SharedKillSwitch]] do not modify the passed through elements in any way or affect + * backpressure in the stream. All provided [[Graph]]s provide the parent [[SharedKillSwitch]] as materialized value. + * + * This class is thread-safe, the instance can be passed safely among threads and its methods may be invoked concurrently. + */ +final class SharedKillSwitch private[stream] (val name: String) { + private[this] val shutdownPromise = Promise[Done] + private[this] val _flow: Graph[FlowShape[Any, Any], SharedKillSwitch] = new SharedKillSwitchFlow + + /** + * After calling [[SharedKillSwitch#shutdown()]] all materialized, running instances of all [[Graph]]s provided by the + * [[SharedKillSwitch]] will complete their downstreams and cancel their upstreams (unless if finished or failed already in which + * case the command is ignored). Subsequent invocations of [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be + * ignored. + */ + def shutdown(): Unit = shutdownPromise.trySuccess(Done) + + /** + * After calling [[SharedKillSwitch#abort()]] all materialized, running instances of all [[Graph]]s provided by the + * [[SharedKillSwitch]] will fail their downstreams with the provided exception and cancel their upstreams + * (unless it finished or failed already in which case the command is ignored). Subsequent invocations of + * [[SharedKillSwitch#shutdown()]] and [[SharedKillSwitch#abort()]] will be ignored. + * + * These provided [[Graph]]s materialize to their owning switch. This might make certain integrations simpler than + * passing around the switch instance itself. + * + * @param reason The exception to be used for failing the linked [[Graph]]s + */ + def abort(reason: Throwable): Unit = shutdownPromise.tryFailure(reason) + + /** + * Retrurns a typed Flow of a requested type that will be linked to this [[SharedKillSwitch]] instance. By invoking + * [[SharedKillSwitch#shutdown()]] or [[SharedKillSwitch#abort()]] all running instances of all provided [[Graph]]s by this + * switch will be stopped normally or failed. + * + * @tparam T Type of the elements the Flow will forward + * @return A reusable [[Graph]] that is linked with the switch. The materialized value provided is this switch itself. + */ + def flow[T]: Graph[FlowShape[T, T], SharedKillSwitch] = _flow.asInstanceOf[Graph[FlowShape[T, T], SharedKillSwitch]] + + override def toString: String = s"KillSwitch($name)" + + private class SharedKillSwitchFlow extends GraphStageWithMaterializedValue[FlowShape[Any, Any], SharedKillSwitch] { + override val shape: FlowShape[Any, Any] = FlowShape(Inlet[Any]("KillSwitch.in"), Outlet[Any]("KillSwitch.out")) + + override def toString: String = s"SharedKillSwitchFlow(switch: $name)" + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, SharedKillSwitch) = { + val logic = new KillSwitches.KillableGraphStageLogic(shutdownPromise.future, shape) with InHandler with OutHandler { + setHandler(shape.in, this) + setHandler(shape.out, this) + + override def onPush(): Unit = push(shape.out, grab(shape.in)) + override def onPull(): Unit = pull(shape.in) + + } + + (logic, SharedKillSwitch.this) + } + + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 92cb4ed69d..c973bb1fd4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -114,109 +114,6 @@ object GraphStages { private val _detacher = new Detacher[Any] def detacher[T]: GraphStage[FlowShape[T, T]] = _detacher.asInstanceOf[GraphStage[FlowShape[T, T]]] - final class Breaker(callback: Breaker.Operation ⇒ Unit) { - import Breaker._ - def complete(): Unit = callback(Complete) - def cancel(): Unit = callback(Cancel) - def fail(ex: Throwable): Unit = callback(Fail(ex)) - def completeAndCancel(): Unit = callback(CompleteAndCancel) - def failAndCancel(ex: Throwable): Unit = callback(FailAndCancel(ex)) - } - - object Breaker extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Breaker]] { - sealed trait Operation - case object Complete extends Operation - case object Cancel extends Operation - case class Fail(ex: Throwable) extends Operation - case object CompleteAndCancel extends Operation - case class FailAndCancel(ex: Throwable) extends Operation - - override val initialAttributes = Attributes.name("breaker") - override val shape = FlowShape(Inlet[Any]("breaker.in"), Outlet[Any]("breaker.out")) - override def toString: String = "Breaker" - - override def createLogicAndMaterializedValue(attr: Attributes) = { - val promise = Promise[Breaker] - - val logic = new GraphStageLogic(shape) { - - passAlong(shape.in, shape.out) - setHandler(shape.out, eagerTerminateOutput) - - override def preStart(): Unit = { - pull(shape.in) - promise.success(new Breaker(getAsyncCallback[Operation] { - case Complete ⇒ complete(shape.out) - case Cancel ⇒ cancel(shape.in) - case Fail(ex) ⇒ fail(shape.out, ex) - case CompleteAndCancel ⇒ completeStage() - case FailAndCancel(ex) ⇒ failStage(ex) - }.invoke)) - } - } - - (logic, promise.future) - } - } - - def breaker[T]: Graph[FlowShape[T, T], Future[Breaker]] = Breaker.asInstanceOf[Graph[FlowShape[T, T], Future[Breaker]]] - - object BidiBreaker extends GraphStageWithMaterializedValue[BidiShape[Any, Any, Any, Any], Future[Breaker]] { - import Breaker._ - - override val initialAttributes = Attributes.name("breaker") - override val shape = BidiShape( - Inlet[Any]("breaker.in1"), Outlet[Any]("breaker.out1"), - Inlet[Any]("breaker.in2"), Outlet[Any]("breaker.out2")) - override def toString: String = "BidiBreaker" - - override def createLogicAndMaterializedValue(attr: Attributes) = { - val promise = Promise[Breaker] - - val logic = new GraphStageLogic(shape) { - - setHandler(shape.in1, new InHandler { - override def onPush(): Unit = push(shape.out1, grab(shape.in1)) - override def onUpstreamFinish(): Unit = complete(shape.out1) - override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out1, ex) - }) - setHandler(shape.in2, new InHandler { - override def onPush(): Unit = push(shape.out2, grab(shape.in2)) - override def onUpstreamFinish(): Unit = complete(shape.out2) - override def onUpstreamFailure(ex: Throwable): Unit = fail(shape.out2, ex) - }) - setHandler(shape.out1, new OutHandler { - override def onPull(): Unit = pull(shape.in1) - override def onDownstreamFinish(): Unit = cancel(shape.in1) - }) - setHandler(shape.out2, new OutHandler { - override def onPull(): Unit = pull(shape.in2) - override def onDownstreamFinish(): Unit = cancel(shape.in2) - }) - - override def preStart(): Unit = { - promise.success(new Breaker(getAsyncCallback[Operation] { - case Complete ⇒ - complete(shape.out1) - complete(shape.out2) - case Cancel ⇒ - cancel(shape.in1) - cancel(shape.in2) - case Fail(ex) ⇒ - fail(shape.out1, ex) - fail(shape.out2, ex) - case CompleteAndCancel ⇒ completeStage() - case FailAndCancel(ex) ⇒ failStage(ex) - }.invoke)) - } - } - - (logic, promise.future) - } - } - - def bidiBreaker[T1, T2]: Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]] = BidiBreaker.asInstanceOf[Graph[BidiShape[T1, T1, T2, T2], Future[Breaker]]] - private object TerminationWatcher extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Future[Done]] { val in = Inlet[Any]("terminationWatcher.in") val out = Outlet[Any]("terminationWatcher.out") diff --git a/project/MiMa.scala b/project/MiMa.scala index fa618654ea..f4ad109b29 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -763,7 +763,19 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.Stages$DropWhile$"), FilterAnyProblemStartingWith("akka.stream.impl.fusing.Collect"), FilterAnyProblemStartingWith("akka.stream.impl.fusing.DropWhile"), - FilterAnyProblemStartingWith("akka.stream.impl.fusing.LimitWeighted") + FilterAnyProblemStartingWith("akka.stream.impl.fusing.LimitWeighted"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$"), + + // #19892 Removed internal Breaker classes from akka.stream.impl.fusing + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphStages.breaker"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphStages.bidiBreaker"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail$"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel$"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$FailAndCancel"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Operation"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$Breaker$Fail") ) ) }