diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 9835abce67..3dc8c2788f 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; import java.util.stream.Stream; import static akka.NotUsed.notUsed; @@ -1184,4 +1185,27 @@ public class SourceTest extends StreamTest { new Boolean[] {false, true, false, true, false, true, false, true, false, true}, future.get(1, TimeUnit.SECONDS).toArray()); } + + @Test + public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletion() { + final Iterator iterator = IntStream.range(1, 10).iterator(); + final Creator> input = () -> iterator; + final Done completion = + Source.fromIterator(input).map(it -> it * 10).run(system).toCompletableFuture().join(); + assertEquals(completion, Done.getInstance()); + } + + @Test + public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletionWithMaterializer() { + final Materializer materializer = Materializer.createMaterializer(system); + final Iterator iterator = IntStream.range(1, 10).iterator(); + final Creator> input = () -> iterator; + final Done completion = + Source.fromIterator(input) + .map(it -> it * 10) + .run(materializer) + .toCompletableFuture() + .join(); + assertEquals(completion, Done.getInstance()); + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala index 512a4b8c8e..7ee329e20e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowCompileSpec.scala @@ -39,8 +39,6 @@ class FlowCompileSpec extends StreamSpec { "open3.run()" shouldNot compile val closedSource: Source[Int, _] = intSeq.via(open3) - "closedSource.run()" shouldNot compile - val closedSink: Sink[Int, _] = open3.to(Sink.asPublisher[Int](false)) "closedSink.run()" shouldNot compile @@ -59,7 +57,6 @@ class FlowCompileSpec extends StreamSpec { val open: Flow[Int, String, _] = Flow[Int].map(_.toString) val closedSource: Source[Int, _] = strSeq.via(Flow[String].map(_.hashCode)) val closedSource2: Source[String, _] = closedSource.via(open) - "closedSource2.run()" shouldNot compile "strSeq.to(closedSource2)" shouldNot compile closedSource2.to(Sink.asPublisher[String](false)).run } @@ -88,9 +85,6 @@ class FlowCompileSpec extends StreamSpec { "not be accepted by Source" in { "openSource.to(intSeq)" shouldNot compile } - "not run()" in { - "openSource.run()" shouldNot compile - } } "RunnableGraph" should { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index bbd25039d2..6d2364cf1f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -368,6 +368,12 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } } + "A Source.run" must { + "ignore elements it outputs and only signal the completion of the processing" in { + Source.fromIterator(() => (1 to 5).toIterator).map(_ * 10).run().futureValue shouldBe Done + } + } + "Source pre-materialization" must { "materialize the source and connect it to a publisher" in { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 3efcc013bb..2113d68a28 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -929,6 +929,24 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] = RunnableGraph.fromGraph(delegate.toMat(sink)(combinerToScala(combine))) + /** + * Connect this `Source` to the `Sink.ignore` and run it. Elements from the stream will be consumed and discarded. + * + * Note that the `ActorSystem` can be used as the `materializer` parameter to use the + * [[akka.stream.SystemMaterializer]] for running the stream. + */ + def run(materializer: Materializer): CompletionStage[Done] = + delegate.run()(materializer).toJava + + /** + * Connect this `Source` to the `Sink.ignore` and run it. Elements from the stream will be consumed and discarded. + * + * Note that the `ActorSystem` can be used as the `systemProvider` parameter to use the + * [[akka.stream.SystemMaterializer]] for running the stream. + */ + def run(systemProvider: ClassicActorSystemProvider): CompletionStage[Done] = + delegate.run()(SystemMaterializer(systemProvider.classicSystem).materializer).toJava + /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a `Sink.asPublisher`. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 7fa1ae4fa8..170303c022 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -98,6 +98,15 @@ final class Source[+Out, +Mat]( (mat, Source.fromPublisher(pub)) } + /** + * Connect this `Source` to the `Sink.ignore` and run it. Elements from the stream will be consumed and discarded. + * + * Note that the `ActorSystem` can be used as the implicit `materializer` parameter to use the + * [[akka.stream.SystemMaterializer]] for running the stream. + */ + def run()(implicit materializer: Materializer): Future[Done] = + toMat(Sink.ignore)(Keep.right).run() + /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].