diff --git a/docs/src/main/paradox/stream/operators/Flow/dimap.md b/docs/src/main/paradox/stream/operators/Flow/dimap.md new file mode 100644 index 0000000000..a00e965626 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Flow/dimap.md @@ -0,0 +1,36 @@ +# dimap + +Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, and a function `g` to each *outgoing* downstream element. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Flow.dimap](Flow) { scala="#dimap%5BIn2%2C%20Out2%5D(f%3A%20In2%20%3D%3E%20In)(g%3A%20Out%20%3D%3E%20Out2)%3Aorg.apache.pekko.stream.scaladsl.Flow%5BIn2%2COut2%2CMat%5D" java="#dimap(org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function)" } + +## Description + +Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, +and a function `g` to each *outgoing* downstream element. + +## Examples + +Scala +: @@snip [DiMap.scala](/docs/src/test/scala/docs/stream/operators/flow/DiMap.scala) { #imports #dimap } + +Java +: @@snip [DiMap.java](/docs/src/test/java/jdocs/stream/operators/flow/DiMap.java) { #imports #dimap } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the mapping function `g` returns an element + +**backpressures** '''Backpressures when''' original flow backpressures + +**completes** when original flow completes + +**cancels** when original flow cancels + +@@@ \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index d797f53942..7504329bcf 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -148,6 +148,7 @@ depending on being backpressured by downstream or not. |Flow|@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.| |Flow|@ref[contramap](Flow/contramap.md)|Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.| |Source/Flow|@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.| +|Flow|@ref[dimap](Flow/dimap.md)|Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, and a function `g` to each *outgoing* downstream element.| |Source/Flow|@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.| |Source/Flow|@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element| |Source/Flow|@ref[filter](Source-or-Flow/filter.md)|Filter the incoming elements using a predicate.| @@ -429,6 +430,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [delay](Source-or-Flow/delay.md) * [delayWith](Source-or-Flow/delayWith.md) * [detach](Source-or-Flow/detach.md) +* [dimap](Flow/dimap.md) * [divertTo](Source-or-Flow/divertTo.md) * [drop](Source-or-Flow/drop.md) * [dropWhile](Source-or-Flow/dropWhile.md) diff --git a/docs/src/test/java/jdocs/stream/operators/flow/DiMap.java b/docs/src/test/java/jdocs/stream/operators/flow/DiMap.java new file mode 100644 index 0000000000..6c82e910c7 --- /dev/null +++ b/docs/src/test/java/jdocs/stream/operators/flow/DiMap.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package jdocs.stream.operators.flow; + +// #imports +import org.apache.pekko.NotUsed; +import org.apache.pekko.actor.ActorSystem; +import org.apache.pekko.stream.javadsl.Flow; +import org.apache.pekko.stream.javadsl.Source; + +import java.util.Arrays; +// #imports + +public class DiMap { + private static final ActorSystem system = null; + + private void demoDiMapUsage() { + // #dimap + final Source source = Source.from(Arrays.asList("1", "2", "3")); + final Flow flow = Flow.create().map(elem -> elem * 2); + source + .via(flow.dimap(Integer::parseInt, String::valueOf)) + .runForeach(System.out::println, system); + // expected prints: + // 2 + // 4 + // 6 + // #dimap + } +} diff --git a/docs/src/test/scala/docs/stream/operators/flow/DiMap.scala b/docs/src/test/scala/docs/stream/operators/flow/DiMap.scala new file mode 100644 index 0000000000..18a7f87b7c --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/flow/DiMap.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.flow + +//#imports +import org.apache.pekko +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.scaladsl._ +//#imports + +object DiMap { + implicit val system: ActorSystem = null + + def demoDiMap(): Unit = { + // #dimap + val source = Source(List("1", "2", "3")) + val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2) + val newFlow: Flow[String, String, NotUsed] = flow.dimap(Integer.parseInt)(_.toString) + source.via(newFlow).runForeach(println) + // expected prints: + // 2 + // 4 + // 6 + // #dimap + } +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index f886cfbd0d..395276a026 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -134,6 +134,20 @@ public class FlowTest extends StreamTest { .expectComplete(); } + @Test + public void mustBeAbleToUseDiMap() { + final Source source = Source.from(Arrays.asList("1", "2", "3")); + final Flow flow = Flow.create().map(elem -> elem * 2); + source + .via(flow.dimap(Integer::valueOf, String::valueOf)) + .runWith(TestSink.create(system), system) + .request(3) + .expectNext("2") + .expectNext("4") + .expectNext("6") + .expectComplete(); + } + @Test public void mustBeAbleToUseDropWhile() throws Exception { final TestKit probe = new TestKit(system); diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala index c1d71cb70f..4621110cbb 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala @@ -215,6 +215,14 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r sub.expectComplete() } + "perform dimap operation" in { + val flow = Flow[Int].dimap(Integer.parseInt)(_ * 2) + val sub = Source(List("1", "2", "3")).via(flow).runWith(TestSink()) + sub.request(3) + sub.expectNextN(List(2, 4, 6)) + sub.expectComplete() + } + "perform transformation operation" in { val flow = Flow[Int].map(i => { testActor ! i.toString; i.toString }) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 8ec7409dc3..13ecb94a6b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -32,6 +32,8 @@ import pekko.stream.Attributes._ val fused = name("fused") val materializedValueSource = name("matValueSource") val map = name("map") + val contramap = name("contramap") + val dimap = name("dimap") val log = name("log") val filter = name("filter") val filterNot = name("filterNot") diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 659ac20073..e568a1e982 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -25,8 +25,6 @@ import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import org.reactivestreams.Processor - import org.apache.pekko import pekko.Done import pekko.NotUsed @@ -47,6 +45,8 @@ import pekko.util.OptionConverters._ import pekko.util.Timeout import pekko.util.unused +import org.reactivestreams.Processor + object Flow { /** Create a `Flow` which can process elements of type `T`. */ @@ -544,6 +544,22 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def contramap[In2](f: function.Function[In2, In]): javadsl.Flow[In2, Out, Mat] = new Flow(delegate.contramap(elem => f(elem))) + /** + * Transform this Flow by applying a function `f` to each *incoming* upstream element before + * it is passed to the [[Flow]], and a function `g` to each *outgoing* downstream element. + * + * '''Emits when''' the mapping function `g` returns an element + * + * '''Backpressures when''' original [[Flow]] backpressures + * + * '''Completes when''' original [[Flow]] completes + * + * '''Cancels when''' original [[Flow]] cancels + * @since 1.1.0 + */ + def dimap[In2, Out2](f: function.Function[In2, In], g: function.Function[Out, Out2]): javadsl.Flow[In2, Out2, Mat] = + new Flow(delegate.dimap(f(_))(g(_))) + /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]. * {{{ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 5e67b9b7a8..bdd6ea1b5a 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -21,11 +21,6 @@ import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import org.reactivestreams.Processor -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber -import org.reactivestreams.Subscription - import org.apache.pekko import pekko.Done import pekko.NotUsed @@ -48,12 +43,18 @@ import pekko.stream.impl.TraversalBuilder import pekko.stream.impl.fusing import pekko.stream.impl.fusing._ import pekko.stream.impl.fusing.FlattenMerge +import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage._ import pekko.util.ConstantFun import pekko.util.OptionVal import pekko.util.Timeout import pekko.util.ccompat._ +import org.reactivestreams.Processor +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription + /** * A `Flow` is a set of stream processing steps that has one open input and one open output. */ @@ -182,7 +183,23 @@ final class Flow[-In, +Out, +Mat]( * @since 1.1.0 */ def contramap[In2](f: In2 => In): Flow[In2, Out, Mat] = - Flow.fromFunction(f).viaMat(this)(Keep.right) + Flow.fromFunction(f).viaMat(this)(Keep.right).withAttributes(DefaultAttributes.contramap) + + /** + * Transform this Flow by applying a function `f` to each *incoming* upstream element before + * it is passed to the [[Flow]], and a function `g` to each *outgoing* downstream element. + * + * '''Emits when''' the mapping function `g` returns an element + * + * '''Backpressures when''' original [[Flow]] backpressures + * + * '''Completes when''' original [[Flow]] completes + * + * '''Cancels when''' original [[Flow]] cancels + * @since 1.1.0 + */ + def dimap[In2, Out2](f: In2 => In)(g: Out => Out2): Flow[In2, Out2, Mat] = + Flow.fromFunction(f).viaMat(this)(Keep.right).map(g).withAttributes(DefaultAttributes.dimap) /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].