19455 Simplify conflate signature for the common case

This commit is contained in:
Endre Sándor Varga 2016-01-22 15:22:30 +01:00
parent c36fdb111c
commit 3081e2895b
21 changed files with 281 additions and 55 deletions

View file

@ -85,6 +85,7 @@ downstreams and able to adapt their behavior to that signal.
Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ==================================================================================================================================== =====================================================================================
conflate downstream stops backpressuring and there is a conflated element available never [2]_ upstream completes
conflateWithSeed downstream stops backpressuring and there is a conflated element available never [2]_ upstream completes
batch downstream stops backpressuring and there is a batched element available batched elements reached the max limit of allowed batched elements & downstream backpressures upstream completes and a "possibly pending" element was drained [3]_
batchWeighted downstream stops backpressuring and there is a batched element available batched elements reached the max weight limit of allowed batched elements (plus a pending element [3]_ ) & downstream backpressures upstream completes and a "possibly pending" element was drained [3]_
expand downstream stops backpressuring downstream backpressures upstream completes

View file

@ -60,7 +60,7 @@ public class RateTransformationDocTest {
//#conflate-summarize
final Flow<Double, Tuple3<Double, Double, Integer>, NotUsed> statsFlow =
Flow.of(Double.class)
.conflate(elem -> Collections.singletonList(elem), (acc, elem) -> {
.conflateWithSeed(elem -> Collections.singletonList(elem), (acc, elem) -> {
return Stream
.concat(acc.stream(), Collections.singletonList(elem).stream())
.collect(Collectors.toList());
@ -86,7 +86,7 @@ public class RateTransformationDocTest {
//#conflate-sample
final Double p = 0.01;
final Flow<Double, Double, NotUsed> sampleFlow = Flow.of(Double.class)
.conflate(elem -> Collections.singletonList(elem), (acc, elem) -> {
.conflateWithSeed(elem -> Collections.singletonList(elem), (acc, elem) -> {
if (r.nextDouble() < p) {
return Stream
.concat(acc.stream(), Collections.singletonList(elem).stream())

View file

@ -81,7 +81,7 @@ public class StreamBuffersRateDocTest {
final Source<String, Cancellable> tickSource =
Source.tick(oneSecond.mul(3), oneSecond.mul(3), "tick");
final Flow<String, Integer, NotUsed> conflate =
Flow.of(String.class).conflate(
Flow.of(String.class).conflateWithSeed(
first -> 1, (count, elem) -> count + 1);
RunnableGraph.fromGraph(GraphDSL.create(b -> {

View file

@ -58,11 +58,11 @@ public class RecipeMissedTicks extends RecipeTest {
@SuppressWarnings("unused")
//#missed-ticks
final Flow<Tick, Integer, NotUsed> missedTicks =
Flow.of(Tick.class).conflate(tick -> 0, (missed, tick) -> missed + 1);
Flow.of(Tick.class).conflateWithSeed(tick -> 0, (missed, tick) -> missed + 1);
//#missed-ticks
final TestLatch latch = new TestLatch(3, system);
final Flow<Tick, Integer, NotUsed> realMissedTicks =
Flow.of(Tick.class).conflate(tick -> 0, (missed, tick) -> { latch.countDown(); return missed + 1; });
Flow.of(Tick.class).conflateWithSeed(tick -> 0, (missed, tick) -> { latch.countDown(); return missed + 1; });
Pair<TestPublisher.Probe<Tick>, TestSubscriber.Probe<Integer>> pubSub =
tickStream.via(realMissedTicks).toMat(sink, Keep.both()).run(mat);

View file

@ -46,11 +46,11 @@ public class RecipeSimpleDrop extends RecipeTest {
@SuppressWarnings("unused")
//#simple-drop
final Flow<Message, Message, NotUsed> droppyStream =
Flow.of(Message.class).conflate(i -> i, (lastMessage, newMessage) -> newMessage);
Flow.of(Message.class).conflate((lastMessage, newMessage) -> newMessage);
//#simple-drop
final TestLatch latch = new TestLatch(2, system);
final Flow<Message, Message, NotUsed> realDroppyStream =
Flow.of(Message.class).conflate(i -> i, (lastMessage, newMessage) -> { latch.countDown(); return newMessage; });
Flow.of(Message.class).conflate((lastMessage, newMessage) -> { latch.countDown(); return newMessage; });
final Pair<TestPublisher.Probe<Message>, TestSubscriber.Probe<Message>> pubSub = TestSource
.<Message> probe(system)

View file

@ -92,6 +92,18 @@ In Akka 2.4.x this is formulated like so:
.. includecode:: ../code/docs/stream/MigrationsJava.java#expand-state
``conflate`` has been renamed to ``conflateWithSeed()``
-------------------------------------------------------
The new ``conflate`` operator is a special case of the original behavior (renamed to ``conflateWithSeed``) that does not
change the type of the stream. The usage of the new operator is as simple as::
Flow.of(Integer.class).conflate((a, b) -> a + b) // Add numbers while downstream is not ready
Which is the same as using ``conflateWithSeed`` with an identity function::
Flow.of(Integer.class).conflateWithSeed(x -> x, (a, b) -> a + b) // Add numbers while downstream is not ready
Changed Sources / Sinks
=======================
@ -143,4 +155,4 @@ Routing settings parameter name
``RoutingSettings`` were previously the only setting available on ``RequestContext``,
and were accessible via ``settings``. We now made it possible to configure the parsers
settings as well, so ``RoutingSettings`` is now ``routingSettings`` and ``ParserSettings`` is
now accessible via ``parserSettings``.

View file

@ -218,18 +218,18 @@ Dropping elements
**Situation:** Given a fast producer and a slow consumer, we want to drop elements if necessary to not slow down
the producer too much.
This can be solved by using the most versatile rate-transforming operation, ``conflate``. Conflate can be thought as
a special ``fold`` operation that collapses multiple upstream elements into one aggregate element if needed to keep
This can be solved by using a versatile rate-transforming operation, ``conflate``. Conflate can be thought of as
a special ``reduce`` operation that collapses multiple upstream elements into one aggregate element if needed to keep
the speed of the upstream unaffected by the downstream.
When the upstream is faster, the fold process of the ``conflate`` starts. This folding needs a zero element, which
is given by a ``seed`` function that takes the current element and produces a zero for the folding process. In our
case this is ``i -> i`` so our folding state starts from the message itself. The folder function is also
special: given the aggregate value (the last message) and the new element (the freshest element) our aggregate state
becomes simply the freshest element. This choice of functions results in a simple dropping operation.
When the upstream is faster, the reducing process of the ``conflate`` starts. Our reducer function simply takes
the freshest element. This results in a simple dropping operation.
.. includecode:: ../code/docs/stream/javadsl/cookbook/RecipeSimpleDrop.java#simple-drop
There is a version of ``conflate`` named ``conflateWithSeed`` that allows expressing more complex aggregations, more
similar to a ``fold``.
Dropping broadcast
------------------
@ -253,7 +253,7 @@ Collecting missed ticks
**Situation:** Given a regular (stream) source of ticks, instead of trying to backpressure the producer of the ticks
we want to keep a counter of the missed ticks instead and pass it down when possible.
We will use ``conflate`` to solve the problem. Conflate takes two functions:
We will use ``conflateWithSeed`` to solve the problem. The seed version of conflate takes two functions:
* A seed function that produces the zero element for the folding process that happens when the upstream is faster than
the downstream. In our case the seed function is a constant function that returns 0 since there were no missed ticks

View file

@ -24,7 +24,7 @@ class RateTransformationDocSpec extends AkkaSpec {
"conflate should summarize" in {
//#conflate-summarize
val statsFlow = Flow[Double]
.conflate(Seq(_))(_ :+ _)
.conflateWithSeed(Seq(_))(_ :+ _)
.map { s =>
val μ = s.sum / s.size
val se = s.map(x => pow(x - μ, 2))
@ -45,7 +45,7 @@ class RateTransformationDocSpec extends AkkaSpec {
//#conflate-sample
val p = 0.01
val sampleFlow = Flow[Double]
.conflate(Seq(_)) {
.conflateWithSeed(Seq(_)) {
case (acc, elem) if Random.nextDouble < p => acc :+ elem
case (acc, _) => acc
}

View file

@ -48,7 +48,7 @@ class StreamBuffersRateSpec extends AkkaSpec {
Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0
Source.tick(initialDelay = 1.second, interval = 1.second, "message!")
.conflate(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1
.conflateWithSeed(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1
zipper.out ~> Sink.foreach(println)
ClosedShape

View file

@ -21,12 +21,12 @@ class RecipeMissedTicks extends RecipeSpec {
//#missed-ticks
val missedTicks: Flow[Tick, Int, NotUsed] =
Flow[Tick].conflate(seed = (_) => 0)(
Flow[Tick].conflateWithSeed(seed = (_) => 0)(
(missedTicks, tick) => missedTicks + 1)
//#missed-ticks
val latch = TestLatch(3)
val realMissedTicks: Flow[Tick, Int, NotUsed] =
Flow[Tick].conflate(seed = (_) => 0)(
Flow[Tick].conflateWithSeed(seed = (_) => 0)(
(missedTicks, tick) => { latch.countDown(); missedTicks + 1 })
tickStream.via(realMissedTicks).to(sink).run()

View file

@ -15,11 +15,11 @@ class RecipeSimpleDrop extends RecipeSpec {
//#simple-drop
val droppyStream: Flow[Message, Message, NotUsed] =
Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => newMessage)
Flow[Message].conflate((lastMessage, newMessage) => newMessage)
//#simple-drop
val latch = TestLatch(2)
val realDroppyStream =
Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => { latch.countDown(); newMessage })
Flow[Message].conflate((lastMessage, newMessage) => { latch.countDown(); newMessage })
val pub = TestPublisher.probe[Message]()
val sub = TestSubscriber.manualProbe[Message]()

View file

@ -79,6 +79,18 @@ In Akka 2.4.x this is formulated like so:
.. includecode:: ../code/docs/stream/MigrationsScala.scala#expand-state
``conflate`` has been renamed to ``conflateWithSeed()``
-------------------------------------------------------
The new ``conflate`` operator is a special case of the original behavior (renamed to ``conflateWithSeed``) that does not
change the type of the stream. The usage of the new operator is as simple as::
Flow[Int].conflate(_ + _) // Add numbers while downstream is not ready
Which is the same as using ``conflateWithSeed`` with an identity function::
Flow[Int].conflateWithSeed(identity)(_ + _) // Add numbers while downstream is not ready
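``conflateWithSeed`` is still needed when the aggregate has a different type than the elements, for example when collecting values into a sequence while the downstream is busy (a sketch along the lines of the rate-transformation recipe in this commit)::
Flow[Int].conflateWithSeed(Seq(_))(_ :+ _) // Collect numbers into a Seq while downstream is not ready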
Changes in Akka HTTP
====================

View file

@ -213,18 +213,18 @@ Dropping elements
**Situation:** Given a fast producer and a slow consumer, we want to drop elements if necessary to not slow down
the producer too much.
This can be solved by using the most versatile rate-transforming operation, ``conflate``. Conflate can be thought as
a special ``fold`` operation that collapses multiple upstream elements into one aggregate element if needed to keep
This can be solved by using a versatile rate-transforming operation, ``conflate``. Conflate can be thought of as
a special ``reduce`` operation that collapses multiple upstream elements into one aggregate element if needed to keep
the speed of the upstream unaffected by the downstream.
When the upstream is faster, the fold process of the ``conflate`` starts. This folding needs a zero element, which
is given by a ``seed`` function that takes the current element and produces a zero for the folding process. In our
case this is ``identity`` so our folding state starts from the message itself. The folder function is also
special: given the aggregate value (the last message) and the new element (the freshest element) our aggregate state
becomes simply the freshest element. This choice of functions results in a simple dropping operation.
When the upstream is faster, the reducing process of the ``conflate`` starts. Our reducer function simply takes
the freshest element. This results in a simple dropping operation.
.. includecode:: ../code/docs/stream/cookbook/RecipeSimpleDrop.scala#simple-drop
There is a more general version of ``conflate`` named ``conflateWithSeed`` that allows expressing more complex aggregations, more
similar to a ``fold``.
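For example, a variant of the drop recipe could keep every message that arrived while the downstream was busy instead of discarding it. A minimal sketch, reusing the recipe's ``Message`` type (not part of the recipe itself)::
val collectingStream: Flow[Message, Seq[Message], NotUsed] =
  Flow[Message].conflateWithSeed(Seq(_))(_ :+ _)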
Dropping broadcast
------------------
@ -246,7 +246,7 @@ Collecting missed ticks
**Situation:** Given a regular (stream) source of ticks, instead of trying to backpressure the producer of the ticks
we want to keep a counter of the missed ticks instead and pass it down when possible.
We will use ``conflate`` to solve the problem. Conflate takes two functions:
We will use ``conflateWithSeed`` to solve the problem. The seed version of conflate takes two functions:
* A seed function that produces the zero element for the folding process that happens when the upstream is faster than
the downstream. In our case the seed function is a constant function that returns 0 since there were no missed ticks

View file

@ -501,7 +501,7 @@ public class FlowTest extends StreamTest {
public void mustBeAbleToUseConflate() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
final Flow<String, String, NotUsed> flow = Flow.of(String.class).conflate(new Function<String, String>() {
final Flow<String, String, NotUsed> flow = Flow.of(String.class).conflateWithSeed(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
@ -515,6 +515,12 @@ public class FlowTest extends StreamTest {
CompletionStage<String> future = Source.from(input).via(flow).runFold("", (aggr, in) -> aggr + in, materializer);
String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals("ABC", result);
final Flow<String, String, NotUsed> flow2 = Flow.of(String.class).conflate((a, b) -> a + b);
CompletionStage<String> future2 = Source.from(input).via(flow2).runFold("", (a, b) -> a + b, materializer);
String result2 = future2.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals("ABC", result2);
}
@Test

View file

@ -292,7 +292,7 @@ public class SourceTest extends StreamTest {
Source.from(input)
.<String> map(in -> { throw new RuntimeException("simulated err"); })
.runWith(Sink.<String>head(), materializer)
.whenComplete((s, ex) -> {
if (ex == null) {
probe.getRef().tell("done", ActorRef.noSender());
} else {
@ -393,10 +393,17 @@ public class SourceTest extends StreamTest {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
CompletionStage<String> future = Source.from(input)
.conflate(s -> s, (aggr, in) -> aggr + in)
.conflateWithSeed(s -> s, (aggr, in) -> aggr + in)
.runFold("", (aggr, in) -> aggr + in, materializer);
String result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals("ABC", result);
CompletionStage<String> future2 = Source.from(input).conflate((String a, String b) -> a + b).runFold("", (a, b) -> a + b, materializer);
String result2 = future2.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals("ABC", result2);
}
@Test

View file

@ -22,7 +22,23 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
Source.fromPublisher(publisher).conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run()
Source.fromPublisher(publisher).conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run()
val sub = subscriber.expectSubscription()
for (i ← 1 to 100) {
sub.request(1)
publisher.sendNext(i)
subscriber.expectNext(i)
}
sub.cancel()
}
"pass-through elements unchanged when there is no rate difference (simple conflate)" in {
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
Source.fromPublisher(publisher).conflate(_ + _).to(Sink.fromSubscriber(subscriber)).run()
val sub = subscriber.expectSubscription()
for (i ← 1 to 100) {
@ -38,7 +54,23 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
Source.fromPublisher(publisher).conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run()
Source.fromPublisher(publisher).conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run()
val sub = subscriber.expectSubscription()
for (i ← 1 to 100) {
publisher.sendNext(i)
}
subscriber.expectNoMsg(1.second)
sub.request(1)
subscriber.expectNext(5050)
sub.cancel()
}
"conflate elements while downstream is silent (simple conflate)" in {
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
Source.fromPublisher(publisher).conflate(_ + _).to(Sink.fromSubscriber(subscriber)).run()
val sub = subscriber.expectSubscription()
for (i ← 1 to 100) {
@ -52,7 +84,15 @@ class FlowConflateSpec extends AkkaSpec {
"work on a variable rate chain" in {
val future = Source(1 to 1000)
.conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i)
.conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i)
.map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.runFold(0)(_ + _)
Await.result(future, 10.seconds) should be(500500)
}
"work on a variable rate chain (simple conflate)" in {
val future = Source(1 to 1000)
.conflate(_ + _)
.map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.runFold(0)(_ + _)
Await.result(future, 10.seconds) should be(500500)
@ -62,7 +102,7 @@ class FlowConflateSpec extends AkkaSpec {
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.manualProbe[Int]()
Source.fromPublisher(publisher).conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run()
Source.fromPublisher(publisher).conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i).to(Sink.fromSubscriber(subscriber)).run()
val sub = subscriber.expectSubscription()
sub.request(1)
@ -89,7 +129,7 @@ class FlowConflateSpec extends AkkaSpec {
"work with a buffer and fold" in {
val future = Source(1 to 50)
.conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i)
.conflateWithSeed(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i)
.buffer(50, OverflowStrategy.backpressure)
.runFold(0)(_ + _)
Await.result(future, 3.seconds) should be((1 to 50).sum)

View file

@ -809,6 +809,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate allows deriving a seed from the first element and changing the aggregated type to be
* different from the input type. See [[Flow.conflate]] for a simpler version that does not change types.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
@ -820,14 +823,41 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*
* '''Cancels when''' downstream cancels
*
* see also [[Flow.batch]] [[Flow.batchWeighted]]
* see also [[Flow.conflate]] [[Flow.batch]] [[Flow.batchWeighted]]
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] =
new Flow(delegate.conflate(seed.apply)(aggregate.apply))
def conflateWithSeed[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] =
new Flow(delegate.conflateWithSeed(seed.apply)(aggregate.apply))
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate does not change the output type of the stream. See [[Flow.conflateWithSeed]] for a
* more flexible version that can take a seed function and transform elements while rolling up.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* '''Emits when''' downstream stops backpressuring and there is a conflated element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* see also [[Flow.conflateWithSeed]] [[Flow.batch]] [[Flow.batchWeighted]]
*
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): javadsl.Flow[In, O2, Mat] =
new Flow(delegate.conflate(aggregate.apply))
/**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches

View file

@ -1266,6 +1266,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate allows deriving a seed from the first element and changing the aggregated type to be
* different from the input type. See [[Flow.conflate]] for a simpler version that does not change types.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
@ -1277,13 +1280,38 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
*
* '''Cancels when''' downstream cancels
*
* see also [[Source.batch]] [[Source.batchWeighted]]
* see also [[Source.conflate]] [[Source.batch]] [[Source.batchWeighted]]
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] =
new Source(delegate.conflate(seed.apply)(aggregate.apply))
def conflateWithSeed[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] =
new Source(delegate.conflateWithSeed(seed.apply)(aggregate.apply))
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate does not change the output type of the stream. See [[Source.conflateWithSeed]] for a
* more flexible version that can take a seed function and transform elements while rolling up.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* '''Emits when''' downstream stops backpressuring and there is a conflated element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* see also [[Source.conflateWithSeed]] [[Source.batch]] [[Source.batchWeighted]]
*
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): javadsl.Source[O2, Mat] =
new Source(delegate.conflate(aggregate.apply))
/**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches

View file

@ -651,6 +651,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate allows deriving a seed from the first element and changing the aggregated type to be
* different from the input type. See [[Flow.conflate]] for a simpler version that does not change types.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
@ -662,14 +665,41 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
*
* '''Cancels when''' downstream cancels
*
* see also [[SubFlow.batch]] [[SubFlow.batchWeighted]]
* see also [[SubFlow.conflate]] [[SubFlow.batch]] [[SubFlow.batchWeighted]]
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] =
new SubFlow(delegate.conflate(seed.apply)(aggregate.apply))
def conflateWithSeed[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] =
new SubFlow(delegate.conflateWithSeed(seed.apply)(aggregate.apply))
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate does not change the output type of the stream. See [[SubFlow.conflateWithSeed]] for a
* more flexible version that can take a seed function and transform elements while rolling up.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* '''Emits when''' downstream stops backpressuring and there is a conflated element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* see also [[SubFlow.conflateWithSeed]] [[SubFlow.batch]] [[SubFlow.batchWeighted]]
*
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): SubFlow[In, O2, Mat] =
new SubFlow(delegate.conflate(aggregate.apply))
/**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches

View file

@ -647,6 +647,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate allows deriving a seed from the first element and changing the aggregated type to be
* different from the input type. See [[Flow.conflate]] for a simpler version that does not change types.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
@ -658,14 +661,41 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*
* see also [[SubSource.batch]] [[SubSource.batchWeighted]]
* see also [[SubSource.conflate]] [[SubSource.batch]] [[SubSource.batchWeighted]]
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] =
new SubSource(delegate.conflate(seed.apply)(aggregate.apply))
def conflateWithSeed[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] =
new SubSource(delegate.conflateWithSeed(seed.apply)(aggregate.apply))
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate does not change the output type of the stream. See [[SubSource.conflateWithSeed]] for a
* more flexible version that can take a seed function and transform elements while rolling up.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* '''Emits when''' downstream stops backpressuring and there is a conflated element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* see also [[SubSource.conflateWithSeed]] [[SubSource.batch]] [[SubSource.batchWeighted]]
*
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): SubSource[O2, Mat] =
new SubSource(delegate.conflate(aggregate.apply))
/**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches

View file

@ -911,6 +911,9 @@ trait FlowOps[+Out, +Mat] {
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate allows deriving a seed from the first element and changing the aggregated type to be
* different from the input type. See [[FlowOps.conflate]] for a simpler version that does not change types.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
@ -925,12 +928,39 @@ trait FlowOps[+Out, +Mat] {
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]]
* See also [[FlowOps.conflate]], [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]]
*/
def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate))
def conflateWithSeed[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate))
//FIXME: conflate can be expressed as a batch
//via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate))
/**
* Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary
* until the subscriber is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream publisher is faster.
*
* This version of conflate does not change the output type of the stream. See [[FlowOps.conflateWithSeed]] for a
* more flexible version that can take a seed function and transform elements while rolling up.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* '''Emits when''' downstream stops backpressuring and there is a conflated element available
*
* '''Backpressures when''' never
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
* See also [[FlowOps.conflate]], [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]]
*/
def conflate[O2 >: Out](aggregate: (O2, O2) => O2): Repr[O2] = conflateWithSeed[O2](ConstantFun.scalaIdentityFunction)(aggregate)
/**
* Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches
* until the subscriber is ready to accept them. For example a batch step might store received elements in
@ -947,7 +977,7 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.conflate]], [[FlowOps.batchWeighted]]
* See also [[FlowOps.conflateWithSeed]], [[FlowOps.batchWeighted]]
*
* @param max maximum number of elements to batch before backpressuring upstream (must be positive non-zero)
* @param seed Provides the first state for a batched value using the first unconsumed element as a start
@ -977,7 +1007,7 @@ trait FlowOps[+Out, +Mat] {
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.conflate]], [[FlowOps.batch]]
* See also [[FlowOps.conflateWithSeed]], [[FlowOps.batch]]
*
* @param max maximum weight of elements to batch before backpressuring upstream (must be positive non-zero)
* @param costFn a function to compute a single element weight