diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflate.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflate.md index 4fecb22ee2..2ff5155b53 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflate.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflate.md @@ -21,7 +21,7 @@ average of incoming numbers, if aggregation should lead to a different type `con ## Example Scala -: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #conflate } +: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Conflate.scala) { #conflate } Java : @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #conflate } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflateWithSeed.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflateWithSeed.md index a013ad08ac..b5bd85f25d 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflateWithSeed.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/conflateWithSeed.md @@ -21,7 +21,7 @@ transform it to the summary type. ## Example Scala -: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #conflateWithSeed } +: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Conflate.scala) { #conflateWithSeed } Java : @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #conflateWithSeed-type #conflateWithSeed } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md index cd8f80560d..cb3d9263a7 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/log.md @@ -21,7 +21,7 @@ This can be changed by calling @scala[`Attributes.logLevels(...)`] @java[`Attrib ## Example Scala -: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #log } +: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Log.scala) { #log } Java : @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #log } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md index b017864175..f401250287 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/scan.md @@ -34,7 +34,7 @@ Note that the `zero` value must be immutable. ## Examples Scala -: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala) { #scan } +: @@snip [SourceOrFlow.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala) { #scan } Java : @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #scan } diff --git a/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala b/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala deleted file mode 100644 index 9ca2caaad8..0000000000 --- a/akka-docs/src/test/scala/docs/stream/operators/SourceOrFlow.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package docs.stream.operators - -import akka.stream.scaladsl._ - -// - -object SourceOrFlow { - - def logExample(): Unit = { - //#log - import akka.stream.Attributes - - //#log - - Flow[String] - //#log - .log(name = "myStream") - .addAttributes( - Attributes.logLevels( - onElement = Attributes.LogLevels.Off, - onFailure = Attributes.LogLevels.Error, - onFinish = Attributes.LogLevels.Info)) - //#log - } - - def conflateExample(): Unit = { - //#conflate - import scala.concurrent.duration._ - - Source - .cycle(() => List(1, 10, 100, 1000).iterator) - .throttle(10, per = 1.second) // faster upstream - .conflate((acc, el) => acc + el) // acc: Int, el: Int - .throttle(1, per = 1.second) // slow downstream - //#conflate - } - - def conflateWithSeedExample(): Unit = { - //#conflateWithSeed - import scala.concurrent.duration._ - - case class Summed(i: Int) { - def sum(other: Summed) = Summed(this.i + other.i) - } - - Source - .cycle(() => List(1, 10, 100, 1000).iterator) - .throttle(10, per = 1.second) // faster upstream - .conflateWithSeed(el => Summed(el))((acc, el) => acc.sum(Summed(el))) // (Summed, Int) => Summed - .throttle(1, per = 1.second) // slow downstream - //#conflateWithSeed - } - - def scanExample(): Unit = { - import akka.actor.ActorSystem - import akka.stream.ActorMaterializer - - implicit val system: ActorSystem = ActorSystem() - implicit val materializer: ActorMaterializer = ActorMaterializer() - - //#scan - val source = Source(1 to 5) - source.scan(0)((acc, x) => acc + x).runForeach(println) - // 0 (= 0) - // 1 (= 0 + 1) - // 3 (= 0 + 1 + 2) - // 6 (= 0 + 1 + 2 + 3) - // 10 (= 0 + 1 + 2 + 3 + 4) - // 15 (= 0 + 1 + 2 + 3 + 4 + 5) - //#scan - } - -} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Conflate.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Conflate.scala new file mode 100644 index 0000000000..060fa003b9 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Conflate.scala @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +//#conflate +//#conflateWithSeed +import akka.stream.scaladsl.Source + +//#conflateWithSeed +//#conflate + +object Conflate { + def conflateExample(): Unit = { + //#conflate + import scala.concurrent.duration._ + + Source + .cycle(() => List(1, 10, 100, 1000).iterator) + .throttle(10, per = 1.second) // faster upstream + .conflate((acc, el) => acc + el) // acc: Int, el: Int + .throttle(1, per = 1.second) // slow downstream + //#conflate + } + + def conflateWithSeedExample(): Unit = { + //#conflateWithSeed + import scala.concurrent.duration._ + + case class Summed(i: Int) { + def sum(other: Summed) = Summed(this.i + other.i) + } + + Source + .cycle(() => List(1, 10, 100, 1000).iterator) + .throttle(10, per = 1.second) // faster upstream + .conflateWithSeed(el => Summed(el))((acc, el) => acc.sum(Summed(el))) // (Summed, Int) => Summed + .throttle(1, per = 1.second) // slow downstream + //#conflateWithSeed + } + +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Log.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Log.scala new file mode 100644 index 0000000000..33e12b40b1 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Log.scala @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.stream.scaladsl.Flow +//#log +import akka.stream.Attributes + +//#log + +object Log { + def logExample(): Unit = { + Flow[String] + //#log + .log(name = "myStream") + .addAttributes( + Attributes.logLevels( + onElement = Attributes.LogLevels.Off, + onFailure = Attributes.LogLevels.Error, + onFinish = Attributes.LogLevels.Info)) + //#log + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala new file mode 100644 index 0000000000..34dc336259 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Scan.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow +import akka.stream.scaladsl.Source + +object Scan { + def scanExample(): Unit = { + import akka.actor.ActorSystem + import akka.stream.ActorMaterializer + + implicit val system: ActorSystem = ActorSystem() + implicit val materializer: ActorMaterializer = ActorMaterializer() + + //#scan + val source = Source(1 to 5) + source.scan(0)((acc, x) => acc + x).runForeach(println) + // 0 (= 0) + // 1 (= 0 + 1) + // 3 (= 0 + 1 + 2) + // 6 (= 0 + 1 + 2 + 3) + // 10 (= 0 + 1 + 2 + 3 + 4) + // 15 (= 0 + 1 + 2 + 3 + 4 + 5) + //#scan + } + +}