diff --git a/akka-docs/src/main/paradox/images/fromSinkAndSource.png b/akka-docs/src/main/paradox/images/fromSinkAndSource.png new file mode 100644 index 0000000000..59a4d6f468 Binary files /dev/null and b/akka-docs/src/main/paradox/images/fromSinkAndSource.png differ diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/fromSinkAndSource.md b/akka-docs/src/main/paradox/stream/operators/Flow/fromSinkAndSource.md index a09e006eb5..b7d08ed73b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/fromSinkAndSource.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/fromSinkAndSource.md @@ -4,19 +4,65 @@ Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sen @ref[Flow operators composed of Sinks and Sources](../index.md#flow-operators-composed-of-sinks-and-sources) -@@@div { .group-scala } - ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #fromSinkAndSource } - -@@@ +@apidoc[Flow.fromSinkAndSource](Flow$) { scala="#fromSinkAndSource[I,O](sink:akka.stream.Graph[akka.stream.SinkShape[I],_],source:akka.stream.Graph[akka.stream.SourceShape[O],_]):akka.stream.scaladsl.Flow[I,O,akka.NotUsed]" java="#fromSinkAndSource(akka.stream.Graph,akka.stream.Graph)" } ## Description -Creates a `Flow` from a `Sink` and a `Source` where the Flow's input will be sent to the `Sink` -and the `Flow` 's output will come from the Source. +Diagram -Note that termination events, like completion and cancelation is not automatically propagated through to the "other-side" -of the such-composed Flow. Use `Flow.fromSinkAndSourceCoupled` if you want to couple termination of both of the ends, -for example most useful in handling websocket connections. +`fromSinkAndSource` combines a separate `Sink` and `Source` into a `Flow`. + +Useful in many cases where an API requires a `Flow` but you want to provide a `Sink` and `Source` whose flows of elements are decoupled. + +Note that termination events, like completion and cancellation, are not automatically propagated through to the "other-side" of the such-composed `Flow`. The `Source` can complete and the sink will continue to accept elements. + +Use @ref:[fromSinkAndSourceCoupled](fromSinkAndSourceCoupled.md) if you want to couple termination of both of the ends. + +## Examples + +One use case is constructing a TCP server where requests and responses do not map 1:1 (like it does in the @ref[Echo TCP server sample](../../stream-io.md) where every incoming test is echoed back) but allows separate flows of elements from the client to the server and from the server to the client. + +This example `cancel`s the incoming stream, not allowing the client to write more messages, switching the TCP connection to "half-closed", but keeps streaming periodic output to the client: + +Scala +: @@snip [FromSinkAndSource.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/FromSinkAndSource.scala) { #halfClosedTcpServer } + +Java +: @@snip [FromSinkAndSource.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/FromSinkAndSource.java) { #halfClosedTcpServer } + +With this server running you could use `telnet 127.0.0.1 9999` to see a stream of timestamps being printed, one every second. + +The following sample is a little bit more advanced and uses the @apidoc[MergeHub] to dynamically merge incoming messages to a single stream which is then fed into a @apidoc[BroadcastHub] which emits elements over a dynamic set of downstreams allowing us to create a simplistic little TCP chat server in which a text entered from one client is emitted to all connected clients. + +Scala +: @@snip [FromSinkAndSource.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/FromSinkAndSource.scala) { #chat } + +Java +: @@snip [FromSinkAndSource.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/FromSinkAndSource.java) { #chat } + + +The same patterns can also be applied to @extref:[Akka HTTP WebSockets](akka.http:/server-side/websocket-support.html#server-api) which also have an API accepting a `Flow` of messages. + +If we would replace the `fromSinkAndSource` here with `fromSinkAndSourceCoupled` it would allow the client to close the connection by closing its outgoing stream. + +`fromSinkAndSource` can also be useful when testing a component that takes a `Flow` allowing for complete separate control and assertion of incoming and outgoing elements using stream testkit test probes for sink and source: + +Scala +: @@snip [FromSinkAndSource.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/FromSinkAndSource.scala) { #testing } + +Java +: @@snip [FromSinkAndSource.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/FromSinkAndSource.java) { #testing } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the `Source` emits + +**backpressures** when the `Sink` backpressures + +**completes** when the `Source` has completed and the `Sink` has cancelled. + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/fromSinkAndSourceCoupled.md b/akka-docs/src/main/paradox/stream/operators/Flow/fromSinkAndSourceCoupled.md index 4e4b7c5d47..03ce607c68 100644 --- a/akka-docs/src/main/paradox/stream/operators/Flow/fromSinkAndSourceCoupled.md +++ b/akka-docs/src/main/paradox/stream/operators/Flow/fromSinkAndSourceCoupled.md @@ -4,20 +4,15 @@ Allows coupling termination (cancellation, completion, erroring) of Sinks and So @ref[Flow operators composed of Sinks and Sources](../index.md#flow-operators-composed-of-sinks-and-sources) -@@@div { .group-scala } - ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #fromSinkAndSourceCoupled } - -@@@ +@apidoc[Flow.fromSinkAndSourceCoupled](Flow$) { scala="#fromSinkAndSourceCoupled[I,O](sink:akka.stream.Graph[akka.stream.SinkShape[I],_],source:akka.stream.Graph[akka.stream.SourceShape[O],_]):akka.stream.scaladsl.Flow[I,O,akka.NotUsed]" java="#fromSinkAndSourceCoupled(akka.stream.Graph,akka.stream.Graph)" } ## Description -Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow between them. -Similar to `Flow.fromSinkAndSource` however couples the termination of these two operators. +See @ref[Flow.fromSinkAndSource](fromSinkAndSource.md) for docs on the general workings and examples. -E.g. if the emitted `Flow` gets a cancellation, the `Source` is cancelled, +This operator only adds coupled termination to what `fromSinkAndSource` does: If the emitted `Flow` gets a cancellation, the `Source` is cancelled, however the Sink will also be completed. The table below illustrates the effects in detail: | Returned Flow | Sink (in) | Source (out) | diff --git a/akka-docs/src/test/java/jdocs/stream/operators/flow/FromSinkAndSource.java b/akka-docs/src/test/java/jdocs/stream/operators/flow/FromSinkAndSource.java new file mode 100644 index 0000000000..e5fa69cf7b --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/flow/FromSinkAndSource.java @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package jdocs.stream.operators.flow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.actor.Cancellable; +import akka.japi.Pair; +import akka.stream.javadsl.*; +import akka.stream.testkit.TestPublisher; +import akka.stream.testkit.TestSubscriber; +import akka.util.ByteString; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import java.time.Duration; +import java.util.concurrent.CompletionStage; + +public class FromSinkAndSource { + + void halfClosedTcpServer() { + ActorSystem system = null; + // #halfClosedTcpServer + // close in immediately + Sink sink = Sink.cancelled(); + // periodic tick out + Source source = + Source.tick(Duration.ofSeconds(1), Duration.ofSeconds(1), "tick") + .map(tick -> ByteString.fromString(System.currentTimeMillis() + "\n")); + + Flow serverFlow = Flow.fromSinkAndSource(sink, source); + + Source> connectionStream = + Tcp.get(system).bind("127.0.0.1", 9999); + + connectionStream.runForeach( + incomingConnection -> incomingConnection.handleWith(serverFlow, system), system); + // #halfClosedTcpServer + } + + void chat() { + ActorSystem system = null; + // #chat + Pair, Source> pair = + MergeHub.of(String.class).toMat(BroadcastHub.of(String.class), Keep.both()).run(system); + Sink sink = pair.first(); + Source source = pair.second(); + + Flow framing = + Framing.delimiter(ByteString.fromString("\n"), 1024); + + Sink sinkWithFraming = + framing.map(bytes -> bytes.utf8String()).to(pair.first()); + Source sourceWithFraming = + source.map(text -> ByteString.fromString(text + "\n")); + + Flow serverFlow = + Flow.fromSinkAndSource(sinkWithFraming, sourceWithFraming); + + Tcp.get(system) + .bind("127.0.0.1", 9999) + .runForeach( + incomingConnection -> incomingConnection.handleWith(serverFlow, system), system); + // #chat + } + + void myApiThatTakesAFlow(Flow flow) { + throw new UnsupportedOperationException(); + } + + void testing() { + ActorSystem system = null; + // #testing + TestSubscriber.Probe inProbe = TestSubscriber.probe(system); + TestPublisher.Probe outProbe = TestPublisher.probe(0, system); + Flow testFlow = + Flow.fromSinkAndSource(Sink.fromSubscriber(inProbe), Source.fromPublisher(outProbe)); + + myApiThatTakesAFlow(testFlow); + inProbe.expectNext("first"); + outProbe.expectRequest(); + outProbe.sendError(new RuntimeException("test error")); + // ... + // #testing + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/flow/FromSinkAndSource.scala b/akka-docs/src/test/scala/docs/stream/operators/flow/FromSinkAndSource.scala new file mode 100644 index 0000000000..3b1bfabb03 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/flow/FromSinkAndSource.scala @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package docs.stream.operators.flow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.BroadcastHub +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Framing +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.MergeHub +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Tcp +import akka.stream.testkit.TestPublisher +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.Utils.TE +import akka.stream.testkit.scaladsl.TestSource +import akka.stream.testkit.scaladsl.TestSink +import akka.util.ByteString + +import scala.concurrent.duration._ + +object FromSinkAndSource { + + implicit val system: ActorSystem = ??? + + def halfClosedTcpServer(): Unit = { + // #halfClosedTcpServer + // close in immediately + val sink = Sink.cancelled[ByteString] + // periodic tick out + val source = + Source.tick(1.second, 1.second, "tick").map(_ => ByteString(System.currentTimeMillis().toString + "\n")) + + val serverFlow = Flow.fromSinkAndSource(sink, source) + + Tcp().bind("127.0.0.1", 9999).runForeach { incomingConnection => + incomingConnection.handleWith(serverFlow) + } + // #halfClosedTcpServer + } + + def chat(): Unit = { + // #chat + val (sink, source) = MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run() + + val framing = Framing.delimiter(ByteString("\n"), 1024) + + val sinkWithFraming = framing.map(bytes => bytes.utf8String).to(sink) + val sourceWithFraming = source.map(text => ByteString(text + "\n")) + + val serverFlow = Flow.fromSinkAndSource(sinkWithFraming, sourceWithFraming) + + Tcp().bind("127.0.0.1", 9999).runForeach { incomingConnection => + incomingConnection.handleWith(serverFlow) + } + // #chat + } + + def testing(): Unit = { + def myApiThatTakesAFlow[In, Out](flow: Flow[In, Out, NotUsed]): Unit = ??? + // #testing + val inProbe = TestSubscriber.probe[String] + val outProbe = TestPublisher.probe[String]() + val testFlow = Flow.fromSinkAndSource(Sink.fromSubscriber(inProbe), Source.fromPublisher(outProbe)) + + myApiThatTakesAFlow(testFlow) + inProbe.expectNext("first") + outProbe.expectRequest() + outProbe.sendError(new RuntimeException("test error")) + // ... + // #testing + } +}