Merge pull request #22559 from jrudolph/jr/w/22554-fix-CCE-in-SubSink

=str #22554 fix ClassCastException in SubSink
This commit is contained in:
Johannes Rudolph 2017-03-15 15:26:18 +01:00 committed by GitHub
commit 70253fab3f
2 changed files with 55 additions and 33 deletions

View file

@ -572,7 +572,24 @@ final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, va
/**
* INTERNAL API
*/
object SubSink {
private[stream] object SubSink {
sealed trait State
/** Not yet materialized and no command has been scheduled */
case object Uninitialized extends State
/** A command was scheduled before materialization */
sealed abstract class CommandScheduledBeforeMaterialization(val command: Command) extends State
// preallocated instances for both commands
/** A RequestOne command was scheduled before materialization */
case object RequestOneScheduledBeforeMaterialization extends CommandScheduledBeforeMaterialization(RequestOne)
/** A Cancel command was scheduled before materialization */
case object CancelScheduledBeforeMaterialization extends CommandScheduledBeforeMaterialization(Cancel)
/** Steady state: sink has been materialized, commands can be delivered through the callback */
// Represented in unwrapped form as AsyncCallback[Command] directly to prevent a level of indirection
// case class Materialized(callback: AsyncCallback[Command]) extends State
sealed trait Command
case object RequestOne extends Command
case object Cancel extends Command
@ -581,7 +598,7 @@ object SubSink {
/**
* INTERNAL API
*/
final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage Unit)
private[stream] final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage Unit)
extends GraphStage[SinkShape[T]] {
import SubSink._
@ -590,23 +607,27 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage
override def initialAttributes = Attributes.name(s"SubSink($name)")
override val shape = SinkShape(in)
private val status = new AtomicReference[AnyRef]
private val status = new AtomicReference[ /* State */ AnyRef](Uninitialized)
def pullSubstream(): Unit = {
def pullSubstream(): Unit = dispatchCommand(RequestOneScheduledBeforeMaterialization)
def cancelSubstream(): Unit = dispatchCommand(CancelScheduledBeforeMaterialization)
@tailrec
private def dispatchCommand(newState: CommandScheduledBeforeMaterialization): Unit =
status.get match {
case f: AsyncCallback[Any] @unchecked f.invoke(RequestOne)
case null
if (!status.compareAndSet(null, RequestOne))
status.get.asInstanceOf[Command Unit](RequestOne)
}
}
case /* Materialized */ callback: AsyncCallback[Command @unchecked] callback.invoke(newState.command)
case Uninitialized
if (!status.compareAndSet(Uninitialized, newState))
dispatchCommand(newState) // changed to materialized in the meantime
def cancelSubstream(): Unit = status.get match {
case f: AsyncCallback[Any] @unchecked f.invoke(Cancel)
case x // a potential RequestOne is overwritten
if (!status.compareAndSet(x, Cancel))
status.get.asInstanceOf[Command Unit](Cancel)
}
case RequestOneScheduledBeforeMaterialization if newState == CancelScheduledBeforeMaterialization
// cancellation is allowed to replace pull
if (!status.compareAndSet(RequestOneScheduledBeforeMaterialization, newState))
dispatchCommand(RequestOneScheduledBeforeMaterialization)
case cmd: CommandScheduledBeforeMaterialization
throw new IllegalStateException(s"${newState.command} on subsink is illegal when ${cmd.command} is still pending")
}
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler {
setHandler(in, this)
@ -615,29 +636,30 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage
override def onUpstreamFinish(): Unit = externalCallback(ActorSubscriberMessage.OnComplete)
override def onUpstreamFailure(ex: Throwable): Unit = externalCallback(ActorSubscriberMessage.OnError(ex))
@tailrec private def setCB(cb: AsyncCallback[Command]): Unit = {
@tailrec
private def setCallback(callback: Command Unit): Unit =
status.get match {
case null
if (!status.compareAndSet(null, cb)) setCB(cb)
case RequestOne
pull(in)
if (!status.compareAndSet(RequestOne, cb)) setCB(cb)
case Cancel
completeStage()
if (!status.compareAndSet(Cancel, cb)) setCB(cb)
case _: AsyncCallback[_]
case Uninitialized
if (!status.compareAndSet(Uninitialized, /* Materialized */ getAsyncCallback[Command](callback)))
setCallback(callback)
case cmd: CommandScheduledBeforeMaterialization
if (status.compareAndSet(cmd, /* Materialized */ getAsyncCallback[Command](callback)))
// between those two lines a new command might have been scheduled, but that will go through the
// async interface, so that the ordering is still kept
callback(cmd.command)
else
setCallback(callback)
case m: /* Materialized */ AsyncCallback[Command @unchecked]
failStage(new IllegalStateException("Substream Source cannot be materialized more than once"))
}
}
override def preStart(): Unit = {
val ourOwnCallback = getAsyncCallback[Command] {
override def preStart(): Unit =
setCallback {
case RequestOne tryPull(in)
case Cancel completeStage()
case _ throw new IllegalStateException("Bug")
}
setCB(ourOwnCallback)
}
}
override def toString: String = name