diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateSpec.scala new file mode 100644 index 0000000000..01f8f7c4b9 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateSpec.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.stream.scaladsl + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom +import akka.stream.{ OverflowStrategy, ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit._ + +class FlowAggregateSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = ActorMaterializer(settings) + + "Aggregate" must { + + "pass-through elements unchanged when there is no rate difference" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + Source.fromPublisher(publisher).aggregate(max = 2, seed = i ⇒ i)(aggregate = _ + _).to(Sink.fromSubscriber(subscriber)).run() + val sub = subscriber.expectSubscription() + + for (i ← 1 to 100) { + sub.request(1) + publisher.sendNext(i) + subscriber.expectNext(i) + } + + sub.cancel() + } + + "aggregate elements while downstream is silent" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.manualProbe[List[Int]]() + + Source.fromPublisher(publisher).aggregate(max = Long.MaxValue, seed = i ⇒ List(i))(aggregate = (ints, i) ⇒ i :: ints).to(Sink.fromSubscriber(subscriber)).run() + val sub = subscriber.expectSubscription() + + for (i ← 1 to 10) { + publisher.sendNext(i) + } + subscriber.expectNoMsg(1.second) + sub.request(1) + subscriber.expectNext(List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)) + sub.cancel() + } + + "work on a variable rate chain" in { + val future = Source(1 to 1000) + .aggregate(max = 100, seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) + .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } + .runFold(0)(_ + _) + Await.result(future, 10.seconds) should be(500500) + } + + "backpressure subscriber when upstream is slower" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + Source.fromPublisher(publisher).aggregate(max = 2, seed = i ⇒ i)(aggregate = _ + _).to(Sink.fromSubscriber(subscriber)).run() + val sub = subscriber.expectSubscription() + + sub.request(1) + publisher.sendNext(1) + subscriber.expectNext(1) + + sub.request(1) + subscriber.expectNoMsg(500.millis) + publisher.sendNext(2) + subscriber.expectNext(2) + + publisher.sendNext(3) + publisher.sendNext(4) + // The request can be in race with the above onNext(4) so the result would be either 3 or 7. + subscriber.expectNoMsg(500.millis) + sub.request(1) + subscriber.expectNext(7) + + sub.request(1) + subscriber.expectNoMsg(500.millis) + sub.cancel() + + } + + "work with a buffer and fold" in { + val future = Source(1 to 50) + .aggregate(max = Long.MaxValue, seed = i ⇒ i)(aggregate = _ + _) + .buffer(50, OverflowStrategy.backpressure) + .runFold(0)(_ + _) + Await.result(future, 3.seconds) should be((1 to 50).sum) + } + + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateWeightedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateWeightedSpec.scala new file mode 100644 index 0000000000..6ea5e15387 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAggregateWeightedSpec.scala @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit._ +import scala.concurrent.duration._ + +class FlowAggregateWeightedSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + + implicit val materializer = ActorMaterializer(settings) + + "AggregateWeighted" must { + "Not aggregate heavy elements" in { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.manualProbe[Int]() + + Source.fromPublisher(publisher).aggregateWeighted(max = 3, _ ⇒ 4, seed = i ⇒ i)(aggregate = _ + _).to(Sink.fromSubscriber(subscriber)).run() + val sub = subscriber.expectSubscription() + + publisher.sendNext(1) + publisher.sendNext(2) + + sub.request(1) + subscriber.expectNext(1) + + publisher.sendNext(3) + subscriber.expectNoMsg(1.second) + + sub.request(2) + subscriber.expectNext(2) + subscriber.expectNext(3) + + sub.cancel() + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 94592b6f67..87626b82ae 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -48,6 +48,8 @@ private[stream] object Stages { val intersperse = name("intersperse") val buffer = name("buffer") val conflate = name("conflate") + val aggregate = name("aggregate") + val aggregateWeighted = name("aggregateWeighted") val expand = name("expand") val mapConcat = name("mapConcat") val detacher = name("detacher") @@ -204,6 +206,15 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[In, Out] = fusing.Conflate(seed, aggregate, supervision(attr)) } + final case class Aggregate[In, Out](max: Long, seed: In ⇒ Out, aggregateFn: (Out, In) ⇒ Out, attributes: Attributes = aggregate) extends SymbolicStage[In, Out] { + private[this] val inc: Any ⇒ Long = _ ⇒ 1L + override def create(attr: Attributes): Stage[In, Out] = fusing.AggregateWeighted(max, inc, seed, aggregateFn, supervision(attr)) + } + + final case class AggregateWeighted[In, Out](max: Long, weightFn: In ⇒ Long, seed: In ⇒ Out, aggregateFn: (Out, In) ⇒ Out, attributes: Attributes = aggregateWeighted) extends SymbolicStage[In, Out] { + override def create(attr: Attributes): Stage[In, Out] = fusing.AggregateWeighted(max, weightFn, seed, aggregateFn, supervision(attr)) + } + final case class MapConcat[In, Out](f: In ⇒ immutable.Iterable[Out], attributes: Attributes = mapConcat) extends SymbolicStage[In, Out] { override def create(attr: Attributes): Stage[In, Out] = fusing.MapConcat(f, supervision(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 7dd91da4ce..d314c5d1f8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -478,6 +478,86 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O override def restart(): Conflate[In, Out] = copy() } +/** + * INTERNAL API + */ +private[akka] final case class AggregateWeighted[In, Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out, + aggregate: (Out, In) ⇒ Out, + decider: Supervision.Decider) extends DetachedStage[In, Out] { + private var agg: Any = null + private var left: Long = max + private var pending: Any = null + + private[this] def flush(ctx: DetachedContext[Out]) = { + val result = agg.asInstanceOf[Out] + agg = null + left = max + if (pending != null) { + val elem = pending.asInstanceOf[In] + agg = seed(elem) + left -= costFn(elem) + pending = null + } + ctx.pushAndPull(result) + } + + override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = { + val cost = costFn(elem) + if (agg == null) { + left -= cost + agg = seed(elem) + } else if (left <= 0 || left - cost < 0) { + pending = elem + } else { + left -= cost + agg = aggregate(agg.asInstanceOf[Out], elem) + } + + if (!ctx.isHoldingDownstream && pending == null) ctx.pull() + else if (!ctx.isHoldingDownstream) ctx.holdUpstream() + else flush(ctx) + } + + override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = { + //if ctx.isFinishing, we still might emit up to 2 more elements (whatever agg is + possibly a pending heavy element) + if (ctx.isFinishing) { + //agg != null since we already checked it in onUpstreamFinish + val result = agg.asInstanceOf[Out] + if (pending == null) ctx.pushAndFinish(result) + else { + val elem = pending.asInstanceOf[In] + agg = seed(elem) + pending = null + ctx.push(result) + } + } else if (ctx.isHoldingBoth) flush(ctx) + else if (agg == null) ctx.holdDownstream() + else { + val result = agg.asInstanceOf[Out] + left = max + if (pending != null) { + val elem = pending.asInstanceOf[In] + agg = seed(elem) + left -= costFn(elem) + pending = null + } else { + agg = null + } + if (ctx.isHoldingUpstream) ctx.pushAndPull(result) + else ctx.push(result) + } + } + + override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = { + if (agg == null) ctx.finish() + else ctx.absorbTermination() + } + + override def decide(t: Throwable): Supervision.Directive = decider(t) + + override def restart(): AggregateWeighted[In, Out] = copy() +} + /** * INTERNAL API */ @@ -993,4 +1073,4 @@ private[stream] final class DropWithin[T](timeout: FiniteDuration) extends Simpl } override def toString = "DropWithin" -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index c76740775e..3adf6c01ce 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -810,6 +810,58 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = new Flow(delegate.conflate(seed.apply)(aggregate.apply)) + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might store received elements in + * an array up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregate[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = + new Flow(delegate.aggregate(max, seed.apply)(aggregate.apply)) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * elements up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. + * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the + * incoming elemetns is aggregated. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param costFn a function to compute a single element weight + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregateWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Flow[In, S, Mat] = + new Flow(delegate.aggregateWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) + /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older * element until new element comes from the upstream. For example an expand step might repeat the last element for diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c8b9ea5e97..eabbedfd34 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1239,6 +1239,58 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = new Source(delegate.conflate(seed.apply)(aggregate.apply)) + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might store received elements in + * an array up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregate[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = + new Source(delegate.aggregate(max, seed.apply)(aggregate.apply)) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * elements up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. + * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the + * incoming elemetns is aggregated. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param costFn a function to compute a single element weight + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregateWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): javadsl.Source[S, Mat] = + new Source(delegate.aggregateWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) + /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older * element until new element comes from the upstream. For example an expand step might repeat the last element for diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index ab45f4ba09..9da26d18ef 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -652,6 +652,58 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] = new SubFlow(delegate.conflate(seed.apply)(aggregate.apply)) + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might store received elements in + * an array up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregate[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] = + new SubFlow(delegate.aggregate(max, seed.apply)(aggregate.apply)) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * elements up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. + * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the + * incoming elemetns is aggregated. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param costFn a function to compute a single element weight + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregateWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubFlow[In, S, Mat] = + new SubFlow(delegate.aggregateWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) + /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older * element until new element comes from the upstream. For example an expand step might repeat the last element for diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 88eec41d19..14cd680ac7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -648,6 +648,58 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def conflate[S](seed: function.Function[Out, S], aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] = new SubSource(delegate.conflate(seed.apply)(aggregate.apply)) + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might store received elements in + * an array up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregate[S](max: Long, seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] = + new SubSource(delegate.aggregate(max, seed.apply)(aggregate.apply)) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * elements up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. + * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the + * incoming elemetns is aggregated. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param costFn a function to compute a single element weight + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregateWeighted[S](max: Long, costFn: function.Function[Out, Long], seed: function.Function[Out, S])(aggregate: function.Function2[S, Out, S]): SubSource[S, Mat] = + new SubSource(delegate.aggregateWeighted(max, costFn.apply, seed.apply)(aggregate.apply)) + /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older * element until new element comes from the upstream. For example an expand step might repeat the last element for diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1958652769..81f296b105 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -908,6 +908,58 @@ trait FlowOps[+Out, +Mat] { */ def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = andThen(Conflate(seed, aggregate)) + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might store received elements in + * an array up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregate[S](max: Long, seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = + andThen(Aggregate(max, seed, aggregate)) + + /** + * Allows a faster upstream to progress independently of a slower subscriber by aggregating elements into chunks + * until the subscriber is ready to accept them. For example an aggregate step might concatenate `ByteString` + * elements up to the allowed max limit if the upstream publisher is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * Aggregation will apply for all elements, even if a single element cost is greater than the total allowed limit. + * In this case, previous aggregated elements will be emitted, then the "heavy" element will be emitted (after + * being applied with the `seed` function) without aggregating further elements to it, and then the rest of the + * incoming elemetns is aggregated. + * + * '''Emits when''' downstream stops backpressuring and there is an aggregated element available + * + * '''Backpressures when''' there are `max` aggregated elements and 1 pending element and downstream backpressures + * + * '''Completes when''' upstream completes and there is no aggregated/pending element waiting + * + * '''Cancels when''' downstream cancels + * + * @param max maximum number of elements to aggregate before backpressuring upstream (must be positive non-zero) + * @param costFn a function to compute a single element weight + * @param seed Provides the first state for an aggregated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def aggregateWeighted[S](max: Long, costFn: Out ⇒ Long, seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = + andThen(AggregateWeighted(max, costFn, seed, aggregate)) + /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older * element until new element comes from the upstream. For example an expand step might repeat the last element for