diff --git a/akka-docs/src/main/paradox/java/stream/stages-overview.md b/akka-docs/src/main/paradox/java/stream/stages-overview.md
index a91bf45a9d..fc91415e51 100644
--- a/akka-docs/src/main/paradox/java/stream/stages-overview.md
+++ b/akka-docs/src/main/paradox/java/stream/stages-overview.md
@@ -1059,16 +1059,14 @@ Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sen
and the `Flow` 's output will come from the Source.
Note that termination events, like completion and cancelation is not automatically propagated through to the "other-side"
-of the such-composed Flow. Use `CoupledTerminationFlow` if you want to couple termination of both of the ends,
+of the such-composed Flow. Use `Flow.fromSinkAndSourceCoupled` if you want to couple termination of both of the ends,
for example most useful in handling websocket connections.
---------------------------------------------------------------
-### CoupledTerminationFlow.fromSinkAndSource
+### Flow.fromSinkAndSourceCoupled
Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them.
-Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages.
-
Similar to `Flow.fromSinkAndSource` however couples the termination of these two stages.
E.g. if the emitted `Flow` gets a cancellation, the `Source` of course is cancelled,
diff --git a/akka-docs/src/main/paradox/scala/stream/stages-overview.md b/akka-docs/src/main/paradox/scala/stream/stages-overview.md
index 74ead74af2..48105789a4 100644
--- a/akka-docs/src/main/paradox/scala/stream/stages-overview.md
+++ b/akka-docs/src/main/paradox/scala/stream/stages-overview.md
@@ -1047,16 +1047,14 @@ Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sen
and the `Flow` 's output will come from the Source.
Note that termination events, like completion and cancelation is not automatically propagated through to the "other-side"
-of the such-composed Flow. Use `CoupledTerminationFlow` if you want to couple termination of both of the ends,
+of the such-composed Flow. Use `Flow.fromSinkAndSourceCoupled` if you want to couple termination of both of the ends,
for example most useful in handling websocket connections.
---------------------------------------------------------------
-### CoupledTerminationFlow.fromSinkAndSource
+### Flow.fromSinkAndSourceCoupled
Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them.
-Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages.
-
Similar to `Flow.fromSinkAndSource` however couples the termination of these two stages.
E.g. if the emitted `Flow` gets a cancellation, the `Source` of course is cancelled,
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CoupledTerminationFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CoupledTerminationFlowSpec.scala
index 6ddbf7d472..6b1891c8ea 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CoupledTerminationFlowSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CoupledTerminationFlowSpec.scala
@@ -6,10 +6,12 @@ package akka.stream.scaladsl
import akka.{ Done, NotUsed }
import akka.stream._
import akka.stream.testkit._
+import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.testkit.TestProbe
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import org.scalatest.Assertion
+import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }
import scala.xml.Node
@@ -72,7 +74,7 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest {
val (innerSink, innerSinkAssertion) = interpretInnerSink(innerSinkRule)
val (innerSource, innerSourceAssertion) = interpretInnerSource(innerSourceRule)
- val flow = CoupledTerminationFlow.fromSinkAndSource(innerSink, innerSource)
+ val flow = Flow.fromSinkAndSourceCoupledMat(innerSink, innerSource)(Keep.none)
outerSource.via(flow).to(outerSink).run()
outerAssertions()
@@ -83,9 +85,9 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest {
"completed out:Source => complete in:Sink" in {
val probe = TestProbe()
- val f = CoupledTerminationFlow.fromSinkAndSource(
+ val f = Flow.fromSinkAndSourceCoupledMat(
Sink.onComplete(d ⇒ probe.ref ! "done"),
- Source.empty) // completes right away, should complete the sink as well
+ Source.empty)(Keep.none) // completes right away, should complete the sink as well
f.runWith(Source.maybe, Sink.ignore) // these do nothing.
@@ -94,7 +96,7 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest {
"cancel in:Sink => cancel out:Source" in {
val probe = TestProbe()
- val f = CoupledTerminationFlow.fromSinkAndSource(
+ val f = Flow.fromSinkAndSourceCoupledMat(
Sink.cancelled,
Source.fromPublisher(new Publisher[String] {
override def subscribe(subscriber: Subscriber[_ >: String]): Unit = {
@@ -104,7 +106,7 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest {
override def request(l: Long): Unit = () // do nothing
})
}
- })) // completes right away, should complete the sink as well
+ }))(Keep.none) // completes right away, should complete the sink as well
f.runWith(Source.maybe, Sink.ignore) // these do nothing.
@@ -113,15 +115,27 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest {
"error wrapped Sink when wrapped Source errors " in {
val probe = TestProbe()
- val f = CoupledTerminationFlow.fromSinkAndSource(
+ val f = Flow.fromSinkAndSourceCoupledMat(
Sink.onComplete(e ⇒ probe.ref ! e.failed.get.getMessage),
- Source.failed(new Exception("BOOM!"))) // completes right away, should complete the sink as well
+ Source.failed(new Exception("BOOM!")))(Keep.none) // completes right away, should complete the sink as well
f.runWith(Source.maybe, Sink.ignore) // these do nothing.
probe.expectMsg("BOOM!")
}
+ "support usage with Graphs" in {
+ val source: Graph[SourceShape[Int], TestPublisher.Probe[Int]] = TestSource.probe[Int]
+ val sink: Graph[SinkShape[Any], Future[Done]] = Sink.ignore
+
+ val flow = Flow.fromSinkAndSourceCoupledMat(sink, source)(Keep.right)
+
+ val (source1, source2) = TestSource.probe[Int].viaMat(flow)(Keep.both).toMat(Sink.ignore)(Keep.left).run
+
+ source1.sendComplete()
+ source2.expectCancellation()
+ }
+
}
def interpretOuter(rule: String): (Source[String, NotUsed], Sink[String, NotUsed], () ⇒ Any) = {
@@ -148,12 +162,12 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest {
}
val assertCompleteAndCancel = () ⇒ {
probe.expectMsgPF() {
- case Success(v) ⇒ // good
- case "cancel-received" ⇒ // good
+ case Success(v) ⇒ // good
+ case "cancel-received" ⇒ // good
}
probe.expectMsgPF() {
- case Success(v) ⇒ // good
- case "cancel-received" ⇒ // good
+ case Success(v) ⇒ // good
+ case "cancel-received" ⇒ // good
}
}
val assertError = () ⇒ {
@@ -163,12 +177,12 @@ class CoupledTerminationFlowSpec extends StreamSpec with ScriptedTest {
}
val assertErrorAndCancel = () ⇒ {
probe.expectMsgPF() {
- case Failure(ex) ⇒ // good
- case "cancel-received" ⇒ // good
+ case Failure(ex) ⇒ // good
+ case "cancel-received" ⇒ // good
}
probe.expectMsgPF() {
- case Failure(ex) ⇒ // good
- case "cancel-received" ⇒ // good
+ case Failure(ex) ⇒ // good
+ case "cancel-received" ⇒ // good
}
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/CoupledTerminationFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/CoupledTerminationFlow.scala
index 64fce5fd20..4b69a19836 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/CoupledTerminationFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/CoupledTerminationFlow.scala
@@ -56,6 +56,7 @@ object CoupledTerminationFlow {
*
* The order in which the `in` and `out` sides receive their respective completion signals is not defined, do not rely on its ordering.
*/
+ @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ..., Keep.both())` instead", "2.5.2")
def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =
- akka.stream.scaladsl.CoupledTerminationFlow.fromSinkAndSource(in.asScala, out.asScala).asJava
+ akka.stream.scaladsl.Flow.fromSinkAndSourceCoupledMat(in, out)(akka.stream.scaladsl.Keep.both).asJava
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 913d2c590a..4153bd3351 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -55,18 +55,98 @@ object Flow {
}
/**
- * Helper to create `Flow` from a `Sink`and a `Source`.
+ * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
+ * will be sent to the Sink and the Flow's output will come from the Source.
+ *
+ * The completion of the Sink and Source sides of a Flow constructed using
+ * this method are independent. So if the Sink receives a completion signal,
+ * the Source side will remain unaware of that. If you are looking to couple
+ * the termination signals of the two sides use `Flow.fromSinkAndSourceCoupled` instead.
*/
def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
new Flow(scaladsl.Flow.fromSinkAndSourceMat(sink, source)(scaladsl.Keep.none))
/**
- * Helper to create `Flow` from a `Sink`and a `Source`.
+ * Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
+ * will be sent to the Sink and the Flow's output will come from the Source.
+ *
+ * The completion of the Sink and Source sides of a Flow constructed using
+ * this method are independent. So if the Sink receives a completion signal,
+ * the Source side will remain unaware of that. If you are looking to couple
+ * the termination signals of the two sides use `Flow.fromSinkAndSourceCoupledMat` instead.
+ *
+ * The `combine` function is used to compose the materialized values of the `sink` and `source`
+ * into the materialized value of the resulting [[Flow]].
*/
def fromSinkAndSourceMat[I, O, M1, M2, M](
sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2],
combine: function.Function2[M1, M2, M]): Flow[I, O, M] =
new Flow(scaladsl.Flow.fromSinkAndSourceMat(sink, source)(combinerToScala(combine)))
+
+ /**
+ * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
+ * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages.
+ *
+ * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled,
+ * however the Sink will also be completed. The table below illustrates the effects in detail:
+ *
+ *
+ *
+ * | Returned Flow |
+ * Sink (in) |
+ * Source (out) |
+ *
+ *
+ * | cause: upstream (sink-side) receives completion |
+ * effect: receives completion |
+ * effect: receives cancel |
+ *
+ *
+ * | cause: upstream (sink-side) receives error |
+ * effect: receives error |
+ * effect: receives cancel |
+ *
+ *
+ * | cause: downstream (source-side) receives cancel |
+ * effect: completes |
+ * effect: receives cancel |
+ *
+ *
+ * | effect: cancels upstream, completes downstream |
+ * effect: completes |
+ * cause: signals complete |
+ *
+ *
+ * | effect: cancels upstream, errors downstream |
+ * effect: receives error |
+ * cause: signals error or throws |
+ *
+ *
+ * | effect: cancels upstream, completes downstream |
+ * cause: cancels |
+ * effect: receives cancel |
+ *
+ *
+ *
+ */
+ def fromSinkAndSourceCoupled[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
+ new Flow(scaladsl.Flow.fromSinkAndSourceCoupled(sink, source))
+
+ /**
+ * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
+ * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages.
+ *
+ * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled,
+ * however the Sink will also be completed. The table on [[Flow.fromSinkAndSourceCoupled]]
+ * illustrates the effects in detail.
+ *
+ * The `combine` function is used to compose the materialized values of the `sink` and `source`
+ * into the materialized value of the resulting [[Flow]].
+ */
+ def fromSinkAndSourceCoupledMat[I, O, M1, M2, M](
+ sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2],
+ combine: function.Function2[M1, M2, M]): Flow[I, O, M] =
+ new Flow(scaladsl.Flow.fromSinkAndSourceCoupledMat(sink, source)(combinerToScala(combine)))
}
/** Create a `Flow` which can process elements of type `T`. */
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala
index 60236ad3ac..d4fe7cf6c9 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/CoupledTerminationFlow.scala
@@ -59,15 +59,9 @@ object CoupledTerminationFlow {
*
* The order in which the `in` and `out` sides receive their respective completion signals is not defined, do not rely on its ordering.
*/
+ @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2")
def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] =
- // format: OFF
- Flow.fromGraph(GraphDSL.create(in, out)(Keep.both) { implicit b => (i, o) =>
- import GraphDSL.Implicits._
- val bidi = b.add(new CoupledTerminationBidi[I, O])
- /* bidi.in1 ~> */ bidi.out1 ~> i; o ~> bidi.in2 /* ~> bidi.out2 */
- FlowShape(bidi.in1, bidi.out2)
- })
- // format: ON
+ Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)
}
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 56225be88a..7e17fef307 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -315,6 +315,11 @@ object Flow {
/**
* Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
* will be sent to the Sink and the Flow's output will come from the Source.
+ *
+ * The completion of the Sink and Source sides of a Flow constructed using
+ * this method are independent. So if the Sink receives a completion signal,
+ * the Source side will remain unaware of that. If you are looking to couple
+ * the termination signals of the two sides use `Flow.fromSinkAndSourceCoupled` instead.
*/
def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
fromSinkAndSourceMat(sink, source)(Keep.none)
@@ -323,11 +328,86 @@ object Flow {
* Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
* will be sent to the Sink and the Flow's output will come from the Source.
*
+ * The completion of the Sink and Source sides of a Flow constructed using
+ * this method are independent. So if the Sink receives a completion signal,
+ * the Source side will remain unaware of that. If you are looking to couple
+ * the termination signals of the two sides use `Flow.fromSinkAndSourceCoupledMat` instead.
+ *
* The `combine` function is used to compose the materialized values of the `sink` and `source`
* into the materialized value of the resulting [[Flow]].
*/
def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(combine: (M1, M2) ⇒ M): Flow[I, O, M] =
fromGraph(GraphDSL.create(sink, source)(combine) { implicit b ⇒ (in, out) ⇒ FlowShape(in.in, out.out) })
+
+ /**
+ * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
+ * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages.
+ *
+ * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled,
+ * however the Sink will also be completed. The table below illustrates the effects in detail:
+ *
+ *
+ *
+ * | Returned Flow |
+ * Sink (in) |
+ * Source (out) |
+ *
+ *
+ * | cause: upstream (sink-side) receives completion |
+ * effect: receives completion |
+ * effect: receives cancel |
+ *
+ *
+ * | cause: upstream (sink-side) receives error |
+ * effect: receives error |
+ * effect: receives cancel |
+ *
+ *
+ * | cause: downstream (source-side) receives cancel |
+ * effect: completes |
+ * effect: receives cancel |
+ *
+ *
+ * | effect: cancels upstream, completes downstream |
+ * effect: completes |
+ * cause: signals complete |
+ *
+ *
+ * | effect: cancels upstream, errors downstream |
+ * effect: receives error |
+ * cause: signals error or throws |
+ *
+ *
+ * | effect: cancels upstream, completes downstream |
+ * cause: cancels |
+ * effect: receives cancel |
+ *
+ *
+ *
+ */
+ def fromSinkAndSourceCoupled[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
+ fromSinkAndSourceCoupledMat(sink, source)(Keep.none)
+
+ /**
+ * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
+ * Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages.
+ *
+ * E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled,
+ * however the Sink will also be completed. The table on [[Flow.fromSinkAndSourceCoupled]]
+ * illustrates the effects in detail.
+ *
+ * The `combine` function is used to compose the materialized values of the `sink` and `source`
+ * into the materialized value of the resulting [[Flow]].
+ */
+ def fromSinkAndSourceCoupledMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(combine: (M1, M2) ⇒ M): Flow[I, O, M] =
+ // format: OFF
+ Flow.fromGraph(GraphDSL.create(sink, source)(combine) { implicit b => (i, o) =>
+ import GraphDSL.Implicits._
+ val bidi = b.add(new CoupledTerminationBidi[I, O])
+ /* bidi.in1 ~> */ bidi.out1 ~> i; o ~> bidi.in2 /* ~> bidi.out2 */
+ FlowShape(bidi.in1, bidi.out2)
+ })
+ // format: ON
}
object RunnableGraph {