From a2c811d75a3e7c8d8ca76b9b99c55cecb17ae787 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 15 Jan 2019 16:53:02 +0100 Subject: [PATCH] Cleanup, improvements, simplicification, scaladoc and javadoc of SourceWithContext and FlowWithContext --- .../src/main/scala/akka/stream/Graph.scala | 14 ++ .../akka/stream/javadsl/FlowWithContext.scala | 195 +++++++++++++---- .../scala/akka/stream/javadsl/Source.scala | 2 +- .../stream/javadsl/SourceWithContext.scala | 207 +++++++++++++----- .../stream/scaladsl/FlowWithContext.scala | 110 ++-------- .../stream/scaladsl/FlowWithContextOps.scala | 201 +++++++++++++++++ .../scala/akka/stream/scaladsl/Source.scala | 3 +- .../stream/scaladsl/SourceWithContext.scala | 54 ++--- 8 files changed, 560 insertions(+), 226 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index 4899c080f2..61874af190 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -4,6 +4,7 @@ package akka.stream +import akka.annotation.InternalApi import akka.stream.impl.TraversalBuilder import scala.annotation.unchecked.uncheckedVariance @@ -68,3 +69,16 @@ trait Graph[+S <: Shape, +M] { */ def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr) } + +/** + * INTERNAL API + * + * Allows creating additional API on top of an existing Graph by extending from this class and + * accessing the delegate + */ +@InternalApi +private[stream] abstract class GraphDelegate[+S <: Shape, +Mat](delegate: Graph[S, Mat]) extends Graph[S, Mat] { + final override def shape: S = delegate.shape + final override private[stream] def traversalBuilder: TraversalBuilder = delegate.traversalBuilder + final override def withAttributes(attr: Attributes): Graph[S, Mat] = delegate.withAttributes(attr) +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index c34eb989fb..d3e13ea619 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -7,11 +7,9 @@ package akka.stream.javadsl import akka.annotation.ApiMayChange import akka.japi.{ Pair, Util, function } import akka.stream._ -import akka.stream.impl.LinearTraversalBuilder import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ -import scala.collection.immutable import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ @@ -24,79 +22,184 @@ object FlowWithContext { def create[Ctx, In](): FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = { new FlowWithContext(scaladsl.FlowWithContext[Ctx, In]) } - def fromPairs[CtxIn, In, CtxOut, Out, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]) = { + def fromPairs[CtxIn, In, CtxOut, Out, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = { new FlowWithContext(scaladsl.FlowWithContext.from(scaladsl.Flow[(In, CtxIn)].map { case (i, c) ⇒ Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right))) } } /** + * A flow that provides operations which automatically propagate the context of an element. + * Only a subset of common operations from [[Flow]] is supported. As an escape hatch you can + * use [[FlowWithContext.via]] to manually provide the context propagation for otherwise unsupported + * operations. + * + * An "empty" flow can be created by calling `FlowWithContext[Ctx, T]`. + * * API MAY CHANGE */ @ApiMayChange -final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat]) extends Graph[FlowShape[(In, CtxIn), (Out, CtxOut)], Mat] { - override val traversalBuilder: LinearTraversalBuilder = delegate.traversalBuilder - override val shape: FlowShape[(In, CtxIn), (Out, CtxOut)] = delegate.shape - override def withAttributes(attr: Attributes): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = new FlowWithContext(delegate.withAttributes(attr)) - - def mapContext[CtxOut2](extractContext: function.Function[CtxOut, CtxOut2]): FlowWithContext[CtxIn, In, CtxOut2, Out, Mat] = { - new FlowWithContext(delegate.mapContext(extractContext.apply)) - } - +final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat]) extends GraphDelegate(delegate) { + /** + * Transform this flow by the regular flow. The given flow must support manual context propagation by + * taking and producing tuples of (data, context). + * + * This can be used as an escape hatch for operations that are not (yet) provided with automatic + * context propagation here. + * + * @see [[akka.stream.javadsl.Flow.via]] + */ def via[CtxOut2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance], Pair[Out2, CtxOut2]], Mat2]): FlowWithContext[CtxIn, In, CtxOut2, Out2, Mat] = { - val under = endContextPropagation().via(viaFlow) + val under = asFlow().via(viaFlow) FlowWithContext.fromPairs(under) } - def to[Mat2](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance]], Mat2]): Sink[Pair[In, CtxIn], Mat] @uncheckedVariance = - endContextPropagation().toMat(sink, Keep.left) - - def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): Sink[Pair[In, CtxIn], Mat3] @uncheckedVariance = - endContextPropagation().toMat(sink, combine) - - def endContextPropagation(): Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat] @uncheckedVariance = + /** + * Creates a regular flow of pairs (data, context). + */ + def asFlow(): Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat] @uncheckedVariance = scaladsl.Flow[Pair[In, CtxIn]] .map(_.toScala) - .viaMat(delegate.endContextPropagation)(scaladsl.Keep.right) + .viaMat(delegate.asFlow)(scaladsl.Keep.right) .map { case (o, c) ⇒ Pair(o, c) } .asJava + // remaining operations in alphabetic order + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.collect]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Flow.collect]] + */ + def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + viaScala(_.collect(pf)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.filter]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Flow.filter]] + */ + def filter(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + viaScala(_.filter(p.test)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.filterNot]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Flow.filterNot]] + */ + def filterNot(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + viaScala(_.filterNot(p.test)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.grouped]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.javadsl.Flow.grouped]] + */ + def grouped(n: Int): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.map]]. + * + * @see [[akka.stream.javadsl.Flow.map]] + */ def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = - new FlowWithContext(delegate.map(f.apply)) + viaScala(_.map(f.apply)) def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = - new FlowWithContext(delegate.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) + viaScala(_.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) - def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = - new FlowWithContext(delegate.collect(pf)) + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.mapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.mapConcat(dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.javadsl.Flow.mapConcat]] + */ + def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + viaScala(_.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) - def filter(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = - new FlowWithContext(delegate.filter(p.test)) - - def filterNot(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = - new FlowWithContext(delegate.filterNot(p.test)) - - def grouped(n: Int): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = { - val f = new function.Function[immutable.Seq[CtxOut], java.util.List[CtxOut]] { - def apply(ctxs: immutable.Seq[CtxOut]) = ctxs.asJava - } - new FlowWithContext(delegate.grouped(n).map(_.asJava)).mapContext(f) + /** + * Apply the given function to each context element (leaving the data elements unchanged). + */ + def mapContext[CtxOut2](extractContext: function.Function[CtxOut, CtxOut2]): FlowWithContext[CtxIn, In, CtxOut2, Out, Mat] = { + viaScala(_.mapContext(extractContext.apply)) } - def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = - new FlowWithContext(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.sliding]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.javadsl.Flow.sliding]] + */ + def sliding(n: Int, step: Int = 1): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava)) + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.statefulMapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.statefulMapConcat(() => dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.javadsl.Flow.statefulMapConcat]] + */ def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = - new FlowWithContext(delegate.statefulMapConcat { () ⇒ + viaScala(_.statefulMapConcat { () ⇒ val fun = f.create() elem ⇒ Util.immutableSeq(fun(elem)) }) - def sliding(n: Int, step: Int = 1): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = { - val f = new function.Function[immutable.Seq[CtxOut], java.util.List[CtxOut]] { - def apply(ctxs: immutable.Seq[CtxOut]) = ctxs.asJava - } - new FlowWithContext(delegate.sliding(n, step).map(_.asJava)).mapContext(f) - } + def asScala: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = delegate - def asScala = delegate + private[this] def viaScala[CtxIn2, In2, CtxOut2, Out2, Mat2](f: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] ⇒ scaladsl.FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2]): FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2] = + new FlowWithContext(f(delegate)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 3ea9763479..98a0135c3f 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3473,5 +3473,5 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ */ @ApiMayChange def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Ctx, Out, Mat] = - new javadsl.SourceWithContext(scaladsl.SourceWithContext(this.asScala).mapContext(extractContext.apply)) + new scaladsl.SourceWithContext(this.asScala.map(x ⇒ (x, extractContext.apply(x)))).asJava } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index 3bbdd37868..b9387c945c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -7,87 +7,180 @@ package akka.stream.javadsl import akka.annotation.ApiMayChange import akka.japi.{ Pair, Util, function } import akka.stream._ -import akka.stream.impl.LinearTraversalBuilder import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ -import scala.collection.immutable import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ /** + * A source that provides operations which automatically propagate the context of an element. + * Only a subset of common operations from [[Source]] is supported. As an escape hatch you can + * use [[SourceWithContext.via]] to manually provide the context propagation for otherwise unsupported + * operations. + * + * Can be created by calling [[Source.startContextPropagation()]] + * * API MAY CHANGE */ @ApiMayChange -object SourceWithContext { - def from[Out, Mat](underlying: Source[Out, Mat]): SourceWithContext[Out, Out, Mat] = { - new SourceWithContext(scaladsl.SourceWithContext(underlying.asScala)) - } - - def fromPairs[Out, Ctx, Mat](under: Source[Pair[Out, Ctx], Mat]): SourceWithContext[Ctx, Out, Mat] = { - new SourceWithContext(scaladsl.SourceWithContext.from(under.asScala.map(_.toScala))) - } -} - -/** - * API MAY CHANGE - */ -@ApiMayChange -final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithContext[Ctx, Out, Mat]) extends Graph[SourceShape[(Out, Ctx)], Mat] { - override val traversalBuilder: LinearTraversalBuilder = delegate.traversalBuilder - override val shape: SourceShape[(Out, Ctx)] = delegate.shape - override def withAttributes(attr: Attributes): SourceWithContext[Ctx, Out, Mat] = new SourceWithContext(delegate.withAttributes(attr)) - - def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Ctx2, Out, Mat] = { - new SourceWithContext(delegate.mapContext(extractContext.apply)) - } - - def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Ctx2, Out2, Mat] = { - val under = endContextPropagation().via(viaFlow) - SourceWithContext.fromPairs(under) - } - - def to[Mat2](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2]): RunnableGraph[Mat] = - endContextPropagation().toMat(sink, Keep.left) - - def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): RunnableGraph[Mat3] = - endContextPropagation().toMat(sink, combine) +final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithContext[Ctx, Out, Mat]) extends GraphDelegate(delegate) { + /** + * Transform this flow by the regular flow. The given flow must support manual context propagation by + * taking and producing tuples of (data, context). + * + * This can be used as an escape hatch for operations that are not (yet) provided with automatic + * context propagation here. + * + * @see [[akka.stream.javadsl.Flow.via]] + */ + def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Ctx2, Out2, Mat] = + viaScala(_.via(akka.stream.scaladsl.Flow[(Out, Ctx)].map { case (o, c) ⇒ Pair(o, c) }.via(viaFlow).map(_.toScala))) + /** + * Stops automatic context propagation from here and converts this to a regular + * stream of a pair of (data, context). + */ def endContextPropagation(): Source[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Mat @uncheckedVariance] = delegate.endContextPropagation.map { case (o, c) ⇒ Pair(o, c) }.asJava + // remaining operations in alphabetic order + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.collect]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Source.collect]] + */ + def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] = + viaScala(_.collect(pf)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.filter]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Source.filter]] + */ + def filter(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = + viaScala(_.filter(p.test)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.filterNot]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.javadsl.Source.filterNot]] + */ + def filterNot(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = + viaScala(_.filterNot(p.test)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.grouped]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.javadsl.Source.grouped]] + */ + def grouped(n: Int): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.map]]. + * + * @see [[akka.stream.javadsl.Source.map]] + */ def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] = - new SourceWithContext(delegate.map(f.apply)) + viaScala(_.map(f.apply)) def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Ctx, Out2, Mat] = - new SourceWithContext(delegate.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) - - def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] = - new SourceWithContext(delegate.collect(pf)) - - def filter(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = - new SourceWithContext(delegate.filter(p.test)) - - def filterNot(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = - new SourceWithContext(delegate.filterNot(p.test)) - - def grouped(n: Int): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = { - val f = new function.Function[immutable.Seq[Ctx], java.util.List[Ctx]] { - def apply(ctxs: immutable.Seq[Ctx]) = ctxs.asJava - } - - new SourceWithContext(delegate.grouped(n).map(_.asJava)).mapContext(f) - } + viaScala(_.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.mapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.mapConcat(dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.javadsl.Source.mapConcat]] + */ def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): SourceWithContext[Ctx, Out2, Mat] = - new SourceWithContext(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + viaScala(_.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + /** + * Apply the given function to each context element (leaving the data elements unchanged). + */ + def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Ctx2, Out, Mat] = + viaScala(_.mapContext(extractContext.apply)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.sliding]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.javadsl.Source.sliding]] + */ + def sliding(n: Int, step: Int = 1): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava)) + + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.statefulMapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.statefulMapConcat(() => dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.javadsl.Source.statefulMapConcat]] + */ def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Ctx, Out2, Mat] = - new SourceWithContext(delegate.statefulMapConcat { () ⇒ + viaScala(_.statefulMapConcat { () ⇒ val fun = f.create() elem ⇒ Util.immutableSeq(fun(elem)) }) - def asScala = delegate + def asScala: scaladsl.SourceWithContext[Ctx, Out, Mat] = delegate + + private[this] def viaScala[Ctx2, Out2, Mat2](f: scaladsl.SourceWithContext[Ctx, Out, Mat] ⇒ scaladsl.SourceWithContext[Ctx2, Out2, Mat2]): SourceWithContext[Ctx2, Out2, Mat2] = + new SourceWithContext(f(delegate)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala index 7be4254690..52a0108015 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -4,116 +4,52 @@ package akka.stream.scaladsl -import scala.collection.immutable -import scala.concurrent.Future -import scala.language.higherKinds import scala.annotation.unchecked.uncheckedVariance - -import akka.NotUsed import akka.annotation.ApiMayChange -import akka.dispatch.ExecutionContexts import akka.stream._ -import akka.stream.impl.LinearTraversalBuilder - -/** - * API MAY CHANGE - */ -@ApiMayChange -trait FlowWithContextOps[+Ctx, +Out, +Mat] { - type Repr[+C, +O] <: FlowWithContextOps[C, O, Mat] { - type Repr[+CC, +OO] = FlowWithContextOps.this.Repr[CC, OO] - type Prov[+CC, +OO] = FlowWithContextOps.this.Prov[CC, OO] - } - - type Prov[+C, +O] <: FlowOpsMat[(O, C), Mat] - - def via[Ctx2, Out2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] - - def map[Out2](f: Out ⇒ Out2): Repr[Ctx, Out2] = - via(flow.map { case (e, ctx) ⇒ (f(e), ctx) }) - - def mapAsync[Out2](parallelism: Int)(f: Out ⇒ Future[Out2]): Repr[Ctx, Out2] = - via(flow.mapAsync(parallelism) { case (e, ctx) ⇒ f(e).map(o ⇒ (o, ctx))(ExecutionContexts.sameThreadExecutionContext) }) - - def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Ctx, Out2] = - via(flow.collect { - case (e, ctx) if f.isDefinedAt(e) ⇒ (f(e), ctx) - }) - - def filter(pred: Out ⇒ Boolean): Repr[Ctx, Out] = - collect { case e if pred(e) ⇒ e } - - def filterNot(pred: Out ⇒ Boolean): Repr[Ctx, Out] = - collect { case e if !pred(e) ⇒ e } - - def grouped(n: Int): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = - via(flow.grouped(n).map { elsWithContext ⇒ - val (els, ctxs) = elsWithContext.unzip - (els, ctxs) - }) - - def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = - via(flow.sliding(n, step).map { elsWithContext ⇒ - val (els, ctxs) = elsWithContext.unzip - (els, ctxs) - }) - - def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = statefulMapConcat(() ⇒ f) - - def statefulMapConcat[Out2](f: () ⇒ Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = { - val fCtx: () ⇒ ((Out, Ctx)) ⇒ immutable.Iterable[(Out2, Ctx)] = { () ⇒ elWithContext ⇒ - val (el, ctx) = elWithContext - f()(el).map(o ⇒ (o, ctx)) - } - via(flow.statefulMapConcat(fCtx)) - } - - def mapContext[Ctx2](f: Ctx ⇒ Ctx2): Repr[Ctx2, Out] = - via(flow.map { case (e, ctx) ⇒ (e, f(ctx)) }) - - def endContextPropagation: Prov[Ctx, Out] - - private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)] -} /** * API MAY CHANGE */ @ApiMayChange object FlowWithContext { + /** + * Creates an "empty" FlowWithContext that passes elements through with their context unchanged. + */ def apply[Ctx, In]: FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = { val under = Flow[(In, Ctx)] - new FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed](under, under.traversalBuilder, under.shape) + new FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed](under) } - def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]) = new FlowWithContext(flow, flow.traversalBuilder, flow.shape) + /** + * Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements. + */ + def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]): FlowWithContext[CI, I, CO, O, M] = new FlowWithContext(flow) } /** + * A flow that provides operations which automatically propagate the context of an element. + * Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can + * use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported + * operations. + * + * An "empty" flow can be created by calling `FlowWithContext[Ctx, T]`. + * * API MAY CHANGE */ @ApiMayChange final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat]( - underlying: Flow[(In, CtxIn), (Out, CtxOut), Mat], - override val traversalBuilder: LinearTraversalBuilder, - override val shape: FlowShape[(In, CtxIn), (Out, CtxOut)] -) extends FlowWithContextOps[CtxOut, Out, Mat] with Graph[FlowShape[(In, CtxIn), (Out, CtxOut)], Mat] { + delegate: Flow[(In, CtxIn), (Out, CtxOut), Mat] +) extends GraphDelegate(delegate) with FlowWithContextOps[CtxOut, Out, Mat] { + override type ReprMat[+C, +O, +M] = FlowWithContext[CtxIn @uncheckedVariance, In @uncheckedVariance, C, O, M @uncheckedVariance] - override def withAttributes(attr: Attributes): Repr[CtxOut, Out] = new FlowWithContext(underlying, traversalBuilder.setAttributes(attr), shape) + override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = + FlowWithContext.from(delegate.via(viaFlow)) - override type Repr[+C, +O] = FlowWithContext[CtxIn @uncheckedVariance, In @uncheckedVariance, C, O, Mat @uncheckedVariance] - override type Prov[+C, +O] = Flow[(In @uncheckedVariance, CtxIn @uncheckedVariance), (O, C), Mat @uncheckedVariance] + override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): FlowWithContext[CtxIn, In, Ctx2, Out2, Mat3] = + FlowWithContext.from(delegate.viaMat(flow)(combine)) - override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = from(underlying.via(viaFlow)) - - def to[Mat2](sink: Graph[SinkShape[(Out, CtxOut)], Mat2]): Sink[(In, CtxIn), Mat] = underlying.toMat(sink)(Keep.left) - - def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, CtxOut)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Sink[(In, CtxIn), Mat3] = underlying.toMat(sink)(combine) - - override def endContextPropagation: Prov[CtxOut, Out] = underlying - - private[this] def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]) = FlowWithContext.from(flow) + def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate def asJava[JCtxIn <: CtxIn, JIn <: In, JCtxOut >: CtxOut, JOut >: Out, JMat >: Mat]: javadsl.FlowWithContext[JCtxIn, JIn, JCtxOut, JOut, JMat] = new javadsl.FlowWithContext(this) } - diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala new file mode 100644 index 0000000000..c36e07cb2e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -0,0 +1,201 @@ +/* + * Copyright (C) 2014-2019 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import scala.collection.immutable +import scala.concurrent.Future +import scala.language.higherKinds +import scala.annotation.unchecked.uncheckedVariance +import akka.NotUsed +import akka.annotation.ApiMayChange +import akka.dispatch.ExecutionContexts +import akka.stream._ + +/** + * Shared stream operations for [[FlowWithContext]] and [[SourceWithContext]] that automatically propagate a context + * element with each data element. + * + * API MAY CHANGE + */ +@ApiMayChange +trait FlowWithContextOps[+Ctx, +Out, +Mat] { + type ReprMat[+C, +O, +M] <: FlowWithContextOps[C, O, M] { + type ReprMat[+CC, +OO, +MatMat] = FlowWithContextOps.this.ReprMat[CC, OO, MatMat] + } + type Repr[+C, +O] = ReprMat[C, O, Mat @uncheckedVariance] + + /** + * Transform this flow by the regular flow. The given flow must support manual context propagation by + * taking and producing tuples of (data, context). + * + * This can be used as an escape hatch for operations that are not (yet) provided with automatic + * context propagation here. + * + * @see [[akka.stream.scaladsl.FlowOps.via]] + */ + def via[Ctx2, Out2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] + + /** + * Transform this flow by the regular flow. The given flow must support manual context propagation by + * taking and producing tuples of (data, context). + * + * This can be used as an escape hatch for operations that are not (yet) provided with automatic + * context propagation here. + * + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting Flow. + * + * @see [[akka.stream.scaladsl.FlowOps.viaMat]] + */ + def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[Ctx2, Out2, Mat3] + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.map]]. + * + * @see [[akka.stream.scaladsl.FlowOps.map]] + */ + def map[Out2](f: Out ⇒ Out2): Repr[Ctx, Out2] = + via(flow.map { case (e, ctx) ⇒ (f(e), ctx) }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapAsync]]. + * + * @see [[akka.stream.scaladsl.FlowOps.mapAsync]] + */ + def mapAsync[Out2](parallelism: Int)(f: Out ⇒ Future[Out2]): Repr[Ctx, Out2] = + via(flow.mapAsync(parallelism) { case (e, ctx) ⇒ f(e).map(o ⇒ (o, ctx))(ExecutionContexts.sameThreadExecutionContext) }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.collect]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.scaladsl.FlowOps.collect]] + */ + def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Ctx, Out2] = + via(flow.collect { + case (e, ctx) if f.isDefinedAt(e) ⇒ (f(e), ctx) + }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.filter]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.scaladsl.FlowOps.filter]] + */ + def filter(pred: Out ⇒ Boolean): Repr[Ctx, Out] = + collect { case e if pred(e) ⇒ e } + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.filterNot]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[akka.stream.scaladsl.FlowOps.filterNot]] + */ + def filterNot(pred: Out ⇒ Boolean): Repr[Ctx, Out] = + collect { case e if !pred(e) ⇒ e } + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.grouped]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.scaladsl.FlowOps.grouped]] + */ + def grouped(n: Int): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = + via(flow.grouped(n).map { elsWithContext ⇒ + val (els, ctxs) = elsWithContext.unzip + (els, ctxs) + }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.sliding]]. + * + * Each output group will be associated with a `Seq` of corresponding context elements. + * + * @see [[akka.stream.scaladsl.FlowOps.sliding]] + */ + def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = + via(flow.sliding(n, step).map { elsWithContext ⇒ + val (els, ctxs) = elsWithContext.unzip + (els, ctxs) + }) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.mapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.mapConcat(dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.scaladsl.FlowOps.mapConcat]] + */ + def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = statefulMapConcat(() ⇒ f) + + /** + * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.statefulMapConcat]]. + * + * The context of the input element will be associated with each of the output elements calculated from + * this input element. + * + * Example: + * + * ``` + * def dup(element: String) = Seq(element, element) + * + * Input: + * + * ("a", 1) + * ("b", 2) + * + * inputElements.statefulMapConcat(() => dup) + * + * Output: + * + * ("a", 1) + * ("a", 1) + * ("b", 2) + * ("b", 2) + * ``` + * + * @see [[akka.stream.scaladsl.FlowOps.statefulMapConcat]] + */ + def statefulMapConcat[Out2](f: () ⇒ Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = { + val fCtx: () ⇒ ((Out, Ctx)) ⇒ immutable.Iterable[(Out2, Ctx)] = { () ⇒ elWithContext ⇒ + val (el, ctx) = elWithContext + f()(el).map(o ⇒ (o, ctx)) + } + via(flow.statefulMapConcat(fCtx)) + } + + /** + * Apply the given function to each context element (leaving the data elements unchanged). + */ + def mapContext[Ctx2](f: Ctx ⇒ Ctx2): Repr[Ctx2, Out] = + via(flow.map { case (e, ctx) ⇒ (e, f(ctx)) }) + + private[akka] def flow[T, C]: Flow[(T, C), (T, C), NotUsed] = Flow[(T, C)] +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index cda704a48d..56110ac117 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -215,11 +215,12 @@ final class Source[+Out, +Mat]( combineRest(2, rest.iterator) }) + /** * API MAY CHANGE */ @ApiMayChange - def startContextPropagation[Ctx](f: Out ⇒ Ctx): SourceWithContext[Ctx, Out, Mat] = SourceWithContext(this).mapContext(f) + def startContextPropagation[Ctx](f: Out ⇒ Ctx): SourceWithContext[Ctx, Out, Mat] = new SourceWithContext(this.map(e ⇒ (e, f(e)))) } object Source { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index 8489bf7409..a73b168734 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -8,48 +8,34 @@ import scala.annotation.unchecked.uncheckedVariance import akka.annotation.ApiMayChange import akka.stream._ -import akka.stream.impl.LinearTraversalBuilder /** + * A source that provides operations which automatically propagate the context of an element. + * Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can + * use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported + * operations. + * + * Can be created by calling [[Source.startContextPropagation()]] + * * API MAY CHANGE */ @ApiMayChange -object SourceWithContext { - def apply[Out, Mat](underlying: Source[Out, Mat]): SourceWithContext[Out, Out, Mat] = { - val under = underlying.map(e ⇒ (e, e)) - new SourceWithContext[Out, Out, Mat](under, under.traversalBuilder, under.shape) - } - def from[Out, Ctx, Mat](under: Source[(Out, Ctx), Mat]): SourceWithContext[Ctx, Out, Mat] = { - new SourceWithContext[Ctx, Out, Mat](under, under.traversalBuilder, under.shape) - } -} +final class SourceWithContext[+Ctx, +Out, +Mat] private[stream] ( + delegate: Source[(Out, Ctx), Mat] +) extends GraphDelegate(delegate) with FlowWithContextOps[Ctx, Out, Mat] { + override type ReprMat[+C, +O, +M] = SourceWithContext[C, O, M @uncheckedVariance] -/** - * API MAY CHANGE - */ -@ApiMayChange -final class SourceWithContext[+Ctx, +Out, +Mat]( - underlying: Source[(Out, Ctx), Mat], - override val traversalBuilder: LinearTraversalBuilder, - override val shape: SourceShape[(Out, Ctx)] -) extends FlowWithContextOps[Ctx, Out, Mat] with Graph[SourceShape[(Out, Ctx)], Mat] { + override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = + new SourceWithContext(delegate.via(viaFlow)) - override def withAttributes(attr: Attributes): Repr[Ctx, Out] = new SourceWithContext(underlying, traversalBuilder.setAttributes(attr), shape) + override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): SourceWithContext[Ctx2, Out2, Mat3] = + new SourceWithContext(delegate.viaMat(flow)(combine)) - override type Repr[+C, +O] = SourceWithContext[C, O, Mat @uncheckedVariance] - override type Prov[+C, +O] = Source[(O, C), Mat @uncheckedVariance] - - override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = { - val under = underlying.via(viaFlow) - new SourceWithContext[Ctx2, Out2, Mat](under, under.traversalBuilder, under.shape) - } - - def to[Mat2](sink: Graph[SinkShape[(Out, Ctx)], Mat2]): RunnableGraph[Mat] = underlying.toMat(sink)(Keep.left) - - def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = - underlying.toMat(sink)(combine) - - override def endContextPropagation: Prov[Ctx, Out] = underlying + /** + * Stops automatic context propagation from here and converts this to a regular + * stream of a pair of (data, context). + */ + def endContextPropagation: Source[(Out, Ctx), Mat] = delegate def asJava[JCtx >: Ctx, JOut >: Out, JMat >: Mat]: javadsl.SourceWithContext[JCtx, JOut, JMat] = new javadsl.SourceWithContext(this)