feat: Add support for for comprehensions. (#935)
parent 6883d15576
commit 72f0a426b8

5 changed files with 162 additions and 7 deletions
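The new operators exist so that the Scala compiler's for-comprehension desugaring lands on operators the streams DSL already has: an `if` guard becomes `withFilter` (an alias for `filter`), a nested generator becomes `flatMap` (an alias for `flatMapConcat`), `yield` uses the existing `map`, and a comprehension without `yield` becomes `foreach` (which attaches `Sink.foreach` and keeps its `Future[Done]`). A minimal sketch of the equivalence follows; the object and value names are illustrative only and not part of this diff:

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source

object ForComprehensionDesugaringSketch {
  // for-comprehension form: the `if` guard calls withFilter, `yield` calls map
  val evens: Source[String, NotUsed] =
    for (i <- Source(1 to 10) if i % 2 == 0) yield i.toString

  // roughly what the compiler rewrites it to
  val evensDesugared: Source[String, NotUsed] =
    Source(1 to 10).withFilter(i => i % 2 == 0).map(i => i.toString)
}
```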
@@ -61,7 +61,6 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
 |Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
 |Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
 |Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.|
-|Sink|<a name="foreach"></a>@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
 |Sink|<a name="foreachasync"></a>@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
 |Sink|<a name="foreachparallel"></a>@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
 |Sink|<a name="frommaterializer"></a>@ref[fromMaterializer](Sink/fromMaterializer.md)|Defer the creation of a `Sink` until materialization and access `Materializer` and `Attributes`|
@@ -446,7 +445,6 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
 * [fold](Source-or-Flow/fold.md)
 * [fold](Sink/fold.md)
 * [foldAsync](Source-or-Flow/foldAsync.md)
-* [foreach](Sink/foreach.md)
 * [foreachAsync](Sink/foreachAsync.md)
 * [foreachParallel](Sink/foreachParallel.md)
 * [from](Source/from.md)
@@ -67,6 +67,10 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
     "actorPublisher",
     "addAttributes",
     "mapMaterializedValue",
+    // for comprehensions
+    "withFilter",
+    "flatMap",
+    "foreach",
     // *Graph:
     "concatGraph",
     "prependGraph",
@@ -108,7 +112,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
       "foldAsync",
       "newOnCompleteStage"))

-  val ignore =
+  val ignore = {
     Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++
     Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++
     Set(
@@ -123,6 +127,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
       "transformMaterializing") ++
     Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++
     Set("++", "onPush", "onPull", "actorRefWithAck")
+  }

   def isPending(element: String, opName: String) =
     pendingTestCases.get(element).exists(_.contains(opName))
@@ -91,12 +91,16 @@ class DslConsistencySpec extends AnyWordSpec with Matchers {
       "orElseGraph",
       "divertToGraph")

+  val forComprehensions = Set("withFilter", "flatMap", "foreach")
+
   val allowMissing: Map[Class[_], Set[String]] = Map(
-    jFlowClass -> graphHelpers,
-    jSourceClass -> (graphHelpers ++ Set("watch", "ask")),
+    jFlowClass -> (graphHelpers ++ forComprehensions),
+    jSourceClass -> (graphHelpers ++ forComprehensions ++ Set("watch", "ask")),
     // Java subflows can only be nested using .via and .to (due to type system restrictions)
-    jSubFlowClass -> (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")),
-    jSubSourceClass -> (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")),
+    jSubFlowClass -> (graphHelpers ++ forComprehensions ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch",
+      "ask")),
+    jSubSourceClass -> (graphHelpers ++ forComprehensions ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow",
+      "watch", "ask")),
     sFlowClass -> Set("of"),
     sSourceClass -> Set("adapt", "from", "watch"),
     sSinkClass -> Set("adapt"),
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.scaladsl
+
+import org.apache.pekko
+import pekko.Done
+import pekko.japi.Util
+import pekko.stream.testkit.StreamSpec
+import pekko.stream.testkit.scaladsl.TestSink
+
+import java.util.concurrent.CopyOnWriteArrayList
+import scala.concurrent.Await
+import scala.concurrent.duration.DurationInt
+
+class ForComprehensionsCompileSpec extends StreamSpec {
+  "A Source" must {
+    "be able to be used in a for comprehension which yield" in {
+      val source = Source(1 to 5)
+      val evenSource = for {
+        i <- source if i % 2 == 0
+      } yield i.toString
+      evenSource.runWith(TestSink[String]())
+        .request(5)
+        .expectNextN(List("2", "4"))
+        .expectComplete()
+    }
+
+    "be able to be used in a for comprehension which flatMap" in {
+      val source = Source(1 to 5)
+      val evenSource = for {
+        i <- source if i % 2 == 0
+        j <- Source.lazySingle(() => i)
+        str = j.toString
+      } yield str
+      evenSource.runWith(TestSink[String]())
+        .request(5)
+        .expectNextN(List("2", "4"))
+        .expectComplete()
+    }
+
+    "be able to be used in a for comprehension which yield a runnable graph" in {
+      val source = Source(1 to 5)
+      val list = new CopyOnWriteArrayList[String]()
+      val future = (for (i <- source if i % 2 == 0) {
+        list.add(i.toString)
+      }).run()
+
+      Await.result(future, 3.seconds) shouldBe Done
+      Util.immutableSeq(list) shouldBe List("2", "4")
+    }
+
+    "be able to be used in a for comprehension which with Flow" in {
+      (for {
+        i <- Source(1 to 20) if i % 2 == 0
+        j <- Source.lazySingle(() => i)
+        str = j.toString
+      } yield str)
+        .via(for {
+          str <- Flow[String] if str.length > 1
+          doubleStr = str + str
+          number <- Source.lazySingle(() => doubleStr)
+        } yield number.toInt)
+        .runWith(TestSink[Int]())
+        .request(6)
+        .expectNextN(List(1010, 1212, 1414, 1616, 1818, 2020))
+        .expectComplete()
+    }
+  }
+
+  "A Flow" must {
+    "be able to be used in a for comprehension which yield" in {
+      Source(1 to 5).via(for (i <- Flow[Int] if i % 2 == 0) yield i.toString)
+        .runWith(TestSink[String]())
+        .request(5)
+        .expectNextN(List("2", "4"))
+        .expectComplete()
+    }
+
+    "be able to be used in a for comprehension which flatmap" in {
+      Source(1 to 5).via(for {
+        i <- Flow[Int] if i % 2 == 0
+        j <- Source.single(i)
+        str = j.toString
+      } yield str)
+        .runWith(TestSink[String]())
+        .request(5)
+        .expectNextN(List("2", "4"))
+        .expectComplete()
+    }
+
+    "be able to be used in a for comprehension which yield a sink" in {
+      val source = Source(1 to 5)
+      val list = new CopyOnWriteArrayList[String]()
+      val sink = for (i <- Flow[Int] if i % 2 == 0) {
+        list.add(i.toString)
+      }
+      val future = source.runWith(sink)
+      Await.result(future, 3.seconds) shouldBe Done
+      Util.immutableSeq(list) shouldBe List("2", "4")
+    }
+  }
+}
@@ -1383,6 +1383,16 @@ trait FlowOps[+Out, +Mat] {
    */
   def filter(p: Out => Boolean): Repr[Out] = via(Filter(p))

+  /**
+   * Alias for [[filter]], added to enable filtering in for comprehensions.
+   *
+   * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change
+   * the internal implementation.
+   * @since 1.1.0
+   */
+  @ApiMayChange
+  def withFilter(p: Out => Boolean): Repr[Out] = filter(p)
+
   /**
    * Only pass on those elements that NOT satisfy the given predicate.
    *
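Because `withFilter` is defined on `FlowOps`, an `if` guard also works when the comprehension starts from a `Flow`, not just a `Source`. A short sketch; names are illustrative and not part of this diff:

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.{ Flow, Source }

object WithFilterOnFlowSketch {
  // `if i % 2 == 0` desugars to withFilter(i => i % 2 == 0), i.e. a plain filter stage
  val evenAsString: Flow[Int, String, NotUsed] =
    for (i <- Flow[Int] if i % 2 == 0) yield i.toString

  // usage: emits "2" and "4"
  val evens: Source[String, NotUsed] = Source(1 to 5).via(evenAsString)
}
```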
@@ -2521,6 +2531,16 @@
    */
   def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = map(f).via(new FlattenMerge[T, M](1))

+  /**
+   * Alias for [[flatMapConcat]], added to enable for comprehensions.
+   *
+   * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change
+   * the internal implementation.
+   * @since 1.1.0
+   */
+  @ApiMayChange
+  def flatMap[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = flatMapConcat(f)
+
   /**
    * Transform each input element into a `Source` of output elements that is
    * then flattened into the output stream by merging, where at most `breadth`
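Since `flatMap` is only an alias for `flatMapConcat`, a nested generator drains each inner `Source` to completion, in order, before the next outer element is processed (concatenation rather than merge semantics). A minimal sketch with illustrative names, not part of this diff:

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl.Source

object FlatMapConcatSketch {
  // nested generators desugar to flatMap (= flatMapConcat), assignments and yield to map
  val pairs: Source[String, NotUsed] =
    for {
      i <- Source(1 to 3)
      j <- Source(List("a", "b"))
    } yield s"$i$j"

  // roughly what the compiler rewrites it to; emits 1a, 1b, 2a, 2b, 3a, 3b
  val pairsDesugared: Source[String, NotUsed] =
    Source(1 to 3).flatMapConcat(i => Source(List("a", "b")).map(j => s"$i$j"))
}
```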
@@ -3732,6 +3752,17 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
    */
   def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): ClosedMat[Mat3]

+  /**
+   * Connect this [[Flow]] to a `foreach` [[Sink]], that will invoke the given procedure for each received element.
+   * Added to enable for comprehensions.
+   *
+   * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change
+   * the internal implementation.
+   * @since 1.1.0
+   */
+  @ApiMayChange
+  def foreach(f: Out => Unit): ClosedMat[Future[Done]] = toMat(Sink.foreach(f))(Keep.right)
+
   /**
    * mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow.
    * see [[#flatMapPrefix]] for details.
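`foreach` on `FlowOpsMat` is what a comprehension without `yield` desugars to: starting from a `Source` it closes the graph with `Sink.foreach` and materializes that sink's `Future[Done]`. A minimal sketch with illustrative names, not part of this diff; running it needs an implicit `Materializer` (for example from an `ActorSystem`) in scope:

```scala
import scala.concurrent.Future

import org.apache.pekko.Done
import org.apache.pekko.stream.scaladsl.{ RunnableGraph, Source }

object ForeachSketch {
  // no `yield`: the body becomes foreach(i => println(i)), yielding a closed graph
  val graph: RunnableGraph[Future[Done]] =
    for (i <- Source(1 to 5) if i % 2 == 0) {
      println(i)
    }

  // with an implicit Materializer in scope:
  // val done: Future[Done] = graph.run()
}
```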