diff --git a/docs/src/main/paradox/stream/operators/Flow/contramap.md b/docs/src/main/paradox/stream/operators/Flow/contramap.md
new file mode 100644
index 0000000000..fae21bff89
--- /dev/null
+++ b/docs/src/main/paradox/stream/operators/Flow/contramap.md
@@ -0,0 +1,33 @@
+# contramap
+
+Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+## Signature
+
+@apidoc[Flow.contramap](Flow) { scala="#contramap[In2](f:In2=>In):Flow[In2, Out, Mat]" java="#map(
+org.apache.pekko.japi.function.Function)" }
+
+## Description
+
+Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.
+
+## Examples
+
+Scala
+: @@snip [Flow.scala](/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala) { #imports #contramap }
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the mapping function returns an element
+
+**backpressures** '''Backpressures when''' original flow backpressures
+
+**completes** when upstream completes
+
+**cancels** when original flow cancels
+
+@@@
\ No newline at end of file
diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md
index 69712e037e..e6cb416cdc 100644
--- a/docs/src/main/paradox/stream/operators/index.md
+++ b/docs/src/main/paradox/stream/operators/index.md
@@ -146,6 +146,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.|
|Source/Flow|@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.|
|Flow|@ref[completionStageFlow](Flow/completionStageFlow.md)|Streams the elements through the given future flow once it successfully completes.|
+|Flow|@ref[contramap](Flow/contramap.md)|Transform this Flow by applying a function to each *incoming* upstream element before it is passed to the Flow.|
|Source/Flow|@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.|
|Source/Flow|@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.|
|Source/Flow|@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element|
@@ -419,6 +420,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [concatLazy](Source-or-Flow/concatLazy.md)
* [conflate](Source-or-Flow/conflate.md)
* [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)
+* [contramap](Flow/contramap.md)
* [cycle](Source/cycle.md)
* [deflate](Compression/deflate.md)
* [delay](Source-or-Flow/delay.md)
diff --git a/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala b/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala
new file mode 100644
index 0000000000..5589599ebc
--- /dev/null
+++ b/docs/src/test/scala/docs/stream/operators/flow/ContraMap.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package docs.stream.operators.flow
+
+//#imports
+import org.apache.pekko.NotUsed
+import org.apache.pekko.stream.scaladsl._
+
+//#imports
+
+object ContraMap {
+
+ // #contramap
+ val flow: Flow[Int, Int, NotUsed] = Flow[Int]
+ val newFlow: Flow[String, Int, NotUsed] = flow.contramap(_.toInt)
+ // #contramap
+}
diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
index cf8c68dc63..5df754d409 100644
--- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
+++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java
@@ -120,6 +120,20 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals("de");
}
+ @Test
+ public void mustBeAbleToUseContraMap() {
+ final Source source = Source.from(Arrays.asList("1", "2", "3"));
+ final Flow flow = Flow.fromFunction(String::valueOf);
+ source
+ .via(flow.contramap(Integer::valueOf))
+ .runWith(TestSink.create(system), system)
+ .request(3)
+ .expectNext("1")
+ .expectNext("2")
+ .expectNext("3")
+ .expectComplete();
+ }
+
@Test
public void mustBeAbleToUseDropWhile() throws Exception {
final TestKit probe = new TestKit(system);
@@ -881,7 +895,7 @@ public class FlowTest extends StreamTest {
Source.from(input)
.via(Flow.of(FlowSpec.Fruit.class).collectType(FlowSpec.Apple.class))
.runForeach((apple) -> probe.getRef().tell(apple, ActorRef.noSender()), system);
- probe.expectMsgAnyClassOf(FlowSpec.Apple.class);
+ probe.expectMsgAnyClassOf(FlowSpec.Apple.class);
}
@Test
diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
index 041b874190..c1d71cb70f 100644
--- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
+++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSpec.scala
@@ -207,6 +207,14 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("pekko.actor.debug.r
c1.expectComplete()
}
+ "perform contramap operation" in {
+ val flow = Flow[Int].contramap(Integer.parseInt)
+ val sub = Source(List("1", "2", "3")).via(flow).runWith(TestSink())
+ sub.request(3)
+ sub.expectNextN(List(1, 2, 3))
+ sub.expectComplete()
+ }
+
"perform transformation operation" in {
val flow = Flow[Int].map(i => { testActor ! i.toString; i.toString })
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 1a3ed4683e..54b75b94e2 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -532,6 +532,17 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def toMat[M, M2](sink: Graph[SinkShape[Out], M], combine: function.Function2[Mat, M, M2]): javadsl.Sink[In, M2] =
new Sink(delegate.toMat(sink)(combinerToScala(combine)))
+ /**
+ * Transform this Flow by applying a function to each *incoming* upstream element before
+ * it is passed to the [[Flow]]
+ *
+ * '''Backpressures when''' original [[Flow]] backpressures
+ *
+ * '''Cancels when''' original [[Flow]] cancels
+ */
+ def contramap[In2](f: function.Function[In2, In]): javadsl.Flow[In2, Out, Mat] =
+ new Flow(delegate.contramap(elem => f(elem)))
+
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index e3767bfba0..c3e2fcfe56 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -173,6 +173,17 @@ final class Flow[-In, +Out, +Mat](
(mat, Flow.fromSinkAndSource(Sink.fromSubscriber(sub), Source.fromPublisher(pub)))
}
+ /**
+ * Transform this Flow by applying a function to each *incoming* upstream element before
+ * it is passed to the [[Flow]]
+ *
+ * '''Backpressures when''' original [[Flow]] backpressures
+ *
+ * '''Cancels when''' original [[Flow]] cancels
+ */
+ def contramap[In2](f: In2 => In): Flow[In2, Out, Mat] =
+ Flow.fromFunction(f).viaMat(this)(Keep.right)
+
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{