diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeReduceByKeyTest.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeReduceByKeyTest.java
index 34acdb8b45..3770d4804b 100644
--- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeReduceByKeyTest.java
+++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeReduceByKeyTest.java
@@ -55,8 +55,10 @@ public class RecipeReduceByKeyTest extends RecipeTest {
     final Source<Pair<String, Integer>, NotUsed> counts = words
       // split the words into separate streams first
       .groupBy(MAXIMUM_DISTINCT_WORDS, i -> i)
+      // pair each word with an initial count of 1
+      .map(i -> new Pair<>(i, 1))
       // add counting logic to the streams
-      .fold(new Pair<>("", 0), (pair, elem) -> new Pair<>(elem, pair.second() + 1))
+      .reduce((left, right) -> new Pair<>(left.first(), left.second() + right.second()))
       // get a stream of word counts
       .mergeSubstreams();
     //#word-count
@@ -77,17 +79,13 @@ public class RecipeReduceByKeyTest extends RecipeTest {
   static public <In, K, Out> Flow<In, Pair<K, Out>, NotUsed> reduceByKey(
     int maximumGroupSize,
     Function<In, K> groupKey,
-    Function<K, Out> foldZero,
-    Function2<Out, In, Out> fold,
-    Materializer mat) {
+    Function<In, Out> map,
+    Function2<Out, Out, Out> reduce) {

     return Flow.<In> create()
-      .groupBy(maximumGroupSize, i -> i)
-      .fold((Pair<K, Out>) null, (pair, elem) -> {
-        final K key = groupKey.apply(elem);
-        if (pair == null) return new Pair<>(key, fold.apply(foldZero.apply(key), elem));
-        else return new Pair<>(key, fold.apply(pair.second(), elem));
-      })
+      .groupBy(maximumGroupSize, groupKey)
+      .map(i -> new Pair<>(groupKey.apply(i), map.apply(i)))
+      .reduce((left, right) -> new Pair<>(left.first(), reduce.apply(left.second(), right.second())))
       .mergeSubstreams();
   }
   //#reduce-by-key-general
@@ -104,9 +102,8 @@ public class RecipeReduceByKeyTest extends RecipeTest {
     Source<Pair<String, Integer>, NotUsed> counts = words.via(reduceByKey(
       MAXIMUM_DISTINCT_WORDS,
       word -> word,
-      key -> 0,
-      (count, elem) -> count + 1,
-      mat));
+      word -> 1,
+      (left, right) -> left + right));
     //#reduce-by-key-general2

     final Future<List<Pair<String, Integer>>> f = counts.grouped(10).runWith(Sink.head(), mat);
diff --git a/akka-docs/rst/java/stream/stream-cookbook.rst b/akka-docs/rst/java/stream/stream-cookbook.rst
index de0e26a603..18095f5575 100644
--- a/akka-docs/rst/java/stream/stream-cookbook.rst
+++ b/akka-docs/rst/java/stream/stream-cookbook.rst
@@ -113,7 +113,7 @@ we have a stream of streams, where every substream will serve identical words.
 To count the words, we need to process the stream of streams (the actual groups
 containing identical words). ``groupBy`` returns a :class:`SubSource`, which
 means that we transform the resulting substreams directly. In this case we use
-the ``fold`` combinator to aggregate the word itself and the number of its
+the ``reduce`` combinator to aggregate the word itself and the number of its
 occurrences within a :class:`Pair`. Each substream will then emit one final
 value—precisely such a pair—when the overall input completes.
 As a last step we merge back these values from the substreams into one single
@@ -133,8 +133,8 @@ stream cannot continue without violating its resource bound, in this case
 By extracting the parts specific to *wordcount* into

 * a ``groupKey`` function that defines the groups
-* a ``foldZero`` that defines the zero element used by the fold on the substream given the group key
-* a ``fold`` function that does the actual reduction
+* a ``map`` function that maps each element to the value used by the reduce on the substream
+* a ``reduce`` function that does the actual reduction

 we get a generalized version below:
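For readers who want to try the recipe outside of the documentation project, a minimal, self-contained Scala sketch of the same pipeline shape follows (the input data, system name and substream limit are made up for illustration, and it assumes a build of Akka Streams that includes the `reduce` operator added by this patch):

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Source

    object WordCountSketch extends App {
      implicit val system = ActorSystem("word-count-sketch")
      implicit val materializer = ActorMaterializer()
      import system.dispatcher

      val words = Source(List("hello", "world", "and", "hello", "akka"))

      val counts = words
        .groupBy(1024, identity)               // one substream per distinct word
        .map(_ -> 1)                           // pair each word with an initial count of 1
        .reduce((l, r) => (l._1, l._2 + r._2)) // sum the counts within each substream
        .mergeSubstreams                       // merge the per-word results back together

      // prints (hello,2), (world,1), (and,1), (akka,1) in no particular order
      counts.runForeach(println).onComplete(_ => system.terminate())
    }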
diff --git a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala
index 5e80a6124a..494303fb32 100644
--- a/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala
+++ b/akka-docs/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala
@@ -21,10 +21,10 @@ class RecipeReduceByKey extends RecipeSpec {
     val counts: Source[(String, Int), NotUsed] = words
       // split the words into separate streams first
       .groupBy(MaximumDistinctWords, identity)
+      // pair each word with an initial count of 1
+      .map(_ -> 1)
       // add counting logic to the streams
-      .fold(("", 0)) {
-        case ((_, count), word) => (word, count + 1)
-      }
+      .reduce((l, r) => (l._1, l._2 + r._2))
       // get a stream of word counts
       .mergeSubstreams
     //#word-count
@@ -46,26 +46,19 @@ class RecipeReduceByKey extends RecipeSpec {
     def reduceByKey[In, K, Out](
       maximumGroupSize: Int,
       groupKey: (In) => K,
-      foldZero: (K) => Out)(fold: (Out, In) => Out): Flow[In, (K, Out), NotUsed] = {
+      map: (In) => Out)(reduce: (Out, Out) => Out): Flow[In, (K, Out), NotUsed] = {

       Flow[In]
-        .groupBy(maximumGroupSize, groupKey)
-        .fold(Option.empty[(K, Out)]) {
-          case (None, elem) =>
-            val key = groupKey(elem)
-            Some((key, fold(foldZero(key), elem)))
-          case (Some((key, out)), elem) =>
-            Some((key, fold(out, elem)))
-        }
-        .map(_.get)
+        .groupBy[K](maximumGroupSize, groupKey)
+        .map(e => groupKey(e) -> map(e))
+        .reduce((l, r) => l._1 -> reduce(l._2, r._2))
         .mergeSubstreams
     }

-    val wordCounts = words.via(reduceByKey(
-      MaximumDistinctWords,
-      groupKey = (word: String) => word,
-      foldZero = (key: String) => 0)(fold = (count: Int, elem: String) => count + 1))
-
+    val wordCounts = words.via(
+      reduceByKey(MaximumDistinctWords,
+        groupKey = (word: String) => word,
+        map = (word: String) => 1)((left: Int, right: Int) => left + right))
     //#reduce-by-key-general

     Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
diff --git a/akka-docs/rst/scala/stream/stream-cookbook.rst b/akka-docs/rst/scala/stream/stream-cookbook.rst
index 4a4516eaaa..823f98f5ab 100644
--- a/akka-docs/rst/scala/stream/stream-cookbook.rst
+++ b/akka-docs/rst/scala/stream/stream-cookbook.rst
@@ -111,7 +111,7 @@ we have a stream of streams, where every substream will serve identical words.
 To count the words, we need to process the stream of streams (the actual groups
 containing identical words). ``groupBy`` returns a :class:`SubFlow`, which
 means that we transform the resulting substreams directly. In this case we use
-the ``fold`` combinator to aggregate the word itself and the number of its
+the ``reduce`` combinator to aggregate the word itself and the number of its
 occurrences within a tuple :class:`(String, Integer)`. Each substream will then
 emit one final value—precisely such a pair—when the overall input completes.
 As a last step we merge back these values from the substreams into one single
@@ -131,8 +131,8 @@ this case ``groupBy`` will terminate with a failure.
 By extracting the parts specific to *wordcount* into

 * a ``groupKey`` function that defines the groups
-* a ``foldZero`` that defines the zero element used by the fold on the substream given the group key
-* a ``fold`` function that does the actual reduction
+* a ``map`` function that maps each element to the value used by the reduce on the substream
+* a ``reduce`` function that does the actual reduction

 we get a generalized version below:
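The three extracted pieces are independent of the counting use case, so the same helper covers other aggregations. As an illustration only, assuming the `reduceByKey` flow and the `MaximumDistinctWords` constant from the recipe above are in scope (the input source here is made up), the longest word per initial letter could be computed as:

    import akka.stream.scaladsl.Source

    val animals = Source(List("ant", "antelope", "bee", "bear", "cat"))

    val longestPerInitial = animals.via(reduceByKey(
      MaximumDistinctWords,
      groupKey = (word: String) => word.head,
      map = (word: String) => word)((left: String, right: String) =>
        if (left.length >= right.length) left else right))

    // longestPerInitial emits ('a', "antelope"), ('b', "bear") and ('c', "cat"), in no particular order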
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala
new file mode 100644
index 0000000000..7624f645a2
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowReduceSpec.scala
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2015 Typesafe Inc.
+ */
+package akka.stream.scaladsl
+
+import scala.concurrent.Await
+import scala.util.control.NoStackTrace
+
+import akka.stream.ActorMaterializer
+import akka.stream.testkit.AkkaSpec
+import akka.stream.testkit.Utils._
+import scala.concurrent.duration._
+
+class FlowReduceSpec extends AkkaSpec {
+  implicit val materializer = ActorMaterializer()
+
+  "A Reduce" must {
+    val input = 1 to 100
+    val expected = input.sum
+    val inputSource = Source(input).filter(_ ⇒ true).map(identity)
+    val reduceSource = inputSource.reduce[Int](_ + _).filter(_ ⇒ true).map(identity)
+    val reduceFlow = Flow[Int].filter(_ ⇒ true).map(identity).reduce(_ + _).filter(_ ⇒ true).map(identity)
+    val reduceSink = Sink.reduce[Int](_ + _)
+
+    "work when using Source.runReduce" in assertAllStagesStopped {
+      Await.result(inputSource.runReduce(_ + _), 3.seconds) should be(expected)
+    }
+
+    "work when using Source.reduce" in assertAllStagesStopped {
+      Await.result(reduceSource runWith Sink.head, 3.seconds) should be(expected)
+    }
+
+    "work when using Sink.reduce" in assertAllStagesStopped {
+      Await.result(inputSource runWith reduceSink, 3.seconds) should be(expected)
+    }
+
+    "work when using Flow.reduce" in assertAllStagesStopped {
+      Await.result(inputSource via reduceFlow runWith Sink.head, 3.seconds) should be(expected)
+    }
+
+    "work when using Source.reduce + Flow.reduce + Sink.reduce" in assertAllStagesStopped {
+      Await.result(reduceSource via reduceFlow runWith reduceSink, 3.seconds) should be(expected)
+    }
+
+    "propagate an error" in assertAllStagesStopped {
+      val error = new Exception with NoStackTrace
+      val future = inputSource.map(x ⇒ if (x > 50) throw error else x).runReduce(Keep.none)
+      the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
+    }
+
+    "complete future with failure when reducing function throws" in assertAllStagesStopped {
+      val error = new Exception with NoStackTrace
+      val future = inputSource.runReduce[Int]((x, y) ⇒ if (x > 50) throw error else x + y)
+      the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
+    }
+
+  }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
index e33ee8481c..34469b4722 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala
@@ -45,6 +45,7 @@ private[stream] object Stages {
     val dropWhile = name("dropWhile")
     val scan = name("scan")
     val fold = name("fold")
+    val reduce = name("reduce")
     val intersperse = name("intersperse")
     val buffer = name("buffer")
     val conflate = name("conflate")
name("buffer") val conflate = name("conflate") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index c1074b6c45..e12d14c672 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1071,3 +1071,38 @@ private[stream] final class DropWithin[T](timeout: FiniteDuration) extends Simpl override def toString = "DropWithin" } + +/** + * INTERNAL API + */ +private[stream] final class Reduce[T](f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { + override def initialAttributes: Attributes = DefaultAttributes.reduce + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def toString = s"Reduce.Logic(aggregator=$aggregator)" + var aggregator: T = _ + + setHandler(in, new InHandler { + override def onPush(): Unit = { + aggregator = grab(in) + pull(in) + setHandler(in, rest) + } + }) + def rest = new InHandler { + override def onPush(): Unit = { + aggregator = f(aggregator, grab(in)) + pull(in) + } + override def onUpstreamFinish(): Unit = { + push(out, aggregator) + completeStage() + } + } + + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + }) + } + override def toString = "Reduce" +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 136511f26b..739094c194 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -555,6 +555,22 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.fold(zero)(f.apply)) + /** + * Similar to `fold` but uses first element as zero element. + * Applies the given function towards its current and next value, + * yielding the next current value. + * + * '''Emits when''' upstream completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.reduce(f.apply)) + /** * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] * injects a separator between a List's elements. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index fb1bc07461..b7854e0253 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -29,6 +29,16 @@ object Sink { def fold[U, In](zero: U, f: function.Function2[U, In, U]): javadsl.Sink[In, Future[U]] = new Sink(scaladsl.Sink.fold[U, In](zero)(f.apply)) + /** + * A `Sink` that will invoke the given function for every received element, giving it its previous + * output (from the second element) and the element as input. + * The returned [[scala.concurrent.Future]] will be completed with value of the final + * function evaluation when the input stream ends, or completed with `Failure` + * if there is a failure signaled in the stream. 
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 2aa378591e..580cfd59eb 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -457,6 +457,17 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
   def runFold[U](zero: U, f: function.Function2[U, Out, U], materializer: Materializer): Future[U] =
     runWith(Sink.fold(zero, f), materializer)

+  /**
+   * Shortcut for running this `Source` with a reduce function.
+   * The given function is invoked for every received element, giving it its previous
+   * output (from the second element) and the element as input.
+   * The returned [[scala.concurrent.Future]] will be completed with the value of the final
+   * function evaluation when the input stream ends, or completed with `Failure`
+   * if there is a failure signaled in the stream.
+   */
+  def runReduce[U >: Out](f: function.Function2[U, U, U], materializer: Materializer): Future[U] =
+    runWith(Sink.reduce(f), materializer)
+
   /**
    * Concatenate this [[Source]] with the given one, meaning that once current
    * is exhausted and all result elements have been generated,
@@ -1006,6 +1017,22 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
   def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
     new Source(delegate.fold(zero)(f.apply))

+  /**
+   * Similar to `fold` but uses the first element as the zero element.
+   * Applies the given function to the current and next value,
+   * yielding the next current value.
+   *
+   * '''Emits when''' upstream completes
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): javadsl.Source[Out, Mat] =
+    new Source(delegate.reduce(f.apply))
+
   /**
    * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
    * injects a separator between a List's elements.
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index 7198d01dc0..ca5d32d0ab 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -400,6 +400,22 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
   def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
     new SubFlow(delegate.fold(zero)(f.apply))

+  /**
+   * Similar to `fold` but uses the first element as the zero element.
+   * Applies the given function to the current and next value,
+   * yielding the next current value.
+   *
+   * '''Emits when''' upstream completes
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): SubFlow[In, Out, Mat] =
+    new SubFlow(delegate.reduce(f.apply))
+
   /**
    * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
    * injects a separator between a List's elements.
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index 08bec7646a..137904da40 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -396,6 +396,22 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
   def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
     new SubSource(delegate.fold(zero)(f.apply))

+  /**
+   * Similar to `fold` but uses the first element as the zero element.
+   * Applies the given function to the current and next value,
+   * yielding the next current value.
+   *
+   * '''Emits when''' upstream completes
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): SubSource[Out, Mat] =
+    new SubSource(delegate.reduce(f.apply))
+
   /**
    * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
    * injects a separator between a List's elements.
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index 339e9b94ba..ebd8326b2c 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -706,9 +706,28 @@ trait FlowOps[+Out, +Mat] {
    * '''Completes when''' upstream completes
    *
    * '''Cancels when''' downstream cancels
+   *
+   * See also [[FlowOps.scan]]
    */
   def fold[T](zero: T)(f: (T, Out) ⇒ T): Repr[T] = andThen(Fold(zero, f))

+  /**
+   * Similar to `fold` but uses the first element as the zero element.
+   * Applies the given function to the current and next value,
+   * yielding the next current value.
+   *
+   * '''Emits when''' upstream completes
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   *
+   * See also [[FlowOps.fold]]
+   */
+  def reduce[T >: Out](f: (T, T) ⇒ T): Repr[T] = via(new Reduce[T](f))
+
   /**
    * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
    * injects a separator between a List's elements.
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
index 2674e7d292..d9463aa84d 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
@@ -230,6 +230,16 @@ object Sink {
   def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
     Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")

+  /**
+   * A `Sink` that will invoke the given function for every received element, giving it its previous
+   * output (from the second element) and the element as input.
+   * The returned [[scala.concurrent.Future]] will be completed with the value of the final
+   * function evaluation when the input stream ends, or completed with `Failure`
+   * if there is a failure signaled in the stream.
+   */
+  def reduce[T](f: (T, T) ⇒ T): Sink[T, Future[T]] =
+    Flow[T].reduce(f).toMat(Sink.head)(Keep.right).named("reduceSink")
+
   /**
    * A `Sink` that when the flow is completed, either through a failure or normal
    * completion, apply the provided function with [[scala.util.Success]]
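Because the new operator's Scaladoc cross-references `fold`, a quick side-by-side in Scala may be useful (illustrative only, the sources are not materialized here): `fold` needs an explicit zero and therefore always has something to emit, while `reduce` seeds its aggregator from the first element.

    import akka.stream.scaladsl.Source

    val letters = Source(List("a", "bb", "ccc"))

    letters.fold(0)((acc, s) => acc + s.length) // emits 6; on an empty source it would emit the zero, 0
    letters.reduce(_ + _)                       // emits "abbccc"; an empty source has no first element to emit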
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
index 88e32bd8f6..19170ab203 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
@@ -101,6 +101,17 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
   def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: Materializer): Future[U] =
     runWith(Sink.fold(zero)(f))

+  /**
+   * Shortcut for running this `Source` with a reduce function.
+   * The given function is invoked for every received element, giving it its previous
+   * output (from the second element) and the element as input.
+   * The returned [[scala.concurrent.Future]] will be completed with the value of the final
+   * function evaluation when the input stream ends, or completed with `Failure`
+   * if there is a failure signaled in the stream.
+   */
+  def runReduce[U >: Out](f: (U, U) ⇒ U)(implicit materializer: Materializer): Future[U] =
+    runWith(Sink.reduce(f))
+
   /**
    * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
    * for each received element.
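As the definition above shows, `runReduce` is simply shorthand for `runWith(Sink.reduce(f))`; for example (assuming an implicit materializer is in scope; the numbers are arbitrary):

    import scala.concurrent.Future
    import akka.stream.scaladsl.{ Sink, Source }

    val biggest: Future[Int] = Source(List(3, 1, 4, 1, 5)).runReduce(_ max _)
    // equivalent to Source(List(3, 1, 4, 1, 5)).runWith(Sink.reduce[Int](_ max _)); both complete with 5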