diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 76d1e5aa2a..96876a5da6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -281,6 +281,31 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re Source(1 to 10).via(identity2).limit(100).runWith(Sink.seq), 3.seconds) should ===(1 to 10) } + + "eliminate passed in when matval from passed in not used" in { + val map = Flow.fromFunction((n: Int) ⇒ n + 1) + val result = map.viaMat(Flow[Int])(Keep.left) + result shouldBe theSameInstanceAs(map) + } + + "not eliminate passed in when matval from passed in is used" in { + val map = Flow.fromFunction((n: Int) ⇒ n + 1) + val result = map.viaMat(Flow[Int])(Keep.right) + result shouldNot be theSameInstanceAs (map) + } + + "eliminate itself if identity" in { + val map = Flow.fromFunction((n: Int) ⇒ n + 1) + val result = Flow[Int].viaMat(map)(Keep.right) + result shouldBe theSameInstanceAs(map) + } + + "not eliminate itself if identity but matval is used" in { + val map = Flow.fromFunction((n: Int) ⇒ n + 1) + val result = Flow[Int].viaMat(map)(Keep.left) + result shouldNot be theSameInstanceAs (map) + } + } "A Flow with multiple subscribers (FanOutBox)" must { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 22c649bd4c..ce1af3a87b 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -48,9 +48,29 @@ final class Flow[-In, +Out, +Mat]( override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = { if (this.isIdentity) { - new Flow( - LinearTraversalBuilder.fromBuilder(flow.traversalBuilder, flow.shape, combine), - flow.shape).asInstanceOf[Flow[In, T, Mat3]] + // optimization by returning flow if possible since we know Mat2 == Mat3 from flow + if (combine == Keep.right) Flow.fromGraph(flow).asInstanceOf[Flow[In, T, Mat3]] + else { + // Keep.none is optimized and we know left means Mat3 == NotUsed + val useCombine = + if (combine == Keep.left) Keep.none + else combine + new Flow( + LinearTraversalBuilder.empty().append(flow.traversalBuilder, flow.shape, useCombine), + flow.shape).asInstanceOf[Flow[In, T, Mat3]] + } + } else if (flow.traversalBuilder eq Flow.identityTraversalBuilder) { + // optimization by returning this if possible since we know Mat2 == Mat from this + if (combine == Keep.left) this.asInstanceOf[Flow[In, T, Mat3]] + else { + // Keep.none is somewhat optimized and we know Mat == NotUsed + val useCombine = + if (combine == Keep.right) Keep.none + else combine + new Flow( + traversalBuilder.append(LinearTraversalBuilder.empty(), shape, useCombine), + FlowShape[In, T](shape.in, flow.shape.out)) + } } else { new Flow( traversalBuilder.append(flow.traversalBuilder, flow.shape, combine),