diff --git a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala
index 8f9fd2b5b3..5a6a37e1a0 100644
--- a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala
@@ -22,7 +22,7 @@ class HttpServerExampleSpec
implicit val materializer = FlowMaterializer()
val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
- serverBinding.connections.foreach { connection ⇒ // foreach materializes the source
+ serverBinding.connections.foreach { connection => // foreach materializes the source
println("Accepted new connection from " + connection.remoteAddress)
}
//#bind-example
@@ -41,15 +41,15 @@ class HttpServerExampleSpec
import akka.http.model.HttpMethods._
import akka.stream.scaladsl.Flow
- val requestHandler: HttpRequest ⇒ HttpResponse = {
- case HttpRequest(GET, Uri.Path("/"), _, _, _) ⇒
+ val requestHandler: HttpRequest => HttpResponse = {
+ case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
HttpResponse(
entity = HttpEntity(MediaTypes.`text/html`,
"
Hello world!"))
- case HttpRequest(GET, Uri.Path("/ping"), _, _, _) ⇒ HttpResponse(entity = "PONG!")
- case HttpRequest(GET, Uri.Path("/crash"), _, _, _) ⇒ sys.error("BOOM!")
- case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!")
+ case HttpRequest(GET, Uri.Path("/ping"), _, _, _) => HttpResponse(entity = "PONG!")
+ case HttpRequest(GET, Uri.Path("/crash"), _, _, _) => sys.error("BOOM!")
+ case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!")
}
serverBinding.connections foreach { connection =>
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala
index 7bc1dc656b..fc4a8ef46d 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/ActorSubscriberDocSpec.scala
@@ -47,11 +47,11 @@ object ActorSubscriberDocSpec {
}
def receive = {
- case OnNext(Msg(id, replyTo)) ⇒
+ case OnNext(Msg(id, replyTo)) =>
queue += (id -> replyTo)
assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}")
router.route(Work(id), self)
- case Reply(id) ⇒
+ case Reply(id) =>
queue(id) ! Done(id)
queue -= id
}
@@ -60,7 +60,7 @@ object ActorSubscriberDocSpec {
class Worker extends Actor {
import WorkerPool._
def receive = {
- case Work(id) ⇒
+ case Work(id) =>
// ...
sender() ! Reply(id)
}
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala
index a5108e38ef..07982dd809 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowDocSpec.scala
@@ -9,7 +9,6 @@ import akka.stream.testkit.AkkaSpec
import concurrent.Future
-// TODO replace ⇒ with => and disable this intellij setting
class FlowDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
@@ -23,10 +22,10 @@ class FlowDocSpec extends AkkaSpec {
"source is immutable" in {
//#source-immutable
val source = Source(1 to 10)
- source.map(_ ⇒ 0) // has no effect on source, since it's immutable
+ source.map(_ => 0) // has no effect on source, since it's immutable
source.runWith(Sink.fold(0)(_ + _)) // 55
- val zeroes = source.map(_ ⇒ 0) // returns new Source[Int], with `map()` appended
+ val zeroes = source.map(_ => 0) // returns new Source[Int], with `map()` appended
zeroes.runWith(Sink.fold(0)(_ + _)) // 0
//#source-immutable
}
@@ -77,12 +76,12 @@ class FlowDocSpec extends AkkaSpec {
import scala.concurrent.duration._
case object Tick
- val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () ⇒ Tick)
+ val timer = Source(initialDelay = 1.second, interval = 1.seconds, tick = () => Tick)
val timerCancel: Cancellable = Sink.ignore.runWith(timer)
timerCancel.cancel()
- val timerMap = timer.map(tick ⇒ "tick")
+ val timerMap = timer.map(tick => "tick")
val _ = Sink.ignore.runWith(timerMap) // WRONG: returned type is not the timers Cancellable!
//#compound-source-is-not-keyed-runWith
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala
index e75e1e9d95..2dddeee33e 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala
@@ -18,7 +18,6 @@ import akka.stream.testkit.AkkaSpec
import scala.concurrent.Await
import scala.concurrent.duration._
-// TODO replace ⇒ with => and disable this intellij setting
class FlowGraphDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
@@ -28,7 +27,7 @@ class FlowGraphDocSpec extends AkkaSpec {
"build simple graph" in {
//format: OFF
//#simple-flow-graph
- val g = FlowGraph { implicit b ⇒
+ val g = FlowGraph { implicit b =>
import FlowGraphImplicits._
val in = Source(1 to 10)
val out = Sink.ignore
@@ -51,7 +50,7 @@ class FlowGraphDocSpec extends AkkaSpec {
"build simple graph without implicits" in {
//#simple-flow-graph-no-implicits
- val g = FlowGraph { b ⇒
+ val g = FlowGraph { b =>
val in = Source(1 to 10)
val out = Sink.ignore
@@ -75,7 +74,7 @@ class FlowGraphDocSpec extends AkkaSpec {
"flow connection errors" in {
intercept[IllegalArgumentException] {
//#simple-graph
- FlowGraph { implicit b ⇒
+ FlowGraph { implicit b =>
import FlowGraphImplicits._
val source1 = Source(1 to 10)
val source2 = Source(1 to 10)
@@ -102,7 +101,7 @@ class FlowGraphDocSpec extends AkkaSpec {
// format: OFF
val g =
//#flow-graph-reusing-a-flow
- FlowGraph { implicit b ⇒
+ FlowGraph { implicit b =>
import FlowGraphImplicits._
val broadcast = Broadcast[Int]
Source.single(1) ~> broadcast
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala
index 231f494fa4..6b5a218222 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala
@@ -21,7 +21,6 @@ import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
-// TODO replace ⇒ with => and disable this intellij setting
class StreamPartialFlowGraphDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
@@ -39,7 +38,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
val in3 = UndefinedSource[Int]
val out = UndefinedSink[Int]
- val pickMaxOfThree: PartialFlowGraph = PartialFlowGraph { implicit b ⇒
+ val pickMaxOfThree: PartialFlowGraph = PartialFlowGraph { implicit b =>
import FlowGraphImplicits._
val zip1 = ZipWith[Int, Int, Int](math.max _)
@@ -57,7 +56,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
val resultSink = Sink.head[Int]
- val g = FlowGraph { b ⇒
+ val g = FlowGraph { b =>
// import the partial flow graph explicitly
b.importPartialFlowGraph(pickMaxOfThree)
@@ -74,7 +73,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
val g2 =
//#simple-partial-flow-graph-import-shorthand
- FlowGraph(pickMaxOfThree) { b ⇒
+ FlowGraph(pickMaxOfThree) { b =>
b.attachSource(in1, Source.single(1))
b.attachSource(in2, Source.single(2))
b.attachSource(in3, Source.single(3))
@@ -88,13 +87,13 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
"build source from partial flow graph" in {
//#source-from-partial-flow-graph
- val pairs: Source[(Int, Int)] = Source() { implicit b ⇒
+ val pairs: Source[(Int, Int)] = Source() { implicit b =>
import FlowGraphImplicits._
// prepare graph elements
val undefinedSink = UndefinedSink[(Int, Int)]
val zip = Zip[Int, Int]
- def ints = Source(() ⇒ Iterator.from(1))
+ def ints = Source(() => Iterator.from(1))
// connect the graph
ints ~> Flow[Int].filter(_ % 2 != 0) ~> zip.left
@@ -112,7 +111,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
"build flow from partial flow graph" in {
//#flow-from-partial-flow-graph
- val pairUpWithToString = Flow() { implicit b ⇒
+ val pairUpWithToString = Flow() { implicit b =>
import FlowGraphImplicits._
// prepare graph elements
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala
new file mode 100644
index 0000000000..dea4949424
--- /dev/null
+++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala
@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2014 Typesafe Inc.
+ */
+package docs.stream
+
+import java.net.InetSocketAddress
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Concat
+import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.FlowGraphImplicits
+import akka.stream.scaladsl.Source
+import akka.stream.scaladsl.UndefinedSink
+import akka.stream.scaladsl.UndefinedSource
+import akka.stream.testkit.AkkaSpec
+import akka.util.ByteString
+import cookbook.RecipeParseLines
+
+class StreamTcpDocSpec extends AkkaSpec {
+
+ implicit val ec = system.dispatcher
+
+ //#setup
+ import akka.stream.FlowMaterializer
+ import akka.stream.scaladsl.StreamTcp
+ import akka.stream.scaladsl.StreamTcp._
+
+ implicit val sys = ActorSystem("stream-tcp-system")
+ implicit val mat = FlowMaterializer()
+ //#setup
+
+ val localhost = new InetSocketAddress("127.0.0.1", 8888)
+
+ "simple server connection" ignore {
+ //#echo-server-simple-bind
+ val localhost = new InetSocketAddress("127.0.0.1", 8888)
+ val binding = StreamTcp().bind(localhost)
+ //#echo-server-simple-bind
+
+ //#echo-server-simple-handle
+ val connections: Source[IncomingConnection] = binding.connections
+
+ connections foreach { connection =>
+ println(s"New connection from: ${connection.remoteAddress}")
+
+ val echo = Flow[ByteString]
+ .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
+ .map(_ ++ "!!!\n")
+ .map(ByteString(_))
+
+ connection.handleWith(echo)
+ }
+ //#echo-server-simple-handle
+ }
+
+ "simple repl client" ignore {
+ val sys: ActorSystem = ???
+
+ //#repl-client
+ val connection: OutgoingConnection = StreamTcp().outgoingConnection(localhost)
+
+ val repl = Flow[ByteString]
+ .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
+ .map(text => println("Server: " + text))
+ .map(_ => readLine("> "))
+ .map {
+ case "q" =>
+ sys.shutdown(); ByteString("BYE")
+ case text => ByteString(s"$text")
+ }
+
+ connection.handleWith(repl)
+ //#repl-client
+ }
+
+ "initial server banner echo server" ignore {
+ val binding = StreamTcp().bind(localhost)
+
+ //#welcome-banner-chat-server
+ binding.connections foreach { connection =>
+
+ val serverLogic = Flow() { implicit b =>
+ import FlowGraphImplicits._
+
+ // to be filled in by StreamTCP
+ val in = UndefinedSource[ByteString]
+ val out = UndefinedSink[ByteString]
+
+ val welcomeMsg =
+ s"""|Welcome to: ${connection.localAddress}!
+ |You are: ${connection.remoteAddress}!""".stripMargin
+
+ val welcome = Source.single(ByteString(welcomeMsg))
+ val echo = Flow[ByteString]
+ .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256))
+ .map(_ ++ "!!!")
+ .map(ByteString(_))
+
+ val concat = Concat[ByteString]
+ // first we emit the welcome message,
+ welcome ~> concat.first
+ // then we continue using the echo-logic Flow
+ in ~> echo ~> concat.second
+
+ concat.out ~> out
+ (in, out)
+ }
+
+ connection.handleWith(serverLogic)
+ }
+ //#welcome-banner-chat-server
+
+ }
+}
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala
index fa00df3457..e095908c16 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala
@@ -36,7 +36,7 @@ object TwitterStreamQuickstartDocSpec {
final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: Set[Hashtag] =
- body.split(" ").collect { case t if t.startsWith("#") ⇒ Hashtag(t) }.toSet
+ body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}
//#model
@@ -54,7 +54,6 @@ object TwitterStreamQuickstartDocSpec {
Nil)
}
-// TODO replace ⇒ with => and disable this intellij setting
class TwitterStreamQuickstartDocSpec extends AkkaSpec {
import TwitterStreamQuickstartDocSpec._
@@ -86,7 +85,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
trait Example3 {
//#authors-collect
val authors: Source[Author] =
- tweets.collect { case t if t.hashtags.contains(Akka) ⇒ t.author }
+ tweets.collect { case t if t.hashtags.contains(Akka) => t.author }
//#authors-collect
}
@@ -118,7 +117,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
// format: OFF
//#flow-graph-broadcast
- val g = FlowGraph { implicit builder ⇒
+ val g = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val b = Broadcast[Tweet]
@@ -151,8 +150,8 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#backpressure-by-readline
val completion: Future[Unit] =
Source(1 to 10)
- .map(i ⇒ { println(s"map => $i"); i })
- .foreach { i ⇒ readLine(s"Element = $i; continue reading? [press enter]\n") }
+ .map(i => { println(s"map => $i"); i })
+ .foreach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }
Await.ready(completion, 1.minute)
//#backpressure-by-readline
@@ -163,17 +162,17 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#tweets-fold-count
val sumSink = Sink.fold[Int, Int](0)(_ + _)
- val counter: RunnableFlow = tweets.map(t ⇒ 1).to(sumSink)
+ val counter: RunnableFlow = tweets.map(t => 1).to(sumSink)
val map: MaterializedMap = counter.run()
val sum: Future[Int] = map.get(sumSink)
- sum.map { c ⇒ println(s"Total tweets processed: $c") }
+ sum.map { c => println(s"Total tweets processed: $c") }
//#tweets-fold-count
new AnyRef {
//#tweets-fold-count-oneline
- val sum: Future[Int] = tweets.map(t ⇒ 1).runWith(sumSink)
+ val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)
//#tweets-fold-count-oneline
}
}
@@ -186,7 +185,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val counterRunnableFlow: RunnableFlow =
tweetsInMinuteFromNow
.filter(_.hashtags contains Akka)
- .map(t ⇒ 1)
+ .map(t => 1)
.to(sumSink)
// materialize the stream once in the morning
@@ -204,7 +203,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val sum: Future[Int] = map.get(sumSink)
- sum.map { c ⇒ println(s"Total tweets processed: $c") }
+ sum.map { c => println(s"Total tweets processed: $c") }
//#tweets-fold-count
}
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala
index 77f8bea667..bc16fd9793 100644
--- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala
+++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala
@@ -1,6 +1,7 @@
package docs.stream.cookbook
-import akka.stream.scaladsl.{ Sink, Source }
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.annotation.tailrec
@@ -18,63 +19,11 @@ class RecipeParseLines extends RecipeSpec {
ByteString("!\r"),
ByteString("\nHello Akka!\r\nHello Streams!"),
ByteString("\r\n\r\n")))
- import akka.stream.stage._
- //#parse-lines
- def parseLines(separator: String, maximumLineBytes: Int) =
- new StatefulStage[ByteString, String] {
- private val separatorBytes = ByteString(separator)
- private val firstSeparatorByte = separatorBytes.head
- private var buffer = ByteString.empty
- private var nextPossibleMatch = 0
-
- def initial = new State {
- override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
- buffer ++= chunk
- if (buffer.size > maximumLineBytes)
- ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
- s"which is more than $maximumLineBytes without seeing a line terminator"))
- else emit(doParse(Vector.empty).iterator, ctx)
- }
-
- @tailrec
- private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
- val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
- if (possibleMatchPos == -1) {
- // No matching character, we need to accumulate more bytes into the buffer
- nextPossibleMatch = buffer.size
- parsedLinesSoFar
- } else {
- if (possibleMatchPos + separatorBytes.size > buffer.size) {
- // We have found a possible match (we found the first character of the terminator
- // sequence) but we don't have yet enough bytes. We remember the position to
- // retry from next time.
- nextPossibleMatch = possibleMatchPos
- parsedLinesSoFar
- } else {
- if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
- == separatorBytes) {
- // Found a match
- val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
- buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
- nextPossibleMatch -= possibleMatchPos + separatorBytes.size
- doParse(parsedLinesSoFar :+ parsedLine)
- } else {
- nextPossibleMatch += 1
- doParse(parsedLinesSoFar)
- }
- }
- }
-
- }
- }
-
- }
+ import RecipeParseLines._
val linesStream = rawData.transform(() => parseLines("\r\n", 100))
- //#parse-lines
-
Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
"Hello World\r!",
"Hello Akka!",
@@ -85,3 +34,60 @@ class RecipeParseLines extends RecipeSpec {
}
}
+
+object RecipeParseLines {
+
+ import akka.stream.stage._
+
+ //#parse-lines
+ def parseLines(separator: String, maximumLineBytes: Int) =
+ new StatefulStage[ByteString, String] {
+ private val separatorBytes = ByteString(separator)
+ private val firstSeparatorByte = separatorBytes.head
+ private var buffer = ByteString.empty
+ private var nextPossibleMatch = 0
+
+ def initial = new State {
+ override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
+ buffer ++= chunk
+ if (buffer.size > maximumLineBytes)
+ ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
+ s"which is more than $maximumLineBytes without seeing a line terminator"))
+ else emit(doParse(Vector.empty).iterator, ctx)
+ }
+
+ @tailrec
+ private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
+ val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
+ if (possibleMatchPos == -1) {
+ // No matching character, we need to accumulate more bytes into the buffer
+ nextPossibleMatch = buffer.size
+ parsedLinesSoFar
+ } else {
+ if (possibleMatchPos + separatorBytes.size > buffer.size) {
+ // We have found a possible match (we found the first character of the terminator
+ // sequence) but we don't have yet enough bytes. We remember the position to
+ // retry from next time.
+ nextPossibleMatch = possibleMatchPos
+ parsedLinesSoFar
+ } else {
+ if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
+ == separatorBytes) {
+ // Found a match
+ val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
+ buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
+ nextPossibleMatch -= possibleMatchPos + separatorBytes.size
+ doParse(parsedLinesSoFar :+ parsedLine)
+ } else {
+ nextPossibleMatch += 1
+ doParse(parsedLinesSoFar)
+ }
+ }
+ }
+
+ }
+ }
+ }
+ //#parse-lines
+
+}
\ No newline at end of file
diff --git a/akka-docs-dev/rst/scala/http/directives/debugging-directives/logRequestResult.rst b/akka-docs-dev/rst/scala/http/directives/debugging-directives/logRequestResult.rst
index 4ff072501b..afbdf9c85d 100644
--- a/akka-docs-dev/rst/scala/http/directives/debugging-directives/logRequestResult.rst
+++ b/akka-docs-dev/rst/scala/http/directives/debugging-directives/logRequestResult.rst
@@ -12,9 +12,9 @@ Signature
def logRequestResult(marker: String)(implicit log: LoggingContext): Directive0
def logRequestResult(marker: String, level: LogLevel)(implicit log: LoggingContext): Directive0
- def logRequestResult(show: HttpRequest ⇒ HttpResponsePart ⇒ Option[LogEntry])
+ def logRequestResult(show: HttpRequest => HttpResponsePart => Option[LogEntry])
(implicit log: LoggingContext): Directive0
- def logRequestResult(show: HttpRequest ⇒ Any ⇒ Option[LogEntry])(implicit log: LoggingContext): Directive0
+ def logRequestResult(show: HttpRequest => Any => Option[LogEntry])(implicit log: LoggingContext): Directive0
The signature shown is simplified, the real signature uses magnets. [1]_
diff --git a/akka-docs-dev/rst/scala/http/routing.rst b/akka-docs-dev/rst/scala/http/routing.rst
index f2e32f05e1..53046cef67 100644
--- a/akka-docs-dev/rst/scala/http/routing.rst
+++ b/akka-docs-dev/rst/scala/http/routing.rst
@@ -11,7 +11,7 @@ Routes
The "Route" is the central concept of the routing DSL since all structures you can build with it are instances of
a ``Route``. The type Route is defined like this::
- type Route = RequestContext ⇒ Future[RouteResult]
+ type Route = RequestContext => Future[RouteResult]
It's a simple alias for a function taking a ``RequestContext`` as parameter and returning a ``Future[RouteResult]``.
diff --git a/akka-docs-dev/rst/scala/stream-cookbook.rst b/akka-docs-dev/rst/scala/stream-cookbook.rst
index a5d2f5a97a..9d3ac7993e 100644
--- a/akka-docs-dev/rst/scala/stream-cookbook.rst
+++ b/akka-docs-dev/rst/scala/stream-cookbook.rst
@@ -46,7 +46,7 @@ Flattening a stream of sequences
all the nested elements inside the sequences separately.
The ``mapConcat`` operation can be used to implement a one-to-many transformation of elements using a mapper function
-in the form of ``In ⇒ immutable.Seq[Out]``. In this case we want to map a ``Seq`` of elements to the elements in the
+in the form of ``In => immutable.Seq[Out]``. In this case we want to map a ``Seq`` of elements to the elements in the
collection itself, so we can just call ``mapConcat(identity)``.
.. includecode:: code/docs/stream/cookbook/RecipeFlattenSeq.scala#flattening-seqs
@@ -85,6 +85,8 @@ passing the digest ByteString to be emitted.
.. includecode:: code/docs/stream/cookbook/RecipeDigest.scala#calculating-digest
+.. _cookbook-parse-lines-scala:
+
Parsing lines from a stream of ByteStrings
------------------------------------------
diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst
index 6587bfc9a7..d17bc4d3e9 100644
--- a/akka-docs-dev/rst/scala/stream-graphs.rst
+++ b/akka-docs-dev/rst/scala/stream-graphs.rst
@@ -168,7 +168,7 @@ that the elements in the cycles can flow.
If we run the example we see that the same sequence of numbers are printed
over and over again, but the processing does not stop. Hence, we avoided the deadlock, but ``source`` is still
-backpressured forever, because buffer space is never recovered: the only action we see is the circulation of a couple
+back-pressured forever, because buffer space is never recovered: the only action we see is the circulation of a couple
of initial elements from ``source``.
.. note::
diff --git a/akka-docs-dev/rst/scala/stream-io.rst b/akka-docs-dev/rst/scala/stream-io.rst
index d3c616cdc3..fd8018858a 100644
--- a/akka-docs-dev/rst/scala/stream-io.rst
+++ b/akka-docs-dev/rst/scala/stream-io.rst
@@ -4,4 +4,88 @@
Working with streaming IO
#########################
-*TODO*
\ No newline at end of file
+Akka Streams provides a way of handling TCP connections with Streams.
+While the general approach is very similar to the `Actor based TCP handling`_ using Akka IO,
+by using Akka Streams you are freed of having to manually react to back-pressure signals,
+as the library does it transparently for you.
+
+.. _Actor based TCP handling: http://doc.akka.io/docs/akka/current/scala/io-tcp.html
+
+Accepting connections: Echo Server
+==================================
+In order to implement a simple EchoServer we ``bind`` to a given address, which returns a ``Source[IncomingConnection]``,
+which will emit an :class:`IncomingConnection` element for each new connection that the Server should handle:
+
+.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#echo-server-simple-bind
+
+Next, we simply handle *each* incoming connection using a :class:`Flow` which will be used as the processing stage
+to handle and emit ByteStrings from and to the TCP Socket. Since one :class:`ByteString` does not have to necessarily
+correspond to exactly one line of text (the client might be sending the line in chunks) we use the ``parseLines``
+recipe from the :ref:`cookbook-parse-lines-scala` Akka Streams Cookbook recipe to chunk the inputs up into actual lines of text.
+In this example we simply add exclamation marks to each incoming text message and push it through the flow:
+
+.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#echo-server-simple-handle
+
+Notice that while most building blocks in Akka Streams are reusable and freely shareable, this is *not* the case for the
+incoming connection Flow, since it directly corresponds to an existing, already accepted connection its handling can
+only ever be materialized *once*.
+
+Closing connections is possible by cancelling the *incoming connection* :class:`Flow` from your server logic (e.g. by
+connecting its downstream to an :class:`CancelledSink` and its upstream to a *completed* :class:`Source`).
+It is also possible to shut down the servers socket by cancelling the ``connections:Source[IncomingConnection]``.
+
+We can then test the TCP server by sending data to the TCP Socket using ``netcat``:
+
+::
+
+ $ echo -n "Hello World" | netcat 127.0.0.1 8888
+ Hello World!!!
+
+Connecting: REPL Client
+=======================
+In this example we implement a rather naive Read Evaluate Print Loop client over TCP.
+Let's say we know a server has exposed a simple command line interface over TCP,
+and would like to interact with it using Akka Streams over TCP. To open an outgoing connection socket we use
+the ``outgoingConnection`` method:
+
+.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#repl-client
+
+The ``repl`` flow we use to handle the server interaction first prints the servers response, then awaits on input from
+the command line (this blocking call is used here just for the sake of simplicity) and converts it to a
+:class:`ByteString` which is then sent over the wire to the server. Then we simply connect the TCP pipeline to this
+processing stage–at this point it will be materialized and start processing data once the server responds with
+an *initial message*.
+
+A resilient REPL client would be more sophisticated than this, for example it should split out the input reading into
+a separate mapAsync step and have a way to let the server write more data than one ByteString chunk at any given time,
+these improvements however are left as exercise for the reader.
+
+Avoiding deadlocks and liveness issues in back-pressured cycles
+===============================================================
+When writing such end-to-end back-pressured systems you may sometimes end up in a situation of a loop,
+in which *either side is waiting for the other one to start the conversation*. One does not need to look far
+to find examples of such back-pressure loops. In the two examples shown previously, we always assumed that the side we
+are connecting to would start the conversation, which effectively means both sides are back-pressured and can not get
+the conversation started. There are multiple ways of dealing with this which are explained in depth in :ref:`graph-cycles-scala`,
+however in client-server scenarios it is often the simplest to make either side simply send an initial message.
+
+.. note::
+ In case of back-pressured cycles (which can occur even between different systems) sometimes you have to decide
+ which of the sides has start the conversation in order to kick it off. This can be often done by injecting an
+ initial message from one of the sides–a conversation starter.
+
+To break this back-pressure cycle we need to inject some initial message, a "conversation starter".
+First, we need to decide which side of the connection should remain passive and which active.
+Thankfully in most situations finding the right spot to start the conversation is rather simple, as it often is inherent
+to the protocol we are trying to implement using Streams. In chat-like applications, which our examples resemble,
+it makes sense to make the Server initiate the conversation by emitting a "hello" message:
+
+.. includecode:: code/docs/stream/StreamTcpDocSpec.scala#welcome-banner-chat-server
+
+The way we constructed a :class:`Flow` using a :class:`PartialFlowGraph` is explained in detail in
+:ref:`constructing-sources-sinks-flows-from-partial-graphs-scala`, however the basic concepts is rather simple–
+we can encapsulate arbitrarily complex logic within a :class:`Flow` as long as it exposes the same interface, which means
+exposing exactly one :class:`UndefinedSink` and exactly one :class:`UndefinedSource` which will be connected to the TCP
+pipeline. In this example we use a :class:`Concat` graph processing stage to inject the initial message, and then
+continue with handling all incoming data using the echo handler. You should use this pattern of encapsulating complex
+logic in Flows and attaching those to :class:`StreamIO` in order to implement your custom and possibly sophisticated TCP servers.
\ No newline at end of file
diff --git a/akka-docs-dev/rst/scala/stream-quickstart.rst b/akka-docs-dev/rst/scala/stream-quickstart.rst
index df9ced101c..ffecf6d5a8 100644
--- a/akka-docs-dev/rst/scala/stream-quickstart.rst
+++ b/akka-docs-dev/rst/scala/stream-quickstart.rst
@@ -1,7 +1,7 @@
.. _quickstart-scala:
-Quick Start: Reactive Tweets
-============================
+Quick Start Guide: Reactive Tweets
+==================================
A typical use case for stream processing is consuming a live stream of data that we want to extract or aggregate some
other data from. In this example we'll consider consuming a stream of tweets and extracting information concerning Akka from them.
@@ -73,7 +73,7 @@ combinator:
due to the risk of deadlock (with merge being the preferred strategy), and secondly, the monad laws would not hold for
our implementation of flatMap (due to the liveness issues).
- Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out⇒immutable.Seq[T]``),
+ Please note that the mapConcat requires the supplied function to return a strict collection (``f:Out=>immutable.Seq[T]``),
whereas ``flatMap`` would have to operate on streams all the way through.
diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala
index b58b9a00ef..69073aee9b 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala
@@ -3,21 +3,14 @@
*/
package akka.stream.io
-import java.io.Closeable
import akka.actor.{ Actor, ActorRef, Props }
import akka.io.{ IO, Tcp }
-import akka.stream.scaladsl.Flow
import akka.stream.testkit.StreamTestKit
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.testkit.{ TestKitBase, TestProbe }
import akka.util.ByteString
import java.net.InetSocketAddress
-import java.nio.channels.ServerSocketChannel
-import org.reactivestreams.Processor
import scala.collection.immutable.Queue
-import scala.concurrent.{ Await, Future }
-import scala.concurrent.duration.Duration
-import akka.stream.scaladsl.Source
import akka.stream.testkit.TestUtils.temporaryServerAddress
object TcpHelper {