Balancer should not push to a closed out #20943

This commit is contained in:
Johan Andrén 2016-08-18 15:44:27 +02:00 committed by GitHub
parent c8bcbc53dd
commit a81a61ba1f
3 changed files with 45 additions and 4 deletions

View file

@ -3,10 +3,11 @@ package akka.stream.scaladsl
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.Future
import akka.stream.{ SourceShape, ClosedShape, ActorMaterializer, ActorMaterializerSettings }
import akka.stream._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import akka.stream.testkit.Utils._
import akka.util.ByteString
class GraphBalanceSpec extends StreamSpec {
@ -256,6 +257,36 @@ class GraphBalanceSpec extends StreamSpec {
bsub.expectCancellation()
}
// Bug #20943
"not push output twice" in assertAllStagesStopped {
val p1 = TestPublisher.manualProbe[Int]()
val c1 = TestSubscriber.manualProbe[Int]()
val c2 = TestSubscriber.manualProbe[Int]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b
val balance = b.add(Balance[Int](2))
Source.fromPublisher(p1.getPublisher) ~> balance.in
balance.out(0) ~> Sink.fromSubscriber(c1)
balance.out(1) ~> Sink.fromSubscriber(c2)
ClosedShape
}).run()
val bsub = p1.expectSubscription()
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.request(1)
p1.expectRequest(bsub, 16)
bsub.sendNext(1)
c1.expectNext(1)
sub2.request(1)
sub2.cancel()
bsub.sendNext(2)
sub1.cancel()
bsub.expectCancellation()
}
}
}

View file

@ -617,10 +617,20 @@ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) ext
private var needDownstreamPulls: Int = if (waitForAllDownstreams) outputPorts else 0
private var downstreamsRunning: Int = outputPorts
@tailrec
private def dequeueAndDispatch(): Unit = {
val out = pendingQueue.dequeue()
push(out, grab(in))
if (!noPending) pull(in)
// out is null if depleted pendingQueue without reaching
// an out that is not closed, in which case we just return
if (out ne null) {
if (!isClosed(out)) {
push(out, grab(in))
if (!noPending) pull(in)
} else {
// try to find one output that isn't closed
dequeueAndDispatch()
}
}
}
setHandler(in, new InHandler {

View file

@ -455,8 +455,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
// Detailed error information should not add overhead to the hot path
ReactiveStreamsCompliance.requireNonNullElement(elem)
require(isAvailable(out), s"Cannot push port ($out) twice")
require(!isClosed(out), s"Cannot pull closed port ($out)")
require(isAvailable(out), s"Cannot push port ($out) twice")
// No error, just InClosed caused the actual pull to be ignored, but the status flag still needs to be flipped
connection.portState = portState ^ PushStartFlip