/* * Copyright (C) 2018-2019 Lightbend Inc. */ package jdocs.stream.operators; import akka.stream.Materializer; import akka.stream.javadsl.Flow; import akka.NotUsed; import akka.japi.function.Function2; //#zip //#zip-with //#zip-with-index //#or-else //#prepend //#concat //#interleave //#merge //#merge-sorted import akka.stream.javadsl.Source; import akka.stream.javadsl.Sink; import java.util.Arrays; //#merge-sorted //#merge //#interleave //#concat //#prepend //#or-else //#zip-with-index //#zip-with //#zip //#log import akka.stream.Attributes; import akka.stream.javadsl.Source; //#log import java.time.Duration; import java.util.Arrays; import java.util.Comparator; class SourceOrFlow { private static Materializer materializer = null; void logExample() { Flow.of(String.class) //#log .log("myStream") .addAttributes(Attributes.createLogLevels( Attributes.logLevelOff(), // onElement Attributes.logLevelError(), // onFailure Attributes.logLevelInfo())) // onFinish //#log ; } void zipWithIndexExample() { Materializer materializer = null; //#zip-with-index Source.from(Arrays.asList("apple", "orange", "banana")) .zipWithIndex() .runWith(Sink.foreach(System.out::print), materializer); // this will print ('apple', 0), ('orange', 1), ('banana', 2) //#zip-with-index } void zipExample() { //#zip Source sourceFruits = Source.from(Arrays.asList("apple", "orange", "banana")); Source sourceFirstLetters = Source.from(Arrays.asList("A", "O", "B")); sourceFruits.zip(sourceFirstLetters).runWith(Sink.foreach(System.out::print), materializer); // this will print ('apple', 'A'), ('orange', 'O'), ('banana', 'B') //#zip } void zipWithExample() { //#zip-with Source sourceCount = Source.from(Arrays.asList("one", "two", "three")); Source sourceFruits = Source.from(Arrays.asList("apple", "orange", "banana")); sourceCount.zipWith( sourceFruits, (Function2) (countStr, fruitName) -> countStr + " " + fruitName ).runWith(Sink.foreach(System.out::print), materializer); // this will print 'one apple', 'two orange', 'three banana' //#zip-with } void prependExample() { //#prepend Source ladies = Source.from(Arrays.asList("Emma", "Emily")); Source gentlemen = Source.from(Arrays.asList("Liam", "William")); gentlemen.prepend(ladies).runWith(Sink.foreach(System.out::print), materializer); // this will print "Emma", "Emily", "Liam", "William" //#prepend } void concatExample() { //#concat Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.concat(sourceB).runWith(Sink.foreach(System.out::print), materializer); //prints 1, 2, 3, 4, 10, 20, 30, 40 //#concat } void interleaveExample() { //#interleave Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.interleave(sourceB, 2).runWith(Sink.foreach(System.out::print), materializer); //prints 1, 2, 10, 20, 3, 4, 30, 40 //#interleave } void mergeExample() { //#merge Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.merge(sourceB).runWith(Sink.foreach(System.out::print), materializer); // merging is not deterministic, can for example print 1, 2, 3, 4, 10, 20, 30, 40 //#merge } void mergeSortedExample() { //#merge-sorted Source sourceA = Source.from(Arrays.asList(1, 3, 5, 7)); Source sourceB = Source.from(Arrays.asList(2, 4, 6, 8)); sourceA.mergeSorted(sourceB, Comparator.naturalOrder()).runWith(Sink.foreach(System.out::print), materializer); //prints 1, 2, 3, 4, 5, 6, 7, 8 Source sourceC = Source.from(Arrays.asList(20, 1, 1, 1)); sourceA.mergeSorted(sourceC, Comparator.naturalOrder()).runWith(Sink.foreach(System.out::print), materializer); //prints 1, 3, 5, 7, 20, 1, 1, 1 //#merge-sorted } void orElseExample() { //#or-else Source source1 = Source.from(Arrays.asList("First source")); Source source2 = Source.from(Arrays.asList("Second source")); Source emptySource = Source.empty(); source1.orElse(source2).runWith(Sink.foreach(System.out::print), materializer); // this will print "First source" emptySource.orElse(source2).runWith(Sink.foreach(System.out::print), materializer); // this will print "Second source" //#or-else } void conflateExample() { //#conflate Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) .throttle(10, Duration.ofSeconds(1)) // fast upstream .conflate((Integer acc, Integer el) -> acc + el) .throttle(1, Duration.ofSeconds(1)); // slow downstream //#conflate } static //#conflateWithSeed-type class Summed { private final Integer el; public Summed(Integer el) { this.el = el; } public Summed sum(Summed other) { return new Summed(this.el + other.el); } } //#conflateWithSeed-type void conflateWithSeedExample() { //#conflateWithSeed Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) .throttle(10, Duration.ofSeconds(1)) // fast upstream .conflateWithSeed(Summed::new, (Summed acc, Integer el) -> acc.sum(new Summed(el))) .throttle(1, Duration.ofSeconds(1)); // slow downstream //#conflateWithSeed } }