feat: Add collectFirst stream operator. (#984)

This commit is contained in:
He-Pin(kerr) 2024-01-20 19:39:22 +08:00 committed by GitHub
parent cd70ae8a6a
commit 51b7ac519a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 275 additions and 1 deletions

View file

@ -0,0 +1,38 @@
# collectFirst
Transform this stream by applying the given partial function to the first element on which the function is defined as it pass through this processing step, and cancel the upstream publisher after the first element is emitted.
@ref[Simple operators](../index.md#simple-operators)
## Signature
@apidoc[Source.collectFirst](Source) { scala="#collectFirst[T](pf:PartialFunction[Out,T]):FlowOps.this.Repr[T]" java="#collectFirst(scala.PartialFunction)" }
@apidoc[Flow.collectFirst](Flow) { scala="#collectFirst[T](pf:PartialFunction[Out,T]):FlowOps.this.Repr[T]" java="#collectFirst(scala.PartialFunction)" }
## Description
Transform this stream by applying the given partial function to the first element on which the function is defined as
it pass through this processing step, and cancel the upstream publisher after the first element is emitted.
## Example
Scala
: @@snip [Collect.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collectFirst }
Java
: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collectFirst }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the provided partial function is defined for the first element
**backpressures** when the partial function is defined for the element and downstream backpressures
**completes** upstream completes or the first element is emitted
**cancels** when downstream cancels
@@@

View file

@ -144,6 +144,7 @@ depending on being backpressured by downstream or not.
|--|--|--|
|Flow|<a name="asflowwithcontext"></a>@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Extracts context data from the elements of a `Flow` so that it can be turned into a `FlowWithContext` which can propagate that context per element along a stream.|
|Source/Flow|<a name="collect"></a>@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.|
|Source/Flow|<a name="collectfirst"></a>@ref[collectFirst](Source-or-Flow/collectFirst.md)|Transform this stream by applying the given partial function to the first element on which the function is defined as it pass through this processing step, and cancel the upstream publisher after the first element is emitted.|
|Source/Flow|<a name="collecttype"></a>@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.|
|Source/Flow|<a name="collectwhile"></a>@ref[collectWhile](Source-or-Flow/collectWhile.md)|Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step, and cancel the upstream publisher after the partial function is not applied.|
|Flow|<a name="completionstageflow"></a>@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.|
@ -412,6 +413,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [cancelled](Sink/cancelled.md)
* [collect](Source-or-Flow/collect.md)
* [collect](Sink/collect.md)
* [collectFirst](Source-or-Flow/collectFirst.md)
* [collection](Sink/collection.md)
* [collectType](Source-or-Flow/collectType.md)
* [collectWhile](Source-or-Flow/collectWhile.md)

View file

@ -66,6 +66,7 @@ import org.apache.pekko.stream.Attributes;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
class SourceOrFlow {
@ -412,6 +413,19 @@ class SourceOrFlow {
// #collectWhile
}
void collectFirstExample() {
// #collectFirst
Source.from(Arrays.asList(1, 3, 5, 7, 8, 9, 10))
.collectFirst(
PFBuilder.<Integer, Integer>create()
.match(Integer.class, i -> i % 2 == 0, i -> i)
.build())
.runWith(Sink.foreach(System.out::println), system);
// expect prints output:
// 8
// #collectFirst
}
void collectTypeExample() {
// #collectType
Flow<Message, Pong, NotUsed> flow =

View file

@ -14,9 +14,11 @@
package docs.stream.operators.sourceorflow
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Flow
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }
object Collect {
private implicit val system: ActorSystem = null
// #collect-elements
trait Message
final case class Ping(id: Int) extends Message
@ -46,4 +48,16 @@ object Collect {
}
// #collectWhile
}
def collectFirst(): Unit = {
// #collectFirst
Source(List(1, 3, 5, 7, 8, 9, 10))
.collectFirst {
case elem if elem % 2 == 0 => elem
}
.runWith(Sink.foreach(println))
// expect prints output:
// 8
// #collectFirst
}
}

View file

@ -997,6 +997,25 @@ public class FlowTest extends StreamTest {
.expectComplete();
}
@Test
public void mustBeAbleToUseCollectFirst() {
Source.from(
Arrays.asList(
Optional.of(1), Optional.<Integer>empty(), Optional.of(2), Optional.of(3)))
.collectFirst(
PFBuilder.<Optional<Integer>, Integer>create()
.match(
Optional.class,
elem -> elem.isPresent() && (Integer) elem.get() % 2 == 0,
elem -> (Integer) elem.get())
.build())
.runWith(TestSink.create(system), system)
.ensureSubscription()
.request(4)
.expectNext(2)
.expectComplete();
}
@Test
public void mustBeAbleToUseCollectType() throws Exception {
final TestKit probe = new TestKit(system);

View file

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.stream.scaladsl
import org.apache.pekko
import pekko.stream.ActorAttributes._
import pekko.stream.OverflowStrategy
import pekko.stream.Supervision._
import pekko.stream.testkit.Utils.TE
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
class FlowCollectFirstSpec extends StreamSpec with ScriptedTest {
"A CollectFirst" must {
"collect in happy path" in {
Source(List(1, 3, 5, 7, 8, 9, 10))
.collectFirst {
case elem if elem % 2 == 0 => elem
}
.runWith(TestSink())
.request(7)
.expectNext(8)
.expectComplete()
}
"complete with buffer even no explict request" in {
Source(List(2, 4, 6))
.collectFirst {
case elem if elem % 2 != 0 => elem
}
.buffer(1, overflowStrategy = OverflowStrategy.backpressure)
.runWith(TestSink())
.ensureSubscription()
.expectComplete()
}
"complete with empty Source" in {
Source.empty[Int].collectFirst {
case elem if elem % 2 != 0 => elem
}.runWith(TestSink[Int]())
.ensureSubscription()
.expectComplete()
}
"restart when pf throws" in {
Source(1 to 6)
.collectFirst { case x: Int => if (x % 2 != 0) throw TE("") else x }
.withAttributes(supervisionStrategy(restartingDecider))
.runWith(TestSink[Int]())
.request(1)
.expectNext(2)
.request(1)
.expectComplete()
}
"resume when pf throws" in {
Source(1 to 6)
.collectFirst { case x: Int => if (x % 2 != 0) throw TE("") else x }
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(TestSink[Int]())
.request(1)
.expectNext(2)
.request(1)
.expectComplete()
}
}
}

View file

@ -38,6 +38,7 @@ import pekko.stream.Attributes._
val filter = name("filter")
val filterNot = name("filterNot")
val collect = name("collect")
val collectFirst = name("collectFirst")
val collectWhile = name("collectWhile")
val recover = name("recover")
val mapError = name("mapError")

View file

@ -1125,6 +1125,26 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def collect[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.collect(pf))
/**
* Transform this stream by applying the given partial function to the first element
* on which the function is defined as it pass through this processing step, and cancel the upstream publisher
* after the first element is emitted.
*
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the first element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*/
def collectFirst[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.collectFirst(pf))
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step, and cancel the

View file

@ -2839,6 +2839,26 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def collect[T](pf: PartialFunction[Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.collect(pf))
/**
* Transform this stream by applying the given partial function to the first element
* on which the function is defined as it pass through this processing step, and cancel the upstream publisher
* after the first element is emitted.
*
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the first element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*/
def collectFirst[T](pf: PartialFunction[Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.collectFirst(pf))
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step, and cancel the

View file

@ -507,6 +507,26 @@ class SubFlow[In, Out, Mat](
def collect[T](pf: PartialFunction[Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.collect(pf))
/**
* Transform this stream by applying the given partial function to the first element
* on which the function is defined as it pass through this processing step, and cancel the upstream publisher
* after the first element is emitted.
*
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the first element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*/
def collectFirst[T](pf: PartialFunction[Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.collectFirst(pf))
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step, and cancel the

View file

@ -498,6 +498,26 @@ class SubSource[Out, Mat](
def collect[T](pf: PartialFunction[Out, T]): SubSource[T, Mat] =
new SubSource(delegate.collect(pf))
/**
* Transform this stream by applying the given partial function to the first element
* on which the function is defined as it pass through this processing step, and cancel the upstream publisher
* after the first element is emitted.
*
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the first element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*/
def collectFirst[T](pf: PartialFunction[Out, T]): SubSource[T, Mat] =
new SubSource(delegate.collectFirst(pf))
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.

View file

@ -1544,6 +1544,27 @@ trait FlowOps[+Out, +Mat] {
*/
def collect[T](pf: PartialFunction[Out, T]): Repr[T] = via(Collect(pf))
/**
* Transform this stream by applying the given partial function to the first element
* on which the function is defined as it pass through this processing step, and cancel the upstream publisher
* after the first element is emitted.
*
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the first element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes or the first element is emitted
*
* '''Cancels when''' downstream cancels
*/
def collectFirst[T](pf: PartialFunction[Out, T]): Repr[T] =
via(Flow[Out].collect(pf).take(1)
.withAttributes(DefaultAttributes.collectFirst and SourceLocation.forLambda(pf)))
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step, and cancel the