Add Sink.collection #23917

This commit is contained in:
nachinius 2017-11-29 13:49:31 -03:00 committed by Johan Andrén
parent add6c320c3
commit c17a11ddd2
5 changed files with 94 additions and 5 deletions

View file

@ -40,6 +40,8 @@ import akka.annotation.{ DoNotInherit, InternalApi }
import akka.event.Logging
import akka.util.OptionVal
import scala.collection.generic.CanBuildFrom
/**
* INTERNAL API
*/
@ -269,7 +271,7 @@ import akka.util.OptionVal
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {
val in = Inlet[T]("seq.in")
override def toString: String = "SeqStage"
@ -279,9 +281,9 @@ import akka.util.OptionVal
override protected def initialAttributes: Attributes = DefaultAttributes.seqSink
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val p: Promise[immutable.Seq[T]] = Promise()
val p: Promise[That] = Promise()
val logic = new GraphStageLogic(shape) with InHandler {
val buf = Vector.newBuilder[T]
val buf = cbf()
override def preStart(): Unit = pull(in)