Extrapolate stage and expand redoc #23804

This commit is contained in:
mikolak-net 2018-03-27 10:39:25 +02:00 committed by Johan Andrén
parent 26f0f86088
commit 00068e2d1d
13 changed files with 670 additions and 82 deletions

View file

@ -4,6 +4,30 @@
package jdocs.stream;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.japi.tuple.Tuple3;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.TestSource;
import akka.testkit.TestLatch;
import akka.testkit.javadsl.TestKit;
import jdocs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.util.Random;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -13,27 +37,7 @@ import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.Stream;
import akka.NotUsed;
import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.japi.tuple.Tuple3;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.TestSource;
import akka.testkit.TestLatch;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import scala.util.Random;
import static org.junit.Assert.assertEquals;
public class RateTransformationDocTest extends AbstractJavaTest {
@ -105,11 +109,11 @@ public class RateTransformationDocTest extends AbstractJavaTest {
}
@Test
public void expandShouldRepeatLast() throws Exception {
//#expand-last
public void extrapolateShouldRepeatLast() throws Exception {
//#extrapolate-last
final Flow<Double, Double, NotUsed> lastFlow = Flow.of(Double.class)
.expand(in -> Stream.iterate(in, i -> i).iterator());
//#expand-last
.extrapolate(in -> Stream.iterate(in, i -> i).iterator());
//#extrapolate-last
final Pair<TestPublisher.Probe<Double>, CompletionStage<List<Double>>> probeFut = TestSource.<Double> probe(system)
.via(lastFlow)
@ -120,9 +124,64 @@ public class RateTransformationDocTest extends AbstractJavaTest {
final TestPublisher.Probe<Double> probe = probeFut.first();
final CompletionStage<List<Double>> fut = probeFut.second();
probe.sendNext(1.0);
final List<Double> expanded = fut.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(expanded.size(), 10);
assertEquals(expanded.stream().mapToDouble(d -> d).sum(), 10, 0.1);
final List<Double> extrapolated = fut.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(extrapolated.size(), 10);
assertEquals(extrapolated.stream().mapToDouble(d -> d).sum(), 10, 0.1);
}
@Test
public void extrapolateShouldSeedFirst() throws Exception {
//#extrapolate-seed
Double initial = 2.0;
final Flow<Double, Double, NotUsed> lastFlow = Flow.of(Double.class)
.extrapolate(in -> Stream.iterate(in, i -> i).iterator(), initial);
//#extrapolate-seed
final CompletionStage<List<Double>> fut = TestSource.<Double> probe(system)
.via(lastFlow)
.grouped(10)
.toMat(Sink.head(), Keep.right())
.run(mat);
final List<Double> extrapolated = fut.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(extrapolated.size(), 10);
assertEquals(extrapolated.stream().mapToDouble(d -> d).sum(), 10*initial, 0.1);
}
@Test
public void extrapolateShouldTrackDrift() throws Exception {
@SuppressWarnings("unused")
//#extrapolate-drift
final Flow<Double, Pair<Double, Integer>, NotUsed> driftFlow = Flow.of(Double.class)
.map(d -> new Pair<>(d, 0))
.extrapolate(d -> Stream.iterate(1, i -> i + 1).map(i -> new Pair<>(d.first(), i)).iterator());
//#extrapolate-drift
final TestLatch latch = new TestLatch(2, system);
final Flow<Double, Pair<Double, Integer>, NotUsed> realDriftFlow = Flow.of(Double.class)
.map(d -> {
latch.countDown();
return new Pair<>(d, 0);
})
.extrapolate(d -> { latch.countDown(); return Stream.iterate(1, i -> i + 1).map(i -> new Pair<>(d.first(), i)).iterator(); });
final Pair<TestPublisher.Probe<Double>, TestSubscriber.Probe<Pair<Double, Integer>>> pubSub = TestSource.<Double> probe(system)
.via(realDriftFlow)
.toMat(TestSink.<Pair<Double, Integer>> probe(system), Keep.both())
.run(mat);
final TestPublisher.Probe<Double> pub = pubSub.first();
final TestSubscriber.Probe<Pair<Double, Integer>> sub = pubSub.second();
sub.request(1);
pub.sendNext(1.0);
sub.expectNext(new Pair<>(1.0, 0));
sub.requestNext(new Pair<>(1.0, 1));
sub.requestNext(new Pair<>(1.0, 2));
pub.sendNext(2.0);
Await.ready(latch, Duration.create(1, TimeUnit.SECONDS));
sub.requestNext(new Pair<>(2.0, 0));
}
@Test