Merge branch 'master' into scalatest310

This commit is contained in:
Johannes Rudolph 2020-01-14 12:03:32 +01:00 committed by GitHub
commit ec208cad08
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 497 additions and 137 deletions

View file

@ -0,0 +1,108 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sourceorflow;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.japi.function.Creator;
import akka.japi.function.Function;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
public class Split {
public static void splitWhenExample(String[] args) {
ActorSystem system = ActorSystem.create();
// #splitWhen
Source.range(1, 100)
.throttle(1, Duration.ofMillis(100))
.map(elem -> new Pair<>(elem, Instant.now()))
.statefulMapConcat(
() -> {
return new Function<Pair<Integer, Instant>, Iterable<Pair<Integer, Boolean>>>() {
// stateful decision in statefulMapConcat
// keep track of time bucket (one per second)
LocalDateTime currentTimeBucket =
LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC);
@Override
public Iterable<Pair<Integer, Boolean>> apply(
Pair<Integer, Instant> elemTimestamp) {
LocalDateTime time =
LocalDateTime.ofInstant(elemTimestamp.second(), ZoneOffset.UTC);
LocalDateTime bucket = time.withNano(0);
boolean newBucket = !bucket.equals(currentTimeBucket);
if (newBucket) currentTimeBucket = bucket;
return Collections.singleton(new Pair<>(elemTimestamp.first(), newBucket));
}
};
})
.splitWhen(elemDecision -> elemDecision.second()) // split when time bucket changes
.map(elemDecision -> elemDecision.first())
.fold(0, (acc, notUsed) -> acc + 1) // sum
.to(Sink.foreach(System.out::println))
.run(system);
// 3
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 7
// #splitWhen
}
public static void splitAfterExample(String[] args) {
ActorSystem system = ActorSystem.create();
// #splitAfter
Source.range(1, 100)
.throttle(1, Duration.ofMillis(100))
.map(elem -> new Pair<>(elem, Instant.now()))
.sliding(2, 1)
.splitAfter(
slidingElements -> {
if (slidingElements.size() == 2) {
Pair<Integer, Instant> current = slidingElements.get(0);
Pair<Integer, Instant> next = slidingElements.get(1);
LocalDateTime currentBucket =
LocalDateTime.ofInstant(current.second(), ZoneOffset.UTC).withNano(0);
LocalDateTime nextBucket =
LocalDateTime.ofInstant(next.second(), ZoneOffset.UTC).withNano(0);
return !currentBucket.equals(nextBucket);
} else {
return false;
}
})
.map(slidingElements -> slidingElements.get(0).first())
.fold(0, (acc, notUsed) -> acc + 1) // sum
.to(Sink.foreach(System.out::println))
.run(system);
// 3
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 6
// note that the very last element is never included due to sliding,
// but that would not be problem for an infinite stream
// #splitAfter
}
}

View file

@ -0,0 +1,101 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset
import scala.concurrent.duration._
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
object Split {
def splitWhenExample(args: Array[String]): Unit = {
import akka.actor.ActorSystem
implicit val system: ActorSystem = ActorSystem()
//#splitWhen
Source(1 to 100)
.throttle(1, 100.millis)
.map(elem => (elem, Instant.now()))
.statefulMapConcat(() => {
// stateful decision in statefulMapConcat
// keep track of time bucket (one per second)
var currentTimeBucket = LocalDateTime.ofInstant(Instant.ofEpochMilli(0), ZoneOffset.UTC)
{
case (elem, timestamp) =>
val time = LocalDateTime.ofInstant(timestamp, ZoneOffset.UTC)
val bucket = time.withNano(0)
val newBucket = bucket != currentTimeBucket
if (newBucket)
currentTimeBucket = bucket
List((elem, newBucket))
}
})
.splitWhen(_._2) // split when time bucket changes
.map(_._1)
.fold(0)((acc, _) => acc + 1) // sum
.to(Sink.foreach(println))
.run()
// 3
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 7
//#splitWhen
}
def splitAfterExample(args: Array[String]): Unit = {
import akka.actor.ActorSystem
implicit val system: ActorSystem = ActorSystem()
//#splitAfter
Source(1 to 100)
.throttle(1, 100.millis)
.map(elem => (elem, Instant.now()))
.sliding(2)
.splitAfter { slidingElements =>
if (slidingElements.size == 2) {
val current = slidingElements.head
val next = slidingElements.tail.head
val currentBucket = LocalDateTime.ofInstant(current._2, ZoneOffset.UTC).withNano(0)
val nextBucket = LocalDateTime.ofInstant(next._2, ZoneOffset.UTC).withNano(0)
currentBucket != nextBucket
} else {
false
}
}
.map(_.head._1)
.fold(0)((acc, _) => acc + 1) // sum
.to(Sink.foreach(println))
.run()
// 3
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 10
// 6
// note that the very last element is never included due to sliding,
// but that would not be problem for an infinite stream
//#splitAfter
}
}