feat: Add Sink.exists operator (#990)
* feat: +Flow.exists & Sink.exists * chore: Update some doc and code. * chore: Update method names in doc. --------- Co-authored-by: He-Pin <hepin1989@gmail.com>
This commit is contained in:
parent
cfff9c53df
commit
abad72d869
9 changed files with 237 additions and 1 deletions
47
docs/src/main/paradox/stream/operators/Sink/exists.md
Normal file
47
docs/src/main/paradox/stream/operators/Sink/exists.md
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
# Sink.exists
|
||||
|
||||
A `Sink` that will test the given predicate `p` for every received element and completes with the result.
|
||||
|
||||
@ref[Sink operators](../index.md#sink-operators)
|
||||
|
||||
## Signature
|
||||
|
||||
@apidoc[Sink.exists](Sink$) { scala="#exists[T](p:T=%3EBoolean):org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Boolean]]" java="#exists(org.apache.pekko.japi.function.Predicate)" }
|
||||
|
||||
## Description
|
||||
`exists` applies a predicate function to assert each element received, it returns true if any elements satisfy the assertion, otherwise it returns false.
|
||||
|
||||
It materializes into a `Future` (in Scala) or a `CompletionStage` (in Java) that completes with the last state when the stream has finished.
|
||||
|
||||
Notes that if source is empty, it will return false
|
||||
|
||||
A `Sink` that will test the given predicate `p` for every received element and
|
||||
|
||||
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `true` if the predicate is true for any element;
|
||||
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the stream is empty (i.e. completes before signalling any elements);
|
||||
- completes and returns @scala[`Future`] @java[`CompletionStage`] of `false` if the predicate is false for all elements.
|
||||
|
||||
The materialized value @scala[`Future`] @java[`CompletionStage`] will be completed with the value `true` or `false`
|
||||
when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
|
||||
|
||||
## Example
|
||||
|
||||
This example tests any element in the stream is `>` 3.
|
||||
|
||||
Scala
|
||||
: @@snip [exists.scala](/docs/src/test/scala/docs/stream/operators/sink/Exists.scala) { #exists }
|
||||
|
||||
Java
|
||||
: @@snip [exists.java](/docs/src/test/java/jdocs/stream/operators/sink/Exists.java) { #exists }
|
||||
|
||||
## Reactive Streams Semantics
|
||||
|
||||
@@@div { .callout }
|
||||
|
||||
***Completes*** when upstream completes or the predicate `p` returns `true`
|
||||
|
||||
**cancels** when predicate `p` returns `true`
|
||||
|
||||
**backpressures** when the invocation of predicate `p` has not yet completed
|
||||
|
||||
@@@
|
||||
|
|
@ -44,4 +44,4 @@ Java
|
|||
|
||||
**backpressures** when the invocation of predicate `p` has not yet completed
|
||||
|
||||
@@@
|
||||
@@@
|
||||
|
|
|
|||
|
|
@ -60,6 +60,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|
|||
|Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|
||||
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|
||||
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|
||||
|Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|
||||
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|
||||
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|
||||
|Sink|<a name="forall"></a>@ref[forall](Sink/forall.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|
||||
|
|
@ -445,6 +446,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
|||
* [dropWhile](Source-or-Flow/dropWhile.md)
|
||||
* [dropWithin](Source-or-Flow/dropWithin.md)
|
||||
* [empty](Source/empty.md)
|
||||
* [exists](Sink/exists.md)
|
||||
* [expand](Source-or-Flow/expand.md)
|
||||
* [extrapolate](Source-or-Flow/extrapolate.md)
|
||||
* [failed](Source/failed.md)
|
||||
|
|
|
|||
44
docs/src/test/java/jdocs/stream/operators/sink/Exists.java
Normal file
44
docs/src/test/java/jdocs/stream/operators/sink/Exists.java
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package jdocs.stream.operators.sink;
|
||||
|
||||
// #imports
|
||||
|
||||
import org.apache.pekko.actor.ActorSystem;
|
||||
import org.apache.pekko.stream.javadsl.Sink;
|
||||
import org.apache.pekko.stream.javadsl.Source;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
// #imports
|
||||
|
||||
public class Exists {
|
||||
private static final ActorSystem system = null;
|
||||
|
||||
private void existsExample() throws Exception {
|
||||
// #exists
|
||||
final boolean anyMatch =
|
||||
Source.range(1, 4)
|
||||
.runWith(Sink.exists(elem -> elem > 3), system)
|
||||
.toCompletableFuture()
|
||||
.get(3, TimeUnit.SECONDS);
|
||||
System.out.println(anyMatch);
|
||||
// Expected prints:
|
||||
// true
|
||||
// #exists
|
||||
}
|
||||
}
|
||||
44
docs/src/test/scala/docs/stream/operators/sink/Exists.scala
Normal file
44
docs/src/test/scala/docs/stream/operators/sink/Exists.scala
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package docs.stream.operators.sink
|
||||
|
||||
//#imports
|
||||
import org.apache.pekko.actor.ActorSystem
|
||||
import org.apache.pekko.stream.scaladsl._
|
||||
|
||||
import scala.concurrent.duration.DurationInt
|
||||
import scala.concurrent.{ Await, ExecutionContext }
|
||||
//#imports
|
||||
|
||||
object Exists {
|
||||
|
||||
implicit val system: ActorSystem = null
|
||||
implicit val ec: ExecutionContext = system.dispatcher
|
||||
|
||||
def existsExample(): Unit = {
|
||||
// #exists
|
||||
val result = Source(1 to 4)
|
||||
.runWith(Sink.exists(_ > 3))
|
||||
val anyMatch = Await.result(result, 3.seconds)
|
||||
println(anyMatch)
|
||||
// Expect prints:
|
||||
// true
|
||||
// #exists
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -245,4 +245,13 @@ public class SinkTest extends StreamTest {
|
|||
boolean allMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
|
||||
assertTrue(allMatch);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sinkMustBeAbleToUseForExists()
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
CompletionStage<Boolean> cs =
|
||||
Source.from(Arrays.asList(1, 2, 3, 4)).runWith(Sink.exists(param -> param > 3), system);
|
||||
boolean anyMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
|
||||
assertTrue(anyMatch);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -381,6 +381,47 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
|
|||
|
||||
}
|
||||
|
||||
"The exists sink" must {
|
||||
|
||||
"completes with `false` when none element match" in {
|
||||
Source(1 to 4)
|
||||
.runWith(Sink.exists[Int](_ > 5))
|
||||
.futureValue shouldBe false
|
||||
}
|
||||
|
||||
"completes with `true` when any element match" in {
|
||||
Source(1 to 4)
|
||||
.runWith(Sink.exists(_ > 2))
|
||||
.futureValue shouldBe true
|
||||
}
|
||||
|
||||
"completes with `false` if the stream is empty" in {
|
||||
Source.empty[Int]
|
||||
.runWith(Sink.exists(_ > 2))
|
||||
.futureValue shouldBe false
|
||||
}
|
||||
|
||||
"completes with `Failure` if the stream failed" in {
|
||||
Source.failed[Int](new RuntimeException("Oops"))
|
||||
.runWith(Sink.exists(_ > 2))
|
||||
.failed.futureValue shouldBe a[RuntimeException]
|
||||
}
|
||||
|
||||
"completes with `exists` with restart strategy" in {
|
||||
val sink = Sink.exists[Int](elem => {
|
||||
if (elem == 2) {
|
||||
throw new RuntimeException("Oops")
|
||||
}
|
||||
elem > 1
|
||||
}).withAttributes(supervisionStrategy(Supervision.restartingDecider))
|
||||
|
||||
Source(1 to 2)
|
||||
.runWith(sink)
|
||||
.futureValue shouldBe false
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"Sink pre-materialization" must {
|
||||
"materialize the sink and wrap its exposed publisher in a Source" in {
|
||||
val publisherSink: Sink[String, Publisher[String]] = Sink.asPublisher[String](false)
|
||||
|
|
|
|||
|
|
@ -99,6 +99,31 @@ object Sink {
|
|||
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
|
||||
}
|
||||
|
||||
/**
|
||||
* A `Sink` that will test the given predicate `p` for every received element and
|
||||
* 1. completes and returns [[java.util.concurrent.CompletionStage]] of `true` if the predicate is true for any element;
|
||||
* 2. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the stream is empty (i.e. completes before signalling any elements);
|
||||
* 3. completes and returns [[java.util.concurrent.CompletionStage]] of `false` if the predicate is false for all elements.
|
||||
*
|
||||
* The materialized value [[java.util.concurrent.CompletionStage]] will be completed with the value `true` or `false`
|
||||
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
|
||||
*
|
||||
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
|
||||
*
|
||||
* '''Completes when''' upstream completes or the predicate `p` returns `true`
|
||||
*
|
||||
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
|
||||
*
|
||||
* '''Cancels when''' predicate `p` returns `true`
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def exists[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
|
||||
import pekko.util.FutureConverters._
|
||||
new Sink(scaladsl.Sink.exists[In](p.test)
|
||||
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a sink which materializes into a ``CompletionStage`` which will be completed with a result of the Java ``Collector``
|
||||
* transformation and reduction operations. This allows usage of Java streams transformations for reactive streams.
|
||||
|
|
|
|||
|
|
@ -464,6 +464,30 @@ object Sink {
|
|||
.toMat(Sink.head)(Keep.right)
|
||||
.named("forallSink")
|
||||
|
||||
/**
|
||||
* A `Sink` that will test the given predicate `p` for every received element and
|
||||
* 1. completes and returns [[scala.concurrent.Future]] of `true` if the predicate is true for any element;
|
||||
* 2. completes and returns [[scala.concurrent.Future]] of `false` if the stream is empty (i.e. completes before signalling any elements);
|
||||
* 3. completes and returns [[scala.concurrent.Future]] of `false` if the predicate is false for all elements.
|
||||
*
|
||||
* The materialized value [[scala.concurrent.Future]] will be completed with the value `true` or `false`
|
||||
* when the input stream ends, or completed with `Failure` if there is a failure signaled in the stream.
|
||||
*
|
||||
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
|
||||
*
|
||||
* '''Completes when''' upstream completes or the predicate `p` returns `true`
|
||||
*
|
||||
* '''Backpressures when''' the invocation of predicate `p` has not yet completed
|
||||
*
|
||||
* '''Cancels when''' predicate `p` returns `true`
|
||||
*
|
||||
* @since 1.1.0
|
||||
*/
|
||||
def exists[T](p: T => Boolean): Sink[T, Future[Boolean]] =
|
||||
Flow[T].foldWhile(false)(!_)(_ || p(_))
|
||||
.toMat(Sink.head)(Keep.right)
|
||||
.named("existsSink")
|
||||
|
||||
/**
|
||||
* A `Sink` that will invoke the given function for every received element, giving it its previous
|
||||
* output (from the second element) and the element as input.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue