From ab2300c4cbc4f33ad317cf39254e62153b6cc54f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 14 Jan 2020 11:17:20 +0100 Subject: [PATCH] doc: example for splitWhen and splitAfter, #25468 (#28480) * doc: example for splitWhen and splitAfter, #25468 --- .../operators/Source-or-Flow/splitAfter.md | 13 +++ .../operators/Source-or-Flow/splitWhen.md | 14 +++ .../stream/operators/sourceorflow/Split.java | 108 ++++++++++++++++++ .../stream/operators/sourceorflow/Split.scala | 101 ++++++++++++++++ 4 files changed, 236 insertions(+) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitAfter.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitAfter.md index 229c3bbd15..a71b8b3eb2 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitAfter.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitAfter.md @@ -16,6 +16,19 @@ End the current substream whenever a predicate returns `true`, starting a new su End the current substream whenever a predicate returns `true`, starting a new substream for the next element. +## Example + +Given some time series data source we would like to split the stream into sub-streams for each second. +By using `sliding` we can compare the timestamp of the current and next element to decide when to split. + +Scala +: @@snip [Split.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala) { #splitAfter } + +Java +: @@snip [Split.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java) { #splitAfter } + +An alternative way of implementing this is shown in @ref:[splitWhen example](splitWhen.md#example).
+ ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitWhen.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitWhen.md index df0f131146..609afe8ca2 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitWhen.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/splitWhen.md @@ -16,6 +16,20 @@ Split off elements into a new substream whenever a predicate function return `tr Split off elements into a new substream whenever a predicate function return `true`. +## Example + +Given some time series data source we would like to split the stream into sub-streams for each second. +We need to compare the timestamp of the previous and current element to decide when to split. This +decision can be implemented in a `statefulMapConcat` operator preceding the `splitWhen`. + +Scala +: @@snip [Split.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala) { #splitWhen } + +Java +: @@snip [Split.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java) { #splitWhen } + +An alternative way of implementing this is shown in @ref:[splitAfter example](splitAfter.md#example). + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java new file mode 100644 index 0000000000..f9530d3bb8 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Split.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.stream.operators.sourceorflow;
+
+import akka.actor.ActorSystem;
+import akka.japi.Pair;
+import akka.japi.function.Creator;
+import akka.japi.function.Function;
+import akka.stream.javadsl.Sink;
+import akka.stream.javadsl.Source;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Collections;
+
+public class Split {
+  public static void splitWhenExample(String[] args) {
+    ActorSystem system = ActorSystem.create();
+
+    // #splitWhen
+    Source.range(1, 100)
+        .throttle(1, Duration.ofMillis(100))
+        .map(elem -> new Pair<>(elem, Instant.now()))
+        .statefulMapConcat(
+            () -> {
+              return new Function<Pair<Integer, Instant>, Iterable<Pair<Integer, Boolean>>>() {
+                // stateful decision in statefulMapConcat
+                // keep track of time bucket (one per second)
+                LocalDateTime currentTimeBucket =
+                    LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC);
+
+                @Override
+                public Iterable<Pair<Integer, Boolean>> apply(
+                    Pair<Integer, Instant> elemTimestamp) {
+                  LocalDateTime time =
+                      LocalDateTime.ofInstant(elemTimestamp.second(), ZoneOffset.UTC);
+                  LocalDateTime bucket = time.withNano(0);
+                  boolean newBucket = !bucket.equals(currentTimeBucket);
+                  if (newBucket) currentTimeBucket = bucket;
+                  return Collections.singleton(new Pair<>(elemTimestamp.first(), newBucket));
+                }
+              };
+            })
+        .splitWhen(elemDecision -> elemDecision.second()) // split when time bucket changes
+        .map(elemDecision -> elemDecision.first())
+        .fold(0, (acc, notUsed) -> acc + 1) // count the elements of each substream
+        .to(Sink.foreach(System.out::println))
+        .run(system);
+    // 3
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 7
+    // #splitWhen
+  }
+
+  public static void splitAfterExample(String[] args) {
+    ActorSystem system = ActorSystem.create();
+
+    // #splitAfter
+    Source.range(1, 100)
+        .throttle(1, Duration.ofMillis(100))
+        .map(elem -> new Pair<>(elem, Instant.now()))
+        .sliding(2, 1)
+        .splitAfter(
+            slidingElements -> {
+              if (slidingElements.size() == 2) {
+                Pair<Integer, Instant> current = slidingElements.get(0);
+                Pair<Integer, Instant> next = slidingElements.get(1);
+                LocalDateTime currentBucket =
+                    LocalDateTime.ofInstant(current.second(), ZoneOffset.UTC).withNano(0);
+                LocalDateTime nextBucket =
+                    LocalDateTime.ofInstant(next.second(), ZoneOffset.UTC).withNano(0);
+                return !currentBucket.equals(nextBucket);
+              } else {
+                return false;
+              }
+            })
+        .map(slidingElements -> slidingElements.get(0).first())
+        .fold(0, (acc, notUsed) -> acc + 1) // count the elements of each substream
+        .to(Sink.foreach(System.out::println))
+        .run(system);
+    // 3
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 6
+    // note that the very last element is never included due to sliding,
+    // but that would not be a problem for an infinite stream
+    // #splitAfter
+  }
+}
diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
new file mode 100644
index 0000000000..fed0a0fe6b
--- /dev/null
+++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Split.scala
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.stream.operators.sourceorflow
+
+import java.time.Instant
+import java.time.LocalDateTime
+import java.time.ZoneOffset
+
+import scala.concurrent.duration._
+
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+
+object Split {
+  def splitWhenExample(args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+
+    implicit val system: ActorSystem = ActorSystem()
+
+    //#splitWhen
+    Source(1 to 100)
+      .throttle(1, 100.millis)
+      .map(elem => (elem, Instant.now()))
+      .statefulMapConcat(() => {
+        // stateful decision in statefulMapConcat
+        // keep track of time bucket (one per second)
+        var currentTimeBucket = LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC)
+
+        {
+          case (elem, timestamp) =>
+            val time = LocalDateTime.ofInstant(timestamp, ZoneOffset.UTC)
+            val bucket = time.withNano(0)
+            val newBucket = bucket != currentTimeBucket
+            if (newBucket)
+              currentTimeBucket = bucket
+            List((elem, newBucket))
+        }
+      })
+      .splitWhen(_._2) // split when time bucket changes
+      .map(_._1)
+      .fold(0)((acc, _) => acc + 1) // count the elements of each substream
+      .to(Sink.foreach(println))
+      .run()
+    // 3
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 7
+    //#splitWhen
+  }
+
+  def splitAfterExample(args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+
+    implicit val system: ActorSystem = ActorSystem()
+
+    //#splitAfter
+    Source(1 to 100)
+      .throttle(1, 100.millis)
+      .map(elem => (elem, Instant.now()))
+      .sliding(2)
+      .splitAfter { slidingElements =>
+        if (slidingElements.size == 2) {
+          val current = slidingElements.head
+          val next = slidingElements.tail.head
+          val currentBucket = LocalDateTime.ofInstant(current._2, ZoneOffset.UTC).withNano(0)
+          val nextBucket = LocalDateTime.ofInstant(next._2, ZoneOffset.UTC).withNano(0)
+          currentBucket != nextBucket
+        } else {
+          false
+        }
+      }
+      .map(_.head._1)
+      .fold(0)((acc, _) => acc + 1) // count the elements of each substream
+      .to(Sink.foreach(println))
+      .run()
+    // 3
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 10
+    // 6
+    // note that the very last element is never included due to sliding,
+    // but that would not be a problem for an infinite stream
+    //#splitAfter
+  }
+
+}