=str #24581 Bugfix in VirtualProcessor (#24722)

* Single-runnable repeater (hopefully)

* Repeat like no-one is watching

* Fix plus noisy printlns

* Toggleable debug logging for VirtualProcessor

* Updated docs graph

* Re-enable debug logging to pinpoint another potential bug

* Covered some more cases

* Revert "Repeat like no-one is watching"

This reverts commit ae5e41c80ad906a80cb3455a93f9f72a6ac1246d.

* Removed duplicate repeater

* Fixed some minor things

* copyright header

* MiMa filter

* 10000 times repeater passed PR-validation, removing

* Spinlock free solution

* MiMa

* Review feedback adressed

* Update StreamLayout.scala
This commit is contained in:
Johan Andrén 2018-03-19 09:25:36 +01:00 committed by Konrad `ktoso` Malawski
parent 382e4d82b9
commit 770b3a3474
2 changed files with 166 additions and 56 deletions

View file

@ -1,2 +1,6 @@
# #24604 Deduplicate logic for IODispatcher
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.ActorAttributes$Dispatcher$")
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.ActorAttributes$Dispatcher$")
# #24581 RS violation
ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.VirtualProcessor$Both")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.VirtualProcessor#Both.create")

View file

@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicReference
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.util.OptionVal
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.annotation.tailrec
@ -34,12 +35,24 @@ import scala.util.control.NonFatal
* INTERNAL API
*/
@InternalApi private[stream] object VirtualProcessor {
// intentional syntax to make compile time constant
final val Debug = false
sealed trait HasActualSubscriber {
def subscriber: Subscriber[Any]
}
case object Inert {
val subscriber = new CancellingSubscriber[Any]
}
case class Both(subscriber: Subscriber[Any])
object Both {
def create(s: Subscriber[_]) = Both(s.asInstanceOf[Subscriber[Any]])
final case class Both(subscriber: Subscriber[Any]) extends HasActualSubscriber
final case class Establishing(
subscriber: Subscriber[Any],
onCompleteBuffered: Boolean = false,
onErrorBuffered: OptionVal[Throwable] = OptionVal.None) extends HasActualSubscriber
object Establishing {
def create(s: Subscriber[_]) = Establishing(s.asInstanceOf[Subscriber[Any]])
}
}
@ -51,21 +64,29 @@ import scala.util.control.NonFatal
* downstream and upstream, this needs an atomic state machine which looks a
* little like this:
*
* +--------------+ (2) +------------+
* | null | ----------> | Subscriber |
* +--------------+ +------------+
* | |
* (1) | | (1)
* \|/ \|/
* +--------------+ (2) +------------+ --\
* | Subscription | ----------> | Both | | (4)
* +--------------+ +------------+ <-/
* | |
* (3) | | (3)
* \|/ \|/
* +--------------+ (2) +------------+ --\
* | Publisher | ----------> | Inert | | (4, *)
* +--------------+ +------------+ <-/
*
* +--------+ (2) +---------------+
* | null +------------>+ Subscriber |
* +---+----+ +-----+---------+
* | |
* (1)| | (1)
* v v
* +---+----------+ (2) +-----+---------+
* | Subscription +------>+ Establishing |
* +---+----------+ +-----+---------+
* | |
* | | (4)
* | v
* | +-----+---------+ ---
* | (3) | Both | | (5)
* | +-----+---------+ <--
* | |
* | |
* v v
* +---+----------+ (2) +-----+---------+ ---
* | Publisher +-----> | Inert | | (5, *)
* +--------------+ +---------------+ <--
*
*
* The idea is to keep the major state in only one atomic reference. The actions
* that can happen are:
@ -73,7 +94,8 @@ import scala.util.control.NonFatal
* (1) onSubscribe
* (2) subscribe
* (3) onError / onComplete
* (4) onNext
* (4) establishing subscription completes
* (5) onNext
* (*) Inert can be reached also by cancellation after which onNext is still fine
* so we just silently ignore possible spec violations here
*
@ -98,20 +120,28 @@ import scala.util.control.NonFatal
import VirtualProcessor._
override def toString: String = s"VirtualProcessor(${this.hashCode()})"
if (VirtualProcessor.Debug) println(s"created: $this")
override def subscribe(s: Subscriber[_ >: T]): Unit = {
@tailrec def rec(sub: Subscriber[Any]): Unit =
@tailrec def rec(sub: Subscriber[Any]): Unit = {
get() match {
case null if (!compareAndSet(null, s)) rec(sub)
case null
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).subscribe.rec($s) -> sub")
if (!compareAndSet(null, s)) rec(sub)
case subscription: Subscription
if (compareAndSet(subscription, Both(sub))) establishSubscription(sub, subscription)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($subscription).subscribe.rec($s) -> Establishing(sub)")
val establishing = Establishing(sub, false)
if (compareAndSet(subscription, establishing)) establishSubscription(establishing, subscription)
else rec(sub)
case pub: Publisher[_]
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($pub).subscribe.rec($s) -> Inert")
if (compareAndSet(pub, Inert)) pub.subscribe(sub)
else rec(sub)
case _
case other
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).subscribe.rec($s): rejectAdditionalSubscriber")
rejectAdditionalSubscriber(sub, "VirtualProcessor")
}
}
if (s == null) {
val ex = subscriberMustNotBeNullException
@ -120,26 +150,32 @@ import scala.util.control.NonFatal
} else rec(s.asInstanceOf[Subscriber[Any]])
}
override final def onSubscribe(s: Subscription): Unit = {
@tailrec def rec(obj: AnyRef): Unit =
override def onSubscribe(s: Subscription): Unit = {
@tailrec def rec(obj: AnyRef): Unit = {
get() match {
case null if (!compareAndSet(null, obj)) rec(obj)
case null
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onSubscribe.rec($obj) -> ${obj.getClass}")
if (!compareAndSet(null, obj)) rec(obj)
case subscriber: Subscriber[_]
obj match {
case subscription: Subscription
if (compareAndSet(subscriber, Both.create(subscriber))) establishSubscription(subscriber, subscription)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($subscriber).onSubscribe.rec($obj) -> Establishing")
val establishing = Establishing.create(subscriber)
if (compareAndSet(subscriber, establishing)) establishSubscription(establishing, subscription)
else rec(obj)
case pub: Publisher[_]
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($subscriber).onSubscribe.rec($obj) -> INert")
getAndSet(Inert) match {
case Inert // nothing to be done
case _ pub.subscribe(subscriber.asInstanceOf[Subscriber[Any]])
}
}
case _
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(_).onSubscribe.rec($s) spec violation")
// spec violation
tryCancel(s)
}
}
if (s == null) {
val ex = subscriptionMustNotBeNullException
@ -148,18 +184,48 @@ import scala.util.control.NonFatal
} else rec(s)
}
private def establishSubscription(subscriber: Subscriber[_], subscription: Subscription): Unit = {
private def establishSubscription(establishing: Establishing, subscription: Subscription): Unit = {
val wrapped = new WrappedSubscription(subscription)
try {
subscriber.onSubscribe(wrapped)
// Requests will be only allowed once onSubscribe has returned to avoid reentering on an onNext before
// onSubscribe completed
wrapped.ungateDemandAndRequestBuffered()
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription(wrapped)")
establishing.subscriber.onSubscribe(wrapped)
// while we were establishing some stuff could have happened
// most likely case, nobody changed it while we where establishing
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription.rec($establishing) -> Both")
if (compareAndSet(establishing, Both(establishing.subscriber))) {
// cas won - life is good
// Requests will be only allowed once onSubscribe has returned to avoid reentering on an onNext before
// onSubscribe completed
wrapped.ungateDemandAndRequestBuffered()
} else {
// changed by someone else
get() match {
case Establishing(sub, _, OptionVal.Some(error))
// there was an onError while establishing
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription.rec(Establishing(buffered-error) -> Inert")
tryOnError(sub, error)
set(Inert)
case Establishing(sub, true, _)
// there was on onComplete while we were establishing
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription.rec(Establishing(buffered-complete) -> Inert")
tryOnComplete(sub)
set(Inert)
case Inert
tryCancel(subscription)
case other
throw new IllegalStateException(s"Unexpected state while establishing: [$other], if this ever happens it is a bug.")
}
}
} catch {
case NonFatal(ex)
set(Inert)
tryCancel(subscription)
tryOnError(subscriber, ex)
tryOnError(establishing.subscriber, ex)
}
}
@ -172,46 +238,66 @@ import scala.util.control.NonFatal
@tailrec def rec(ex: Throwable): Unit =
get() match {
case null
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onError(${t.getMessage}) -> ErrorPublisher")
if (!compareAndSet(null, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case s: Subscription
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> ErrorPublisher")
if (!compareAndSet(s, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex)
else if (t == null) throw ex
case Both(s)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onError(${t.getMessage}) -> ErrorPublisher")
set(Inert)
try tryOnError(s, ex)
finally if (t == null) throw ex // must throw NPE, rule 2.13
case s: Subscriber[_] // spec violation
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> Inert")
getAndSet(Inert) match {
case Inert // nothing to be done
case _ ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
}
case _ if t == null
// cancelled before onError(null), must throw NPE, rule 2.13
throw ex
case _ // spec violation or cancellation race, but nothing we can do
case est @ Establishing(_, false, OptionVal.None)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($est).onError(${t.getMessage}), loop")
if (!compareAndSet(est, est.copy(onErrorBuffered = OptionVal.Some(ex)))) rec(ex)
case other // spec violation or cancellation race, but nothing we can do
if (t == null) throw ex // must throw NPE, rule 2.13
// spec violation or cancellation race, but nothing we can do
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onError(${t.getMessage}). spec violation or cancellation race")
}
val ex = if (t == null) exceptionMustNotBeNullException else t
rec(ex)
}
@tailrec override final def onComplete(): Unit =
@tailrec override def onComplete(): Unit = {
get() match {
case null if (!compareAndSet(null, EmptyPublisher)) onComplete()
case s: Subscription if (!compareAndSet(s, EmptyPublisher)) onComplete()
case Both(s)
case null
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onComplete -> EmptyPublisher")
if (!compareAndSet(null, EmptyPublisher)) onComplete()
case s: Subscription
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> EmptyPublisher")
if (!compareAndSet(s, EmptyPublisher)) onComplete()
case b @ Both(s)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> Inert")
set(Inert)
tryOnComplete(s)
case s: Subscriber[_] // spec violation
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> Inert")
set(Inert)
EmptyPublisher.subscribe(s)
case _ // spec violation or cancellation race, but nothing we can do
case est @ Establishing(_, false, OptionVal.None)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($est).onComplete -> Establishing with buffered complete")
if (!est.onCompleteBuffered && !compareAndSet(est, est.copy(onCompleteBuffered = true))) onComplete()
case other
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onComplete spec violation")
// spec violation or cancellation race, but nothing we can do
}
}
override def onNext(t: T): Unit =
if (t == null) {
val ex = elementMustNotBeNullException
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.onNext(null)")
@tailrec def rec(): Unit =
get() match {
case x @ (null | _: Subscription) if (!compareAndSet(x, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec()
@ -222,28 +308,38 @@ import scala.util.control.NonFatal
rec()
throw ex // must throw NPE, rule 2:13
} else {
@tailrec def rec(): Unit =
@tailrec def rec(): Unit = {
get() match {
case Both(s)
try s.onNext(t)
catch {
case h: HasActualSubscriber
val s = h.subscriber
try {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(${h.getClass.getName}($s)).onNext($t).rec()")
s.onNext(t)
} catch {
case NonFatal(e)
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onNext($t) threw, spec violation -> Inert")
set(Inert)
throw new IllegalStateException("Subscriber threw exception, this is in violation of rule 2:13", e)
}
case s: Subscriber[_] // spec violation
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onNext($t).rec(): spec violation -> Inert")
val ex = new IllegalStateException(noDemand)
getAndSet(Inert) match {
case Inert // nothing to be done
case _ ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s)
}
throw ex
case Inert | _: Publisher[_] // nothing to be done
case Inert | _: Publisher[_]
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Inert|Publisher).onNext($t).rec(): nop")
// nothing to be done
case other
if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onNext($t).rec() -> ErrorPublisher")
val pub = ErrorPublisher(new IllegalStateException(noDemand), "failed-VirtualPublisher")
if (!compareAndSet(other, pub)) rec()
else throw pub.t
}
}
rec()
}
@ -264,6 +360,7 @@ import scala.util.control.NonFatal
// Release
def ungateDemandAndRequestBuffered(): Unit = {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}.WrappedSubscription($real).ungateDemandAndRequestBuffered")
// Ungate demand
val requests = getAndSet(PassThrough).demand
// And request buffered demand
@ -272,11 +369,13 @@ import scala.util.control.NonFatal
override def request(n: Long): Unit = {
if (n < 1) {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}.WrappedSubscription($real).request($n)")
tryCancel(real)
VirtualProcessor.this.getAndSet(Inert) match {
case Both(s) rejectDueToNonPositiveDemand(s)
case Inert // another failure has won the race
case _ // this cannot possibly happen, but signaling errors is impossible at this point
case Both(subscriber) rejectDueToNonPositiveDemand(subscriber)
case est: Establishing rejectDueToNonPositiveDemand(est.subscriber)
case Inert // another failure has won the race
case _ // this cannot possibly happen, but signaling errors is impossible at this point
}
} else {
// NOTE: At this point, batched requests might not have been dispatched, i.e. this can reorder requests.
@ -286,13 +385,19 @@ import scala.util.control.NonFatal
// The only invariant we need to keep is to never emit more requests than the downstream emitted so far.
@tailrec def bufferDemand(n: Long): Unit = {
val current = get()
if (current eq PassThrough) real.request(n)
else if (!compareAndSet(current, Buffering(current.demand + n))) bufferDemand(n)
if (current eq PassThrough) {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}WrappedSubscription($real).bufferDemand($n) passthrough")
real.request(n)
} else if (!compareAndSet(current, Buffering(current.demand + n))) {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}WrappedSubscription($real).bufferDemand($n) buffering")
bufferDemand(n)
}
}
bufferDemand(n)
}
}
override def cancel(): Unit = {
if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}WrappedSubscription.cancel() -> Inert")
VirtualProcessor.this.set(Inert)
real.cancel()
}
@ -320,7 +425,6 @@ import scala.util.control.NonFatal
@InternalApi private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] {
import ReactiveStreamsCompliance._
import VirtualProcessor.Inert
override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
requireNonNullSubscriber(subscriber)
@tailrec def rec(): Unit = {
@ -340,7 +444,8 @@ import scala.util.control.NonFatal
rec() // return value is boolean only to make the expressions above compile
}
@tailrec final def registerPublisher(pub: Publisher[_]): Unit =
@tailrec final def registerPublisher(pub: Publisher[_]): Unit = {
if (VirtualProcessor.Debug) println(s"$this.registerPublisher: $pub")
get() match {
case null
if (!compareAndSet(null, pub)) registerPublisher(pub) // retry
@ -355,6 +460,7 @@ import scala.util.control.NonFatal
case unexpected
throw new IllegalStateException(s"internal error, unexpected state: $unexpected")
}
}
override def toString: String = s"VirtualPublisher(state = ${get()})"
}