Adds Flow.asFlowWithContext

This commit is contained in:
Luc Bourlier 2019-02-13 13:38:45 +01:00
parent 555fe026d0
commit 4ee1ae09a5
5 changed files with 84 additions and 1 deletions

View file

@ -0,0 +1,19 @@
# Flow.asFlowWithContext
Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.
@ref[Simple operators](../index.md#simple-operators)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #asFlowWithContext }
@@@
## Description
Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.
The first function passed into asFlowWithContext must turn each incoming pair of element and context value into an element of this Flow.
The second function passed into asFlowWithContext must turn each outgoing element of this Flow into an outgoing context value.

View file

@ -124,6 +124,7 @@ depending on being backpressured by downstream or not.
| |Operator|Description|
|--|--|--|
|Source/Flow|<a name="alsoto"></a>@ref[alsoTo](Source-or-Flow/alsoTo.md)|Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`.|
|Flow|<a name="asflowwithcontext"></a>@ref[asFlowWithContext](Flow/asFlowWithContext.md)|Turns a Flow into a FlowWithContext which can propagate a context per element along a stream.|
|Source/Flow|<a name="collect"></a>@ref[collect](Source-or-Flow/collect.md)|Apply a partial function to each incoming element, if the partial function is defined for a value the returned value is passed downstream.|
|Source/Flow|<a name="collecttype"></a>@ref[collectType](Source-or-Flow/collectType.md)|Transform this stream by testing the type of each of the elements on which the element is an instance of the provided type as they pass through this processing step.|
|Source/Flow|<a name="detach"></a>@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.|
@ -385,6 +386,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [monitor](Source-or-Flow/monitor.md)
* [initialDelay](Source-or-Flow/initialDelay.md)
* [log](Source-or-Flow/log.md)
* [asFlowWithContext](Flow/asFlowWithContext.md)
* [fromSinkAndSource](Flow/fromSinkAndSource.md)
* [fromSinkAndSourceCoupled](Flow/fromSinkAndSourceCoupled.md)
* [lazyInitAsync](Flow/lazyInitAsync.md)

View file

@ -0,0 +1,33 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.StreamSpec
class FlowWithContextSpec extends StreamSpec {
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
"A FlowWithContext" must {
"get created from Flow.asFlowWithContext" in {
val flow = Flow[Message].map { case m m.copy(data = m.data + "z") }
val flowWithContext = flow.asFlowWithContext((m: Message, o: Long) Message(m.data, o)) { m m.offset }
val msg = Message("a", 1L)
Source(Vector(msg))
.startContextPropagation(_.offset)
.via(flowWithContext)
.endContextPropagation
.runWith(TestSink.probe[(Message, Long)])
.request(1)
.expectNext(((Message("az", 1L), 1L)))
.expectComplete()
}
}
}

View file

@ -20,6 +20,7 @@ import akka.util.JavaDurationConverters._
import akka.actor.ActorRef
import akka.dispatch.ExecutionContexts
import akka.stream.impl.fusing.LazyFlow
import akka.annotation.ApiMayChange
import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
@ -3283,6 +3284,19 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def toProcessor: RunnableGraph[Processor[In, Out]] = {
RunnableGraph.fromGraph(delegate.toProcessor)
}
/**
* Turns a Flow into a FlowWithContext which manages a context per element along a stream.
*
* @param collapseContext turn each incoming pair of element and context value into an element of this Flow
* @param extractContext turn each outgoing element of this Flow into an outgoing context value
*
* API MAY CHANGE
*/
@ApiMayChange
def asFlowWithContext[U, CtxU, CtxOut](collapseContext: function.Function2[U, CtxU, In], extractContext: function.Function[Out, CtxOut]): FlowWithContext[U, CtxU, Out, CtxOut, Mat] =
this.asScala.asFlowWithContext((x: U, c: CtxU) collapseContext.apply(x, c))(x extractContext.apply(x)).asJava
}
object RunnableGraph {

View file

@ -21,7 +21,7 @@ import scala.language.higherKinds
import akka.stream.impl.fusing.FlattenMerge
import akka.NotUsed
import akka.actor.ActorRef
import akka.annotation.DoNotInherit
import akka.annotation.{ ApiMayChange, DoNotInherit }
import scala.annotation.implicitNotFound
import scala.reflect.ClassTag
@ -308,6 +308,21 @@ final class Flow[-In, +Out, +Mat](
}
}
/**
* Turns a Flow into a FlowWithContext which manages a context per element along a stream.
*
* @param collapseContext turn each incoming pair of element and context value into an element of this Flow
* @param extractContext turn each outgoing element of this Flow into an outgoing context value
*
* API MAY CHANGE
*/
@ApiMayChange
def asFlowWithContext[U, CtxU, CtxOut](collapseContext: (U, CtxU) In)(extractContext: Out CtxOut): FlowWithContext[U, CtxU, Out, CtxOut, Mat] =
new FlowWithContext(Flow[(U, CtxU)].map {
case (e, ctx)
collapseContext(e, ctx)
}.viaMat(this)(Keep.right).map(e (e, extractContext(e))))
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava[JIn <: In]: javadsl.Flow[JIn, Out @uncheckedVariance, Mat @uncheckedVariance] =
new javadsl.Flow(this)