Merge pull request #22480 from manonthegithub/wip-22446-fix-merge-preferred-ordering

#22446 fixed corruption of ordering in MergePreferred
This commit is contained in:
Patrik Nordwall 2017-03-15 16:21:30 +01:00 committed by GitHub
commit e4a09c207f
4 changed files with 47 additions and 13 deletions

View file

@ -140,7 +140,10 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit {
Source.empty.via(emit1234.named("testStage")).runWith(TestSink.probe) Source.empty.via(emit1234.named("testStage")).runWith(TestSink.probe)
.request(5) .request(5)
.expectNext(1, 2, 3, 4) .expectNext(1)
//emitting with callback gives nondeterminism whether 2 or 3 will be pushed first
.expectNextUnordered(2, 3)
.expectNext(4)
.expectComplete() .expectComplete()
} }
@ -236,4 +239,3 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit {
} }
} }

View file

@ -46,7 +46,7 @@ class GraphMergePreferredSpec extends TwoStreamsSetup {
Await.result(result, 3.seconds).filter(_ == 1).size should be(numElements) Await.result(result, 3.seconds).filter(_ == 1).size should be(numElements)
} }
"eventually pass through all elements" in { "eventually pass through all elements without corrupting the ordering" in {
val result = RunnableGraph.fromGraph(GraphDSL.create(Sink.head[Seq[Int]]) { implicit b sink val result = RunnableGraph.fromGraph(GraphDSL.create(Sink.head[Seq[Int]]) { implicit b sink
val merge = b.add(MergePreferred[Int](3)) val merge = b.add(MergePreferred[Int](3))
Source(1 to 100) ~> merge.preferred Source(1 to 100) ~> merge.preferred
@ -58,7 +58,13 @@ class GraphMergePreferredSpec extends TwoStreamsSetup {
ClosedShape ClosedShape
}).run() }).run()
Await.result(result, 3.seconds).toSet should ===((1 to 400).toSet) val resultSeq = Await.result(result, 3.seconds)
resultSeq.toSet should ===((1 to 400).toSet)
//test ordering of elements coming from each of the flows
resultSeq.filter(_ <= 100) should ===(1 to 100)
resultSeq.filter(e e > 100 && e <= 200) should ===(101 to 200)
resultSeq.filter(e e > 200 && e <= 300) should ===(201 to 300)
resultSeq.filter(e e > 300 && e <= 400) should ===(301 to 400)
} }
"disallow multiple preferred inputs" in { "disallow multiple preferred inputs" in {

View file

@ -49,13 +49,16 @@ class GraphMergeSpec extends TwoStreamsSetup {
val subscription = probe.expectSubscription() val subscription = probe.expectSubscription()
var collected = Set.empty[Int] var collected = Seq.empty[Int]
for (_ 1 to 10) { for (_ 1 to 10) {
subscription.request(1) subscription.request(1)
collected += probe.expectNext() collected :+= probe.expectNext()
} }
//test ordering of elements coming from each of nonempty flows
collected.filter(_ <= 4) should ===(1 to 4)
collected.filter(_ >= 5) should ===(5 to 10)
collected should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) collected.toSet should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
probe.expectComplete() probe.expectComplete()
} }

View file

@ -825,12 +825,27 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
setHandler(out, previous) setHandler(out, previous)
andThen() andThen()
if (followUps != null) { if (followUps != null) {
getHandler(out) match { /**
case e: Emitting[_] e.as[T].addFollowUps(this) * If (while executing andThen() callback) handler was changed to new emitting,
case _ * we should add it to the end of emission queue
*/
val currentHandler = getHandler(out)
if (currentHandler.isInstanceOf[Emitting[_]])
addFollowUp(currentHandler.asInstanceOf[Emitting[T]])
val next = dequeue() val next = dequeue()
if (next.isInstanceOf[EmittingCompletion[_]]) complete(out) if (next.isInstanceOf[EmittingCompletion[_]]) {
else setHandler(out, next) /**
* If next element is emitting completion and there are some elements after it,
* we to need pass them before completion
*/
if (next.followUps != null) {
setHandler(out, dequeueHeadAndAddToTail(next))
} else {
complete(out)
}
} else {
setHandler(out, next)
} }
} }
} }
@ -844,6 +859,14 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
followUpsTail = e followUpsTail = e
} }
private def dequeueHeadAndAddToTail(head: Emitting[T]): Emitting[T] = {
val next = head.dequeue()
next.addFollowUp(head)
head.followUps = null
head.followUpsTail = null
next
}
private def addFollowUps(e: Emitting[T]): Unit = private def addFollowUps(e: Emitting[T]): Unit =
if (followUps == null) { if (followUps == null) {
followUps = e.followUps followUps = e.followUps