Microoptimizaitons in the interpreter

This commit is contained in:
Endre Sándor Varga 2016-07-20 13:53:53 +02:00
parent a6cf6c646e
commit b01acfd8d6
2 changed files with 34 additions and 28 deletions

View file

@ -650,7 +650,7 @@ final class GraphInterpreter(
elem
}
private def enqueue(connection: Int): Unit = {
def enqueue(connection: Int): Unit = {
if (Debug) if (queueTail - queueHead > mask) new Exception(s"$Name internal queue full ($queueStatus) + $connection").printStackTrace()
eventQueue(queueTail & mask) = connection
queueTail += 1
@ -688,23 +688,6 @@ final class GraphInterpreter(
}
}
private[stream] def push(connection: Int, elem: Any): Unit = {
val currentState = portStates(connection)
portStates(connection) = currentState ^ PushStartFlip
if ((currentState & InClosed) == 0) {
connectionSlots(connection) = elem
enqueue(connection)
}
}
private[stream] def pull(connection: Int): Unit = {
val currentState = portStates(connection)
portStates(connection) = currentState ^ PullStartFlip
if ((currentState & OutClosed) == 0) {
enqueue(connection)
}
}
private[stream] def complete(connection: Int): Unit = {
val currentState = portStates(connection)
if (Debug) println(s"$Name complete($connection) [$currentState]")

View file

@ -340,12 +340,21 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* query whether pull is allowed to be called or not. This method will also fail if the port is already closed.
*/
final protected def pull[T](in: Inlet[T]): Unit = {
if ((interpreter.portStates(conn(in)) & (InReady | InClosed)) == InReady) {
interpreter.pull(conn(in))
val connection = conn(in)
val portState = interpreter.portStates(connection)
val it = interpreter
if ((portState & (InReady | InClosed | OutClosed)) == InReady) {
it.portStates(connection) = portState ^ PullStartFlip
it.enqueue(connection)
} else {
// Detailed error information should not add overhead to the hot path
require(!isClosed(in), s"Cannot pull closed port ($in)")
require(!hasBeenPulled(in), s"Cannot pull port ($in) twice")
// There were no errors, the pull was simply ignored as the target stage already closed its port. We
// still need to track proper state though.
it.portStates(connection) = portState ^ PullStartFlip
}
}
@ -371,18 +380,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/
final protected def grab[T](in: Inlet[T]): T = {
val connection = conn(in)
val it = interpreter
val elem = it.connectionSlots(connection)
// Fast path
if ((interpreter.portStates(connection) & (InReady | InFailed)) == InReady &&
(interpreter.connectionSlots(connection).asInstanceOf[AnyRef] ne Empty)) {
val elem = interpreter.connectionSlots(connection)
interpreter.connectionSlots(connection) = Empty
if ((it.portStates(connection) & (InReady | InFailed)) == InReady && (elem.asInstanceOf[AnyRef] ne Empty)) {
it.connectionSlots(connection) = Empty
elem.asInstanceOf[T]
} else {
// Slow path
require(isAvailable(in), s"Cannot get element from already empty input port ($in)")
val failed = interpreter.connectionSlots(connection).asInstanceOf[Failed]
val failed = it.connectionSlots(connection).asInstanceOf[Failed]
val elem = failed.previousElem.asInstanceOf[T]
interpreter.connectionSlots(connection) = Failed(failed.ex, Empty)
it.connectionSlots(connection) = Failed(failed.ex, Empty)
elem
}
}
@ -428,13 +438,26 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* used to check if the port is ready to be pushed or not.
*/
final protected def push[T](out: Outlet[T], elem: T): Unit = {
if ((interpreter.portStates(conn(out)) & (OutReady | OutClosed)) == OutReady && (elem != null)) {
interpreter.push(conn(out), elem)
val connection = conn(out)
val portState = interpreter.portStates(connection)
val it = interpreter
it.portStates(connection) = portState ^ PushStartFlip
if ((portState & (OutReady | OutClosed | InClosed)) == OutReady && (elem != null)) {
it.connectionSlots(connection) = elem
it.enqueue(connection)
} else {
// Restore state for the error case
it.portStates(connection) = portState
// Detailed error information should not add overhead to the hot path
ReactiveStreamsCompliance.requireNonNullElement(elem)
require(isAvailable(out), s"Cannot push port ($out) twice")
require(!isClosed(out), s"Cannot pull closed port ($out)")
// No error, just InClosed caused the actual pull to be ignored, but the status flag still needs to be flipped
it.portStates(connection) = portState ^ PushStartFlip
}
}