diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index 2ad7d3dd5d..f25183c3ae 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -116,6 +116,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { Ignore(_ == akka.stream.scaladsl.Source.getClass, _ == "apply", _ == 24, _ ⇒ true), Ignore(_ == akka.stream.scaladsl.Flow.getClass, _ == "apply", _ == 24, _ ⇒ true), Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "apply", _ == 24, _ ⇒ true), + Ignore(_ == akka.stream.scaladsl.Sink.getClass, _ == "collection", _ ⇒ true, _ ⇒ true), Ignore(_ == akka.stream.scaladsl.BidiFlow.getClass, _ == "apply", _ == 24, _ ⇒ true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "runnable", _ == 24, _ ⇒ true), Ignore(_ == akka.stream.scaladsl.GraphDSL.getClass, _ == "create", _ == 24, _ ⇒ true), diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CollectionSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CollectionSinkSpec.scala new file mode 100644 index 0000000000..88ef02aedf --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/CollectionSinkSpec.scala @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.{ StreamSpec, TestPublisher } +import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings } + +import scala.collection.immutable +import scala.concurrent.{ Await, Future } + +class CollectionSinkSpec extends StreamSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val mat = ActorMaterializer(settings) + + "Sink.collection" when { + "using Seq as Collection" must { + "return a Seq[T] from a Source" in { + val input = (1 to 6) + val future: Future[immutable.Seq[Int]] = Source(input).runWith(Sink.collection) + val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault) + result should be(input.toSeq) + } + + "return an empty Seq[T] from an empty Source" in { + val input: immutable.Seq[Int] = Nil + val future: Future[immutable.Seq[Int]] = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.collection) + val result: immutable.Seq[Int] = Await.result(future, remainingOrDefault) + result should be(input) + } + + "fail the future on abrupt termination" in { + val mat = ActorMaterializer() + val probe = TestPublisher.probe() + val future = Source.fromPublisher(probe).runWith(Sink.collection[Nothing, Seq[Nothing]])(mat) + mat.shutdown() + future.failed.futureValue shouldBe an[AbruptTerminationException] + } + } + "using Vector as Collection" must { + "return a Vector[T] from a Source" in { + val input = (1 to 6) + val future: Future[immutable.Vector[Int]] = Source(input).runWith(Sink.collection) + val result: immutable.Vector[Int] = Await.result(future, remainingOrDefault) + result should be(input.toVector) + } + + "return an empty Vector[T] from an empty Source" in { + val input = Nil + val future: Future[immutable.Vector[Int]] = Source.fromIterator(() ⇒ input.iterator).runWith(Sink.collection[Int, Vector[Int]]) + val result: immutable.Vector[Int] = Await.result(future, remainingOrDefault) + result should be(Vector.empty[Int]) + } + + "fail the future on abrupt termination" in { + val mat = ActorMaterializer() + val probe = TestPublisher.probe() + val future = Source.fromPublisher(probe).runWith(Sink.collection[Nothing, Seq[Nothing]])(mat) + mat.shutdown() + future.failed.futureValue shouldBe an[AbruptTerminationException] + } + } + } +} + diff --git a/akka-stream/src/main/mima-filters/2.5.3.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.3.backwards.excludes index 4ccf532985..5f486b76c5 100644 --- a/akka-stream/src/main/mima-filters/2.5.3.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.3.backwards.excludes @@ -7,4 +7,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.attributes") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.create") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.MaybeSource.this") -ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$") \ No newline at end of file +ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.MaybePublisher$") + +# #23917 Add Sink.collection +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.SeqStage.this") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index f231f2638b..b0e2d81ab4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -40,6 +40,8 @@ import akka.annotation.{ DoNotInherit, InternalApi } import akka.event.Logging import akka.util.OptionVal +import scala.collection.generic.CanBuildFrom + /** * INTERNAL API */ @@ -269,7 +271,7 @@ import akka.util.OptionVal /** * INTERNAL API */ -@InternalApi private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { +@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]) extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] { val in = Inlet[T]("seq.in") override def toString: String = "SeqStage" @@ -279,9 +281,9 @@ import akka.util.OptionVal override protected def initialAttributes: Attributes = DefaultAttributes.seqSink override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val p: Promise[immutable.Seq[T]] = Promise() + val p: Promise[That] = Promise() val logic = new GraphStageLogic(shape) with InHandler { - val buf = Vector.newBuilder[T] + val buf = cbf() override def preStart(): Unit = pull(in) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 477e0e9d7f..8c37de5335 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -15,6 +15,7 @@ import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec +import scala.collection.generic.CanBuildFrom import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } @@ -198,7 +199,21 @@ object Sink { * * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] */ - def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T]) + def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T, Vector[T]]) + + /** + * A `Sink` that keeps on collecting incoming elements until upstream terminates. + * As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants) + * may be used to ensure boundedness. + * Materializes into a `Future` of `That[T]` containing all the collected elements. + * `That[T]` is limited to the limitations of the CanBuildFrom associated with it. For example, `Seq` is limited to + * `Int.MaxValue` elements. See [The Architecture of Scala Collectionss](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html) for more info. + * This Sink will cancel the stream after having received that many elements. + * + * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def collection[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]): Sink[T, Future[That]] = + Sink.fromGraph(new SeqStage[T, That]) /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]].