Fix a bug in viaMat for Identity flow optimization #22899
This commit is contained in:
parent
cba1cb79fa
commit
ad5fa124b3
2 changed files with 36 additions and 9 deletions
|
|
@ -5,7 +5,7 @@ package akka.stream.scaladsl
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.{ Done, NotUsed }
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
import akka.stream.testkit._
|
import akka.stream.testkit._
|
||||||
|
|
||||||
|
|
@ -235,6 +235,25 @@ class GraphMatValueSpec extends StreamSpec {
|
||||||
m4 should ===(NotUsed)
|
m4 should ===(NotUsed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"build more complicated graph with flows optimized for identity flows" in {
|
||||||
|
val flow1 = Flow.fromSinkAndSourceMat(Sink.ignore, Source.single(1).viaMat(Flow[Int])(Keep.both))(Keep.both)
|
||||||
|
val (mA, (m1, m2)) = Source.single(8).viaMat(flow1)(Keep.right).to(Sink.ignore).run()
|
||||||
|
Await.result(mA, 1.second) should ===(Done) //from Sink.ignore
|
||||||
|
m1 should ===(NotUsed) //from Source.single(1)
|
||||||
|
m2 should ===(NotUsed) //from Flow[Int]
|
||||||
|
|
||||||
|
val flow2 = Flow.fromSinkAndSourceMat(Sink.ignore, Source.maybe[Int].viaMat(Flow[Int])(Keep.left))(Keep.both)
|
||||||
|
val (mB, m3) = Source.single(8).viaMat(flow2)(Keep.right).to(Sink.ignore).run()
|
||||||
|
Await.result(mB, 1.second) should ===(Done) //from Sink.ignore
|
||||||
|
// Fails with ClassCastException if value is wrong
|
||||||
|
m3.success(None) //from Source.maybe[Int]
|
||||||
|
|
||||||
|
val flow3 = Flow.fromSinkAndSourceMat(Sink.ignore, Source.single(1).viaMat(Flow[Int])(Keep.right))(Keep.both)
|
||||||
|
val (mC, m4) = Source.single(8).viaMat(flow3)(Keep.right).to(Sink.ignore).run()
|
||||||
|
Await.result(mC, 1.second) should ===(Done) //from Sink.ignore
|
||||||
|
m4 should ===(NotUsed) //from Flow[Int]
|
||||||
|
}
|
||||||
|
|
||||||
"provide a new materialized value for each materialization" in {
|
"provide a new materialized value for each materialization" in {
|
||||||
// coverage for #23577
|
// coverage for #23577
|
||||||
val matGen = new AtomicInteger(0)
|
val matGen = new AtomicInteger(0)
|
||||||
|
|
|
||||||
|
|
@ -45,15 +45,23 @@ final class Source[+Out, +Mat](
|
||||||
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)
|
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)
|
||||||
|
|
||||||
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = {
|
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = {
|
||||||
val toAppend =
|
if (flow.traversalBuilder eq Flow.identityTraversalBuilder)
|
||||||
if (flow.traversalBuilder eq Flow.identityTraversalBuilder)
|
if (combine == Keep.left)
|
||||||
LinearTraversalBuilder.empty()
|
//optimization by returning this
|
||||||
|
this.asInstanceOf[Source[T, Mat3]] //Mat == Mat3, due to Keep.left
|
||||||
|
else if (combine == Keep.right || combine == Keep.none) // Mat3 = NotUsed
|
||||||
|
//optimization with LinearTraversalBuilder.empty()
|
||||||
|
new Source[T, Mat3](
|
||||||
|
traversalBuilder.append(LinearTraversalBuilder.empty(), flow.shape, combine),
|
||||||
|
SourceShape(shape.out).asInstanceOf[SourceShape[T]])
|
||||||
else
|
else
|
||||||
flow.traversalBuilder
|
new Source[T, Mat3](
|
||||||
|
traversalBuilder.append(flow.traversalBuilder, flow.shape, combine),
|
||||||
new Source[T, Mat3](
|
SourceShape(flow.shape.out))
|
||||||
traversalBuilder.append(toAppend, flow.shape, combine),
|
else
|
||||||
SourceShape(flow.shape.out))
|
new Source[T, Mat3](
|
||||||
|
traversalBuilder.append(flow.traversalBuilder, flow.shape, combine),
|
||||||
|
SourceShape(flow.shape.out))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue