From 5704c72ece3fefe22fd6ec1020e0171946ba2ef5 Mon Sep 17 00:00:00 2001 From: Kirill Yankov Date: Tue, 7 Mar 2017 16:56:55 +0300 Subject: [PATCH] fixed corruption of ordering in MergePreferred caused by using emit() with emit in andThen() callback #22446 --- .../stream/impl/GraphStageLogicSpec.scala | 6 ++-- .../scaladsl/GraphMergePreferredSpec.scala | 10 ++++-- .../akka/stream/scaladsl/GraphMergeSpec.scala | 9 +++-- .../scala/akka/stream/stage/GraphStage.scala | 35 +++++++++++++++---- 4 files changed, 47 insertions(+), 13 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala index bfc32d3632..91222f8744 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/GraphStageLogicSpec.scala @@ -140,7 +140,10 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit { Source.empty.via(emit1234.named("testStage")).runWith(TestSink.probe) .request(5) - .expectNext(1, 2, 3, 4) + .expectNext(1) + //emitting with callback gives nondeterminism whether 2 or 3 will be pushed first + .expectNextUnordered(2, 3) + .expectNext(4) .expectComplete() } @@ -236,4 +239,3 @@ class GraphStageLogicSpec extends StreamSpec with GraphInterpreterSpecKit { } } - diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePreferredSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePreferredSpec.scala index e53036b91f..1182c00210 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePreferredSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergePreferredSpec.scala @@ -46,7 +46,7 @@ class GraphMergePreferredSpec extends TwoStreamsSetup { Await.result(result, 3.seconds).filter(_ == 1).size should be(numElements) } - "eventually pass through all elements" in { + "eventually pass through all elements without corrupting the ordering" in { val result = RunnableGraph.fromGraph(GraphDSL.create(Sink.head[Seq[Int]]) { implicit b ⇒ sink ⇒ val merge = b.add(MergePreferred[Int](3)) Source(1 to 100) ~> merge.preferred @@ -58,7 +58,13 @@ class GraphMergePreferredSpec extends TwoStreamsSetup { ClosedShape }).run() - Await.result(result, 3.seconds).toSet should ===((1 to 400).toSet) + val resultSeq = Await.result(result, 3.seconds) + resultSeq.toSet should ===((1 to 400).toSet) + //test ordering of elements coming from each of the flows + resultSeq.filter(_ <= 100) should ===(1 to 100) + resultSeq.filter(e ⇒ e > 100 && e <= 200) should ===(101 to 200) + resultSeq.filter(e ⇒ e > 200 && e <= 300) should ===(201 to 300) + resultSeq.filter(e ⇒ e > 300 && e <= 400) should ===(301 to 400) } "disallow multiple preferred inputs" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala index 15ccf8c0d8..da2e5d547c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala @@ -49,13 +49,16 @@ class GraphMergeSpec extends TwoStreamsSetup { val subscription = probe.expectSubscription() - var collected = Set.empty[Int] + var collected = Seq.empty[Int] for (_ ← 1 to 10) { subscription.request(1) - collected += probe.expectNext() + collected :+= probe.expectNext() } + //test ordering of elements coming from each of nonempty flows + collected.filter(_ <= 4) should ===(1 to 4) + collected.filter(_ >= 5) should ===(5 to 10) - collected should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + collected.toSet should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) probe.expectComplete() } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 92198412b9..e7fcbdab0a 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -825,12 +825,27 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: setHandler(out, previous) andThen() if (followUps != null) { - getHandler(out) match { - case e: Emitting[_] ⇒ e.as[T].addFollowUps(this) - case _ ⇒ - val next = dequeue() - if (next.isInstanceOf[EmittingCompletion[_]]) complete(out) - else setHandler(out, next) + /** + * If (while executing andThen() callback) handler was changed to new emitting, + * we should add it to the end of emission queue + */ + val currentHandler = getHandler(out) + if (currentHandler.isInstanceOf[Emitting[_]]) + addFollowUp(currentHandler.asInstanceOf[Emitting[T]]) + + val next = dequeue() + if (next.isInstanceOf[EmittingCompletion[_]]) { + /** + * If next element is emitting completion and there are some elements after it, + * we to need pass them before completion + */ + if (next.followUps != null) { + setHandler(out, dequeueHeadAndAddToTail(next)) + } else { + complete(out) + } + } else { + setHandler(out, next) } } } @@ -844,6 +859,14 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: followUpsTail = e } + private def dequeueHeadAndAddToTail(head: Emitting[T]): Emitting[T] = { + val next = head.dequeue() + next.addFollowUp(head) + head.followUps = null + head.followUpsTail = null + next + } + private def addFollowUps(e: Emitting[T]): Unit = if (followUps == null) { followUps = e.followUps