package docs.stream.cookbook

import akka.stream.OverflowStrategy
import akka.stream.scaladsl._

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

/**
 * Documentation recipe: counting occurrences per key ("reduce by key") on a
 * stream. Shows the concrete word-count case first, then a generalized,
 * reusable `reduceByKey` Flow. The `//#...` marker comments delimit snippets
 * extracted into the published documentation — do not remove them.
 *
 * NOTE(review): this uses the pre-1.0 Akka Streams API where `groupBy`
 * emits `(key, Source)` pairs and materialized values are `Unit` — confirm
 * against the Akka version this docs module builds with.
 */
class RecipeReduceByKey extends RecipeSpec {

  "Reduce by key recipe" must {

    // Upper bound on concurrently open per-key substreams; exceeding it
    // fails the stream via OverflowStrategy.fail in the buffers below.
    val MaximumDistinctWords = 1000

    "work with simple word count" in {

      // Six distinct words, with "hello" twice and "rocks!" a thousand times.
      def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!"))

      //#word-count
      // split the words into separate streams first
      val wordStreams: Source[(String, Source[String, Unit]), Unit] = words.groupBy(identity)

      // add counting logic to the streams
      val countedWords: Source[Future[(String, Int)], Unit] = wordStreams.map {
        case (word, wordStream) =>
          // Each substream folds into a (word, occurrenceCount) pair.
          wordStream.runFold((word, 0)) {
            case ((w, count), _) => (w, count + 1)
          }
      }

      // get a stream of word counts
      val counts: Source[(String, Int), Unit] =
        countedWords
          .buffer(MaximumDistinctWords, OverflowStrategy.fail)
          // mapAsync(identity) flattens the Future results back into elements
          .mapAsync(4)(identity)
      //#word-count

      Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
        ("hello", 2),
        ("world", 1),
        ("and", 1),
        ("universe", 1),
        ("akka", 1),
        ("rocks!", 1000)))
    }

    "work generalized" in {

      // Same input as the simple case above.
      def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!"))

      //#reduce-by-key-general
      /**
       * Generalized reduce-by-key as a reusable Flow.
       *
       * @param maximumGroupSize upper bound on open substreams; the stream
       *                         fails if more distinct keys appear
       * @param groupKey         extracts the grouping key from an element
       * @param foldZero         initial accumulator for a given key
       * @param fold             folds one element into the accumulator
       */
      def reduceByKey[In, K, Out](
        maximumGroupSize: Int,
        groupKey: (In) => K,
        foldZero: (K) => Out)(fold: (Out, In) => Out): Flow[In, (K, Out), Unit] = {

        // One substream per distinct key.
        val groupStreams = Flow[In].groupBy(groupKey)

        // Fold each substream into a single (key, aggregate) result.
        val reducedValues = groupStreams.map {
          case (key, groupStream) =>
            groupStream.runFold((key, foldZero(key))) {
              case ((key, aggregated), elem) => (key, fold(aggregated, elem))
            }
        }

        // Buffer bounds the in-flight groups; mapAsync flattens the Futures.
        reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(4)(identity)
      }

      val wordCounts = words.via(reduceByKey(
        MaximumDistinctWords,
        groupKey = (word: String) => word,
        foldZero = (key: String) => 0)(fold = (count: Int, elem: String) => count + 1))

      //#reduce-by-key-general

      Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
        ("hello", 2),
        ("world", 1),
        ("and", 1),
        ("universe", 1),
        ("akka", 1),
        ("rocks!", 1000)))
    }
  }
}
|