From c5076a3b9a4679cbb44fcd2a5d73fad64c20e451 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 14 Mar 2017 17:22:48 +0100 Subject: [PATCH 1/2] =str formatting fix --- akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala index c8b3576596..fbf54b9f55 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala @@ -26,7 +26,7 @@ private[akka] object ConstantFun { def javaAnyToNone[A, B]: A ⇒ Option[B] = none val conforms = (a: Any) ⇒ a - + val zeroLong = (_: Any) ⇒ 0L val oneLong = (_: Any) ⇒ 1L From 5b9295b3f05893d326e022bda6bd4360cd52fe49 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Wed, 15 Mar 2017 11:55:52 +0100 Subject: [PATCH 2/2] =str #22554 fix ClassCastException in SubSink Fixes #22554. --- .../stream/impl/fusing/StreamOfStreams.scala | 86 ++++++++++++------- 1 file changed, 54 insertions(+), 32 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 179d4d391c..34f7b4dd71 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -572,7 +572,24 @@ final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, va /** * INTERNAL API */ -object SubSink { +private[stream] object SubSink { + sealed trait State + /** Not yet materialized and no command has been scheduled */ + case object Uninitialized extends State + + /** A command was scheduled before materialization */ + sealed abstract class CommandScheduledBeforeMaterialization(val command: Command) extends State + + // preallocated instances for both commands + /** A RequestOne command was scheduled before materialization */ + case object RequestOneScheduledBeforeMaterialization extends CommandScheduledBeforeMaterialization(RequestOne) + /** A Cancel command was scheduled before materialization */ + case object CancelScheduledBeforeMaterialization extends CommandScheduledBeforeMaterialization(Cancel) + + /** Steady state: sink has been materialized, commands can be delivered through the callback */ + // Represented in unwrapped form as AsyncCallback[Command] directly to prevent a level of indirection + // case class Materialized(callback: AsyncCallback[Command]) extends State + sealed trait Command case object RequestOne extends Command case object Cancel extends Command @@ -581,7 +598,7 @@ object SubSink { /** * INTERNAL API */ -final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage ⇒ Unit) +private[stream] final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage ⇒ Unit) extends GraphStage[SinkShape[T]] { import SubSink._ @@ -590,23 +607,27 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage override def initialAttributes = Attributes.name(s"SubSink($name)") override val shape = SinkShape(in) - private val status = new AtomicReference[AnyRef] + private val status = new AtomicReference[ /* State */ AnyRef](Uninitialized) - def pullSubstream(): Unit = { + def pullSubstream(): Unit = dispatchCommand(RequestOneScheduledBeforeMaterialization) + def cancelSubstream(): Unit = dispatchCommand(CancelScheduledBeforeMaterialization) + + @tailrec + private def dispatchCommand(newState: CommandScheduledBeforeMaterialization): Unit = status.get match { - case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(RequestOne) - case null ⇒ - if (!status.compareAndSet(null, RequestOne)) - status.get.asInstanceOf[Command ⇒ Unit](RequestOne) - } - } + case /* Materialized */ callback: AsyncCallback[Command @unchecked] ⇒ callback.invoke(newState.command) + case Uninitialized ⇒ + if (!status.compareAndSet(Uninitialized, newState)) + dispatchCommand(newState) // changed to materialized in the meantime - def cancelSubstream(): Unit = status.get match { - case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(Cancel) - case x ⇒ // a potential RequestOne is overwritten - if (!status.compareAndSet(x, Cancel)) - status.get.asInstanceOf[Command ⇒ Unit](Cancel) - } + case RequestOneScheduledBeforeMaterialization if newState == CancelScheduledBeforeMaterialization ⇒ + // cancellation is allowed to replace pull + if (!status.compareAndSet(RequestOneScheduledBeforeMaterialization, newState)) + dispatchCommand(RequestOneScheduledBeforeMaterialization) + + case cmd: CommandScheduledBeforeMaterialization ⇒ + throw new IllegalStateException(s"${newState.command} on subsink is illegal when ${cmd.command} is still pending") + } override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler { setHandler(in, this) @@ -615,29 +636,30 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage override def onUpstreamFinish(): Unit = externalCallback(ActorSubscriberMessage.OnComplete) override def onUpstreamFailure(ex: Throwable): Unit = externalCallback(ActorSubscriberMessage.OnError(ex)) - @tailrec private def setCB(cb: AsyncCallback[Command]): Unit = { + @tailrec + private def setCallback(callback: Command ⇒ Unit): Unit = status.get match { - case null ⇒ - if (!status.compareAndSet(null, cb)) setCB(cb) - case RequestOne ⇒ - pull(in) - if (!status.compareAndSet(RequestOne, cb)) setCB(cb) - case Cancel ⇒ - completeStage() - if (!status.compareAndSet(Cancel, cb)) setCB(cb) - case _: AsyncCallback[_] ⇒ + case Uninitialized ⇒ + if (!status.compareAndSet(Uninitialized, /* Materialized */ getAsyncCallback[Command](callback))) + setCallback(callback) + + case cmd: CommandScheduledBeforeMaterialization ⇒ + if (status.compareAndSet(cmd, /* Materialized */ getAsyncCallback[Command](callback))) + // between those two lines a new command might have been scheduled, but that will go through the + // async interface, so that the ordering is still kept + callback(cmd.command) + else + setCallback(callback) + + case m: /* Materialized */ AsyncCallback[Command @unchecked] ⇒ failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) } - } - override def preStart(): Unit = { - val ourOwnCallback = getAsyncCallback[Command] { + override def preStart(): Unit = + setCallback { case RequestOne ⇒ tryPull(in) case Cancel ⇒ completeStage() - case _ ⇒ throw new IllegalStateException("Bug") } - setCB(ourOwnCallback) - } } override def toString: String = name