diff --git a/akka-docs-dev/rst/java/stream-io.rst b/akka-docs-dev/rst/java/stream-io.rst
new file mode 100644
index 0000000000..9b65b9b21d
--- /dev/null
+++ b/akka-docs-dev/rst/java/stream-io.rst
@@ -0,0 +1,95 @@
+.. _stream-io-java:
+
+#########################
+Working with streaming IO
+#########################
+
+Akka Streams provides a way of handling TCP connections with Streams.
+While the general approach is very similar to the `Actor based TCP handling`_ using Akka IO,
+by using Akka Streams you are freed from having to manually react to back-pressure signals,
+as the library does it transparently for you.
+
+.. _Actor based TCP handling: http://doc.akka.io/docs/akka/current/java/io-tcp.html
+
+Accepting connections: Echo Server
+==================================
+In order to implement a simple echo server we ``bind`` to a given address, which returns a ``Source[IncomingConnection]``
+that will emit an :class:`IncomingConnection` element for each new connection that the server should handle:
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTcpDocTest.java#echo-server-simple-bind
+
+Next, we handle *each* incoming connection using a :class:`Flow` which acts as the processing stage
+that receives and emits ByteStrings from and to the TCP socket. Since one :class:`ByteString` does not necessarily have to
+correspond to exactly one line of text (the client might be sending the line in chunks), we use the ``parseLines``
+recipe from the Akka Streams Cookbook (:ref:`cookbook-parse-lines-java`) to chunk the inputs up into actual lines of text.
+In this example we simply add exclamation marks to each incoming text message and push it through the flow:
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTcpDocTest.java#echo-server-simple-handle
+
+Notice that while most building blocks in Akka Streams are reusable and freely shareable, this is *not* the case for the
+incoming connection Flow: since it directly corresponds to an existing, already accepted connection, its handling can
+only ever be materialized *once*.
+
+Closing connections is possible by cancelling the *incoming connection* :class:`Flow` from your server logic (e.g. by
+connecting its downstream to a :class:`CancelledSink` and its upstream to a *completed* :class:`Source`).
+It is also possible to shut down the server's socket by cancelling the ``connections: Source[IncomingConnection]``.
+
+We can then test the TCP server by sending data to the TCP socket using ``netcat``:
+
+::
+
+  $ echo -n "Hello World" | netcat 127.0.0.1 8889
+  Hello World!!!
+
+Connecting: REPL Client
+=======================
+In this example we implement a rather naive Read-Evaluate-Print-Loop client over TCP.
+Let's say we know a server has exposed a simple command line interface over TCP,
+and we would like to interact with it using Akka Streams over TCP. To open an outgoing connection socket we use
+the ``outgoingConnection`` method:
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTcpDocTest.java#repl-client
+
+The ``repl`` flow we use to handle the server interaction first prints the server's response, then awaits input from
+the command line (this blocking call is used here just for the sake of simplicity) and converts it to a
+:class:`ByteString` which is then sent over the wire to the server.
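+
+As a rough sketch, the corresponding ``repl`` flow in the Scala DSL (mirroring the test code touched later in this
+change; ``RecipeParseLines`` and ``replParser`` come from that test code) could look like this::
+
+  val repl = Flow[ByteString]
+    // re-chunk the incoming bytes into complete lines of text
+    .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
+    // print whatever the server sent us
+    .map(text => println("Server: " + text))
+    // blocking read from the command line, used here only for simplicity
+    .map(_ => readLine("> "))
+    // replParser turns the typed command into a ByteString (and completes the stream on "q")
+    .transform(() => replParser)
+
+  connection.handleWith(repl)
+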
+Then we simply connect the TCP pipeline to this processing stage; at this point it will be materialized and start
+processing data once the server responds with an *initial message*.
+
+A resilient REPL client would be more sophisticated than this: for example, it should split out the input reading into
+a separate ``mapAsync`` step and allow the server to write more data than one ByteString chunk at any given time.
+These improvements, however, are left as an exercise for the reader.
+
+Avoiding deadlocks and liveness issues in back-pressured cycles
+===============================================================
+When writing such end-to-end back-pressured systems you may sometimes end up in a loop
+in which *either side is waiting for the other one to start the conversation*. One does not need to look far
+to find examples of such back-pressure loops. In the two examples shown previously, we always assumed that the side we
+are connecting to would start the conversation, which effectively means both sides are back-pressured and cannot get
+the conversation started. There are multiple ways of dealing with this, explained in depth in :ref:`graph-cycles-java`;
+in client-server scenarios, however, it is often simplest to make one of the sides send an initial message.
+
+.. note::
+  In the case of back-pressured cycles (which can occur even between different systems) you sometimes have to decide
+  which of the sides should start the conversation in order to kick it off. This can often be done by injecting an
+  initial message from one of the sides, a conversation starter.
+
+To break this back-pressure cycle we need to inject some initial message, a "conversation starter".
+First, we need to decide which side of the connection should remain passive and which should be active.
+Thankfully, in most situations finding the right spot to start the conversation is rather simple, as it is often inherent
+to the protocol we are trying to implement using Streams. In chat-like applications, which our examples resemble,
+it makes sense to make the server initiate the conversation by emitting a "hello" message:
+
+.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamTcpDocTest.java#welcome-banner-chat-server
+
+The way we constructed a :class:`Flow` using a :class:`PartialFlowGraph` is explained in detail in
+:ref:`constructing-sources-sinks-flows-from-partial-graphs-java`; the basic concept, however, is rather simple:
+we can encapsulate arbitrarily complex logic within a :class:`Flow` as long as it exposes the same interface, which means
+exposing exactly one :class:`UndefinedSink` and exactly one :class:`UndefinedSource` which will be connected to the TCP
+pipeline. In this example we use a :class:`Concat` graph processing stage to inject the initial message, and then
+continue with handling all incoming data using the echo handler. You should use this pattern of encapsulating complex
+logic in Flows and attaching those to :class:`StreamIO` in order to implement your custom, possibly sophisticated, TCP servers.
+
+In this example both the client and the server may need to close the stream based on a parsed command: ``BYE`` in the case
+of the server, and ``q`` in the case of the client. This is implemented by using a custom :class:`PushStage`
+(see :ref:`stream-using-push-pull-stage-java`) which completes the stream once it encounters such a command.
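+
+As a rough sketch (shown in Scala; the actual parser stage lives in the test code of this change, so the exact
+behaviour of the ``CommandParser`` below is an assumption), such a stage could look like this::
+
+  class CommandParser extends PushStage[String, String] {
+    override def onPush(elem: String, ctx: Context[String]): Directive =
+      if (elem == "BYE") ctx.finish() // terminate the stream once the closing command is seen
+      else ctx.push(elem)             // otherwise pass the command downstream unchanged
+  }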
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala
index feff147e66..6a0ac1206f 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala
@@ -5,15 +5,17 @@ package docs.stream
 
 import java.net.InetSocketAddress
 import java.util.concurrent.atomic.AtomicReference
-
 import akka.actor.ActorSystem
+import akka.stream.ActorFlowMaterializer
 import akka.stream.scaladsl.Concat
 import akka.stream.scaladsl.Flow
 import akka.stream.scaladsl.FlowGraphImplicits
 import akka.stream.scaladsl.Source
+import akka.stream.scaladsl.StreamTcp
+import akka.stream.scaladsl.StreamTcp._
 import akka.stream.scaladsl.UndefinedSink
 import akka.stream.scaladsl.UndefinedSource
-import akka.stream.stage.{ PushStage, Directive, Context, PushPullStage }
+import akka.stream.stage.{ PushStage, Directive, Context }
 import akka.stream.testkit.AkkaSpec
 import akka.testkit.TestProbe
 import akka.util.ByteString
@@ -22,15 +24,7 @@ import cookbook.RecipeParseLines
 
 class StreamTcpDocSpec extends AkkaSpec {
   implicit val ec = system.dispatcher
-
-  //#setup
-  import akka.stream.ActorFlowMaterializer
-  import akka.stream.scaladsl.StreamTcp
-  import akka.stream.scaladsl.StreamTcp._
-
-  implicit val sys = ActorSystem("stream-tcp-system")
   implicit val mat = ActorFlowMaterializer()
-  //#setup
 
   // silence sysout
   def println(s: String) = ()
@@ -51,7 +45,7 @@ class StreamTcpDocSpec extends AkkaSpec {
     val echo = Flow[ByteString]
       .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
-      .map(_ ++ "!!!\n")
+      .map(_ + "!!!\n")
       .map(ByteString(_))
 
     connection.handleWith(echo)
@@ -93,7 +87,7 @@ class StreamTcpDocSpec extends AkkaSpec {
       .map { command ⇒ serverProbe.ref ! command; command }
       //#welcome-banner-chat-server
       .transform(() ⇒ commandParser)
-      .map(_ ++ "\n")
+      .map(_ + "\n")
       .map(ByteString(_))
 
     val concat = Concat[ByteString]
@@ -135,7 +129,6 @@ class StreamTcpDocSpec extends AkkaSpec {
      .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
      .map(text => println("Server: " + text))
      .map(_ => readLine("> "))
-      //#repl-client
      .transform(() ⇒ replParser)
 
     connection.handleWith(repl)
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala
index bc16fd9793..e95ac181bd 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala
@@ -63,25 +63,23 @@ object RecipeParseLines {
           // No matching character, we need to accumulate more bytes into the buffer
           nextPossibleMatch = buffer.size
           parsedLinesSoFar
+        } else if (possibleMatchPos + separatorBytes.size > buffer.size) {
+          // We have found a possible match (we found the first character of the terminator
+          // sequence) but we don't have yet enough bytes. We remember the position to
+          // retry from next time.
+          nextPossibleMatch = possibleMatchPos
+          parsedLinesSoFar
         } else {
-          if (possibleMatchPos + separatorBytes.size > buffer.size) {
-            // We have found a possible match (we found the first character of the terminator
-            // sequence) but we don't have yet enough bytes. We remember the position to
-            // retry from next time.
-            nextPossibleMatch = possibleMatchPos
-            parsedLinesSoFar
+          if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
+            == separatorBytes) {
+            // Found a match
+            val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
+            buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
+            nextPossibleMatch -= possibleMatchPos + separatorBytes.size
+            doParse(parsedLinesSoFar :+ parsedLine)
           } else {
-            if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
-              == separatorBytes) {
-              // Found a match
-              val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
-              buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
-              nextPossibleMatch -= possibleMatchPos + separatorBytes.size
-              doParse(parsedLinesSoFar :+ parsedLine)
-            } else {
-              nextPossibleMatch += 1
-              doParse(parsedLinesSoFar)
-            }
+            nextPossibleMatch += 1
+            doParse(parsedLinesSoFar)
           }
         }
 
@@ -90,4 +88,4 @@ object RecipeParseLines {
   }
 
 //#parse-lines
-}
\ No newline at end of file
+}