=str #16751 Update to reactive-streams 1.0-RC3
Changed rules: * 1.9 Always onSubscribe prior to any other signals * 1.9 NullPointerException if subscriber is null * 3.17 Long overflow, effectively unbounded instead of onError Fixed some more things: * fixed some FIXME * Long drop and take * memory leaks in tck tests, use BeforeClass to create ActorSystem use AfterClass to shutdown ActorSystem * more tck tests * don't emit OnComplete when substream is cancelled * work around for memory leak in PrefixAndTail
This commit is contained in:
parent
e3e01d2c9b
commit
23c533fdd5
79 changed files with 862 additions and 414 deletions
|
|
@ -113,40 +113,36 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
|
|||
} else {
|
||||
endOfStream match {
|
||||
case eos @ (NotReached | Completed) ⇒
|
||||
val demand = subscription.totalDemand + elements
|
||||
//Check for overflow
|
||||
if (demand < 1) {
|
||||
try tryOnError(subscription.subscriber, totalPendingDemandMustNotExceedLongMaxValueException)
|
||||
finally unregisterSubscriptionInternal(subscription)
|
||||
} else {
|
||||
subscription.totalDemand = demand
|
||||
// returns Long.MinValue if the subscription is to be terminated
|
||||
@tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long =
|
||||
if (requested == 0) {
|
||||
// if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore`
|
||||
if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0
|
||||
} else if (buffer.count(subscription) > 0) {
|
||||
val goOn = try {
|
||||
subscription.dispatch(buffer.read(subscription))
|
||||
true
|
||||
} catch {
|
||||
case _: SpecViolation ⇒
|
||||
unregisterSubscriptionInternal(subscription)
|
||||
false
|
||||
}
|
||||
if (goOn) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos)
|
||||
else Long.MinValue
|
||||
} else if (eos ne NotReached) Long.MinValue
|
||||
else requested
|
||||
val d = subscription.totalDemand + elements
|
||||
// Long overflow, Reactive Streams Spec 3:17: effectively unbounded
|
||||
val demand = if (d < 1) Long.MaxValue else d
|
||||
subscription.totalDemand = demand
|
||||
// returns Long.MinValue if the subscription is to be terminated
|
||||
@tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long =
|
||||
if (requested == 0) {
|
||||
// if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore`
|
||||
if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0
|
||||
} else if (buffer.count(subscription) > 0) {
|
||||
val goOn = try {
|
||||
subscription.dispatch(buffer.read(subscription))
|
||||
true
|
||||
} catch {
|
||||
case _: SpecViolation ⇒
|
||||
unregisterSubscriptionInternal(subscription)
|
||||
false
|
||||
}
|
||||
if (goOn) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos)
|
||||
else Long.MinValue
|
||||
} else if (eos ne NotReached) Long.MinValue
|
||||
else requested
|
||||
|
||||
dispatchFromBufferAndReturnRemainingRequested(demand, eos) match {
|
||||
case Long.MinValue ⇒
|
||||
eos(subscription.subscriber)
|
||||
unregisterSubscriptionInternal(subscription)
|
||||
case x ⇒
|
||||
subscription.totalDemand = x
|
||||
requestFromUpstreamIfRequired()
|
||||
}
|
||||
dispatchFromBufferAndReturnRemainingRequested(demand, eos) match {
|
||||
case Long.MinValue ⇒
|
||||
eos(subscription.subscriber)
|
||||
unregisterSubscriptionInternal(subscription)
|
||||
case x ⇒
|
||||
subscription.totalDemand = x
|
||||
requestFromUpstreamIfRequired()
|
||||
}
|
||||
case ErrorCompleted(_) ⇒ // ignore, the Subscriber might not have seen our error event yet
|
||||
}
|
||||
|
|
@ -227,7 +223,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
|
|||
* Register a new subscriber.
|
||||
*/
|
||||
protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match {
|
||||
case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒ ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber)
|
||||
case NotReached if subscriptions.exists(_.subscriber == subscriber) ⇒ ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber)
|
||||
case NotReached ⇒ addSubscription(subscriber)
|
||||
case Completed if buffer.nonEmpty ⇒ addSubscription(subscriber)
|
||||
case eos ⇒ eos(subscriber)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue