diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md index 905060e57d..f74574c7cf 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/concat.md @@ -27,3 +27,10 @@ After completion of the original upstream the elements of the given source will @@@ + +## Example +Scala +: @@snip [FlowConcatSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala) { #concat } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #concat } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/interleave.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/interleave.md index bd5499bf76..b9967b620f 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/interleave.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/interleave.md @@ -28,3 +28,10 @@ source completes the rest of the other stream will be emitted. @@@ + +## Example +Scala +: @@snip [FlowInterleaveSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala) { #interleave } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #interleave } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/merge.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/merge.md index b109698246..6d27564705 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/merge.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/merge.md @@ -27,3 +27,10 @@ Merge multiple sources. Picks elements randomly if all sources has elements read @@@ + +## Example +Scala +: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #merge } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #merge } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeSorted.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeSorted.md index e49a547497..f8de187ebe 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeSorted.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/mergeSorted.md @@ -28,3 +28,10 @@ smallest element. @@@ + +## Example +Scala +: @@snip [FlowMergeSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala) { #merge-sorted } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #merge-sorted } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/orElse.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/orElse.md index c0da22b0fd..7127a8c5a2 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/orElse.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/orElse.md @@ -35,3 +35,10 @@ without emitting and the secondary stream already has completed or when the seco @@@ + +## Example +Scala +: @@snip [FlowOrElseSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala) { #or-else } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #or-else } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md index 0d742a4977..89420f25b8 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/prepend.md @@ -29,3 +29,10 @@ If materialized values needs to be collected `prependMat` is available. @@@ + +## Example +Scala +: @@snip [FlowOrElseSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala) { #prepend } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #prepend } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zip.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zip.md index a19cc5ee1d..d0a88f848b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zip.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zip.md @@ -27,3 +27,9 @@ Combines elements from each of multiple sources into @scala[tuples] @java[*Pair* @@@ +## Example +Scala +: @@snip [FlowZipSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala) { #zip } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip } \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md index 88b557204c..1323cd3d38 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/zipWith.md @@ -28,3 +28,10 @@ returned value downstream. @@@ + +## Example +Scala +: @@snip [FlowZipWithSpec.scala](/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala) { #zip-with } + +Java +: @@snip [SourceOrFlow.java](/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #zip-with } \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index e7e4d10eec..c84db32475 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -7,12 +7,32 @@ package jdocs.stream.operators; import akka.stream.Materializer; import akka.stream.javadsl.Flow; +import akka.NotUsed; +import akka.japi.function.Function2; + +//#zip +//#zip-with //#zip-with-index -import akka.stream.javadsl.Sink; +//#or-else +//#prepend +//#concat +//#interleave +//#merge +//#merge-sorted import akka.stream.javadsl.Source; +import akka.stream.javadsl.Sink; import java.util.Arrays; +//#merge-sorted +//#merge +//#interleave +//#concat +//#prepend +//#or-else //#zip-with-index +//#zip-with +//#zip + //#log import akka.stream.Attributes; import akka.stream.javadsl.Source; @@ -20,9 +40,11 @@ import akka.stream.javadsl.Source; import java.time.Duration; import java.util.Arrays; +import java.util.Comparator; class SourceOrFlow { + private static Materializer materializer = null; void logExample() { Flow.of(String.class) @@ -46,6 +68,101 @@ class SourceOrFlow { //#zip-with-index } + void zipExample() { + //#zip + Source sourceFruits = Source.from(Arrays.asList("apple", "orange", "banana")); + Source sourceFirstLetters = Source.from(Arrays.asList("A", "O", "B")); + sourceFruits.zip(sourceFirstLetters).runWith(Sink.foreach(System.out::print), materializer); + // this will print ('apple', 'A'), ('orange', 'O'), ('banana', 'B') + + //#zip + } + + void zipWithExample() { + //#zip-with + Source sourceCount = Source.from(Arrays.asList("one", "two", "three")); + Source sourceFruits = Source.from(Arrays.asList("apple", "orange", "banana")); + sourceCount.zipWith( + sourceFruits, + (Function2) (countStr, fruitName) -> countStr + " " + fruitName + ).runWith(Sink.foreach(System.out::print), materializer); + // this will print 'one apple', 'two orange', 'three banana' + + //#zip-with + } + + void prependExample() { + //#prepend + Source ladies = Source.from(Arrays.asList("Emma", "Emily")); + Source gentlemen = Source.from(Arrays.asList("Liam", "William")); + gentlemen.prepend(ladies).runWith(Sink.foreach(System.out::print), materializer); + // this will print "Emma", "Emily", "Liam", "William" + + //#prepend + } + + + void concatExample() { + //#concat + Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); + Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); + sourceA.concat(sourceB).runWith(Sink.foreach(System.out::print), materializer); + //prints 1, 2, 3, 4, 10, 20, 30, 40 + + //#concat + } + + + void interleaveExample() { + //#interleave + Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); + Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); + sourceA.interleave(sourceB, 2).runWith(Sink.foreach(System.out::print), materializer); + //prints 1, 2, 10, 20, 3, 4, 30, 40 + + //#interleave + } + + + void mergeExample() { + //#merge + Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); + Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); + sourceA.merge(sourceB).runWith(Sink.foreach(System.out::print), materializer); + // merging is not deterministic, can for example print 1, 2, 3, 4, 10, 20, 30, 40 + + //#merge + } + + + void mergeSortedExample() { + //#merge-sorted + Source sourceA = Source.from(Arrays.asList(1, 3, 5, 7)); + Source sourceB = Source.from(Arrays.asList(2, 4, 6, 8)); + sourceA.mergeSorted(sourceB, Comparator.naturalOrder()).runWith(Sink.foreach(System.out::print), materializer); + //prints 1, 2, 3, 4, 5, 6, 7, 8 + + Source sourceC = Source.from(Arrays.asList(20, 1, 1, 1)); + sourceA.mergeSorted(sourceC, Comparator.naturalOrder()).runWith(Sink.foreach(System.out::print), materializer); + //prints 1, 3, 5, 7, 20, 1, 1, 1 + //#merge-sorted + } + + void orElseExample() { + //#or-else + Source source1 = Source.from(Arrays.asList("First source")); + Source source2 = Source.from(Arrays.asList("Second source")); + Source emptySource = Source.empty(); + + source1.orElse(source2).runWith(Sink.foreach(System.out::print), materializer); + // this will print "First source" + + emptySource.orElse(source2).runWith(Sink.foreach(System.out::print), materializer); + // this will print "Second source" + + //#or-else + } + void conflateExample() { //#conflate Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala index 6b7d5274fd..9d17eb98fa 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatSpec.scala @@ -192,5 +192,18 @@ class FlowConcatSpec extends BaseTwoStreamsSetup { probeSink.expectComplete() } + + "work in example" in { + //#concat + import akka.stream.scaladsl.Source + import akka.stream.scaladsl.Sink + + val sourceA = Source(List(1, 2, 3, 4)) + val sourceB = Source(List(10, 20, 30, 40)) + + sourceA.concat(sourceB).runWith(Sink.foreach(println)) + //prints 1, 2, 3, 4, 10, 20, 30, 40 + //#concat + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala index 00a895e1db..8823054014 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInterleaveSpec.scala @@ -232,6 +232,19 @@ class FlowInterleaveSpec extends BaseTwoStreamsSetup { up1.expectSubscription().expectCancellation() up2.expectSubscription().expectCancellation() } + + "work in example" in { + //#interleave + import akka.stream.scaladsl.Source + import akka.stream.scaladsl.Sink + + val sourceA = Source(List(1, 2, 3, 4)) + val sourceB = Source(List(10, 20, 30, 40)) + + sourceA.interleave(sourceB, segmentSize = 2).runWith(Sink.foreach(println)) + //prints 1, 2, 10, 20, 3, 4, 30, 40 + //#interleave + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala index a6759055b1..f4ff3ab6e9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMergeSpec.scala @@ -125,5 +125,36 @@ class FlowMergeSpec extends BaseTwoStreamsSetup { down.expectNext() } + + "works in number example for merge sorted" in { + //#merge-sorted + import akka.stream.scaladsl.Source + import akka.stream.scaladsl.Sink + + val sourceA = Source(List(1, 3, 5, 7)) + val sourceB = Source(List(2, 4, 6, 8)) + + sourceA.mergeSorted(sourceB).runWith(Sink.foreach(println)) + //prints 1, 2, 3, 4, 5, 6, 7, 8 + + val sourceC = Source(List(20, 1, 1, 1)) + + sourceA.mergeSorted(sourceC).runWith(Sink.foreach(println)) + //prints 1, 3, 5, 7, 20, 1, 1, 1 + //#merge-sorted + } + + "works in number example for merge" in { + //#merge + import akka.stream.scaladsl.Source + import akka.stream.scaladsl.Sink + + val sourceA = Source(List(1, 2, 3, 4)) + val sourceB = Source(List(10, 20, 30, 40)) + + sourceA.merge(sourceB).runWith(Sink.foreach(println)) + // merging is not deterministic, can for example print 1, 2, 3, 4, 10, 20, 30, 40 + //#merge + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala index b44767b9a1..54969248a8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowOrElseSpec.scala @@ -4,6 +4,11 @@ package akka.stream.scaladsl +//#or-else +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Sink + +//#or-else import scala.concurrent.duration._ import akka.stream.testkit.Utils.TE import akka.stream.testkit.{ TestPublisher, TestSubscriber } @@ -136,6 +141,20 @@ class FlowOrElseSpec extends AkkaSpec { outProbe.expectError() } + "work in the example" in { + //#or-else + val source1 = Source(List("First source")) + val source2 = Source(List("Second source")) + val emptySource = Source.empty[String] + + source1.orElse(source2).runWith(Sink.foreach(println)) + // this will print "First source" + + emptySource.orElse(source2).runWith(Sink.foreach(println)) + // this will print "Second source" + //#or-else + } + trait OrElseProbedFlow { val inProbe1 = TestPublisher.probe[Char]() val source1 = Source.fromPublisher(inProbe1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala new file mode 100644 index 0000000000..58ff9e883f --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrependSpec.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +//#prepend +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Sink + +//#prepend +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.testkit.AkkaSpec + +class FlowPrependSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + + implicit val materializer = ActorMaterializer(settings) + + "An Prepend flow" should { + + "work in entrance example" in { + //#prepend + val ladies = Source(List("Emma", "Emily")) + val gentlemen = Source(List("Liam", "William")) + + gentlemen.prepend(ladies).runWith(Sink.foreach(println)) + // this will print "Emma", "Emily", "Liam", "William" + //#prepend + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala index 86552ad9f6..f79daa2a3e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipSpec.scala @@ -4,6 +4,12 @@ package akka.stream.scaladsl +//#zip +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Sink + +//#zip + import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } @@ -70,5 +76,15 @@ class FlowZipSpec extends BaseTwoStreamsSetup { val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) subscriber2.expectSubscriptionAndError(TestException) } + + "work in fruits example" in { + //#zip + val sourceFruits = Source(List("apple", "orange", "banana")) + val sourceFirstLetters = Source(List("A", "O", "B")) + sourceFruits.zip(sourceFirstLetters).runWith(Sink.foreach(println)) + // this will print ('apple', 'A'), ('orange', 'O'), ('banana', 'B') + //#zip + + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala index b85c9b51c2..5f8ef9981b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowZipWithSpec.scala @@ -4,6 +4,12 @@ package akka.stream.scaladsl +//#zip-with +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Sink + +//#zip-with + import akka.stream.testkit.{ BaseTwoStreamsSetup, TestSubscriber } import org.reactivestreams.Publisher import scala.concurrent.duration._ @@ -91,6 +97,18 @@ class FlowZipWithSpec extends BaseTwoStreamsSetup { subscriber2.expectSubscriptionAndError(TestException) } + "work in fruits example" in { + //#zip-with + val sourceCount = Source(List("one", "two", "three")) + val sourceFruits = Source(List("apple", "orange", "banana")) + + sourceCount.zipWith(sourceFruits) { + (countStr, fruitName) ⇒ + s"$countStr $fruitName" + }.runWith(Sink.foreach(println)) + // this will print 'one apple', 'two orange', 'three banana' + //#zip-with + } } }