diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBatchSpec.scala similarity index 79% rename from akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBatchSpec.scala index 01f8f7c4b9..d970ffb1ed 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBatchSpec.scala @@ -9,20 +9,20 @@ import scala.concurrent.forkjoin.ThreadLocalRandom import akka.stream.{ OverflowStrategy, ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit._ -class FlowAggregateSpec extends AkkaSpec { +class FlowBatchSpec extends AkkaSpec { val settings = ActorMaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 2) implicit val materializer = ActorMaterializer(settings) - "Aggregate" must { + "Batch" must { "pass-through elements unchanged when there is no rate difference" in { val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.manualProbe[Int]() - Source.fromPublisher(publisher).aggregate(max = 2, seed = i ⇒ i)(aggregate = _ + _).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).batch(max = 2, seed = i ⇒ i)(aggregate = _ + _).to(Sink.fromSubscriber(subscriber)).run() val sub = subscriber.expectSubscription() for (i ← 1 to 100) { @@ -38,7 +38,7 @@ class FlowAggregateSpec extends AkkaSpec { val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.manualProbe[List[Int]]() - Source.fromPublisher(publisher).aggregate(max = Long.MaxValue, seed = i ⇒ List(i))(aggregate = (ints, i) ⇒ i :: ints).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).batch(max = Long.MaxValue, seed = i ⇒ List(i))(aggregate = (ints, i) ⇒ i :: ints).to(Sink.fromSubscriber(subscriber)).run() val sub = subscriber.expectSubscription() for (i ← 1 to 10) { @@ -52,7 +52,7 @@ class FlowAggregateSpec extends AkkaSpec { "work on a variable rate chain" in { val future = Source(1 to 1000) - .aggregate(max = 100, seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) + .batch(max = 100, seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } .runFold(0)(_ + _) Await.result(future, 10.seconds) should be(500500) @@ -62,7 +62,7 @@ class FlowAggregateSpec extends AkkaSpec { val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.manualProbe[Int]() - Source.fromPublisher(publisher).aggregate(max = 2, seed = i ⇒ i)(aggregate = _ + _).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).batch(max = 2, seed = i ⇒ i)(aggregate = _ + _).to(Sink.fromSubscriber(subscriber)).run() val sub = subscriber.expectSubscription() sub.request(1) @@ -89,7 +89,7 @@ class FlowAggregateSpec extends AkkaSpec { "work with a buffer and fold" in { val future = Source(1 to 50) - .aggregate(max = Long.MaxValue, seed = i ⇒ i)(aggregate = _ + _) + .batch(max = Long.MaxValue, seed = i ⇒ i)(aggregate = _ + _) .buffer(50, OverflowStrategy.backpressure) .runFold(0)(_ + _) Await.result(future, 3.seconds) should be((1 to 50).sum) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateWeightedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBatchWeightedSpec.scala similarity index 79% rename from 
akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateWeightedSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBatchWeightedSpec.scala index 6ea5e15387..05e3f8ff4a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateWeightedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBatchWeightedSpec.scala @@ -7,19 +7,19 @@ import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit._ import scala.concurrent.duration._ -class FlowAggregateWeightedSpec extends AkkaSpec { +class FlowBatchWeightedSpec extends AkkaSpec { val settings = ActorMaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 2) implicit val materializer = ActorMaterializer(settings) - "AggregateWeighted" must { + "BatchWeighted" must { "Not aggregate heavy elements" in { val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.manualProbe[Int]() - Source.fromPublisher(publisher).aggregateWeighted(max = 3, _ ⇒ 4, seed = i ⇒ i)(aggregate = _ + _).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).batchWeighted(max = 3, _ ⇒ 4, seed = i ⇒ i)(aggregate = _ + _).to(Sink.fromSubscriber(subscriber)).run() val sub = subscriber.expectSubscription() publisher.sendNext(1) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 87626b82ae..94592b6f67 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -48,8 +48,6 @@ private[stream] object Stages { val intersperse = name("intersperse") val buffer = name("buffer") val conflate = name("conflate") - val aggregate = name("aggregate") - val aggregateWeighted = name("aggregateWeighted") val expand = name("expand") val mapConcat = name("mapConcat") val detacher = name("detacher") @@ -206,15 +204,6 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[In, Out] = fusing.Conflate(seed, aggregate, supervision(attr)) } - final case class Aggregate[In, Out](max: Long, seed: In ⇒ Out, aggregateFn: (Out, In) ⇒ Out, attributes: Attributes = aggregate) extends SymbolicStage[In, Out] { - private[this] val inc: Any ⇒ Long = _ ⇒ 1L - override def create(attr: Attributes): Stage[In, Out] = fusing.AggregateWeighted(max, inc, seed, aggregateFn, supervision(attr)) - } - - final case class AggregateWeighted[In, Out](max: Long, weightFn: In ⇒ Long, seed: In ⇒ Out, aggregateFn: (Out, In) ⇒ Out, attributes: Attributes = aggregateWeighted) extends SymbolicStage[In, Out] { - override def create(attr: Attributes): Stage[In, Out] = fusing.AggregateWeighted(max, weightFn, seed, aggregateFn, supervision(attr)) - } - final case class MapConcat[In, Out](f: In ⇒ immutable.Iterable[Out], attributes: Attributes = mapConcat) extends SymbolicStage[In, Out] { override def create(attr: Attributes): Stage[In, Out] = fusing.MapConcat(f, supervision(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d314c5d1f8..3817a5abe8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -478,6 +478,97 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O override def restart(): Conflate[In, Out] = copy() } +private[akka] sealed abstract class AbstractBatch[In, 
Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out, + aggregate: (Out, In) ⇒ Out, val in: Inlet[In], + val out: Outlet[Out]) extends GraphStage[FlowShape[In, Out]] { + + override val shape: FlowShape[In, Out] = FlowShape.of(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + private var agg: Any = null + private var left: Long = max + private var pending: Any = null + + private def flush(): Unit = { + push(out, agg.asInstanceOf[Out]) + left = max + if (pending != null) { + val elem = pending.asInstanceOf[In] + agg = seed(elem) + left -= costFn(elem) + pending = null + } else { + agg = null + } + } + + override def preStart() = pull(in) + + setHandler(in, new InHandler { + + override def onPush(): Unit = { + val elem = grab(in) + val cost = costFn(elem) + if (agg == null) { + left -= cost + agg = seed(elem) + } else if (left < cost) { + pending = elem + } else { + left -= cost + agg = aggregate(agg.asInstanceOf[Out], elem) + } + + if (isAvailable(out)) flush() + if (pending == null) pull(in) + } + + override def onUpstreamFinish(): Unit = { + if (agg == null) completeStage() + } + }) + + setHandler(out, new OutHandler { + + override def onPull(): Unit = { + //if upstream finished, we still might emit up to 2 more elements (whatever agg is + possibly a pending heavy element) + if (isClosed(in)) { + if (agg == null) completeStage() + else { + push(out, agg.asInstanceOf[Out]) + if (pending == null) completeStage() + else { + agg = seed(pending.asInstanceOf[In]) + pending = null + } + } + } else if (agg == null) { + if (!hasBeenPulled(in)) + pull(in) + } else { + flush() + if (!hasBeenPulled(in)) pull(in) + } + } + }) + } +} + +private[akka] final case class BatchWeighted[I, O](max: Long, costFn: I ⇒ Long, seed: I ⇒ O, aggregate: (O, I) ⇒ O) + extends AbstractBatch(max, costFn, seed, aggregate, Inlet[I]("BatchWeighted.in"), Outlet[O]("BatchWeighted.out")) { + override def initialAttributes = Attributes.name("BatchWeighted") +} + +private[akka] final case class Batch[I, O](max: Long, seed: I ⇒ O, aggregate: (O, I) ⇒ O) + extends AbstractBatch(max, { _: I ⇒ 1L }, seed, aggregate, Inlet[I]("Batch.in"), Outlet[O]("Batch.out")) { + override def initialAttributes = Attributes.name("Batch") +} + +//private[akka] final case class Conflate[I, O](seed: I ⇒ O, aggregate: (O, I) ⇒ O) +// extends AbstractBatch(Long.MaxValue, { _: I ⇒ 0L }, seed, aggregate, Inlet[I]("Conflate.in"), Outlet[O]("Conflate.out")) { +// override def initialAttributes = Attributes.name("Conflate") +//} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 3adf6c01ce..0f5deb8e14 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -803,6 +803,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels * + * see also [[Flow.batch]] [[Flow.batchWeighted]] + * * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * @@ -811,8 +813,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.conflate(seed.apply)(aggregate.apply)) /** - * Allows a faster upstream to progress independently of a slower 
subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. For example an aggregate step might store received elements in + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might store received elements in * an array up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not @@ -820,47 +822,51 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Emits when''' downstream stops backpressuring and there is an aggregated element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` batched elements and 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * See also [[Flow.conflate]], [[Flow.batchWeighted]] + * + * @param max maximum number of elements to batch before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate */ - def aggregate[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = - new Flow(delegate.aggregate(max, seed.apply)(aggregate.apply)) + def batch[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = + new Flow(delegate.batch(max, seed.apply)(aggregate.apply)) /** - * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might concatenate `ByteString` * elements up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * - * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. - * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after - * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the - * incoming elemetns is aggregated. + * Batching will apply for all elements, even if a single element cost is greater than the total allowed limit. 
+ * In this case, previous batched elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without batching further elements with it, and then the rest of the + * incoming elements are batched. * - * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * '''Emits when''' downstream stops backpressuring and there is a batched element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` weighted batched elements + 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * See also [[Flow.conflate]], [[Flow.batch]] + * + * @param max maximum weight of elements to batch before backpressuring upstream (must be positive non-zero) * @param costFn a function to compute a single element weight - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new batch */ - def aggregateWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = - new Flow(delegate.aggregateWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) + def batchWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = + new Flow(delegate.batchWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index eabbedfd34..e0ada6f680 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1233,6 +1233,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels * + * see also [[Source.batch]] [[Source.batchWeighted]] + * * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ @@ -1240,8 +1242,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.conflate(seed.apply)(aggregate.apply)) /** - * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. 
For example an aggregate step might store received elements in + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might store received elements in * an array up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not @@ -1249,47 +1251,51 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Emits when''' downstream stops backpressuring and there is an aggregated element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` batched elements and 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * See also [[Source.conflate]], [[Source.batchWeighted]] + * + * @param max maximum number of elements to batch before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate */ - def aggregate[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = - new Source(delegate.aggregate(max, seed.apply)(aggregate.apply)) + def batch[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = + new Source(delegate.batch(max, seed.apply)(aggregate.apply)) /** - * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might concatenate `ByteString` * elements up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * - * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. - * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after - * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the - * incoming elemetns is aggregated. + * Batching will apply for all elements, even if a single element cost is greater than the total allowed limit. 
+ * In this case, previous batched elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without batching further elements with it, and then the rest of the + * incoming elements are batched. * - * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * '''Emits when''' downstream stops backpressuring and there is a batched element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` weighted batched elements + 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * See also [[Source.conflate]], [[Source.batch]] + * + * @param max maximum weight of elements to batch before backpressuring upstream (must be positive non-zero) * @param costFn a function to compute a single element weight - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new batch */ - def aggregateWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = - new Source(delegate.aggregateWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) + def batchWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = + new Source(delegate.batchWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 9da26d18ef..c58410c6a8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -645,6 +645,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels * + * see also [[SubFlow.batch]] [[SubFlow.batchWeighted]] + * * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * @@ -653,8 +655,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo new SubFlow(delegate.conflate(seed.apply)(aggregate.apply)) /** - * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. 
For example an aggregate step might store received elements in + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might store received elements in * an array up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not @@ -662,47 +664,51 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Emits when''' downstream stops backpressuring and there is an aggregated element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` batched elements and 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * See also [[SubFlow.conflate]], [[SubFlow.batchWeighted]] + * + * @param max maximum number of elements to batch before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate */ - def aggregate[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] = - new SubFlow(delegate.aggregate(max, seed.apply)(aggregate.apply)) + def batch[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] = + new SubFlow(delegate.batch(max, seed.apply)(aggregate.apply)) /** - * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might concatenate `ByteString` * elements up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * - * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. - * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after - * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the - * incoming elemetns is aggregated. + * Batching will apply for all elements, even if a single element cost is greater than the total allowed limit. 
+ * In this case, previous batched elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without batching further elements with it, and then the rest of the + * incoming elements are batched. * - * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * '''Emits when''' downstream stops backpressuring and there is a batched element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` weighted batched elements + 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * See also [[SubFlow.conflate]], [[SubFlow.batch]] + * + * @param max maximum weight of elements to batch before backpressuring upstream (must be positive non-zero) * @param costFn a function to compute a single element weight - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new batch */ - def aggregateWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] = - new SubFlow(delegate.aggregateWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) + def batchWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] = + new SubFlow(delegate.batchWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 14cd680ac7..970408133c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -641,6 +641,8 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels * + * see also [[SubSource.batch]] [[SubSource.batchWeighted]] + * * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * @@ -649,8 +651,8 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source new SubSource(delegate.conflate(seed.apply)(aggregate.apply)) /** - * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. 
For example an aggregate step might store received elements in + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might store received elements in * an array up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not @@ -658,47 +660,51 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Emits when''' downstream stops backpressuring and there is an aggregated element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` batched elements and 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * See also [[SubSource.conflate]], [[SubSource.batchWeighted]] + * + * @param max maximum number of elements to batch before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate */ - def aggregate[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] = - new SubSource(delegate.aggregate(max, seed.apply)(aggregate.apply)) + def batch[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] = + new SubSource(delegate.batch(max, seed.apply)(aggregate.apply)) /** - * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might concatenate `ByteString` * elements up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * - * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. - * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after - * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the - * incoming elemetns is aggregated. + * Batching will apply for all elements, even if a single element cost is greater than the total allowed limit. 
+ * In this case, previous batched elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without batching further elements with it, and then the rest of the + * incoming elements are batched. * - * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * '''Emits when''' downstream stops backpressuring and there is a batched element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` weighted batched elements + 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * See also [[SubSource.conflate]], [[SubSource.batch]] + * + * @param max maximum weight of elements to batch before backpressuring upstream (must be positive non-zero) * @param costFn a function to compute a single element weight - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new batch */ - def aggregateWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] = - new SubSource(delegate.aggregateWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) + def batchWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] = + new SubSource(delegate.batchWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 81f296b105..9c8e93ece1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -904,13 +904,13 @@ trait FlowOps[+Out, +Mat] { * @param seed Provides the first state for a conflated value using the first unconsumed element as a start * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * - * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] + * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] [[FlowOps.batch]] [[FlowOps.batchWeighted]] */ def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate)) /** - * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. 
For example an aggregate step might store received elements in + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might store received elements in * an array up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not @@ -918,47 +918,51 @@ trait FlowOps[+Out, +Mat] { * * '''Emits when''' downstream stops backpressuring and there is an aggregated element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` batched elements and 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * See also [[FlowOps.conflate]], [[FlowOps.batchWeighted]] + * + * @param max maximum number of elements to batch before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new aggregate */ - def aggregate[S](max: Long, seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = - andThen(Aggregate(max, seed, aggregate)) + def batch[S](max: Long, seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = + via(Batch(max, seed, aggregate)) /** - * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks - * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into batches + * until the subscriber is ready to accept them. For example a batch step might concatenate `ByteString` * elements up to the allowed max limit if the upstream publisher is faster. * * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not * duplicate elements. * - * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. - * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after - * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the - * incoming elemetns is aggregated. + * Batching will apply for all elements, even if a single element cost is greater than the total allowed limit. + * In this case, previous batched elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without batching further elements with it, and then the rest of the + * incoming elements are batched. 
* - * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * '''Emits when''' downstream stops backpressuring and there is a batched element available * - * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * '''Backpressures when''' there are `max` weighted batched elements + 1 pending element and downstream backpressures * - * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * '''Completes when''' upstream completes and there is no batched/pending element waiting * * '''Cancels when''' downstream cancels * - * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * See also [[FlowOps.conflate]], [[FlowOps.batch]] + * + * @param max maximum weight of elements to batch before backpressuring upstream (must be positive non-zero) * @param costFn a function to compute a single element weight - * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start - * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + * @param seed Provides the first state for a batched value using the first unconsumed element as a start + * @param aggregate Takes the currently batched value and the current pending element to produce a new batch */ - def aggregateWeighted[S](max: Long, costFn: Out ⇒ Long, seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = - andThen(AggregateWeighted(max, costFn, seed, aggregate)) + def batchWeighted[S](max: Long, costFn: Out ⇒ Long, seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = + via(BatchWeighted(max, costFn, seed, aggregate)) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older
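
For reference, a minimal usage sketch of the operators renamed in this change (scaladsl side), mirroring the FlowBatchSpec tests above and the `batchWeighted` Scaladoc. The object name, the slow `map` stages, the 1024-byte limit and the ByteString payloads are illustrative assumptions, not code from this change:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

object BatchSketch extends App {
  implicit val system = ActorSystem("batch-sketch")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // batch: count-based. While the deliberately slow downstream backpressures,
  // up to 100 upstream integers are summed into a single emitted element.
  Source(1 to 1000)
    .batch(max = 100, seed = i ⇒ i)(aggregate = _ + _)
    .map { partialSum ⇒ Thread.sleep(10); partialSum } // slow consumer
    .runFold(0)(_ + _)
    .foreach(total ⇒ println(s"total = $total")) // 500500, same as without batching

  // batchWeighted: weight-based. ByteString chunks are concatenated while the
  // accumulated size stays within roughly 1024 bytes, then the batch is emitted.
  Source(List(ByteString("a" * 100), ByteString("b" * 700), ByteString("c" * 400)))
    .batchWeighted(max = 1024, costFn = bytes ⇒ bytes.length.toLong, seed = bytes ⇒ bytes)(_ ++ _)
    .map { chunk ⇒ Thread.sleep(10); chunk } // slow consumer
    .runForeach(chunk ⇒ println(s"emitted ${chunk.length} bytes"))
}

As FlowBatchSpec above also exercises, `batch(max = Long.MaxValue, seed)(aggregate)` behaves like `conflate(seed)(aggregate)` for practical purposes, since the count limit is never reached.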
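
The "heavy element" paragraph in the `batchWeighted` Scaladoc above has a deterministic consequence, which FlowBatchWeightedSpec checks with max = 3 and a constant cost of 4: an element whose cost exceeds `max` is never combined with its neighbours. A small sketch of that behaviour follows; the object name, the sleep and the concrete numbers are illustrative assumptions:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

object HeavyElementSketch extends App {
  implicit val system = ActorSystem("heavy-element-sketch")
  implicit val materializer = ActorMaterializer()

  // Every element weighs 4 while max is 3, so no two elements are ever
  // batched together: each one is seeded into its own single-element List,
  // regardless of how slowly the downstream consumes.
  Source(1 to 5)
    .batchWeighted(max = 3, costFn = _ ⇒ 4L, seed = i ⇒ List(i))(aggregate = (acc, i) ⇒ i :: acc)
    .map { xs ⇒ Thread.sleep(50); xs } // deliberately slow downstream
    .runForeach(println) // List(1), List(2), List(3), List(4), List(5)
}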