diff --git a/docs/src/main/paradox/stream/operators/Sink/none.md b/docs/src/main/paradox/stream/operators/Sink/none.md new file mode 100644 index 0000000000..faf0af9e16 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Sink/none.md @@ -0,0 +1,47 @@ +# Sink.none + +A `Sink` that will test the given predicate `p` for every received element and completes with the result. + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.none](Sink$) { scala="#none[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#none(org.apache.pekko.japi.function.Predicate)" } + +## Description +none operator applies a predicate function to assert each element received, it returns false if any element satisfy the assertion, otherwise it returns true. + +It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished. + +Notes that if source is empty, it will return true + +A `Sink` that will test the given predicate `p` for every received element and + + - completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is false for all elements; + - completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the stream is empty (i.e. completes before signalling any elements); + - completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is true for any element. + +The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false` +when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + +## Example + +This example tests all elements in the stream is `<=` 100. + +Scala +: @@snip [ForAll.scala](/docs/src/test/scala/docs/stream/operators/sink/NoneMatch.scala) { #none } + +Java +: @@snip [ForAll.java](/docs/src/test/java/jdocs/stream/operators/sink/NoneMatch.java) { #none } + +## Reactive Streams Semantics + +@@@div { .callout } + +***Completes*** when upstream completes or the predicate `p` returns `true` + +**cancels** when predicate `p` returns `true` + +**backpressures** when the invocation of predicate `p` has not yet completed + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 3de82bd35f..643d2caa2b 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -82,6 +82,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl |Sink|@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).| |Sink|@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.| |Sink|@ref[never](Sink/never.md)|Always backpressure never cancel and never consume any elements from the stream.| +|Sink|@ref[none](Sink/none.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.| |Sink|@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.| |Sink|@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.| |Sink|@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.| @@ -555,6 +556,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [monitor](Source-or-Flow/monitor.md) * [never](Source/never.md) * [never](Sink/never.md) +* [none](Sink/none.md) * [onComplete](Sink/onComplete.md) * [onErrorComplete](Source-or-Flow/onErrorComplete.md) * [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md) diff --git a/docs/src/test/java/jdocs/stream/operators/sink/NoneMatch.java b/docs/src/test/java/jdocs/stream/operators/sink/NoneMatch.java new file mode 100644 index 0000000000..2a2952d8e5 --- /dev/null +++ b/docs/src/test/java/jdocs/stream/operators/sink/NoneMatch.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package jdocs.stream.operators.sink; + +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.Sink; +import org.apache.pekko.stream.javadsl.Source; + +import java.util.concurrent.TimeUnit; + +public class NoneMatch { + private ActorSystem system = null; + + public void noneUsage() throws Exception { + // #none + final boolean noneMatch = + Source.range(1, 100) + .runWith(Sink.none(elem -> elem > 100), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + System.out.println(noneMatch); + // Expect prints: + // true + // #none + } +} diff --git a/docs/src/test/scala/docs/stream/operators/sink/NoneMatch.scala b/docs/src/test/scala/docs/stream/operators/sink/NoneMatch.scala new file mode 100644 index 0000000000..d539298740 --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/sink/NoneMatch.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.sink + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.scaladsl.{ Sink, Source } + +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ Await, ExecutionContextExecutor, Future } + +object NoneMatch { + implicit val system: ActorSystem = ??? + implicit val ec: ExecutionContextExecutor = system.dispatcher + def noneExample(): Unit = { + // #none + val result: Future[Boolean] = + Source(1 to 100) + .runWith(Sink.none(_ > 100)) + val noneMatch = Await.result(result, 3.seconds) + println(noneMatch) + // Expect prints: + // true + // #none + } +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java index 652754ce8a..d88fde6bf3 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java @@ -246,6 +246,15 @@ public class SinkTest extends StreamTest { assertTrue(allMatch); } + @Test + public void sinkMustBeAbleToUseNoneMatch() + throws InterruptedException, ExecutionException, TimeoutException { + CompletionStage cs = + Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.none(param -> param < 0), system); + boolean noneMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS); + assertTrue(noneMatch); + } + @Test public void sinkMustBeAbleToUseForExists() throws InterruptedException, ExecutionException, TimeoutException { diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala index 8368d7a96c..e81c44168a 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala @@ -381,6 +381,46 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } + "The none sink" must { + + "completes with `ture` when all elements not match" in { + Source(1 to 4) + .runWith(Sink.none(_ < 0)) + .futureValue shouldBe true + } + + "completes with `false` when any element match" in { + Source(1 to 4) + .runWith(Sink.none(_ > 2)) + .futureValue shouldBe false + } + + "completes with `true` if the stream is empty" in { + Source.empty[Int] + .runWith(Sink.none(_ > 2)) + .futureValue shouldBe true + } + + "completes with `Failure` if the stream failed" in { + Source.failed[Int](new RuntimeException("Oops")) + .runWith(Sink.none(_ > 2)) + .failed.futureValue shouldBe a[RuntimeException] + } + + "completes with `false` with restart strategy" in { + val sink = Sink.none[Int](elem => { + if (elem == 2) { + throw new RuntimeException("Oops") + } + elem > 1 + }).withAttributes(supervisionStrategy(Supervision.restartingDecider)) + + Source(1 to 3) + .runWith(sink) + .futureValue shouldBe false + } + } + "The exists sink" must { "completes with `false` when none element match" in { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index b43ff064b8..41af12379a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -101,6 +101,31 @@ object Sink { .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava)) } + /** + * A `Sink` that will test the given predicate `p` for every received element and + * 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is false for all elements; + * 2. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the stream is empty (i.e. completes before signalling any elements); + * 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is true for any element. + * + * The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false` + * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Completes when''' upstream completes or the predicate `p` returns `true` + * + * '''Backpressures when''' the invocation of predicate `p` has not yet completed + * + * '''Cancels when''' predicate `p` returns `true` + * + * @since 1.1.3 + */ + def none[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = { + import pekko.util.FutureConverters._ + new Sink(scaladsl.Sink.none[In](p.test) + .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava)) + } + /** * A `Sink` that will test the given predicate `p` for every received element and * 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for any element; diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index cf8b7151d1..653c116ebe 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -470,6 +470,30 @@ object Sink { .toMat(Sink.head)(Keep.right) .named("forallSink") + /** + * A `Sink` that will test the given predicate `p` for every received element and + * 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is false for all elements; + * 2. completes and returns [[scala.concurrent.Future]] of `true` if the stream is empty (i.e. completes before signalling any elements); + * 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is true for any element. + * + * The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false` + * when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Completes when''' upstream completes or the predicate `p` returns `true` + * + * '''Backpressures when''' the invocation of predicate `p` has not yet completed + * + * '''Cancels when''' predicate `p` returns `true` + * + * @since 1.1.3 + */ + def none[T](p: T => Boolean): Sink[T, Future[Boolean]] = + Flow[T].foldWhile(true)(util.ConstantFun.scalaIdentityFunction)(_ && !p(_)) + .toMat(Sink.head)(Keep.right) + .named("noneSink") + /** * A `Sink` that will test the given predicate `p` for every received element and * 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for any element;