=doc #16740 add missing \n to examples, test if really works

This commit is contained in:
Konrad Malawski 2015-01-28 18:02:07 +01:00
parent 580ba8c484
commit 286c08e848
3 changed files with 66 additions and 26 deletions

View file

@ -4,6 +4,7 @@
package docs.stream
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorSystem
import akka.stream.scaladsl.Concat
@ -12,7 +13,9 @@ import akka.stream.scaladsl.FlowGraphImplicits
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.UndefinedSink
import akka.stream.scaladsl.UndefinedSource
import akka.stream.stage.{ PushStage, Directive, Context, PushPullStage }
import akka.stream.testkit.AkkaSpec
import akka.testkit.TestProbe
import akka.util.ByteString
import cookbook.RecipeParseLines
@ -29,6 +32,9 @@ class StreamTcpDocSpec extends AkkaSpec {
implicit val mat = ActorFlowMaterializer()
//#setup
// silence sysout
def println(s: String) = ()
val localhost = new InetSocketAddress("127.0.0.1", 8888)
"simple server connection" ignore {
@ -53,29 +59,10 @@ class StreamTcpDocSpec extends AkkaSpec {
//#echo-server-simple-handle
}
"simple repl client" ignore {
val sys: ActorSystem = ???
"actually working client-server CLI app" in {
val serverProbe = TestProbe()
//#repl-client
val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost)
val repl = Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.map(text => println("Server: " + text))
.map(_ => readLine("> "))
.map {
case "q" =>
sys.shutdown(); ByteString("BYE")
case text => ByteString(s"$text")
}
connection.handleWith(repl)
//#repl-client
}
"initial server banner echo server" ignore {
val binding = StreamTcp().bind(localhost)
//#welcome-banner-chat-server
binding.connections runForeach { connection =>
@ -86,14 +73,27 @@ class StreamTcpDocSpec extends AkkaSpec {
val in = UndefinedSource[ByteString]
val out = UndefinedSink[ByteString]
val welcomeMsg =
s"""|Welcome to: ${connection.localAddress}!
|You are: ${connection.remoteAddress}!""".stripMargin
// server logic, parses incoming commands
val commandParser = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]): Directive = {
elem match {
case "BYE" ctx.finish()
case _ ctx.push(elem + "!")
}
}
}
import connection._
val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!\n"
val welcome = Source.single(ByteString(welcomeMsg))
val echo = Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.map(_ ++ "!!!")
//#welcome-banner-chat-server
.map { command serverProbe.ref ! command; command }
//#welcome-banner-chat-server
.transform(() commandParser)
.map(_ ++ "\n")
.map(ByteString(_))
val concat = Concat[ByteString]
@ -108,7 +108,41 @@ class StreamTcpDocSpec extends AkkaSpec {
connection.handleWith(serverLogic)
}
//#welcome-banner-chat-server
val input = new AtomicReference("Hello world" :: "What a lovely day" :: Nil)
def readLine(prompt: String): String = {
input.get() match {
case all @ cmd :: tail if input.compareAndSet(all, tail) cmd
case _ "q"
}
}
//#repl-client
val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost)
val replParser = new PushStage[String, ByteString] {
override def onPush(elem: String, ctx: Context[ByteString]): Directive = {
elem match {
case "q" ctx.pushAndFinish(ByteString("BYE\n"))
case _ ctx.push(ByteString(s"$elem\n"))
}
}
}
val repl = Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.map(text => println("Server: " + text))
.map(_ => readLine("> "))
//#repl-client
.transform(() replParser)
connection.handleWith(repl)
//#repl-client
serverProbe.expectMsg("Hello world")
serverProbe.expectMsg("What a lovely day")
serverProbe.expectMsg("BYE")
}
}

View file

@ -16,6 +16,8 @@ To extend the available transformations on a :class:`Flow` or :class:`Source` on
which takes a factory function returning a :class:`Stage`. Stages come in different flavors swhich we will introduce in this
page.
.. _stream-using-push-pull-stage-scala:
Using PushPullStage
-------------------

View file

@ -88,4 +88,8 @@ we can encapsulate arbitrarily complex logic within a :class:`Flow` as long as i
exposing exactly one :class:`UndefinedSink` and exactly one :class:`UndefinedSource` which will be connected to the TCP
pipeline. In this example we use a :class:`Concat` graph processing stage to inject the initial message, and then
continue with handling all incoming data using the echo handler. You should use this pattern of encapsulating complex
logic in Flows and attaching those to :class:`StreamIO` in order to implement your custom and possibly sophisticated TCP servers.
logic in Flows and attaching those to :class:`StreamIO` in order to implement your custom and possibly sophisticated TCP servers.
In this example both client and server may need to close the stream based on a parsed command command - ``BYE`` in the case
of the server, and ``q`` in the case of the client. This is implemented by using a custom :class:`PushStage`
(see :ref:`stream-using-push-pull-stage-scala`) which completes the stream once it encounters such command.