/* * Copyright (C) 2018-2019 Lightbend Inc. */ package jdocs.stream.operators; import akka.actor.ActorSystem; import akka.japi.pf.PFBuilder; import akka.stream.javadsl.Flow; import akka.NotUsed; import akka.japi.function.Function2; // #zip // #zip-with // #zip-with-index // #or-else // #prepend // #concat // #interleave // #merge // #merge-sorted import akka.stream.javadsl.Source; import akka.stream.javadsl.Sink; import java.util.Arrays; // #merge-sorted // #merge // #interleave // #concat // #prepend // #or-else // #zip-with-index // #zip-with // #zip // #log import akka.stream.Attributes; // #log import java.time.Duration; import java.util.Arrays; import java.util.Comparator; class SourceOrFlow { private static ActorSystem system = null; void logExample() { Flow.of(String.class) // #log .log("myStream") .addAttributes( Attributes.createLogLevels( Attributes.logLevelOff(), // onElement Attributes.logLevelInfo(), // onFinish Attributes.logLevelError())) // onFailure // #log ; } void zipWithIndexExample() { // #zip-with-index Source.from(Arrays.asList("apple", "orange", "banana")) .zipWithIndex() .runWith(Sink.foreach(System.out::print), system); // this will print ('apple', 0), ('orange', 1), ('banana', 2) // #zip-with-index } void zipExample() { // #zip Source sourceFruits = Source.from(Arrays.asList("apple", "orange", "banana")); Source sourceFirstLetters = Source.from(Arrays.asList("A", "O", "B")); sourceFruits.zip(sourceFirstLetters).runWith(Sink.foreach(System.out::print), system); // this will print ('apple', 'A'), ('orange', 'O'), ('banana', 'B') // #zip } void zipWithExample() { // #zip-with Source sourceCount = Source.from(Arrays.asList("one", "two", "three")); Source sourceFruits = Source.from(Arrays.asList("apple", "orange", "banana")); sourceCount .zipWith( sourceFruits, (Function2) (countStr, fruitName) -> countStr + " " + fruitName) .runWith(Sink.foreach(System.out::print), system); // this will print 'one apple', 'two orange', 'three banana' // #zip-with } void prependExample() { // #prepend Source ladies = Source.from(Arrays.asList("Emma", "Emily")); Source gentlemen = Source.from(Arrays.asList("Liam", "William")); gentlemen.prepend(ladies).runWith(Sink.foreach(System.out::print), system); // this will print "Emma", "Emily", "Liam", "William" // #prepend } void concatExample() { // #concat Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.concat(sourceB).runWith(Sink.foreach(System.out::print), system); // prints 1, 2, 3, 4, 10, 20, 30, 40 // #concat } void interleaveExample() { // #interleave Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.interleave(sourceB, 2).runWith(Sink.foreach(System.out::print), system); // prints 1, 2, 10, 20, 3, 4, 30, 40 // #interleave } void mergeExample() { // #merge Source sourceA = Source.from(Arrays.asList(1, 2, 3, 4)); Source sourceB = Source.from(Arrays.asList(10, 20, 30, 40)); sourceA.merge(sourceB).runWith(Sink.foreach(System.out::print), system); // merging is not deterministic, can for example print 1, 2, 3, 4, 10, 20, 30, 40 // #merge } void mergeSortedExample() { // #merge-sorted Source sourceA = Source.from(Arrays.asList(1, 3, 5, 7)); Source sourceB = Source.from(Arrays.asList(2, 4, 6, 8)); sourceA .mergeSorted(sourceB, Comparator.naturalOrder()) .runWith(Sink.foreach(System.out::print), system); // prints 1, 2, 3, 4, 5, 6, 7, 8 Source sourceC = Source.from(Arrays.asList(20, 1, 1, 1)); sourceA .mergeSorted(sourceC, Comparator.naturalOrder()) .runWith(Sink.foreach(System.out::print), system); // prints 1, 3, 5, 7, 20, 1, 1, 1 // #merge-sorted } void orElseExample() { // #or-else Source source1 = Source.from(Arrays.asList("First source")); Source source2 = Source.from(Arrays.asList("Second source")); Source emptySource = Source.empty(); source1.orElse(source2).runWith(Sink.foreach(System.out::print), system); // this will print "First source" emptySource.orElse(source2).runWith(Sink.foreach(System.out::print), system); // this will print "Second source" // #or-else } void conflateExample() { // #conflate Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) .throttle(10, Duration.ofSeconds(1)) // fast upstream .conflate((Integer acc, Integer el) -> acc + el) .throttle(1, Duration.ofSeconds(1)); // slow downstream // #conflate } void scanExample() { // #scan Source source = Source.range(1, 5); source.scan(0, (acc, x) -> acc + x).runForeach(System.out::println, system); // 0 (= 0) // 1 (= 0 + 1) // 3 (= 0 + 1 + 2) // 6 (= 0 + 1 + 2 + 3) // 10 (= 0 + 1 + 2 + 3 + 4) // 15 (= 0 + 1 + 2 + 3 + 4 + 5) // #scan } static // #conflateWithSeed-type class Summed { private final Integer el; public Summed(Integer el) { this.el = el; } public Summed sum(Summed other) { return new Summed(this.el + other.el); } } // #conflateWithSeed-type void conflateWithSeedExample() { // #conflateWithSeed Source.cycle(() -> Arrays.asList(1, 10, 100).iterator()) .throttle(10, Duration.ofSeconds(1)) // fast upstream .conflateWithSeed(Summed::new, (Summed acc, Integer el) -> acc.sum(new Summed(el))) .throttle(1, Duration.ofSeconds(1)); // slow downstream // #conflateWithSeed } // #collect-elements static interface Message {} static class Ping implements Message { final int id; Ping(int id) { this.id = id; } } static class Pong { final int id; Pong(int id) { this.id = id; } } // #collect-elements void collectExample() { // #collect Flow flow = Flow.of(Message.class) .collect( new PFBuilder() .match(Ping.class, p -> p.id != 0, p -> new Pong(p.id)) .build()); // #collect } void collectTypeExample() { // #collectType Flow flow = Flow.of(Message.class) .collectType(Ping.class) .filter(p -> p.id != 0) .map(p -> new Pong(p.id)); // #collectType } void groupedExample() { // #grouped Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7)) .grouped(3) .runForeach(System.out::println, system); // [1, 2, 3] // [4, 5, 6] // [7] Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7)) .grouped(3) .map(g -> g.stream().reduce(0, Integer::sum)) .runForeach(System.out::println, system); // 6 (= 1 + 2 + 3) // 15 (= 4 + 5 + 6) // 7 (= 7) // #grouped } static // #fold class Histogram { final long low; final long high; private Histogram(long low, long high) { this.low = low; this.high = high; } // Immutable start value public static Histogram INSTANCE = new Histogram(0L, 0L); public Histogram add(int number) { if (number < 100) { return new Histogram(low + 1L, high); } else { return new Histogram(low, high + 1L); } } } // #fold void foldExample() { // #fold // Folding over the numbers from 1 to 150: Source.range(1, 150) .fold(Histogram.INSTANCE, (acc, n) -> acc.add(n)) .runForeach(h -> System.out.println("Histogram(" + h.low + ", " + h.high + ")"), system); // Prints: Histogram(99, 51) // #fold } void takeExample() { // #take Source.from(Arrays.asList(1, 2, 3, 4, 5)).take(3).runForeach(System.out::println, system); // this will print: // 1 // 2 // 3 // #take } void takeWhileExample() { // #take-while Source.from(Arrays.asList(1, 2, 3, 4, 5)) .takeWhile(i -> i < 3) .runForeach(System.out::println, system); // this will print: // 1 // 2 // #take-while } }