=str #24413 make sure to check if available to push (#24434)

* =str #24413 make sure to check if available to push

* address review

* Update SourceRefBenchmark.scala
This commit is contained in:
Konrad `ktoso` Malawski 2018-01-30 00:27:40 +09:00 committed by GitHub
parent 81c5b1d571
commit 800923522a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 108 additions and 7 deletions

View file

@ -119,7 +119,7 @@ private[stream] final class SourceRefStageImpl[Out](
}
}
def scheduleDemandRedelivery() =
def scheduleDemandRedelivery(): Unit =
scheduleOnce(DemandRedeliveryTimerKey, settings.demandRedeliveryInterval)
override protected def onTimer(timerKey: Any): Unit = timerKey match {
@ -183,17 +183,20 @@ private[stream] final class SourceRefStageImpl[Out](
}
def tryPush(): Unit =
if (receiveBuffer.nonEmpty) push(out, receiveBuffer.dequeue())
else if ( /* buffer is empty && */ completed) completeStage()
if (receiveBuffer.nonEmpty && isAvailable(out)) push(out, receiveBuffer.dequeue())
else if (receiveBuffer.isEmpty && completed) completeStage()
private def onReceiveElement(payload: Out): Unit = {
localRemainingRequested -= 1
if (receiveBuffer.isEmpty && isAvailable(out))
if (receiveBuffer.isEmpty && isAvailable(out)) {
push(out, payload)
else if (receiveBuffer.isFull)
throw new IllegalStateException(s"Attempted to overflow buffer! Capacity: ${receiveBuffer.capacity}, incoming element: $payload, localRemainingRequested: ${localRemainingRequested}, localCumulativeDemand: ${localCumulativeDemand}")
else
} else if (receiveBuffer.isFull) {
throw new IllegalStateException(s"Attempted to overflow buffer! " +
s"Capacity: ${receiveBuffer.capacity}, incoming element: $payload, " +
s"localRemainingRequested: $localRemainingRequested, localCumulativeDemand: $localCumulativeDemand")
} else {
receiveBuffer.enqueue(payload)
}
}
/** @throws InvalidPartnerActorException when partner ref is invalid */