Cleanup, improvements, simplicification, scaladoc and javadoc of SourceWithContext and FlowWithContext

This commit is contained in:
Johannes Rudolph 2019-01-15 16:53:02 +01:00 committed by Raymond Roestenburg
parent d76d259408
commit a2c811d75a
8 changed files with 560 additions and 226 deletions

View file

@ -4,6 +4,7 @@
package akka.stream
import akka.annotation.InternalApi
import akka.stream.impl.TraversalBuilder
import scala.annotation.unchecked.uncheckedVariance
@ -68,3 +69,16 @@ trait Graph[+S <: Shape, +M] {
*/
def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr)
}
/**
* INTERNAL API
*
* Allows creating additional API on top of an existing Graph by extending from this class and
* accessing the delegate
*/
@InternalApi
private[stream] abstract class GraphDelegate[+S <: Shape, +Mat](delegate: Graph[S, Mat]) extends Graph[S, Mat] {
final override def shape: S = delegate.shape
final override private[stream] def traversalBuilder: TraversalBuilder = delegate.traversalBuilder
final override def withAttributes(attr: Attributes): Graph[S, Mat] = delegate.withAttributes(attr)
}

View file

@ -7,11 +7,9 @@ package akka.stream.javadsl
import akka.annotation.ApiMayChange
import akka.japi.{ Pair, Util, function }
import akka.stream._
import akka.stream.impl.LinearTraversalBuilder
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
import scala.collection.immutable
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
@ -24,79 +22,184 @@ object FlowWithContext {
def create[Ctx, In](): FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = {
new FlowWithContext(scaladsl.FlowWithContext[Ctx, In])
}
def fromPairs[CtxIn, In, CtxOut, Out, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]) = {
def fromPairs[CtxIn, In, CtxOut, Out, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = {
new FlowWithContext(scaladsl.FlowWithContext.from(scaladsl.Flow[(In, CtxIn)].map { case (i, c) Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right)))
}
}
/**
* A flow that provides operations which automatically propagate the context of an element.
* Only a subset of common operations from [[Flow]] is supported. As an escape hatch you can
* use [[FlowWithContext.via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
* An "empty" flow can be created by calling `FlowWithContext[Ctx, T]`.
*
* API MAY CHANGE
*/
@ApiMayChange
final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat]) extends Graph[FlowShape[(In, CtxIn), (Out, CtxOut)], Mat] {
override val traversalBuilder: LinearTraversalBuilder = delegate.traversalBuilder
override val shape: FlowShape[(In, CtxIn), (Out, CtxOut)] = delegate.shape
override def withAttributes(attr: Attributes): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = new FlowWithContext(delegate.withAttributes(attr))
def mapContext[CtxOut2](extractContext: function.Function[CtxOut, CtxOut2]): FlowWithContext[CtxIn, In, CtxOut2, Out, Mat] = {
new FlowWithContext(delegate.mapContext(extractContext.apply))
}
final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat]) extends GraphDelegate(delegate) {
/**
* Transform this flow by the regular flow. The given flow must support manual context propagation by
* taking and producing tuples of (data, context).
*
* This can be used as an escape hatch for operations that are not (yet) provided with automatic
* context propagation here.
*
* @see [[akka.stream.javadsl.Flow.via]]
*/
def via[CtxOut2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance], Pair[Out2, CtxOut2]], Mat2]): FlowWithContext[CtxIn, In, CtxOut2, Out2, Mat] = {
val under = endContextPropagation().via(viaFlow)
val under = asFlow().via(viaFlow)
FlowWithContext.fromPairs(under)
}
def to[Mat2](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance]], Mat2]): Sink[Pair[In, CtxIn], Mat] @uncheckedVariance =
endContextPropagation().toMat(sink, Keep.left)
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): Sink[Pair[In, CtxIn], Mat3] @uncheckedVariance =
endContextPropagation().toMat(sink, combine)
def endContextPropagation(): Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat] @uncheckedVariance =
/**
* Creates a regular flow of pairs (data, context).
*/
def asFlow(): Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat] @uncheckedVariance =
scaladsl.Flow[Pair[In, CtxIn]]
.map(_.toScala)
.viaMat(delegate.endContextPropagation)(scaladsl.Keep.right)
.viaMat(delegate.asFlow)(scaladsl.Keep.right)
.map { case (o, c) Pair(o, c) }
.asJava
// remaining operations in alphabetic order
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.collect]].
*
* Note, that the context of elements that are filtered out is skipped as well.
*
* @see [[akka.stream.javadsl.Flow.collect]]
*/
def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
viaScala(_.collect(pf))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.filter]].
*
* Note, that the context of elements that are filtered out is skipped as well.
*
* @see [[akka.stream.javadsl.Flow.filter]]
*/
def filter(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
viaScala(_.filter(p.test))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.filterNot]].
*
* Note, that the context of elements that are filtered out is skipped as well.
*
* @see [[akka.stream.javadsl.Flow.filterNot]]
*/
def filterNot(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
viaScala(_.filterNot(p.test))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.grouped]].
*
* Each output group will be associated with a `Seq` of corresponding context elements.
*
* @see [[akka.stream.javadsl.Flow.grouped]]
*/
def grouped(n: Int): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] =
viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.map]].
*
* @see [[akka.stream.javadsl.Flow.map]]
*/
def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
new FlowWithContext(delegate.map(f.apply))
viaScala(_.map(f.apply))
def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
new FlowWithContext(delegate.mapAsync[Out2](parallelism)(o f.apply(o).toScala))
viaScala(_.mapAsync[Out2](parallelism)(o f.apply(o).toScala))
def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
new FlowWithContext(delegate.collect(pf))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.mapConcat]].
*
* The context of the input element will be associated with each of the output elements calculated from
* this input element.
*
* Example:
*
* ```
* def dup(element: String) = Seq(element, element)
*
* Input:
*
* ("a", 1)
* ("b", 2)
*
* inputElements.mapConcat(dup)
*
* Output:
*
* ("a", 1)
* ("a", 1)
* ("b", 2)
* ("b", 2)
* ```
*
* @see [[akka.stream.javadsl.Flow.mapConcat]]
*/
def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
viaScala(_.mapConcat(elem Util.immutableSeq(f.apply(elem))))
def filter(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
new FlowWithContext(delegate.filter(p.test))
def filterNot(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] =
new FlowWithContext(delegate.filterNot(p.test))
def grouped(n: Int): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = {
val f = new function.Function[immutable.Seq[CtxOut], java.util.List[CtxOut]] {
def apply(ctxs: immutable.Seq[CtxOut]) = ctxs.asJava
}
new FlowWithContext(delegate.grouped(n).map(_.asJava)).mapContext(f)
/**
* Apply the given function to each context element (leaving the data elements unchanged).
*/
def mapContext[CtxOut2](extractContext: function.Function[CtxOut, CtxOut2]): FlowWithContext[CtxIn, In, CtxOut2, Out, Mat] = {
viaScala(_.mapContext(extractContext.apply))
}
def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
new FlowWithContext(delegate.mapConcat(elem Util.immutableSeq(f.apply(elem))))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.sliding]].
*
* Each output group will be associated with a `Seq` of corresponding context elements.
*
* @see [[akka.stream.javadsl.Flow.sliding]]
*/
def sliding(n: Int, step: Int = 1): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] =
viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.statefulMapConcat]].
*
* The context of the input element will be associated with each of the output elements calculated from
* this input element.
*
* Example:
*
* ```
* def dup(element: String) = Seq(element, element)
*
* Input:
*
* ("a", 1)
* ("b", 2)
*
* inputElements.statefulMapConcat(() => dup)
*
* Output:
*
* ("a", 1)
* ("a", 1)
* ("b", 2)
* ("b", 2)
* ```
*
* @see [[akka.stream.javadsl.Flow.statefulMapConcat]]
*/
def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] =
new FlowWithContext(delegate.statefulMapConcat { ()
viaScala(_.statefulMapConcat { ()
val fun = f.create()
elem Util.immutableSeq(fun(elem))
})
def sliding(n: Int, step: Int = 1): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = {
val f = new function.Function[immutable.Seq[CtxOut], java.util.List[CtxOut]] {
def apply(ctxs: immutable.Seq[CtxOut]) = ctxs.asJava
}
new FlowWithContext(delegate.sliding(n, step).map(_.asJava)).mapContext(f)
}
def asScala: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = delegate
def asScala = delegate
private[this] def viaScala[CtxIn2, In2, CtxOut2, Out2, Mat2](f: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] scaladsl.FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2]): FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2] =
new FlowWithContext(f(delegate))
}

View file

@ -3473,5 +3473,5 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
*/
@ApiMayChange
def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Ctx, Out, Mat] =
new javadsl.SourceWithContext(scaladsl.SourceWithContext(this.asScala).mapContext(extractContext.apply))
new scaladsl.SourceWithContext(this.asScala.map(x (x, extractContext.apply(x)))).asJava
}

View file

@ -7,87 +7,180 @@ package akka.stream.javadsl
import akka.annotation.ApiMayChange
import akka.japi.{ Pair, Util, function }
import akka.stream._
import akka.stream.impl.LinearTraversalBuilder
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
import scala.collection.immutable
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
/**
* A source that provides operations which automatically propagate the context of an element.
* Only a subset of common operations from [[Source]] is supported. As an escape hatch you can
* use [[SourceWithContext.via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
* Can be created by calling [[Source.startContextPropagation()]]
*
* API MAY CHANGE
*/
@ApiMayChange
object SourceWithContext {
def from[Out, Mat](underlying: Source[Out, Mat]): SourceWithContext[Out, Out, Mat] = {
new SourceWithContext(scaladsl.SourceWithContext(underlying.asScala))
}
def fromPairs[Out, Ctx, Mat](under: Source[Pair[Out, Ctx], Mat]): SourceWithContext[Ctx, Out, Mat] = {
new SourceWithContext(scaladsl.SourceWithContext.from(under.asScala.map(_.toScala)))
}
}
/**
* API MAY CHANGE
*/
@ApiMayChange
final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithContext[Ctx, Out, Mat]) extends Graph[SourceShape[(Out, Ctx)], Mat] {
override val traversalBuilder: LinearTraversalBuilder = delegate.traversalBuilder
override val shape: SourceShape[(Out, Ctx)] = delegate.shape
override def withAttributes(attr: Attributes): SourceWithContext[Ctx, Out, Mat] = new SourceWithContext(delegate.withAttributes(attr))
def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Ctx2, Out, Mat] = {
new SourceWithContext(delegate.mapContext(extractContext.apply))
}
def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Ctx2, Out2, Mat] = {
val under = endContextPropagation().via(viaFlow)
SourceWithContext.fromPairs(under)
}
def to[Mat2](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2]): RunnableGraph[Mat] =
endContextPropagation().toMat(sink, Keep.left)
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): RunnableGraph[Mat3] =
endContextPropagation().toMat(sink, combine)
final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithContext[Ctx, Out, Mat]) extends GraphDelegate(delegate) {
/**
* Transform this flow by the regular flow. The given flow must support manual context propagation by
* taking and producing tuples of (data, context).
*
* This can be used as an escape hatch for operations that are not (yet) provided with automatic
* context propagation here.
*
* @see [[akka.stream.javadsl.Flow.via]]
*/
def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Ctx2, Out2, Mat] =
viaScala(_.via(akka.stream.scaladsl.Flow[(Out, Ctx)].map { case (o, c) Pair(o, c) }.via(viaFlow).map(_.toScala)))
/**
* Stops automatic context propagation from here and converts this to a regular
* stream of a pair of (data, context).
*/
def endContextPropagation(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] =
delegate.endContextPropagation.map { case (o, c) Pair(o, c) }.asJava
// remaining operations in alphabetic order
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.collect]].
*
* Note, that the context of elements that are filtered out is skipped as well.
*
* @see [[akka.stream.javadsl.Source.collect]]
*/
def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] =
viaScala(_.collect(pf))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.filter]].
*
* Note, that the context of elements that are filtered out is skipped as well.
*
* @see [[akka.stream.javadsl.Source.filter]]
*/
def filter(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] =
viaScala(_.filter(p.test))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.filterNot]].
*
* Note, that the context of elements that are filtered out is skipped as well.
*
* @see [[akka.stream.javadsl.Source.filterNot]]
*/
def filterNot(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] =
viaScala(_.filterNot(p.test))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.grouped]].
*
* Each output group will be associated with a `Seq` of corresponding context elements.
*
* @see [[akka.stream.javadsl.Source.grouped]]
*/
def grouped(n: Int): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] =
viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.map]].
*
* @see [[akka.stream.javadsl.Source.map]]
*/
def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] =
new SourceWithContext(delegate.map(f.apply))
viaScala(_.map(f.apply))
def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Ctx, Out2, Mat] =
new SourceWithContext(delegate.mapAsync[Out2](parallelism)(o f.apply(o).toScala))
def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] =
new SourceWithContext(delegate.collect(pf))
def filter(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] =
new SourceWithContext(delegate.filter(p.test))
def filterNot(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] =
new SourceWithContext(delegate.filterNot(p.test))
def grouped(n: Int): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = {
val f = new function.Function[immutable.Seq[Ctx], java.util.List[Ctx]] {
def apply(ctxs: immutable.Seq[Ctx]) = ctxs.asJava
}
new SourceWithContext(delegate.grouped(n).map(_.asJava)).mapContext(f)
}
viaScala(_.mapAsync[Out2](parallelism)(o f.apply(o).toScala))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.mapConcat]].
*
* The context of the input element will be associated with each of the output elements calculated from
* this input element.
*
* Example:
*
* ```
* def dup(element: String) = Seq(element, element)
*
* Input:
*
* ("a", 1)
* ("b", 2)
*
* inputElements.mapConcat(dup)
*
* Output:
*
* ("a", 1)
* ("a", 1)
* ("b", 2)
* ("b", 2)
* ```
*
* @see [[akka.stream.javadsl.Source.mapConcat]]
*/
def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): SourceWithContext[Ctx, Out2, Mat] =
new SourceWithContext(delegate.mapConcat(elem Util.immutableSeq(f.apply(elem))))
viaScala(_.mapConcat(elem Util.immutableSeq(f.apply(elem))))
/**
* Apply the given function to each context element (leaving the data elements unchanged).
*/
def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Ctx2, Out, Mat] =
viaScala(_.mapContext(extractContext.apply))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.sliding]].
*
* Each output group will be associated with a `Seq` of corresponding context elements.
*
* @see [[akka.stream.javadsl.Source.sliding]]
*/
def sliding(n: Int, step: Int = 1): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] =
viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.statefulMapConcat]].
*
* The context of the input element will be associated with each of the output elements calculated from
* this input element.
*
* Example:
*
* ```
* def dup(element: String) = Seq(element, element)
*
* Input:
*
* ("a", 1)
* ("b", 2)
*
* inputElements.statefulMapConcat(() => dup)
*
* Output:
*
* ("a", 1)
* ("a", 1)
* ("b", 2)
* ("b", 2)
* ```
*
* @see [[akka.stream.javadsl.Source.statefulMapConcat]]
*/
def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Ctx, Out2, Mat] =
new SourceWithContext(delegate.statefulMapConcat { ()
viaScala(_.statefulMapConcat { ()
val fun = f.create()
elem Util.immutableSeq(fun(elem))
})
def asScala = delegate
def asScala: scaladsl.SourceWithContext[Ctx, Out, Mat] = delegate
private[this] def viaScala[Ctx2, Out2, Mat2](f: scaladsl.SourceWithContext[Ctx, Out, Mat] scaladsl.SourceWithContext[Ctx2, Out2, Mat2]): SourceWithContext[Ctx2, Out2, Mat2] =
new SourceWithContext(f(delegate))
}

View file

@ -4,116 +4,52 @@
package akka.stream.scaladsl
import scala.collection.immutable
import scala.concurrent.Future
import scala.language.higherKinds
import scala.annotation.unchecked.uncheckedVariance
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.stream._
import akka.stream.impl.LinearTraversalBuilder
/**
* API MAY CHANGE
*/
@ApiMayChange
trait FlowWithContextOps[+Ctx, +Out, +Mat] {
type Repr[+C, +O] <: FlowWithContextOps[C, O, Mat] {
type Repr[+CC, +OO] = FlowWithContextOps.this.Repr[CC, OO]
type Prov[+CC, +OO] = FlowWithContextOps.this.Prov[CC, OO]
}
type Prov[+C, +O] <: FlowOpsMat[(O, C), Mat]
def via[Ctx2, Out2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2]
def map[Out2](f: Out Out2): Repr[Ctx, Out2] =
via(flow.map { case (e, ctx) (f(e), ctx) })
def mapAsync[Out2](parallelism: Int)(f: Out Future[Out2]): Repr[Ctx, Out2] =
via(flow.mapAsync(parallelism) { case (e, ctx) f(e).map(o (o, ctx))(ExecutionContexts.sameThreadExecutionContext) })
def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Ctx, Out2] =
via(flow.collect {
case (e, ctx) if f.isDefinedAt(e) (f(e), ctx)
})
def filter(pred: Out Boolean): Repr[Ctx, Out] =
collect { case e if pred(e) e }
def filterNot(pred: Out Boolean): Repr[Ctx, Out] =
collect { case e if !pred(e) e }
def grouped(n: Int): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] =
via(flow.grouped(n).map { elsWithContext
val (els, ctxs) = elsWithContext.unzip
(els, ctxs)
})
def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] =
via(flow.sliding(n, step).map { elsWithContext
val (els, ctxs) = elsWithContext.unzip
(els, ctxs)
})
def mapConcat[Out2](f: Out immutable.Iterable[Out2]): Repr[Ctx, Out2] = statefulMapConcat(() f)
def statefulMapConcat[Out2](f: () Out immutable.Iterable[Out2]): Repr[Ctx, Out2] = {
val fCtx: () ((Out, Ctx)) immutable.Iterable[(Out2, Ctx)] = { () elWithContext
val (el, ctx) = elWithContext
f()(el).map(o (o, ctx))
}
via(flow.statefulMapConcat(fCtx))
}
def mapContext[Ctx2](f: Ctx Ctx2): Repr[Ctx2, Out] =
via(flow.map { case (e, ctx) (e, f(ctx)) })
def endContextPropagation: Prov[Ctx, Out]
private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)]
}
/**
* API MAY CHANGE
*/
@ApiMayChange
object FlowWithContext {
/**
* Creates an "empty" FlowWithContext that passes elements through with their context unchanged.
*/
def apply[Ctx, In]: FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = {
val under = Flow[(In, Ctx)]
new FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed](under, under.traversalBuilder, under.shape)
new FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed](under)
}
def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]) = new FlowWithContext(flow, flow.traversalBuilder, flow.shape)
/**
* Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements.
*/
def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]): FlowWithContext[CI, I, CO, O, M] = new FlowWithContext(flow)
}
/**
* A flow that provides operations which automatically propagate the context of an element.
* Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can
* use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
* An "empty" flow can be created by calling `FlowWithContext[Ctx, T]`.
*
* API MAY CHANGE
*/
@ApiMayChange
final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](
underlying: Flow[(In, CtxIn), (Out, CtxOut), Mat],
override val traversalBuilder: LinearTraversalBuilder,
override val shape: FlowShape[(In, CtxIn), (Out, CtxOut)]
) extends FlowWithContextOps[CtxOut, Out, Mat] with Graph[FlowShape[(In, CtxIn), (Out, CtxOut)], Mat] {
delegate: Flow[(In, CtxIn), (Out, CtxOut), Mat]
) extends GraphDelegate(delegate) with FlowWithContextOps[CtxOut, Out, Mat] {
override type ReprMat[+C, +O, +M] = FlowWithContext[CtxIn @uncheckedVariance, In @uncheckedVariance, C, O, M @uncheckedVariance]
override def withAttributes(attr: Attributes): Repr[CtxOut, Out] = new FlowWithContext(underlying, traversalBuilder.setAttributes(attr), shape)
override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] =
FlowWithContext.from(delegate.via(viaFlow))
override type Repr[+C, +O] = FlowWithContext[CtxIn @uncheckedVariance, In @uncheckedVariance, C, O, Mat @uncheckedVariance]
override type Prov[+C, +O] = Flow[(In @uncheckedVariance, CtxIn @uncheckedVariance), (O, C), Mat @uncheckedVariance]
override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) Mat3): FlowWithContext[CtxIn, In, Ctx2, Out2, Mat3] =
FlowWithContext.from(delegate.viaMat(flow)(combine))
override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = from(underlying.via(viaFlow))
def to[Mat2](sink: Graph[SinkShape[(Out, CtxOut)], Mat2]): Sink[(In, CtxIn), Mat] = underlying.toMat(sink)(Keep.left)
def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, CtxOut)], Mat2])(combine: (Mat, Mat2) Mat3): Sink[(In, CtxIn), Mat3] = underlying.toMat(sink)(combine)
override def endContextPropagation: Prov[CtxOut, Out] = underlying
private[this] def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]) = FlowWithContext.from(flow)
def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate
def asJava[JCtxIn <: CtxIn, JIn <: In, JCtxOut >: CtxOut, JOut >: Out, JMat >: Mat]: javadsl.FlowWithContext[JCtxIn, JIn, JCtxOut, JOut, JMat] =
new javadsl.FlowWithContext(this)
}

View file

@ -0,0 +1,201 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import scala.collection.immutable
import scala.concurrent.Future
import scala.language.higherKinds
import scala.annotation.unchecked.uncheckedVariance
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.stream._
/**
* Shared stream operations for [[FlowWithContext]] and [[SourceWithContext]] that automatically propagate a context
* element with each data element.
*
* API MAY CHANGE
*/
@ApiMayChange
trait FlowWithContextOps[+Ctx, +Out, +Mat] {
type ReprMat[+C, +O, +M] <: FlowWithContextOps[C, O, M] {
type ReprMat[+CC, +OO, +MatMat] = FlowWithContextOps.this.ReprMat[CC, OO, MatMat]
}
type Repr[+C, +O] = ReprMat[C, O, Mat @uncheckedVariance]
/**
* Transform this flow by the regular flow. The given flow must support manual context propagation by
* taking and producing tuples of (data, context).
*
* This can be used as an escape hatch for operations that are not (yet) provided with automatic
* context propagation here.
*
* @see [[akka.stream.scaladsl.FlowOps.via]]
*/
def via[Ctx2, Out2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2]
/**
* Transform this flow by the regular flow. The given flow must support manual context propagation by
* taking and producing tuples of (data, context).
*
* This can be used as an escape hatch for operations that are not (yet) provided with automatic
* context propagation here.
*
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*
* @see [[akka.stream.scaladsl.FlowOps.viaMat]]
*/
def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) Mat3): ReprMat[Ctx2, Out2, Mat3]
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.map]].
*
* @see [[akka.stream.scaladsl.FlowOps.map]]
*/
def map[Out2](f: Out Out2): Repr[Ctx, Out2] =
via(flow.map { case (e, ctx) (f(e), ctx) })
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapAsync]].
*
* @see [[akka.stream.scaladsl.FlowOps.mapAsync]]
*/
def mapAsync[Out2](parallelism: Int)(f: Out Future[Out2]): Repr[Ctx, Out2] =
via(flow.mapAsync(parallelism) { case (e, ctx) f(e).map(o (o, ctx))(ExecutionContexts.sameThreadExecutionContext) })
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.collect]].
*
* Note, that the context of elements that are filtered out is skipped as well.
*
* @see [[akka.stream.scaladsl.FlowOps.collect]]
*/
def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Ctx, Out2] =
via(flow.collect {
case (e, ctx) if f.isDefinedAt(e) (f(e), ctx)
})
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.filter]].
*
* Note, that the context of elements that are filtered out is skipped as well.
*
* @see [[akka.stream.scaladsl.FlowOps.filter]]
*/
def filter(pred: Out Boolean): Repr[Ctx, Out] =
collect { case e if pred(e) e }
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.filterNot]].
*
* Note, that the context of elements that are filtered out is skipped as well.
*
* @see [[akka.stream.scaladsl.FlowOps.filterNot]]
*/
def filterNot(pred: Out Boolean): Repr[Ctx, Out] =
collect { case e if !pred(e) e }
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.grouped]].
*
* Each output group will be associated with a `Seq` of corresponding context elements.
*
* @see [[akka.stream.scaladsl.FlowOps.grouped]]
*/
def grouped(n: Int): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] =
via(flow.grouped(n).map { elsWithContext
val (els, ctxs) = elsWithContext.unzip
(els, ctxs)
})
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.sliding]].
*
* Each output group will be associated with a `Seq` of corresponding context elements.
*
* @see [[akka.stream.scaladsl.FlowOps.sliding]]
*/
def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] =
via(flow.sliding(n, step).map { elsWithContext
val (els, ctxs) = elsWithContext.unzip
(els, ctxs)
})
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapConcat]].
*
* The context of the input element will be associated with each of the output elements calculated from
* this input element.
*
* Example:
*
* ```
* def dup(element: String) = Seq(element, element)
*
* Input:
*
* ("a", 1)
* ("b", 2)
*
* inputElements.mapConcat(dup)
*
* Output:
*
* ("a", 1)
* ("a", 1)
* ("b", 2)
* ("b", 2)
* ```
*
* @see [[akka.stream.scaladsl.FlowOps.mapConcat]]
*/
def mapConcat[Out2](f: Out immutable.Iterable[Out2]): Repr[Ctx, Out2] = statefulMapConcat(() f)
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.statefulMapConcat]].
*
* The context of the input element will be associated with each of the output elements calculated from
* this input element.
*
* Example:
*
* ```
* def dup(element: String) = Seq(element, element)
*
* Input:
*
* ("a", 1)
* ("b", 2)
*
* inputElements.statefulMapConcat(() => dup)
*
* Output:
*
* ("a", 1)
* ("a", 1)
* ("b", 2)
* ("b", 2)
* ```
*
* @see [[akka.stream.scaladsl.FlowOps.statefulMapConcat]]
*/
def statefulMapConcat[Out2](f: () Out immutable.Iterable[Out2]): Repr[Ctx, Out2] = {
val fCtx: () ((Out, Ctx)) immutable.Iterable[(Out2, Ctx)] = { () elWithContext
val (el, ctx) = elWithContext
f()(el).map(o (o, ctx))
}
via(flow.statefulMapConcat(fCtx))
}
/**
* Apply the given function to each context element (leaving the data elements unchanged).
*/
def mapContext[Ctx2](f: Ctx Ctx2): Repr[Ctx2, Out] =
via(flow.map { case (e, ctx) (e, f(ctx)) })
private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)]
}

View file

@ -215,11 +215,12 @@ final class Source[+Out, +Mat](
combineRest(2, rest.iterator)
})
/**
* API MAY CHANGE
*/
@ApiMayChange
def startContextPropagation[Ctx](f: Out Ctx): SourceWithContext[Ctx, Out, Mat] = SourceWithContext(this).mapContext(f)
def startContextPropagation[Ctx](f: Out Ctx): SourceWithContext[Ctx, Out, Mat] = new SourceWithContext(this.map(e (e, f(e))))
}
object Source {

View file

@ -8,48 +8,34 @@ import scala.annotation.unchecked.uncheckedVariance
import akka.annotation.ApiMayChange
import akka.stream._
import akka.stream.impl.LinearTraversalBuilder
/**
* A source that provides operations which automatically propagate the context of an element.
* Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can
* use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
* Can be created by calling [[Source.startContextPropagation()]]
*
* API MAY CHANGE
*/
@ApiMayChange
object SourceWithContext {
def apply[Out, Mat](underlying: Source[Out, Mat]): SourceWithContext[Out, Out, Mat] = {
val under = underlying.map(e (e, e))
new SourceWithContext[Out, Out, Mat](under, under.traversalBuilder, under.shape)
}
def from[Out, Ctx, Mat](under: Source[(Out, Ctx), Mat]): SourceWithContext[Ctx, Out, Mat] = {
new SourceWithContext[Ctx, Out, Mat](under, under.traversalBuilder, under.shape)
}
}
final class SourceWithContext[+Ctx, +Out, +Mat] private[stream] (
delegate: Source[(Out, Ctx), Mat]
) extends GraphDelegate(delegate) with FlowWithContextOps[Ctx, Out, Mat] {
override type ReprMat[+C, +O, +M] = SourceWithContext[C, O, M @uncheckedVariance]
/**
* API MAY CHANGE
*/
@ApiMayChange
final class SourceWithContext[+Ctx, +Out, +Mat](
underlying: Source[(Out, Ctx), Mat],
override val traversalBuilder: LinearTraversalBuilder,
override val shape: SourceShape[(Out, Ctx)]
) extends FlowWithContextOps[Ctx, Out, Mat] with Graph[SourceShape[(Out, Ctx)], Mat] {
override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] =
new SourceWithContext(delegate.via(viaFlow))
override def withAttributes(attr: Attributes): Repr[Ctx, Out] = new SourceWithContext(underlying, traversalBuilder.setAttributes(attr), shape)
override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) Mat3): SourceWithContext[Ctx2, Out2, Mat3] =
new SourceWithContext(delegate.viaMat(flow)(combine))
override type Repr[+C, +O] = SourceWithContext[C, O, Mat @uncheckedVariance]
override type Prov[+C, +O] = Source[(O, C), Mat @uncheckedVariance]
override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = {
val under = underlying.via(viaFlow)
new SourceWithContext[Ctx2, Out2, Mat](under, under.traversalBuilder, under.shape)
}
def to[Mat2](sink: Graph[SinkShape[(Out, Ctx)], Mat2]): RunnableGraph[Mat] = underlying.toMat(sink)(Keep.left)
def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) Mat3): RunnableGraph[Mat3] =
underlying.toMat(sink)(combine)
override def endContextPropagation: Prov[Ctx, Out] = underlying
/**
* Stops automatic context propagation from here and converts this to a regular
* stream of a pair of (data, context).
*/
def endContextPropagation: Source[(Out, Ctx), Mat] = delegate
def asJava[JCtx >: Ctx, JOut >: Out, JMat >: Mat]: javadsl.SourceWithContext[JCtx, JOut, JMat] =
new javadsl.SourceWithContext(this)