diff --git a/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md b/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md
new file mode 100644
index 0000000000..e2932c5c15
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Flow/asFlowWithContext.md
@@ -0,0 +1,19 @@
+# Flow.asFlowWithContext
+
+Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.
+
+@ref[Simple operators](../index.md#simple-operators)
+
+@@@div { .group-scala }
+
+## Signature
+
+@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #asFlowWithContext }
+
+@@@
+
+## Description
+
+Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.
+The first function passed into asFlowWithContext must turn each incoming pair of element and context value into an element of this Flow.
+The second function passed into asFlowWithContext must turn each outgoing element of this Flow into an outgoing context value.
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index 51e4c1a133..35714194f0 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -124,6 +124,7 @@ depending on being backpressured by downstream or not.
| |Operator|Description|
|--|--|--|
|Source/Flow|@ref[alsoTo](Source-or-Flow/alsoTo.md)|Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`.|
+|Flow|@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.|
|Source/Flow|@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.|
|Source/Flow|@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.|
|Source/Flow|@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.|
@@ -385,6 +386,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [monitor](Source-or-Flow/monitor.md)
* [initialDelay](Source-or-Flow/initialDelay.md)
* [log](Source-or-Flow/log.md)
+* [asFlowWithContext](Flow/asFlowWithContext.md)
* [fromSinkAndSource](Flow/fromSinkAndSource.md)
* [fromSinkAndSourceCoupled](Flow/fromSinkAndSourceCoupled.md)
* [lazyInitAsync](Flow/lazyInitAsync.md)
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala
new file mode 100644
index 0000000000..178249352e
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2018-2019 Lightbend Inc.
+ */
+
+package akka.stream.scaladsl
+
+import akka.stream.testkit.scaladsl.TestSink
+import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
+import akka.stream.testkit.StreamSpec
+
+class FlowWithContextSpec extends StreamSpec {
+
+ val settings = ActorMaterializerSettings(system)
+ implicit val materializer = ActorMaterializer(settings)
+
+ "A FlowWithContext" must {
+
+ "get created from Flow.asFlowWithContext" in {
+ val flow = Flow[Message].map { case m ⇒ m.copy(data = m.data + "z") }
+ val flowWithContext = flow.asFlowWithContext((m: Message, o: Long) ⇒ Message(m.data, o)) { m ⇒ m.offset }
+
+ val msg = Message("a", 1L)
+ Source(Vector(msg))
+ .startContextPropagation(_.offset)
+ .via(flowWithContext)
+ .endContextPropagation
+ .runWith(TestSink.probe[(Message, Long)])
+ .request(1)
+ .expectNext(((Message("az", 1L), 1L)))
+ .expectComplete()
+ }
+ }
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 3e4e7d6091..37253d2c37 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -20,6 +20,7 @@ import akka.util.JavaDurationConverters._
import akka.actor.ActorRef
import akka.dispatch.ExecutionContexts
import akka.stream.impl.fusing.LazyFlow
+import akka.annotation.ApiMayChange
import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
@@ -3283,6 +3284,19 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def toProcessor: RunnableGraph[Processor[In, Out]] = {
RunnableGraph.fromGraph(delegate.toProcessor)
}
+
+ /**
+ * Turns a Flow into a FlowWithContext which manages a context per element along a stream.
+ *
+ * @param collapseContext turn each incoming pair of element and context value into an element of this Flow
+ * @param extractContext turn each outgoing element of this Flow into an outgoing context value
+ *
+ * API MAY CHANGE
+ */
+ @ApiMayChange
+ def asFlowWithContext[U, CtxU, CtxOut](collapseContext: function.Function2[U, CtxU, In], extractContext: function.Function[Out, CtxOut]): FlowWithContext[U, CtxU, Out, CtxOut, Mat] =
+ this.asScala.asFlowWithContext((x: U, c: CtxU) ⇒ collapseContext.apply(x, c))(x ⇒ extractContext.apply(x)).asJava
+
}
object RunnableGraph {
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index a1d76ccfc3..a16cae235f 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -21,7 +21,7 @@ import scala.language.higherKinds
import akka.stream.impl.fusing.FlattenMerge
import akka.NotUsed
import akka.actor.ActorRef
-import akka.annotation.DoNotInherit
+import akka.annotation.{ ApiMayChange, DoNotInherit }
import scala.annotation.implicitNotFound
import scala.reflect.ClassTag
@@ -308,6 +308,21 @@ final class Flow[-In, +Out, +Mat](
}
}
+ /**
+ * Turns a Flow into a FlowWithContext which manages a context per element along a stream.
+ *
+ * @param collapseContext turn each incoming pair of element and context value into an element of this Flow
+ * @param extractContext turn each outgoing element of this Flow into an outgoing context value
+ *
+ * API MAY CHANGE
+ */
+ @ApiMayChange
+ def asFlowWithContext[U, CtxU, CtxOut](collapseContext: (U, CtxU) ⇒ In)(extractContext: Out ⇒ CtxOut): FlowWithContext[U, CtxU, Out, CtxOut, Mat] =
+ new FlowWithContext(Flow[(U, CtxU)].map {
+ case (e, ctx) ⇒
+ collapseContext(e, ctx)
+ }.viaMat(this)(Keep.right).map(e ⇒ (e, extractContext(e))))
+
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava[JIn <: In]: javadsl.Flow[JIn, Out @uncheckedVariance, Mat @uncheckedVariance] =
new javadsl.Flow(this)