diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala index 3b62c59e87..4561a43509 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowStagesSpec.scala @@ -67,7 +67,6 @@ class FlowStagesSpec extends AkkaSpec { else ctx.finish() } - override def onUpstreamFinish(ctx: Context[A]): TerminationDirective = ctx.absorbTermination() diff --git a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala index a8382ccc41..f5a602f57e 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala @@ -127,7 +127,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { //#tweet-authors val authors: Source[Author] = tweets - .filter(_.hashtags.contains(Akka)) + .filter(_.hashtags.contains(akka)) .map(_.author) //#tweet-authors @@ -166,7 +166,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { //#external-service-mapAsyncUnordered val authors: Source[Author] = - tweets.filter(_.hashtags.contains(Akka)).map(_.author) + tweets.filter(_.hashtags.contains(akka)).map(_.author) val emailAddresses: Source[String] = authors @@ -199,7 +199,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val addressSystem = new AddressSystem val smsServer = new SmsServer(probe.ref) - val authors = tweets.filter(_.hashtags.contains(Akka)).map(_.author) + val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author) val phoneNumbers = authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle)) @@ -236,7 +236,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val addressSystem = new AddressSystem val smsServer = new SmsServer(probe.ref) - val authors = tweets.filter(_.hashtags.contains(Akka)).map(_.author) + val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author) val phoneNumbers = authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle)) @@ -270,9 +270,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val database = system.actorOf(Props(classOf[DatabaseService], probe.ref), "db") //#save-tweets - import akka.pattern.ask - - val akkaTweets: Source[Tweet] = tweets.filter(_.hashtags.contains(Akka)) + val akkaTweets: Source[Tweet] = tweets.filter(_.hashtags.contains(akka)) implicit val timeout = Timeout(3.seconds) val saveTweets: RunnableFlow = diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala index 2331ca7971..81c290502f 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala @@ -23,7 +23,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec { trait Fixture { //#authors val authors = Flow[Tweet] - .filter(_.hashtags.contains(Akka)) + .filter(_.hashtags.contains(akka)) .map(_.author) //#authors diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index e095908c16..0b7159e9fc 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -29,8 +29,7 @@ import akka.stream.testkit.AkkaSpec object TwitterStreamQuickstartDocSpec { //#model final case class Author(handle: String) - val AkkaTeam = Author("akkateam") - val Akka = Hashtag("#akka") + val akka = Hashtag("#akka") final case class Hashtag(name: String) @@ -78,14 +77,14 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { //#authors-filter-map val authors: Source[Author] = tweets - .filter(_.hashtags.contains(Akka)) + .filter(_.hashtags.contains(akka)) .map(_.author) //#authors-filter-map trait Example3 { //#authors-collect val authors: Source[Author] = - tweets.collect { case t if t.hashtags.contains(Akka) => t.author } + tweets.collect { case t if t.hashtags.contains(akka) => t.author } //#authors-collect } @@ -167,7 +166,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { val sum: Future[Int] = map.get(sumSink) - sum.map { c => println(s"Total tweets processed: $c") } + sum.foreach(c => println(s"Total tweets processed: $c")) //#tweets-fold-count new AnyRef { @@ -184,7 +183,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { val sumSink = Sink.fold[Int, Int](0)(_ + _) val counterRunnableFlow: RunnableFlow = tweetsInMinuteFromNow - .filter(_.hashtags contains Akka) + .filter(_.hashtags contains akka) .map(t => 1) .to(sumSink) diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index d17bc4d3e9..2a3b18e5cb 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -104,8 +104,8 @@ the undefined elements are rewired to real sources and sinks. The graph can then .. _constructing-sources-sinks-flows-from-partial-graphs-scala: -Constructing Sources, Sinks and Flows from a Partial Graphs ------------------------------------------------------------ +Constructing Sources, Sinks and Flows from Partial Graphs +--------------------------------------------------------- Instead of treating a :class:`PartialFlowGraph` as simply a collection of flows and junctions which may not yet all be connected it is sometimes useful to expose such complex graph as a simpler structure, such as a :class:`Source`, :class:`Sink` or :class:`Flow`.