+doc #16549 StreamTCP docs

This commit is contained in:
Konrad 'ktoso' Malawski 2014-12-22 16:18:26 +01:00
parent f9ab2e42c5
commit e98af843ae
16 changed files with 305 additions and 108 deletions

View file

@ -22,7 +22,7 @@ class HttpServerExampleSpec
implicit val materializer = FlowMaterializer()
val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
serverBinding.connections.foreach { connection // foreach materializes the source
serverBinding.connections.foreach { connection => // foreach materializes the source
println("Accepted new connection from " + connection.remoteAddress)
}
//#bind-example
@ -41,15 +41,15 @@ class HttpServerExampleSpec
import akka.http.model.HttpMethods._
import akka.stream.scaladsl.Flow
val requestHandler: HttpRequest HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _)
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
HttpResponse(
entity = HttpEntity(MediaTypes.`text/html`,
"<html><body>Hello world!</body></html>"))
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) sys.error("BOOM!")
case _: HttpRequest HttpResponse(404, entity = "Unknown resource!")
case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => HttpResponse(entity = "PONG!")
case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => sys.error("BOOM!")
case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!")
}
serverBinding.connections foreach { connection =>

View file

@ -47,11 +47,11 @@ object ActorSubscriberDocSpec {
}
def receive = {
case OnNext(Msg(id, replyTo))
case OnNext(Msg(id, replyTo)) =>
queue += (id -> replyTo)
assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}")
router.route(Work(id), self)
case Reply(id)
case Reply(id) =>
queue(id) ! Done(id)
queue -= id
}
@ -60,7 +60,7 @@ object ActorSubscriberDocSpec {
class Worker extends Actor {
import WorkerPool._
def receive = {
case Work(id)
case Work(id) =>
// ...
sender() ! Reply(id)
}

View file

@ -12,7 +12,6 @@ import akka.stream.testkit.AkkaSpec
import concurrent.Future
// TODO replace with => and disable this intellij setting
class FlowDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
@ -26,10 +25,10 @@ class FlowDocSpec extends AkkaSpec {
"source is immutable" in {
//#source-immutable
val source = Source(1 to 10)
source.map(_ 0) // has no effect on source, since it's immutable
source.map(_ => 0) // has no effect on source, since it's immutable
source.runWith(Sink.fold(0)(_ + _)) // 55
val zeroes = source.map(_ 0) // returns new Source[Int], with `map()` appended
val zeroes = source.map(_ => 0) // returns new Source[Int], with `map()` appended
zeroes.runWith(Sink.fold(0)(_ + _)) // 0
//#source-immutable
}
@ -66,12 +65,12 @@ class FlowDocSpec extends AkkaSpec {
import scala.concurrent.duration._
case object Tick
val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () Tick)
val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () => Tick)
val timerCancel: Cancellable = Sink.ignore.runWith(timer)
timerCancel.cancel()
val timerMap = timer.map(tick "tick")
val timerMap = timer.map(tick => "tick")
val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timers Cancellable!
//#compound-source-is-not-keyed-runWith

View file

@ -18,7 +18,6 @@ import akka.stream.testkit.AkkaSpec
import scala.concurrent.Await
import scala.concurrent.duration._
// TODO replace with => and disable this intellij setting
class FlowGraphDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
@ -28,7 +27,7 @@ class FlowGraphDocSpec extends AkkaSpec {
"build simple graph" in {
//format: OFF
//#simple-flow-graph
val g = FlowGraph { implicit b
val g = FlowGraph { implicit b =>
import FlowGraphImplicits._
val in = Source(1 to 10)
val out = Sink.ignore
@ -51,7 +50,7 @@ class FlowGraphDocSpec extends AkkaSpec {
"build simple graph without implicits" in {
//#simple-flow-graph-no-implicits
val g = FlowGraph { b
val g = FlowGraph { b =>
val in = Source(1 to 10)
val out = Sink.ignore
@ -75,7 +74,7 @@ class FlowGraphDocSpec extends AkkaSpec {
"flow connection errors" in {
intercept[IllegalArgumentException] {
//#simple-graph
FlowGraph { implicit b
FlowGraph { implicit b =>
import FlowGraphImplicits._
val source1 = Source(1 to 10)
val source2 = Source(1 to 10)
@ -102,7 +101,7 @@ class FlowGraphDocSpec extends AkkaSpec {
// format: OFF
val g =
//#flow-graph-reusing-a-flow
FlowGraph { implicit b
FlowGraph { implicit b =>
import FlowGraphImplicits._
val broadcast = Broadcast[Int]
Source.single(1) ~> broadcast

View file

@ -21,7 +21,6 @@ import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
// TODO replace with => and disable this intellij setting
class StreamPartialFlowGraphDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
@ -39,7 +38,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
val in3 = UndefinedSource[Int]
val out = UndefinedSink[Int]
val pickMaxOfThree: PartialFlowGraph = PartialFlowGraph { implicit b
val pickMaxOfThree: PartialFlowGraph = PartialFlowGraph { implicit b =>
import FlowGraphImplicits._
val zip1 = ZipWith[Int, Int, Int](math.max _)
@ -57,7 +56,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
val resultSink = Sink.head[Int]
val g = FlowGraph { b
val g = FlowGraph { b =>
// import the partial flow graph explicitly
b.importPartialFlowGraph(pickMaxOfThree)
@ -74,7 +73,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
val g2 =
//#simple-partial-flow-graph-import-shorthand
FlowGraph(pickMaxOfThree) { b
FlowGraph(pickMaxOfThree) { b =>
b.attachSource(in1, Source.single(1))
b.attachSource(in2, Source.single(2))
b.attachSource(in3, Source.single(3))
@ -88,13 +87,13 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
"build source from partial flow graph" in {
//#source-from-partial-flow-graph
val pairs: Source[(Int, Int)] = Source() { implicit b
val pairs: Source[(Int, Int)] = Source() { implicit b =>
import FlowGraphImplicits._
// prepare graph elements
val undefinedSink = UndefinedSink[(Int, Int)]
val zip = Zip[Int, Int]
def ints = Source(() Iterator.from(1))
def ints = Source(() => Iterator.from(1))
// connect the graph
ints ~> Flow[Int].filter(_ % 2 != 0) ~> zip.left
@ -112,7 +111,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
"build flow from partial flow graph" in {
//#flow-from-partial-flow-graph
val pairUpWithToString = Flow() { implicit b
val pairUpWithToString = Flow() { implicit b =>
import FlowGraphImplicits._
// prepare graph elements

View file

@ -0,0 +1,114 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.stream.scaladsl.Concat
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.FlowGraphImplicits
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.UndefinedSink
import akka.stream.scaladsl.UndefinedSource
import akka.stream.testkit.AkkaSpec
import akka.util.ByteString
import cookbook.RecipeParseLines
class StreamTcpDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
//#setup
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.StreamTcp
import akka.stream.scaladsl.StreamTcp._
implicit val sys = ActorSystem("stream-tcp-system")
implicit val mat = FlowMaterializer()
//#setup
val localhost = new InetSocketAddress("127.0.0.1", 8888)
"simple server connection" ignore {
//#echo-server-simple-bind
val localhost = new InetSocketAddress("127.0.0.1", 8888)
val binding = StreamTcp().bind(localhost)
//#echo-server-simple-bind
//#echo-server-simple-handle
val connections: Source[IncomingConnection] = binding.connections
connections foreach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.map(_ ++ "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
//#echo-server-simple-handle
}
"simple repl client" ignore {
val sys: ActorSystem = ???
//#repl-client
val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost)
val repl = Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.map(text => println("Server: " + text))
.map(_ => readLine("> "))
.map {
case "q" =>
sys.shutdown(); ByteString("BYE")
case text => ByteString(s"$text")
}
connection.handleWith(repl)
//#repl-client
}
"initial server banner echo server" ignore {
val binding = StreamTcp().bind(localhost)
//#welcome-banner-chat-server
binding.connections foreach { connection =>
val serverLogic = Flow() { implicit b =>
import FlowGraphImplicits._
// to be filled in by StreamTCP
val in = UndefinedSource[ByteString]
val out = UndefinedSink[ByteString]
val welcomeMsg =
s"""|Welcome to: ${connection.localAddress}!
|You are: ${connection.remoteAddress}!""".stripMargin
val welcome = Source.single(ByteString(welcomeMsg))
val echo = Flow[ByteString]
.transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
.map(_ ++ "!!!")
.map(ByteString(_))
val concat = Concat[ByteString]
// first we emit the welcome message,
welcome ~> concat.first
// then we continue using the echo-logic Flow
in ~> echo ~> concat.second
concat.out ~> out
(in, out)
}
connection.handleWith(serverLogic)
}
//#welcome-banner-chat-server
}
}

View file

@ -36,7 +36,7 @@ object TwitterStreamQuickstartDocSpec {
final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: Set[Hashtag] =
body.split(" ").collect { case t if t.startsWith("#") Hashtag(t) }.toSet
body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}
//#model
@ -54,7 +54,6 @@ object TwitterStreamQuickstartDocSpec {
Nil)
}
// TODO replace with => and disable this intellij setting
class TwitterStreamQuickstartDocSpec extends AkkaSpec {
import TwitterStreamQuickstartDocSpec._
@ -86,7 +85,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
trait Example3 {
//#authors-collect
val authors: Source[Author] =
tweets.collect { case t if t.hashtags.contains(Akka) t.author }
tweets.collect { case t if t.hashtags.contains(Akka) => t.author }
//#authors-collect
}
@ -118,7 +117,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
// format: OFF
//#flow-graph-broadcast
val g = FlowGraph { implicit builder
val g = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val b = Broadcast[Tweet]
@ -151,8 +150,8 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#backpressure-by-readline
val completion: Future[Unit] =
Source(1 to 10)
.map(i { println(s"map => $i"); i })
.foreach { i readLine(s"Element = $i; continue reading? [press enter]\n") }
.map(i => { println(s"map => $i"); i })
.foreach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }
Await.ready(completion, 1.minute)
//#backpressure-by-readline
@ -163,17 +162,17 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#tweets-fold-count
val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counter: RunnableFlow = tweets.map(t 1).to(sumSink)
val counter: RunnableFlow = tweets.map(t => 1).to(sumSink)
val map: MaterializedMap = counter.run()
val sum: Future[Int] = map.get(sumSink)
sum.map { c println(s"Total tweets processed: $c") }
sum.map { c => println(s"Total tweets processed: $c") }
//#tweets-fold-count
new AnyRef {
//#tweets-fold-count-oneline
val sum: Future[Int] = tweets.map(t 1).runWith(sumSink)
val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)
//#tweets-fold-count-oneline
}
}
@ -186,7 +185,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val counterRunnableFlow: RunnableFlow =
tweetsInMinuteFromNow
.filter(_.hashtags contains Akka)
.map(t 1)
.map(t => 1)
.to(sumSink)
// materialize the stream once in the morning
@ -204,7 +203,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val sum: Future[Int] = map.get(sumSink)
sum.map { c println(s"Total tweets processed: $c") }
sum.map { c => println(s"Total tweets processed: $c") }
//#tweets-fold-count
}

View file

@ -1,6 +1,7 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.annotation.tailrec
@ -18,63 +19,11 @@ class RecipeParseLines extends RecipeSpec {
ByteString("!\r"),
ByteString("\nHello Akka!\r\nHello Streams!"),
ByteString("\r\n\r\n")))
import akka.stream.stage._
//#parse-lines
def parseLines(separator: String, maximumLineBytes: Int) =
new StatefulStage[ByteString, String] {
private val separatorBytes = ByteString(separator)
private val firstSeparatorByte = separatorBytes.head
private var buffer = ByteString.empty
private var nextPossibleMatch = 0
def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
buffer ++= chunk
if (buffer.size > maximumLineBytes)
ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
s"which is more than $maximumLineBytes without seeing a line terminator"))
else emit(doParse(Vector.empty).iterator, ctx)
}
@tailrec
private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
if (possibleMatchPos == -1) {
// No matching character, we need to accumulate more bytes into the buffer
nextPossibleMatch = buffer.size
parsedLinesSoFar
} else {
if (possibleMatchPos + separatorBytes.size > buffer.size) {
// We have found a possible match (we found the first character of the terminator
// sequence) but we don't have yet enough bytes. We remember the position to
// retry from next time.
nextPossibleMatch = possibleMatchPos
parsedLinesSoFar
} else {
if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
== separatorBytes) {
// Found a match
val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
nextPossibleMatch -= possibleMatchPos + separatorBytes.size
doParse(parsedLinesSoFar :+ parsedLine)
} else {
nextPossibleMatch += 1
doParse(parsedLinesSoFar)
}
}
}
}
}
}
import RecipeParseLines._
val linesStream = rawData.transform(() => parseLines("\r\n", 100))
//#parse-lines
Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
"Hello World\r!",
"Hello Akka!",
@ -85,3 +34,60 @@ class RecipeParseLines extends RecipeSpec {
}
}
object RecipeParseLines {
import akka.stream.stage._
//#parse-lines
def parseLines(separator: String, maximumLineBytes: Int) =
new StatefulStage[ByteString, String] {
private val separatorBytes = ByteString(separator)
private val firstSeparatorByte = separatorBytes.head
private var buffer = ByteString.empty
private var nextPossibleMatch = 0
def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
buffer ++= chunk
if (buffer.size > maximumLineBytes)
ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
s"which is more than $maximumLineBytes without seeing a line terminator"))
else emit(doParse(Vector.empty).iterator, ctx)
}
@tailrec
private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
if (possibleMatchPos == -1) {
// No matching character, we need to accumulate more bytes into the buffer
nextPossibleMatch = buffer.size
parsedLinesSoFar
} else {
if (possibleMatchPos + separatorBytes.size > buffer.size) {
// We have found a possible match (we found the first character of the terminator
// sequence) but we don't have yet enough bytes. We remember the position to
// retry from next time.
nextPossibleMatch = possibleMatchPos
parsedLinesSoFar
} else {
if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
== separatorBytes) {
// Found a match
val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
nextPossibleMatch -= possibleMatchPos + separatorBytes.size
doParse(parsedLinesSoFar :+ parsedLine)
} else {
nextPossibleMatch += 1
doParse(parsedLinesSoFar)
}
}
}
}
}
}
//#parse-lines
}