Add Sink.collection that generalizes Sink.seq to any immutable collection #23917 (#24016)

This commit is contained in:
nachinius 2017-11-24 03:19:17 -03:00 committed by Konrad `ktoso` Malawski
parent 1751292580
commit 2c31f57539
3 changed files with 89 additions and 4 deletions

View file

@ -40,6 +40,8 @@ import akka.annotation.{ DoNotInherit, InternalApi }
import akka.event.Logging
import akka.util.OptionVal
import scala.collection.generic.CanBuildFrom
/**
* INTERNAL API
*/
@ -269,7 +271,7 @@ import akka.util.OptionVal
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {
val in = Inlet[T]("seq.in")
override def toString: String = "SeqStage"
@ -279,9 +281,9 @@ import akka.util.OptionVal
override protected def initialAttributes: Attributes = DefaultAttributes.seqSink
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val p: Promise[immutable.Seq[T]] = Promise()
val p: Promise[That] = Promise()
val logic = new GraphStageLogic(shape) with InHandler {
val buf = Vector.newBuilder[T]
val buf = cbf()
override def preStart(): Unit = pull(in)