Merge pull request #18904 from drewhk/wip-18798-interpreter-fuzzing

+str #18798: Fuzzing mode for the interpreter
This commit is contained in:
drewhk 2015-11-11 09:33:28 +01:00
commit c56d4396df
19 changed files with 209 additions and 52 deletions

View file

@ -10,6 +10,7 @@ import akka.stream.stage._
import scala.annotation.tailrec
import scala.collection.immutable
import akka.stream._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal
/**
@ -332,7 +333,8 @@ private[stream] final class GraphInterpreter(
val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection
val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection
val logics: Array[GraphStageLogic], // Array of stage logics
val onAsyncInput: (GraphStageLogic, Any, (Any) Unit) Unit) {
val onAsyncInput: (GraphStageLogic, Any, (Any) Unit) Unit,
val fuzzingMode: Boolean) {
import GraphInterpreter._
// Maintains additional information for events, basically elements in-flight, or failure.
@ -579,14 +581,17 @@ private[stream] final class GraphInterpreter(
}
private def dequeue(): Int = {
if (queueHead == queueTail) NoEvent
else {
val idx = queueHead & mask
val elem = eventQueue(idx)
eventQueue(idx) = NoEvent
queueHead += 1
elem
val idx = queueHead & mask
if (fuzzingMode) {
val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask
val ev = eventQueue(swapWith)
eventQueue(swapWith) = eventQueue(idx)
eventQueue(idx) = ev
}
val elem = eventQueue(idx)
eventQueue(idx) = NoEvent
queueHead += 1
elem
}
private def enqueue(connection: Int): Unit = {
@ -624,8 +629,9 @@ private[stream] final class GraphInterpreter(
}
private[stream] def push(connection: Int, elem: Any): Unit = {
if ((portStates(connection) & InClosed) == 0) {
portStates(connection) ^= PushStartFlip
val currentState = portStates(connection)
portStates(connection) = currentState ^ PushStartFlip
if ((currentState & InClosed) == 0) {
connectionSlots(connection) = elem
enqueue(connection)
}
@ -633,8 +639,8 @@ private[stream] final class GraphInterpreter(
private[stream] def pull(connection: Int): Unit = {
val currentState = portStates(connection)
portStates(connection) = currentState ^ PullStartFlip
if ((currentState & OutClosed) == 0) {
portStates(connection) = currentState ^ PullStartFlip
enqueue(connection)
}
}