diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 660262cb85..3d6cf357c9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -684,8 +684,11 @@ final case class LinearTraversalBuilder( */ def append[A, B, C](toAppend: LinearTraversalBuilder, matCompose: (A, B) ⇒ C): LinearTraversalBuilder = { - if (toAppend.isEmpty) this - else if (this.isEmpty) { + if (toAppend.isEmpty) { + copy( + traversalSoFar = PushNotUsed.concat(LinearTraversalBuilder.addMatCompose(traversalSoFar, matCompose)) + ) + } else if (this.isEmpty) { toAppend.copy( traversalSoFar = toAppend.traversalSoFar.concat(LinearTraversalBuilder.addMatCompose(traversal, matCompose)) ) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index ac43033479..5c6bfc374a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -45,14 +45,16 @@ final class Source[+Out, +Mat]( override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left) override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = { - if (flow.traversalBuilder eq Flow.identityTraversalBuilder) { - traversalBuilder.append(LinearTraversalBuilder.empty(), combine).asInstanceOf[Source[T, Mat3]] - } else { - new Source[T, Mat3]( - traversalBuilder.append(flow.traversalBuilder, flow.shape, combine), - SourceShape(flow.shape.out) - ) - } + val toAppend = + if (flow.traversalBuilder eq Flow.identityTraversalBuilder) + LinearTraversalBuilder.empty() + else + flow.traversalBuilder + + new Source[T, Mat3]( + traversalBuilder.append(toAppend, flow.shape, combine), + SourceShape(flow.shape.out) + ) } /**