Add unsafeViaData to FlowWithContext and SourceWithContext (#31123)

* Add unsafeViaData to FlowWithContext and SourceWithContext

* Add @ApiMayChange to unsafeDataVia

* Improve documentation for unsafeDataVia

* Add tests for unsafeDataVia

* Add deadlock comment in documentation.

* Add unsafeDataVia to Java DSL

* Add mima rule for unsafeDataVia FlowWithContextOps
This commit is contained in:
Matthew de Detrich 2022-03-02 14:16:32 +01:00 committed by GitHub
parent 331db9b8f8
commit 71463da3bb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 117 additions and 1 deletions

View file

@ -62,7 +62,11 @@ produces elements of type `Bar` with contexts of type `Ctx`. The
reason for this is that `flow` might reorder the elements flowing
through it, making `via` challenging to implement.
There is a @ref[Flow.asFlowWithContext](operators/Flow/asFlowWithContext.md)
Due to this there is a `unsafeDataVia` that can be used instead however no
protection is offered to prevent reordering or dropping/duplicating elements
from stream so use this operation with great care.
There is also a @ref[Flow.asFlowWithContext](operators/Flow/asFlowWithContext.md)
which can be used when the types used in the inner
@apidoc[Flow] have room to hold the context. If this is not the
case, a better solution is usually to build the flow from the ground

View file

@ -66,5 +66,22 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext((Message("a", 2L), 2L))
.expectError(boom)
}
"keep the same order for data and context when using unsafeDataVia" in {
val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4))
val baseFlow = Flow[(String, Int)]
.asFlowWithContext[String, Int, Int](collapseContext = Tuple2.apply)(extractContext = _._2)
.map(_._1)
.unsafeDataVia(Flow.fromFunction[String, Int] { _.toInt })
SourceWithContext
.fromTuples(Source(data))
.via(baseFlow)
.runWith(TestSink.probe[(Int, Int)])
.request(4)
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}
}
}

View file

@ -135,5 +135,17 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((Message("a", 2L), 2L))
.expectError(boom)
}
"keep the same order for data and context when using unsafeDataVia" in {
val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4))
SourceWithContext
.fromTuples(Source(data))
.unsafeDataVia(Flow.fromFunction[String, Int] { _.toInt })
.runWith(TestSink.probe[(Int, Int)])
.request(4)
.expectNext((1, 1), (2, 2), (3, 3), (4, 4))
.expectComplete()
}
}
}

View file

@ -0,0 +1 @@
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.unsafeDataVia")

View file

@ -9,6 +9,7 @@ import java.util.concurrent.CompletionStage
import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.{ function, Pair, Util }
import akka.stream._
@ -63,6 +64,23 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
FlowWithContext.fromPairs(under)
}
/**
* Transform this flow by the regular flow. The given flow works on the data portion of the stream and
* ignores the context.
*
* The given flow *must* not re-order, drop or emit multiple elements for one incoming
* element, the sequence of incoming contexts is re-combined with the outgoing
* elements of the stream. If a flow not fulfilling this requirement is used the stream
* will not fail but continue running in a corrupt state and re-combine incorrect pairs
* of elements and contexts or deadlock.
*
* For more background on these requirements
* see https://doc.akka.io/docs/akka/current/stream/stream-context.html.
*/
@ApiMayChange def unsafeDataVia[Out2, Mat2](
viaFlow: Graph[FlowShape[Out @uncheckedVariance, Out2], Mat2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] =
viaScala(_.unsafeDataVia(viaFlow))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.withAttributes]].
*

View file

@ -10,6 +10,7 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.compat.java8.FutureConverters._
import akka.actor.ClassicActorSystemProvider
import akka.annotation.ApiMayChange
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.japi.Pair
import akka.japi.Util
@ -58,6 +59,23 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
: SourceWithContext[Out2, Ctx2, Mat] =
viaScala(_.via(akka.stream.scaladsl.Flow[(Out, Ctx)].map { case (o, c) => Pair(o, c) }.via(viaFlow).map(_.toScala)))
/**
* Transform this flow by the regular flow. The given flow works on the data portion of the stream and
* ignores the context.
*
* The given flow *must* not re-order, drop or emit multiple elements for one incoming
* element, the sequence of incoming contexts is re-combined with the outgoing
* elements of the stream. If a flow not fulfilling this requirement is used the stream
* will not fail but continue running in a corrupt state and re-combine incorrect pairs
* of elements and contexts or deadlock.
*
* For more background on these requirements
* see https://doc.akka.io/docs/akka/current/stream/stream-context.html.
*/
@ApiMayChange def unsafeDataVia[Out2, Mat2](
viaFlow: Graph[FlowShape[Out @uncheckedVariance, Out2], Mat2]): SourceWithContext[Out2, Ctx, Mat] =
viaScala(_.unsafeDataVia(viaFlow))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.withAttributes]].
*

View file

@ -46,6 +46,21 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In
override def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] =
new FlowWithContext(delegate.via(viaFlow))
override def unsafeDataVia[Out2, Mat2](viaFlow: Graph[FlowShape[Out, Out2], Mat2]): Repr[Out2, CtxOut] =
FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(delegate) { implicit b => d =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[(Out, CtxOut)](2))
val zipper = b.add(Zip[Out2, CtxOut]())
d ~> bcast.in
bcast.out(0).map { case (dataOut, _) => dataOut }.via(viaFlow) ~> zipper.in0
bcast.out(1).map { case (_, ctxOut) => ctxOut } ~> zipper.in1
FlowShape(d.in, zipper.out)
}))
override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, CtxOut), (Out2, Ctx2)], Mat2])(
combine: (Mat, Mat2) => Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] =
new FlowWithContext(delegate.viaMat(flow)(combine))

View file

@ -9,6 +9,7 @@ import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.dispatch.ExecutionContexts
import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import akka.stream._
@ -43,6 +44,21 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
*/
def via[Out2, Ctx2, Mat2](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2]
/**
* Transform this flow by the regular flow. The given flow works on the data portion of the stream and
* ignores the context.
*
* The given flow *must* not re-order, drop or emit multiple elements for one incoming
* element, the sequence of incoming contexts is re-combined with the outgoing
* elements of the stream. If a flow not fulfilling this requirement is used the stream
* will not fail but continue running in a corrupt state and re-combine incorrect pairs
* of elements and contexts or deadlock.
*
* For more background on these requirements
* see https://doc.akka.io/docs/akka/current/stream/stream-context.html.
*/
@ApiMayChange def unsafeDataVia[Out2, Mat2](viaFlow: Graph[FlowShape[Out, Out2], Mat2]): Repr[Out2, Ctx]
/**
* Transform this flow by the regular flow. The given flow must support manual context propagation by
* taking and producing tuples of (data, context).

View file

@ -33,6 +33,21 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc
override def via[Out2, Ctx2, Mat2](viaFlow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2]): Repr[Out2, Ctx2] =
new SourceWithContext(delegate.via(viaFlow))
override def unsafeDataVia[Out2, Mat2](viaFlow: Graph[FlowShape[Out, Out2], Mat2]): Repr[Out2, Ctx] =
SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(delegate) { implicit b => d =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[(Out, Ctx)](2))
val zipper = b.add(Zip[Out2, Ctx]())
d ~> bcast.in
bcast.out(0).map { case (dataOut, _) => dataOut }.via(viaFlow) ~> zipper.in0
bcast.out(1).map { case (_, ctxOut) => ctxOut } ~> zipper.in1
SourceShape(zipper.out)
}))
override def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(
combine: (Mat, Mat2) => Mat3): SourceWithContext[Out2, Ctx2, Mat3] =
new SourceWithContext(delegate.viaMat(flow)(combine))