From 68bd00659ea11f745f87f607f9c62500639f11fa Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Wed, 3 Apr 2024 01:10:19 +0800 Subject: [PATCH] chore: Add Source.iterate operator. (#1244) --- .../classical/OptimizedActorWithJava21.java | 17 ++++++++ .../stream/operators/Source/iterate.md | 43 +++++++++++++++++++ .../main/paradox/stream/operators/index.md | 2 + .../stream/operators/source/Iterate.java | 30 +++++++++++++ .../stream/operators/source/Iterate.scala | 31 +++++++++++++ .../pekko/stream/javadsl/SourceTest.java | 19 +++++++- .../pekko/stream/scaladsl/SourceSpec.scala | 12 ++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../stream/impl/fusing/CollectFirst.scala | 8 ++-- .../apache/pekko/stream/javadsl/Source.scala | 21 +++++++++ .../apache/pekko/stream/scaladsl/Source.scala | 30 +++++++++++-- 11 files changed, 206 insertions(+), 8 deletions(-) create mode 100644 docs/src/main/paradox/stream/operators/Source/iterate.md create mode 100644 docs/src/test/java/jdocs/stream/operators/source/Iterate.java create mode 100644 docs/src/test/scala/docs/stream/operators/source/Iterate.scala diff --git a/docs/src/main/java-jdk-21/docs/actors/classical/OptimizedActorWithJava21.java b/docs/src/main/java-jdk-21/docs/actors/classical/OptimizedActorWithJava21.java index 7223b99ce4..eb2521a76a 100644 --- a/docs/src/main/java-jdk-21/docs/actors/classical/OptimizedActorWithJava21.java +++ b/docs/src/main/java-jdk-21/docs/actors/classical/OptimizedActorWithJava21.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package docs.actors.classical; // #pattern-matching diff --git a/docs/src/main/paradox/stream/operators/Source/iterate.md b/docs/src/main/paradox/stream/operators/Source/iterate.md new file mode 100644 index 0000000000..6b5615efdb --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source/iterate.md @@ -0,0 +1,43 @@ +# Source.iterate + +Creates a sequential `Source` by iterating with the given predicate, function and seed. + +@ref[Source operators](../index.md#source-operators) + +## Signature + +@apidoc[Source.iterate](Source$) { scala="#iterate%5BT](seed:T)(hasNext:T=>Boolean,next:T=>T):org.apache.pekko.stream.scaladsl.Source%5BT,org.apache.pekko.NotUsed]" java="#iterate(java.lang.Object,org.apache.pekko.japi.function.Predicate,org.apache.pekko.japi.function.Function)" } + + +## Description + +Creates a sequential Source by iterating with the given hasNext predicate and next function, +starting with the given seed value. If the hasNext function returns false for the seed, the Source completes with empty. + +@@@ warning + +The same `seed` value will be used for every materialization of the `Source` so it is **mandatory** that the state is immutable. For example a `java.util.Iterator`, `Array` or Java standard library collection would not be safe as the fold operation could mutate the value. If you must use a mutable value, combining with @ref:[Source.lazySource](lazySource.md) to make sure a new mutable `zero` value is created for each materialization is one solution. + +@@@ + +## Examples + +The next example shows how to craet + +Scala + : @@snip [Iterate.scala](/docs/src/test/scala/docs/stream/operators/source/Iterate.scala) { #countTo } + +Java + : @@snip [Iterate.java](/docs/src/test/java/jdocs/stream/operators/source/Iterate.java) { #countTo } + + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when there is demand and the `next` function returns. + +**completes** when the `haxNext` predicate returns false. + +@@@ + diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 908320c26d..eff0593c8b 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -25,6 +25,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad |Source|@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref[`Source.completionStageSource`](Source/completionStageSource.md).| |Source|@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.| |Source|@ref[futureSource](Source/futureSource.md)|Streams the elements of the given future source once it successfully completes.| +|Source|@ref[iterate](Source/iterate.md)|Creates a sequential `Source` by iterating with the given predicate, function and seed.| |Source|@ref[lazily](Source/lazily.md)|Deprecated by @ref[`Source.lazySource`](Source/lazySource.md).| |Source|@ref[lazilyAsync](Source/lazilyAsync.md)|Deprecated by @ref[`Source.lazyFutureSource`](Source/lazyFutureSource.md).| |Source|@ref[lazyCompletionStage](Source/lazyCompletionStage.md)|Defers creation of a future of a single element source until there is demand.| @@ -505,6 +506,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [interleave](Source-or-Flow/interleave.md) * [interleaveAll](Source-or-Flow/interleaveAll.md) * [intersperse](Source-or-Flow/intersperse.md) +* [iterate](Source/iterate.md) * [javaCollector](StreamConverters/javaCollector.md) * [javaCollectorParallelUnordered](StreamConverters/javaCollectorParallelUnordered.md) * [keepAlive](Source-or-Flow/keepAlive.md) diff --git a/docs/src/test/java/jdocs/stream/operators/source/Iterate.java b/docs/src/test/java/jdocs/stream/operators/source/Iterate.java new file mode 100644 index 0000000000..8d894b3974 --- /dev/null +++ b/docs/src/test/java/jdocs/stream/operators/source/Iterate.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package jdocs.stream.operators.source; + +import org.apache.pekko.NotUsed; +import org.apache.pekko.stream.javadsl.Source; + +interface Iterate { + + // #countTo + static Source countTo(long n) { + return Source.iterate(1L, i -> true, i -> i + 1).take(n); + } + // #countTo +} diff --git a/docs/src/test/scala/docs/stream/operators/source/Iterate.scala b/docs/src/test/scala/docs/stream/operators/source/Iterate.scala new file mode 100644 index 0000000000..cc5b87fdea --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/source/Iterate.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.source + +import org.apache.pekko +import pekko.NotUsed +import pekko.stream.scaladsl.Source + +object Iterate { + + // #countTo + def countTo(n: Long): Source[Long, NotUsed] = Source + .iterate(1L)(_ => true, _ + 1) + .take(n) + // #countTo +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 6fb73453ca..ced206a4a6 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -19,7 +19,6 @@ import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.Cancellable; import org.apache.pekko.actor.Status; import org.apache.pekko.japi.Pair; -import org.apache.pekko.japi.Util; import org.apache.pekko.japi.function.*; import org.apache.pekko.japi.pf.PFBuilder; // #imports @@ -1441,6 +1440,24 @@ public class SourceTest extends StreamTest { resultList); } + @Test + public void iterateTest() throws Exception { + final List resultList = + Source.iterate(0, i -> i + 1) + .take(10) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .join(); + assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), resultList); + final List emptyList = + Source.iterate(0, i -> i < 0, i -> i + 1) + .take(10) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .join(); + assertTrue(emptyList.isEmpty()); + } + @Test public void flattenOptional() throws Exception { // #flattenOptional diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala index 48ece508e1..2444e72429 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSpec.scala @@ -296,6 +296,18 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } } + "Iteratee Source" must { + "be able to iterate properly" in { + Source.iterate[Int](0)(_ => true, _ + 1) + .take(10) + .runWith(Sink.seq).futureValue should ===(immutable.Seq(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) + } + + "be able to generate an empty sequence" in { + Source.iterate[Int](0)(_ => false, _ + 1).runWith(Sink.seq).futureValue should ===(immutable.Seq()) + } + } + "Iterator Source" must { "properly iterate" in { Source.fromIterator(() => Iterator.iterate(false)(!_)).grouped(10).runWith(Sink.head).futureValue should ===( diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 32fcf8cfa3..0a18b880bb 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -116,6 +116,7 @@ import pekko.stream.Attributes._ val publisherSource = name("publisherSource") val iterableSource = name("iterableSource") + val iterateSource = name("iterateSource") val cycledSource = name("cycledSource") val futureSource = name("futureSource") val lazyFutureSource = name("lazyFutureSource") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CollectFirst.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CollectFirst.scala index 8590e4799c..0f58a65bc4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CollectFirst.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CollectFirst.scala @@ -17,16 +17,16 @@ package org.apache.pekko.stream.impl.fusing +import scala.annotation.nowarn +import scala.util.control.NonFatal + import org.apache.pekko import pekko.annotation.InternalApi +import pekko.stream._ import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream.Attributes.SourceLocation import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import pekko.stream._ - -import scala.annotation.nowarn -import scala.util.control.NonFatal /** * INTERNAL API diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index d6fd19c4ed..dd3747a244 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -275,6 +275,27 @@ object Source { def unfoldAsync[S, E](s: S, f: function.Function[S, CompletionStage[Optional[Pair[S, E]]]]): Source[E, NotUsed] = new Source(scaladsl.Source.fromGraph(new UnfoldAsyncJava[S, E](s, f))) + /** + * Creates a sequential `Source` by iterating with the given predicate and function, + * starting with the given `seed` value. If the predicate returns `false` for the seed, + * the `Source` completes with empty. + * + * @see [[unfold]] + * @since 1.1.0 + */ + def iterate[T](seed: T, p: function.Predicate[T], f: function.Function[T, T]): Source[T, NotUsed] = + new Source(scaladsl.Source.iterate(seed)(elem => p.test(elem), elem => f(elem))) + + /** + * Creates an infinite sequential `Source` by iterating with the given function, + * starting with the given `seed` value. + * + * @see [[unfold]] + * @since 1.1.0 + */ + def iterate[T](seed: T, f: function.Function[T, T]): Source[T, NotUsed] = + new Source(scaladsl.Source.iterate(seed)(ConstantFun.anyToTrue, elem => f(elem))) + /** * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`. */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 2df3d110ea..623bd2a2fb 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -17,7 +17,7 @@ import java.util.concurrent.CompletionStage import scala.annotation.{ nowarn, tailrec } import scala.annotation.unchecked.uncheckedVariance -import scala.collection.immutable +import scala.collection.{ immutable, AbstractIterator } import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration @@ -25,8 +25,8 @@ import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.actor.{ ActorRef, Cancellable } import pekko.annotation.InternalApi -import pekko.stream.{ Outlet, SourceShape, _ } -import pekko.stream.impl.{ PublisherSource, _ } +import pekko.stream._ +import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.{ GraphStages, IterableSource, LazyFutureSource, LazySingleSource } import pekko.stream.impl.fusing.GraphStages._ @@ -456,6 +456,30 @@ object Source { def unfoldAsync[S, E](s: S)(f: S => Future[Option[(S, E)]]): Source[E, NotUsed] = Source.fromGraph(new UnfoldAsync(s, f)) + /** + * Creates a sequential `Source` by iterating with the given predicate and function, + * starting with the given `seed` value. If the predicate returns `false` for the seed, + * the `Source` completes with empty. + * + * @see [[unfold]] + * @since 1.1.0 + */ + def iterate[T](seed: T)(p: T => Boolean, f: T => T): Source[T, NotUsed] = + fromIterator(() => + new AbstractIterator[T] { + private var first = true + private var acc = seed + override def hasNext: Boolean = p(acc) + override def next(): T = { + if (first) { + first = false + } else { + acc = f(acc) + } + acc + } + }).withAttributes(DefaultAttributes.iterableSource) + /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. */