Merge pull request #16599 from ktoso/docs-io-ktoso

+doc StreamTCP docs
Konrad Malawski 2014-12-23 11:39:02 +01:00
commit ea389e16ed
15 changed files with 302 additions and 107 deletions


@@ -22,7 +22,7 @@ class HttpServerExampleSpec
     implicit val materializer = FlowMaterializer()

     val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
-    serverBinding.connections.foreach { connection ⇒ // foreach materializes the source
+    serverBinding.connections.foreach { connection => // foreach materializes the source
       println("Accepted new connection from " + connection.remoteAddress)
     }
     //#bind-example
@@ -41,15 +41,15 @@ class HttpServerExampleSpec
     import akka.http.model.HttpMethods._
     import akka.stream.scaladsl.Flow

-    val requestHandler: HttpRequest ⇒ HttpResponse = {
-      case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒
+    val requestHandler: HttpRequest => HttpResponse = {
+      case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
         HttpResponse(
           entity = HttpEntity(MediaTypes.`text/html`,
             "<html><body>Hello world!</body></html>"))

-      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒ HttpResponse(entity = "PONG!")
-      case HttpRequest(GET, Uri.Path("/crash"), _, _, _) ⇒ sys.error("BOOM!")
-      case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!")
+      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => HttpResponse(entity = "PONG!")
+      case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => sys.error("BOOM!")
+      case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!")
     }

     serverBinding.connections foreach { connection =>


@@ -47,11 +47,11 @@ object ActorSubscriberDocSpec {
     }

     def receive = {
-      case OnNext(Msg(id, replyTo)) ⇒
+      case OnNext(Msg(id, replyTo)) =>
         queue += (id -> replyTo)
         assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}")
         router.route(Work(id), self)
-      case Reply(id) ⇒
+      case Reply(id) =>
         queue(id) ! Done(id)
         queue -= id
     }
@@ -60,7 +60,7 @@ object ActorSubscriberDocSpec {
   class Worker extends Actor {
     import WorkerPool._
     def receive = {
-      case Work(id) ⇒
+      case Work(id) =>
         // ...
         sender() ! Reply(id)
     }


@@ -9,7 +9,6 @@ import akka.stream.testkit.AkkaSpec
 import concurrent.Future

-// TODO replace ⇒ with => and disable this intellij setting
 class FlowDocSpec extends AkkaSpec {

   implicit val ec = system.dispatcher
@@ -23,10 +22,10 @@ class FlowDocSpec extends AkkaSpec {
   "source is immutable" in {
     //#source-immutable
     val source = Source(1 to 10)
-    source.map(_ ⇒ 0) // has no effect on source, since it's immutable
+    source.map(_ => 0) // has no effect on source, since it's immutable
     source.runWith(Sink.fold(0)(_ + _)) // 55

-    val zeroes = source.map(_ ⇒ 0) // returns new Source[Int], with `map()` appended
+    val zeroes = source.map(_ => 0) // returns new Source[Int], with `map()` appended
     zeroes.runWith(Sink.fold(0)(_ + _)) // 0
     //#source-immutable
   }
@@ -77,12 +76,12 @@ class FlowDocSpec extends AkkaSpec {
     import scala.concurrent.duration._
     case object Tick

-    val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () ⇒ Tick)
+    val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () => Tick)

     val timerCancel: Cancellable = Sink.ignore.runWith(timer)
     timerCancel.cancel()

-    val timerMap = timer.map(tick ⇒ "tick")
+    val timerMap = timer.map(tick => "tick")
     val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timers Cancellable!
     //#compound-source-is-not-keyed-runWith


@@ -18,7 +18,6 @@ import akka.stream.testkit.AkkaSpec
 import scala.concurrent.Await
 import scala.concurrent.duration._

-// TODO replace ⇒ with => and disable this intellij setting
 class FlowGraphDocSpec extends AkkaSpec {

   implicit val ec = system.dispatcher
@@ -28,7 +27,7 @@ class FlowGraphDocSpec extends AkkaSpec {
   "build simple graph" in {
     //format: OFF
     //#simple-flow-graph
-    val g = FlowGraph { implicit b ⇒
+    val g = FlowGraph { implicit b =>
       import FlowGraphImplicits._
       val in = Source(1 to 10)
       val out = Sink.ignore
@@ -51,7 +50,7 @@ class FlowGraphDocSpec extends AkkaSpec {
   "build simple graph without implicits" in {
     //#simple-flow-graph-no-implicits
-    val g = FlowGraph { b ⇒
+    val g = FlowGraph { b =>
       val in = Source(1 to 10)
       val out = Sink.ignore
@@ -75,7 +74,7 @@ class FlowGraphDocSpec extends AkkaSpec {
   "flow connection errors" in {
     intercept[IllegalArgumentException] {
       //#simple-graph
-      FlowGraph { implicit b ⇒
+      FlowGraph { implicit b =>
         import FlowGraphImplicits._
         val source1 = Source(1 to 10)
         val source2 = Source(1 to 10)
@@ -102,7 +101,7 @@ class FlowGraphDocSpec extends AkkaSpec {
     // format: OFF
     val g =
     //#flow-graph-reusing-a-flow
-      FlowGraph { implicit b ⇒
+      FlowGraph { implicit b =>
         import FlowGraphImplicits._
         val broadcast = Broadcast[Int]
         Source.single(1) ~> broadcast


@@ -21,7 +21,6 @@ import scala.concurrent.Await
 import scala.concurrent.Future
 import scala.concurrent.duration._

-// TODO replace ⇒ with => and disable this intellij setting
 class StreamPartialFlowGraphDocSpec extends AkkaSpec {

   implicit val ec = system.dispatcher
@@ -39,7 +38,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
     val in3 = UndefinedSource[Int]
     val out = UndefinedSink[Int]

-    val pickMaxOfThree: PartialFlowGraph = PartialFlowGraph { implicit b ⇒
+    val pickMaxOfThree: PartialFlowGraph = PartialFlowGraph { implicit b =>
       import FlowGraphImplicits._

       val zip1 = ZipWith[Int, Int, Int](math.max _)
@@ -57,7 +56,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
     val resultSink = Sink.head[Int]

-    val g = FlowGraph { b ⇒
+    val g = FlowGraph { b =>
       // import the partial flow graph explicitly
       b.importPartialFlowGraph(pickMaxOfThree)
@@ -74,7 +73,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
     val g2 =
       //#simple-partial-flow-graph-import-shorthand
-      FlowGraph(pickMaxOfThree) { b ⇒
+      FlowGraph(pickMaxOfThree) { b =>
         b.attachSource(in1, Source.single(1))
         b.attachSource(in2, Source.single(2))
         b.attachSource(in3, Source.single(3))
@@ -88,13 +87,13 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
   "build source from partial flow graph" in {
     //#source-from-partial-flow-graph
-    val pairs: Source[(Int, Int)] = Source() { implicit b ⇒
+    val pairs: Source[(Int, Int)] = Source() { implicit b =>
       import FlowGraphImplicits._

       // prepare graph elements
       val undefinedSink = UndefinedSink[(Int, Int)]
       val zip = Zip[Int, Int]
-      def ints = Source(() ⇒ Iterator.from(1))
+      def ints = Source(() => Iterator.from(1))

       // connect the graph
       ints ~> Flow[Int].filter(_ % 2 != 0) ~> zip.left
@@ -112,7 +111,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
   "build flow from partial flow graph" in {
     //#flow-from-partial-flow-graph
-    val pairUpWithToString = Flow() { implicit b ⇒
+    val pairUpWithToString = Flow() { implicit b =>
       import FlowGraphImplicits._

       // prepare graph elements


@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
+ */
+package docs.stream
+
+import java.net.InetSocketAddress
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Concat
+import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.FlowGraphImplicits
+import akka.stream.scaladsl.Source
+import akka.stream.scaladsl.UndefinedSink
+import akka.stream.scaladsl.UndefinedSource
+import akka.stream.testkit.AkkaSpec
+import akka.util.ByteString
+import cookbook.RecipeParseLines
+
+class StreamTcpDocSpec extends AkkaSpec {
+
+  implicit val ec = system.dispatcher
+
+  //#setup
+  import akka.stream.FlowMaterializer
+  import akka.stream.scaladsl.StreamTcp
+  import akka.stream.scaladsl.StreamTcp._
+
+  implicit val sys = ActorSystem("stream-tcp-system")
+  implicit val mat = FlowMaterializer()
+  //#setup
+
+  val localhost = new InetSocketAddress("127.0.0.1", 8888)
+
+  "simple server connection" ignore {
+    //#echo-server-simple-bind
+    val localhost = new InetSocketAddress("127.0.0.1", 8888)
+    val binding = StreamTcp().bind(localhost)
+    //#echo-server-simple-bind
+
+    //#echo-server-simple-handle
+    val connections: Source[IncomingConnection] = binding.connections
+
+    connections foreach { connection =>
+      println(s"New connection from: ${connection.remoteAddress}")
+
+      val echo = Flow[ByteString]
+        .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
+        .map(_ ++ "!!!\n")
+        .map(ByteString(_))
+
+      connection.handleWith(echo)
+    }
+    //#echo-server-simple-handle
+  }
+
+  "simple repl client" ignore {
+    val sys: ActorSystem = ???
+
+    //#repl-client
+    val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost)
+
+    val repl = Flow[ByteString]
+      .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
+      .map(text => println("Server: " + text))
+      .map(_ => readLine("> "))
+      .map {
+        case "q" =>
+          sys.shutdown(); ByteString("BYE")
+        case text => ByteString(s"$text")
+      }
+
+    connection.handleWith(repl)
+    //#repl-client
+  }
+
+  "initial server banner echo server" ignore {
+    val binding = StreamTcp().bind(localhost)
+
+    //#welcome-banner-chat-server
+    binding.connections foreach { connection =>
+
+      val serverLogic = Flow() { implicit b =>
+        import FlowGraphImplicits._
+
+        // to be filled in by StreamTCP
+        val in = UndefinedSource[ByteString]
+        val out = UndefinedSink[ByteString]
+
+        val welcomeMsg =
+          s"""|Welcome to: ${connection.localAddress}!
+              |You are: ${connection.remoteAddress}!""".stripMargin
+
+        val welcome = Source.single(ByteString(welcomeMsg))
+        val echo = Flow[ByteString]
+          .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
+          .map(_ ++ "!!!")
+          .map(ByteString(_))
+
+        val concat = Concat[ByteString]
+        // first we emit the welcome message,
+        welcome ~> concat.first
+        // then we continue using the echo-logic Flow
+        in ~> echo ~> concat.second
+
+        concat.out ~> out
+        (in, out)
+      }
+
+      connection.handleWith(serverLogic)
+    }
+    //#welcome-banner-chat-server
+  }
+}


@@ -36,7 +36,7 @@ object TwitterStreamQuickstartDocSpec {
   final case class Tweet(author: Author, timestamp: Long, body: String) {
     def hashtags: Set[Hashtag] =
-      body.split(" ").collect { case t if t.startsWith("#") ⇒ Hashtag(t) }.toSet
+      body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
   }
   //#model
//#model //#model
@@ -54,7 +54,6 @@ object TwitterStreamQuickstartDocSpec {
       Nil)
 }

-// TODO replace ⇒ with => and disable this intellij setting
 class TwitterStreamQuickstartDocSpec extends AkkaSpec {
   import TwitterStreamQuickstartDocSpec._
@@ -86,7 +85,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
   trait Example3 {
     //#authors-collect
     val authors: Source[Author] =
-      tweets.collect { case t if t.hashtags.contains(Akka) ⇒ t.author }
+      tweets.collect { case t if t.hashtags.contains(Akka) => t.author }
     //#authors-collect
   }
@@ -118,7 +117,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
     // format: OFF
     //#flow-graph-broadcast
-    val g = FlowGraph { implicit builder ⇒
+    val g = FlowGraph { implicit builder =>
       import FlowGraphImplicits._

       val b = Broadcast[Tweet]
@@ -151,8 +150,8 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
     //#backpressure-by-readline
     val completion: Future[Unit] =
       Source(1 to 10)
-        .map(i ⇒ { println(s"map => $i"); i })
-        .foreach { i ⇒ readLine(s"Element = $i; continue reading? [press enter]\n") }
+        .map(i => { println(s"map => $i"); i })
+        .foreach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }

     Await.ready(completion, 1.minute)
     //#backpressure-by-readline
@@ -163,17 +162,17 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
     //#tweets-fold-count
     val sumSink = Sink.fold[Int, Int](0)(_ + _)

-    val counter: RunnableFlow = tweets.map(t ⇒ 1).to(sumSink)
+    val counter: RunnableFlow = tweets.map(t => 1).to(sumSink)

     val map: MaterializedMap = counter.run()

     val sum: Future[Int] = map.get(sumSink)

-    sum.map { c ⇒ println(s"Total tweets processed: $c") }
+    sum.map { c => println(s"Total tweets processed: $c") }
     //#tweets-fold-count

     new AnyRef {
       //#tweets-fold-count-oneline
-      val sum: Future[Int] = tweets.map(t ⇒ 1).runWith(sumSink)
+      val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)
       //#tweets-fold-count-oneline
     }
   }
@@ -186,7 +185,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
     val counterRunnableFlow: RunnableFlow =
       tweetsInMinuteFromNow
         .filter(_.hashtags contains Akka)
-        .map(t ⇒ 1)
+        .map(t => 1)
         .to(sumSink)

     // materialize the stream once in the morning
@@ -204,7 +203,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
     val sum: Future[Int] = map.get(sumSink)

-    sum.map { c ⇒ println(s"Total tweets processed: $c") }
+    sum.map { c => println(s"Total tweets processed: $c") }
     //#tweets-fold-count
   }


@@ -1,6 +1,7 @@
 package docs.stream.cookbook

-import akka.stream.scaladsl.{ Sink, Source }
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
 import akka.util.ByteString

 import scala.annotation.tailrec
@@ -18,63 +19,11 @@ class RecipeParseLines extends RecipeSpec {
       ByteString("!\r"),
       ByteString("\nHello Akka!\r\nHello Streams!"),
       ByteString("\r\n\r\n")))

-    import akka.stream.stage._
-
-    //#parse-lines
-    def parseLines(separator: String, maximumLineBytes: Int) =
-      new StatefulStage[ByteString, String] {
-        private val separatorBytes = ByteString(separator)
-        private val firstSeparatorByte = separatorBytes.head
-        private var buffer = ByteString.empty
-        private var nextPossibleMatch = 0
-
-        def initial = new State {
-          override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
-            buffer ++= chunk
-            if (buffer.size > maximumLineBytes)
-              ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
-                s"which is more than $maximumLineBytes without seeing a line terminator"))
-            else emit(doParse(Vector.empty).iterator, ctx)
-          }
-
-          @tailrec
-          private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
-            val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
-            if (possibleMatchPos == -1) {
-              // No matching character, we need to accumulate more bytes into the buffer
-              nextPossibleMatch = buffer.size
-              parsedLinesSoFar
-            } else {
-              if (possibleMatchPos + separatorBytes.size > buffer.size) {
-                // We have found a possible match (we found the first character of the terminator
-                // sequence) but we don't have yet enough bytes. We remember the position to
-                // retry from next time.
-                nextPossibleMatch = possibleMatchPos
-                parsedLinesSoFar
-              } else {
-                if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
-                  == separatorBytes) {
-                  // Found a match
-                  val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
-                  buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
-                  nextPossibleMatch -= possibleMatchPos + separatorBytes.size
-                  doParse(parsedLinesSoFar :+ parsedLine)
-                } else {
-                  nextPossibleMatch += 1
-                  doParse(parsedLinesSoFar)
-                }
-              }
-            }
-          }
-        }
-      }
-    //#parse-lines
+    import RecipeParseLines._

     val linesStream = rawData.transform(() => parseLines("\r\n", 100))

     Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
       "Hello World\r!",
       "Hello Akka!",
@@ -85,3 +34,60 @@ class RecipeParseLines extends RecipeSpec {
     }
   }
 }
+
+object RecipeParseLines {
+  import akka.stream.stage._
+
+  //#parse-lines
+  def parseLines(separator: String, maximumLineBytes: Int) =
+    new StatefulStage[ByteString, String] {
+      private val separatorBytes = ByteString(separator)
+      private val firstSeparatorByte = separatorBytes.head
+      private var buffer = ByteString.empty
+      private var nextPossibleMatch = 0
+
+      def initial = new State {
+        override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
+          buffer ++= chunk
+          if (buffer.size > maximumLineBytes)
+            ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
+              s"which is more than $maximumLineBytes without seeing a line terminator"))
+          else emit(doParse(Vector.empty).iterator, ctx)
+        }
+
+        @tailrec
+        private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
+          val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
+          if (possibleMatchPos == -1) {
+            // No matching character, we need to accumulate more bytes into the buffer
+            nextPossibleMatch = buffer.size
+            parsedLinesSoFar
+          } else {
+            if (possibleMatchPos + separatorBytes.size > buffer.size) {
+              // We have found a possible match (we found the first character of the terminator
+              // sequence) but we don't have yet enough bytes. We remember the position to
+              // retry from next time.
+              nextPossibleMatch = possibleMatchPos
+              parsedLinesSoFar
+            } else {
+              if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
+                == separatorBytes) {
+                // Found a match
+                val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
+                buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
+                nextPossibleMatch -= possibleMatchPos + separatorBytes.size
+                doParse(parsedLinesSoFar :+ parsedLine)
+              } else {
+                nextPossibleMatch += 1
+                doParse(parsedLinesSoFar)
+              }
+            }
+          }
+        }
+      }
+    }
+  //#parse-lines
+}


@@ -12,9 +12,9 @@ Signature
     def logRequestResult(marker: String)(implicit log: LoggingContext): Directive0
     def logRequestResult(marker: String, level: LogLevel)(implicit log: LoggingContext): Directive0
-    def logRequestResult(show: HttpRequest ⇒ HttpResponsePart ⇒ Option[LogEntry])
+    def logRequestResult(show: HttpRequest => HttpResponsePart => Option[LogEntry])
       (implicit log: LoggingContext): Directive0
-    def logRequestResult(show: HttpRequest ⇒ Any ⇒ Option[LogEntry])(implicit log: LoggingContext): Directive0
+    def logRequestResult(show: HttpRequest => Any => Option[LogEntry])(implicit log: LoggingContext): Directive0

 The signature shown is simplified, the real signature uses magnets. [1]_


@@ -11,7 +11,7 @@ Routes
 The "Route" is the central concept of the routing DSL since all structures you can build with it are instances of
 a ``Route``. The type Route is defined like this::

-    type Route = RequestContext ⇒ Future[RouteResult]
+    type Route = RequestContext => Future[RouteResult]

 It's a simple alias for a function taking a ``RequestContext`` as parameter and returning a ``Future[RouteResult]``.
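For illustration, a route can be written directly against this alias. The following is a minimal sketch only; it assumes that ``RequestContext`` exposes a ``complete`` method returning a ``Future[RouteResult]``, as the docs above imply, so treat the exact signatures as assumptions rather than the definitive API::

    // A hand-written Route: complete every request with a fixed response.
    // Assumes ctx.complete(...) returns Future[RouteResult].
    val helloRoute: Route = { ctx =>
      ctx.complete("Hello from a hand-written Route!")
    }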


@@ -46,7 +46,7 @@ Flattening a stream of sequences
 all the nested elements inside the sequences separately.

 The ``mapConcat`` operation can be used to implement a one-to-many transformation of elements using a mapper function
-in the form of ``In ⇒ immutable.Seq[Out]``. In this case we want to map a ``Seq`` of elements to the elements in the
+in the form of ``In => immutable.Seq[Out]``. In this case we want to map a ``Seq`` of elements to the elements in the
 collection itself, so we can just call ``mapConcat(identity)``.

 .. includecode:: code/docs/stream/cookbook/RecipeFlattenSeq.scala#flattening-seqs
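As a quick inline sketch of the ``identity`` trick (an illustration, not the cookbook source referenced above; the ``Source(iterable)`` constructor follows the usage seen elsewhere in this changeset)::

    // Flattening a stream of sequences with mapConcat(identity).
    import akka.stream.scaladsl.Source
    import scala.collection.immutable

    val nested: Source[immutable.Seq[Int]] =
      Source(List(immutable.Seq(1, 2), immutable.Seq(3, 4)))
    val flat: Source[Int] = nested.mapConcat(identity) // emits 1, 2, 3, 4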
@@ -85,6 +85,8 @@ passing the digest ByteString to be emitted.
 .. includecode:: code/docs/stream/cookbook/RecipeDigest.scala#calculating-digest

+.. _cookbook-parse-lines-scala:
+
 Parsing lines from a stream of ByteStrings
 ------------------------------------------


@@ -168,7 +168,7 @@ that the elements in the cycles can flow.
 If we run the example we see that the same sequence of numbers are printed
 over and over again, but the processing does not stop. Hence, we avoided the deadlock, but ``source`` is still
-backpressured forever, because buffer space is never recovered: the only action we see is the circulation of a couple
+back-pressured forever, because buffer space is never recovered: the only action we see is the circulation of a couple
 of initial elements from ``source``.

 .. note::


@@ -4,4 +4,88 @@
 Working with streaming IO
 #########################

-*TODO*
+Akka Streams provides a way of handling TCP connections with streams.
+While the general approach is very similar to the `Actor based TCP handling`_ using Akka IO,
+by using Akka Streams you are freed from having to react manually to back-pressure signals,
+as the library does it transparently for you.
+
+.. _Actor based TCP handling: http://doc.akka.io/docs/akka/current/scala/io-tcp.html
+
+Accepting connections: Echo Server
+==================================
+
+To implement a simple echo server we ``bind`` to a given address; the binding exposes a
+``Source[IncomingConnection]`` which emits an :class:`IncomingConnection` element for each
+new connection the server should handle:
+
+.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#echo-server-simple-bind
+
+Next, we handle *each* incoming connection using a :class:`Flow` which acts as the processing stage
+that consumes and emits ByteStrings from and to the TCP socket. Since one :class:`ByteString` does not
+necessarily correspond to exactly one line of text (the client might be sending the line in chunks),
+we use the ``parseLines`` stage from the :ref:`cookbook-parse-lines-scala` Akka Streams Cookbook recipe
+to chunk the inputs up into actual lines of text. In this example we simply add exclamation marks
+to each incoming text message and push it through the flow:
+
+.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#echo-server-simple-handle
+
+Notice that while most building blocks in Akka Streams are reusable and freely shareable, this is
+*not* the case for the incoming connection Flow: since it directly corresponds to an existing,
+already accepted connection, its handling can only ever be materialized *once*.
+
+Closing connections is possible by cancelling the *incoming connection* :class:`Flow` from your
+server logic (e.g. by connecting its downstream to a :class:`CancelledSink` and its upstream to a
+*completed* :class:`Source`). It is also possible to shut down the server's socket by cancelling
+the ``connections: Source[IncomingConnection]``.
+
+We can then test the TCP server by sending data to the TCP socket using ``netcat``:
+
+::
+
+  $ echo -n "Hello World" | netcat 127.0.0.1 8888
+  Hello World!!!
+
+Connecting: REPL Client
+=======================
+
+In this example we implement a rather naive Read-Evaluate-Print Loop client over TCP.
+Let's say we know a server has exposed a simple command line interface over TCP,
+and we would like to interact with it using Akka Streams over TCP. To open an outgoing connection
+socket we use the ``outgoingConnection`` method:
+
+.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#repl-client
+
+The ``repl`` flow we use to handle the server interaction first prints the server's response, then
+awaits input from the command line (this blocking call is used here only for the sake of simplicity)
+and converts it to a :class:`ByteString`, which is then sent over the wire to the server. We then
+connect the TCP pipeline to this processing stage; at this point it will be materialized and start
+processing data once the server responds with an *initial message*.
+
+A resilient REPL client would be more sophisticated than this: for example, it would split the input
+reading into a separate ``mapAsync`` step and have a way to let the server write more data than one
+:class:`ByteString` chunk at any given time. These improvements are left as an exercise for the reader.
+
+Avoiding deadlocks and liveness issues in back-pressured cycles
+===============================================================
+
+When writing such end-to-end back-pressured systems you may sometimes end up in a loop in which
+*either side is waiting for the other one to start the conversation*. One does not need to look far
+to find examples of such back-pressure loops. In the two examples shown previously, we always assumed
+that the side we are connecting to would start the conversation, which effectively means both sides
+are back-pressured and cannot get the conversation started. There are multiple ways of dealing with
+this, explained in depth in :ref:`graph-cycles-scala`; in client-server scenarios, however, it is
+often simplest to make one of the sides send an initial message.
+
+.. note::
+  In case of back-pressured cycles (which can occur even between different systems) sometimes you
+  have to decide which side should start the conversation in order to kick it off. This can often
+  be done by injecting an initial message from one of the sides: a conversation starter.
+
+To break this back-pressure cycle we need to inject some initial message, a "conversation starter".
+First, we need to decide which side of the connection should remain passive and which should be
+active. Thankfully, in most situations finding the right spot to start the conversation is rather
+simple, as it is often inherent to the protocol we are trying to implement using streams. In
+chat-like applications, which our examples resemble, it makes sense to make the server initiate the
+conversation by emitting a "hello" message:
+
+.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#welcome-banner-chat-server
+
+The way we constructed a :class:`Flow` using a :class:`PartialFlowGraph` is explained in detail in
+:ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`, however the basic concept is
+rather simple: we can encapsulate arbitrarily complex logic within a :class:`Flow` as long as it
+exposes the same interface, which means exposing exactly one :class:`UndefinedSink` and exactly one
+:class:`UndefinedSource` which will be connected to the TCP pipeline. In this example we used a
+:class:`Concat` graph processing stage to inject the initial message, and then continued handling
+all incoming data using the echo handler. You should use this pattern of encapsulating complex
+logic in Flows and attaching those to :class:`StreamTcp` in order to implement your custom, and
+possibly sophisticated, TCP servers.


@@ -1,7 +1,7 @@
 .. _quickstart-scala:

-Quick Start: Reactive Tweets
-============================
+Quick Start Guide: Reactive Tweets
+==================================

 A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some
 other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them.
@@ -73,7 +73,7 @@ combinator:
 due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for
 our implementation of flatMap (due to the liveness issues).

-Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``),
+Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out=>immutable.Seq[T]``),
 whereas ``flatMap`` would have to operate on streams all the way through.
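To make "strict collection" concrete, here is a small sketch reusing the ``tweets`` and ``Hashtag`` names from this quickstart (the ``.toList`` conversion is the illustrative part: it turns the ``Set`` of hashtags into the strict ``immutable.Seq`` that ``mapConcat`` requires)::

    // One tweet fans out into zero or more hashtags.
    val hashtags: Source[Hashtag] =
      tweets.mapConcat(_.hashtags.toList)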


@@ -3,21 +3,14 @@
  */
 package akka.stream.io

-import java.io.Closeable
 import akka.actor.{ Actor, ActorRef, Props }
 import akka.io.{ IO, Tcp }
-import akka.stream.scaladsl.Flow
 import akka.stream.testkit.StreamTestKit
 import akka.stream.{ FlowMaterializer, MaterializerSettings }
 import akka.testkit.{ TestKitBase, TestProbe }
 import akka.util.ByteString
 import java.net.InetSocketAddress
-import java.nio.channels.ServerSocketChannel
-import org.reactivestreams.Processor
 import scala.collection.immutable.Queue
-import scala.concurrent.{ Await, Future }
-import scala.concurrent.duration.Duration
-import akka.stream.scaladsl.Source
 import akka.stream.testkit.TestUtils.temporaryServerAddress

 object TcpHelper {