Fix the case where an identity flow is appended to a Source with materialized value composition (#22442)
This commit is contained in:
parent
a2203d7b13
commit
46b869d041
2 changed files with 15 additions and 10 deletions
|
|
@ -684,8 +684,11 @@ final case class LinearTraversalBuilder(
|
|||
*/
|
||||
def append[A, B, C](toAppend: LinearTraversalBuilder, matCompose: (A, B) ⇒ C): LinearTraversalBuilder = {
|
||||
|
||||
if (toAppend.isEmpty) this
|
||||
else if (this.isEmpty) {
|
||||
if (toAppend.isEmpty) {
|
||||
copy(
|
||||
traversalSoFar = PushNotUsed.concat(LinearTraversalBuilder.addMatCompose(traversalSoFar, matCompose))
|
||||
)
|
||||
} else if (this.isEmpty) {
|
||||
toAppend.copy(
|
||||
traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose))
|
||||
)
|
||||
|
|
|
|||
|
|
@ -45,15 +45,17 @@ final class Source[+Out, +Mat](
|
|||
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)
|
||||
|
||||
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = {
|
||||
if (flow.traversalBuilder eq Flow.identityTraversalBuilder) {
|
||||
traversalBuilder.append(LinearTraversalBuilder.empty(), combine).asInstanceOf[Source[T, Mat3]]
|
||||
} else {
|
||||
val toAppend =
|
||||
if (flow.traversalBuilder eq Flow.identityTraversalBuilder)
|
||||
LinearTraversalBuilder.empty()
|
||||
else
|
||||
flow.traversalBuilder
|
||||
|
||||
new Source[T, Mat3](
|
||||
traversalBuilder.append(flow.traversalBuilder, flow.shape, combine),
|
||||
traversalBuilder.append(toAppend, flow.shape, combine),
|
||||
SourceShape(flow.shape.out)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue