+str Add Flow contramap.

Signed-off-by: He-Pin <hepin1989@gmail.com>
This commit is contained in:
He-Pin 2023-08-11 00:59:31 +08:00 committed by kerr
parent eeaec22bd5
commit bd3270cd3e
7 changed files with 112 additions and 1 deletions

View file

@ -0,0 +1,33 @@
# contramap
Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.
@ref[Simple operators](../index.md#simple-operators)
## Signature
@apidoc[Flow.contramap](Flow) { scala="#contramap[In2](f:In2=&gt;In):Flow[In2, Out, Mat]" java="#map(
org.apache.pekko.japi.function.Function)" }
## Description
Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.
## Examples
Scala
: @@snip [Flow.scala](/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala) { #imports #contramap }
## Reactive Streams semantics
@@@div { .callout }
**emits** when the mapping function returns an element
**backpressures** '''Backpressures when''' original flow backpressures
**completes** when upstream completes
**cancels** when original flow cancels
@@@

View file

@ -146,6 +146,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="collect"></a>@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.|
|Source/Flow|<a name="collecttype"></a>@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.|
|Flow|<a name="completionstageflow"></a>@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.|
|Flow|<a name="contramap"></a>@ref[contramap](Flow/contramap.md)|Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.|
|Source/Flow|<a name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.|
|Source/Flow|<a name="drop"></a>@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.|
|Source/Flow|<a name="dropwhile"></a>@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element|
@ -419,6 +420,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [concatLazy](Source-or-Flow/concatLazy.md)
* [conflate](Source-or-Flow/conflate.md)
* [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)
* [contramap](Flow/contramap.md)
* [cycle](Source/cycle.md)
* [deflate](Compression/deflate.md)
* [delay](Source-or-Flow/delay.md)

View file

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package docs.stream.operators.flow
//#imports
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.scaladsl._
//#imports
object ContraMap {
// #contramap
val flow: Flow[Int, Int, NotUsed] = Flow[Int]
val newFlow: Flow[String, Int, NotUsed] = flow.contramap(_.toInt)
// #contramap
}

View file

@ -120,6 +120,20 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals("de");
}
@Test
public void mustBeAbleToUseContraMap() {
final Source<String, NotUsed> source = Source.from(Arrays.asList("1", "2", "3"));
final Flow<Integer, String, NotUsed> flow = Flow.fromFunction(String::valueOf);
source
.via(flow.contramap(Integer::valueOf))
.runWith(TestSink.create(system), system)
.request(3)
.expectNext("1")
.expectNext("2")
.expectNext("3")
.expectComplete();
}
@Test
public void mustBeAbleToUseDropWhile() throws Exception {
final TestKit probe = new TestKit(system);
@ -881,7 +895,7 @@ public class FlowTest extends StreamTest {
Source.from(input)
.via(Flow.of(FlowSpec.Fruit.class).collectType(FlowSpec.Apple.class))
.runForeach((apple) -> probe.getRef().tell(apple, ActorRef.noSender()), system);
probe.expectMsgAnyClassOf(FlowSpec.Apple.class);
probe.<Apple>expectMsgAnyClassOf(FlowSpec.Apple.class);
}
@Test

View file

@ -207,6 +207,14 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
c1.expectComplete()
}
"perform contramap operation" in {
val flow = Flow[Int].contramap(Integer.parseInt)
val sub = Source(List("1", "2", "3")).via(flow).runWith(TestSink())
sub.request(3)
sub.expectNextN(List(1, 2, 3))
sub.expectComplete()
}
"perform transformation operation" in {
val flow = Flow[Int].map(i => { testActor ! i.toString; i.toString })

View file

@ -532,6 +532,17 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
new Sink(delegate.toMat(sink)(combinerToScala(combine)))
/**
* Transform this Flow by applying a function to each *incoming* upstream element before
* it is passed to the [[Flow]]
*
* '''Backpressures when''' original [[Flow]] backpressures
*
* '''Cancels when''' original [[Flow]] cancels
*/
def contramap[In2](f: function.Function[In2, In]): javadsl.Flow[In2, Out, Mat] =
new Flow(delegate.contramap(elem => f(elem)))
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{

View file

@ -173,6 +173,17 @@ final class Flow[-In, +Out, +Mat](
(mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub), Source.fromPublisher(pub)))
}
/**
* Transform this Flow by applying a function to each *incoming* upstream element before
* it is passed to the [[Flow]]
*
* '''Backpressures when''' original [[Flow]] backpressures
*
* '''Cancels when''' original [[Flow]] cancels
*/
def contramap[In2](f: In2 => In): Flow[In2, Out, Mat] =
Flow.fromFunction(f).viaMat(this)(Keep.right)
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{