From bd3270cd3e0d8f77e19a93ddd6dbe6a3bdea89f8 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Fri, 11 Aug 2023 00:59:31 +0800 Subject: [PATCH] +str Add Flow contramap. Signed-off-by: He-Pin --- .../stream/operators/Flow/contramap.md | 33 +++++++++++++++++++ .../main/paradox/stream/operators/index.md | 2 ++ .../stream/operators/flow/ContraMap.scala | 32 ++++++++++++++++++ .../apache/pekko/stream/javadsl/FlowTest.java | 16 ++++++++- .../pekko/stream/scaladsl/FlowSpec.scala | 8 +++++ .../apache/pekko/stream/javadsl/Flow.scala | 11 +++++++ .../apache/pekko/stream/scaladsl/Flow.scala | 11 +++++++ 7 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 docs/src/main/paradox/stream/operators/Flow/contramap.md create mode 100644 docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala diff --git a/docs/src/main/paradox/stream/operators/Flow/contramap.md b/docs/src/main/paradox/stream/operators/Flow/contramap.md new file mode 100644 index 0000000000..fae21bff89 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Flow/contramap.md @@ -0,0 +1,33 @@ +# contramap + +Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Flow.contramap](Flow) { scala="#contramap[In2](f:In2=>In):Flow[In2, Out, Mat]" java="#map( +org.apache.pekko.japi.function.Function)" } + +## Description + +Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow. + +## Examples + +Scala +: @@snip [Flow.scala](/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala) { #imports #contramap } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the mapping function returns an element + +**backpressures** '''Backpressures when''' original flow backpressures + +**completes** when upstream completes + +**cancels** when original flow cancels + +@@@ \ No newline at end of file diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 69712e037e..e6cb416cdc 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -146,6 +146,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.| |Source/Flow|@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.| |Flow|@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.| +|Flow|@ref[contramap](Flow/contramap.md)|Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.| |Source/Flow|@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.| |Source/Flow|@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.| |Source/Flow|@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element| @@ -419,6 +420,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [concatLazy](Source-or-Flow/concatLazy.md) * [conflate](Source-or-Flow/conflate.md) * [conflateWithSeed](Source-or-Flow/conflateWithSeed.md) +* [contramap](Flow/contramap.md) * [cycle](Source/cycle.md) * [deflate](Compression/deflate.md) * [delay](Source-or-Flow/delay.md) diff --git a/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala b/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala new file mode 100644 index 0000000000..5589599ebc --- /dev/null +++ b/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package docs.stream.operators.flow + +//#imports +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.scaladsl._ + +//#imports + +object ContraMap { + + // #contramap + val flow: Flow[Int, Int, NotUsed] = Flow[Int] + val newFlow: Flow[String, Int, NotUsed] = flow.contramap(_.toInt) + // #contramap +} diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index cf8c68dc63..5df754d409 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -120,6 +120,20 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("de"); } + @Test + public void mustBeAbleToUseContraMap() { + final Source source = Source.from(Arrays.asList("1", "2", "3")); + final Flow flow = Flow.fromFunction(String::valueOf); + source + .via(flow.contramap(Integer::valueOf)) + .runWith(TestSink.create(system), system) + .request(3) + .expectNext("1") + .expectNext("2") + .expectNext("3") + .expectComplete(); + } + @Test public void mustBeAbleToUseDropWhile() throws Exception { final TestKit probe = new TestKit(system); @@ -881,7 +895,7 @@ public class FlowTest extends StreamTest { Source.from(input) .via(Flow.of(FlowSpec.Fruit.class).collectType(FlowSpec.Apple.class)) .runForeach((apple) -> probe.getRef().tell(apple, ActorRef.noSender()), system); - probe.expectMsgAnyClassOf(FlowSpec.Apple.class); + probe.expectMsgAnyClassOf(FlowSpec.Apple.class); } @Test diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala index 041b874190..c1d71cb70f 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala @@ -207,6 +207,14 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r c1.expectComplete() } + "perform contramap operation" in { + val flow = Flow[Int].contramap(Integer.parseInt) + val sub = Source(List("1", "2", "3")).via(flow).runWith(TestSink()) + sub.request(3) + sub.expectNextN(List(1, 2, 3)) + sub.expectComplete() + } + "perform transformation operation" in { val flow = Flow[Int].map(i => { testActor ! i.toString; i.toString }) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 1a3ed4683e..54b75b94e2 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -532,6 +532,17 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] = new Sink(delegate.toMat(sink)(combinerToScala(combine))) + /** + * Transform this Flow by applying a function to each *incoming* upstream element before + * it is passed to the [[Flow]] + * + * '''Backpressures when''' original [[Flow]] backpressures + * + * '''Cancels when''' original [[Flow]] cancels + */ + def contramap[In2](f: function.Function[In2, In]): javadsl.Flow[In2, Out, Mat] = + new Flow(delegate.contramap(elem => f(elem))) + /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]. * {{{ diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index e3767bfba0..c3e2fcfe56 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -173,6 +173,17 @@ final class Flow[-In, +Out, +Mat]( (mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub), Source.fromPublisher(pub))) } + /** + * Transform this Flow by applying a function to each *incoming* upstream element before + * it is passed to the [[Flow]] + * + * '''Backpressures when''' original [[Flow]] backpressures + * + * '''Cancels when''' original [[Flow]] cancels + */ + def contramap[In2](f: In2 => In): Flow[In2, Out, Mat] = + Flow.fromFunction(f).viaMat(this)(Keep.right) + /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]. * {{{