From 71463da3bb82f4e1f053c7d80e75fb41bf726ac8 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Wed, 2 Mar 2022 14:16:32 +0100 Subject: [PATCH] Add unsafeViaData to FlowWithContext and SourceWithContext (#31123) * Add unsafeViaData to FlowWithContext and SourceWithContext * Add @ApiMayChange to unsafeDataVia * Improve documentation for unsafeDataVia * Add tests for unsafeDataVia * Add deadlock comment in documentation. * Add unsafeDataVia to Java DSL * Add mima rule for unsafeDataVia FlowWithContextOps --- .../src/main/paradox/stream/stream-context.md | 6 +++++- .../stream/scaladsl/FlowWithContextSpec.scala | 17 +++++++++++++++++ .../scaladsl/SourceWithContextSpec.scala | 12 ++++++++++++ .../flow-with-context-ops.excludes | 1 + .../akka/stream/javadsl/FlowWithContext.scala | 18 ++++++++++++++++++ .../stream/javadsl/SourceWithContext.scala | 18 ++++++++++++++++++ .../akka/stream/scaladsl/FlowWithContext.scala | 15 +++++++++++++++ .../stream/scaladsl/FlowWithContextOps.scala | 16 ++++++++++++++++ .../stream/scaladsl/SourceWithContext.scala | 15 +++++++++++++++ 9 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/flow-with-context-ops.excludes diff --git a/akka-docs/src/main/paradox/stream/stream-context.md b/akka-docs/src/main/paradox/stream/stream-context.md index 603f7cfd6d..a625a4be95 100644 --- a/akka-docs/src/main/paradox/stream/stream-context.md +++ b/akka-docs/src/main/paradox/stream/stream-context.md @@ -62,7 +62,11 @@ produces elements of type `Bar` with contexts of type `Ctx`. The reason for this is that `flow` might reorder the elements flowing through it, making `via` challenging to implement. -There is a @ref[Flow.asFlowWithContext](operators/Flow/asFlowWithContext.md) +Due to this there is a `unsafeDataVia` that can be used instead however no +protection is offered to prevent reordering or dropping/duplicating elements +from stream so use this operation with great care. + +There is also a @ref[Flow.asFlowWithContext](operators/Flow/asFlowWithContext.md) which can be used when the types used in the inner @apidoc[Flow] have room to hold the context. If this is not the case, a better solution is usually to build the flow from the ground diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala index eae11ce865..8c08c35969 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala @@ -66,5 +66,22 @@ class FlowWithContextSpec extends StreamSpec { .expectNext((Message("a", 2L), 2L)) .expectError(boom) } + + "keep the same order for data and context when using unsafeDataVia" in { + val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4)) + + val baseFlow = Flow[(String, Int)] + .asFlowWithContext[String, Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2) + .map(_._1) + .unsafeDataVia(Flow.fromFunction[String, Int] { _.toInt }) + + SourceWithContext + .fromTuples(Source(data)) + .via(baseFlow) + .runWith(TestSink.probe[(Int, Int)]) + .request(4) + .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) + .expectComplete() + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala index e6f1be2646..0ef68d4b0b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -135,5 +135,17 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((Message("a", 2L), 2L)) .expectError(boom) } + + "keep the same order for data and context when using unsafeDataVia" in { + val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4)) + + SourceWithContext + .fromTuples(Source(data)) + .unsafeDataVia(Flow.fromFunction[String, Int] { _.toInt }) + .runWith(TestSink.probe[(Int, Int)]) + .request(4) + .expectNext((1, 1), (2, 2), (3, 3), (4, 4)) + .expectComplete() + } } } diff --git a/akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/flow-with-context-ops.excludes b/akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/flow-with-context-ops.excludes new file mode 100644 index 0000000000..80307493e5 --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.18.backwards.excludes/flow-with-context-ops.excludes @@ -0,0 +1 @@ +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.unsafeDataVia") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index 83eb9d4913..47d1b92d84 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -9,6 +9,7 @@ import java.util.concurrent.CompletionStage import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ +import akka.annotation.ApiMayChange import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.{ function, Pair, Util } import akka.stream._ @@ -63,6 +64,23 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( FlowWithContext.fromPairs(under) } + /** + * Transform this flow by the regular flow. The given flow works on the data portion of the stream and + * ignores the context. + * + * The given flow *must* not re-order, drop or emit multiple elements for one incoming + * element, the sequence of incoming contexts is re-combined with the outgoing + * elements of the stream. If a flow not fulfilling this requirement is used the stream + * will not fail but continue running in a corrupt state and re-combine incorrect pairs + * of elements and contexts or deadlock. + * + * For more background on these requirements + * see https://doc.akka.io/docs/akka/current/stream/stream-context.html. + */ + @ApiMayChange def unsafeDataVia[Out2, Mat2]( + viaFlow: Graph[FlowShape[Out @uncheckedVariance, Out2], Mat2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = + viaScala(_.unsafeDataVia(viaFlow)) + /** * Context-preserving variant of [[akka.stream.javadsl.Flow.withAttributes]]. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index 0c11e965e1..e10b330a5c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -10,6 +10,7 @@ import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ import akka.actor.ClassicActorSystemProvider +import akka.annotation.ApiMayChange import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.japi.Pair import akka.japi.Util @@ -58,6 +59,23 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon : SourceWithContext[Out2, Ctx2, Mat] = viaScala(_.via(akka.stream.scaladsl.Flow[(Out, Ctx)].map { case (o, c) => Pair(o, c) }.via(viaFlow).map(_.toScala))) + /** + * Transform this flow by the regular flow. The given flow works on the data portion of the stream and + * ignores the context. + * + * The given flow *must* not re-order, drop or emit multiple elements for one incoming + * element, the sequence of incoming contexts is re-combined with the outgoing + * elements of the stream. If a flow not fulfilling this requirement is used the stream + * will not fail but continue running in a corrupt state and re-combine incorrect pairs + * of elements and contexts or deadlock. + * + * For more background on these requirements + * see https://doc.akka.io/docs/akka/current/stream/stream-context.html. + */ + @ApiMayChange def unsafeDataVia[Out2, Mat2]( + viaFlow: Graph[FlowShape[Out @uncheckedVariance, Out2], Mat2]): SourceWithContext[Out2, Ctx, Mat] = + viaScala(_.unsafeDataVia(viaFlow)) + /** * Context-preserving variant of [[akka.stream.javadsl.Source.withAttributes]]. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala index d7dd7aa359..7461109c0e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -46,6 +46,21 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In override def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] = new FlowWithContext(delegate.via(viaFlow)) + override def unsafeDataVia[Out2, Mat2](viaFlow: Graph[FlowShape[Out, Out2], Mat2]): Repr[Out2, CtxOut] = + FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(delegate) { implicit b => d => + import GraphDSL.Implicits._ + + val bcast = b.add(Broadcast[(Out, CtxOut)](2)) + val zipper = b.add(Zip[Out2, CtxOut]()) + + d ~> bcast.in + + bcast.out(0).map { case (dataOut, _) => dataOut }.via(viaFlow) ~> zipper.in0 + bcast.out(1).map { case (_, ctxOut) => ctxOut } ~> zipper.in1 + + FlowShape(d.in, zipper.out) + })) + override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])( combine: (Mat, Mat2) => Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] = new FlowWithContext(delegate.viaMat(flow)(combine)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index e046646bcf..691144ec97 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -9,6 +9,7 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import akka.NotUsed +import akka.annotation.ApiMayChange import akka.dispatch.ExecutionContexts import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import akka.stream._ @@ -43,6 +44,21 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { */ def via[Out2, Ctx2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] + /** + * Transform this flow by the regular flow. The given flow works on the data portion of the stream and + * ignores the context. + * + * The given flow *must* not re-order, drop or emit multiple elements for one incoming + * element, the sequence of incoming contexts is re-combined with the outgoing + * elements of the stream. If a flow not fulfilling this requirement is used the stream + * will not fail but continue running in a corrupt state and re-combine incorrect pairs + * of elements and contexts or deadlock. + * + * For more background on these requirements + * see https://doc.akka.io/docs/akka/current/stream/stream-context.html. + */ + @ApiMayChange def unsafeDataVia[Out2, Mat2](viaFlow: Graph[FlowShape[Out, Out2], Mat2]): Repr[Out2, Ctx] + /** * Transform this flow by the regular flow. The given flow must support manual context propagation by * taking and producing tuples of (data, context). diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index 8f9c34859e..6ee7dbeb7d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -33,6 +33,21 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc override def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] = new SourceWithContext(delegate.via(viaFlow)) + override def unsafeDataVia[Out2, Mat2](viaFlow: Graph[FlowShape[Out, Out2], Mat2]): Repr[Out2, Ctx] = + SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(delegate) { implicit b => d => + import GraphDSL.Implicits._ + + val bcast = b.add(Broadcast[(Out, Ctx)](2)) + val zipper = b.add(Zip[Out2, Ctx]()) + + d ~> bcast.in + + bcast.out(0).map { case (dataOut, _) => dataOut }.via(viaFlow) ~> zipper.in0 + bcast.out(1).map { case (_, ctxOut) => ctxOut } ~> zipper.in1 + + SourceShape(zipper.out) + })) + override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])( combine: (Mat, Mat2) => Mat3): SourceWithContext[Out2, Ctx2, Mat3] = new SourceWithContext(delegate.viaMat(flow)(combine))