diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index b2fd70d7d8..c843f71557 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -716,6 +716,8 @@ Attaches the given `Sink` to this `Flow`, meaning that elements that pass throug **completes** when upstream completes +**cancels** when downstream or `Sink` cancels + --------------------------------------------------------------- ### map diff --git a/akka-docs/src/main/paradox/stream/stream-error.md b/akka-docs/src/main/paradox/stream/stream-error.md index 128d8f41f6..e7e26298d3 100644 --- a/akka-docs/src/main/paradox/stream/stream-error.md +++ b/akka-docs/src/main/paradox/stream/stream-error.md @@ -126,10 +126,26 @@ Java : @@snip [RestartDocTest.java]($code$/java/jdocs/stream/RestartDocTest.java) { #with-kill-switch } Sinks and flows can also be supervised, using @scala[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`] -@java[`akka.stream.scaladsl.RestartSink` and `akka.stream.scaladsl.RestartFlow`]. The `RestartSink` is restarted when +@java[`akka.stream.javadsl.RestartSink` and `akka.stream.javadsl.RestartFlow`]. The `RestartSink` is restarted when it cancels, while the `RestartFlow` is restarted when either the in port cancels, the out port completes, or the out port sends an error. +@@@ note + +Care should be taken when using `GraphStage`s that conditionally propagate termination signals inside a +`RestartSource`, `RestartSink` or `RestartFlow`. + +An example is a `Broadcast` stage with the default `eagerCancel = false` where +some of the outlets are for side-effecting branches (that do not re-join e.g. via a `Merge`). +A failure on a side branch will not terminate the supervised stream which will +not be restarted. Conversely, a failure on the main branch can trigger a restart but leave behind old +running instances of side branches. + +In this example `eagerCancel` should probably be set to `true`, or, when only a single side branch is used, `alsoTo` +or `divertTo` should be considered as alternatives. + +@@@ + ## Supervision Strategies @@@ note diff --git a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala index 512064a391..3a525c22f2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala @@ -72,7 +72,6 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender { val a = system.actorOf(Props(new ActorWithMaterializer(p)).withDispatcher("akka.test.stream-dispatcher")) p.expectMsg("hello") - p.expectMsg("one") a ! PoisonPill val Failure(ex) = p.expectMsgType[Try[Done]] } @@ -101,7 +100,9 @@ object ActorMaterializerSpec { implicit val mat = ActorMaterializer(settings)(context) Source.repeat("hello") - .alsoTo(Flow[String].take(1).to(Sink.actorRef(p.ref, "one"))) + .take(1) + .concat(Source.maybe) + .map(p.ref ! _) .runWith(Sink.onComplete(signal ⇒ { p.ref ! signal })) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index 86e916a9f1..eba26e5a2d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -12,6 +12,8 @@ import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.testkit.TestProbe +import scala.concurrent.Promise + object ActorRefBackpressureSinkSpec { val initMessage = "start" val completeMessage = "done" @@ -123,11 +125,12 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { val fw = createActor(classOf[Fw2]) val sink = Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage) .withAttributes(inputBuffer(bufferSize, bufferSize)) - val probe = Source(1 to streamElementCount) - .alsoToMat(Flow[Int].take(bufferSize).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right) + val bufferFullProbe = Promise[akka.Done.type] + Source(1 to streamElementCount) + .alsoTo(Flow[Int].drop(bufferSize - 1).to(Sink.foreach(_ ⇒ bufferFullProbe.trySuccess(akka.Done)))) .to(sink) .run() - probe.futureValue should ===(akka.Done) + bufferFullProbe.future.futureValue should ===(akka.Done) expectMsg(initMessage) fw ! TriggerAckMessage for (i ← 1 to streamElementCount) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala index e525f18107..ad68ffd101 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBroadcastSpec.scala @@ -1,7 +1,8 @@ package akka.stream.scaladsl -import akka.stream.testkit.scaladsl.TestSink -import scala.concurrent.{ Future, Await } +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } + +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import akka.stream._ import akka.stream.testkit._ @@ -240,13 +241,22 @@ class GraphBroadcastSpec extends StreamSpec { ps2.expectComplete() } - "alsoTo must continue if sink cancels" in assertAllStagesStopped { - val p, p2 = TestSink.probe[Int](system) - val (ps1, ps2) = Source(1 to 6).alsoToMat(p)(Keep.right).toMat(p2)(Keep.both).run() - ps2.request(6) - ps1.cancel() - ps2.expectNext(1, 2, 3, 4, 5, 6) - ps2.expectComplete() + "cancel if alsoTo side branch cancels" in assertAllStagesStopped { + val in = TestSource.probe[Int](system) + val outSide = TestSink.probe[Int](system) + val (pIn, pSide) = in.alsoToMat(outSide)(Keep.both).toMat(Sink.ignore)(Keep.left).run() + + pSide.cancel() + pIn.expectCancellation() + } + + "cancel if alsoTo main branch cancels" in assertAllStagesStopped { + val in = TestSource.probe[Int](system) + val outMain = TestSink.probe[Int](system) + val (pIn, pMain) = in.alsoToMat(Sink.ignore)(Keep.left).toMat(outMain)(Keep.both).run() + + pMain.cancel() + pIn.expectCancellation() } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index cbd0accd85..e755da7fa1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -10,7 +10,7 @@ import akka.stream.{ ActorMaterializer, StreamDetachedException } import akka.stream.testkit.Utils._ import akka.stream.testkit._ -import scala.concurrent.Await +import scala.concurrent.{ Await, Promise } import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -126,11 +126,12 @@ class QueueSinkSpec extends StreamSpec { val streamElementCount = bufferSize + 4 val sink = Sink.queue[Int]() .withAttributes(inputBuffer(bufferSize, bufferSize)) - val (probe, queue) = Source(1 to streamElementCount) - .alsoToMat(Flow[Int].take(bufferSize).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right) - .toMat(sink)(Keep.both) + val bufferFullProbe = Promise[akka.Done.type] + val queue = Source(1 to streamElementCount) + .alsoTo(Flow[Int].drop(bufferSize - 1).to(Sink.foreach(_ ⇒ bufferFullProbe.trySuccess(akka.Done)))) + .toMat(sink)(Keep.right) .run() - probe.futureValue should ===(akka.Done) + bufferFullProbe.future.futureValue should ===(akka.Done) for (i ← 1 to streamElementCount) { queue.pull() pipeTo testActor expectMsg(Some(i)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 171638a7c5..03bc1226ff 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1676,7 +1676,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels + * '''Cancels when''' downstream or Sink cancels */ def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.alsoTo(that)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index b5be3606ca..5926cd48d3 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -732,7 +732,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels + * '''Cancels when''' downstream or Sink cancels */ def alsoTo(that: Graph[SinkShape[Out], _]): javadsl.Source[Out, Mat] = new Source(delegate.alsoTo(that)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 6c40bfb05b..1629f80ba4 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1145,7 +1145,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels + * '''Cancels when''' downstream or Sink cancels */ def alsoTo(that: Graph[SinkShape[Out], _]): SubFlow[In, Out, Mat] = new SubFlow(delegate.alsoTo(that)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 7e471738a9..0b9f4c8bb5 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1137,7 +1137,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream cancels + * '''Cancels when''' downstream or Sink cancels */ def alsoTo(that: Graph[SinkShape[Out], _]): SubSource[Out, Mat] = new SubSource(delegate.alsoTo(that)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8d75e058ce..c26be1cf59 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -2286,14 +2286,14 @@ trait FlowOps[+Out, +Mat] { * * '''Completes when''' upstream completes * - * '''Cancels when''' downstream and Sink cancel + * '''Cancels when''' downstream or Sink cancels */ def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out] = via(alsoToGraph(that)) protected def alsoToGraph[M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] = GraphDSL.create(that) { implicit b ⇒ r ⇒ import GraphDSL.Implicits._ - val bcast = b.add(Broadcast[Out](2)) + val bcast = b.add(Broadcast[Out](2, eagerCancel = true)) bcast.out(1) ~> r FlowShape(bcast.in, bcast.out(0)) }