Eliminate identity flow when possible #24554
This commit is contained in:
parent
1a63aa8637
commit
f7637d24e6
2 changed files with 48 additions and 3 deletions
|
|
@ -281,6 +281,31 @@ class FlowSpec extends StreamSpec(ConfigFactory.parseString("akka.actor.debug.re
|
||||||
Source(1 to 10).via(identity2).limit(100).runWith(Sink.seq),
|
Source(1 to 10).via(identity2).limit(100).runWith(Sink.seq),
|
||||||
3.seconds) should ===(1 to 10)
|
3.seconds) should ===(1 to 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"eliminate passed in when matval from passed in not used" in {
|
||||||
|
val map = Flow.fromFunction((n: Int) ⇒ n + 1)
|
||||||
|
val result = map.viaMat(Flow[Int])(Keep.left)
|
||||||
|
result shouldBe theSameInstanceAs(map)
|
||||||
|
}
|
||||||
|
|
||||||
|
"not eliminate passed in when matval from passed in is used" in {
|
||||||
|
val map = Flow.fromFunction((n: Int) ⇒ n + 1)
|
||||||
|
val result = map.viaMat(Flow[Int])(Keep.right)
|
||||||
|
result shouldNot be theSameInstanceAs (map)
|
||||||
|
}
|
||||||
|
|
||||||
|
"eliminate itself if identity" in {
|
||||||
|
val map = Flow.fromFunction((n: Int) ⇒ n + 1)
|
||||||
|
val result = Flow[Int].viaMat(map)(Keep.right)
|
||||||
|
result shouldBe theSameInstanceAs(map)
|
||||||
|
}
|
||||||
|
|
||||||
|
"not eliminate itself if identity but matval is used" in {
|
||||||
|
val map = Flow.fromFunction((n: Int) ⇒ n + 1)
|
||||||
|
val result = Flow[Int].viaMat(map)(Keep.left)
|
||||||
|
result shouldNot be theSameInstanceAs (map)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"A Flow with multiple subscribers (FanOutBox)" must {
|
"A Flow with multiple subscribers (FanOutBox)" must {
|
||||||
|
|
|
||||||
|
|
@ -48,9 +48,29 @@ final class Flow[-In, +Out, +Mat](
|
||||||
|
|
||||||
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = {
|
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = {
|
||||||
if (this.isIdentity) {
|
if (this.isIdentity) {
|
||||||
|
// optimization by returning flow if possible since we know Mat2 == Mat3 from flow
|
||||||
|
if (combine == Keep.right) Flow.fromGraph(flow).asInstanceOf[Flow[In, T, Mat3]]
|
||||||
|
else {
|
||||||
|
// Keep.none is optimized and we know left means Mat3 == NotUsed
|
||||||
|
val useCombine =
|
||||||
|
if (combine == Keep.left) Keep.none
|
||||||
|
else combine
|
||||||
new Flow(
|
new Flow(
|
||||||
LinearTraversalBuilder.fromBuilder(flow.traversalBuilder, flow.shape, combine),
|
LinearTraversalBuilder.empty().append(flow.traversalBuilder, flow.shape, useCombine),
|
||||||
flow.shape).asInstanceOf[Flow[In, T, Mat3]]
|
flow.shape).asInstanceOf[Flow[In, T, Mat3]]
|
||||||
|
}
|
||||||
|
} else if (flow.traversalBuilder eq Flow.identityTraversalBuilder) {
|
||||||
|
// optimization by returning this if possible since we know Mat2 == Mat from this
|
||||||
|
if (combine == Keep.left) this.asInstanceOf[Flow[In, T, Mat3]]
|
||||||
|
else {
|
||||||
|
// Keep.none is somewhat optimized and we know Mat == NotUsed
|
||||||
|
val useCombine =
|
||||||
|
if (combine == Keep.right) Keep.none
|
||||||
|
else combine
|
||||||
|
new Flow(
|
||||||
|
traversalBuilder.append(LinearTraversalBuilder.empty(), shape, useCombine),
|
||||||
|
FlowShape[In, T](shape.in, flow.shape.out))
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
new Flow(
|
new Flow(
|
||||||
traversalBuilder.append(flow.traversalBuilder, flow.shape, combine),
|
traversalBuilder.append(flow.traversalBuilder, flow.shape, combine),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue