diff --git a/akka-docs/rst/general/stream/stages-overview.rst b/akka-docs/rst/general/stream/stages-overview.rst index 5c1a4c39ab..b0ff4cdb9f 100644 --- a/akka-docs/rst/general/stream/stages-overview.rst +++ b/akka-docs/rst/general/stream/stages-overview.rst @@ -85,6 +85,7 @@ downstreams and able to adapt their behavior to that signal. Stage Emits when Backpressures when Completes when ===================== ========================================================================================================================= ==================================================================================================================================== ===================================================================================== conflate downstream stops backpressuring and there is a conflated element available never [2]_ upstream completes +conflateWithSeed downstream stops backpressuring and there is a conflated element available never [2]_ upstream completes batch downstream stops backpressuring and there is a batched element available batched elements reached the max limit of allowed batched elements & downstream backpressures upstream completes and a "possibly pending" element was drained [3]_ batchWeighted downstream stops backpressuring and there is a batched element available batched elements reached the max weight limit of allowed batched elements (plus a pending element [3]_ ) & downstream backpressures upstream completes and a "possibly pending" element was drained [3]_ expand downstream stops backpressuring downstream backpressures upstream completes diff --git a/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java b/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java index 99426ad81b..ceb36117f3 100644 --- a/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java @@ -60,7 +60,7 @@ public class RateTransformationDocTest { //#conflate-summarize final Flow, NotUsed> statsFlow = Flow.of(Double.class) - .conflate(elem -> Collections.singletonList(elem), (acc, elem) -> { + .conflateWithSeed(elem -> Collections.singletonList(elem), (acc, elem) -> { return Stream .concat(acc.stream(), Collections.singletonList(elem).stream()) .collect(Collectors.toList()); @@ -86,7 +86,7 @@ public class RateTransformationDocTest { //#conflate-sample final Double p = 0.01; final Flow sampleFlow = Flow.of(Double.class) - .conflate(elem -> Collections.singletonList(elem), (acc, elem) -> { + .conflateWithSeed(elem -> Collections.singletonList(elem), (acc, elem) -> { if (r.nextDouble() < p) { return Stream .concat(acc.stream(), Collections.singletonList(elem).stream()) diff --git a/akka-docs/rst/java/code/docs/stream/StreamBuffersRateDocTest.java b/akka-docs/rst/java/code/docs/stream/StreamBuffersRateDocTest.java index a82d94e42b..194225318e 100644 --- a/akka-docs/rst/java/code/docs/stream/StreamBuffersRateDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/StreamBuffersRateDocTest.java @@ -81,7 +81,7 @@ public class StreamBuffersRateDocTest { final Source tickSource = Source.tick(oneSecond.mul(3), oneSecond.mul(3), "tick"); final Flow conflate = - Flow.of(String.class).conflate( + Flow.of(String.class).conflateWithSeed( first -> 1, (count, elem) -> count + 1); RunnableGraph.fromGraph(GraphDSL.create(b -> { diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeMissedTicks.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeMissedTicks.java index c16522cbbc..63d8f4c17b 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeMissedTicks.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeMissedTicks.java @@ -58,11 +58,11 @@ public class RecipeMissedTicks extends RecipeTest { @SuppressWarnings("unused") //#missed-ticks final Flow missedTicks = - Flow.of(Tick.class).conflate(tick -> 0, (missed, tick) -> missed + 1); + Flow.of(Tick.class).conflateWithSeed(tick -> 0, (missed, tick) -> missed + 1); //#missed-ticks final TestLatch latch = new TestLatch(3, system); final Flow realMissedTicks = - Flow.of(Tick.class).conflate(tick -> 0, (missed, tick) -> { latch.countDown(); return missed + 1; }); + Flow.of(Tick.class).conflateWithSeed(tick -> 0, (missed, tick) -> { latch.countDown(); return missed + 1; }); Pair, TestSubscriber.Probe> pubSub = tickStream.via(realMissedTicks).toMat(sink, Keep.both()).run(mat); diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSimpleDrop.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSimpleDrop.java index 7fc1c69431..a4b6003f9d 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSimpleDrop.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSimpleDrop.java @@ -46,11 +46,11 @@ public class RecipeSimpleDrop extends RecipeTest { @SuppressWarnings("unused") //#simple-drop final Flow droppyStream = - Flow.of(Message.class).conflate(i -> i, (lastMessage, newMessage) -> newMessage); + Flow.of(Message.class).conflate((lastMessage, newMessage) -> newMessage); //#simple-drop final TestLatch latch = new TestLatch(2, system); final Flow realDroppyStream = - Flow.of(Message.class).conflate(i -> i, (lastMessage, newMessage) -> { latch.countDown(); return newMessage; }); + Flow.of(Message.class).conflate((lastMessage, newMessage) -> { latch.countDown(); return newMessage; }); final Pair, TestSubscriber.Probe> pubSub = TestSource . probe(system) diff --git a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst index 8867effbc2..731b3ccfb4 100644 --- a/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst +++ b/akka-docs/rst/java/stream/migration-guide-2.0-2.4-java.rst @@ -92,6 +92,18 @@ In Akka 2.4.x this is formulated like so: .. includecode:: ../code/docs/stream/MigrationsJava.java#expand-state +``conflate`` has been renamed to ``conflateWithSeed()`` +------------------------------------------------------- + +The new ``conflate`` operator is a special case of the original behavior (renamed to ``conflateWithSeed``) that does not +change the type of the stream. The usage of the new operator is as simple as:: + + Flow.of(Integer.class).conflate((a, b) -> a + b) // Add numbers while downstream is not ready + +Which is the same as using ``conflateWithSeed`` with an identity function:: + + Flow.of(Integer.class).conflateWithSeed(x -> x, (a, b) -> a + b) // Add numbers while downstream is not ready + Changed Sources / Sinks ======================= @@ -143,4 +155,4 @@ Routing settings parameter name ``RoutingSettings`` were previously the only setting available on ``RequestContext``, and were accessible via ``settings``. We now made it possible to configure the parsers settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is -now accessible via ``parserSettings``. \ No newline at end of file +now accessible via ``parserSettings``. diff --git a/akka-docs/rst/java/stream/stream-cookbook.rst b/akka-docs/rst/java/stream/stream-cookbook.rst index 4b32a314e3..5bfae50759 100644 --- a/akka-docs/rst/java/stream/stream-cookbook.rst +++ b/akka-docs/rst/java/stream/stream-cookbook.rst @@ -218,18 +218,18 @@ Dropping elements **Situation:** Given a fast producer and a slow consumer, we want to drop elements if necessary to not slow down the producer too much. -This can be solved by using the most versatile rate-transforming operation, ``conflate``. Conflate can be thought as -a special ``fold`` operation that collapses multiple upstream elements into one aggregate element if needed to keep +This can be solved by using a versatile rate-transforming operation, ``conflate``. Conflate can be thought as +a special ``reduce`` operation that collapses multiple upstream elements into one aggregate element if needed to keep the speed of the upstream unaffected by the downstream. -When the upstream is faster, the fold process of the ``conflate`` starts. This folding needs a zero element, which -is given by a ``seed`` function that takes the current element and produces a zero for the folding process. In our -case this is ``i -> i`` so our folding state starts form the message itself. The folder function is also -special: given the aggregate value (the last message) and the new element (the freshest element) our aggregate state -becomes simply the freshest element. This choice of functions results in a simple dropping operation. +When the upstream is faster, the reducing process of the ``conflate`` starts. Our reducer function simply takes +the freshest element. This cin a simple dropping operation. .. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeSimpleDrop.java#simple-drop +There is a version of ``conflate`` named ``conflateWithSeed`` that allows to express more complex aggregations, more +similar to a ``fold``. + Dropping broadcast ------------------ @@ -253,7 +253,7 @@ Collecting missed ticks **Situation:** Given a regular (stream) source of ticks, instead of trying to backpressure the producer of the ticks we want to keep a counter of the missed ticks instead and pass it down when possible. -We will use ``conflate`` to solve the problem. Conflate takes two functions: +We will use ``conflateWithSeed`` to solve the problem. Conflate takes two functions: * A seed function that produces the zero element for the folding process that happens when the upstream is faster than the downstream. In our case the seed function is a constant function that returns 0 since there were no missed ticks diff --git a/akka-docs/rst/scala/code/docs/stream/RateTransformationDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/RateTransformationDocSpec.scala index b7632cb4eb..69398a68d0 100644 --- a/akka-docs/rst/scala/code/docs/stream/RateTransformationDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/RateTransformationDocSpec.scala @@ -24,7 +24,7 @@ class RateTransformationDocSpec extends AkkaSpec { "conflate should summarize" in { //#conflate-summarize val statsFlow = Flow[Double] - .conflate(Seq(_))(_ :+ _) + .conflateWithSeed(Seq(_))(_ :+ _) .map { s => val μ = s.sum / s.size val se = s.map(x => pow(x - μ, 2)) @@ -45,7 +45,7 @@ class RateTransformationDocSpec extends AkkaSpec { //#conflate-sample val p = 0.01 val sampleFlow = Flow[Double] - .conflate(Seq(_)) { + .conflateWithSeed(Seq(_)) { case (acc, elem) if Random.nextDouble < p => acc :+ elem case (acc, _) => acc } diff --git a/akka-docs/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala b/akka-docs/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala index 8fc348531a..943e2cb8b0 100644 --- a/akka-docs/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala @@ -48,7 +48,7 @@ class StreamBuffersRateSpec extends AkkaSpec { Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0 Source.tick(initialDelay = 1.second, interval = 1.second, "message!") - .conflate(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1 + .conflateWithSeed(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1 zipper.out ~> Sink.foreach(println) ClosedShape diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeMissedTicks.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeMissedTicks.scala index 98bdd2ffeb..28c384dfbc 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeMissedTicks.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeMissedTicks.scala @@ -21,12 +21,12 @@ class RecipeMissedTicks extends RecipeSpec { //#missed-ticks val missedTicks: Flow[Tick, Int, NotUsed] = - Flow[Tick].conflate(seed = (_) => 0)( + Flow[Tick].conflateWithSeed(seed = (_) => 0)( (missedTicks, tick) => missedTicks + 1) //#missed-ticks val latch = TestLatch(3) val realMissedTicks: Flow[Tick, Int, NotUsed] = - Flow[Tick].conflate(seed = (_) => 0)( + Flow[Tick].conflateWithSeed(seed = (_) => 0)( (missedTicks, tick) => { latch.countDown(); missedTicks + 1 }) tickStream.via(realMissedTicks).to(sink).run() diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSimpleDrop.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSimpleDrop.scala index 609803eab5..600c7e8088 100644 --- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSimpleDrop.scala +++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeSimpleDrop.scala @@ -15,11 +15,11 @@ class RecipeSimpleDrop extends RecipeSpec { //#simple-drop val droppyStream: Flow[Message, Message, NotUsed] = - Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => newMessage) + Flow[Message].conflate((lastMessage, newMessage) => newMessage) //#simple-drop val latch = TestLatch(2) val realDroppyStream = - Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => { latch.countDown(); newMessage }) + Flow[Message].conflate((lastMessage, newMessage) => { latch.countDown(); newMessage }) val pub = TestPublisher.probe[Message]() val sub = TestSubscriber.manualProbe[Message]() diff --git a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst index d63b692a73..69ed312d23 100644 --- a/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst +++ b/akka-docs/rst/scala/stream/migration-guide-2.0-2.4-scala.rst @@ -79,6 +79,18 @@ In Akka 2.4.x this is formulated like so: .. includecode:: ../code/docs/stream/MigrationsScala.scala#expand-state +``conflate`` has been renamed to ``conflateWithSeed()`` +------------------------------------------------------- + +The new ``conflate`` operator is a special case of the original behavior (renamed to ``conflateWithSeed``) that does not +change the type of the stream. The usage of the new operator is as simple as:: + + Flow[Int].conflate(_ + _) // Add numbers while downstream is not ready + +Which is the same as using ``conflateWithSeed`` with an identity function + + Flow[Int].conflateWithSeed(identity)(_ + _) // Add numbers while downstream is not ready + Changes in Akka HTTP ==================== diff --git a/akka-docs/rst/scala/stream/stream-cookbook.rst b/akka-docs/rst/scala/stream/stream-cookbook.rst index 823f98f5ab..262cdbf7ac 100644 --- a/akka-docs/rst/scala/stream/stream-cookbook.rst +++ b/akka-docs/rst/scala/stream/stream-cookbook.rst @@ -213,18 +213,18 @@ Dropping elements **Situation:** Given a fast producer and a slow consumer, we want to drop elements if necessary to not slow down the producer too much. -This can be solved by using the most versatile rate-transforming operation, ``conflate``. Conflate can be thought as -a special ``fold`` operation that collapses multiple upstream elements into one aggregate element if needed to keep +This can be solved by using a versatile rate-transforming operation, ``conflate``. Conflate can be thought as +a special ``reduce`` operation that collapses multiple upstream elements into one aggregate element if needed to keep the speed of the upstream unaffected by the downstream. -When the upstream is faster, the fold process of the ``conflate`` starts. This folding needs a zero element, which -is given by a ``seed`` function that takes the current element and produces a zero for the folding process. In our -case this is ``identity`` so our folding state starts form the message itself. The folder function is also -special: given the aggregate value (the last message) and the new element (the freshest element) our aggregate state -becomes simply the freshest element. This choice of functions results in a simple dropping operation. +When the upstream is faster, the reducing process of the ``conflate`` starts. Our reducer function simply takes +the freshest element. This cin a simple dropping operation. .. includecode:: ../code/docs/stream/cookbook/RecipeSimpleDrop.scala#simple-drop +There is a more general version of ``conflate`` named ``conflateWithSeed`` that allows to express more complex aggregations, more +similar to a ``fold``. + Dropping broadcast ------------------ @@ -246,7 +246,7 @@ Collecting missed ticks **Situation:** Given a regular (stream) source of ticks, instead of trying to backpressure the producer of the ticks we want to keep a counter of the missed ticks instead and pass it down when possible. -We will use ``conflate`` to solve the problem. Conflate takes two functions: +We will use ``conflateWithSeed`` to solve the problem. The seed version of conflate takes two functions: * A seed function that produces the zero element for the folding process that happens when the upstream is faster than the downstream. In our case the seed function is a constant function that returns 0 since there were no missed ticks diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 13e89fce07..1a5f56ce79 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -501,7 +501,7 @@ public class FlowTest extends StreamTest { public void mustBeAbleToUseConflate() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final List input = Arrays.asList("A", "B", "C"); - final Flow flow = Flow.of(String.class).conflate(new Function() { + final Flow flow = Flow.of(String.class).conflateWithSeed(new Function() { @Override public String apply(String s) throws Exception { return s; @@ -515,6 +515,12 @@ public class FlowTest extends StreamTest { CompletionStage future = Source.from(input).via(flow).runFold("", (aggr, in) -> aggr + in, materializer); String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals("ABC", result); + + final Flow flow2 = Flow.of(String.class).conflate((a, b) -> a + b); + + CompletionStage future2 = Source.from(input).via(flow2).runFold("", (a, b) -> a + b, materializer); + String result2 = future2.toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals("ABC", result2); } @Test diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 4b697d1b6b..dd1dd36412 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -292,7 +292,7 @@ public class SourceTest extends StreamTest { Source.from(input) . map(in -> { throw new RuntimeException("simulated err"); }) .runWith(Sink.head(), materializer) - .whenComplete((s, ex) -> { + .whenComplete((s, ex) -> { if (ex == null) { probe.getRef().tell("done", ActorRef.noSender()); } else { @@ -393,10 +393,17 @@ public class SourceTest extends StreamTest { final JavaTestKit probe = new JavaTestKit(system); final List input = Arrays.asList("A", "B", "C"); CompletionStage future = Source.from(input) - .conflate(s -> s, (aggr, in) -> aggr + in) + .conflateWithSeed(s -> s, (aggr, in) -> aggr + in) .runFold("", (aggr, in) -> aggr + in, materializer); String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals("ABC", result); + + + final Flow flow2 = Flow.of(String.class).conflate((a, b) -> a + b); + + CompletionStage future2 = Source.from(input).conflate((String a, String b) -> a + b).runFold("", (a, b) -> a + b, materializer); + String result2 = future2.toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals("ABC", result2); } @Test diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index 6a463ef01b..9d951c5fb6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -22,7 +22,23 @@ class FlowConflateSpec extends AkkaSpec { val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.manualProbe[Int]() - Source.fromPublisher(publisher).conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run() + val sub = subscriber.expectSubscription() + + for (i ← 1 to 100) { + sub.request(1) + publisher.sendNext(i) + subscriber.expectNext(i) + } + + sub.cancel() + } + + "pass-through elements unchanged when there is no rate difference (simple conflate)" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + Source.fromPublisher(publisher).conflate(_ + _).to(Sink.fromSubscriber(subscriber)).run() val sub = subscriber.expectSubscription() for (i ← 1 to 100) { @@ -38,7 +54,23 @@ class FlowConflateSpec extends AkkaSpec { val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.manualProbe[Int]() - Source.fromPublisher(publisher).conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run() + val sub = subscriber.expectSubscription() + + for (i ← 1 to 100) { + publisher.sendNext(i) + } + subscriber.expectNoMsg(1.second) + sub.request(1) + subscriber.expectNext(5050) + sub.cancel() + } + + "conflate elements while downstream is silent (simple conflate)" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + Source.fromPublisher(publisher).conflate(_ + _).to(Sink.fromSubscriber(subscriber)).run() val sub = subscriber.expectSubscription() for (i ← 1 to 100) { @@ -52,7 +84,15 @@ class FlowConflateSpec extends AkkaSpec { "work on a variable rate chain" in { val future = Source(1 to 1000) - .conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) + .conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) + .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } + .runFold(0)(_ + _) + Await.result(future, 10.seconds) should be(500500) + } + + "work on a variable rate chain (simple conflate)" in { + val future = Source(1 to 1000) + .conflate(_ + _) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } .runFold(0)(_ + _) Await.result(future, 10.seconds) should be(500500) @@ -62,7 +102,7 @@ class FlowConflateSpec extends AkkaSpec { val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.manualProbe[Int]() - Source.fromPublisher(publisher).conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run() val sub = subscriber.expectSubscription() sub.request(1) @@ -89,7 +129,7 @@ class FlowConflateSpec extends AkkaSpec { "work with a buffer and fold" in { val future = Source(1 to 50) - .conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) + .conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) .buffer(50, OverflowStrategy.backpressure) .runFold(0)(_ + _) Await.result(future, 3.seconds) should be((1 to 50).sum) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 4e2573831b..7888e00950 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -809,6 +809,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the * upstream publisher is faster. * + * This version of conflate allows to derive a seed from the first element and change the aggregated type to be + * different than the input type. See [[Flow.conflate]] for a simpler version that does not change types. + * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * @@ -820,14 +823,41 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels * - * see also [[Flow.batch]] [[Flow.batchWeighted]] + * see also [[Flow.conflate]] [[Flow.batch]] [[Flow.batchWeighted]] * * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * */ - def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = - new Flow(delegate.conflate(seed.apply)(aggregate.apply)) + def conflateWithSeed[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = + new Flow(delegate.conflateWithSeed(seed.apply)(aggregate.apply)) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary + * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream publisher is faster. + * + * This version of conflate does not change the output type of the stream. See [[Flow.conflateWithSeed]] for a + * more flexible version that can take a seed function and transform elements while rolling up. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is a conflated element available + * + * '''Backpressures when''' never + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * see also [[Flow.conflateWithSeed]] [[Flow.batch]] [[Flow.batchWeighted]] + * + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * + */ + def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): javadsl.Flow[In, O2, Mat] = + new Flow(delegate.conflate(aggregate.apply)) /** * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index bd8addaae2..8223673b88 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1266,6 +1266,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the * upstream publisher is faster. * + * This version of conflate allows to derive a seed from the first element and change the aggregated type to be + * different than the input type. See [[Flow.conflate]] for a simpler version that does not change types. + * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * @@ -1277,13 +1280,38 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels * - * see also [[Source.batch]] [[Source.batchWeighted]] + * see also [[Source.conflate]] [[Source.batch]] [[Source.batchWeighted]] * * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ - def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = - new Source(delegate.conflate(seed.apply)(aggregate.apply)) + def conflateWithSeed[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = + new Source(delegate.conflateWithSeed(seed.apply)(aggregate.apply)) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary + * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream publisher is faster. + * This version of conflate does not change the output type of the stream. See [[Source.conflateWithSeed]] for a + * more flexible version that can take a seed function and transform elements while rolling up. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is a conflated element available + * + * '''Backpressures when''' never + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * see also [[Source.conflateWithSeed]] [[Source.batch]] [[Source.batchWeighted]] + * + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): javadsl.Source[O2, Mat] = + new Source(delegate.conflate(aggregate.apply)) /** * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index a331af06bd..aa61e8830b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -651,6 +651,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the * upstream publisher is faster. * + * This version of conflate allows to derive a seed from the first element and change the aggregated type to be + * different than the input type. See [[Flow.conflate]] for a simpler version that does not change types. + * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * @@ -662,14 +665,41 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels * - * see also [[SubFlow.batch]] [[SubFlow.batchWeighted]] + * see also [[SubFlow.conflate]] [[SubFlow.batch]] [[SubFlow.batchWeighted]] * * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * */ - def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] = - new SubFlow(delegate.conflate(seed.apply)(aggregate.apply)) + def conflateWithSeed[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] = + new SubFlow(delegate.conflateWithSeed(seed.apply)(aggregate.apply)) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary + * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream publisher is faster. + * + * This version of conflate does not change the output type of the stream. See [[SubFlow.conflateWithSeed]] for a + * more flexible version that can take a seed function and transform elements while rolling up. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is a conflated element available + * + * '''Backpressures when''' never + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * see also [[SubFlow.conflateWithSeed]] [[SubFlow.batch]] [[SubFlow.batchWeighted]] + * + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * + */ + def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): SubFlow[In, O2, Mat] = + new SubFlow(delegate.conflate(aggregate.apply)) /** * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index f970db98e9..80faf6b12e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -647,6 +647,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the * upstream publisher is faster. * + * This version of conflate allows to derive a seed from the first element and change the aggregated type to be + * different than the input type. See [[Flow.conflate]] for a simpler version that does not change types. + * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * @@ -658,14 +661,41 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels * - * see also [[SubSource.batch]] [[SubSource.batchWeighted]] + * see also [[SubSource.conflate]] [[SubSource.batch]] [[SubSource.batchWeighted]] * * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * */ - def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] = - new SubSource(delegate.conflate(seed.apply)(aggregate.apply)) + def conflateWithSeed[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] = + new SubSource(delegate.conflateWithSeed(seed.apply)(aggregate.apply)) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary + * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream publisher is faster. + * + * This version of conflate does not change the output type of the stream. See [[SubSource.conflateWithSeed]] for a + * more flexible version that can take a seed function and transform elements while rolling up. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is a conflated element available + * + * '''Backpressures when''' never + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * see also [[SubSource.conflateWithSeed]] [[SubSource.batch]] [[SubSource.batchWeighted]] + * + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * + */ + def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): SubSource[O2, Mat] = + new SubSource(delegate.conflate(aggregate.apply)) /** * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 88e8e0d9dd..151ccedfd3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -911,6 +911,9 @@ trait FlowOps[+Out, +Mat] { * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the * upstream publisher is faster. * + * This version of conflate allows to derive a seed from the first element and change the aggregated type to be + * different than the input type. See [[FlowOps.conflate]] for a simpler version that does not change types. + * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * @@ -925,12 +928,39 @@ trait FlowOps[+Out, +Mat] { * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * - * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]] + * See also [[FlowOps.conflate]], [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]] */ - def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate)) + def conflateWithSeed[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate)) //FIXME: conflate can be expressed as a batch //via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate)) + + + /** + * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary + * until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream publisher is faster. + * + * This version of conflate does not change the output type of the stream. See [[FlowOps.conflateWithSeed]] for a + * more flexible version that can take a seed function and transform elements while rolling up. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is a conflated element available + * + * '''Backpressures when''' never + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * + * See also [[FlowOps.conflate]], [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]] + */ + def conflate[O2 >: Out](aggregate: (O2, O2) => O2): Repr[O2] = conflateWithSeed[O2](ConstantFun.scalaIdentityFunction)(aggregate) + /** * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches * until the subscriber is ready to accept them. For example a batch step might store received elements in @@ -947,7 +977,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels * - * See also [[FlowOps.conflate]], [[FlowOps.batchWeighted]] + * See also [[FlowOps.conflateWithSeed]], [[FlowOps.batchWeighted]] * * @param max maximum number of elements to batch before backpressuring upstream (must be positive non-zero) * @param seed Provides the first state for a batched value using the first unconsumed element as a start @@ -977,7 +1007,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels * - * See also [[FlowOps.conflate]], [[FlowOps.batch]] + * See also [[FlowOps.conflateWithSeed]], [[FlowOps.batch]] * * @param max maximum weight of elements to batch before backpressuring upstream (must be positive non-zero) * @param costFn a function to compute a single element weight