/* * Copyright (C) 2018-2019 Lightbend Inc. */ package jdocs.stream.operators; import akka.stream.Materializer; import akka.stream.javadsl.Flow; import akka.NotUsed; import akka.japi.function.Function2; // #zip // #zip-with // #zip-with-index // #or-else // #prepend // #concat // #interleave // #merge // #merge-sorted import akka.stream.javadsl.Source; import akka.stream.javadsl.Sink; import java.util.Arrays; // #merge-sorted // #merge // #interleave // #concat // #prepend // #or-else // #zip-with-index // #zip-with // #zip // #log import akka.stream.Attributes; // #log import java.time.Duration; import java.util.Arrays; import java.util.Comparator; class SourceOrFlow { private static Materializer materializer = null; void logExample() { Flow.of(String.class) // #log .log("myStream") .addAttributes( Attributes.createLogLevels( Attributes.logLevelOff(), // onElement Attributes.logLevelInfo(), // onFinish Attributes.logLevelError())) // onFailure // #log ; } void zipWithIndexExample() { Materializer materializer = null; // #zip-with-index Source.from(Arrays.asList("apple", "orange", "banana")) .zipWithIndex() .runWith(Sink.foreach(System.out::print), materializer); // this will print ('apple', 0), ('orange', 1), ('banana', 2) // #zip-with-index } void zipExample() { // #zip Source sourceFruits = Source.from(Arrays.asList("apple", "orange", "banana")); Source sourceFirstLetters = Source.from(Arrays.asList("A", "O", "B")); sourceFruits.zip(sourceFirstLetters).runWith(Sink.foreach(System.out::print), materializer); // this will print ('apple', 'A'), ('orange', 'O'), ('banana', 'B') // #zip } void zipWithExample() { // #zip-with Source sourceCount = Source.from(Arrays.asList("one", "two", "three")); Source sourceFruits = Source.from(Arrays.asList("apple", "orange", "banana")); sourceCount .zipWith( sourceFruits, (Function2) (countStr, fruitName) -> countStr + " " + fruitName) .runWith(Sink.foreach(System.out::print), materializer); // this will print 'one apple', 'two orange', 'three banana' // #zip-with } void prependExample() { // #prepend Source ladies = Source.from(Arrays.asList("Emma", "Emily")); Source gentlemen = Source.from(Arrays.asList("Liam", "William")); gentlemen.prepend(ladies).runWith(Sink.foreach(System.out::print), materializer); // this will print "Emma", "Emily", "Liam", "William" // #prepend } void concatExample() { // #concat Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.concat(sourceB).runWith(Sink.foreach(System.out::print), materializer); // prints 1, 2, 3, 4, 10, 20, 30, 40 // #concat } void interleaveExample() { // #interleave Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.interleave(sourceB, 2).runWith(Sink.foreach(System.out::print), materializer); // prints 1, 2, 10, 20, 3, 4, 30, 40 // #interleave } void mergeExample() { // #merge Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.merge(sourceB).runWith(Sink.foreach(System.out::print), materializer); // merging is not deterministic, can for example print 1, 2, 3, 4, 10, 20, 30, 40 // #merge } void mergeSortedExample() { // #merge-sorted Source sourceA = Source.from(Arrays.asList(1, 3, 5, 7)); Source sourceB = Source.from(Arrays.asList(2, 4, 6, 8)); sourceA .mergeSorted(sourceB, Comparator.naturalOrder()) .runWith(Sink.foreach(System.out::print), materializer); // prints 1, 2, 3, 4, 5, 6, 7, 8 Source sourceC = Source.from(Arrays.asList(20, 1, 1, 1)); sourceA .mergeSorted(sourceC, Comparator.naturalOrder()) .runWith(Sink.foreach(System.out::print), materializer); // prints 1, 3, 5, 7, 20, 1, 1, 1 // #merge-sorted } void orElseExample() { // #or-else Source source1 = Source.from(Arrays.asList("First source")); Source source2 = Source.from(Arrays.asList("Second source")); Source emptySource = Source.empty(); source1.orElse(source2).runWith(Sink.foreach(System.out::print), materializer); // this will print "First source" emptySource.orElse(source2).runWith(Sink.foreach(System.out::print), materializer); // this will print "Second source" // #or-else } void conflateExample() { // #conflate Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) .throttle(10, Duration.ofSeconds(1)) // fast upstream .conflate((Integer acc, Integer el) -> acc + el) .throttle(1, Duration.ofSeconds(1)); // slow downstream // #conflate } void scanExample() { // #scan Source source = Source.range(1, 5); source.scan(0, (acc, x) -> acc + x).runForeach(System.out::println, materializer); // 0 (= 0) // 1 (= 0 + 1) // 3 (= 0 + 1 + 2) // 6 (= 0 + 1 + 2 + 3) // 10 (= 0 + 1 + 2 + 3 + 4) // 15 (= 0 + 1 + 2 + 3 + 4 + 5) // #scan } static // #conflateWithSeed-type class Summed { private final Integer el; public Summed(Integer el) { this.el = el; } public Summed sum(Summed other) { return new Summed(this.el + other.el); } } // #conflateWithSeed-type void conflateWithSeedExample() { // #conflateWithSeed Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) .throttle(10, Duration.ofSeconds(1)) // fast upstream .conflateWithSeed(Summed::new, (Summed acc, Integer el) -> acc.sum(new Summed(el))) .throttle(1, Duration.ofSeconds(1)); // slow downstream // #conflateWithSeed } }