diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CollectionSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CollectionSinkSpec.scala deleted file mode 100644 index b101868ec2..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CollectionSinkSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Copyright (C) 2015-2017 Lightbend Inc. - */ -package akka.stream.scaladsl - -import akka.stream.testkit.{ StreamSpec, TestPublisher } -import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings } - -import scala.collection.immutable -import scala.concurrent.{ Await, Future } - -class CollectionSinkSpec extends StreamSpec with a { - - val settings = ActorMaterializerSettings(system) - .withInputBuffer(initialSize = 2, maxSize = 16) - - implicit val mat = ActorMaterializer(settings) - - "Sink.collection" when { - "using Seq as Collection" must { - "return a Seq[T] from a Source" in { - val input = (1 to 6) - val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.collection) - val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault) - result should be(input.toSeq) - } - - "return an empty Seq[T] from an empty Source" in { - val input: immutable.Seq[Int] = Nil - val future: Future[immutable.Seq[Int]] = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.collection) - val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault) - result should be(input) - } - - "fail the future on abrupt termination" in { - val mat = ActorMaterializer() - val probe = TestPublisher.probe() - val future = Source.fromPublisher(probe).runWith(Sink.collection[Nothing, Seq[Nothing]])(mat) - mat.shutdown() - future.failed.futureValue shouldBe an[AbruptTerminationException] - } - } - "using Vector as Collection" must { - "return a Vector[T] from a Source" in { - val input = (1 to 6) - val future: Future[immutable.Vector[Int]] = Source(input).runWith(Sink.collection) - val result: immutable.Vector[Int] = Await.result(future, remainingOrDefault) - result should be(input.toVector) - } - - "return an empty Vector[T] from an empty Source" in { - val input = Nil - val future: Future[immutable.Vector[Int]] = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.collection[Int, Vector[Int]]) - val result: immutable.Vector[Int] = Await.result(future, remainingOrDefault) - result should be(Vector.empty[Int]) - } - - "fail the future on abrupt termination" in { - val mat = ActorMaterializer() - val probe = TestPublisher.probe() - val future = Source.fromPublisher(probe).runWith(Sink.collection[Nothing, Seq[Nothing]])(mat) - mat.shutdown() - future.failed.futureValue shouldBe an[AbruptTerminationException] - } - } - } -} - diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index b0e2d81ab4..f231f2638b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -40,8 +40,6 @@ import akka.annotation.{ DoNotInherit, InternalApi } import akka.event.Logging import akka.util.OptionVal -import scala.collection.generic.CanBuildFrom - /** * INTERNAL API */ @@ -271,7 +269,7 @@ import scala.collection.generic.CanBuildFrom /** * INTERNAL API */ -@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] { +@InternalApi private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { val in = Inlet[T]("seq.in") override def toString: String = "SeqStage" @@ -281,9 +279,9 @@ import scala.collection.generic.CanBuildFrom override protected def initialAttributes: Attributes = DefaultAttributes.seqSink override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val p: Promise[That] = Promise() + val p: Promise[immutable.Seq[T]] = Promise() val logic = new GraphStageLogic(shape) with InHandler { - val buf = cbf() + val buf = Vector.newBuilder[T] override def preStart(): Unit = pull(in) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 8c37de5335..477e0e9d7f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -15,7 +15,6 @@ import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec -import scala.collection.generic.CanBuildFrom import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } @@ -199,21 +198,7 @@ object Sink { * * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] */ - def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T, Vector[T]]) - - /** - * A `Sink` that keeps on collecting incoming elements until upstream terminates. - * As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants) - * may be used to ensure boundedness. - * Materializes into a `Future` of `That[T]` containing all the collected elements. - * `That[T]` is limited to the limitations of the CanBuildFrom associated with it. For example, `Seq` is limited to - * `Int.MaxValue` elements. See [The Architecture of Scala Collectionss](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info. - * This Sink will cancel the stream after having received that many elements. - * - * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] - */ - def collection[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]): Sink[T, Future[That]] = - Sink.fromGraph(new SeqStage[T, That]) + def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T]) /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]].