!str add ActorProducer and fix FanOutBox
The ActorProducer is an actor-based Publisher which runs a thunk of code until Stop is thrown. This means that completion is only signaled if demand is present for one more element, which makes sense and it legal according to the wording of the spec. The TCK was too strict in this regard and has been relaxed. Things would have “worked” without the relaxation if I had not also fixed the output buffer management. SubscriptionManagement in collaboration with the ResizableMultiReaderRingBuffer previously generated demand on its own, acting like a true buffer. This is undesired since we want to auto-tune the input buffers, which would get a lot more complicated and instable with autonomously buffering output stages in the mix. Removing this extra-buffering uncovered several places in the test suite which implicitly relied on this, which were fixed as well.
This commit is contained in:
parent
9298e720ed
commit
659eff725a
15 changed files with 273 additions and 183 deletions
|
|
@ -36,14 +36,12 @@ private[akka] object SubscriberManagement {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] trait SubscriptionWithCursor extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor {
|
||||
def subscriber[T]: spi.Subscriber[T]
|
||||
private[akka] trait SubscriptionWithCursor[T] extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor {
|
||||
def subscriber: spi.Subscriber[T]
|
||||
def isActive: Boolean = cursor != Int.MinValue
|
||||
def deactivate(): Unit = cursor = Int.MinValue
|
||||
|
||||
def dispatch[T](element: T): Unit = subscriber.onNext(element)
|
||||
|
||||
override def toString: String = "Subscription" + System.identityHashCode(this) // helpful for testing
|
||||
def dispatch(element: T): Unit = subscriber.onNext(element)
|
||||
|
||||
/////////////// internal interface, no unsynced access from subscriber's thread //////////////
|
||||
|
||||
|
|
@ -56,24 +54,38 @@ private[akka] trait SubscriptionWithCursor extends spi.Subscription with Resizab
|
|||
*/
|
||||
private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuffer.Cursors {
|
||||
import SubscriberManagement._
|
||||
type S <: SubscriptionWithCursor
|
||||
type S <: SubscriptionWithCursor[T]
|
||||
type Subscriptions = List[S]
|
||||
|
||||
def initialBufferSize: Int
|
||||
def maxBufferSize: Int
|
||||
|
||||
// we keep an element (ring) buffer which serves two purposes:
|
||||
// 1. Pre-fetch elements from our own upstream before they are requested by our downstream.
|
||||
// 2. Allow for limited request rate differences between several downstream subscribers.
|
||||
//
|
||||
// The buffering logic is as follows:
|
||||
// 1. We always request as many elements from our upstream as are still free in our current buffer instance.
|
||||
// If a subscriber requests more than this number (either once or as a result of several `requestMore` calls)
|
||||
// we resize the buffer if possible.
|
||||
// 2. If two subscribers drift apart in their request rates we use the buffer for keeping the elements that the
|
||||
// slow subscriber is behind with, thereby resizing the buffer if necessary and still allowed.
|
||||
/**
|
||||
* called when we are ready to consume more elements from our upstream
|
||||
* MUST NOT call pushToDownstream
|
||||
*/
|
||||
protected def requestFromUpstream(elements: Int): Unit
|
||||
|
||||
/**
|
||||
* called before `shutdown()` if the stream is *not* being regularly completed
|
||||
* but shut-down due to the last subscriber having cancelled its subscription
|
||||
*/
|
||||
protected def cancelUpstream(): Unit
|
||||
|
||||
/**
|
||||
* called when the spi.Publisher/Processor is ready to be shut down
|
||||
*/
|
||||
protected def shutdown(completed: Boolean): Unit
|
||||
|
||||
/**
|
||||
* Use to register a subscriber
|
||||
*/
|
||||
protected def createSubscription(subscriber: spi.Subscriber[T]): S
|
||||
|
||||
private[this] val buffer = new ResizableMultiReaderRingBuffer[T](initialBufferSize, maxBufferSize, this)
|
||||
|
||||
protected def bufferDebug: String = buffer.toString
|
||||
|
||||
// optimize for small numbers of subscribers by keeping subscribers in a plain list
|
||||
private[this] var subscriptions: Subscriptions = Nil
|
||||
|
||||
|
|
@ -83,25 +95,11 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
|
|||
// if non-null, holds the end-of-stream state
|
||||
private[this] var endOfStream: EndOfStream = NotReached
|
||||
|
||||
// called when we are ready to consume more elements from our upstream
|
||||
// the implementation of this method is allowed to synchronously call `pushToDownstream`
|
||||
// if (part of) the requested elements are already available
|
||||
protected def requestFromUpstream(elements: Int): Unit
|
||||
|
||||
// called before `shutdown()` if the stream is *not* being regularly completed
|
||||
// but shut-down due to the last subscriber having cancelled its subscription
|
||||
protected def cancelUpstream(): Unit
|
||||
|
||||
// called when the spi.Publisher/Processor is ready to be shut down.
|
||||
protected def shutdown(): Unit
|
||||
|
||||
// Use to register a subscriber
|
||||
protected def createSubscription(subscriber: spi.Subscriber[T]): S
|
||||
|
||||
def cursors = subscriptions
|
||||
|
||||
// called from `Subscription::requestMore`, i.e. from another thread,
|
||||
// override to add synchronization with itself, `subscribe` and `unregisterSubscription`
|
||||
/**
|
||||
* more demand was signaled from a given subscriber
|
||||
*/
|
||||
protected def moreRequested(subscription: S, elements: Int): Unit =
|
||||
if (subscription.isActive) {
|
||||
|
||||
|
|
@ -110,31 +108,22 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
|
|||
if (requested == 0) {
|
||||
// if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore`
|
||||
if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0
|
||||
} else {
|
||||
val x =
|
||||
try {
|
||||
subscription.dispatch(buffer.read(subscription)) // as long as we can produce elements from our buffer we do so synchronously
|
||||
-1 // we can't directly tailrec from here since we are in a try/catch, so signal with special value
|
||||
} catch {
|
||||
case NothingToReadException ⇒ if (eos ne NotReached) Long.MinValue else requested // terminate or request from upstream
|
||||
}
|
||||
if (x == -1) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) else x
|
||||
}
|
||||
} else if (buffer.count(subscription) > 0) {
|
||||
subscription.dispatch(buffer.read(subscription))
|
||||
dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos)
|
||||
} else if (eos ne NotReached) Long.MinValue
|
||||
else requested
|
||||
|
||||
endOfStream match {
|
||||
case eos @ (NotReached | Completed) ⇒
|
||||
val demand = subscription.requested + elements
|
||||
assert(demand >= 0)
|
||||
dispatchFromBufferAndReturnRemainingRequested(demand, eos) match {
|
||||
case Long.MinValue ⇒
|
||||
eos(subscription.subscriber)
|
||||
unregisterSubscriptionInternal(subscription)
|
||||
case 0 ⇒
|
||||
subscription.requested = 0
|
||||
case x ⇒
|
||||
subscription.requested = x
|
||||
requestFromUpstreamIfRequired()
|
||||
case requested ⇒
|
||||
subscription.requested = requested
|
||||
requestFromUpstreamIfPossible(requested)
|
||||
}
|
||||
case ErrorCompleted(_) ⇒ // ignore, the spi.Subscriber might not have seen our error event yet
|
||||
}
|
||||
|
|
@ -146,81 +135,73 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
|
|||
case head :: tail ⇒ maxRequested(tail, math.max(head.requested, result))
|
||||
case _ ⇒ result
|
||||
}
|
||||
if (pendingFromUpstream == 0)
|
||||
requestFromUpstreamIfPossible(maxRequested(subscriptions))
|
||||
}
|
||||
|
||||
private[this] final def requestFromUpstreamIfPossible(elements: Long): Unit = {
|
||||
val toBeRequested = buffer.potentiallyAvailable(math.min(elements, Int.MaxValue).toInt) // Cap at Int.MaxValue
|
||||
if (toBeRequested > 0) {
|
||||
pendingFromUpstream += toBeRequested
|
||||
requestFromUpstream(toBeRequested)
|
||||
val desired = Math.min(Int.MaxValue, Math.min(maxRequested(subscriptions), buffer.maxAvailable) - pendingFromUpstream).toInt
|
||||
if (desired > 0) {
|
||||
pendingFromUpstream += desired
|
||||
requestFromUpstream(desired)
|
||||
}
|
||||
}
|
||||
|
||||
// this method must be called by the implementing class whenever a new value is available to be pushed downstream
|
||||
/**
|
||||
* this method must be called by the implementing class whenever a new value is available to be pushed downstream
|
||||
*/
|
||||
protected def pushToDownstream(value: T): Unit = {
|
||||
@tailrec def dispatchAndReturnMaxRequested(remaining: Subscriptions, result: Long = 0): Long =
|
||||
@tailrec def dispatch(remaining: Subscriptions, sent: Boolean = false): Boolean =
|
||||
remaining match {
|
||||
case head :: tail ⇒
|
||||
var requested = head.requested
|
||||
if (requested > 0)
|
||||
try {
|
||||
val element = buffer.read(head)
|
||||
head.dispatch(element)
|
||||
requested -= 1
|
||||
head.requested = requested
|
||||
} catch {
|
||||
case NothingToReadException ⇒ throw new IllegalStateException("Output buffer read failure") // we just wrote a value, why can't we read it?
|
||||
}
|
||||
dispatchAndReturnMaxRequested(tail, math.max(result, requested))
|
||||
case _ ⇒ result
|
||||
if (head.requested > 0) {
|
||||
val element = buffer.read(head)
|
||||
head.dispatch(element)
|
||||
head.requested -= 1
|
||||
dispatch(tail, true)
|
||||
} else dispatch(tail, sent)
|
||||
case _ ⇒ sent
|
||||
}
|
||||
|
||||
endOfStream match {
|
||||
case NotReached ⇒
|
||||
pendingFromUpstream -= 1
|
||||
if (!buffer.write(value)) throw new IllegalStateException("Output buffer overflow")
|
||||
val maxRequested = dispatchAndReturnMaxRequested(subscriptions)
|
||||
if (pendingFromUpstream == 0) requestFromUpstreamIfPossible(maxRequested)
|
||||
case ShutDown ⇒ // don't throw if we have transitioned into shutdown in the mean-time, since there is an expected
|
||||
case _ ⇒ // race condition between `cancelUpstream` and `pushToDownstream`
|
||||
if (dispatch(subscriptions)) requestFromUpstreamIfRequired()
|
||||
case _ ⇒
|
||||
throw new IllegalStateException("pushToDownStream(...) after completeDownstream() or abortDownstream(...)")
|
||||
}
|
||||
}
|
||||
|
||||
// this method must be called by the implementing class whenever
|
||||
// it has been determined that no more elements will be produced
|
||||
/**
|
||||
* this method must be called by the implementing class whenever
|
||||
* it has been determined that no more elements will be produced
|
||||
*/
|
||||
protected def completeDownstream(): Unit = {
|
||||
if (endOfStream eq NotReached) {
|
||||
// we complete all subscriptions that have no more buffered elements
|
||||
// is non-tail recursion acceptable here? (it's the fastest impl but might stack overflow for large numbers of subscribers)
|
||||
@tailrec def completeDoneSubscriptions(remaining: Subscriptions, result: Subscriptions = Nil): Subscriptions =
|
||||
|
||||
remaining match {
|
||||
case head :: tail ⇒
|
||||
val newResult =
|
||||
if (buffer.count(head) == 0) {
|
||||
head.deactivate()
|
||||
Completed(head.subscriber)
|
||||
result
|
||||
} else head :: result
|
||||
completeDoneSubscriptions(tail, newResult)
|
||||
if (buffer.count(head) == 0) {
|
||||
head.deactivate()
|
||||
Completed(head.subscriber)
|
||||
completeDoneSubscriptions(tail, result)
|
||||
} else completeDoneSubscriptions(tail, head :: result)
|
||||
case _ ⇒ result
|
||||
}
|
||||
endOfStream = Completed
|
||||
subscriptions = completeDoneSubscriptions(subscriptions)
|
||||
if (subscriptions.isEmpty) shutdown()
|
||||
if (subscriptions.isEmpty) shutdown(completed = true)
|
||||
} // else ignore, we need to be idempotent
|
||||
}
|
||||
|
||||
// this method must be called by the implementing class to push an error downstream
|
||||
/**
|
||||
* this method must be called by the implementing class to push an error downstream
|
||||
*/
|
||||
protected def abortDownstream(cause: Throwable): Unit = {
|
||||
endOfStream = ErrorCompleted(cause)
|
||||
subscriptions.foreach(s ⇒ endOfStream(s.subscriber))
|
||||
subscriptions = Nil
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a new subscriber.
|
||||
*/
|
||||
protected def registerSubscriber(subscriber: spi.Subscriber[T]): Unit = endOfStream match {
|
||||
case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒
|
||||
subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice"))
|
||||
|
|
@ -238,17 +219,18 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
|
|||
eos(subscriber)
|
||||
}
|
||||
|
||||
// called from `Subscription::cancel`, i.e. from another thread,
|
||||
// override to add synchronization with itself, `subscribe` and `moreRequested`
|
||||
/**
|
||||
* called from `Subscription::cancel`, i.e. from another thread,
|
||||
* override to add synchronization with itself, `subscribe` and `moreRequested`
|
||||
*/
|
||||
protected def unregisterSubscription(subscription: S): Unit =
|
||||
unregisterSubscriptionInternal(subscription)
|
||||
|
||||
// must be idempotent
|
||||
private def unregisterSubscriptionInternal(subscription: S): Unit = {
|
||||
// is non-tail recursion acceptable here? (it's the fastest impl but might stack overflow for large numbers of subscribers)
|
||||
def removeFrom(remaining: Subscriptions): Subscriptions =
|
||||
@tailrec def removeFrom(remaining: Subscriptions, result: Subscriptions = Nil): Subscriptions =
|
||||
remaining match {
|
||||
case head :: tail ⇒ if (head eq subscription) tail else head :: removeFrom(tail)
|
||||
case head :: tail ⇒ if (head eq subscription) tail reverse_::: result else removeFrom(tail, head :: result)
|
||||
case _ ⇒ throw new IllegalStateException("Subscription to unregister not found")
|
||||
}
|
||||
if (subscription.isActive) {
|
||||
|
|
@ -260,7 +242,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
|
|||
endOfStream = ShutDown
|
||||
cancelUpstream()
|
||||
}
|
||||
shutdown()
|
||||
shutdown(completed = false)
|
||||
} else requestFromUpstreamIfRequired() // we might have removed a "blocking" subscriber and can continue now
|
||||
} // else ignore, we need to be idempotent
|
||||
}
|
||||
|
|
@ -288,9 +270,7 @@ private[akka] abstract class AbstractProducer[T](val initialBufferSize: Int, val
|
|||
|
||||
override def createSubscription(subscriber: spi.Subscriber[T]): S = new Subscription(subscriber)
|
||||
|
||||
protected class Subscription(val _subscriber: spi.Subscriber[T]) extends SubscriptionWithCursor {
|
||||
def subscriber[B]: spi.Subscriber[B] = _subscriber.asInstanceOf[spi.Subscriber[B]]
|
||||
|
||||
protected class Subscription(val subscriber: spi.Subscriber[T]) extends SubscriptionWithCursor[T] {
|
||||
override def requestMore(elements: Int): Unit =
|
||||
if (elements <= 0) throw new IllegalArgumentException("Argument must be > 0")
|
||||
else moreRequested(this, elements) // needs to be able to ignore calls after termination / cancellation
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue