diff --git a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes index 22bab86520..531cacef61 100644 --- a/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.11.backwards.excludes @@ -1,2 +1,6 @@ # #24604 Deduplicate logic for IODispatcher -ProblemFilters.exclude[MissingTypesProblem]("akka.stream.ActorAttributes$Dispatcher$") \ No newline at end of file +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.ActorAttributes$Dispatcher$") + +# #24581 RS violation +ProblemFilters.exclude[FinalClassProblem]("akka.stream.impl.VirtualProcessor$Both") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.VirtualProcessor#Both.create") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index a03f66903c..c585f0cf3e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicReference import akka.annotation.InternalApi import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes +import akka.util.OptionVal import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.annotation.tailrec @@ -34,12 +35,24 @@ import scala.util.control.NonFatal * INTERNAL API */ @InternalApi private[stream] object VirtualProcessor { + + // intentional syntax to make compile time constant + final val Debug = false + + sealed trait HasActualSubscriber { + def subscriber: Subscriber[Any] + } case object Inert { val subscriber = new CancellingSubscriber[Any] } - case class Both(subscriber: Subscriber[Any]) - object Both { - def create(s: Subscriber[_]) = Both(s.asInstanceOf[Subscriber[Any]]) + final case class Both(subscriber: Subscriber[Any]) extends HasActualSubscriber + + final case class Establishing( + subscriber: Subscriber[Any], + onCompleteBuffered: Boolean = false, + onErrorBuffered: OptionVal[Throwable] = OptionVal.None) extends HasActualSubscriber + object Establishing { + def create(s: Subscriber[_]) = Establishing(s.asInstanceOf[Subscriber[Any]]) } } @@ -51,21 +64,29 @@ import scala.util.control.NonFatal * downstream and upstream, this needs an atomic state machine which looks a * little like this: * - * +--------------+ (2) +------------+ - * | null | ----------> | Subscriber | - * +--------------+ +------------+ - * | | - * (1) | | (1) - * \|/ \|/ - * +--------------+ (2) +------------+ --\ - * | Subscription | ----------> | Both | | (4) - * +--------------+ +------------+ <-/ - * | | - * (3) | | (3) - * \|/ \|/ - * +--------------+ (2) +------------+ --\ - * | Publisher | ----------> | Inert | | (4, *) - * +--------------+ +------------+ <-/ + * + * +--------+ (2) +---------------+ + * | null +------------>+ Subscriber | + * +---+----+ +-----+---------+ + * | | + * (1)| | (1) + * v v + * +---+----------+ (2) +-----+---------+ + * | Subscription +------>+ Establishing | + * +---+----------+ +-----+---------+ + * | | + * | | (4) + * | v + * | +-----+---------+ --- + * | (3) | Both | | (5) + * | +-----+---------+ <-- + * | | + * | | + * v v + * +---+----------+ (2) +-----+---------+ --- + * | Publisher +-----> | Inert | | (5, *) + * +--------------+ +---------------+ <-- + * * * The idea is to keep the major state in only one atomic reference. The actions * that can happen are: @@ -73,7 +94,8 @@ import scala.util.control.NonFatal * (1) onSubscribe * (2) subscribe * (3) onError / onComplete - * (4) onNext + * (4) establishing subscription completes + * (5) onNext * (*) Inert can be reached also by cancellation after which onNext is still fine * so we just silently ignore possible spec violations here * @@ -98,20 +120,28 @@ import scala.util.control.NonFatal import VirtualProcessor._ override def toString: String = s"VirtualProcessor(${this.hashCode()})" + if (VirtualProcessor.Debug) println(s"created: $this") override def subscribe(s: Subscriber[_ >: T]): Unit = { - @tailrec def rec(sub: Subscriber[Any]): Unit = + @tailrec def rec(sub: Subscriber[Any]): Unit = { get() match { - case null ⇒ if (!compareAndSet(null, s)) rec(sub) + case null ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).subscribe.rec($s) -> sub") + if (!compareAndSet(null, s)) rec(sub) case subscription: Subscription ⇒ - if (compareAndSet(subscription, Both(sub))) establishSubscription(sub, subscription) + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($subscription).subscribe.rec($s) -> Establishing(sub)") + val establishing = Establishing(sub, false) + if (compareAndSet(subscription, establishing)) establishSubscription(establishing, subscription) else rec(sub) case pub: Publisher[_] ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($pub).subscribe.rec($s) -> Inert") if (compareAndSet(pub, Inert)) pub.subscribe(sub) else rec(sub) - case _ ⇒ + case other ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).subscribe.rec($s): rejectAdditionalSubscriber") rejectAdditionalSubscriber(sub, "VirtualProcessor") } + } if (s == null) { val ex = subscriberMustNotBeNullException @@ -120,26 +150,32 @@ import scala.util.control.NonFatal } else rec(s.asInstanceOf[Subscriber[Any]]) } - override final def onSubscribe(s: Subscription): Unit = { - - @tailrec def rec(obj: AnyRef): Unit = + override def onSubscribe(s: Subscription): Unit = { + @tailrec def rec(obj: AnyRef): Unit = { get() match { - case null ⇒ if (!compareAndSet(null, obj)) rec(obj) + case null ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onSubscribe.rec($obj) -> ${obj.getClass}") + if (!compareAndSet(null, obj)) rec(obj) case subscriber: Subscriber[_] ⇒ obj match { case subscription: Subscription ⇒ - if (compareAndSet(subscriber, Both.create(subscriber))) establishSubscription(subscriber, subscription) + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($subscriber).onSubscribe.rec($obj) -> Establishing") + val establishing = Establishing.create(subscriber) + if (compareAndSet(subscriber, establishing)) establishSubscription(establishing, subscription) else rec(obj) case pub: Publisher[_] ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($subscriber).onSubscribe.rec($obj) -> INert") getAndSet(Inert) match { case Inert ⇒ // nothing to be done case _ ⇒ pub.subscribe(subscriber.asInstanceOf[Subscriber[Any]]) } } case _ ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(_).onSubscribe.rec($s) spec violation") // spec violation tryCancel(s) } + } if (s == null) { val ex = subscriptionMustNotBeNullException @@ -148,18 +184,48 @@ import scala.util.control.NonFatal } else rec(s) } - private def establishSubscription(subscriber: Subscriber[_], subscription: Subscription): Unit = { + private def establishSubscription(establishing: Establishing, subscription: Subscription): Unit = { val wrapped = new WrappedSubscription(subscription) try { - subscriber.onSubscribe(wrapped) - // Requests will be only allowed once onSubscribe has returned to avoid reentering on an onNext before - // onSubscribe completed - wrapped.ungateDemandAndRequestBuffered() + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription(wrapped)") + establishing.subscriber.onSubscribe(wrapped) + + // while we were establishing some stuff could have happened + // most likely case, nobody changed it while we where establishing + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription.rec($establishing) -> Both") + if (compareAndSet(establishing, Both(establishing.subscriber))) { + // cas won - life is good + // Requests will be only allowed once onSubscribe has returned to avoid reentering on an onNext before + // onSubscribe completed + wrapped.ungateDemandAndRequestBuffered() + } else { + // changed by someone else + get() match { + case Establishing(sub, _, OptionVal.Some(error)) ⇒ + // there was an onError while establishing + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription.rec(Establishing(buffered-error) -> Inert") + tryOnError(sub, error) + set(Inert) + + case Establishing(sub, true, _) ⇒ + // there was on onComplete while we were establishing + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.establishSubscription.rec(Establishing(buffered-complete) -> Inert") + tryOnComplete(sub) + set(Inert) + + case Inert ⇒ + tryCancel(subscription) + + case other ⇒ + throw new IllegalStateException(s"Unexpected state while establishing: [$other], if this ever happens it is a bug.") + } + } + } catch { case NonFatal(ex) ⇒ set(Inert) tryCancel(subscription) - tryOnError(subscriber, ex) + tryOnError(establishing.subscriber, ex) } } @@ -172,46 +238,66 @@ import scala.util.control.NonFatal @tailrec def rec(ex: Throwable): Unit = get() match { case null ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onError(${t.getMessage}) -> ErrorPublisher") if (!compareAndSet(null, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex) else if (t == null) throw ex case s: Subscription ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> ErrorPublisher") if (!compareAndSet(s, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec(ex) else if (t == null) throw ex case Both(s) ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onError(${t.getMessage}) -> ErrorPublisher") set(Inert) try tryOnError(s, ex) finally if (t == null) throw ex // must throw NPE, rule 2.13 case s: Subscriber[_] ⇒ // spec violation + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onError(${t.getMessage}) -> Inert") getAndSet(Inert) match { case Inert ⇒ // nothing to be done case _ ⇒ ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s) } - case _ if t == null ⇒ - // cancelled before onError(null), must throw NPE, rule 2.13 - throw ex - case _ ⇒ // spec violation or cancellation race, but nothing we can do + case est @ Establishing(_, false, OptionVal.None) ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($est).onError(${t.getMessage}), loop") + if (!compareAndSet(est, est.copy(onErrorBuffered = OptionVal.Some(ex)))) rec(ex) + case other ⇒ // spec violation or cancellation race, but nothing we can do + if (t == null) throw ex // must throw NPE, rule 2.13 + // spec violation or cancellation race, but nothing we can do + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onError(${t.getMessage}). spec violation or cancellation race") } val ex = if (t == null) exceptionMustNotBeNullException else t rec(ex) } - @tailrec override final def onComplete(): Unit = + @tailrec override def onComplete(): Unit = { get() match { - case null ⇒ if (!compareAndSet(null, EmptyPublisher)) onComplete() - case s: Subscription ⇒ if (!compareAndSet(s, EmptyPublisher)) onComplete() - case Both(s) ⇒ + case null ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(null).onComplete -> EmptyPublisher") + if (!compareAndSet(null, EmptyPublisher)) onComplete() + case s: Subscription ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> EmptyPublisher") + if (!compareAndSet(s, EmptyPublisher)) onComplete() + case b @ Both(s) ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> Inert") set(Inert) tryOnComplete(s) case s: Subscriber[_] ⇒ // spec violation + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onComplete -> Inert") set(Inert) EmptyPublisher.subscribe(s) - case _ ⇒ // spec violation or cancellation race, but nothing we can do + case est @ Establishing(_, false, OptionVal.None) ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($est).onComplete -> Establishing with buffered complete") + if (!est.onCompleteBuffered && !compareAndSet(est, est.copy(onCompleteBuffered = true))) onComplete() + case other ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onComplete spec violation") + // spec violation or cancellation race, but nothing we can do } + } override def onNext(t: T): Unit = if (t == null) { val ex = elementMustNotBeNullException + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode.onNext(null)") @tailrec def rec(): Unit = get() match { case x @ (null | _: Subscription) ⇒ if (!compareAndSet(x, ErrorPublisher(ex, "failed-VirtualProcessor"))) rec() @@ -222,28 +308,38 @@ import scala.util.control.NonFatal rec() throw ex // must throw NPE, rule 2:13 } else { - @tailrec def rec(): Unit = + @tailrec def rec(): Unit = { get() match { - case Both(s) ⇒ - try s.onNext(t) - catch { + case h: HasActualSubscriber ⇒ + val s = h.subscriber + try { + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(${h.getClass.getName}($s)).onNext($t).rec()") + s.onNext(t) + } catch { case NonFatal(e) ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Both($s)).onNext($t) threw, spec violation -> Inert") set(Inert) throw new IllegalStateException("Subscriber threw exception, this is in violation of rule 2:13", e) } + case s: Subscriber[_] ⇒ // spec violation + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($s).onNext($t).rec(): spec violation -> Inert") val ex = new IllegalStateException(noDemand) getAndSet(Inert) match { case Inert ⇒ // nothing to be done case _ ⇒ ErrorPublisher(ex, "failed-VirtualProcessor").subscribe(s) } throw ex - case Inert | _: Publisher[_] ⇒ // nothing to be done + case Inert | _: Publisher[_] ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode(Inert|Publisher).onNext($t).rec(): nop") + // nothing to be done case other ⇒ + if (VirtualProcessor.Debug) println(s"VirtualPublisher#$hashCode($other).onNext($t).rec() -> ErrorPublisher") val pub = ErrorPublisher(new IllegalStateException(noDemand), "failed-VirtualPublisher") if (!compareAndSet(other, pub)) rec() else throw pub.t } + } rec() } @@ -264,6 +360,7 @@ import scala.util.control.NonFatal // Release def ungateDemandAndRequestBuffered(): Unit = { + if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}.WrappedSubscription($real).ungateDemandAndRequestBuffered") // Ungate demand val requests = getAndSet(PassThrough).demand // And request buffered demand @@ -272,11 +369,13 @@ import scala.util.control.NonFatal override def request(n: Long): Unit = { if (n < 1) { + if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}.WrappedSubscription($real).request($n)") tryCancel(real) VirtualProcessor.this.getAndSet(Inert) match { - case Both(s) ⇒ rejectDueToNonPositiveDemand(s) - case Inert ⇒ // another failure has won the race - case _ ⇒ // this cannot possibly happen, but signaling errors is impossible at this point + case Both(subscriber) ⇒ rejectDueToNonPositiveDemand(subscriber) + case est: Establishing ⇒ rejectDueToNonPositiveDemand(est.subscriber) + case Inert ⇒ // another failure has won the race + case _ ⇒ // this cannot possibly happen, but signaling errors is impossible at this point } } else { // NOTE: At this point, batched requests might not have been dispatched, i.e. this can reorder requests. @@ -286,13 +385,19 @@ import scala.util.control.NonFatal // The only invariant we need to keep is to never emit more requests than the downstream emitted so far. @tailrec def bufferDemand(n: Long): Unit = { val current = get() - if (current eq PassThrough) real.request(n) - else if (!compareAndSet(current, Buffering(current.demand + n))) bufferDemand(n) + if (current eq PassThrough) { + if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}WrappedSubscription($real).bufferDemand($n) passthrough") + real.request(n) + } else if (!compareAndSet(current, Buffering(current.demand + n))) { + if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}WrappedSubscription($real).bufferDemand($n) buffering") + bufferDemand(n) + } } bufferDemand(n) } } override def cancel(): Unit = { + if (VirtualProcessor.Debug) println(s"VirtualPublisher#${VirtualProcessor.this.hashCode}WrappedSubscription.cancel() -> Inert") VirtualProcessor.this.set(Inert) real.cancel() } @@ -320,7 +425,6 @@ import scala.util.control.NonFatal @InternalApi private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] { import ReactiveStreamsCompliance._ import VirtualProcessor.Inert - override def subscribe(subscriber: Subscriber[_ >: T]): Unit = { requireNonNullSubscriber(subscriber) @tailrec def rec(): Unit = { @@ -340,7 +444,8 @@ import scala.util.control.NonFatal rec() // return value is boolean only to make the expressions above compile } - @tailrec final def registerPublisher(pub: Publisher[_]): Unit = + @tailrec final def registerPublisher(pub: Publisher[_]): Unit = { + if (VirtualProcessor.Debug) println(s"$this.registerPublisher: $pub") get() match { case null ⇒ if (!compareAndSet(null, pub)) registerPublisher(pub) // retry @@ -355,6 +460,7 @@ import scala.util.control.NonFatal case unexpected ⇒ throw new IllegalStateException(s"internal error, unexpected state: $unexpected") } + } override def toString: String = s"VirtualPublisher(state = ${get()})" }