#19342 Eliminated race in SubstreamSubscriptionTimeoutSpec due to concat
This commit is contained in:
parent
b1d99ca5a8
commit
5359e0f6f6
2 changed files with 10 additions and 213 deletions
|
|
@ -224,204 +224,3 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: ActorMaterial
|
|||
override def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object TwoStreamInputProcessor {
|
||||
class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] {
|
||||
override def onError(cause: Throwable): Unit = {
|
||||
ReactiveStreamsCompliance.requireNonNullException(cause)
|
||||
impl ! OtherStreamOnError(cause)
|
||||
}
|
||||
override def onComplete(): Unit = impl ! OtherStreamOnComplete
|
||||
override def onNext(element: T): Unit = {
|
||||
ReactiveStreamsCompliance.requireNonNullElement(element)
|
||||
impl ! OtherStreamOnNext(element)
|
||||
}
|
||||
override def onSubscribe(subscription: Subscription): Unit = {
|
||||
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
|
||||
impl ! OtherStreamOnSubscribe(subscription)
|
||||
}
|
||||
}
|
||||
|
||||
case object OtherStreamOnComplete extends DeadLetterSuppression
|
||||
final case class OtherStreamOnNext(element: Any) extends DeadLetterSuppression
|
||||
final case class OtherStreamOnSubscribe(subscription: Subscription) extends DeadLetterSuppression
|
||||
final case class OtherStreamOnError(ex: Throwable) extends DeadLetterSuppression
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] abstract class TwoStreamInputProcessor(_settings: ActorMaterializerSettings, val other: Publisher[Any])
|
||||
extends ActorProcessorImpl(_settings) {
|
||||
import akka.stream.impl.TwoStreamInputProcessor._
|
||||
|
||||
val secondaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) {
|
||||
override val subreceive: SubReceive = new SubReceive(waitingForUpstream)
|
||||
|
||||
override def inputOnError(e: Throwable): Unit = TwoStreamInputProcessor.this.onError(e)
|
||||
|
||||
override def waitingForUpstream: Receive = {
|
||||
case OtherStreamOnComplete ⇒ onComplete()
|
||||
case OtherStreamOnSubscribe(subscription) ⇒ onSubscribe(subscription)
|
||||
case OtherStreamOnError(e) ⇒ TwoStreamInputProcessor.this.onError(e)
|
||||
}
|
||||
|
||||
override def upstreamRunning: Receive = {
|
||||
case OtherStreamOnNext(element) ⇒ enqueueInputElement(element)
|
||||
case OtherStreamOnComplete ⇒ onComplete()
|
||||
case OtherStreamOnError(e) ⇒ TwoStreamInputProcessor.this.onError(e)
|
||||
}
|
||||
override protected def completed: Actor.Receive = {
|
||||
case OtherStreamOnSubscribe(_) ⇒ throw ActorPublisher.NormalShutdownReason
|
||||
}
|
||||
}
|
||||
|
||||
override def activeReceive: Receive =
|
||||
secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive
|
||||
|
||||
other.subscribe(new OtherActorSubscriber(self))
|
||||
|
||||
override def pumpFinished(): Unit = {
|
||||
secondaryInputs.cancel()
|
||||
super.pumpFinished()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object MultiStreamInputProcessor {
|
||||
case class SubstreamKey(id: Long)
|
||||
|
||||
class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends AtomicReference[Subscription] with Subscriber[T] {
|
||||
override def onError(cause: Throwable): Unit = {
|
||||
ReactiveStreamsCompliance.requireNonNullException(cause)
|
||||
impl ! SubstreamOnError(key, cause)
|
||||
}
|
||||
override def onComplete(): Unit = impl ! SubstreamOnComplete(key)
|
||||
override def onNext(element: T): Unit = {
|
||||
ReactiveStreamsCompliance.requireNonNullElement(element)
|
||||
impl ! SubstreamOnNext(key, element)
|
||||
}
|
||||
override def onSubscribe(subscription: Subscription): Unit = {
|
||||
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
|
||||
if (compareAndSet(null, subscription)) impl ! SubstreamStreamOnSubscribe(key, subscription)
|
||||
else subscription.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
case class SubstreamOnComplete(key: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
case class SubstreamOnNext(key: SubstreamKey, element: Any) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
case class SubstreamOnError(key: SubstreamKey, e: Throwable) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
|
||||
class SubstreamInput(val key: SubstreamKey, bufferSize: Int, processor: MultiStreamInputProcessorLike, pump: Pump) extends BatchingInputBuffer(bufferSize, pump) {
|
||||
// Not driven directly
|
||||
override val subreceive = new SubReceive(Actor.emptyBehavior)
|
||||
|
||||
def substreamOnComplete(): Unit = onComplete()
|
||||
def substreamOnSubscribe(subscription: Subscription): Unit = onSubscribe(subscription)
|
||||
def substreamOnError(e: Throwable): Unit = onError(e)
|
||||
def substreamOnNext(elem: Any): Unit = enqueueInputElement(elem)
|
||||
|
||||
override protected def inputOnError(e: Throwable): Unit = {
|
||||
super.inputOnError(e)
|
||||
processor.invalidateSubstreamInput(key, e)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] trait MultiStreamInputProcessorLike extends Pump { this: Actor ⇒
|
||||
|
||||
import MultiStreamInputProcessor._
|
||||
|
||||
protected def nextId(): Long
|
||||
protected def inputBufferSize: Int
|
||||
|
||||
private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInput]
|
||||
private val waitingForOnSubscribe = collection.mutable.Map.empty[SubstreamKey, SubstreamSubscriber[Any]]
|
||||
|
||||
val inputSubstreamManagement: Receive = {
|
||||
case SubstreamStreamOnSubscribe(key, subscription) ⇒
|
||||
substreamInputs(key).substreamOnSubscribe(subscription)
|
||||
waitingForOnSubscribe -= key
|
||||
case SubstreamOnNext(key, element) ⇒
|
||||
substreamInputs(key).substreamOnNext(element)
|
||||
case SubstreamOnComplete(key) ⇒
|
||||
substreamInputs(key).substreamOnComplete()
|
||||
substreamInputs -= key
|
||||
case SubstreamOnError(key, e) ⇒
|
||||
substreamInputs(key).substreamOnError(e)
|
||||
}
|
||||
|
||||
def createSubstreamInput(): SubstreamInput = {
|
||||
val key = SubstreamKey(nextId())
|
||||
val inputs = new SubstreamInput(key, inputBufferSize, this, this)
|
||||
substreamInputs(key) = inputs
|
||||
inputs
|
||||
}
|
||||
|
||||
def createAndSubscribeSubstreamInput(p: Publisher[Any]): SubstreamInput = {
|
||||
val inputs = createSubstreamInput()
|
||||
val sub = new SubstreamSubscriber[Any](self, inputs.key)
|
||||
waitingForOnSubscribe(inputs.key) = sub
|
||||
p.subscribe(sub)
|
||||
inputs
|
||||
}
|
||||
|
||||
def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = {
|
||||
substreamInputs(substream).cancel()
|
||||
substreamInputs -= substream
|
||||
pump()
|
||||
}
|
||||
|
||||
protected def failInputs(e: Throwable): Unit = {
|
||||
cancelWaitingForOnSubscribe()
|
||||
substreamInputs.values foreach (_.cancel())
|
||||
}
|
||||
|
||||
protected def finishInputs(): Unit = {
|
||||
cancelWaitingForOnSubscribe()
|
||||
substreamInputs.values foreach (_.cancel())
|
||||
}
|
||||
|
||||
private def cancelWaitingForOnSubscribe(): Unit =
|
||||
waitingForOnSubscribe.valuesIterator.foreach { sub ⇒
|
||||
sub.getAndSet(CancelledSubscription) match {
|
||||
case null ⇒ // we were first
|
||||
case subscription ⇒
|
||||
// SubstreamOnSubscribe is still in flight and will not arrive
|
||||
subscription.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] abstract class MultiStreamInputProcessor(_settings: ActorMaterializerSettings) extends ActorProcessorImpl(_settings) with MultiStreamInputProcessorLike {
|
||||
private var _nextId = 0L
|
||||
protected def nextId(): Long = { _nextId += 1; _nextId }
|
||||
|
||||
override protected val inputBufferSize = _settings.initialInputBufferSize
|
||||
|
||||
override protected def fail(e: Throwable) = {
|
||||
failInputs(e)
|
||||
super.fail(e)
|
||||
}
|
||||
|
||||
override def pumpFinished() = {
|
||||
finishInputs()
|
||||
super.pumpFinished()
|
||||
}
|
||||
|
||||
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse inputSubstreamManagement
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue