+str #19020 reduce combinator

This commit is contained in:
Alexander Golubev 2016-01-15 22:51:26 -05:00
parent 55425e5ef3
commit a2ab7f29e1
15 changed files with 247 additions and 37 deletions

View file

@ -55,8 +55,10 @@ public class RecipeReduceByKeyTest extends RecipeTest {
final Source<Pair<String, Integer>, NotUsed> counts = words final Source<Pair<String, Integer>, NotUsed> counts = words
// split the words into separate streams first // split the words into separate streams first
.groupBy(MAXIMUM_DISTINCT_WORDS, i -> i) .groupBy(MAXIMUM_DISTINCT_WORDS, i -> i)
//transform each element to pair with number of words in it
.map(i -> new Pair<>(i, 1))
// add counting logic to the streams // add counting logic to the streams
.fold(new Pair<>("", 0), (pair, elem) -> new Pair<>(elem, pair.second() + 1)) .reduce((left, right) -> new Pair<>(left.first(), left.second() + right.second()))
// get a stream of word counts // get a stream of word counts
.mergeSubstreams(); .mergeSubstreams();
//#word-count //#word-count
@ -77,17 +79,13 @@ public class RecipeReduceByKeyTest extends RecipeTest {
static public <In, K, Out> Flow<In, Pair<K, Out>, NotUsed> reduceByKey( static public <In, K, Out> Flow<In, Pair<K, Out>, NotUsed> reduceByKey(
int maximumGroupSize, int maximumGroupSize,
Function<In, K> groupKey, Function<In, K> groupKey,
Function<K, Out> foldZero, Function<In, Out> map,
Function2<Out, In, Out> fold, Function2<Out, Out, Out> reduce) {
Materializer mat) {
return Flow.<In> create() return Flow.<In> create()
.groupBy(maximumGroupSize, i -> i) .groupBy(maximumGroupSize, groupKey)
.fold((Pair<K, Out>) null, (pair, elem) -> { .map(i -> new Pair<>(groupKey.apply(i), map.apply(i)))
final K key = groupKey.apply(elem); .reduce((left, right) -> new Pair<>(left.first(), reduce.apply(left.second(), right.second())))
if (pair == null) return new Pair<>(key, fold.apply(foldZero.apply(key), elem));
else return new Pair<>(key, fold.apply(pair.second(), elem));
})
.mergeSubstreams(); .mergeSubstreams();
} }
//#reduce-by-key-general //#reduce-by-key-general
@ -104,9 +102,8 @@ public class RecipeReduceByKeyTest extends RecipeTest {
Source<Pair<String, Integer>, NotUsed> counts = words.via(reduceByKey( Source<Pair<String, Integer>, NotUsed> counts = words.via(reduceByKey(
MAXIMUM_DISTINCT_WORDS, MAXIMUM_DISTINCT_WORDS,
word -> word, word -> word,
key -> 0, word -> 1,
(count, elem) -> count + 1, (left, right) -> left + right));
mat));
//#reduce-by-key-general2 //#reduce-by-key-general2
final Future<List<Pair<String, Integer>>> f = counts.grouped(10).runWith(Sink.head(), mat); final Future<List<Pair<String, Integer>>> f = counts.grouped(10).runWith(Sink.head(), mat);

View file

@ -113,7 +113,7 @@ we have a stream of streams, where every substream will serve identical words.
To count the words, we need to process the stream of streams (the actual groups To count the words, we need to process the stream of streams (the actual groups
containing identical words). ``groupBy`` returns a :class:`SubSource`, which containing identical words). ``groupBy`` returns a :class:`SubSource`, which
means that we transform the resulting substreams directly. In this case we use means that we transform the resulting substreams directly. In this case we use
the ``fold`` combinator to aggregate the word itself and the number of its the ``reduce`` combinator to aggregate the word itself and the number of its
occurrences within a :class:`Pair<String, Integer>`. Each substream will then occurrences within a :class:`Pair<String, Integer>`. Each substream will then
emit one final value—precisely such a pair—when the overall input completes. As emit one final value—precisely such a pair—when the overall input completes. As
a last step we merge back these values from the substreams into one single a last step we merge back these values from the substreams into one single
@ -133,8 +133,8 @@ stream cannot continue without violating its resource bound, in this case
By extracting the parts specific to *wordcount* into By extracting the parts specific to *wordcount* into
* a ``groupKey`` function that defines the groups * a ``groupKey`` function that defines the groups
* a ``foldZero`` that defines the zero element used by the fold on the substream given the group key * a ``map`` function that maps each element to the value used by the reduce on the substream
* a ``fold`` function that does the actual reduction * a ``reduce`` function that does the actual reduction
we get a generalized version below: we get a generalized version below:

View file

@ -21,10 +21,10 @@ class RecipeReduceByKey extends RecipeSpec {
val counts: Source[(String, Int), NotUsed] = words val counts: Source[(String, Int), NotUsed] = words
// split the words into separate streams first // split the words into separate streams first
.groupBy(MaximumDistinctWords, identity) .groupBy(MaximumDistinctWords, identity)
//transform each element to pair with number of words in it
.map(_ -> 1)
// add counting logic to the streams // add counting logic to the streams
.fold(("", 0)) { .reduce((l, r) => (l._1, l._2 + r._2))
case ((_, count), word) => (word, count + 1)
}
// get a stream of word counts // get a stream of word counts
.mergeSubstreams .mergeSubstreams
//#word-count //#word-count
@ -46,26 +46,19 @@ class RecipeReduceByKey extends RecipeSpec {
def reduceByKey[In, K, Out]( def reduceByKey[In, K, Out](
maximumGroupSize: Int, maximumGroupSize: Int,
groupKey: (In) => K, groupKey: (In) => K,
foldZero: (K) => Out)(fold: (Out, In) => Out): Flow[In, (K, Out), NotUsed] = { map: (In) => Out)(reduce: (Out, Out) => Out): Flow[In, (K, Out), NotUsed] = {
Flow[In] Flow[In]
.groupBy(maximumGroupSize, groupKey) .groupBy[K](maximumGroupSize, groupKey)
.fold(Option.empty[(K, Out)]) { .map(e => groupKey(e) -> map(e))
case (None, elem) => .reduce((l, r) => l._1 -> reduce(l._2, r._2))
val key = groupKey(elem)
Some((key, fold(foldZero(key), elem)))
case (Some((key, out)), elem) =>
Some((key, fold(out, elem)))
}
.map(_.get)
.mergeSubstreams .mergeSubstreams
} }
val wordCounts = words.via(reduceByKey( val wordCounts = words.via(
MaximumDistinctWords, reduceByKey(MaximumDistinctWords,
groupKey = (word: String) => word, groupKey = (word: String) => word,
foldZero = (key: String) => 0)(fold = (count: Int, elem: String) => count + 1)) map = (word: String) => 1)((left: Int, right: Int) => left + right))
//#reduce-by-key-general //#reduce-by-key-general
Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(

View file

@ -111,7 +111,7 @@ we have a stream of streams, where every substream will serve identical words.
To count the words, we need to process the stream of streams (the actual groups To count the words, we need to process the stream of streams (the actual groups
containing identical words). ``groupBy`` returns a :class:`SubFlow`, which containing identical words). ``groupBy`` returns a :class:`SubFlow`, which
means that we transform the resulting substreams directly. In this case we use means that we transform the resulting substreams directly. In this case we use
the ``fold`` combinator to aggregate the word itself and the number of its the ``reduce`` combinator to aggregate the word itself and the number of its
occurrences within a tuple :class:`(String, Integer)`. Each substream will then occurrences within a tuple :class:`(String, Integer)`. Each substream will then
emit one final value—precisely such a pair—when the overall input completes. As emit one final value—precisely such a pair—when the overall input completes. As
a last step we merge back these values from the substreams into one single a last step we merge back these values from the substreams into one single
@ -131,8 +131,8 @@ this case ``groupBy`` will terminate with a failure.
By extracting the parts specific to *wordcount* into By extracting the parts specific to *wordcount* into
* a ``groupKey`` function that defines the groups * a ``groupKey`` function that defines the groups
* a ``foldZero`` that defines the zero element used by the fold on the substream given the group key * a ``map`` function that maps each element to the value used by the reduce on the substream
* a ``fold`` function that does the actual reduction * a ``reduce`` function that does the actual reduction
we get a generalized version below: we get a generalized version below:

View file

@ -0,0 +1,59 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import scala.concurrent.Await
import scala.util.control.NoStackTrace
import akka.stream.ActorMaterializer
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.Utils._
import scala.concurrent.duration._
/**
 * Exercises the `reduce` combinator through each of its entry points
 * (`Source.runReduce`, `Source.reduce`, `Flow.reduce`, `Sink.reduce`) and
 * checks that failures from upstream or from the reducing function surface
 * in the materialized future.
 */
class FlowReduceSpec extends AkkaSpec {
  implicit val materializer = ActorMaterializer()

  "A Reduce" must {
    val input = 1 to 100
    val expected = input.sum
    // The no-op filter/map stages wrap the reduce stage with ordinary stages on both sides.
    val inputSource = Source(input).filter(_ => true).map(identity)
    val reduceSource = inputSource.reduce[Int](_ + _).filter(_ => true).map(identity)
    val reduceFlow = Flow[Int].filter(_ => true).map(identity).reduce(_ + _).filter(_ => true).map(identity)
    val reduceSink = Sink.reduce[Int](_ + _)

    "work when using Source.runReduce" in assertAllStagesStopped {
      Await.result(inputSource.runReduce(_ + _), 3.seconds) should be(expected)
    }

    "work when using Source.reduce" in assertAllStagesStopped {
      Await.result(reduceSource runWith Sink.head, 3.seconds) should be(expected)
    }

    "work when using Sink.reduce" in assertAllStagesStopped {
      Await.result(inputSource runWith reduceSink, 3.seconds) should be(expected)
    }

    "work when using Flow.reduce" in assertAllStagesStopped {
      Await.result(inputSource via reduceFlow runWith Sink.head, 3.seconds) should be(expected)
    }

    "work when using Source.reduce + Flow.reduce + Sink.reduce" in assertAllStagesStopped {
      // Each reduce after the first receives a single element, so the sum is unchanged.
      Await.result(reduceSource via reduceFlow runWith reduceSink, 3.seconds) should be(expected)
    }

    "propagate an error" in assertAllStagesStopped {
      val error = new Exception with NoStackTrace
      // The failure is raised by the upstream map stage, not by the reducing function.
      val future = inputSource.map(x => if (x > 50) throw error else x).runReduce(Keep.none)
      the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
    }

    "complete future with failure when reducing function throws" in assertAllStagesStopped {
      val error = new Exception with NoStackTrace
      // Here `x` is the running sum, which exceeds 50 after the first few elements.
      val future = inputSource.runReduce[Int]((x, y) => if (x > 50) throw error else x + y)
      the[Exception] thrownBy Await.result(future, 3.seconds) should be(error)
    }
  }
}

View file

@ -45,6 +45,7 @@ private[stream] object Stages {
val dropWhile = name("dropWhile") val dropWhile = name("dropWhile")
val scan = name("scan") val scan = name("scan")
val fold = name("fold") val fold = name("fold")
val reduce = name("reduce")
val intersperse = name("intersperse") val intersperse = name("intersperse")
val buffer = name("buffer") val buffer = name("buffer")
val conflate = name("conflate") val conflate = name("conflate")

View file

@ -1071,3 +1071,38 @@ private[stream] final class DropWithin[T](timeout: FiniteDuration) extends Simpl
override def toString = "DropWithin" override def toString = "DropWithin"
} }
/**
 * INTERNAL API
 *
 * Graph stage that combines all upstream elements into a single value using `f`.
 * The first element received seeds the aggregate; every later element is merged
 * into it with `f(aggregate, element)`. The aggregate is pushed downstream once,
 * when upstream finishes after at least one element has been received.
 *
 * @param f binary function combining the running aggregate (first argument)
 *          with the next upstream element (second argument)
 */
private[stream] final class Reduce[T](f: (T, T) => T) extends SimpleLinearGraphStage[T] {
  override def initialAttributes: Attributes = DefaultAttributes.reduce

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    override def toString = s"Reduce.Logic(aggregator=$aggregator)"

    // Running result; holds the default value for T until the first element arrives.
    var aggregator: T = _

    // Initial handler: the first element becomes the seed, then `rest` takes over.
    // NOTE(review): this handler does not override onUpstreamFinish, so an empty
    // upstream falls back to InHandler's default behaviour (presumably completing
    // the stage without emitting anything) - confirm against GraphStage docs.
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        aggregator = grab(in)
        pull(in)
        setHandler(in, rest)
      }
    })

    // Handler for every element after the first: fold it into the aggregate and
    // emit the final aggregate when upstream finishes.
    def rest = new InHandler {
      override def onPush(): Unit = {
        aggregator = f(aggregator, grab(in))
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        push(out, aggregator)
        completeStage()
      }
    }

    // Downstream demand only starts the upstream pull; later pulls are issued from onPush.
    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })
  }

  override def toString = "Reduce"
}

View file

@ -555,6 +555,22 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.fold(zero)(f.apply)) new Flow(delegate.fold(zero)(f.apply))
/**
* Similar to `fold` but uses first element as zero element.
* Applies the given function towards its current and next value,
* yielding the next current value.
*
* '''Emits when''' upstream completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* This is the Java API; it forwards to `reduce` on the wrapped scaladsl delegate.
*
* @param f function combining the current value with the next element
*/
def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.reduce(f.apply))
/** /**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements. * injects a separator between a List's elements.

View file

@ -29,6 +29,16 @@ object Sink {
def fold[U, In](zero: U, f: function.Function2[U, In, U]): javadsl.Sink[In, Future[U]] = def fold[U, In](zero: U, f: function.Function2[U, In, U]): javadsl.Sink[In, Future[U]] =
new Sink(scaladsl.Sink.fold[U, In](zero)(f.apply)) new Sink(scaladsl.Sink.fold[U, In](zero)(f.apply))
/**
* A `Sink` that will invoke the given function for every received element, giving it its previous
* output (from the second element) and the element as input.
* The returned [[scala.concurrent.Future]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is a failure signaled in the stream.
*
* This is the Java API; it wraps `scaladsl.Sink.reduce`.
*
* @param f function combining the previous output with the next element
*/
def reduce[In](f: function.Function2[In, In, In]): Sink[In, Future[In]] =
new Sink(scaladsl.Sink.reduce[In](f.apply))
/** /**
* Helper to create [[Sink]] from `Subscriber`. * Helper to create [[Sink]] from `Subscriber`.
*/ */

View file

@ -457,6 +457,17 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def runFold[U](zero: U, f: function.Function2[U, Out, U], materializer: Materializer): Future[U] = def runFold[U](zero: U, f: function.Function2[U, Out, U], materializer: Materializer): Future[U] =
runWith(Sink.fold(zero, f), materializer) runWith(Sink.fold(zero, f), materializer)
/**
* Shortcut for running this `Source` with a reduce function.
* The given function is invoked for every received element, giving it its previous
* output (from the second element) and the element as input.
* The returned [[scala.concurrent.Future]] will be completed with value of the final
* function evaluation when the input stream ends, or completed with `Failure`
* if there is a failure signaled in the stream.
*/
def runReduce[U >: Out](f: function.Function2[U, U, U], materializer: Materializer): Future[U] =
runWith(Sink.reduce(f), materializer)
/** /**
* Concatenate this [[Source]] with the given one, meaning that once current * Concatenate this [[Source]] with the given one, meaning that once current
* is exhausted and all result elements have been generated, * is exhausted and all result elements have been generated,
@ -1006,6 +1017,22 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.fold(zero)(f.apply)) new Source(delegate.fold(zero)(f.apply))
/**
* Similar to `fold` but uses first element as zero element.
* Applies the given function towards its current and next value,
* yielding the next current value.
*
* '''Emits when''' upstream completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* This is the Java API; it forwards to `reduce` on the wrapped scaladsl delegate.
*
* @param f function combining the current value with the next element
*/
def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): javadsl.Source[Out, Mat] =
new Source(delegate.reduce(f.apply))
/** /**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements. * injects a separator between a List's elements.

View file

@ -400,6 +400,22 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.fold(zero)(f.apply)) new SubFlow(delegate.fold(zero)(f.apply))
/**
* Similar to `fold` but uses first element as zero element.
* Applies the given function towards its current and next value,
* yielding the next current value.
*
* '''Emits when''' upstream completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* This is the Java API; it forwards to `reduce` on the wrapped scaladsl delegate.
*
* @param f function combining the current value with the next element
*/
def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): SubFlow[In, Out, Mat] =
new SubFlow(delegate.reduce(f.apply))
/** /**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements. * injects a separator between a List's elements.

View file

@ -396,6 +396,22 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] = def fold[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
new SubSource(delegate.fold(zero)(f.apply)) new SubSource(delegate.fold(zero)(f.apply))
/**
* Similar to `fold` but uses first element as zero element.
* Applies the given function towards its current and next value,
* yielding the next current value.
*
* '''Emits when''' upstream completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* This is the Java API; it forwards to `reduce` on the wrapped scaladsl delegate.
*
* @param f function combining the current value with the next element
*/
def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): SubSource[Out, Mat] =
new SubSource(delegate.reduce(f.apply))
/** /**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements. * injects a separator between a List's elements.

View file

@ -706,9 +706,28 @@ trait FlowOps[+Out, +Mat] {
* '''Completes when''' upstream completes * '''Completes when''' upstream completes
* *
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
*
* See also [[FlowOps.scan]]
*/ */
def fold[T](zero: T)(f: (T, Out) T): Repr[T] = andThen(Fold(zero, f)) def fold[T](zero: T)(f: (T, Out) T): Repr[T] = andThen(Fold(zero, f))
/**
 * Similar to `fold` but uses first element as zero element.
 * Applies the given function towards its current and next value,
 * yielding the next current value.
 *
 * '''Emits when''' upstream completes
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' downstream cancels
 *
 * See also [[FlowOps.fold]]
 *
 * @param f function combining the current value (first argument) with the
 *          next element (second argument)
 */
def reduce[T >: Out](f: (T, T) => T): Repr[T] = via(new Reduce[T](f))
/** /**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements. * injects a separator between a List's elements.

View file

@ -230,6 +230,16 @@ object Sink {
def fold[U, T](zero: U)(f: (U, T) U): Sink[T, Future[U]] = def fold[U, T](zero: U)(f: (U, T) U): Sink[T, Future[U]] =
Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink") Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
/**
 * A `Sink` that will invoke the given function for every received element, giving it its previous
 * output (from the second element) and the element as input.
 * The returned [[scala.concurrent.Future]] will be completed with value of the final
 * function evaluation when the input stream ends, or completed with `Failure`
 * if there is a failure signaled in the stream.
 *
 * Built as `Flow[T].reduce(f)` feeding `Sink.head`, keeping the head sink's future.
 *
 * @param f function combining the previous output with the next element
 */
def reduce[T](f: (T, T) => T): Sink[T, Future[T]] =
  Flow[T].reduce(f).toMat(Sink.head)(Keep.right).named("reduceSink")
/** /**
* A `Sink` that when the flow is completed, either through a failure or normal * A `Sink` that when the flow is completed, either through a failure or normal
* completion, apply the provided function with [[scala.util.Success]] * completion, apply the provided function with [[scala.util.Success]]

View file

@ -101,6 +101,17 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
def runFold[U](zero: U)(f: (U, Out) U)(implicit materializer: Materializer): Future[U] = def runFold[U](zero: U)(f: (U, Out) U)(implicit materializer: Materializer): Future[U] =
runWith(Sink.fold(zero)(f)) runWith(Sink.fold(zero)(f))
/**
 * Shortcut for running this `Source` with a reduce function.
 * The given function is invoked for every received element, giving it its previous
 * output (from the second element) and the element as input.
 * The returned [[scala.concurrent.Future]] will be completed with value of the final
 * function evaluation when the input stream ends, or completed with `Failure`
 * if there is a failure signaled in the stream.
 *
 * @param f function combining the previous output with the next element
 * @param materializer materializer used to run the stream
 */
def runReduce[U >: Out](f: (U, U) => U)(implicit materializer: Materializer): Future[U] =
  runWith(Sink.reduce(f))
/** /**
* Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
* for each received element. * for each received element.