feat: Add dimap operator to Flow. (#942)

This commit is contained in:
kerr 2024-01-15 15:01:34 +08:00 committed by GitHub
parent 057fe066e8
commit cf70478201
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 190 additions and 8 deletions

View file

@ -0,0 +1,36 @@
# dimap
Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, and a function `g` to each *outgoing* downstream element.
@ref[Simple operators](../index.md#simple-operators)
## Signature
@apidoc[Flow.dimap](Flow) { scala="#dimap%5BIn2%2C%20Out2%5D(f%3A%20In2%20%3D%3E%20In)(g%3A%20Out%20%3D%3E%20Out2)%3Aorg.apache.pekko.stream.scaladsl.Flow%5BIn2%2COut2%2CMat%5D" java="#dimap(org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function)" }
## Description
Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow,
and a function `g` to each *outgoing* downstream element.
## Examples
Scala
: @@snip [DiMap.scala](/docs/src/test/scala/docs/stream/operators/flow/DiMap.scala) { #imports #dimap }
Java
: @@snip [DiMap.java](/docs/src/test/java/jdocs/stream/operators/flow/DiMap.java) { #imports #dimap }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the mapping function `g` returns an element
**backpressures** '''Backpressures when''' original flow backpressures
**completes** when original flow completes
**cancels** when original flow cancels
@@@

View file

@ -148,6 +148,7 @@ depending on being backpressured by downstream or not.
|Flow|<a name="completionstageflow"></a>@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.|
|Source/Flow|<a name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.|
|Flow|<a name="dimap"></a>@ref[dimap](Flow/dimap.md)|Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, and a function `g` to each *outgoing* downstream element.|
|Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.|
|Source/Flow|<a name="dropwhile"></a>@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element|
|Source/Flow|<a name="filter"></a>@ref[filter](Source-or-Flow/filter.md)|Filter the incoming elements using a predicate.|
@ -429,6 +430,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [delay](Source-or-Flow/delay.md)
* [delayWith](Source-or-Flow/delayWith.md)
* [detach](Source-or-Flow/detach.md)
* [dimap](Flow/dimap.md)
* [divertTo](Source-or-Flow/divertTo.md)
* [drop](Source-or-Flow/drop.md)
* [dropWhile](Source-or-Flow/dropWhile.md)

View file

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jdocs.stream.operators.flow;
// #imports
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Source;
import java.util.Arrays;
// #imports
public class DiMap {
private static final ActorSystem system = null;
private void demoDiMapUsage() {
// #dimap
final Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", "3"));
final Flow<Integer, Integer, NotUsed> flow = Flow.<Integer>create().map(elem -> elem * 2);
source
.via(flow.dimap(Integer::parseInt, String::valueOf))
.runForeach(System.out::println, system);
// expected prints:
// 2
// 4
// 6
// #dimap
}
}

View file

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package docs.stream.operators.flow
//#imports
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.stream.scaladsl._
//#imports
object DiMap {
implicit val system: ActorSystem = null
def demoDiMap(): Unit = {
// #dimap
val source = Source(List("1", "2", "3"))
val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
val newFlow: Flow[String, String, NotUsed] = flow.dimap(Integer.parseInt)(_.toString)
source.via(newFlow).runForeach(println)
// expected prints:
// 2
// 4
// 6
// #dimap
}
}

View file

@ -134,6 +134,20 @@ public class FlowTest extends StreamTest {
.expectComplete();
}
@Test
public void mustBeAbleToUseDiMap() {
final Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", "3"));
final Flow<Integer, Integer, NotUsed> flow = Flow.<Integer>create().map(elem -> elem * 2);
source
.via(flow.dimap(Integer::valueOf, String::valueOf))
.runWith(TestSink.create(system), system)
.request(3)
.expectNext("2")
.expectNext("4")
.expectNext("6")
.expectComplete();
}
@Test
public void mustBeAbleToUseDropWhile() throws Exception {
final TestKit probe = new TestKit(system);

View file

@ -215,6 +215,14 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
sub.expectComplete()
}
"perform dimap operation" in {
val flow = Flow[Int].dimap(Integer.parseInt)(_ * 2)
val sub = Source(List("1", "2", "3")).via(flow).runWith(TestSink())
sub.request(3)
sub.expectNextN(List(2, 4, 6))
sub.expectComplete()
}
"perform transformation operation" in {
val flow = Flow[Int].map(i => { testActor ! i.toString; i.toString })

View file

@ -32,6 +32,8 @@ import pekko.stream.Attributes._
val fused = name("fused")
val materializedValueSource = name("matValueSource")
val map = name("map")
val contramap = name("contramap")
val dimap = name("dimap")
val log = name("log")
val filter = name("filter")
val filterNot = name("filterNot")

View file

@ -25,8 +25,6 @@ import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import org.reactivestreams.Processor
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
@ -47,6 +45,8 @@ import pekko.util.OptionConverters._
import pekko.util.Timeout
import pekko.util.unused
import org.reactivestreams.Processor
object Flow {
/** Create a `Flow` which can process elements of type `T`. */
@ -544,6 +544,22 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def contramap[In2](f: function.Function[In2, In]): javadsl.Flow[In2, Out, Mat] =
new Flow(delegate.contramap(elem => f(elem)))
/**
* Transform this Flow by applying a function `f` to each *incoming* upstream element before
* it is passed to the [[Flow]], and a function `g` to each *outgoing* downstream element.
*
* '''Emits when''' the mapping function `g` returns an element
*
* '''Backpressures when''' original [[Flow]] backpressures
*
* '''Completes when''' original [[Flow]] completes
*
* '''Cancels when''' original [[Flow]] cancels
* @since 1.1.0
*/
def dimap[In2, Out2](f: function.Function[In2, In], g: function.Function[Out, Out2]): javadsl.Flow[In2, Out2, Mat] =
new Flow(delegate.dimap(f(_))(g(_)))
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{

View file

@ -21,11 +21,6 @@ import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
import org.reactivestreams.Processor
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
@ -48,12 +43,18 @@ import pekko.stream.impl.TraversalBuilder
import pekko.stream.impl.fusing
import pekko.stream.impl.fusing._
import pekko.stream.impl.fusing.FlattenMerge
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage._
import pekko.util.ConstantFun
import pekko.util.OptionVal
import pekko.util.Timeout
import pekko.util.ccompat._
import org.reactivestreams.Processor
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
/**
* A `Flow` is a set of stream processing steps that has one open input and one open output.
*/
@ -182,7 +183,23 @@ final class Flow[-In, +Out, +Mat](
* @since 1.1.0
*/
def contramap[In2](f: In2 => In): Flow[In2, Out, Mat] =
Flow.fromFunction(f).viaMat(this)(Keep.right)
Flow.fromFunction(f).viaMat(this)(Keep.right).withAttributes(DefaultAttributes.contramap)
/**
* Transform this Flow by applying a function `f` to each *incoming* upstream element before
* it is passed to the [[Flow]], and a function `g` to each *outgoing* downstream element.
*
* '''Emits when''' the mapping function `g` returns an element
*
* '''Backpressures when''' original [[Flow]] backpressures
*
* '''Completes when''' original [[Flow]] completes
*
* '''Cancels when''' original [[Flow]] cancels
* @since 1.1.0
*/
def dimap[In2, Out2](f: In2 => In)(g: Out => Out2): Flow[In2, Out2, Mat] =
Flow.fromFunction(f).viaMat(this)(Keep.right).map(g).withAttributes(DefaultAttributes.dimap)
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].