From f6da401abf69e7f45d9705fcf611a5d6726c5bc3 Mon Sep 17 00:00:00 2001 From: Laglangyue <35491928+laglangyue@users.noreply.github.com> Date: Sun, 28 Jan 2024 17:54:07 +0800 Subject: [PATCH] feat: Add Sink#forall operator (#989) Co-authored-by: Jiafu Tang --- .../paradox/stream/operators/Sink/forall.md | 47 +++++++++++++++++++ .../main/paradox/stream/operators/index.md | 2 + .../jdocs/stream/operators/sink/ForAll.java | 41 ++++++++++++++++ .../docs/stream/operators/sink/ForAll.scala | 40 ++++++++++++++++ .../apache/pekko/stream/javadsl/SinkTest.java | 9 ++++ .../pekko/stream/scaladsl/SinkSpec.scala | 42 +++++++++++++++++ .../apache/pekko/stream/impl/fusing/Ops.scala | 3 +- .../apache/pekko/stream/javadsl/Sink.scala | 37 +++++++++++---- .../apache/pekko/stream/scaladsl/Sink.scala | 41 +++++++++++----- 9 files changed, 240 insertions(+), 22 deletions(-) create mode 100644 docs/src/main/paradox/stream/operators/Sink/forall.md create mode 100644 docs/src/test/java/jdocs/stream/operators/sink/ForAll.java create mode 100644 docs/src/test/scala/docs/stream/operators/sink/ForAll.scala diff --git a/docs/src/main/paradox/stream/operators/Sink/forall.md b/docs/src/main/paradox/stream/operators/Sink/forall.md new file mode 100644 index 0000000000..387f5b2fb6 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Sink/forall.md @@ -0,0 +1,47 @@ +# Sink.forall + +A `Sink` that will test the given predicate `p` for every received element and completes with the result. + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.forall](Sink$) { scala="#forall[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#forall(org.apache.pekko.japi.function.Predicate)" } + +## Description +`forall` applies a predicate function to each element received; it returns true if all elements satisfy the predicate, otherwise it returns false. 
+ +It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the result of the test when the stream has finished. + +Note that if the source is empty, it will return true. + +A `Sink` that will test the given predicate `p` for every received element and + + - completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for all elements; + - completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the stream is empty (i.e. completes before signalling any elements); + - completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for any element. + +The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false` +when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + +## Example + +This example tests that all elements in the stream are `<=` 100. + +Scala +: @@snip [ForAll.scala](/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala) { #forall } + +Java +: @@snip [ForAll.java](/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java) { #forall } + +## Reactive Streams Semantics + +@@@div { .callout } + +**completes** when upstream completes or the predicate `p` returns `false` + +**cancels** when predicate `p` returns `false` + +**backpressures** when the invocation of predicate `p` has not yet completed + +@@@ \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index adbb3183bf..e6d564d962 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -62,6 +62,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl |Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. 
| |Sink|@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| |Sink|@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| +|Sink|@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.| |Sink|@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.| |Sink|@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.| |Sink|@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.| @@ -459,6 +460,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [foldAsync](Source-or-Flow/foldAsync.md) * [foldWhile](Source-or-Flow/foldWhile.md) * [foldWhile](Sink/foldWhile.md) +* [forall](Sink/forall.md) * [foreach](Sink/foreach.md) * [foreachAsync](Sink/foreachAsync.md) * [foreachParallel](Sink/foreachParallel.md) diff --git a/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java b/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java new file mode 100644 index 0000000000..b9d2c65662 --- /dev/null +++ b/docs/src/test/java/jdocs/stream/operators/sink/ForAll.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package jdocs.stream.operators.sink; + +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; + +import java.util.concurrent.TimeUnit; + +public class ForAll { + private ActorSystem system = null; + + public void forAllUsage() throws Exception { + // #forall + final boolean allMatch = + Source.range(1, 100) + .runWith(Sink.forall(elem -> elem <= 100), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + System.out.println(allMatch); + // Expect prints: + // true + // #forall + } +} diff --git a/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala b/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala new file mode 100644 index 0000000000..bef79330ea --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/sink/ForAll.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.sink + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.scaladsl.{ Sink, Source } + +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ Await, ExecutionContextExecutor, Future } + +object ForAll { + implicit val system: ActorSystem = ??? + implicit val ec: ExecutionContextExecutor = system.dispatcher + def foldExample: Unit = { + // #forall + val result: Future[Boolean] = + Source(1 to 100) + .runWith(Sink.forall(_ <= 100)) + val allMatch = Await.result(result, 3.seconds) + println(allMatch) + // Expect prints: + // true + // #forall + } +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java index 1e48272dbb..ed4d283867 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java @@ -236,4 +236,13 @@ public class SinkTest extends StreamTest { // #foreach assertEquals(Done.done(), done); } + + @Test + public void sinkMustBeAbleToUseForall() + throws InterruptedException, ExecutionException, TimeoutException { + CompletionStage<Boolean> cs = + Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.forall(param -> param > 0), system); + boolean allMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS); + assertTrue(allMatch); + } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala index a14ef9c37a..2ab8601998 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala @@ -20,6 +20,7 @@ import scala.concurrent.duration._ import org.apache.pekko 
import pekko.Done import pekko.stream._ +import pekko.stream.ActorAttributes.supervisionStrategy import pekko.stream.testkit._ import pekko.stream.testkit.scaladsl.{ TestSink, TestSource } import pekko.testkit.DefaultTimeout @@ -339,6 +340,47 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } } + "The forall sink" must { + + "completes with `true` when all elements match" in { + Source(1 to 4) + .runWith(Sink.forall(_ > 0)) + .futureValue shouldBe true + } + + "completes with `false` when any element does not match" in { + Source(1 to 4) + .runWith(Sink.forall(_ > 2)) + .futureValue shouldBe false + } + + "completes with `true` if the stream is empty" in { + Source.empty[Int] + .runWith(Sink.forall(_ > 2)) + .futureValue shouldBe true + } + + "completes with `Failure` if the stream failed" in { + Source.failed[Int](new RuntimeException("Oops")) + .runWith(Sink.forall(_ > 2)) + .failed.futureValue shouldBe a[RuntimeException] + } + + "completes with `true` with restart strategy" in { + val sink = Sink.forall[Int](elem => { + if (elem == 2) { + throw new RuntimeException("Oops") + } + elem > 0 + }).withAttributes(supervisionStrategy(Supervision.restartingDecider)) + + Source(1 to 2) + .runWith(sink) + .futureValue shouldBe true + } + + } + "Sink pre-materialization" must { "materialize the sink and wrap its exposed publisher in a Source" in { val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 9658fcd5a7..d892a04952 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -15,7 +15,8 @@ package org.apache.pekko.stream.impl.fusing import java.util.concurrent.TimeUnit.NANOSECONDS -import scala.annotation.{ nowarn, tailrec } +import scala.annotation.nowarn +import 
scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder import scala.concurrent.Future diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 2fc19d7681..0e2a0a9f2b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -14,8 +14,7 @@ package org.apache.pekko.stream.javadsl import java.util.Optional -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CompletionStage +import java.util.concurrent.{ CompletableFuture, CompletionStage } import java.util.function.BiFunction import java.util.stream.Collector @@ -27,22 +26,17 @@ import scala.util.Try import org.apache.pekko import pekko._ -import pekko.actor.ActorRef -import pekko.actor.ClassicActorSystemProvider -import pekko.actor.Status +import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status } import pekko.dispatch.ExecutionContexts import pekko.japi.{ function, Util } import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.LinearTraversalBuilder -import pekko.stream.javadsl -import pekko.stream.scaladsl import pekko.stream.scaladsl.SinkToCompletionStage import pekko.util.FutureConverters._ import pekko.util.OptionConverters._ -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber +import org.reactivestreams.{ Publisher, Subscriber } /** Java API */ object Sink { @@ -80,6 +74,31 @@ object Sink { f: function.Function2[U, In, CompletionStage[U]]): javadsl.Sink[In, CompletionStage[U]] = new Sink(scaladsl.Sink.foldAsync[U, In](zero)(f(_, _).asScala).toCompletionStage()) + /** + * A `Sink` that will test the given predicate `p` for every received element and + * 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for all elements; + * 2. 
completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the stream is empty (i.e. completes before signalling any elements); + * 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is false for any element. + * + * The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false` + * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Completes when''' upstream completes or the predicate `p` returns `false` + * + * '''Backpressures when''' the invocation of predicate `p` has not yet completed + * + * '''Cancels when''' predicate `p` returns `false` + * + * @since 1.1.0 + */ + def forall[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = { + import pekko.util.FutureConverters._ + new Sink(scaladsl.Sink.forall[In](p.test) + .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava)) + } + /** * Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector`` * transformation and reduction operations. This allows usage of Java streams transformations for reactive streams. 
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 84f1ed0508..b607f1b12c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -16,29 +16,22 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.{ nowarn, tailrec } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.util.Failure -import scala.util.Success -import scala.util.Try +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.{ Failure, Success, Try } import org.apache.pekko -import pekko.Done -import pekko.NotUsed -import pekko.actor.ActorRef -import pekko.actor.Status +import pekko.{ util, Done, NotUsed } +import pekko.actor.{ ActorRef, Status } import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts import pekko.stream._ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.GraphStages -import pekko.stream.javadsl import pekko.stream.stage._ import pekko.util.ccompat._ -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber +import org.reactivestreams.{ Publisher, Subscriber } /** * A `Sink` is a set of stream processing steps that has one open input. @@ -447,6 +440,30 @@ object Sink { def foldAsync[U, T](zero: U)(f: (U, T) => Future[U]): Sink[T, Future[U]] = Flow[T].foldAsync(zero)(f).toMat(Sink.head)(Keep.right).named("foldAsyncSink") + /** + * A `Sink` that will test the given predicate `p` for every received element and + * 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for all elements; + * 2. completes and returns [[scala.concurrent.Future]] of `true` if the stream is empty (i.e. 
completes before signalling any elements); + * 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for any element. + * + * The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false` + * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Completes when''' upstream completes or the predicate `p` returns `false` + * + * '''Backpressures when''' the invocation of predicate `p` has not yet completed + * + * '''Cancels when''' predicate `p` returns `false` + * + * @since 1.1.0 + */ + def forall[T](p: T => Boolean): Sink[T, Future[Boolean]] = + Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && p(_)) + .toMat(Sink.head)(Keep.right) + .named("forallSink") + /** * A `Sink` that will invoke the given function for every received element, giving it its previous * output (from the second element) and the element as input.