From c840bd6f37c9f014b93aebe26697dd9ed24b4171 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Tue, 9 May 2017 13:55:08 +0300 Subject: [PATCH] #22808 Move CoupledTerminationFlow to Flow.fromSinkAndSourceCoupled Also mention coupled variant in the uncoupled scaladoc --- .../paradox/java/stream/stages-overview.md | 6 +- .../paradox/scala/stream/stages-overview.md | 6 +- .../scaladsl/CoupledTerminationFlowSpec.scala | 44 ++++++---- .../javadsl/CoupledTerminationFlow.scala | 3 +- .../main/scala/akka/stream/javadsl/Flow.scala | 84 ++++++++++++++++++- .../scaladsl/CoupledTerminationFlow.scala | 10 +-- .../scala/akka/stream/scaladsl/Flow.scala | 80 ++++++++++++++++++ 7 files changed, 199 insertions(+), 34 deletions(-) diff --git a/akka-docs/src/main/paradox/java/stream/stages-overview.md b/akka-docs/src/main/paradox/java/stream/stages-overview.md index a91bf45a9d..fc91415e51 100644 --- a/akka-docs/src/main/paradox/java/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/java/stream/stages-overview.md @@ -1059,16 +1059,14 @@ Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sen and the `Flow` 's output will come from the Source. Note that termination events, like completion and cancelation is not automatically propagated through to the "other-side" -of the such-composed Flow. Use `CoupledTerminationFlow` if you want to couple termination of both of the ends, +of the such-composed Flow. Use `Flow.fromSinkAndSourceCoupled` if you want to couple termination of both of the ends, for example most useful in handling websocket connections. --------------------------------------------------------------- -### CoupledTerminationFlow.fromSinkAndSource +### Flow.fromSinkAndSourceCoupled Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them. -Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages. - Similar to `Flow.fromSinkAndSource` however couples the termination of these two stages. E.g. if the emitted `Flow` gets a cancellation, the `Source` of course is cancelled, diff --git a/akka-docs/src/main/paradox/scala/stream/stages-overview.md b/akka-docs/src/main/paradox/scala/stream/stages-overview.md index 74ead74af2..48105789a4 100644 --- a/akka-docs/src/main/paradox/scala/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/scala/stream/stages-overview.md @@ -1047,16 +1047,14 @@ Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sen and the `Flow` 's output will come from the Source. Note that termination events, like completion and cancelation is not automatically propagated through to the "other-side" -of the such-composed Flow. Use `CoupledTerminationFlow` if you want to couple termination of both of the ends, +of the such-composed Flow. Use `Flow.fromSinkAndSourceCoupled` if you want to couple termination of both of the ends, for example most useful in handling websocket connections. --------------------------------------------------------------- -### CoupledTerminationFlow.fromSinkAndSource +### Flow.fromSinkAndSourceCoupled Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them. -Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages. - Similar to `Flow.fromSinkAndSource` however couples the termination of these two stages. E.g. if the emitted `Flow` gets a cancellation, the `Source` of course is cancelled, diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CoupledTerminationFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CoupledTerminationFlowSpec.scala index 6ddbf7d472..6b1891c8ea 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CoupledTerminationFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CoupledTerminationFlowSpec.scala @@ -6,10 +6,12 @@ package akka.stream.scaladsl import akka.{ Done, NotUsed } import akka.stream._ import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.testkit.TestProbe import org.reactivestreams.{ Publisher, Subscriber, Subscription } import org.scalatest.Assertion +import scala.concurrent.Future import scala.util.{ Failure, Success, Try } import scala.xml.Node @@ -72,7 +74,7 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest { val (innerSink, innerSinkAssertion) = interpretInnerSink(innerSinkRule) val (innerSource, innerSourceAssertion) = interpretInnerSource(innerSourceRule) - val flow = CoupledTerminationFlow.fromSinkAndSource(innerSink, innerSource) + val flow = Flow.fromSinkAndSourceCoupledMat(innerSink, innerSource)(Keep.none) outerSource.via(flow).to(outerSink).run() outerAssertions() @@ -83,9 +85,9 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest { "completed out:Source => complete in:Sink" in { val probe = TestProbe() - val f = CoupledTerminationFlow.fromSinkAndSource( + val f = Flow.fromSinkAndSourceCoupledMat( Sink.onComplete(d ⇒ probe.ref ! "done"), - Source.empty) // completes right away, should complete the sink as well + Source.empty)(Keep.none) // completes right away, should complete the sink as well f.runWith(Source.maybe, Sink.ignore) // these do nothing. @@ -94,7 +96,7 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest { "cancel in:Sink => cancel out:Source" in { val probe = TestProbe() - val f = CoupledTerminationFlow.fromSinkAndSource( + val f = Flow.fromSinkAndSourceCoupledMat( Sink.cancelled, Source.fromPublisher(new Publisher[String] { override def subscribe(subscriber: Subscriber[_ >: String]): Unit = { @@ -104,7 +106,7 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest { override def request(l: Long): Unit = () // do nothing }) } - })) // completes right away, should complete the sink as well + }))(Keep.none) // completes right away, should complete the sink as well f.runWith(Source.maybe, Sink.ignore) // these do nothing. @@ -113,15 +115,27 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest { "error wrapped Sink when wrapped Source errors " in { val probe = TestProbe() - val f = CoupledTerminationFlow.fromSinkAndSource( + val f = Flow.fromSinkAndSourceCoupledMat( Sink.onComplete(e ⇒ probe.ref ! e.failed.get.getMessage), - Source.failed(new Exception("BOOM!"))) // completes right away, should complete the sink as well + Source.failed(new Exception("BOOM!")))(Keep.none) // completes right away, should complete the sink as well f.runWith(Source.maybe, Sink.ignore) // these do nothing. probe.expectMsg("BOOM!") } + "support usage with Graphs" in { + val source: Graph[SourceShape[Int], TestPublisher.Probe[Int]] = TestSource.probe[Int] + val sink: Graph[SinkShape[Any], Future[Done]] = Sink.ignore + + val flow = Flow.fromSinkAndSourceCoupledMat(sink, source)(Keep.right) + + val (source1, source2) = TestSource.probe[Int].viaMat(flow)(Keep.both).toMat(Sink.ignore)(Keep.left).run + + source1.sendComplete() + source2.expectCancellation() + } + } def interpretOuter(rule: String): (Source[String, NotUsed], Sink[String, NotUsed], () ⇒ Any) = { @@ -148,12 +162,12 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest { } val assertCompleteAndCancel = () ⇒ { probe.expectMsgPF() { - case Success(v) ⇒ // good - case "cancel-received" ⇒ // good + case Success(v) ⇒ // good + case "cancel-received" ⇒ // good } probe.expectMsgPF() { - case Success(v) ⇒ // good - case "cancel-received" ⇒ // good + case Success(v) ⇒ // good + case "cancel-received" ⇒ // good } } val assertError = () ⇒ { @@ -163,12 +177,12 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest { } val assertErrorAndCancel = () ⇒ { probe.expectMsgPF() { - case Failure(ex) ⇒ // good - case "cancel-received" ⇒ // good + case Failure(ex) ⇒ // good + case "cancel-received" ⇒ // good } probe.expectMsgPF() { - case Failure(ex) ⇒ // good - case "cancel-received" ⇒ // good + case Failure(ex) ⇒ // good + case "cancel-received" ⇒ // good } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/CoupledTerminationFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/CoupledTerminationFlow.scala index 64fce5fd20..4b69a19836 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/CoupledTerminationFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/CoupledTerminationFlow.scala @@ -56,6 +56,7 @@ object CoupledTerminationFlow { * * The order in which the `in` and `out` sides receive their respective completion signals is not defined, do not rely on its ordering. */ + @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ..., Keep.both())` instead", "2.5.2") def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] = - akka.stream.scaladsl.CoupledTerminationFlow.fromSinkAndSource(in.asScala, out.asScala).asJava + akka.stream.scaladsl.Flow.fromSinkAndSourceCoupledMat(in, out)(akka.stream.scaladsl.Keep.both).asJava } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 913d2c590a..4153bd3351 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -55,18 +55,98 @@ object Flow { } /** - * Helper to create `Flow` from a `Sink`and a `Source`. + * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input + * will be sent to the Sink and the Flow's output will come from the Source. + * + * The completion of the Sink and Source sides of a Flow constructed using + * this method are independent. So if the Sink receives a completion signal, + * the Source side will remain unaware of that. If you are looking to couple + * the termination signals of the two sides use `Flow.fromSinkAndSourceCoupled` instead. */ def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] = new Flow(scaladsl.Flow.fromSinkAndSourceMat(sink, source)(scaladsl.Keep.none)) /** - * Helper to create `Flow` from a `Sink`and a `Source`. + * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input + * will be sent to the Sink and the Flow's output will come from the Source. + * + * The completion of the Sink and Source sides of a Flow constructed using + * this method are independent. So if the Sink receives a completion signal, + * the Source side will remain unaware of that. If you are looking to couple + * the termination signals of the two sides use `Flow.fromSinkAndSourceCoupledMat` instead. + * + * The `combine` function is used to compose the materialized values of the `sink` and `source` + * into the materialized value of the resulting [[Flow]]. */ def fromSinkAndSourceMat[I, O, M1, M2, M]( sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2], combine: function.Function2[M1, M2, M]): Flow[I, O, M] = new Flow(scaladsl.Flow.fromSinkAndSourceMat(sink, source)(combinerToScala(combine))) + + /** + * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them. + * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages. + * + * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled, + * however the Sink will also be completed. The table below illustrates the effects in detail: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Returned FlowSink (in)Source (out)
cause: upstream (sink-side) receives completioneffect: receives completioneffect: receives cancel
cause: upstream (sink-side) receives erroreffect: receives erroreffect: receives cancel
cause: downstream (source-side) receives canceleffect: completeseffect: receives cancel
effect: cancels upstream, completes downstreameffect: completescause: signals complete
effect: cancels upstream, errors downstreameffect: receives errorcause: signals error or throws
effect: cancels upstream, completes downstreamcause: cancelseffect: receives cancel
+ * + */ + def fromSinkAndSourceCoupled[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] = + new Flow(scaladsl.Flow.fromSinkAndSourceCoupled(sink, source)) + + /** + * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them. + * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages. + * + * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled, + * however the Sink will also be completed. The table on [[Flow.fromSinkAndSourceCoupled]] + * illustrates the effects in detail. + * + * The `combine` function is used to compose the materialized values of the `sink` and `source` + * into the materialized value of the resulting [[Flow]]. + */ + def fromSinkAndSourceCoupledMat[I, O, M1, M2, M]( + sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2], + combine: function.Function2[M1, M2, M]): Flow[I, O, M] = + new Flow(scaladsl.Flow.fromSinkAndSourceCoupledMat(sink, source)(combinerToScala(combine))) } /** Create a `Flow` which can process elements of type `T`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala index 60236ad3ac..d4fe7cf6c9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala @@ -59,15 +59,9 @@ object CoupledTerminationFlow { * * The order in which the `in` and `out` sides receive their respective completion signals is not defined, do not rely on its ordering. */ + @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2") def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] = - // format: OFF - Flow.fromGraph(GraphDSL.create(in, out)(Keep.both) { implicit b => (i, o) => - import GraphDSL.Implicits._ - val bidi = b.add(new CoupledTerminationBidi[I, O]) - /* bidi.in1 ~> */ bidi.out1 ~> i; o ~> bidi.in2 /* ~> bidi.out2 */ - FlowShape(bidi.in1, bidi.out2) - }) - // format: ON + Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 56225be88a..7e17fef307 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -315,6 +315,11 @@ object Flow { /** * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input * will be sent to the Sink and the Flow's output will come from the Source. + * + * The completion of the Sink and Source sides of a Flow constructed using + * this method are independent. So if the Sink receives a completion signal, + * the Source side will remain unaware of that. If you are looking to couple + * the termination signals of the two sides use `Flow.fromSinkAndSourceCoupled` instead. */ def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] = fromSinkAndSourceMat(sink, source)(Keep.none) @@ -323,11 +328,86 @@ object Flow { * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input * will be sent to the Sink and the Flow's output will come from the Source. * + * The completion of the Sink and Source sides of a Flow constructed using + * this method are independent. So if the Sink receives a completion signal, + * the Source side will remain unaware of that. If you are looking to couple + * the termination signals of the two sides use `Flow.fromSinkAndSourceCoupledMat` instead. + * * The `combine` function is used to compose the materialized values of the `sink` and `source` * into the materialized value of the resulting [[Flow]]. */ def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(combine: (M1, M2) ⇒ M): Flow[I, O, M] = fromGraph(GraphDSL.create(sink, source)(combine) { implicit b ⇒ (in, out) ⇒ FlowShape(in.in, out.out) }) + + /** + * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them. + * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages. + * + * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled, + * however the Sink will also be completed. The table below illustrates the effects in detail: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Returned FlowSink (in)Source (out)
cause: upstream (sink-side) receives completioneffect: receives completioneffect: receives cancel
cause: upstream (sink-side) receives erroreffect: receives erroreffect: receives cancel
cause: downstream (source-side) receives canceleffect: completeseffect: receives cancel
effect: cancels upstream, completes downstreameffect: completescause: signals complete
effect: cancels upstream, errors downstreameffect: receives errorcause: signals error or throws
effect: cancels upstream, completes downstreamcause: cancelseffect: receives cancel
+ * + */ + def fromSinkAndSourceCoupled[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] = + fromSinkAndSourceCoupledMat(sink, source)(Keep.none) + + /** + * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them. + * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages. + * + * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled, + * however the Sink will also be completed. The table on [[Flow.fromSinkAndSourceCoupled]] + * illustrates the effects in detail. + * + * The `combine` function is used to compose the materialized values of the `sink` and `source` + * into the materialized value of the resulting [[Flow]]. + */ + def fromSinkAndSourceCoupledMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(combine: (M1, M2) ⇒ M): Flow[I, O, M] = + // format: OFF + Flow.fromGraph(GraphDSL.create(sink, source)(combine) { implicit b => (i, o) => + import GraphDSL.Implicits._ + val bidi = b.add(new CoupledTerminationBidi[I, O]) + /* bidi.in1 ~> */ bidi.out1 ~> i; o ~> bidi.in2 /* ~> bidi.out2 */ + FlowShape(bidi.in1, bidi.out2) + }) + // format: ON } object RunnableGraph {