diff --git a/akka-docs/src/main/paradox/stream/stages-overview.md b/akka-docs/src/main/paradox/stream/stages-overview.md index ae7acaf3df..f0a6ee7f8e 100644 --- a/akka-docs/src/main/paradox/stream/stages-overview.md +++ b/akka-docs/src/main/paradox/stream/stages-overview.md @@ -1442,10 +1442,34 @@ aggregated to the batched value. --------------------------------------------------------------- +### extrapolate + +Allow for a faster downstream by expanding the last emitted element to an `Iterator`. For example, an +`Iterator.continually(element)` will cause `extrapolate` to keep repeating the last emitted element. + +All original elements are always emitted unchanged - the `Iterator` is only used whenever there is downstream + demand before upstream emits a new element. + +Includes an optional `initial` argument to prevent blocking the entire stream when there are multiple producers. + +See @ref:[Understanding extrapolate and expand](stream-rate.md#understanding-extrapolate-and-expand) for more information +and examples. + +**emits** when downstream stops backpressuring + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +--------------------------------------------------------------- + ### expand -Allow for a faster downstream by expanding the last incoming element to an `Iterator`. For example -`Iterator.continually(element)` to keep repeating the last incoming element. +Like `extrapolate`, but does not have the `initial` argument, and the `Iterator` is also used in lieu of the original +element, allowing for it to be rewritten and/or filtered. + +See @ref:[Understanding extrapolate and expand](stream-rate.md#understanding-extrapolate-and-expand) for more information +and examples. **emits** when downstream stops backpressuring diff --git a/akka-docs/src/main/paradox/stream/stream-rate.md b/akka-docs/src/main/paradox/stream/stream-rate.md index 1898cda50f..ce5561b76d 100644 --- a/akka-docs/src/main/paradox/stream/stream-rate.md +++ b/akka-docs/src/main/paradox/stream/stream-rate.md @@ -203,22 +203,40 @@ Scala Java : @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #conflate-sample } -### Understanding expand +### Understanding extrapolate and expand -Expand helps to deal with slow producers which are unable to keep up with the demand coming from consumers. -Expand allows to extrapolate a value to be sent as an element to a consumer. +Now we will discuss two stages, `extrapolate` and `expand`, helping to deal with slow producers that are unable to keep +up with the demand coming from consumers. +They allow for additional values to be sent as elements to a consumer. -As a simple use of `expand` here is a flow that sends the same element to consumer when producer does not send -any new elements. +As a simple use case of `extrapolate`, here is a flow that repeats the last emitted element to a consumer, whenever +the consumer signals demand and the producer cannot supply new elements yet. Scala -: @@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #expand-last } +: @@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #extrapolate-last } Java -: @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #expand-last } +: @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #extrapolate-last } -Expand also allows to keep some state between demand requests from the downstream. Leveraging this, here is a flow -that tracks and reports a drift between fast consumer and slow producer. +For situations where there may be downstream demand before any element is emitted from upstream, +you can use the `initial` parameter of `extrapolate` to "seed" the stream. + +Scala +: @@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #extrapolate-seed } + +Java +: @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #extrapolate-seed } + +`extrapolate` and `expand` also allow to produce metainformation based on demand signalled from the downstream. +Leveraging this, here is a flow that tracks and reports a drift between a fast consumer and a slow producer. + +Scala +: @@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #extrapolate-drift } + +Java +: @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #extrapolate-drift } + +And here's a more concise representation with `expand`. Scala : @@snip [RateTransformationDocSpec.scala]($code$/scala/docs/stream/RateTransformationDocSpec.scala) { #expand-drift } @@ -226,5 +244,12 @@ Scala Java : @@snip [RateTransformationDocTest.java]($code$/java/jdocs/stream/RateTransformationDocTest.java) { #expand-drift } -Note that all of the elements coming from upstream will go through `expand` at least once. This means that the -output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise. \ No newline at end of file +The difference is due to the different handling of the `Iterator`-generating argument. + +While `extrapolate` uses an `Iterator` only when there is unmet downstream demand, `expand` _always_ creates +an `Iterator` and emits elements downstream from it. + +This makes `expand` able to transform or even filter out (by providing an empty `Iterator`) the "original" elements. + +Regardless, since we provide a non-empty `Iterator` in both examples, this means that the +output of this flow is going to report a drift of zero if the producer is fast enough - or a larger drift otherwise. \ No newline at end of file diff --git a/akka-docs/src/test/java/jdocs/stream/RateTransformationDocTest.java b/akka-docs/src/test/java/jdocs/stream/RateTransformationDocTest.java index 554fe34e04..ff03abd2f8 100644 --- a/akka-docs/src/test/java/jdocs/stream/RateTransformationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/RateTransformationDocTest.java @@ -4,6 +4,30 @@ package jdocs.stream; +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.japi.Pair; +import akka.japi.tuple.Tuple3; +import akka.stream.ActorMaterializer; +import akka.stream.Materializer; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Keep; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.testkit.TestPublisher; +import akka.stream.testkit.TestSubscriber; +import akka.stream.testkit.javadsl.TestSink; +import akka.stream.testkit.javadsl.TestSource; +import akka.testkit.TestLatch; +import akka.testkit.javadsl.TestKit; +import jdocs.AbstractJavaTest; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; +import scala.util.Random; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -13,27 +37,7 @@ import java.util.stream.Collectors; import java.util.stream.DoubleStream; import java.util.stream.Stream; -import akka.NotUsed; -import jdocs.AbstractJavaTest; -import akka.testkit.javadsl.TestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.*; - -import akka.actor.ActorSystem; -import akka.japi.Pair; -import akka.japi.tuple.Tuple3; -import akka.stream.*; -import akka.stream.javadsl.*; -import akka.stream.testkit.TestPublisher; -import akka.stream.testkit.TestSubscriber; -import akka.stream.testkit.javadsl.TestSink; -import akka.stream.testkit.javadsl.TestSource; -import akka.testkit.TestLatch; -import scala.concurrent.Await; -import scala.concurrent.duration.Duration; -import scala.util.Random; +import static org.junit.Assert.assertEquals; public class RateTransformationDocTest extends AbstractJavaTest { @@ -105,11 +109,11 @@ public class RateTransformationDocTest extends AbstractJavaTest { } @Test - public void expandShouldRepeatLast() throws Exception { - //#expand-last + public void extrapolateShouldRepeatLast() throws Exception { + //#extrapolate-last final Flow lastFlow = Flow.of(Double.class) - .expand(in -> Stream.iterate(in, i -> i).iterator()); - //#expand-last + .extrapolate(in -> Stream.iterate(in, i -> i).iterator()); + //#extrapolate-last final Pair, CompletionStage>> probeFut = TestSource. probe(system) .via(lastFlow) @@ -120,9 +124,64 @@ public class RateTransformationDocTest extends AbstractJavaTest { final TestPublisher.Probe probe = probeFut.first(); final CompletionStage> fut = probeFut.second(); probe.sendNext(1.0); - final List expanded = fut.toCompletableFuture().get(1, TimeUnit.SECONDS); - assertEquals(expanded.size(), 10); - assertEquals(expanded.stream().mapToDouble(d -> d).sum(), 10, 0.1); + final List extrapolated = fut.toCompletableFuture().get(1, TimeUnit.SECONDS); + assertEquals(extrapolated.size(), 10); + assertEquals(extrapolated.stream().mapToDouble(d -> d).sum(), 10, 0.1); + } + + @Test + public void extrapolateShouldSeedFirst() throws Exception { + //#extrapolate-seed + Double initial = 2.0; + final Flow lastFlow = Flow.of(Double.class) + .extrapolate(in -> Stream.iterate(in, i -> i).iterator(), initial); + //#extrapolate-seed + + final CompletionStage> fut = TestSource. probe(system) + .via(lastFlow) + .grouped(10) + .toMat(Sink.head(), Keep.right()) + .run(mat); + + final List extrapolated = fut.toCompletableFuture().get(1, TimeUnit.SECONDS); + assertEquals(extrapolated.size(), 10); + assertEquals(extrapolated.stream().mapToDouble(d -> d).sum(), 10*initial, 0.1); + } + + @Test + public void extrapolateShouldTrackDrift() throws Exception { + @SuppressWarnings("unused") + //#extrapolate-drift + final Flow, NotUsed> driftFlow = Flow.of(Double.class) + .map(d -> new Pair<>(d, 0)) + .extrapolate(d -> Stream.iterate(1, i -> i + 1).map(i -> new Pair<>(d.first(), i)).iterator()); + //#extrapolate-drift + final TestLatch latch = new TestLatch(2, system); + final Flow, NotUsed> realDriftFlow = Flow.of(Double.class) + .map(d -> { + latch.countDown(); + return new Pair<>(d, 0); + }) + .extrapolate(d -> { latch.countDown(); return Stream.iterate(1, i -> i + 1).map(i -> new Pair<>(d.first(), i)).iterator(); }); + + final Pair, TestSubscriber.Probe>> pubSub = TestSource. probe(system) + .via(realDriftFlow) + .toMat(TestSink.> probe(system), Keep.both()) + .run(mat); + + final TestPublisher.Probe pub = pubSub.first(); + final TestSubscriber.Probe> sub = pubSub.second(); + + sub.request(1); + pub.sendNext(1.0); + sub.expectNext(new Pair<>(1.0, 0)); + + sub.requestNext(new Pair<>(1.0, 1)); + sub.requestNext(new Pair<>(1.0, 2)); + + pub.sendNext(2.0); + Await.ready(latch, Duration.create(1, TimeUnit.SECONDS)); + sub.requestNext(new Pair<>(2.0, 0)); } @Test diff --git a/akka-docs/src/test/scala/docs/stream/RateTransformationDocSpec.scala b/akka-docs/src/test/scala/docs/stream/RateTransformationDocSpec.scala index 7af7a19635..c0ff88e233 100644 --- a/akka-docs/src/test/scala/docs/stream/RateTransformationDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/RateTransformationDocSpec.scala @@ -58,11 +58,11 @@ class RateTransformationDocSpec extends AkkaSpec { fut.futureValue } - "expand should repeat last" in { - //#expand-last + "extrapolate should repeat last" in { + //#extrapolate-last val lastFlow = Flow[Double] - .expand(Iterator.continually(_)) - //#expand-last + .extrapolate(Iterator.continually(_)) + //#extrapolate-last val (probe, fut) = TestSource.probe[Double] .via(lastFlow) @@ -71,9 +71,52 @@ class RateTransformationDocSpec extends AkkaSpec { .run() probe.sendNext(1.0) - val expanded = fut.futureValue - expanded.size shouldBe 10 - expanded.sum shouldBe 10 + val extrapolated = fut.futureValue + extrapolated.size shouldBe 10 + extrapolated.sum shouldBe 10 + } + + "extrapolate should send seed first" in { + //#extrapolate-seed + val initial = 2.0 + val seedFlow = Flow[Double] + .extrapolate(Iterator.continually(_), Some(initial)) + //#extrapolate-seed + + val fut = TestSource.probe[Double] + .via(seedFlow) + .grouped(10) + .runWith(Sink.head) + + val extrapolated = Await.result(fut, 100.millis) + extrapolated.size shouldBe 10 + extrapolated.sum shouldBe 10 * initial + } + + "extrapolate should track drift" in { + //#extrapolate-drift + val driftFlow = Flow[Double].map(_ -> 0) + .extrapolate[(Double, Int)] { case (i, _) ⇒ Iterator.from(1).map(i -> _) } + //#extrapolate-drift + val latch = TestLatch(2) + val realDriftFlow = Flow[Double].map(d ⇒ { latch.countDown(); d -> 0; }) + .extrapolate[(Double, Int)] { case (d, _) ⇒ latch.countDown(); Iterator.from(1).map(d -> _) } + + val (pub, sub) = TestSource.probe[Double] + .via(realDriftFlow) + .toMat(TestSink.probe[(Double, Int)])(Keep.both) + .run() + + sub.request(1) + pub.sendNext(1.0) + sub.expectNext((1.0, 0)) + + sub.requestNext((1.0, 1)) + sub.requestNext((1.0, 2)) + + pub.sendNext(2.0) + Await.ready(latch, 1.second) + sub.requestNext((2.0, 0)) } "expand should track drift" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index 6f8c8ef17a..4bdeb4d119 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -163,7 +163,7 @@ class InterpreterSupervisionSpec extends StreamSpec with GraphInterpreterSpecKit lastEvents() should be(Set(OnError(TE), Cancel)) } - "fail when Expand `extrapolate` throws" in new OneBoundedSetup[Int]( + "fail when Expand `expander` throws" in new OneBoundedSetup[Int]( new Expand((in: Int) ⇒ if (in == 2) Iterator.continually(throw TE) else Iterator(in) ++ Iterator.continually(-math.abs(in)))) { lastEvents() should be(Set(RequestOne)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala index 8f5fc2f1af..0605cf5712 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala @@ -29,7 +29,7 @@ class FlowExpandSpec extends StreamSpec { val subscriber = TestSubscriber.probe[Int]() // Simply repeat the last element as an extrapolation step - Source.fromPublisher(publisher).expand(Iterator.continually(_)).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).expand(Iterator.single).to(Sink.fromSubscriber(subscriber)).run() for (i ← 1 to 100) { // Order is important here: If the request comes first it will be extrapolated! @@ -49,7 +49,7 @@ class FlowExpandSpec extends StreamSpec { publisher.sendNext(42) - for (i ← 1 to 100) { + for (_ ← 1 to 100) { subscriber.requestNext(42) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExtrapolateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExtrapolateSpec.scala new file mode 100644 index 0000000000..8ce1427f38 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExtrapolateSpec.scala @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import java.util.concurrent.ThreadLocalRandom + +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.{ TestSink, TestSource } +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class FlowExtrapolateSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = ActorMaterializer(settings) + + "Extrapolate" must { + + "pass-through elements unchanged when there is no rate difference" in { + // Shadow the fuzzed materializer (see the ordering guarantee needed by the for loop below). + implicit val materializer = ActorMaterializer(settings.withFuzzing(false)) + + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + // Provide an empty stream + Source.fromPublisher(publisher).extrapolate(_ ⇒ Iterator.empty).to(Sink.fromSubscriber(subscriber)).run() + + for (i ← 1 to 100) { + // Order is important here: If the request comes first it will be extrapolated! + publisher.sendNext(i) + subscriber.requestNext(i) + } + + subscriber.cancel() + } + + "extrapolate from elements while upstream is silent" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + // Simply repeat the last element as an extrapolation step + Source.fromPublisher(publisher).extrapolate(e ⇒ Iterator.continually(e + 1)).to(Sink.fromSubscriber(subscriber)).run() + + publisher.sendNext(42) + subscriber.requestNext(42) + + for (_ ← 1 to 100) { + subscriber.requestNext(42 + 1) + } + + publisher.sendNext(-42) + + // The request below is otherwise in race with the above sendNext + subscriber.expectNoMsg(500.millis) + subscriber.requestNext(-42) + + subscriber.cancel() + } + + "always emit the initial element first" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + val testInit = 44 + + // Simply repeat the last element as an extrapolation step + Source.fromPublisher(publisher).extrapolate(Iterator.continually(_), initial = Some(testInit)).to(Sink.fromSubscriber(subscriber)).run() + + publisher.sendNext(42) + subscriber.requestNext(testInit) + subscriber.requestNext(42) + + subscriber.cancel() + } + + "do not drop the last element" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + // Simply repeat the last element as an extrapolation step + Source.fromPublisher(publisher).extrapolate(_ ⇒ Iterator.empty).to(Sink.fromSubscriber(subscriber)).run() + + publisher.sendNext(1) + subscriber.requestNext(1) + + publisher.sendNext(2) + publisher.sendComplete() + + // The request below is otherwise in race with the above sendNext(2) (and completion) + subscriber.expectNoMsg(500.millis) + + subscriber.requestNext(2) + subscriber.expectComplete() + } + + "work on a variable rate chain" in { + val future = Source(1 to 100) + .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } + .extrapolate(Iterator.continually(_)) + .runFold(Set.empty[Int])(_ + _) + + Await.result(future, 10.seconds) should contain theSameElementsAs (1 to 100).toSet + } + + "backpressure publisher when subscriber is slower" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source.fromPublisher(publisher).extrapolate(Iterator.continually(_)).to(Sink.fromSubscriber(subscriber)).run() + + publisher.sendNext(1) + subscriber.requestNext(1) + subscriber.requestNext(1) + + var pending = publisher.pending + // Deplete pending requests coming from input buffer + while (pending > 0) { + publisher.unsafeSendNext(2) + pending -= 1 + } + + // The above sends are absorbed in the input buffer, and will result in two one-sized batch requests + pending += publisher.expectRequest() + pending += publisher.expectRequest() + while (pending > 0) { + publisher.unsafeSendNext(2) + pending -= 1 + } + + publisher.expectNoMsg(1.second) + + subscriber.request(2) + subscriber.expectNext(2) + subscriber.expectNext(2) + + // Now production is resumed + publisher.expectRequest() + + } + + "work properly with finite extrapolations" in { + val (source, sink) = + TestSource.probe[Int] + .expand(i ⇒ Iterator.from(0).map(i → _).take(3)) + .toMat(TestSink.probe)(Keep.both) + .run() + source + .sendNext(1) + sink + .request(4) + .expectNext(1 → 0, 1 → 1, 1 → 2) + .expectNoMsg(100.millis) + source + .sendNext(2) + .sendComplete() + sink + .expectNext(2 → 0) + .expectComplete() + } + } + +} diff --git a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes index 1313933f9e..e64a8b455e 100644 --- a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes @@ -8,3 +8,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.LazySink.th # #24581 RS violation ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.VirtualProcessor$Both") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.VirtualProcessor#Both.create") + +# #23804 Added extrapolate stage +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.extrapolate") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.extrapolate$default$2") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b777b3dc16..8d99105dff 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -20,6 +20,7 @@ import akka.actor.ActorRef import akka.dispatch.ExecutionContexts import akka.stream.impl.fusing.LazyFlow +import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ import scala.reflect.ClassTag @@ -1556,7 +1557,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr new Flow(delegate.batchWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) /** - * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older + * Allows a faster downstream to progress independently of a slower upstream by extrapolating elements from an older * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * @@ -1565,7 +1566,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * subscriber. * * Expand does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. - * Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure. + * Exceptions from the `expander` function will complete the stream with failure. + * + * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * * '''Emits when''' downstream stops backpressuring * @@ -1575,12 +1578,67 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels * - * @param seed Provides the first state for extrapolation using the first unconsumed element - * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation - * state. + * @param expander Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + * @see [[#extrapolate]] */ - def expand[U](extrapolate: function.Function[Out, java.util.Iterator[U]]): javadsl.Flow[In, U, Mat] = - new Flow(delegate.expand(in ⇒ extrapolate(in).asScala)) + def expand[U](expander: function.Function[Out, java.util.Iterator[U]]): javadsl.Flow[In, U, Mat] = + new Flow(delegate.expand(in ⇒ expander(in).asScala)) + + /** + * Allows a faster downstream to progress independent of a slower upstream. + * + * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream + * signals demand. + * + * Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. + * Exceptions from the `extrapolate` function will complete the stream with failure. + * + * See also [[#expand]] for a version that can overwrite the original element. + * + * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR + * `extrapolate` is non-empty and applicable + * + * '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty + * + * '''Completes when''' upstream completes and current `extrapolate` runs empty + * + * '''Cancels when''' downstream cancels + * + * @param extrapolator Takes the current upstream element and provides a sequence of "extrapolated" elements based + * on the original, to be emitted in case downstream signals demand. + * @see [[#expand]] + */ + def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.extrapolate(in ⇒ extrapolator(in).asScala)) + + /** + * Allows a faster downstream to progress independent of a slower upstream. + * + * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream + * signals demand. + * + * Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. + * Exceptions from the `extrapolate` function will complete the stream with failure. + * + * See also [[#expand]] for a version that can overwrite the original element. + * + * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR + * `extrapolate` is non-empty and applicable + * + * '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty + * + * '''Completes when''' upstream completes and current `extrapolate` runs empty + * + * '''Cancels when''' downstream cancels + * + * @param extrapolator Takes the current upstream element and provides a sequence of "extrapolated" elements based + * on the original, to be emitted in case downstream signals demand. + * @param initial The initial element to be emitted, in case upstream is able to stall the entire stream. + * @see [[#expand]] + */ + def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]], initial: Out @uncheckedVariance): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.extrapolate(in ⇒ extrapolator(in).asScala, Some(initial))) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index f49a71e4f0..9d747ccf43 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -2113,7 +2113,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * subscriber. * * Expand does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. - * Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure. + * Exceptions from the `expander` function will complete the stream with failure. + * + * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * * '''Emits when''' downstream stops backpressuring * @@ -2123,11 +2125,67 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels * - * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation - * state. + * @param expander Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + * @see [[#extrapolate]] */ - def expand[U](extrapolate: function.Function[Out, java.util.Iterator[U]]): javadsl.Source[U, Mat] = - new Source(delegate.expand(in ⇒ extrapolate(in).asScala)) + def expand[U](expander: function.Function[Out, java.util.Iterator[U]]): javadsl.Source[U, Mat] = + new Source(delegate.expand(in ⇒ expander(in).asScala)) + + /** + * Allows a faster downstream to progress independent of a slower upstream. + * + * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream + * signals demand. + * + * Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. + * Exceptions from the `extrapolate` function will complete the stream with failure. + * + * See also [[#expand]] for a version that can overwrite the original element. + * + * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR + * `extrapolate` is non-empty and applicable + * + * '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty + * + * '''Completes when''' upstream completes and current `extrapolate` runs empty + * + * '''Cancels when''' downstream cancels + * + * @param extrapolator Takes the current upstream element and provides a sequence of "extrapolated" elements based + * on the original, to be emitted in case downstream signals demand. + * @see [[#expand]] + */ + def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]]): Source[Out, Mat] = + new Source(delegate.extrapolate(in ⇒ extrapolator(in).asScala)) + + /** + * Allows a faster downstream to progress independent of a slower upstream. + * + * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream + * signals demand. + * + * Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. + * Exceptions from the `extrapolate` function will complete the stream with failure. + * + * See also [[#expand]] for a version that can overwrite the original element. + * + * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR + * `extrapolate` is non-empty and applicable + * + * '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty + * + * '''Completes when''' upstream completes and current `extrapolate` runs empty + * + * '''Cancels when''' downstream cancels + * + * @param extrapolator takes the current upstream element and provides a sequence of "extrapolated" elements based + * on the original, to be emitted in case downstream signals demand. + * @param initial the initial element to be emitted, in case upstream is able to stall the entire stream. + * @see [[#expand]] + */ + def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]], initial: Out @uncheckedVariance): Source[Out, Mat] = + new Source(delegate.extrapolate(in ⇒ extrapolator(in).asScala, Some(initial))) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index a35de25c93..615389ef55 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1118,7 +1118,7 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I new SubFlow(delegate.batchWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) /** - * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older + * Allows a faster downstream to progress independently of a slower upstream by extrapolating elements from an older * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * @@ -1127,7 +1127,9 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * subscriber. * * Expand does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. - * Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure. + * Exceptions from the `expander` function will complete the stream with failure. + * + * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * * '''Emits when''' downstream stops backpressuring * @@ -1137,12 +1139,68 @@ class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[I * * '''Cancels when''' downstream cancels * - * @param seed Provides the first state for extrapolation using the first unconsumed element - * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation - * state. + * @param expander Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + * @see [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" + * element. */ - def expand[U](extrapolate: function.Function[Out, java.util.Iterator[U]]): SubFlow[In, U, Mat] = - new SubFlow(delegate.expand(in ⇒ extrapolate(in).asScala)) + def expand[U](expander: function.Function[Out, java.util.Iterator[U]]): SubFlow[In, U, Mat] = + new SubFlow(delegate.expand(in ⇒ expander(in).asScala)) + + /** + * Allows a faster downstream to progress independent of a slower upstream. + * + * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream + * signals demand. + * + * Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. + * Exceptions from the `extrapolate` function will complete the stream with failure. + * + * See also [[#expand]] for a version that can overwrite the original element. + * + * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR + * `extrapolate` is non-empty and applicable + * + * '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty + * + * '''Completes when''' upstream completes and current `extrapolate` runs empty + * + * '''Cancels when''' downstream cancels + * + * @param extrapolator takes the current upstream element and provides a sequence of "extrapolated" elements based + * on the original, to be emitted in case downstream signals demand. + * @see [[#expand]] + */ + def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.extrapolate(in ⇒ extrapolator(in).asScala)) + + /** + * Allows a faster downstream to progress independent of a slower upstream. + * + * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream + * signals demand. + * + * Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. + * Exceptions from the `extrapolate` function will complete the stream with failure. + * + * See also [[#expand]] for a version that can overwrite the original element. + * + * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR + * `extrapolate` is non-empty and applicable + * + * '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty + * + * '''Completes when''' upstream completes and current `extrapolate` runs empty + * + * '''Cancels when''' downstream cancels + * + * @param extrapolator takes the current upstream element and provides a sequence of "extrapolated" elements based + * on the original, to be emitted in case downstream signals demand. + * @param initial the initial element to be emitted, in case upstream is able to stall the entire stream. + * @see [[#expand]] + */ + def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]], initial: Out @uncheckedVariance): SubFlow[In, Out, Mat] = + new SubFlow(delegate.extrapolate(in ⇒ extrapolator(in).asScala, Some(initial))) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 970a9626a6..185ad2fc2e 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1103,7 +1103,7 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O new SubSource(delegate.batchWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) /** - * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older + * Allows a faster downstream to progress independently of a slower upstream by extrapolating elements from an older * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * @@ -1112,7 +1112,9 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * subscriber. * * Expand does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. - * Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure. + * Exceptions from the `expander` function will complete the stream with failure. + * + * See also [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" element. * * '''Emits when''' downstream stops backpressuring * @@ -1122,11 +1124,67 @@ class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[O * * '''Cancels when''' downstream cancels * - * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation - * state. + * @param expander Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + * @see [[#extrapolate]] */ - def expand[U](extrapolate: function.Function[Out, java.util.Iterator[U]]): SubSource[U, Mat] = - new SubSource(delegate.expand(in ⇒ extrapolate(in).asScala)) + def expand[U](expander: function.Function[Out, java.util.Iterator[U]]): SubSource[U, Mat] = + new SubSource(delegate.expand(in ⇒ expander(in).asScala)) + + /** + * Allows a faster downstream to progress independent of a slower upstream. + * + * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream + * signals demand. + * + * Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. + * Exceptions from the `extrapolate` function will complete the stream with failure. + * + * See also [[#expand]] for a version that can overwrite the original element. + * + * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR + * `extrapolate` is non-empty and applicable + * + * '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty + * + * '''Completes when''' upstream completes and current `extrapolate` runs empty + * + * '''Cancels when''' downstream cancels + * + * @param extrapolator takes the current upstream element and provides a sequence of "extrapolated" elements based + * on the original, to be emitted in case downstream signals demand. + * @see [[#expand]] + */ + def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]]): SubSource[Out, Mat] = + new SubSource(delegate.extrapolate(in ⇒ extrapolator(in).asScala)) + + /** + * Allows a faster downstream to progress independent of a slower upstream. + * + * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream + * signals demand. + * + * Extrapolate does not support [[akka.stream.Supervision#restart]] and [[akka.stream.Supervision#resume]]. + * Exceptions from the `extrapolate` function will complete the stream with failure. + * + * See also [[#expand]] for a version that can overwrite the original element. + * + * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR + * `extrapolate` is non-empty and applicable + * + * '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty + * + * '''Completes when''' upstream completes and current `extrapolate` runs empty + * + * '''Cancels when''' downstream cancels + * + * @param extrapolator takes the current upstream element and provides a sequence of "extrapolated" elements based + * on the original, to be emitted in case downstream signals demand. + * @param initial the initial element to be emitted, in case upstream is able to stall the entire stream. + * @see [[#expand]] + */ + def extrapolate(extrapolator: function.Function[Out @uncheckedVariance, java.util.Iterator[Out @uncheckedVariance]], initial: Out @uncheckedVariance): SubSource[Out, Mat] = + new SubSource(delegate.extrapolate(in ⇒ extrapolator(in).asScala, Some(initial))) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index ca5cd50a25..7f6313bc1d 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1632,7 +1632,7 @@ trait FlowOps[+Out, +Mat] { via(Batch(max, costFn, seed, aggregate).withAttributes(DefaultAttributes.batchWeighted)) /** - * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older + * Allows a faster downstream to progress independently of a slower upstream by extrapolating elements from an older * element until new element comes from the upstream. For example an expand step might repeat the last element for * the subscriber until it receives an update from upstream. * @@ -1641,7 +1641,7 @@ trait FlowOps[+Out, +Mat] { * subscriber. * * Expand does not support [[akka.stream.Supervision.Restart]] and [[akka.stream.Supervision.Resume]]. - * Exceptions from the `seed` or `extrapolate` functions will complete the stream with failure. + * Exceptions from the `seed` function will complete the stream with failure. * * '''Emits when''' downstream stops backpressuring * @@ -1651,11 +1651,43 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels * - * @param seed Provides the first state for extrapolation using the first unconsumed element - * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation - * state. + * @param expander Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + * @see [[#extrapolate]] for a version that always preserves the original element and allows for an initial "startup" + * element. */ - def expand[U](extrapolate: Out ⇒ Iterator[U]): Repr[U] = via(new Expand(extrapolate)) + def expand[U](expander: Out ⇒ Iterator[U]): Repr[U] = via(new Expand(expander)) + + /** + * Allows a faster downstream to progress independent of a slower upstream. + * + * This is achieved by introducing "extrapolated" elements - based on those from upstream - whenever downstream + * signals demand. + * + * Extrapolate does not support [[akka.stream.Supervision.Restart]] and [[akka.stream.Supervision.Resume]]. + * Exceptions from the `extrapolate` function will complete the stream with failure. + * + * '''Emits when''' downstream stops backpressuring, AND EITHER upstream emits OR initial element is present OR + * `extrapolate` is non-empty and applicable + * + * '''Backpressures when''' downstream backpressures or current `extrapolate` runs empty + * + * '''Completes when''' upstream completes and current `extrapolate` runs empty + * + * '''Cancels when''' downstream cancels + * + * @param extrapolator takes the current upstream element and provides a sequence of "extrapolated" elements based + * on the original, to be emitted in case downstream signals demand. + * @param initial the initial element to be emitted, in case upstream is able to stall the entire stream. + * @see [[#expand]] for a version that can overwrite the original element. + */ + def extrapolate[U >: Out](extrapolator: U ⇒ Iterator[U], initial: Option[U] = None): Repr[U] = { + val expandArg = (u: U) ⇒ Iterator.single(u) ++ extrapolator(u) + + val expandStep = new Expand[U, U](expandArg) + + initial.map(e ⇒ prepend(Source.single(e)).via(expandStep)).getOrElse(via(expandStep)) + } /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full.