+str #15086 substream publishers now have subscription-timeouts

This commit is contained in:
Konrad 'ktoso' Malawski 2014-11-03 15:29:02 +01:00
parent 62fb38b402
commit 15abdaeb15
15 changed files with 423 additions and 62 deletions

View file

@ -4,6 +4,8 @@
package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorLogging
import akka.actor.Cancellable
import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings }
import akka.actor.{ Actor, ActorRef }
import akka.stream.MaterializerSettings
@ -36,7 +38,8 @@ private[akka] object MultiStreamOutputProcessor {
final case class Failed(e: Throwable) extends CompletedState
}
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump) extends SimpleOutputs(actor, pump) with Publisher[Any] {
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: SubscriptionTimeout)
extends SimpleOutputs(actor, pump) with Publisher[Any] {
import SubstreamOutput._
@ -79,12 +82,14 @@ private[akka] object MultiStreamOutputProcessor {
}
override def subscribe(s: Subscriber[_ >: Any]): Unit = {
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher"))
case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
if (subscriptionTimeout.cancelAndHandle(s)) {
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("GroupBy substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber))
case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
}
}
}
}
@ -100,7 +105,9 @@ private[akka] object MultiStreamOutputProcessor {
/**
* INTERNAL API
*/
private[akka] trait MultiStreamOutputProcessorLike extends Pump { this: Actor
private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubscriptionTimeoutSupport {
this: Actor with ActorLogging
import MultiStreamOutputProcessor._
protected def nextId(): Long
@ -109,7 +116,9 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump { this: Actor
protected def createSubstreamOutput(): SubstreamOutput = {
val id = SubstreamKey(nextId())
val outputs = new SubstreamOutput(id, self, this)
val outputs = publisherWithStreamSubscriptionTimeout {
new SubstreamOutput(id, self, this, _)
}
substreamOutputs(outputs.key) = outputs
outputs
}
@ -142,6 +151,8 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
private var _nextId = 0L
protected def nextId(): Long = { _nextId += 1; _nextId }
override val subscriptionTimeoutSettings = _settings.subscriptionTimeoutSettings
override protected def fail(e: Throwable): Unit = {
failOutputs(e)
super.fail(e)