Stream: Sugar for Sink.ignore #27789, #24986

This commit is contained in:
Jacek Ewertowski 2020-03-23 13:05:25 +01:00 committed by GitHub
parent f8d7b0c94a
commit 2cccfa3c29
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 6 deletions

View file

@ -36,6 +36,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static akka.NotUsed.notUsed;
@ -1184,4 +1185,27 @@ public class SourceTest extends StreamTest {
new Boolean[] {false, true, false, true, false, true, false, true, false, true},
future.get(1, TimeUnit.SECONDS).toArray());
}
@Test
public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletion() {
final Iterator<Integer> iterator = IntStream.range(1, 10).iterator();
final Creator<Iterator<Integer>> input = () -> iterator;
final Done completion =
Source.fromIterator(input).map(it -> it * 10).run(system).toCompletableFuture().join();
assertEquals(completion, Done.getInstance());
}
@Test
public void mustRunSourceAndIgnoreElementsItOutputsAndOnlySignalTheCompletionWithMaterializer() {
final Materializer materializer = Materializer.createMaterializer(system);
final Iterator<Integer> iterator = IntStream.range(1, 10).iterator();
final Creator<Iterator<Integer>> input = () -> iterator;
final Done completion =
Source.fromIterator(input)
.map(it -> it * 10)
.run(materializer)
.toCompletableFuture()
.join();
assertEquals(completion, Done.getInstance());
}
}

View file

@ -39,8 +39,6 @@ class FlowCompileSpec extends StreamSpec {
"open3.run()" shouldNot compile
val closedSource: Source[Int, _] = intSeq.via(open3)
"closedSource.run()" shouldNot compile
val closedSink: Sink[Int, _] = open3.to(Sink.asPublisher[Int](false))
"closedSink.run()" shouldNot compile
@ -59,7 +57,6 @@ class FlowCompileSpec extends StreamSpec {
val open: Flow[Int, String, _] = Flow[Int].map(_.toString)
val closedSource: Source[Int, _] = strSeq.via(Flow[String].map(_.hashCode))
val closedSource2: Source[String, _] = closedSource.via(open)
"closedSource2.run()" shouldNot compile
"strSeq.to(closedSource2)" shouldNot compile
closedSource2.to(Sink.asPublisher[String](false)).run
}
@ -88,9 +85,6 @@ class FlowCompileSpec extends StreamSpec {
"not be accepted by Source" in {
"openSource.to(intSeq)" shouldNot compile
}
"not run()" in {
"openSource.run()" shouldNot compile
}
}
"RunnableGraph" should {

View file

@ -368,6 +368,12 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
}
}
"A Source.run" must {
"ignore elements it outputs and only signal the completion of the processing" in {
Source.fromIterator(() => (1 to 5).toIterator).map(_ * 10).run().futureValue shouldBe Done
}
}
"Source pre-materialization" must {
"materialize the source and connect it to a publisher" in {

View file

@ -929,6 +929,24 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.RunnableGraph[M2] =
RunnableGraph.fromGraph(delegate.toMat(sink)(combinerToScala(combine)))
/**
* Connect this `Source` to the `Sink.ignore` and run it. Elements from the stream will be consumed and discarded.
*
* Note that the `ActorSystem` can be used as the `materializer` parameter to use the
* [[akka.stream.SystemMaterializer]] for running the stream.
*/
def run(materializer: Materializer): CompletionStage[Done] =
delegate.run()(materializer).toJava
/**
* Connect this `Source` to the `Sink.ignore` and run it. Elements from the stream will be consumed and discarded.
*
* Note that the `ActorSystem` can be used as the `systemProvider` parameter to use the
* [[akka.stream.SystemMaterializer]] for running the stream.
*/
def run(systemProvider: ClassicActorSystemProvider): CompletionStage[Done] =
delegate.run()(SystemMaterializer(systemProvider.classicSystem).materializer).toJava
/**
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a `Sink.asPublisher`.

View file

@ -98,6 +98,15 @@ final class Source[+Out, +Mat](
(mat, Source.fromPublisher(pub))
}
/**
* Connect this `Source` to the `Sink.ignore` and run it. Elements from the stream will be consumed and discarded.
*
* Note that the `ActorSystem` can be used as the implicit `materializer` parameter to use the
* [[akka.stream.SystemMaterializer]] for running the stream.
*/
def run()(implicit materializer: Materializer): Future[Done] =
toMat(Sink.ignore)(Keep.right).run()
/**
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].