2014-12-08 17:29:40 +01:00
|
|
|
package docs.stream.cookbook
|
|
|
|
|
|
|
|
|
|
import akka.stream.scaladsl.{ Sink, Source }
|
|
|
|
|
|
|
|
|
|
import scala.collection.immutable
|
|
|
|
|
import scala.concurrent.Await
|
|
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
|
|
|
|
|
/**
 * Cookbook recipe: grouping a stream of messages by topic when a single
 * message may belong to several topics at once.
 *
 * Each message is first expanded into one (message, topic) pair per topic it
 * belongs to; the pairs are then grouped by topic and the topic is stripped
 * again, leaving one sub-stream of plain messages per topic.
 *
 * The code between the `//#multi-groupby` markers is delimited as a snippet;
 * keep the markers intact when editing.
 */
class RecipeMultiGroupBy extends RecipeSpec {

  "Recipe for multi-groupBy" must {

    "work" in {

      case class Topic(name: String)

      val elems = Source(List("1: a", "1: b", "all: c", "all: d", "1: e"))

      // Every message belongs to topic "1"; messages that do not start with
      // "1" additionally belong to topic "2".
      val extractTopics: Message => List[Topic] = { msg =>
        val additionalTopics = if (msg.startsWith("1")) Nil else List(Topic("2"))
        Topic("1") :: additionalTopics
      }

      //#multi-groupby
      val topicMapper: (Message) => immutable.Seq[Topic] = extractTopics

      val messageAndTopic: Source[(Message, Topic), Unit] = elems.mapConcat { msg: Message =>
        // Create a (Msg, Topic) pair for each of the topics
        // the message belongs to
        for (topic <- topicMapper(msg)) yield (msg, topic)
      }

      val multiGroups: Source[(Topic, Source[String, Unit]), Unit] = messageAndTopic
        .groupBy { case (_, topic) => topic }
        .map {
          case (topic, topicStream) =>
            // chopping off the topic from the (Message, Topic) pairs
            (topic, topicStream.map { case (msg, _) => msg })
        }
      //#multi-groupby

      // Renders the first batch (up to 10 messages) of one topic's sub-stream
      // as "<topic name>[m1, m2, ...]".
      def summarize(topic: Topic, topicMessages: Source[String, Unit]) =
        topicMessages
          .grouped(10)
          .map(batch => topic.name + batch.mkString("[", ", ", "]"))
          .runWith(Sink.head)

      val summaries = multiGroups.map {
        case (topic, topicMessages) => summarize(topic, topicMessages)
      }

      // Resolve the per-topic futures (up to 4 in flight) and collect them
      // into a single batch.
      val result = summaries
        .mapAsync(4)(pending => pending)
        .grouped(10)
        .runWith(Sink.head)

      val expected = Set(
        "1[1: a, 1: b, all: c, all: d, 1: e]",
        "2[all: c, all: d]")

      // Compared as sets: the order of the per-topic summaries is not asserted.
      Await.result(result, 3.seconds).toSet should be(expected)

    }

  }

}
|