Revert "Add Sink.collection that generalizes Sink.seq to any immutable collection #23917
This commit is contained in:
parent
e722e1aafe
commit
994d1d7e67
3 changed files with 4 additions and 89 deletions
|
|
@ -1,68 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
package akka.stream.scaladsl
|
|
||||||
|
|
||||||
import akka.stream.testkit.{ StreamSpec, TestPublisher }
|
|
||||||
import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings }
|
|
||||||
|
|
||||||
import scala.collection.immutable
|
|
||||||
import scala.concurrent.{ Await, Future }
|
|
||||||
|
|
||||||
class CollectionSinkSpec extends StreamSpec with a {
|
|
||||||
|
|
||||||
val settings = ActorMaterializerSettings(system)
|
|
||||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
|
||||||
|
|
||||||
implicit val mat = ActorMaterializer(settings)
|
|
||||||
|
|
||||||
"Sink.collection" when {
|
|
||||||
"using Seq as Collection" must {
|
|
||||||
"return a Seq[T] from a Source" in {
|
|
||||||
val input = (1 to 6)
|
|
||||||
val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.collection)
|
|
||||||
val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault)
|
|
||||||
result should be(input.toSeq)
|
|
||||||
}
|
|
||||||
|
|
||||||
"return an empty Seq[T] from an empty Source" in {
|
|
||||||
val input: immutable.Seq[Int] = Nil
|
|
||||||
val future: Future[immutable.Seq[Int]] = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.collection)
|
|
||||||
val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault)
|
|
||||||
result should be(input)
|
|
||||||
}
|
|
||||||
|
|
||||||
"fail the future on abrupt termination" in {
|
|
||||||
val mat = ActorMaterializer()
|
|
||||||
val probe = TestPublisher.probe()
|
|
||||||
val future = Source.fromPublisher(probe).runWith(Sink.collection[Nothing, Seq[Nothing]])(mat)
|
|
||||||
mat.shutdown()
|
|
||||||
future.failed.futureValue shouldBe an[AbruptTerminationException]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
"using Vector as Collection" must {
|
|
||||||
"return a Vector[T] from a Source" in {
|
|
||||||
val input = (1 to 6)
|
|
||||||
val future: Future[immutable.Vector[Int]] = Source(input).runWith(Sink.collection)
|
|
||||||
val result: immutable.Vector[Int] = Await.result(future, remainingOrDefault)
|
|
||||||
result should be(input.toVector)
|
|
||||||
}
|
|
||||||
|
|
||||||
"return an empty Vector[T] from an empty Source" in {
|
|
||||||
val input = Nil
|
|
||||||
val future: Future[immutable.Vector[Int]] = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.collection[Int, Vector[Int]])
|
|
||||||
val result: immutable.Vector[Int] = Await.result(future, remainingOrDefault)
|
|
||||||
result should be(Vector.empty[Int])
|
|
||||||
}
|
|
||||||
|
|
||||||
"fail the future on abrupt termination" in {
|
|
||||||
val mat = ActorMaterializer()
|
|
||||||
val probe = TestPublisher.probe()
|
|
||||||
val future = Source.fromPublisher(probe).runWith(Sink.collection[Nothing, Seq[Nothing]])(mat)
|
|
||||||
mat.shutdown()
|
|
||||||
future.failed.futureValue shouldBe an[AbruptTerminationException]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -40,8 +40,6 @@ import akka.annotation.{ DoNotInherit, InternalApi }
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.util.OptionVal
|
import akka.util.OptionVal
|
||||||
|
|
||||||
import scala.collection.generic.CanBuildFrom
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
|
|
@ -271,7 +269,7 @@ import scala.collection.generic.CanBuildFrom
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {
|
@InternalApi private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
|
||||||
val in = Inlet[T]("seq.in")
|
val in = Inlet[T]("seq.in")
|
||||||
|
|
||||||
override def toString: String = "SeqStage"
|
override def toString: String = "SeqStage"
|
||||||
|
|
@ -281,9 +279,9 @@ import scala.collection.generic.CanBuildFrom
|
||||||
override protected def initialAttributes: Attributes = DefaultAttributes.seqSink
|
override protected def initialAttributes: Attributes = DefaultAttributes.seqSink
|
||||||
|
|
||||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
||||||
val p: Promise[That] = Promise()
|
val p: Promise[immutable.Seq[T]] = Promise()
|
||||||
val logic = new GraphStageLogic(shape) with InHandler {
|
val logic = new GraphStageLogic(shape) with InHandler {
|
||||||
val buf = cbf()
|
val buf = Vector.newBuilder[T]
|
||||||
|
|
||||||
override def preStart(): Unit = pull(in)
|
override def preStart(): Unit = pull(in)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,6 @@ import akka.stream.{ javadsl, _ }
|
||||||
import org.reactivestreams.{ Publisher, Subscriber }
|
import org.reactivestreams.{ Publisher, Subscriber }
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.generic.CanBuildFrom
|
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import scala.concurrent.{ ExecutionContext, Future }
|
import scala.concurrent.{ ExecutionContext, Future }
|
||||||
import scala.util.{ Failure, Success, Try }
|
import scala.util.{ Failure, Success, Try }
|
||||||
|
|
@ -199,21 +198,7 @@ object Sink {
|
||||||
*
|
*
|
||||||
* See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
* See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||||
*/
|
*/
|
||||||
def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T, Vector[T]])
|
def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T])
|
||||||
|
|
||||||
/**
|
|
||||||
* A `Sink` that keeps on collecting incoming elements until upstream terminates.
|
|
||||||
* As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants)
|
|
||||||
* may be used to ensure boundedness.
|
|
||||||
* Materializes into a `Future` of `That[T]` containing all the collected elements.
|
|
||||||
* `That[T]` is limited to the limitations of the CanBuildFrom associated with it. For example, `Seq` is limited to
|
|
||||||
* `Int.MaxValue` elements. See [The Architecture of Scala Collectionss](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info.
|
|
||||||
* This Sink will cancel the stream after having received that many elements.
|
|
||||||
*
|
|
||||||
* See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
|
||||||
*/
|
|
||||||
def collection[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]): Sink[T, Future[That]] =
|
|
||||||
Sink.fromGraph(new SeqStage[T, That])
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
|
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue