diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/collectWhile.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/collectWhile.md new file mode 100644 index 0000000000..8e6f705968 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/collectWhile.md @@ -0,0 +1,38 @@ +# collectWhile + +Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step, and cancel the upstream publisher after the partial function is not applied. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.collectWhile](Source) { scala="#collectWhile[T](pf:PartialFunction[Out,T]):FlowOps.this.Repr[T]" java="#collectWhile(scala.PartialFunction)" } +@apidoc[Flow.collectWhile](Flow) { scala="#collectWhile[T](pf:PartialFunction[Out,T]):FlowOps.this.Repr[T]" java="#collectWhile(scala.PartialFunction)" } + + +## Description + +Transform this stream by applying the given partial function to each of the elements on which the function is defined +as they pass through this processing step, and cancel the upstream publisher after the partial function is not applied. 
+ +## Example + +Scala +: @@snip [Collect.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala) { #collectWhile } + +Java +: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #collectWhile } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the provided partial function is defined for the element + +**backpressures** when the partial function is defined for the element and downstream backpressures + +**completes** when upstream completes or the partial function is not applied + +**cancels** when downstream cancels + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 5f2d73c464..2483c2169d 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -145,6 +145,7 @@ depending on being backpressured by downstream or not. |Flow|@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Extracts context data from the elements of a `Flow` so that it can be turned into a `FlowWithContext` which can propagate that context per element along a stream.| |Source/Flow|@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.| |Source/Flow|@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.| +|Source/Flow|@ref[collectWhile](Source-or-Flow/collectWhile.md)|Transform this stream by applying the given partial function to each of the elements on which the function is defined as they pass through this processing step, and cancel the upstream publisher after the partial function is not applied.| |Flow|@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given 
future flow once it successfully completes.| |Flow|@ref[contramap](Flow/contramap.md)|Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.| |Source/Flow|@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.| @@ -413,6 +414,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [collect](Sink/collect.md) * [collection](Sink/collection.md) * [collectType](Source-or-Flow/collectType.md) +* [collectWhile](Source-or-Flow/collectWhile.md) * [combine](Source/combine.md) * [combine](Sink/combine.md) * [completionStage](Source/completionStage.md) diff --git a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 4859c5ad75..a1a55ca5f3 100644 --- a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -401,6 +401,17 @@ class SourceOrFlow { // #collect } + void collectWhileExample() { + // #collectWhile + Flow flow = + Flow.of(Message.class) + .collectWhile( + PFBuilder.create() + .match(Ping.class, p -> p.id <= 100, p -> new Pong(p.id)) + .build()); + // #collectWhile + } + void collectTypeExample() { // #collectType Flow flow = diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala index d8bb1fbddf..fa96da7fb1 100644 --- a/docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala +++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/Collect.scala @@ -38,4 +38,12 @@ object Collect { Flow[Message].collectType[Ping].filter(_.id != 0).map(p => Pong(p.id)) // #collectType } + + def collectWhile(): Unit = { + // #collectWhile + Flow[Message].collectWhile { + case Ping(id) if id <= 100 => Pong(id) + } + // #collectWhile + } } diff --git 
a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 9704046ba1..a5133b1f5a 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -983,6 +983,20 @@ public class FlowTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToUseCollectWhile() { + Source.from(Arrays.asList(1, 3, 5, 6, 7, 8, 9)) + .collectWhile( + PFBuilder.<Integer, Integer>create() + .match(Integer.class, elem -> elem % 2 != 0, elem -> elem) + .build()) + .runWith(TestSink.create(system), system) + .ensureSubscription() + .request(5) + .expectNextN(Arrays.asList(1, 3, 5)) // 6 is the first element the pf rejects: nothing after 5 is emitted + .expectComplete(); + } + @Test public void mustBeAbleToUseCollectType() throws Exception { final TestKit probe = new TestKit(system); diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowCollectWhileSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowCollectWhileSpec.scala new file mode 100644 index 0000000000..fd119853a2 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowCollectWhileSpec.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko +import pekko.stream.ActorAttributes._ +import pekko.stream.OverflowStrategy +import pekko.stream.Supervision._ +import pekko.stream.testkit.{ ScriptedTest, StreamSpec } +import pekko.stream.testkit.Utils.TE +import pekko.stream.testkit.scaladsl.TestSink + +class FlowCollectWhileSpec extends StreamSpec with ScriptedTest { + + "A CollectWhile" must { + + "collect in happy path" in { + Source(List(1, 3, 5, 7, 8, 9, 10)) + .collectWhile { + case elem if elem % 2 != 0 => elem + } + .runWith(TestSink()) + .request(7) + .expectNextN(List(1, 3, 5, 7)) + .expectComplete() + } + + "complete with buffer even no explict request" in { + Source(List(2, 3, 5)) + .collectWhile { + case elem if elem % 2 != 0 => elem + } + .buffer(1, overflowStrategy = OverflowStrategy.backpressure) // no request() is issued below + .runWith(TestSink()) + .ensureSubscription() + .expectComplete() + } + + "complete with empty Source" in { + Source.empty[Int].collectWhile { + case elem if elem % 2 != 0 => elem + }.runWith(TestSink[Int]()) + .ensureSubscription() + .expectComplete() + } + + "restart when pf throws" in { + Source(1 to 6) + .collectWhile { case x: Int => if (x % 2 == 0) throw TE("") else x } // throws on even values + .withAttributes(supervisionStrategy(restartingDecider)) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(1) + .request(1) + .expectNext(3) + .request(1) + .expectNext(5) + .request(1) + .expectComplete() + } + + "resume when pf throws" in { + Source(1 to 6) + .collectWhile { case x: Int => if (x % 2 == 0) throw TE("") else x } // throws on even values + .withAttributes(supervisionStrategy(resumingDecider)) + .runWith(TestSink[Int]()) + .request(1) + .expectNext(1) + .request(1) + .expectNext(3) + .request(1) + .expectNext(5) + .request(1) + .expectComplete() + } + } + +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index d2cf599f3c..a0baa1d3e9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -38,6 +38,7 @@ import pekko.stream.Attributes._ val filter = name("filter") val filterNot = name("filterNot") val collect = name("collect") + val collectWhile = name("collectWhile") val recover = name("recover") val mapError = name("mapError") val mapAsync = name("mapAsync") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CollectWhile.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CollectWhile.scala new file mode 100644 index 0000000000..11617e28ec --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CollectWhile.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.stream.impl.fusing + +import scala.util.control.NonFatal + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet, Supervision } +import pekko.stream.ActorAttributes.SupervisionStrategy +import pekko.stream.Attributes.SourceLocation +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } + +/** + * INTERNAL API: emits `pf(elem)` while `pf` is defined; completes on the first element where it is not. + */ +@InternalApi +private[pekko] final class CollectWhile[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] { + private val in = Inlet[In]("CollectWhile.in") + private val out = Outlet[Out]("CollectWhile.out") + override val shape = FlowShape(in, out) + + override def initialAttributes: Attributes = DefaultAttributes.collectWhile and SourceLocation.forLambda(pf) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + import Collect.NotApplied + + override final def onPush(): Unit = + try { + pf.applyOrElse(grab(in), NotApplied) match { + case NotApplied => completeStage() // pf undefined for this element: stop here + case result: Out @unchecked => push(out, result) + case _ => throw new RuntimeException() // should not happen; placates the exhaustiveness check + } + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => failStage(ex) + case _ => pull(in) // any other directive: drop the failing element and pull the next one + } + } + + override final def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } + + override def toString: String = "CollectWhile" +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 58566f2d2e..1059482ef3 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1125,6 +1125,28 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def collect[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.collect(pf)) + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step, and cancel the + * upstream publisher after the partial function is not applied. + * + * The stream will be completed without producing any elements if the partial function is not applied for + * the first stream element, e.g. when there is a downstream buffer. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the provided partial function is defined for the element + * + * '''Backpressures when''' the partial function is defined for the element and downstream backpressures + * + * '''Completes when''' upstream completes or the partial function is not applied. + * + * '''Cancels when''' downstream cancels + * @since 1.1.0 + */ + def collectWhile[T](pf: PartialFunction[Out, T]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.collectWhile(pf)) + /** * Transform this stream by testing the type of each of the elements * on which the element is an instance of the provided type as they pass through this processing step.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 7396f370e2..4958f7fef9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2839,6 +2839,28 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def collect[T](pf: PartialFunction[Out, T]): javadsl.Source[T, Mat] = new Source(delegate.collect(pf)) + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step, and cancel the + * upstream publisher after the partial function is not applied. + * + * The stream will be completed without producing any elements if the partial function is not applied for + * the first stream element, e.g. when there is a downstream buffer. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the provided partial function is defined for the element + * + * '''Backpressures when''' the partial function is defined for the element and downstream backpressures + * + * '''Completes when''' upstream completes or the partial function is not applied. + * + * '''Cancels when''' downstream cancels + * @since 1.1.0 + */ + def collectWhile[T](pf: PartialFunction[Out, T]): javadsl.Source[T, Mat] = + new Source(delegate.collectWhile(pf)) + /** * Transform this stream by testing the type of each of the elements * on which the element is an instance of the provided type as they pass through this processing step.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index dab7481fcc..bbdad2bd85 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -507,6 +507,28 @@ class SubFlow[In, Out, Mat]( def collect[T](pf: PartialFunction[Out, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.collect(pf)) + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step, and cancel the + * upstream publisher after the partial function is not applied. + * + * The stream will be completed without producing any elements if the partial function is not applied for + * the first stream element, e.g. when there is a downstream buffer. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the provided partial function is defined for the element + * + * '''Backpressures when''' the partial function is defined for the element and downstream backpressures + * + * '''Completes when''' upstream completes or the partial function is not applied. + * + * '''Cancels when''' downstream cancels + * @since 1.1.0 + */ + def collectWhile[T](pf: PartialFunction[Out, T]): SubFlow[In, T, Mat] = + new SubFlow(delegate.collectWhile(pf)) + /** * Transform this stream by testing the type of each of the elements * on which the element is an instance of the provided type as they pass through this processing step.
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index eb11925696..03e5cb80e3 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -499,19 +499,41 @@ class SubSource[Out, Mat]( new SubSource(delegate.collect(pf)) /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step, and cancel the + * upstream publisher after the partial function is not applied. + * + * The stream will be completed without producing any elements if the partial function is not applied for + * the first stream element, e.g. when there is a downstream buffer. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the provided partial function is defined for the element + * + * '''Backpressures when''' the partial function is defined for the element and downstream backpressures + * + * '''Completes when''' upstream completes or the partial function is not applied. + * + * '''Cancels when''' downstream cancels + * @since 1.1.0 + */ + def collectWhile[T](pf: PartialFunction[Out, T]): SubSource[T, Mat] = + new SubSource(delegate.collectWhile(pf)) + + /** * Transform this stream by testing the type of each of the elements * on which the element is an instance of the provided type as they pass through this processing step. * Non-matching elements are filtered out. * * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. * * '''Emits when''' the element is an instance of the provided type * * '''Backpressures when''' the element is an instance of the provided type and downstream backpressures * * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels */ def collectType[T](clazz: Class[T]): javadsl.SubSource[T, Mat] = new SubSource(delegate.collectType[T](ClassTag[T](clazz)))
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 39c42f6f27..3695644d1f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1544,6 +1544,27 @@ trait FlowOps[+Out, +Mat] { */ def collect[T](pf: PartialFunction[Out, T]): Repr[T] = via(Collect(pf)) + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step, and cancel the + * upstream publisher after the partial function is not applied. + * + * The stream will be completed without producing any elements if the partial function is not applied for + * the first stream element, e.g. when there is a downstream buffer. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the provided partial function is defined for the element + * + * '''Backpressures when''' the partial function is defined for the element and downstream backpressures + * + * '''Completes when''' upstream completes or the partial function is not applied.
+ * + * '''Cancels when''' downstream cancels + * @since 1.1.0 + */ + def collectWhile[T](pf: PartialFunction[Out, T]): Repr[T] = via(new CollectWhile(pf)) + /** * Transform this stream by testing the type of each of the elements * on which the element is an instance of the provided type as they pass through this processing step.