!str #16263 Made subscription timeout handling run mostly inside the actor

This commit is contained in:
Björn Antonsson 2014-11-12 13:05:57 +01:00
parent d60d5bc849
commit 5a26718050
5 changed files with 134 additions and 140 deletions

View file

@ -6,11 +6,12 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorLogging
import akka.actor.Cancellable
import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings }
import akka.stream.ReactiveStreamsConstants
import akka.actor.{ Actor, ActorRef }
import akka.stream.MaterializerSettings
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration
/**
* INTERNAL API
@ -20,6 +21,7 @@ private[akka] object MultiStreamOutputProcessor {
case class SubstreamRequestMore(substream: SubstreamKey, demand: Long)
case class SubstreamCancel(substream: SubstreamKey)
case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any])
case class SubstreamSubscriptionTimeout(substream: SubstreamKey)
class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription {
override def request(elements: Long): Unit =
@ -38,7 +40,7 @@ private[akka] object MultiStreamOutputProcessor {
final case class Failed(e: Throwable) extends CompletedState
}
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: SubscriptionTimeout)
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: Cancellable)
extends SimpleOutputs(actor, pump) with Publisher[Any] {
import SubstreamOutput._
@ -69,6 +71,7 @@ private[akka] object MultiStreamOutputProcessor {
}
private def closePublisher(withState: CompletedState): Unit = {
subscriptionTimeout.cancel()
state.getAndSet(withState) match {
case Attached(sub) closeSubscriber(sub, withState)
case _: CompletedState throw new IllegalStateException("Attempted to double shutdown publisher")
@ -82,14 +85,13 @@ private[akka] object MultiStreamOutputProcessor {
}
override def subscribe(s: Subscriber[_ >: Any]): Unit = {
if (subscriptionTimeout.cancelAndHandle(s)) {
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("GroupBy substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber))
case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
}
subscriptionTimeout.cancel()
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber))
case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
}
}
}
@ -109,6 +111,7 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
this: Actor with ActorLogging
import MultiStreamOutputProcessor._
import StreamSubscriptionTimeoutSupport._
protected def nextId(): Long
@ -116,11 +119,10 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
protected def createSubstreamOutput(): SubstreamOutput = {
val id = SubstreamKey(nextId())
val outputs = publisherWithStreamSubscriptionTimeout {
new SubstreamOutput(id, self, this, _)
}
substreamOutputs(outputs.key) = outputs
outputs
val cancellable = scheduleSubscriptionTimeout(self, SubstreamSubscriptionTimeout(id))
val output = new SubstreamOutput(id, self, this, cancellable)
substreamOutputs(output.key) = output
output
}
protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
@ -141,6 +143,14 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
case SubstreamRequestMore(key, demand) substreamOutputs(key).enqueueOutputDemand(demand)
case SubstreamCancel(key) invalidateSubstreamOutput(key)
case SubstreamSubscribe(key, subscriber) substreamOutputs(key).attachSubscriber(subscriber)
case SubstreamSubscriptionTimeout(key) subscriptionTimedOut(substreamOutputs(key))
}
override protected def handleSubscriptionTimeout(target: Publisher[_], cause: Exception) = target match {
case s: SubstreamOutput
s.cancel(cause)
s.attachSubscriber(CancelingSubscriber)
case _ // ignore
}
}