diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala index e6d68c299f..67280e0e3d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextLogSpec.scala @@ -32,7 +32,7 @@ class FlowWithContextLogSpec extends StreamSpec(""" "on FlowWithContext" must { "log each element" in { - val logging = FlowWithContext[Long, Message].log("my-log") + val logging = FlowWithContext[Message, Long].log("my-log") Source(List(Message("a", 1L), Message("b", 2L))) .startContextPropagation(m ⇒ m.offset) .via(logging) @@ -45,7 +45,7 @@ class FlowWithContextLogSpec extends StreamSpec(""" } "allow extracting value to be logged" in { - val logging = FlowWithContext[Long, Message].log("my-log2", m ⇒ m.data) + val logging = FlowWithContext[Message, Long].log("my-log2", m ⇒ m.data) Source(List(Message("a", 1L))) .startContextPropagation(m ⇒ m.offset) .via(logging) @@ -62,7 +62,7 @@ class FlowWithContextLogSpec extends StreamSpec(""" onFinish = Logging.DebugLevel, onFailure = Logging.DebugLevel) - val logging = FlowWithContext[Long, Message].log("my-log3") + val logging = FlowWithContext[Message, Long].log("my-log3") Source(List(Message("a", 1L), Message("b", 2L))) .startContextPropagation(m ⇒ m.offset) .via(logging) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala index 7f723a91cd..5efde24383 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -58,7 +58,7 @@ class SourceWithContextSpec extends StreamSpec { "pass through contexts via a FlowWithContext" in { - def flowWithContext[T] = FlowWithContext[Long, T] + def flowWithContext[T] = FlowWithContext[T, Long] Source(Vector(Message("a", 1L))) .startContextPropagation(_.offset) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala index 155f110558..dd24b34291 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/WithContextUsageSpec.scala @@ -163,7 +163,7 @@ class WithContextUsageSpec extends StreamSpec { def genValue(ix: Int) = s"v$ix" } - def createSourceWithContext(committableMessages: Vector[Consumer.CommittableMessage[Record]]): SourceWithContext[Offset, Record, NotUsed] = + def createSourceWithContext(committableMessages: Vector[Consumer.CommittableMessage[Record]]): SourceWithContext[Record, Offset, NotUsed] = Consumer .committableSource(committableMessages) .startContextPropagation(m ⇒ Offset(m.committableOffset.offset)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index 64a456e9ab..7c2f63647a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -21,10 +21,12 @@ import scala.compat.java8.FutureConverters._ */ @ApiMayChange object FlowWithContext { - def create[Ctx, In](): FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = { - new FlowWithContext(scaladsl.FlowWithContext[Ctx, In]) + + def create[In, Ctx](): FlowWithContext[In, Ctx, In, Ctx, akka.NotUsed] = { + new FlowWithContext(scaladsl.FlowWithContext[In, Ctx]) } - def fromPairs[CtxIn, In, CtxOut, Out, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = { + + def fromPairs[In, CtxIn, Out, CtxOut, Mat](under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = { new FlowWithContext(scaladsl.FlowWithContext.from(scaladsl.Flow[(In, CtxIn)].map { case (i, c) ⇒ Pair(i, c) }.viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right))) } } @@ -40,7 +42,7 @@ object FlowWithContext { * API MAY CHANGE */ @ApiMayChange -final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat]) extends GraphDelegate(delegate) { +final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat]) extends GraphDelegate(delegate) { /** * Transform this flow by the regular flow. The given flow must support manual context propagation by * taking and producing tuples of (data, context). @@ -50,7 +52,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.via]] */ - def via[CtxOut2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance], Pair[Out2, CtxOut2]], Mat2]): FlowWithContext[CtxIn, In, CtxOut2, Out2, Mat] = { + def via[Out2, CtxOut2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, CtxOut @uncheckedVariance], Pair[Out2, CtxOut2]], Mat2]): FlowWithContext[In, CtxIn, Out2, CtxOut2, Mat] = { val under = asFlow().via(viaFlow) FlowWithContext.fromPairs(under) } @@ -74,7 +76,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.collect]] */ - def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.collect(pf)) /** @@ -84,7 +86,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.filter]] */ - def filter(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + def filter(p: function.Predicate[Out]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = viaScala(_.filter(p.test)) /** @@ -94,7 +96,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.filterNot]] */ - def filterNot(p: function.Predicate[Out]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + def filterNot(p: function.Predicate[Out]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = viaScala(_.filterNot(p.test)) /** @@ -104,7 +106,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.grouped]] */ - def grouped(n: Int): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + def grouped(n: Int): FlowWithContext[In, CtxIn, java.util.List[Out @uncheckedVariance], java.util.List[CtxOut @uncheckedVariance], Mat] = viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava)) /** @@ -112,10 +114,10 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.map]] */ - def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.map(f.apply)) - def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) /** @@ -146,13 +148,13 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.mapConcat]] */ - def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) /** * Apply the given function to each context element (leaving the data elements unchanged). */ - def mapContext[CtxOut2](extractContext: function.Function[CtxOut, CtxOut2]): FlowWithContext[CtxIn, In, CtxOut2, Out, Mat] = { + def mapContext[CtxOut2](extractContext: function.Function[CtxOut, CtxOut2]): FlowWithContext[In, CtxIn, Out, CtxOut2, Mat] = { viaScala(_.mapContext(extractContext.apply)) } @@ -163,7 +165,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.sliding]] */ - def sliding(n: Int, step: Int = 1): FlowWithContext[CtxIn, In, java.util.List[CtxOut @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + def sliding(n: Int, step: Int = 1): FlowWithContext[In, CtxIn, java.util.List[Out @uncheckedVariance], java.util.List[CtxOut @uncheckedVariance], Mat] = viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava)) /** @@ -194,7 +196,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.statefulMapConcat]] */ - def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[CtxIn, In, CtxOut, Out2, Mat] = + def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.statefulMapConcat { () ⇒ val fun = f.create() elem ⇒ Util.immutableSeq(fun(elem)) @@ -205,7 +207,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.log]] */ - def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = viaScala(_.log(name, e ⇒ extract.apply(e))(log)) /** @@ -213,7 +215,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.log]] */ - def log(name: String, extract: function.Function[Out, Any]): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + def log(name: String, extract: function.Function[Out, Any]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = this.log(name, extract, null) /** @@ -221,7 +223,7 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.log]] */ - def log(name: String, log: LoggingAdapter): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + def log(name: String, log: LoggingAdapter): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], log) /** @@ -229,11 +231,11 @@ final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat](delegate: scaladsl * * @see [[akka.stream.javadsl.Flow.log]] */ - def log(name: String): FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = + def log(name: String): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) - def asScala: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] = delegate + def asScala: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = delegate - private[this] def viaScala[CtxIn2, In2, CtxOut2, Out2, Mat2](f: scaladsl.FlowWithContext[CtxIn, In, CtxOut, Out, Mat] ⇒ scaladsl.FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2]): FlowWithContext[CtxIn2, In2, CtxOut2, Out2, Mat2] = + private[this] def viaScala[In2, CtxIn2, Out2, CtxOut2, Mat2](f: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat] ⇒ scaladsl.FlowWithContext[In2, CtxIn2, Out2, CtxOut2, Mat2]): FlowWithContext[In2, CtxIn2, Out2, CtxOut2, Mat2] = new FlowWithContext(f(delegate)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index ac23dc77d9..02740ebfc9 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -3472,6 +3472,6 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * API MAY CHANGE */ @ApiMayChange - def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Ctx, Out, Mat] = + def startContextPropagation[Ctx](extractContext: function.Function[Out, Ctx]): SourceWithContext[Out, Ctx, Mat] = new scaladsl.SourceWithContext(this.asScala.map(x ⇒ (x, extractContext.apply(x)))).asJava } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index 6ddd66ace0..a353e4a8c8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -27,7 +27,7 @@ import scala.compat.java8.FutureConverters._ * API MAY CHANGE */ @ApiMayChange -final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithContext[Ctx, Out, Mat]) extends GraphDelegate(delegate) { +final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithContext[Out, Ctx, Mat]) extends GraphDelegate(delegate) { /** * Transform this flow by the regular flow. The given flow must support manual context propagation by * taking and producing tuples of (data, context). @@ -37,7 +37,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Flow.via]] */ - def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Ctx2, Out2, Mat] = + def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance], Pair[Out2, Ctx2]], Mat2]): SourceWithContext[Out2, Ctx2, Mat] = viaScala(_.via(akka.stream.scaladsl.Flow[(Out, Ctx)].map { case (o, c) ⇒ Pair(o, c) }.via(viaFlow).map(_.toScala))) /** @@ -56,7 +56,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Source.collect]] */ - def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] = + def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.collect(pf)) /** @@ -66,7 +66,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Source.filter]] */ - def filter(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = + def filter(p: function.Predicate[Out]): SourceWithContext[Out, Ctx, Mat] = viaScala(_.filter(p.test)) /** @@ -76,7 +76,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Source.filterNot]] */ - def filterNot(p: function.Predicate[Out]): SourceWithContext[Ctx, Out, Mat] = + def filterNot(p: function.Predicate[Out]): SourceWithContext[Out, Ctx, Mat] = viaScala(_.filterNot(p.test)) /** @@ -86,7 +86,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Source.grouped]] */ - def grouped(n: Int): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + def grouped(n: Int): SourceWithContext[java.util.List[Out @uncheckedVariance], java.util.List[Ctx @uncheckedVariance], Mat] = viaScala(_.grouped(n).map(_.asJava).mapContext(_.asJava)) /** @@ -94,10 +94,10 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Source.map]] */ - def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Ctx, Out2, Mat] = + def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.map(f.apply)) - def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Ctx, Out2, Mat] = + def mapAsync[Out2](parallelism: Int, f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.mapAsync[Out2](parallelism)(o ⇒ f.apply(o).toScala)) /** @@ -128,13 +128,13 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Source.mapConcat]] */ - def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): SourceWithContext[Ctx, Out2, Mat] = + def mapConcat[Out2](f: function.Function[Out, _ <: java.lang.Iterable[Out2]]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) /** * Apply the given function to each context element (leaving the data elements unchanged). */ - def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Ctx2, Out, Mat] = + def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Out, Ctx2, Mat] = viaScala(_.mapContext(extractContext.apply)) /** @@ -144,7 +144,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Source.sliding]] */ - def sliding(n: Int, step: Int = 1): SourceWithContext[java.util.List[Ctx @uncheckedVariance], java.util.List[Out @uncheckedVariance], Mat] = + def sliding(n: Int, step: Int = 1): SourceWithContext[java.util.List[Out @uncheckedVariance], java.util.List[Ctx @uncheckedVariance], Mat] = viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava)) /** @@ -175,7 +175,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Source.statefulMapConcat]] */ - def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Ctx, Out2, Mat] = + def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.statefulMapConcat { () ⇒ val fun = f.create() elem ⇒ Util.immutableSeq(fun(elem)) @@ -186,7 +186,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Source.log]] */ - def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SourceWithContext[Ctx, Out, Mat] = + def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] = viaScala(_.log(name, e ⇒ extract.apply(e))(log)) /** @@ -194,7 +194,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Flow.log]] */ - def log(name: String, extract: function.Function[Out, Any]): SourceWithContext[Ctx, Out, Mat] = + def log(name: String, extract: function.Function[Out, Any]): SourceWithContext[Out, Ctx, Mat] = this.log(name, extract, null) /** @@ -202,7 +202,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Flow.log]] */ - def log(name: String, log: LoggingAdapter): SourceWithContext[Ctx, Out, Mat] = + def log(name: String, log: LoggingAdapter): SourceWithContext[Out, Ctx, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], log) /** @@ -210,11 +210,11 @@ final class SourceWithContext[+Ctx, +Out, +Mat](delegate: scaladsl.SourceWithCon * * @see [[akka.stream.javadsl.Flow.log]] */ - def log(name: String): SourceWithContext[Ctx, Out, Mat] = + def log(name: String): SourceWithContext[Out, Ctx, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) - def asScala: scaladsl.SourceWithContext[Ctx, Out, Mat] = delegate + def asScala: scaladsl.SourceWithContext[Out, Ctx, Mat] = delegate - private[this] def viaScala[Ctx2, Out2, Mat2](f: scaladsl.SourceWithContext[Ctx, Out, Mat] ⇒ scaladsl.SourceWithContext[Ctx2, Out2, Mat2]): SourceWithContext[Ctx2, Out2, Mat2] = + private[this] def viaScala[Out2, Ctx2, Mat2](f: scaladsl.SourceWithContext[Out, Ctx, Mat] ⇒ scaladsl.SourceWithContext[Out2, Ctx2, Mat2]): SourceWithContext[Out2, Ctx2, Mat2] = new SourceWithContext(f(delegate)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala index 52a0108015..87c2cec324 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -13,17 +13,20 @@ import akka.stream._ */ @ApiMayChange object FlowWithContext { + /** * Creates an "empty" FlowWithContext that passes elements through with their context unchanged. */ - def apply[Ctx, In]: FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed] = { + def apply[In, Ctx]: FlowWithContext[In, Ctx, In, Ctx, akka.NotUsed] = { val under = Flow[(In, Ctx)] - new FlowWithContext[Ctx, In, Ctx, In, akka.NotUsed](under) + new FlowWithContext[In, Ctx, In, Ctx, akka.NotUsed](under) } + /** * Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements. */ - def from[CI, I, CO, O, M](flow: Flow[(I, CI), (O, CO), M]): FlowWithContext[CI, I, CO, O, M] = new FlowWithContext(flow) + def from[In, CtxIn, Out, CtxOut, Mat](flow: Flow[(In, CtxIn), (Out, CtxOut), Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + new FlowWithContext(flow) } /** @@ -37,19 +40,19 @@ object FlowWithContext { * API MAY CHANGE */ @ApiMayChange -final class FlowWithContext[-CtxIn, -In, +CtxOut, +Out, +Mat]( +final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat]( delegate: Flow[(In, CtxIn), (Out, CtxOut), Mat] -) extends GraphDelegate(delegate) with FlowWithContextOps[CtxOut, Out, Mat] { - override type ReprMat[+C, +O, +M] = FlowWithContext[CtxIn @uncheckedVariance, In @uncheckedVariance, C, O, M @uncheckedVariance] +) extends GraphDelegate(delegate) with FlowWithContextOps[Out, CtxOut, Mat] { + override type ReprMat[+O, +C, +M] = FlowWithContext[In @uncheckedVariance, CtxIn @uncheckedVariance, O, C, M @uncheckedVariance] - override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = + override def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] = FlowWithContext.from(delegate.via(viaFlow)) - override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): FlowWithContext[CtxIn, In, Ctx2, Out2, Mat3] = + override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] = FlowWithContext.from(delegate.viaMat(flow)(combine)) def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate - def asJava[JCtxIn <: CtxIn, JIn <: In, JCtxOut >: CtxOut, JOut >: Out, JMat >: Mat]: javadsl.FlowWithContext[JCtxIn, JIn, JCtxOut, JOut, JMat] = + def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat]: javadsl.FlowWithContext[JIn, JCtxIn, JOut, JCtxOut, JMat] = new javadsl.FlowWithContext(this) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index cfa12f63cb..92bf314a92 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -22,11 +22,11 @@ import akka.event.LoggingAdapter * API MAY CHANGE */ @ApiMayChange -trait FlowWithContextOps[+Ctx, +Out, +Mat] { - type ReprMat[+C, +O, +M] <: FlowWithContextOps[C, O, M] { - type ReprMat[+CC, +OO, +MatMat] = FlowWithContextOps.this.ReprMat[CC, OO, MatMat] +trait FlowWithContextOps[+Out, +Ctx, +Mat] { + type ReprMat[+O, +C, +M] <: FlowWithContextOps[O, C, M] { + type ReprMat[+OO, +CC, +MatMat] = FlowWithContextOps.this.ReprMat[OO, CC, MatMat] } - type Repr[+C, +O] = ReprMat[C, O, Mat @uncheckedVariance] + type Repr[+O, +C] = ReprMat[O, C, Mat @uncheckedVariance] /** * Transform this flow by the regular flow. The given flow must support manual context propagation by @@ -37,7 +37,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.via]] */ - def via[Ctx2, Out2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] + def via[Out2, Ctx2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] /** * Transform this flow by the regular flow. The given flow must support manual context propagation by @@ -51,14 +51,14 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.viaMat]] */ - def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[Ctx2, Out2, Mat3] + def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): ReprMat[Out2, Ctx2, Mat3] /** * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.map]]. * * @see [[akka.stream.scaladsl.FlowOps.map]] */ - def map[Out2](f: Out ⇒ Out2): Repr[Ctx, Out2] = + def map[Out2](f: Out ⇒ Out2): Repr[Out2, Ctx] = via(flow.map { case (e, ctx) ⇒ (f(e), ctx) }) /** @@ -66,7 +66,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.mapAsync]] */ - def mapAsync[Out2](parallelism: Int)(f: Out ⇒ Future[Out2]): Repr[Ctx, Out2] = + def mapAsync[Out2](parallelism: Int)(f: Out ⇒ Future[Out2]): Repr[Out2, Ctx] = via(flow.mapAsync(parallelism) { case (e, ctx) ⇒ f(e).map(o ⇒ (o, ctx))(ExecutionContexts.sameThreadExecutionContext) }) /** @@ -76,7 +76,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.collect]] */ - def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Ctx, Out2] = + def collect[Out2](f: PartialFunction[Out, Out2]): Repr[Out2, Ctx] = via(flow.collect { case (e, ctx) if f.isDefinedAt(e) ⇒ (f(e), ctx) }) @@ -88,7 +88,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.filter]] */ - def filter(pred: Out ⇒ Boolean): Repr[Ctx, Out] = + def filter(pred: Out ⇒ Boolean): Repr[Out, Ctx] = collect { case e if pred(e) ⇒ e } /** @@ -98,7 +98,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.filterNot]] */ - def filterNot(pred: Out ⇒ Boolean): Repr[Ctx, Out] = + def filterNot(pred: Out ⇒ Boolean): Repr[Out, Ctx] = collect { case e if !pred(e) ⇒ e } /** @@ -108,7 +108,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.grouped]] */ - def grouped(n: Int): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = + def grouped(n: Int): Repr[immutable.Seq[Out], immutable.Seq[Ctx]] = via(flow.grouped(n).map { elsWithContext ⇒ val (els, ctxs) = elsWithContext.unzip (els, ctxs) @@ -121,7 +121,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.sliding]] */ - def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Ctx], immutable.Seq[Out]] = + def sliding(n: Int, step: Int = 1): Repr[immutable.Seq[Out], immutable.Seq[Ctx]] = via(flow.sliding(n, step).map { elsWithContext ⇒ val (els, ctxs) = elsWithContext.unzip (els, ctxs) @@ -155,7 +155,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.mapConcat]] */ - def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = statefulMapConcat(() ⇒ f) + def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Out2, Ctx] = statefulMapConcat(() ⇒ f) /** * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.statefulMapConcat]]. @@ -195,7 +195,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.statefulMapConcat]] */ - def statefulMapConcat[Out2](f: () ⇒ Out ⇒ immutable.Iterable[Out2]): Repr[Ctx, Out2] = { + def statefulMapConcat[Out2](f: () ⇒ Out ⇒ immutable.Iterable[Out2]): Repr[Out2, Ctx] = { val fCtx: () ⇒ ((Out, Ctx)) ⇒ immutable.Iterable[(Out2, Ctx)] = () ⇒ { val plainFun = f() elWithContext ⇒ { @@ -209,7 +209,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { /** * Apply the given function to each context element (leaving the data elements unchanged). */ - def mapContext[Ctx2](f: Ctx ⇒ Ctx2): Repr[Ctx2, Out] = + def mapContext[Ctx2](f: Ctx ⇒ Ctx2): Repr[Out, Ctx2] = via(flow.map { case (e, ctx) ⇒ (e, f(ctx)) }) /** @@ -217,7 +217,7 @@ trait FlowWithContextOps[+Ctx, +Out, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.log]] */ - def log(name: String, extract: Out ⇒ Any = ConstantFun.scalaIdentityFunction)(implicit log: LoggingAdapter = null): Repr[Ctx, Out] = { + def log(name: String, extract: Out ⇒ Any = ConstantFun.scalaIdentityFunction)(implicit log: LoggingAdapter = null): Repr[Out, Ctx] = { val extractWithContext: ((Out, Ctx)) ⇒ Any = { case (e, _) ⇒ extract(e) } via(flow.log(name, extractWithContext)(log)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 0676cd1c00..a4d19d4f3d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -220,7 +220,7 @@ final class Source[+Out, +Mat]( * API MAY CHANGE */ @ApiMayChange - def startContextPropagation[Ctx](f: Out ⇒ Ctx): SourceWithContext[Ctx, Out, Mat] = new SourceWithContext(this.map(e ⇒ (e, f(e)))) + def startContextPropagation[Ctx](f: Out ⇒ Ctx): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(this.map(e ⇒ (e, f(e)))) } object Source { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index a73b168734..e4ef91198a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -20,15 +20,15 @@ import akka.stream._ * API MAY CHANGE */ @ApiMayChange -final class SourceWithContext[+Ctx, +Out, +Mat] private[stream] ( +final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] ( delegate: Source[(Out, Ctx), Mat] -) extends GraphDelegate(delegate) with FlowWithContextOps[Ctx, Out, Mat] { - override type ReprMat[+C, +O, +M] = SourceWithContext[C, O, M @uncheckedVariance] +) extends GraphDelegate(delegate) with FlowWithContextOps[Out, Ctx, Mat] { + override type ReprMat[+O, +C, +M] = SourceWithContext[O, C, M @uncheckedVariance] - override def via[Ctx2, Out2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Ctx2, Out2] = + override def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] = new SourceWithContext(delegate.via(viaFlow)) - override def viaMat[Ctx2, Out2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): SourceWithContext[Ctx2, Out2, Mat3] = + override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): SourceWithContext[Out2, Ctx2, Mat3] = new SourceWithContext(delegate.viaMat(flow)(combine)) /** @@ -37,7 +37,7 @@ final class SourceWithContext[+Ctx, +Out, +Mat] private[stream] ( */ def endContextPropagation: Source[(Out, Ctx), Mat] = delegate - def asJava[JCtx >: Ctx, JOut >: Out, JMat >: Mat]: javadsl.SourceWithContext[JCtx, JOut, JMat] = + def asJava[JOut >: Out, JCtx >: Ctx, JMat >: Mat]: javadsl.SourceWithContext[JOut, JCtx, JMat] = new javadsl.SourceWithContext(this) }