diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala index 171975563c..07c2f9e443 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala @@ -5,7 +5,7 @@ package akka.stream.scaladsl import java.util.concurrent.atomic.AtomicInteger -import akka.NotUsed +import akka.{ Done, NotUsed } import akka.stream._ import akka.stream.testkit._ @@ -235,6 +235,25 @@ class GraphMatValueSpec extends StreamSpec { m4 should ===(NotUsed) } + "build more complicated graph with flows optimized for identity flows" in { + val flow1 = Flow.fromSinkAndSourceMat(Sink.ignore, Source.single(1).viaMat(Flow[Int])(Keep.both))(Keep.both) + val (mA, (m1, m2)) = Source.single(8).viaMat(flow1)(Keep.right).to(Sink.ignore).run() + Await.result(mA, 1.second) should ===(Done) //from Sink.ignore + m1 should ===(NotUsed) //from Source.single(1) + m2 should ===(NotUsed) //from Flow[Int] + + val flow2 = Flow.fromSinkAndSourceMat(Sink.ignore, Source.maybe[Int].viaMat(Flow[Int])(Keep.left))(Keep.both) + val (mB, m3) = Source.single(8).viaMat(flow2)(Keep.right).to(Sink.ignore).run() + Await.result(mB, 1.second) should ===(Done) //from Sink.ignore + // Fails with ClassCastException if value is wrong + m3.success(None) //from Source.maybe[Int] + + val flow3 = Flow.fromSinkAndSourceMat(Sink.ignore, Source.single(1).viaMat(Flow[Int])(Keep.right))(Keep.both) + val (mC, m4) = Source.single(8).viaMat(flow3)(Keep.right).to(Sink.ignore).run() + Await.result(mC, 1.second) should ===(Done) //from Sink.ignore + m4 should ===(NotUsed) //from Flow[Int] + } + "provide a new materialized value for each materialization" in { // coverage for #23577 val matGen = new AtomicInteger(0) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index d867e86891..5de428a57c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -45,15 +45,23 @@ final class Source[+Out, +Mat]( override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left) override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = { - val toAppend = - if (flow.traversalBuilder eq Flow.identityTraversalBuilder) - LinearTraversalBuilder.empty() + if (flow.traversalBuilder eq Flow.identityTraversalBuilder) + if (combine == Keep.left) + //optimization by returning this + this.asInstanceOf[Source[T, Mat3]] //Mat == Mat3, due to Keep.left + else if (combine == Keep.right || combine == Keep.none) // Mat3 = NotUsed + //optimization with LinearTraversalBuilder.empty() + new Source[T, Mat3]( + traversalBuilder.append(LinearTraversalBuilder.empty(), flow.shape, combine), + SourceShape(shape.out).asInstanceOf[SourceShape[T]]) else - flow.traversalBuilder - - new Source[T, Mat3]( - traversalBuilder.append(toAppend, flow.shape, combine), - SourceShape(flow.shape.out)) + new Source[T, Mat3]( + traversalBuilder.append(flow.traversalBuilder, flow.shape, combine), + SourceShape(flow.shape.out)) + else + new Source[T, Mat3]( + traversalBuilder.append(flow.traversalBuilder, flow.shape, combine), + SourceShape(flow.shape.out)) } /**