diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala index abaf19eb69..feff147e66 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala @@ -4,6 +4,7 @@ package docs.stream import java.net.InetSocketAddress +import java.util.concurrent.atomic.AtomicReference import akka.actor.ActorSystem import akka.stream.scaladsl.Concat @@ -12,7 +13,9 @@ import akka.stream.scaladsl.FlowGraphImplicits import akka.stream.scaladsl.Source import akka.stream.scaladsl.UndefinedSink import akka.stream.scaladsl.UndefinedSource +import akka.stream.stage.{ PushStage, Directive, Context, PushPullStage } import akka.stream.testkit.AkkaSpec +import akka.testkit.TestProbe import akka.util.ByteString import cookbook.RecipeParseLines @@ -29,6 +32,9 @@ class StreamTcpDocSpec extends AkkaSpec { implicit val mat = ActorFlowMaterializer() //#setup + // silence sysout + def println(s: String) = () + val localhost = new InetSocketAddress("127.0.0.1", 8888) "simple server connection" ignore { @@ -53,29 +59,10 @@ class StreamTcpDocSpec extends AkkaSpec { //#echo-server-simple-handle } - "simple repl client" ignore { - val sys: ActorSystem = ??? + "actually working client-server CLI app" in { + val serverProbe = TestProbe() - //#repl-client - val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost) - - val repl = Flow[ByteString] - .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256)) - .map(text => println("Server: " + text)) - .map(_ => readLine("> ")) - .map { - case "q" => - sys.shutdown(); ByteString("BYE") - case text => ByteString(s"$text") - } - - connection.handleWith(repl) - //#repl-client - } - - "initial server banner echo server" ignore { val binding = StreamTcp().bind(localhost) - //#welcome-banner-chat-server binding.connections runForeach { connection => @@ -86,14 +73,27 @@ class StreamTcpDocSpec extends AkkaSpec { val in = UndefinedSource[ByteString] val out = UndefinedSink[ByteString] - val welcomeMsg = - s"""|Welcome to: ${connection.localAddress}! - |You are: ${connection.remoteAddress}!""".stripMargin + // server logic, parses incoming commands + val commandParser = new PushStage[String, String] { + override def onPush(elem: String, ctx: Context[String]): Directive = { + elem match { + case "BYE" ⇒ ctx.finish() + case _ ⇒ ctx.push(elem + "!") + } + } + } + + import connection._ + val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!\n" val welcome = Source.single(ByteString(welcomeMsg)) val echo = Flow[ByteString] .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256)) - .map(_ ++ "!!!") + //#welcome-banner-chat-server + .map { command ⇒ serverProbe.ref ! command; command } + //#welcome-banner-chat-server + .transform(() ⇒ commandParser) + .map(_ ++ "\n") .map(ByteString(_)) val concat = Concat[ByteString] @@ -108,7 +108,41 @@ class StreamTcpDocSpec extends AkkaSpec { connection.handleWith(serverLogic) } + //#welcome-banner-chat-server + val input = new AtomicReference("Hello world" :: "What a lovely day" :: Nil) + def readLine(prompt: String): String = { + input.get() match { + case all @ cmd :: tail if input.compareAndSet(all, tail) ⇒ cmd + case _ ⇒ "q" + } + } + + //#repl-client + val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost) + + val replParser = new PushStage[String, ByteString] { + override def onPush(elem: String, ctx: Context[ByteString]): Directive = { + elem match { + case "q" ⇒ ctx.pushAndFinish(ByteString("BYE\n")) + case _ ⇒ ctx.push(ByteString(s"$elem\n")) + } + } + } + + val repl = Flow[ByteString] + .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256)) + .map(text => println("Server: " + text)) + .map(_ => readLine("> ")) + //#repl-client + .transform(() ⇒ replParser) + + connection.handleWith(repl) + //#repl-client + + serverProbe.expectMsg("Hello world") + serverProbe.expectMsg("What a lovely day") + serverProbe.expectMsg("BYE") } } diff --git a/akka-docs-dev/rst/scala/stream-customize.rst b/akka-docs-dev/rst/scala/stream-customize.rst index bdcba9203f..82ff1571f7 100644 --- a/akka-docs-dev/rst/scala/stream-customize.rst +++ b/akka-docs-dev/rst/scala/stream-customize.rst @@ -16,6 +16,8 @@ To extend the available transformations on a :class:`Flow` or :class:`Source` on which takes a factory function returning a :class:`Stage`. Stages come in different flavors swhich we will introduce in this page. +.. _stream-using-push-pull-stage-scala: + Using PushPullStage ------------------- diff --git a/akka-docs-dev/rst/scala/stream-io.rst b/akka-docs-dev/rst/scala/stream-io.rst index fd8018858a..6af31277df 100644 --- a/akka-docs-dev/rst/scala/stream-io.rst +++ b/akka-docs-dev/rst/scala/stream-io.rst @@ -88,4 +88,8 @@ we can encapsulate arbitrarily complex logic within a :class:`Flow` as long as i exposing exactly one :class:`UndefinedSink` and exactly one :class:`UndefinedSource` which will be connected to the TCP pipeline. In this example we use a :class:`Concat` graph processing stage to inject the initial message, and then continue with handling all incoming data using the echo handler. You should use this pattern of encapsulating complex -logic in Flows and attaching those to :class:`StreamIO` in order to implement your custom and possibly sophisticated TCP servers. \ No newline at end of file +logic in Flows and attaching those to :class:`StreamIO` in order to implement your custom and possibly sophisticated TCP servers. + +In this example both client and server may need to close the stream based on a parsed command command - ``BYE`` in the case +of the server, and ``q`` in the case of the client. This is implemented by using a custom :class:`PushStage` +(see :ref:`stream-using-push-pull-stage-scala`) which completes the stream once it encounters such command.